You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2017/12/01 18:38:39 UTC
[2/3] hadoop git commit: HDFS-11576. Block recovery will fail
indefinitely if recovery time > heartbeat interval. Contributed by Lukas
Majercak
HDFS-11576. Block recovery will fail indefinitely if recovery time > heartbeat interval. Contributed by Lukas Majercak.
(cherry picked from commit 5304698dc8c5667c33e6ed9c4a827ef57172a723)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/482fd5a8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/482fd5a8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/482fd5a8
Branch: refs/heads/branch-3.0
Commit: 482fd5a880994f37fc3ad9e0cc2d127737b70aef
Parents: def87db
Author: Chris Douglas <cd...@apache.org>
Authored: Fri Dec 1 10:29:30 2017 -0800
Committer: Chris Douglas <cd...@apache.org>
Committed: Fri Dec 1 10:30:08 2017 -0800
----------------------------------------------------------------------
.../apache/hadoop/test/GenericTestUtils.java | 10 +-
.../server/blockmanagement/BlockManager.java | 40 ++++++
.../blockmanagement/PendingRecoveryBlocks.java | 143 +++++++++++++++++++
.../hdfs/server/namenode/FSNamesystem.java | 40 +++---
.../TestPendingRecoveryBlocks.java | 87 +++++++++++
.../hdfs/server/datanode/TestBlockRecovery.java | 108 ++++++++++++++
.../namenode/ha/TestPipelinesFailover.java | 5 +-
7 files changed, 413 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/482fd5a8/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
index 4cb9f8b..72c8d41 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
@@ -624,10 +624,16 @@ public abstract class GenericTestUtils {
* conditions.
*/
public static class SleepAnswer implements Answer<Object> {
+ private final int minSleepTime;
private final int maxSleepTime;
private static Random r = new Random();
-
+
public SleepAnswer(int maxSleepTime) {
+ this(0, maxSleepTime);
+ }
+
+ public SleepAnswer(int minSleepTime, int maxSleepTime) {
+ this.minSleepTime = minSleepTime;
this.maxSleepTime = maxSleepTime;
}
@@ -635,7 +641,7 @@ public abstract class GenericTestUtils {
public Object answer(InvocationOnMock invocation) throws Throwable {
boolean interrupted = false;
try {
- Thread.sleep(r.nextInt(maxSleepTime));
+ Thread.sleep(r.nextInt(maxSleepTime) + minSleepTime);
} catch (InterruptedException ie) {
interrupted = true;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/482fd5a8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 4986027..1cdb159 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -164,6 +164,8 @@ public class BlockManager implements BlockStatsMXBean {
private static final String QUEUE_REASON_FUTURE_GENSTAMP =
"generation stamp is in the future";
+ private static final long BLOCK_RECOVERY_TIMEOUT_MULTIPLIER = 30;
+
private final Namesystem namesystem;
private final BlockManagerSafeMode bmSafeMode;
@@ -353,6 +355,9 @@ public class BlockManager implements BlockStatsMXBean {
@VisibleForTesting
final PendingReconstructionBlocks pendingReconstruction;
+ /** Stores information about block recovery attempts. */
+ private final PendingRecoveryBlocks pendingRecoveryBlocks;
+
/** The maximum number of replicas allowed for a block */
public final short maxReplication;
/**
@@ -549,6 +554,12 @@ public class BlockManager implements BlockStatsMXBean {
}
this.minReplicationToBeInMaintenance = (short)minMaintenanceR;
+ long heartbeatIntervalSecs = conf.getTimeDuration(
+ DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
+ DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS);
+ long blockRecoveryTimeout = getBlockRecoveryTimeout(heartbeatIntervalSecs);
+ pendingRecoveryBlocks = new PendingRecoveryBlocks(blockRecoveryTimeout);
+
this.blockReportLeaseManager = new BlockReportLeaseManager(conf);
bmSafeMode = new BlockManagerSafeMode(this, namesystem, haEnabled, conf);
@@ -4736,6 +4747,25 @@ public class BlockManager implements BlockStatsMXBean {
}
}
+ /**
+ * Notification of a successful block recovery.
+ * @param block for which the recovery succeeded
+ */
+ public void successfulBlockRecovery(BlockInfo block) {
+ pendingRecoveryBlocks.remove(block);
+ }
+
+ /**
+ * Checks whether a recovery attempt has been made for the given block.
+ * If so, checks whether that attempt has timed out.
+ * @param b block for which recovery is being attempted
+ * @return true if no recovery attempt has been made or
+ * the previous attempt timed out
+ */
+ public boolean addBlockRecoveryAttempt(BlockInfo b) {
+ return pendingRecoveryBlocks.add(b);
+ }
+
@VisibleForTesting
public void flushBlockOps() throws IOException {
runBlockOp(new Callable<Void>(){
@@ -4863,4 +4893,14 @@ public class BlockManager implements BlockStatsMXBean {
}
return i;
}
+
+ private static long getBlockRecoveryTimeout(long heartbeatIntervalSecs) {
+ return TimeUnit.SECONDS.toMillis(heartbeatIntervalSecs *
+ BLOCK_RECOVERY_TIMEOUT_MULTIPLIER);
+ }
+
+ @VisibleForTesting
+ public void setBlockRecoveryTimeout(long blockRecoveryTimeout) {
+ pendingRecoveryBlocks.setRecoveryTimeoutInterval(blockRecoveryTimeout);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/482fd5a8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingRecoveryBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingRecoveryBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingRecoveryBlocks.java
new file mode 100644
index 0000000..3f5f27c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingRecoveryBlocks.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdfs.util.LightWeightHashSet;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * PendingRecoveryBlocks tracks recovery attempts for each block and their
+ * timeouts to ensure we do not have multiple recoveries at the same time
+ * and retry only after the timeout for a recovery has expired.
+ */
+class PendingRecoveryBlocks {
+ private static final Logger LOG = BlockManager.LOG;
+
+ /** List of recovery attempts per block and the time they expire. */
+ private final LightWeightHashSet<BlockRecoveryAttempt> recoveryTimeouts =
+ new LightWeightHashSet<>();
+
+ /** The timeout for issuing a block recovery again.
+ * (it should be larger than the time to recover a block)
+ */
+ private long recoveryTimeoutInterval;
+
+ PendingRecoveryBlocks(long timeout) {
+ this.recoveryTimeoutInterval = timeout;
+ }
+
+ /**
+ * Remove recovery attempt for the given block.
+ * @param block whose recovery attempt to remove.
+ */
+ synchronized void remove(BlockInfo block) {
+ recoveryTimeouts.remove(new BlockRecoveryAttempt(block));
+ }
+
+ /**
+ * Checks whether a recovery attempt has been made for the given block.
+ * If so, checks whether that attempt has timed out.
+ * @param block block for which recovery is being attempted
+ * @return true if no recovery attempt has been made or
+ * the previous attempt timed out
+ */
+ synchronized boolean add(BlockInfo block) {
+ boolean added = false;
+ long curTime = getTime();
+ BlockRecoveryAttempt recoveryAttempt =
+ recoveryTimeouts.getElement(new BlockRecoveryAttempt(block));
+
+ if (recoveryAttempt == null) {
+ BlockRecoveryAttempt newAttempt = new BlockRecoveryAttempt(
+ block, curTime + recoveryTimeoutInterval);
+ added = recoveryTimeouts.add(newAttempt);
+ } else if (recoveryAttempt.hasTimedOut(curTime)) {
+ // Previous attempt timed out, reset the timeout
+ recoveryAttempt.setTimeout(curTime + recoveryTimeoutInterval);
+ added = true;
+ } else {
+ long timeoutIn = TimeUnit.MILLISECONDS.toSeconds(
+ recoveryAttempt.timeoutAt - curTime);
+ LOG.info("Block recovery attempt for " + block + " rejected, as the " +
+ "previous attempt times out in " + timeoutIn + " seconds.");
+ }
+ return added;
+ }
+
+ /**
+ * Check whether the given block is under recovery.
+ * @param b block for which to check
+ * @return true if the given block is being recovered
+ */
+ synchronized boolean isUnderRecovery(BlockInfo b) {
+ BlockRecoveryAttempt recoveryAttempt =
+ recoveryTimeouts.getElement(new BlockRecoveryAttempt(b));
+ return recoveryAttempt != null;
+ }
+
+ long getTime() {
+ return Time.monotonicNow();
+ }
+
+ @VisibleForTesting
+ synchronized void setRecoveryTimeoutInterval(long recoveryTimeoutInterval) {
+ this.recoveryTimeoutInterval = recoveryTimeoutInterval;
+ }
+
+ /**
+ * Tracks timeout for block recovery attempt of a given block.
+ */
+ private static class BlockRecoveryAttempt {
+ private final BlockInfo blockInfo;
+ private long timeoutAt;
+
+ private BlockRecoveryAttempt(BlockInfo blockInfo) {
+ this(blockInfo, 0);
+ }
+
+ BlockRecoveryAttempt(BlockInfo blockInfo, long timeoutAt) {
+ this.blockInfo = blockInfo;
+ this.timeoutAt = timeoutAt;
+ }
+
+ boolean hasTimedOut(long currentTime) {
+ return currentTime > timeoutAt;
+ }
+
+ void setTimeout(long newTimeoutAt) {
+ this.timeoutAt = newTimeoutAt;
+ }
+
+ @Override
+ public int hashCode() {
+ return blockInfo.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof BlockRecoveryAttempt) {
+ return this.blockInfo.equals(((BlockRecoveryAttempt) obj).blockInfo);
+ }
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/482fd5a8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 0f61621..558fef7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -3299,25 +3299,30 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
+ "Removed empty last block and closed file " + src);
return true;
}
- // start recovery of the last block for this file
- long blockRecoveryId = nextGenerationStamp(
- blockManager.isLegacyBlock(lastBlock));
- lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile);
- if(copyOnTruncate) {
- lastBlock.setGenerationStamp(blockRecoveryId);
- } else if(truncateRecovery) {
- recoveryBlock.setGenerationStamp(blockRecoveryId);
- }
- uc.initializeBlockRecovery(lastBlock, blockRecoveryId, true);
- leaseManager.renewLease(lease);
- // Cannot close file right now, since the last block requires recovery.
- // This may potentially cause infinite loop in lease recovery
- // if there are no valid replicas on data-nodes.
- NameNode.stateChangeLog.warn(
- "DIR* NameSystem.internalReleaseLease: " +
+ // Start recovery of the last block for this file
+ // Only do so if there is no ongoing recovery for this block,
+ // or the previous recovery for this block timed out.
+ if (blockManager.addBlockRecoveryAttempt(lastBlock)) {
+ long blockRecoveryId = nextGenerationStamp(
+ blockManager.isLegacyBlock(lastBlock));
+ if(copyOnTruncate) {
+ lastBlock.setGenerationStamp(blockRecoveryId);
+ } else if(truncateRecovery) {
+ recoveryBlock.setGenerationStamp(blockRecoveryId);
+ }
+ uc.initializeBlockRecovery(lastBlock, blockRecoveryId, true);
+
+ // Cannot close file right now, since the last block requires recovery.
+ // This may potentially cause infinite loop in lease recovery
+ // if there are no valid replicas on data-nodes.
+ NameNode.stateChangeLog.warn(
+ "DIR* NameSystem.internalReleaseLease: " +
"File " + src + " has not been closed." +
- " Lease recovery is in progress. " +
+ " Lease recovery is in progress. " +
"RecoveryId = " + blockRecoveryId + " for block " + lastBlock);
+ }
+ lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile);
+ leaseManager.renewLease(lease);
break;
}
return false;
@@ -3585,6 +3590,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
// If this commit does not want to close the file, persist blocks
FSDirWriteFileOp.persistBlocks(dir, src, iFile, false);
}
+ blockManager.successfulBlockRecovery(storedBlock);
} finally {
writeUnlock("commitBlockSynchronization");
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/482fd5a8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingRecoveryBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingRecoveryBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingRecoveryBlocks.java
new file mode 100644
index 0000000..baad89f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingRecoveryBlocks.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This class contains unit tests for PendingRecoveryBlocks.java functionality.
+ */
+public class TestPendingRecoveryBlocks {
+
+ private PendingRecoveryBlocks pendingRecoveryBlocks;
+ private final long recoveryTimeout = 1000L;
+
+ private final BlockInfo blk1 = getBlock(1);
+ private final BlockInfo blk2 = getBlock(2);
+ private final BlockInfo blk3 = getBlock(3);
+
+ @Before
+ public void setUp() {
+ pendingRecoveryBlocks =
+ Mockito.spy(new PendingRecoveryBlocks(recoveryTimeout));
+ }
+
+ BlockInfo getBlock(long blockId) {
+ return new BlockInfoContiguous(new Block(blockId), (short) 0);
+ }
+
+ @Test
+ public void testAddDifferentBlocks() {
+ assertTrue(pendingRecoveryBlocks.add(blk1));
+ assertTrue(pendingRecoveryBlocks.isUnderRecovery(blk1));
+ assertTrue(pendingRecoveryBlocks.add(blk2));
+ assertTrue(pendingRecoveryBlocks.isUnderRecovery(blk2));
+ assertTrue(pendingRecoveryBlocks.add(blk3));
+ assertTrue(pendingRecoveryBlocks.isUnderRecovery(blk3));
+ }
+
+ @Test
+ public void testAddAndRemoveBlocks() {
+ // Add blocks
+ assertTrue(pendingRecoveryBlocks.add(blk1));
+ assertTrue(pendingRecoveryBlocks.add(blk2));
+
+ // Remove blk1
+ pendingRecoveryBlocks.remove(blk1);
+
+ // Adding back blk1 should succeed
+ assertTrue(pendingRecoveryBlocks.add(blk1));
+ }
+
+ @Test
+ public void testAddBlockWithPreviousRecoveryTimedOut() {
+ // Add blk
+ Mockito.doReturn(0L).when(pendingRecoveryBlocks).getTime();
+ assertTrue(pendingRecoveryBlocks.add(blk1));
+
+ // Should fail, has not timed out yet
+ Mockito.doReturn(recoveryTimeout / 2).when(pendingRecoveryBlocks).getTime();
+ assertFalse(pendingRecoveryBlocks.add(blk1));
+
+ // Should succeed after timing out
+ Mockito.doReturn(recoveryTimeout * 2).when(pendingRecoveryBlocks).getTime();
+ assertTrue(pendingRecoveryBlocks.add(blk1));
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/482fd5a8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
index 311d5a6..208447d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
@@ -18,7 +18,10 @@
package org.apache.hadoop.hdfs.server.datanode;
+import org.apache.hadoop.hdfs.AppendTestUtil;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
+
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
@@ -43,6 +46,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
@@ -94,6 +98,7 @@ import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.GenericTestUtils.SleepAnswer;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Time;
import org.apache.log4j.Level;
@@ -1035,4 +1040,107 @@ public class TestBlockRecovery {
Assert.fail("Thread failure: " + failureReason);
}
}
+
+ /**
+ * Test for block recovery taking longer than the heartbeat interval.
+ */
+ @Test(timeout = 300000L)
+ public void testRecoverySlowerThanHeartbeat() throws Exception {
+ tearDown(); // Stop the Mocked DN started in startup()
+
+ SleepAnswer delayer = new SleepAnswer(3000, 6000);
+ testRecoveryWithDatanodeDelayed(delayer);
+ }
+
+ /**
+ * Test for block recovery timeout. All recovery attempts will be delayed
+ * and the first attempt will be lost to trigger recovery timeout and retry.
+ */
+ @Test(timeout = 300000L)
+ public void testRecoveryTimeout() throws Exception {
+ tearDown(); // Stop the Mocked DN started in startup()
+ final Random r = new Random();
+
+ // Make sure first commitBlockSynchronization call from the DN gets lost
+ // for the recovery timeout to expire and new recovery attempt
+ // to be started.
+ SleepAnswer delayer = new SleepAnswer(3000) {
+ private final AtomicBoolean callRealMethod = new AtomicBoolean();
+
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ boolean interrupted = false;
+ try {
+ Thread.sleep(r.nextInt(3000) + 6000);
+ } catch (InterruptedException ie) {
+ interrupted = true;
+ }
+ try {
+ if (callRealMethod.get()) {
+ return invocation.callRealMethod();
+ }
+ callRealMethod.set(true);
+ return null;
+ } finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ };
+ testRecoveryWithDatanodeDelayed(delayer);
+ }
+
+ private void testRecoveryWithDatanodeDelayed(
+ GenericTestUtils.SleepAnswer recoveryDelayer) throws Exception {
+ Configuration configuration = new HdfsConfiguration();
+ configuration.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+ MiniDFSCluster cluster = null;
+
+ try {
+ cluster = new MiniDFSCluster.Builder(configuration)
+ .numDataNodes(2).build();
+ cluster.waitActive();
+ final FSNamesystem ns = cluster.getNamesystem();
+ final NameNode nn = cluster.getNameNode();
+ final DistributedFileSystem dfs = cluster.getFileSystem();
+ ns.getBlockManager().setBlockRecoveryTimeout(
+ TimeUnit.SECONDS.toMillis(10));
+
+ // Create a file and never close the output stream to trigger recovery
+ FSDataOutputStream out = dfs.create(new Path("/testSlowRecovery"),
+ (short) 2);
+ out.write(AppendTestUtil.randomBytes(0, 4096));
+ out.hsync();
+
+ List<DataNode> dataNodes = cluster.getDataNodes();
+ for (DataNode datanode : dataNodes) {
+ DatanodeProtocolClientSideTranslatorPB nnSpy =
+ InternalDataNodeTestUtils.spyOnBposToNN(datanode, nn);
+
+ Mockito.doAnswer(recoveryDelayer).when(nnSpy).
+ commitBlockSynchronization(
+ Mockito.any(ExtendedBlock.class), Mockito.anyInt(),
+ Mockito.anyLong(), Mockito.anyBoolean(),
+ Mockito.anyBoolean(), Mockito.anyObject(),
+ Mockito.anyObject());
+ }
+
+ // Make sure hard lease expires to trigger replica recovery
+ cluster.setLeasePeriod(100L, 100L);
+
+ // Wait for recovery to succeed
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ return ns.getCompleteBlocksTotal() > 0;
+ }
+ }, 300, 300000);
+
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/482fd5a8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java
index dc7f47a..a565578 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Random;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
@@ -278,12 +279,14 @@ public class TestPipelinesFailover {
// Disable permissions so that another user can recover the lease.
conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false);
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
-
+
FSDataOutputStream stm = null;
final MiniDFSCluster cluster = newMiniCluster(conf, 3);
try {
cluster.waitActive();
cluster.transitionToActive(0);
+ cluster.getNamesystem().getBlockManager().setBlockRecoveryTimeout(
+ TimeUnit.SECONDS.toMillis(1));
Thread.sleep(500);
LOG.info("Starting with NN 0 active");
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org