You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2012/02/22 21:31:52 UTC

svn commit: r1292494 - in /hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/server/datanode/ src/main/java/org/apache/hadoop/hdfs/server/namenode/ src/test/java/org/apache/hadoop/hdfs/server/name...

Author: todd
Date: Wed Feb 22 20:31:52 2012
New Revision: 1292494

URL: http://svn.apache.org/viewvc?rev=1292494&view=rev
Log:
HDFS-2929. Stress test and fixes for block synchronization. Contributed by Todd Lipcon.

Added:
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HAStressTestHarness.java
Modified:
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/test/GenericTestUtils.java

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt?rev=1292494&r1=1292493&r2=1292494&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt Wed Feb 22 20:31:52 2012
@@ -220,3 +220,5 @@ HDFS-2586. Add protobuf service and impl
 HDFS-2952. NN should not start with upgrade option or with a pending an unfinalized upgrade. (atm)
 
 HDFS-2974. MiniDFSCluster does not delete standby NN name dirs during format. (atm)
+
+HDFS-2929. Stress test and fixes for block synchronization (todd)

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1292494&r1=1292493&r2=1292494&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Wed Feb 22 20:31:52 2012
@@ -1804,6 +1804,13 @@ public class DataNode extends Configured
                                           long newLength) throws IOException {
     ReplicaInfo r = data.updateReplicaUnderRecovery(oldBlock,
         recoveryId, newLength);
+    // Notify the namenode of the updated block info. This is important
+    // for HA, since otherwise the standby node may lose track of the
+    // block locations until the next block report.
+    ExtendedBlock newBlock = new ExtendedBlock(oldBlock);
+    newBlock.setGenerationStamp(recoveryId);
+    newBlock.setNumBytes(newLength);
+    notifyNamenodeReceivedBlock(newBlock, "");
     return new ExtendedBlock(oldBlock.getBlockPoolId(), r);
   }
 
@@ -1930,7 +1937,6 @@ public class DataNode extends Configured
     // or their replicas have 0 length.
     // The block can be deleted.
     if (syncList.isEmpty()) {
-      // TODO: how does this work in HA??
       nn.commitBlockSynchronization(block, recoveryId, 0,
           true, true, DatanodeID.EMPTY_ARRAY);
       return;

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1292494&r1=1292493&r2=1292494&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Wed Feb 22 20:31:52 2012
@@ -2826,12 +2826,9 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (haContext.getState().equals(NameNode.STANDBY_STATE)) {
-        // TODO(HA) we'll never get here, since we check for WRITE operation above!
-        // Need to implement tests, etc, for this - block recovery spanning
-        // failover.
-      }
-
+      // If a DN tries to commit to the standby, the recovery will
+      // fail, and the next retry will succeed on the new NN.
+  
       if (isInSafeMode()) {
         throw new SafeModeException(
           "Cannot commitBlockSynchronization while in safe mode",

Added: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HAStressTestHarness.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HAStressTestHarness.java?rev=1292494&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HAStressTestHarness.java (added)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HAStressTestHarness.java Wed Feb 22 20:31:52 2012
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeAdapter;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.test.MultithreadedTestUtil.RepeatingTestThread;
+import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
+
+/**
+ * Utility class to start an HA cluster, and then start threads
+ * to periodically fail back and forth, accelerate block deletion
+ * processing, etc.
+ */
+public class HAStressTestHarness {
+  Configuration conf;
+  private MiniDFSCluster cluster;
+  static final int BLOCK_SIZE = 1024;
+  TestContext testCtx = new TestContext();
+  
+  public HAStressTestHarness() {
+    conf = new Configuration();
+    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+    // Increase max streams so that we re-replicate quickly.
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 1000);
+  }
+
+  /**
+   * Start and return the MiniDFSCluster.
+   */
+  public MiniDFSCluster startCluster() throws IOException {
+    cluster = new MiniDFSCluster.Builder(conf)
+      .nnTopology(MiniDFSNNTopology.simpleHATopology())
+      .numDataNodes(3)
+      .build();
+    return cluster;
+  }
+
+  /**
+   * Return a filesystem with client-failover configured for the
+   * cluster.
+   */
+  public FileSystem getFailoverFs() throws IOException, URISyntaxException {
+    return HATestUtil.configureFailoverFs(cluster, conf);
+  }
+
+  /**
+   * Add a thread which periodically triggers deletion reports,
+   * heartbeats, and NN-side block work.
+   * @param interval millisecond period on which to run
+   */
+  public void addReplicationTriggerThread(final int interval) {
+
+    testCtx.addThread(new RepeatingTestThread(testCtx) {
+      
+      @Override
+      public void doAnAction() throws Exception {
+        for (DataNode dn : cluster.getDataNodes()) {
+          DataNodeAdapter.triggerDeletionReport(dn);
+          DataNodeAdapter.triggerHeartbeat(dn);
+        }
+        for (int i = 0; i < 2; i++) {
+          NameNode nn = cluster.getNameNode(i);
+          BlockManagerTestUtil.computeAllPendingWork(
+              nn.getNamesystem().getBlockManager());
+        }
+        Thread.sleep(interval);
+      }
+    });
+  }
+
+  /**
+   * Add a thread which periodically triggers failover back and forth between
+   * the two namenodes.
+   */
+  public void addFailoverThread(final int msBetweenFailovers) {
+    testCtx.addThread(new RepeatingTestThread(testCtx) {
+      
+      @Override
+      public void doAnAction() throws Exception {
+        System.err.println("==============================\n" +
+            "Failing over from 0->1\n" +
+            "==================================");
+        cluster.transitionToStandby(0);
+        cluster.transitionToActive(1);
+        
+        Thread.sleep(msBetweenFailovers);
+        System.err.println("==============================\n" +
+            "Failing over from 1->0\n" +
+            "==================================");
+
+        cluster.transitionToStandby(1);
+        cluster.transitionToActive(0);
+        Thread.sleep(msBetweenFailovers);
+      }
+    });
+  }
+
+  /**
+   * Start all of the threads which have been added.
+   */
+  public void startThreads() {
+    this.testCtx.startThreads();
+  }
+
+  /**
+   * Stop threads, propagating any exceptions that might have been thrown.
+   */
+  public void stopThreads() throws Exception {
+    this.testCtx.stop();
+  }
+
+  /**
+   * Shutdown the minicluster, as well as any of the running threads.
+   */
+  public void shutdown() throws Exception {
+    this.testCtx.stop();
+    if (cluster != null) {
+      this.cluster.shutdown();
+      cluster = null;
+    }
+  }
+}

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java?rev=1292494&r1=1292493&r2=1292494&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java Wed Feb 22 20:31:52 2012
@@ -22,19 +22,13 @@ import java.util.concurrent.TimeoutExcep
 
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.MiniDFSNNTopology;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.datanode.DataNodeAdapter;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.MultithreadedTestUtil.RepeatingTestThread;
@@ -111,28 +105,16 @@ public class TestDNFencingWithReplicatio
   
   @Test
   public void testFencingStress() throws Exception {
-    Configuration conf = new Configuration();
-    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
-    conf.setInt(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000);
-    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
-    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
-    // Increase max streams so that we re-replicate quickly.
-    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 1000);
+    HAStressTestHarness harness = new HAStressTestHarness();
+    harness.conf.setInt(
+        DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000);
 
-    
-    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
-      .nnTopology(MiniDFSNNTopology.simpleHATopology())
-      .numDataNodes(3)
-      .build();
+    final MiniDFSCluster cluster = harness.startCluster();
     try {
       cluster.waitActive();
       cluster.transitionToActive(0);
       
-      final NameNode nn1 = cluster.getNameNode(0);
-      final NameNode nn2 = cluster.getNameNode(1);
-      
-      FileSystem fs = HATestUtil.configureFailoverFs(
-          cluster, conf);
+      FileSystem fs = harness.getFailoverFs();
       TestContext togglers = new TestContext();
       for (int i = 0; i < NUM_THREADS; i++) {
         Path p = new Path("/test-" + i);
@@ -143,51 +125,14 @@ public class TestDNFencingWithReplicatio
       // Start a separate thread which will make sure that replication
       // happens quickly by triggering deletion reports and replication
       // work calculation frequently.
-      TestContext triggerCtx = new TestContext();
-      triggerCtx.addThread(new RepeatingTestThread(triggerCtx) {
-        
-        @Override
-        public void doAnAction() throws Exception {
-          for (DataNode dn : cluster.getDataNodes()) {
-            DataNodeAdapter.triggerDeletionReport(dn);
-            DataNodeAdapter.triggerHeartbeat(dn);
-          }
-          for (int i = 0; i < 2; i++) {
-            NameNode nn = cluster.getNameNode(i);
-            BlockManagerTestUtil.computeAllPendingWork(
-                nn.getNamesystem().getBlockManager());
-          }
-          Thread.sleep(500);
-        }
-      });
-      
-      triggerCtx.addThread(new RepeatingTestThread(triggerCtx) {
-        
-        @Override
-        public void doAnAction() throws Exception {
-          System.err.println("==============================\n" +
-              "Failing over from 0->1\n" +
-              "==================================");
-          cluster.transitionToStandby(0);
-          cluster.transitionToActive(1);
-          
-          Thread.sleep(5000);
-          System.err.println("==============================\n" +
-              "Failing over from 1->0\n" +
-              "==================================");
-
-          cluster.transitionToStandby(1);
-          cluster.transitionToActive(0);
-          Thread.sleep(5000);
-        }
-      });
-      
-      triggerCtx.startThreads();
+      harness.addReplicationTriggerThread(500);
+      harness.addFailoverThread(5000);
+      harness.startThreads();
       togglers.startThreads();
       
       togglers.waitFor(RUNTIME);
       togglers.stop();
-      triggerCtx.stop();
+      harness.stopThreads();
 
       // CHeck that the files can be read without throwing
       for (int i = 0; i < NUM_THREADS; i++) {
@@ -196,7 +141,7 @@ public class TestDNFencingWithReplicatio
       }
     } finally {
       System.err.println("===========================\n\n\n\n");
-      cluster.shutdown();
+      harness.shutdown();
     }
 
   }

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java?rev=1292494&r1=1292493&r2=1292494&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java Wed Feb 22 20:31:52 2012
@@ -18,9 +18,10 @@
 package org.apache.hadoop.hdfs.server.namenode.ha;
 
 import static org.junit.Assert.*;
-import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -31,19 +32,35 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.AppendTestUtil;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
-import org.apache.hadoop.hdfs.TestDFSClientFailover;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeAdapter;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
+import org.apache.hadoop.test.MultithreadedTestUtil.RepeatingTestThread;
+import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
+
 import org.apache.log4j.Level;
-import org.junit.Ignore;
+
 import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.google.common.base.Supplier;
 
 /**
  * Test cases regarding pipeline recovery during NN failover.
@@ -64,6 +81,9 @@ public class TestPipelinesFailover {
     new Path("/test-file");
   private static final int BLOCK_SIZE = 4096;
   private static final int BLOCK_AND_A_HALF = BLOCK_SIZE * 3 / 2;
+  
+  private static final int STRESS_NUM_THREADS = 25;
+  private static final int STRESS_RUNTIME = 40000;
 
   /**
    * Tests continuing a write pipeline over a failover.
@@ -216,22 +236,271 @@ public class TestPipelinesFailover {
       cluster.transitionToActive(1);
       
       assertTrue(fs.exists(TEST_PATH));
-      
-      FileSystem fsOtherUser = UserGroupInformation.createUserForTesting(
-          "otheruser", new String[] { "othergroup"})
-          .doAs(new PrivilegedExceptionAction<FileSystem>() {
-            @Override
-            public FileSystem run() throws Exception {
-              return HATestUtil.configureFailoverFs(cluster, conf);
-            }
-          });
-      ((DistributedFileSystem)fsOtherUser).recoverLease(TEST_PATH);
+
+      FileSystem fsOtherUser = createFsAsOtherUser(cluster, conf);
+      loopRecoverLease(fsOtherUser, TEST_PATH);
       
       AppendTestUtil.check(fs, TEST_PATH, BLOCK_AND_A_HALF);
+      
+      // Fail back to ensure that the block locations weren't lost on the
+      // original node.
+      cluster.transitionToStandby(1);
+      cluster.transitionToActive(0);
+      AppendTestUtil.check(fs, TEST_PATH, BLOCK_AND_A_HALF);      
+    } finally {
+      IOUtils.closeStream(stm);
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Test the scenario where the NN fails over after issuing a block
+   * synchronization request, but before it is committed. The
+   * DN running the recovery should then fail to commit the synchronization
+   * and a later retry will succeed.
+   */
+  @Test(timeout=30000)
+  public void testFailoverRightBeforeCommitSynchronization() throws Exception {
+    final Configuration conf = new Configuration();
+    // Disable permissions so that another user can recover the lease.
+    conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false);
+    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    
+    FSDataOutputStream stm = null;
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+      .nnTopology(MiniDFSNNTopology.simpleHATopology())
+      .numDataNodes(3)
+      .build();
+    try {
+      cluster.waitActive();
+      cluster.transitionToActive(0);
+      Thread.sleep(500);
+
+      LOG.info("Starting with NN 0 active");
+      FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
+      stm = fs.create(TEST_PATH);
+      
+      // write a half block
+      AppendTestUtil.write(stm, 0, BLOCK_SIZE / 2);
+      stm.hflush();
+      
+      // Look into the block manager on the active node for the block
+      // under construction.
+      
+      NameNode nn0 = cluster.getNameNode(0);
+      ExtendedBlock blk = DFSTestUtil.getFirstBlock(fs, TEST_PATH);
+      DatanodeDescriptor expectedPrimary = getExpectedPrimaryNode(nn0, blk);
+      LOG.info("Expecting block recovery to be triggered on DN " +
+          expectedPrimary);
+      
+      // Find the corresponding DN daemon, and spy on its connection to the
+      // active.
+      DataNode primaryDN = cluster.getDataNode(expectedPrimary.getIpcPort());
+      DatanodeProtocolClientSideTranslatorPB nnSpy =
+          DataNodeAdapter.spyOnBposToNN(primaryDN, nn0);
+      
+      // Delay the commitBlockSynchronization call
+      DelayAnswer delayer = new DelayAnswer(LOG);
+      Mockito.doAnswer(delayer).when(nnSpy).commitBlockSynchronization(
+          Mockito.eq(blk),
+          Mockito.anyInt(), // new genstamp
+          Mockito.anyLong(), // new length
+          Mockito.eq(true), // close file
+          Mockito.eq(false), // delete block
+          (DatanodeID[]) Mockito.anyObject()); // new targets
+
+      DistributedFileSystem fsOtherUser = createFsAsOtherUser(cluster, conf);
+      assertFalse(fsOtherUser.recoverLease(TEST_PATH));
+      
+      LOG.info("Waiting for commitBlockSynchronization call from primary");
+      delayer.waitForCall();
+
+      LOG.info("Failing over to NN 1");
+      
+      cluster.transitionToStandby(0);
+      cluster.transitionToActive(1);
+      
+      // Let the commitBlockSynchronization call go through, and check that
+      // it failed with the correct exception.
+      delayer.proceed();
+      delayer.waitForResult();
+      Throwable t = delayer.getThrown();
+      if (t == null) {
+        fail("commitBlockSynchronization call did not fail on standby");
+      }
+      GenericTestUtils.assertExceptionContains(
+          "Operation category WRITE is not supported",
+          t);
+      
+      // Now, if we try again to recover the block, it should succeed on the new
+      // active.
+      loopRecoverLease(fsOtherUser, TEST_PATH);
+      
+      AppendTestUtil.check(fs, TEST_PATH, BLOCK_SIZE/2);
     } finally {
       IOUtils.closeStream(stm);
       cluster.shutdown();
     }
   }
+  
+  /**
+   * Stress test for pipeline/lease recovery. Starts a number of
+   * threads, each of which creates a file and has another client
+   * break the lease. While these threads run, failover proceeds
+   * back and forth between two namenodes.
+   */
+  @Test(timeout=STRESS_RUNTIME*3)
+  public void testPipelineRecoveryStress() throws Exception {
+    HAStressTestHarness harness = new HAStressTestHarness();
+    // Disable permissions so that another user can recover the lease.
+    harness.conf.setBoolean(
+        DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false);
+
+    final MiniDFSCluster cluster = harness.startCluster();
+    try {
+      cluster.waitActive();
+      cluster.transitionToActive(0);
+      
+      FileSystem fs = harness.getFailoverFs();
+      DistributedFileSystem fsAsOtherUser = createFsAsOtherUser(
+          cluster, harness.conf);
+      
+      TestContext testers = new TestContext();
+      for (int i = 0; i < STRESS_NUM_THREADS; i++) {
+        Path p = new Path("/test-" + i);
+        testers.addThread(new PipelineTestThread(
+            testers, fs, fsAsOtherUser, p));
+      }
+      
+      // Start a separate thread which will make sure that replication
+      // happens quickly by triggering deletion reports and replication
+      // work calculation frequently.
+      harness.addReplicationTriggerThread(500);
+      harness.addFailoverThread(5000);
+      harness.startThreads();
+      testers.startThreads();
+      
+      testers.waitFor(STRESS_RUNTIME);
+      testers.stop();
+      harness.stopThreads();
+    } finally {
+      System.err.println("===========================\n\n\n\n");
+      harness.shutdown();
+    }
+  }
+
+  /**
+   * Test thread which creates a file, has another fake user recover
+   * the lease on the file, and then ensures that the file's contents
+   * are properly readable. If any of these steps fails, propagates
+   * an exception back to the test context, causing the test case
+   * to fail.
+   */
+  private static class PipelineTestThread extends RepeatingTestThread {
+    private final FileSystem fs;
+    private final FileSystem fsOtherUser;
+    private final Path path;
+    
+
+    public PipelineTestThread(TestContext ctx,
+        FileSystem fs, FileSystem fsOtherUser, Path p) {
+      super(ctx);
+      this.fs = fs;
+      this.fsOtherUser = fsOtherUser;
+      this.path = p;
+    }
+
+    @Override
+    public void doAnAction() throws Exception {
+      FSDataOutputStream stm = fs.create(path, true);
+      try {
+        AppendTestUtil.write(stm, 0, 100);
+        stm.hflush();
+        loopRecoverLease(fsOtherUser, path);
+        AppendTestUtil.check(fs, path, 100);
+      } finally {
+        try {
+          stm.close();
+        } catch (IOException e) {
+          // should expect this since we lost the lease
+        }
+      }
+    }
+    
+    @Override
+    public String toString() {
+      return "Pipeline test thread for " + path;
+    }
+  }
+
 
+
+  /**
+   * @return the node which is expected to run the recovery of the
+   * given block, which is known to be under construction inside the
+   * given NameNOde.
+   */
+  private DatanodeDescriptor getExpectedPrimaryNode(NameNode nn,
+      ExtendedBlock blk) {
+    BlockManager bm0 = nn.getNamesystem().getBlockManager();
+    BlockInfo storedBlock = bm0.getStoredBlock(blk.getLocalBlock());
+    assertTrue("Block " + blk + " should be under construction, " +
+        "got: " + storedBlock,
+        storedBlock instanceof BlockInfoUnderConstruction);
+    BlockInfoUnderConstruction ucBlock =
+      (BlockInfoUnderConstruction)storedBlock;
+    // We expect that the first indexed replica will be the one
+    // to be in charge of the synchronization / recovery protocol.
+    DatanodeDescriptor expectedPrimary = ucBlock.getExpectedLocations()[0];
+    return expectedPrimary;
+  }
+
+  private DistributedFileSystem createFsAsOtherUser(
+      final MiniDFSCluster cluster, final Configuration conf)
+      throws IOException, InterruptedException {
+    return (DistributedFileSystem) UserGroupInformation.createUserForTesting(
+        "otheruser", new String[] { "othergroup"})
+    .doAs(new PrivilegedExceptionAction<FileSystem>() {
+      @Override
+      public FileSystem run() throws Exception {
+        return HATestUtil.configureFailoverFs(
+            cluster, conf);
+      }
+    });
+  }
+  
+  /**
+   * Try to cover the lease on the given file for up to 30
+   * seconds.
+   * @param fsOtherUser the filesystem to use for the recoverLease call
+   * @param testPath the path on which to run lease recovery
+   * @throws TimeoutException if lease recover does not succeed within 30
+   * seconds
+   * @throws InterruptedException if the thread is interrupted
+   */
+  private static void loopRecoverLease(
+      final FileSystem fsOtherUser, final Path testPath)
+      throws TimeoutException, InterruptedException {
+    try {
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          boolean success;
+          try {
+            success = ((DistributedFileSystem)fsOtherUser)
+              .recoverLease(testPath);
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+          if (!success) {
+            LOG.info("Waiting to recover lease successfully");
+          }
+          return success;
+        }
+      }, 1000, 30000);
+    } catch (TimeoutException e) {
+      throw new TimeoutException("Timed out recovering lease for " +
+          testPath);
+    }
+  }
 }

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/test/GenericTestUtils.java?rev=1292494&r1=1292493&r2=1292494&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/test/GenericTestUtils.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/test/GenericTestUtils.java Wed Feb 22 20:31:52 2012
@@ -110,7 +110,11 @@ public abstract class GenericTestUtils {
     
     private final CountDownLatch fireLatch = new CountDownLatch(1);
     private final CountDownLatch waitLatch = new CountDownLatch(1);
-  
+    private final CountDownLatch resultLatch = new CountDownLatch(1);
+    
+    // Result fields set after proceed() is called.
+    private volatile Throwable thrown;
+    private volatile Object returnValue;
     
     public DelayAnswer(Log log) {
       this.LOG = log;
@@ -145,7 +149,40 @@ public abstract class GenericTestUtils {
     }
 
     protected Object passThrough(InvocationOnMock invocation) throws Throwable {
-      return invocation.callRealMethod();
+      try {
+        Object ret = invocation.callRealMethod();
+        returnValue = ret;
+        return ret;
+      } catch (Throwable t) {
+        thrown = t;
+        throw t;
+      } finally {
+        resultLatch.countDown();
+      }
+    }
+    
+    /**
+     * After calling proceed(), this will wait until the call has
+     * completed and a result has been returned to the caller.
+     */
+    public void waitForResult() throws InterruptedException {
+      resultLatch.await();
+    }
+    
+    /**
+     * After the call has gone through, return any exception that
+     * was thrown, or null if no exception was thrown.
+     */
+    public Throwable getThrown() {
+      return thrown;
+    }
+    
+    /**
+     * After the call has gone through, return the call's return value,
+     * or null in case it was void or an exception was thrown.
+     */
+    public Object getReturnValue() {
+      return returnValue;
     }
   }