You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2012/02/22 21:31:52 UTC
svn commit: r1292494 - in
/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/server/datanode/
src/main/java/org/apache/hadoop/hdfs/server/namenode/
src/test/java/org/apache/hadoop/hdfs/server/name...
Author: todd
Date: Wed Feb 22 20:31:52 2012
New Revision: 1292494
URL: http://svn.apache.org/viewvc?rev=1292494&view=rev
Log:
HDFS-2929. Stress test and fixes for block synchronization. Contributed by Todd Lipcon.
Added:
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HAStressTestHarness.java
Modified:
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt?rev=1292494&r1=1292493&r2=1292494&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt Wed Feb 22 20:31:52 2012
@@ -220,3 +220,5 @@ HDFS-2586. Add protobuf service and impl
HDFS-2952. NN should not start with upgrade option or with a pending or unfinalized upgrade. (atm)
HDFS-2974. MiniDFSCluster does not delete standby NN name dirs during format. (atm)
+
+HDFS-2929. Stress test and fixes for block synchronization (todd)
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1292494&r1=1292493&r2=1292494&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Wed Feb 22 20:31:52 2012
@@ -1804,6 +1804,13 @@ public class DataNode extends Configured
long newLength) throws IOException {
ReplicaInfo r = data.updateReplicaUnderRecovery(oldBlock,
recoveryId, newLength);
+ // Notify the namenode of the updated block info. This is important
+ // for HA, since otherwise the standby node may lose track of the
+ // block locations until the next block report.
+ ExtendedBlock newBlock = new ExtendedBlock(oldBlock);
+ newBlock.setGenerationStamp(recoveryId);
+ newBlock.setNumBytes(newLength);
+ notifyNamenodeReceivedBlock(newBlock, "");
return new ExtendedBlock(oldBlock.getBlockPoolId(), r);
}
@@ -1930,7 +1937,6 @@ public class DataNode extends Configured
// or their replicas have 0 length.
// The block can be deleted.
if (syncList.isEmpty()) {
- // TODO: how does this work in HA??
nn.commitBlockSynchronization(block, recoveryId, 0,
true, true, DatanodeID.EMPTY_ARRAY);
return;
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1292494&r1=1292493&r2=1292494&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Wed Feb 22 20:31:52 2012
@@ -2826,12 +2826,9 @@ public class FSNamesystem implements Nam
writeLock();
try {
checkOperation(OperationCategory.WRITE);
- if (haContext.getState().equals(NameNode.STANDBY_STATE)) {
- // TODO(HA) we'll never get here, since we check for WRITE operation above!
- // Need to implement tests, etc, for this - block recovery spanning
- // failover.
- }
-
+ // If a DN tries to commit to the standby, the recovery will
+ // fail, and the next retry will succeed on the new NN.
+
if (isInSafeMode()) {
throw new SafeModeException(
"Cannot commitBlockSynchronization while in safe mode",
Added: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HAStressTestHarness.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HAStressTestHarness.java?rev=1292494&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HAStressTestHarness.java (added)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HAStressTestHarness.java Wed Feb 22 20:31:52 2012
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeAdapter;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.test.MultithreadedTestUtil.RepeatingTestThread;
+import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
+
+/**
+ * Utility class to start an HA cluster, and then start threads
+ * to periodically fail back and forth, accelerate block deletion
+ * processing, etc.
+ */
+public class HAStressTestHarness {
+ Configuration conf;
+ private MiniDFSCluster cluster;
+ static final int BLOCK_SIZE = 1024;
+ TestContext testCtx = new TestContext();
+
+ public HAStressTestHarness() {
+ conf = new Configuration();
+ conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+ conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+ conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+ // Increase max streams so that we re-replicate quickly.
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 1000);
+ }
+
+ /**
+ * Start and return the MiniDFSCluster.
+ */
+ public MiniDFSCluster startCluster() throws IOException {
+ cluster = new MiniDFSCluster.Builder(conf)
+ .nnTopology(MiniDFSNNTopology.simpleHATopology())
+ .numDataNodes(3)
+ .build();
+ return cluster;
+ }
+
+ /**
+ * Return a filesystem with client-failover configured for the
+ * cluster.
+ */
+ public FileSystem getFailoverFs() throws IOException, URISyntaxException {
+ return HATestUtil.configureFailoverFs(cluster, conf);
+ }
+
+ /**
+ * Add a thread which periodically triggers deletion reports,
+ * heartbeats, and NN-side block work.
+ * @param interval millisecond period on which to run
+ */
+ public void addReplicationTriggerThread(final int interval) {
+
+ testCtx.addThread(new RepeatingTestThread(testCtx) {
+
+ @Override
+ public void doAnAction() throws Exception {
+ for (DataNode dn : cluster.getDataNodes()) {
+ DataNodeAdapter.triggerDeletionReport(dn);
+ DataNodeAdapter.triggerHeartbeat(dn);
+ }
+ for (int i = 0; i < 2; i++) {
+ NameNode nn = cluster.getNameNode(i);
+ BlockManagerTestUtil.computeAllPendingWork(
+ nn.getNamesystem().getBlockManager());
+ }
+ Thread.sleep(interval);
+ }
+ });
+ }
+
+ /**
+ * Add a thread which periodically triggers failover back and forth between
+ * the two namenodes.
+ */
+ public void addFailoverThread(final int msBetweenFailovers) {
+ testCtx.addThread(new RepeatingTestThread(testCtx) {
+
+ @Override
+ public void doAnAction() throws Exception {
+ System.err.println("==============================\n" +
+ "Failing over from 0->1\n" +
+ "==================================");
+ cluster.transitionToStandby(0);
+ cluster.transitionToActive(1);
+
+ Thread.sleep(msBetweenFailovers);
+ System.err.println("==============================\n" +
+ "Failing over from 1->0\n" +
+ "==================================");
+
+ cluster.transitionToStandby(1);
+ cluster.transitionToActive(0);
+ Thread.sleep(msBetweenFailovers);
+ }
+ });
+ }
+
+ /**
+ * Start all of the threads which have been added.
+ */
+ public void startThreads() {
+ this.testCtx.startThreads();
+ }
+
+ /**
+ * Stop threads, propagating any exceptions that might have been thrown.
+ */
+ public void stopThreads() throws Exception {
+ this.testCtx.stop();
+ }
+
+ /**
+ * Shutdown the minicluster, as well as any of the running threads.
+ */
+ public void shutdown() throws Exception {
+ this.testCtx.stop();
+ if (cluster != null) {
+ this.cluster.shutdown();
+ cluster = null;
+ }
+ }
+}
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java?rev=1292494&r1=1292493&r2=1292494&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java Wed Feb 22 20:31:52 2012
@@ -22,19 +22,13 @@ import java.util.concurrent.TimeoutExcep
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.MiniDFSNNTopology;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.datanode.DataNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.MultithreadedTestUtil.RepeatingTestThread;
@@ -111,28 +105,16 @@ public class TestDNFencingWithReplicatio
@Test
public void testFencingStress() throws Exception {
- Configuration conf = new Configuration();
- conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
- conf.setInt(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000);
- conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
- conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
- // Increase max streams so that we re-replicate quickly.
- conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 1000);
+ HAStressTestHarness harness = new HAStressTestHarness();
+ harness.conf.setInt(
+ DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000);
-
- final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
- .nnTopology(MiniDFSNNTopology.simpleHATopology())
- .numDataNodes(3)
- .build();
+ final MiniDFSCluster cluster = harness.startCluster();
try {
cluster.waitActive();
cluster.transitionToActive(0);
- final NameNode nn1 = cluster.getNameNode(0);
- final NameNode nn2 = cluster.getNameNode(1);
-
- FileSystem fs = HATestUtil.configureFailoverFs(
- cluster, conf);
+ FileSystem fs = harness.getFailoverFs();
TestContext togglers = new TestContext();
for (int i = 0; i < NUM_THREADS; i++) {
Path p = new Path("/test-" + i);
@@ -143,51 +125,14 @@ public class TestDNFencingWithReplicatio
// Start a separate thread which will make sure that replication
// happens quickly by triggering deletion reports and replication
// work calculation frequently.
- TestContext triggerCtx = new TestContext();
- triggerCtx.addThread(new RepeatingTestThread(triggerCtx) {
-
- @Override
- public void doAnAction() throws Exception {
- for (DataNode dn : cluster.getDataNodes()) {
- DataNodeAdapter.triggerDeletionReport(dn);
- DataNodeAdapter.triggerHeartbeat(dn);
- }
- for (int i = 0; i < 2; i++) {
- NameNode nn = cluster.getNameNode(i);
- BlockManagerTestUtil.computeAllPendingWork(
- nn.getNamesystem().getBlockManager());
- }
- Thread.sleep(500);
- }
- });
-
- triggerCtx.addThread(new RepeatingTestThread(triggerCtx) {
-
- @Override
- public void doAnAction() throws Exception {
- System.err.println("==============================\n" +
- "Failing over from 0->1\n" +
- "==================================");
- cluster.transitionToStandby(0);
- cluster.transitionToActive(1);
-
- Thread.sleep(5000);
- System.err.println("==============================\n" +
- "Failing over from 1->0\n" +
- "==================================");
-
- cluster.transitionToStandby(1);
- cluster.transitionToActive(0);
- Thread.sleep(5000);
- }
- });
-
- triggerCtx.startThreads();
+ harness.addReplicationTriggerThread(500);
+ harness.addFailoverThread(5000);
+ harness.startThreads();
togglers.startThreads();
togglers.waitFor(RUNTIME);
togglers.stop();
- triggerCtx.stop();
+ harness.stopThreads();
// Check that the files can be read without throwing
for (int i = 0; i < NUM_THREADS; i++) {
@@ -196,7 +141,7 @@ public class TestDNFencingWithReplicatio
}
} finally {
System.err.println("===========================\n\n\n\n");
- cluster.shutdown();
+ harness.shutdown();
}
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java?rev=1292494&r1=1292493&r2=1292494&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java Wed Feb 22 20:31:52 2012
@@ -18,9 +18,10 @@
package org.apache.hadoop.hdfs.server.namenode.ha;
import static org.junit.Assert.*;
-import static org.junit.Assert.assertTrue;
+import java.io.IOException;
import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -31,19 +32,35 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.AppendTestUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
-import org.apache.hadoop.hdfs.TestDFSClientFailover;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
+import org.apache.hadoop.test.MultithreadedTestUtil.RepeatingTestThread;
+import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
+
import org.apache.log4j.Level;
-import org.junit.Ignore;
+
import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.google.common.base.Supplier;
/**
* Test cases regarding pipeline recovery during NN failover.
@@ -64,6 +81,9 @@ public class TestPipelinesFailover {
new Path("/test-file");
private static final int BLOCK_SIZE = 4096;
private static final int BLOCK_AND_A_HALF = BLOCK_SIZE * 3 / 2;
+
+ private static final int STRESS_NUM_THREADS = 25;
+ private static final int STRESS_RUNTIME = 40000;
/**
* Tests continuing a write pipeline over a failover.
@@ -216,22 +236,271 @@ public class TestPipelinesFailover {
cluster.transitionToActive(1);
assertTrue(fs.exists(TEST_PATH));
-
- FileSystem fsOtherUser = UserGroupInformation.createUserForTesting(
- "otheruser", new String[] { "othergroup"})
- .doAs(new PrivilegedExceptionAction<FileSystem>() {
- @Override
- public FileSystem run() throws Exception {
- return HATestUtil.configureFailoverFs(cluster, conf);
- }
- });
- ((DistributedFileSystem)fsOtherUser).recoverLease(TEST_PATH);
+
+ FileSystem fsOtherUser = createFsAsOtherUser(cluster, conf);
+ loopRecoverLease(fsOtherUser, TEST_PATH);
AppendTestUtil.check(fs, TEST_PATH, BLOCK_AND_A_HALF);
+
+ // Fail back to ensure that the block locations weren't lost on the
+ // original node.
+ cluster.transitionToStandby(1);
+ cluster.transitionToActive(0);
+ AppendTestUtil.check(fs, TEST_PATH, BLOCK_AND_A_HALF);
+ } finally {
+ IOUtils.closeStream(stm);
+ cluster.shutdown();
+ }
+ }
+
+ /**
+ * Test the scenario where the NN fails over after issuing a block
+ * synchronization request, but before it is committed. The
+ * DN running the recovery should then fail to commit the synchronization
+ * and a later retry will succeed.
+ */
+ @Test(timeout=30000)
+ public void testFailoverRightBeforeCommitSynchronization() throws Exception {
+ final Configuration conf = new Configuration();
+ // Disable permissions so that another user can recover the lease.
+ conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false);
+ conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+
+ FSDataOutputStream stm = null;
+ final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .nnTopology(MiniDFSNNTopology.simpleHATopology())
+ .numDataNodes(3)
+ .build();
+ try {
+ cluster.waitActive();
+ cluster.transitionToActive(0);
+ Thread.sleep(500);
+
+ LOG.info("Starting with NN 0 active");
+ FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
+ stm = fs.create(TEST_PATH);
+
+ // write a half block
+ AppendTestUtil.write(stm, 0, BLOCK_SIZE / 2);
+ stm.hflush();
+
+ // Look into the block manager on the active node for the block
+ // under construction.
+
+ NameNode nn0 = cluster.getNameNode(0);
+ ExtendedBlock blk = DFSTestUtil.getFirstBlock(fs, TEST_PATH);
+ DatanodeDescriptor expectedPrimary = getExpectedPrimaryNode(nn0, blk);
+ LOG.info("Expecting block recovery to be triggered on DN " +
+ expectedPrimary);
+
+ // Find the corresponding DN daemon, and spy on its connection to the
+ // active.
+ DataNode primaryDN = cluster.getDataNode(expectedPrimary.getIpcPort());
+ DatanodeProtocolClientSideTranslatorPB nnSpy =
+ DataNodeAdapter.spyOnBposToNN(primaryDN, nn0);
+
+ // Delay the commitBlockSynchronization call
+ DelayAnswer delayer = new DelayAnswer(LOG);
+ Mockito.doAnswer(delayer).when(nnSpy).commitBlockSynchronization(
+ Mockito.eq(blk),
+ Mockito.anyInt(), // new genstamp
+ Mockito.anyLong(), // new length
+ Mockito.eq(true), // close file
+ Mockito.eq(false), // delete block
+ (DatanodeID[]) Mockito.anyObject()); // new targets
+
+ DistributedFileSystem fsOtherUser = createFsAsOtherUser(cluster, conf);
+ assertFalse(fsOtherUser.recoverLease(TEST_PATH));
+
+ LOG.info("Waiting for commitBlockSynchronization call from primary");
+ delayer.waitForCall();
+
+ LOG.info("Failing over to NN 1");
+
+ cluster.transitionToStandby(0);
+ cluster.transitionToActive(1);
+
+ // Let the commitBlockSynchronization call go through, and check that
+ // it failed with the correct exception.
+ delayer.proceed();
+ delayer.waitForResult();
+ Throwable t = delayer.getThrown();
+ if (t == null) {
+ fail("commitBlockSynchronization call did not fail on standby");
+ }
+ GenericTestUtils.assertExceptionContains(
+ "Operation category WRITE is not supported",
+ t);
+
+ // Now, if we try again to recover the block, it should succeed on the new
+ // active.
+ loopRecoverLease(fsOtherUser, TEST_PATH);
+
+ AppendTestUtil.check(fs, TEST_PATH, BLOCK_SIZE/2);
} finally {
IOUtils.closeStream(stm);
cluster.shutdown();
}
}
+
+ /**
+ * Stress test for pipeline/lease recovery. Starts a number of
+ * threads, each of which creates a file and has another client
+ * break the lease. While these threads run, failover proceeds
+ * back and forth between two namenodes.
+ */
+ @Test(timeout=STRESS_RUNTIME*3)
+ public void testPipelineRecoveryStress() throws Exception {
+ HAStressTestHarness harness = new HAStressTestHarness();
+ // Disable permissions so that another user can recover the lease.
+ harness.conf.setBoolean(
+ DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false);
+
+ final MiniDFSCluster cluster = harness.startCluster();
+ try {
+ cluster.waitActive();
+ cluster.transitionToActive(0);
+
+ FileSystem fs = harness.getFailoverFs();
+ DistributedFileSystem fsAsOtherUser = createFsAsOtherUser(
+ cluster, harness.conf);
+
+ TestContext testers = new TestContext();
+ for (int i = 0; i < STRESS_NUM_THREADS; i++) {
+ Path p = new Path("/test-" + i);
+ testers.addThread(new PipelineTestThread(
+ testers, fs, fsAsOtherUser, p));
+ }
+
+ // Start a separate thread which will make sure that replication
+ // happens quickly by triggering deletion reports and replication
+ // work calculation frequently.
+ harness.addReplicationTriggerThread(500);
+ harness.addFailoverThread(5000);
+ harness.startThreads();
+ testers.startThreads();
+
+ testers.waitFor(STRESS_RUNTIME);
+ testers.stop();
+ harness.stopThreads();
+ } finally {
+ System.err.println("===========================\n\n\n\n");
+ harness.shutdown();
+ }
+ }
+
+ /**
+ * Test thread which creates a file, has another fake user recover
+ * the lease on the file, and then ensures that the file's contents
+ * are properly readable. If any of these steps fails, propagates
+ * an exception back to the test context, causing the test case
+ * to fail.
+ */
+ private static class PipelineTestThread extends RepeatingTestThread {
+ private final FileSystem fs;
+ private final FileSystem fsOtherUser;
+ private final Path path;
+
+
+ public PipelineTestThread(TestContext ctx,
+ FileSystem fs, FileSystem fsOtherUser, Path p) {
+ super(ctx);
+ this.fs = fs;
+ this.fsOtherUser = fsOtherUser;
+ this.path = p;
+ }
+
+ @Override
+ public void doAnAction() throws Exception {
+ FSDataOutputStream stm = fs.create(path, true);
+ try {
+ AppendTestUtil.write(stm, 0, 100);
+ stm.hflush();
+ loopRecoverLease(fsOtherUser, path);
+ AppendTestUtil.check(fs, path, 100);
+ } finally {
+ try {
+ stm.close();
+ } catch (IOException e) {
+ // should expect this since we lost the lease
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "Pipeline test thread for " + path;
+ }
+ }
+
+
+ /**
+ * @return the node which is expected to run the recovery of the
+ * given block, which is known to be under construction inside the
+ * given NameNode.
+ */
+ private DatanodeDescriptor getExpectedPrimaryNode(NameNode nn,
+ ExtendedBlock blk) {
+ BlockManager bm0 = nn.getNamesystem().getBlockManager();
+ BlockInfo storedBlock = bm0.getStoredBlock(blk.getLocalBlock());
+ assertTrue("Block " + blk + " should be under construction, " +
+ "got: " + storedBlock,
+ storedBlock instanceof BlockInfoUnderConstruction);
+ BlockInfoUnderConstruction ucBlock =
+ (BlockInfoUnderConstruction)storedBlock;
+ // We expect that the first indexed replica will be the one
+ // to be in charge of the synchronization / recovery protocol.
+ DatanodeDescriptor expectedPrimary = ucBlock.getExpectedLocations()[0];
+ return expectedPrimary;
+ }
+
+ private DistributedFileSystem createFsAsOtherUser(
+ final MiniDFSCluster cluster, final Configuration conf)
+ throws IOException, InterruptedException {
+ return (DistributedFileSystem) UserGroupInformation.createUserForTesting(
+ "otheruser", new String[] { "othergroup"})
+ .doAs(new PrivilegedExceptionAction<FileSystem>() {
+ @Override
+ public FileSystem run() throws Exception {
+ return HATestUtil.configureFailoverFs(
+ cluster, conf);
+ }
+ });
+ }
+
+ /**
+ * Try to recover the lease on the given file for up to 30
+ * seconds.
+ * @param fsOtherUser the filesystem to use for the recoverLease call
+ * @param testPath the path on which to run lease recovery
+ * @throws TimeoutException if lease recovery does not succeed within 30
+ * seconds
+ * @throws InterruptedException if the thread is interrupted
+ */
+ private static void loopRecoverLease(
+ final FileSystem fsOtherUser, final Path testPath)
+ throws TimeoutException, InterruptedException {
+ try {
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ boolean success;
+ try {
+ success = ((DistributedFileSystem)fsOtherUser)
+ .recoverLease(testPath);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ if (!success) {
+ LOG.info("Waiting to recover lease successfully");
+ }
+ return success;
+ }
+ }, 1000, 30000);
+ } catch (TimeoutException e) {
+ throw new TimeoutException("Timed out recovering lease for " +
+ testPath);
+ }
+ }
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/test/GenericTestUtils.java?rev=1292494&r1=1292493&r2=1292494&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/test/GenericTestUtils.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/test/GenericTestUtils.java Wed Feb 22 20:31:52 2012
@@ -110,7 +110,11 @@ public abstract class GenericTestUtils {
private final CountDownLatch fireLatch = new CountDownLatch(1);
private final CountDownLatch waitLatch = new CountDownLatch(1);
-
+ private final CountDownLatch resultLatch = new CountDownLatch(1);
+
+ // Result fields set after proceed() is called.
+ private volatile Throwable thrown;
+ private volatile Object returnValue;
public DelayAnswer(Log log) {
this.LOG = log;
@@ -145,7 +149,40 @@ public abstract class GenericTestUtils {
}
protected Object passThrough(InvocationOnMock invocation) throws Throwable {
- return invocation.callRealMethod();
+ try {
+ Object ret = invocation.callRealMethod();
+ returnValue = ret;
+ return ret;
+ } catch (Throwable t) {
+ thrown = t;
+ throw t;
+ } finally {
+ resultLatch.countDown();
+ }
+ }
+
+ /**
+ * After calling proceed(), this will wait until the call has
+ * completed and a result has been returned to the caller.
+ */
+ public void waitForResult() throws InterruptedException {
+ resultLatch.await();
+ }
+
+ /**
+ * After the call has gone through, return any exception that
+ * was thrown, or null if no exception was thrown.
+ */
+ public Throwable getThrown() {
+ return thrown;
+ }
+
+ /**
+ * After the call has gone through, return the call's return value,
+ * or null in case it was void or an exception was thrown.
+ */
+ public Object getReturnValue() {
+ return returnValue;
}
}