You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2017/12/01 19:20:07 UTC
[1/3] hadoop git commit: Revert "HDFS-11576. Block recovery will fail
indefinitely if recovery time > heartbeat interval. Contributed by Lukas
Majercak"
Repository: hadoop
Updated Branches:
refs/heads/branch-3.0 482fd5a88 -> 6266c5268
refs/heads/branch-3.0.0 be664bd64 -> 60adf0b8b
refs/heads/trunk 7225ec0ce -> 53bbef380
Revert "HDFS-11576. Block recovery will fail indefinitely if recovery time > heartbeat interval. Contributed by Lukas Majercak"
This reverts commit 482fd5a880994f37fc3ad9e0cc2d127737b70aef.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6266c526
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6266c526
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6266c526
Branch: refs/heads/branch-3.0
Commit: 6266c52680ec95a93b18ae79aaaa692754e8898d
Parents: 482fd5a
Author: Chris Douglas <cd...@apache.org>
Authored: Fri Dec 1 11:19:17 2017 -0800
Committer: Chris Douglas <cd...@apache.org>
Committed: Fri Dec 1 11:19:17 2017 -0800
----------------------------------------------------------------------
.../apache/hadoop/test/GenericTestUtils.java | 10 +-
.../server/blockmanagement/BlockManager.java | 40 ------
.../blockmanagement/PendingRecoveryBlocks.java | 143 -------------------
.../hdfs/server/namenode/FSNamesystem.java | 40 +++---
.../TestPendingRecoveryBlocks.java | 87 -----------
.../hdfs/server/datanode/TestBlockRecovery.java | 108 --------------
.../namenode/ha/TestPipelinesFailover.java | 5 +-
7 files changed, 20 insertions(+), 413 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6266c526/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
index 72c8d41..4cb9f8b 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
@@ -624,16 +624,10 @@ public abstract class GenericTestUtils {
* conditions.
*/
public static class SleepAnswer implements Answer<Object> {
- private final int minSleepTime;
private final int maxSleepTime;
private static Random r = new Random();
-
+
public SleepAnswer(int maxSleepTime) {
- this(0, maxSleepTime);
- }
-
- public SleepAnswer(int minSleepTime, int maxSleepTime) {
- this.minSleepTime = minSleepTime;
this.maxSleepTime = maxSleepTime;
}
@@ -641,7 +635,7 @@ public abstract class GenericTestUtils {
public Object answer(InvocationOnMock invocation) throws Throwable {
boolean interrupted = false;
try {
- Thread.sleep(r.nextInt(maxSleepTime) + minSleepTime);
+ Thread.sleep(r.nextInt(maxSleepTime));
} catch (InterruptedException ie) {
interrupted = true;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6266c526/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 1cdb159..4986027 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -164,8 +164,6 @@ public class BlockManager implements BlockStatsMXBean {
private static final String QUEUE_REASON_FUTURE_GENSTAMP =
"generation stamp is in the future";
- private static final long BLOCK_RECOVERY_TIMEOUT_MULTIPLIER = 30;
-
private final Namesystem namesystem;
private final BlockManagerSafeMode bmSafeMode;
@@ -355,9 +353,6 @@ public class BlockManager implements BlockStatsMXBean {
@VisibleForTesting
final PendingReconstructionBlocks pendingReconstruction;
- /** Stores information about block recovery attempts. */
- private final PendingRecoveryBlocks pendingRecoveryBlocks;
-
/** The maximum number of replicas allowed for a block */
public final short maxReplication;
/**
@@ -554,12 +549,6 @@ public class BlockManager implements BlockStatsMXBean {
}
this.minReplicationToBeInMaintenance = (short)minMaintenanceR;
- long heartbeatIntervalSecs = conf.getTimeDuration(
- DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
- DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS);
- long blockRecoveryTimeout = getBlockRecoveryTimeout(heartbeatIntervalSecs);
- pendingRecoveryBlocks = new PendingRecoveryBlocks(blockRecoveryTimeout);
-
this.blockReportLeaseManager = new BlockReportLeaseManager(conf);
bmSafeMode = new BlockManagerSafeMode(this, namesystem, haEnabled, conf);
@@ -4747,25 +4736,6 @@ public class BlockManager implements BlockStatsMXBean {
}
}
- /**
- * Notification of a successful block recovery.
- * @param block for which the recovery succeeded
- */
- public void successfulBlockRecovery(BlockInfo block) {
- pendingRecoveryBlocks.remove(block);
- }
-
- /**
- * Checks whether a recovery attempt has been made for the given block.
- * If so, checks whether that attempt has timed out.
- * @param b block for which recovery is being attempted
- * @return true if no recovery attempt has been made or
- * the previous attempt timed out
- */
- public boolean addBlockRecoveryAttempt(BlockInfo b) {
- return pendingRecoveryBlocks.add(b);
- }
-
@VisibleForTesting
public void flushBlockOps() throws IOException {
runBlockOp(new Callable<Void>(){
@@ -4893,14 +4863,4 @@ public class BlockManager implements BlockStatsMXBean {
}
return i;
}
-
- private static long getBlockRecoveryTimeout(long heartbeatIntervalSecs) {
- return TimeUnit.SECONDS.toMillis(heartbeatIntervalSecs *
- BLOCK_RECOVERY_TIMEOUT_MULTIPLIER);
- }
-
- @VisibleForTesting
- public void setBlockRecoveryTimeout(long blockRecoveryTimeout) {
- pendingRecoveryBlocks.setRecoveryTimeoutInterval(blockRecoveryTimeout);
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6266c526/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingRecoveryBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingRecoveryBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingRecoveryBlocks.java
deleted file mode 100644
index 3f5f27c..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingRecoveryBlocks.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.blockmanagement;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hdfs.util.LightWeightHashSet;
-import org.apache.hadoop.util.Time;
-import org.slf4j.Logger;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * PendingRecoveryBlocks tracks recovery attempts for each block and their
- * timeouts to ensure we do not have multiple recoveries at the same time
- * and retry only after the timeout for a recovery has expired.
- */
-class PendingRecoveryBlocks {
- private static final Logger LOG = BlockManager.LOG;
-
- /** List of recovery attempts per block and the time they expire. */
- private final LightWeightHashSet<BlockRecoveryAttempt> recoveryTimeouts =
- new LightWeightHashSet<>();
-
- /** The timeout for issuing a block recovery again.
- * (it should be larger than the time to recover a block)
- */
- private long recoveryTimeoutInterval;
-
- PendingRecoveryBlocks(long timeout) {
- this.recoveryTimeoutInterval = timeout;
- }
-
- /**
- * Remove recovery attempt for the given block.
- * @param block whose recovery attempt to remove.
- */
- synchronized void remove(BlockInfo block) {
- recoveryTimeouts.remove(new BlockRecoveryAttempt(block));
- }
-
- /**
- * Checks whether a recovery attempt has been made for the given block.
- * If so, checks whether that attempt has timed out.
- * @param block block for which recovery is being attempted
- * @return true if no recovery attempt has been made or
- * the previous attempt timed out
- */
- synchronized boolean add(BlockInfo block) {
- boolean added = false;
- long curTime = getTime();
- BlockRecoveryAttempt recoveryAttempt =
- recoveryTimeouts.getElement(new BlockRecoveryAttempt(block));
-
- if (recoveryAttempt == null) {
- BlockRecoveryAttempt newAttempt = new BlockRecoveryAttempt(
- block, curTime + recoveryTimeoutInterval);
- added = recoveryTimeouts.add(newAttempt);
- } else if (recoveryAttempt.hasTimedOut(curTime)) {
- // Previous attempt timed out, reset the timeout
- recoveryAttempt.setTimeout(curTime + recoveryTimeoutInterval);
- added = true;
- } else {
- long timeoutIn = TimeUnit.MILLISECONDS.toSeconds(
- recoveryAttempt.timeoutAt - curTime);
- LOG.info("Block recovery attempt for " + block + " rejected, as the " +
- "previous attempt times out in " + timeoutIn + " seconds.");
- }
- return added;
- }
-
- /**
- * Check whether the given block is under recovery.
- * @param b block for which to check
- * @return true if the given block is being recovered
- */
- synchronized boolean isUnderRecovery(BlockInfo b) {
- BlockRecoveryAttempt recoveryAttempt =
- recoveryTimeouts.getElement(new BlockRecoveryAttempt(b));
- return recoveryAttempt != null;
- }
-
- long getTime() {
- return Time.monotonicNow();
- }
-
- @VisibleForTesting
- synchronized void setRecoveryTimeoutInterval(long recoveryTimeoutInterval) {
- this.recoveryTimeoutInterval = recoveryTimeoutInterval;
- }
-
- /**
- * Tracks timeout for block recovery attempt of a given block.
- */
- private static class BlockRecoveryAttempt {
- private final BlockInfo blockInfo;
- private long timeoutAt;
-
- private BlockRecoveryAttempt(BlockInfo blockInfo) {
- this(blockInfo, 0);
- }
-
- BlockRecoveryAttempt(BlockInfo blockInfo, long timeoutAt) {
- this.blockInfo = blockInfo;
- this.timeoutAt = timeoutAt;
- }
-
- boolean hasTimedOut(long currentTime) {
- return currentTime > timeoutAt;
- }
-
- void setTimeout(long newTimeoutAt) {
- this.timeoutAt = newTimeoutAt;
- }
-
- @Override
- public int hashCode() {
- return blockInfo.hashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof BlockRecoveryAttempt) {
- return this.blockInfo.equals(((BlockRecoveryAttempt) obj).blockInfo);
- }
- return false;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6266c526/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 558fef7..0f61621 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -3299,30 +3299,25 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
+ "Removed empty last block and closed file " + src);
return true;
}
- // Start recovery of the last block for this file
- // Only do so if there is no ongoing recovery for this block,
- // or the previous recovery for this block timed out.
- if (blockManager.addBlockRecoveryAttempt(lastBlock)) {
- long blockRecoveryId = nextGenerationStamp(
- blockManager.isLegacyBlock(lastBlock));
- if(copyOnTruncate) {
- lastBlock.setGenerationStamp(blockRecoveryId);
- } else if(truncateRecovery) {
- recoveryBlock.setGenerationStamp(blockRecoveryId);
- }
- uc.initializeBlockRecovery(lastBlock, blockRecoveryId, true);
-
- // Cannot close file right now, since the last block requires recovery.
- // This may potentially cause infinite loop in lease recovery
- // if there are no valid replicas on data-nodes.
- NameNode.stateChangeLog.warn(
- "DIR* NameSystem.internalReleaseLease: " +
- "File " + src + " has not been closed." +
- " Lease recovery is in progress. " +
- "RecoveryId = " + blockRecoveryId + " for block " + lastBlock);
- }
+ // start recovery of the last block for this file
+ long blockRecoveryId = nextGenerationStamp(
+ blockManager.isLegacyBlock(lastBlock));
lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile);
+ if(copyOnTruncate) {
+ lastBlock.setGenerationStamp(blockRecoveryId);
+ } else if(truncateRecovery) {
+ recoveryBlock.setGenerationStamp(blockRecoveryId);
+ }
+ uc.initializeBlockRecovery(lastBlock, blockRecoveryId, true);
leaseManager.renewLease(lease);
+ // Cannot close file right now, since the last block requires recovery.
+ // This may potentially cause infinite loop in lease recovery
+ // if there are no valid replicas on data-nodes.
+ NameNode.stateChangeLog.warn(
+ "DIR* NameSystem.internalReleaseLease: " +
+ "File " + src + " has not been closed." +
+ " Lease recovery is in progress. " +
+ "RecoveryId = " + blockRecoveryId + " for block " + lastBlock);
break;
}
return false;
@@ -3590,7 +3585,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
// If this commit does not want to close the file, persist blocks
FSDirWriteFileOp.persistBlocks(dir, src, iFile, false);
}
- blockManager.successfulBlockRecovery(storedBlock);
} finally {
writeUnlock("commitBlockSynchronization");
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6266c526/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingRecoveryBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingRecoveryBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingRecoveryBlocks.java
deleted file mode 100644
index baad89f..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingRecoveryBlocks.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.blockmanagement;
-
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-/**
- * This class contains unit tests for PendingRecoveryBlocks.java functionality.
- */
-public class TestPendingRecoveryBlocks {
-
- private PendingRecoveryBlocks pendingRecoveryBlocks;
- private final long recoveryTimeout = 1000L;
-
- private final BlockInfo blk1 = getBlock(1);
- private final BlockInfo blk2 = getBlock(2);
- private final BlockInfo blk3 = getBlock(3);
-
- @Before
- public void setUp() {
- pendingRecoveryBlocks =
- Mockito.spy(new PendingRecoveryBlocks(recoveryTimeout));
- }
-
- BlockInfo getBlock(long blockId) {
- return new BlockInfoContiguous(new Block(blockId), (short) 0);
- }
-
- @Test
- public void testAddDifferentBlocks() {
- assertTrue(pendingRecoveryBlocks.add(blk1));
- assertTrue(pendingRecoveryBlocks.isUnderRecovery(blk1));
- assertTrue(pendingRecoveryBlocks.add(blk2));
- assertTrue(pendingRecoveryBlocks.isUnderRecovery(blk2));
- assertTrue(pendingRecoveryBlocks.add(blk3));
- assertTrue(pendingRecoveryBlocks.isUnderRecovery(blk3));
- }
-
- @Test
- public void testAddAndRemoveBlocks() {
- // Add blocks
- assertTrue(pendingRecoveryBlocks.add(blk1));
- assertTrue(pendingRecoveryBlocks.add(blk2));
-
- // Remove blk1
- pendingRecoveryBlocks.remove(blk1);
-
- // Adding back blk1 should succeed
- assertTrue(pendingRecoveryBlocks.add(blk1));
- }
-
- @Test
- public void testAddBlockWithPreviousRecoveryTimedOut() {
- // Add blk
- Mockito.doReturn(0L).when(pendingRecoveryBlocks).getTime();
- assertTrue(pendingRecoveryBlocks.add(blk1));
-
- // Should fail, has not timed out yet
- Mockito.doReturn(recoveryTimeout / 2).when(pendingRecoveryBlocks).getTime();
- assertFalse(pendingRecoveryBlocks.add(blk1));
-
- // Should succeed after timing out
- Mockito.doReturn(recoveryTimeout * 2).when(pendingRecoveryBlocks).getTime();
- assertTrue(pendingRecoveryBlocks.add(blk1));
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6266c526/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
index 208447d..311d5a6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
@@ -18,10 +18,7 @@
package org.apache.hadoop.hdfs.server.datanode;
-import org.apache.hadoop.hdfs.AppendTestUtil;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
-
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
@@ -46,7 +43,6 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Random;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
@@ -98,7 +94,6 @@ import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.test.GenericTestUtils.SleepAnswer;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Time;
import org.apache.log4j.Level;
@@ -1040,107 +1035,4 @@ public class TestBlockRecovery {
Assert.fail("Thread failure: " + failureReason);
}
}
-
- /**
- * Test for block recovery taking longer than the heartbeat interval.
- */
- @Test(timeout = 300000L)
- public void testRecoverySlowerThanHeartbeat() throws Exception {
- tearDown(); // Stop the Mocked DN started in startup()
-
- SleepAnswer delayer = new SleepAnswer(3000, 6000);
- testRecoveryWithDatanodeDelayed(delayer);
- }
-
- /**
- * Test for block recovery timeout. All recovery attempts will be delayed
- * and the first attempt will be lost to trigger recovery timeout and retry.
- */
- @Test(timeout = 300000L)
- public void testRecoveryTimeout() throws Exception {
- tearDown(); // Stop the Mocked DN started in startup()
- final Random r = new Random();
-
- // Make sure first commitBlockSynchronization call from the DN gets lost
- // for the recovery timeout to expire and new recovery attempt
- // to be started.
- SleepAnswer delayer = new SleepAnswer(3000) {
- private final AtomicBoolean callRealMethod = new AtomicBoolean();
-
- @Override
- public Object answer(InvocationOnMock invocation) throws Throwable {
- boolean interrupted = false;
- try {
- Thread.sleep(r.nextInt(3000) + 6000);
- } catch (InterruptedException ie) {
- interrupted = true;
- }
- try {
- if (callRealMethod.get()) {
- return invocation.callRealMethod();
- }
- callRealMethod.set(true);
- return null;
- } finally {
- if (interrupted) {
- Thread.currentThread().interrupt();
- }
- }
- }
- };
- testRecoveryWithDatanodeDelayed(delayer);
- }
-
- private void testRecoveryWithDatanodeDelayed(
- GenericTestUtils.SleepAnswer recoveryDelayer) throws Exception {
- Configuration configuration = new HdfsConfiguration();
- configuration.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
- MiniDFSCluster cluster = null;
-
- try {
- cluster = new MiniDFSCluster.Builder(configuration)
- .numDataNodes(2).build();
- cluster.waitActive();
- final FSNamesystem ns = cluster.getNamesystem();
- final NameNode nn = cluster.getNameNode();
- final DistributedFileSystem dfs = cluster.getFileSystem();
- ns.getBlockManager().setBlockRecoveryTimeout(
- TimeUnit.SECONDS.toMillis(10));
-
- // Create a file and never close the output stream to trigger recovery
- FSDataOutputStream out = dfs.create(new Path("/testSlowRecovery"),
- (short) 2);
- out.write(AppendTestUtil.randomBytes(0, 4096));
- out.hsync();
-
- List<DataNode> dataNodes = cluster.getDataNodes();
- for (DataNode datanode : dataNodes) {
- DatanodeProtocolClientSideTranslatorPB nnSpy =
- InternalDataNodeTestUtils.spyOnBposToNN(datanode, nn);
-
- Mockito.doAnswer(recoveryDelayer).when(nnSpy).
- commitBlockSynchronization(
- Mockito.any(ExtendedBlock.class), Mockito.anyInt(),
- Mockito.anyLong(), Mockito.anyBoolean(),
- Mockito.anyBoolean(), Mockito.anyObject(),
- Mockito.anyObject());
- }
-
- // Make sure hard lease expires to trigger replica recovery
- cluster.setLeasePeriod(100L, 100L);
-
- // Wait for recovery to succeed
- GenericTestUtils.waitFor(new Supplier<Boolean>() {
- @Override
- public Boolean get() {
- return ns.getCompleteBlocksTotal() > 0;
- }
- }, 300, 300000);
-
- } finally {
- if (cluster != null) {
- cluster.shutdown();
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6266c526/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java
index a565578..dc7f47a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java
@@ -25,7 +25,6 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Random;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
@@ -279,14 +278,12 @@ public class TestPipelinesFailover {
// Disable permissions so that another user can recover the lease.
conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false);
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
-
+
FSDataOutputStream stm = null;
final MiniDFSCluster cluster = newMiniCluster(conf, 3);
try {
cluster.waitActive();
cluster.transitionToActive(0);
- cluster.getNamesystem().getBlockManager().setBlockRecoveryTimeout(
- TimeUnit.SECONDS.toMillis(1));
Thread.sleep(500);
LOG.info("Starting with NN 0 active");
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[3/3] hadoop git commit: Revert "HDFS-11576. Block recovery will fail
indefinitely if recovery time > heartbeat interval. Contributed by Lukas
Majercak"
Posted by cd...@apache.org.
Revert "HDFS-11576. Block recovery will fail indefinitely if recovery time > heartbeat interval. Contributed by Lukas Majercak"
This reverts commit 5304698dc8c5667c33e6ed9c4a827ef57172a723.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/53bbef38
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/53bbef38
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/53bbef38
Branch: refs/heads/trunk
Commit: 53bbef3802194b7a0a3ce5cd3c91def9e88856e3
Parents: 7225ec0
Author: Chris Douglas <cd...@apache.org>
Authored: Fri Dec 1 11:19:01 2017 -0800
Committer: Chris Douglas <cd...@apache.org>
Committed: Fri Dec 1 11:19:38 2017 -0800
----------------------------------------------------------------------
.../apache/hadoop/test/GenericTestUtils.java | 10 +-
.../server/blockmanagement/BlockManager.java | 40 ------
.../blockmanagement/PendingRecoveryBlocks.java | 143 -------------------
.../hdfs/server/namenode/FSNamesystem.java | 40 +++---
.../TestPendingRecoveryBlocks.java | 87 -----------
.../hdfs/server/datanode/TestBlockRecovery.java | 108 --------------
.../namenode/ha/TestPipelinesFailover.java | 5 +-
7 files changed, 20 insertions(+), 413 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53bbef38/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
index cdde48c..0db6c73 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
@@ -641,16 +641,10 @@ public abstract class GenericTestUtils {
* conditions.
*/
public static class SleepAnswer implements Answer<Object> {
- private final int minSleepTime;
private final int maxSleepTime;
private static Random r = new Random();
-
+
public SleepAnswer(int maxSleepTime) {
- this(0, maxSleepTime);
- }
-
- public SleepAnswer(int minSleepTime, int maxSleepTime) {
- this.minSleepTime = minSleepTime;
this.maxSleepTime = maxSleepTime;
}
@@ -658,7 +652,7 @@ public abstract class GenericTestUtils {
public Object answer(InvocationOnMock invocation) throws Throwable {
boolean interrupted = false;
try {
- Thread.sleep(r.nextInt(maxSleepTime) + minSleepTime);
+ Thread.sleep(r.nextInt(maxSleepTime));
} catch (InterruptedException ie) {
interrupted = true;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53bbef38/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 1cdb159..4986027 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -164,8 +164,6 @@ public class BlockManager implements BlockStatsMXBean {
private static final String QUEUE_REASON_FUTURE_GENSTAMP =
"generation stamp is in the future";
- private static final long BLOCK_RECOVERY_TIMEOUT_MULTIPLIER = 30;
-
private final Namesystem namesystem;
private final BlockManagerSafeMode bmSafeMode;
@@ -355,9 +353,6 @@ public class BlockManager implements BlockStatsMXBean {
@VisibleForTesting
final PendingReconstructionBlocks pendingReconstruction;
- /** Stores information about block recovery attempts. */
- private final PendingRecoveryBlocks pendingRecoveryBlocks;
-
/** The maximum number of replicas allowed for a block */
public final short maxReplication;
/**
@@ -554,12 +549,6 @@ public class BlockManager implements BlockStatsMXBean {
}
this.minReplicationToBeInMaintenance = (short)minMaintenanceR;
- long heartbeatIntervalSecs = conf.getTimeDuration(
- DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
- DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS);
- long blockRecoveryTimeout = getBlockRecoveryTimeout(heartbeatIntervalSecs);
- pendingRecoveryBlocks = new PendingRecoveryBlocks(blockRecoveryTimeout);
-
this.blockReportLeaseManager = new BlockReportLeaseManager(conf);
bmSafeMode = new BlockManagerSafeMode(this, namesystem, haEnabled, conf);
@@ -4747,25 +4736,6 @@ public class BlockManager implements BlockStatsMXBean {
}
}
- /**
- * Notification of a successful block recovery.
- * @param block for which the recovery succeeded
- */
- public void successfulBlockRecovery(BlockInfo block) {
- pendingRecoveryBlocks.remove(block);
- }
-
- /**
- * Checks whether a recovery attempt has been made for the given block.
- * If so, checks whether that attempt has timed out.
- * @param b block for which recovery is being attempted
- * @return true if no recovery attempt has been made or
- * the previous attempt timed out
- */
- public boolean addBlockRecoveryAttempt(BlockInfo b) {
- return pendingRecoveryBlocks.add(b);
- }
-
@VisibleForTesting
public void flushBlockOps() throws IOException {
runBlockOp(new Callable<Void>(){
@@ -4893,14 +4863,4 @@ public class BlockManager implements BlockStatsMXBean {
}
return i;
}
-
- private static long getBlockRecoveryTimeout(long heartbeatIntervalSecs) {
- return TimeUnit.SECONDS.toMillis(heartbeatIntervalSecs *
- BLOCK_RECOVERY_TIMEOUT_MULTIPLIER);
- }
-
- @VisibleForTesting
- public void setBlockRecoveryTimeout(long blockRecoveryTimeout) {
- pendingRecoveryBlocks.setRecoveryTimeoutInterval(blockRecoveryTimeout);
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53bbef38/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingRecoveryBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingRecoveryBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingRecoveryBlocks.java
deleted file mode 100644
index 3f5f27c..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingRecoveryBlocks.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.blockmanagement;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hdfs.util.LightWeightHashSet;
-import org.apache.hadoop.util.Time;
-import org.slf4j.Logger;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * PendingRecoveryBlocks tracks recovery attempts for each block and their
- * timeouts to ensure we do not have multiple recoveries at the same time
- * and retry only after the timeout for a recovery has expired.
- */
-class PendingRecoveryBlocks {
- private static final Logger LOG = BlockManager.LOG;
-
- /** List of recovery attempts per block and the time they expire. */
- private final LightWeightHashSet<BlockRecoveryAttempt> recoveryTimeouts =
- new LightWeightHashSet<>();
-
- /** The timeout for issuing a block recovery again.
- * (it should be larger than the time to recover a block)
- */
- private long recoveryTimeoutInterval;
-
- PendingRecoveryBlocks(long timeout) {
- this.recoveryTimeoutInterval = timeout;
- }
-
- /**
- * Remove recovery attempt for the given block.
- * @param block whose recovery attempt to remove.
- */
- synchronized void remove(BlockInfo block) {
- recoveryTimeouts.remove(new BlockRecoveryAttempt(block));
- }
-
- /**
- * Checks whether a recovery attempt has been made for the given block.
- * If so, checks whether that attempt has timed out.
- * @param block block for which recovery is being attempted
- * @return true if no recovery attempt has been made or
- * the previous attempt timed out
- */
- synchronized boolean add(BlockInfo block) {
- boolean added = false;
- long curTime = getTime();
- BlockRecoveryAttempt recoveryAttempt =
- recoveryTimeouts.getElement(new BlockRecoveryAttempt(block));
-
- if (recoveryAttempt == null) {
- BlockRecoveryAttempt newAttempt = new BlockRecoveryAttempt(
- block, curTime + recoveryTimeoutInterval);
- added = recoveryTimeouts.add(newAttempt);
- } else if (recoveryAttempt.hasTimedOut(curTime)) {
- // Previous attempt timed out, reset the timeout
- recoveryAttempt.setTimeout(curTime + recoveryTimeoutInterval);
- added = true;
- } else {
- long timeoutIn = TimeUnit.MILLISECONDS.toSeconds(
- recoveryAttempt.timeoutAt - curTime);
- LOG.info("Block recovery attempt for " + block + " rejected, as the " +
- "previous attempt times out in " + timeoutIn + " seconds.");
- }
- return added;
- }
-
- /**
- * Check whether the given block is under recovery.
- * @param b block for which to check
- * @return true if the given block is being recovered
- */
- synchronized boolean isUnderRecovery(BlockInfo b) {
- BlockRecoveryAttempt recoveryAttempt =
- recoveryTimeouts.getElement(new BlockRecoveryAttempt(b));
- return recoveryAttempt != null;
- }
-
- long getTime() {
- return Time.monotonicNow();
- }
-
- @VisibleForTesting
- synchronized void setRecoveryTimeoutInterval(long recoveryTimeoutInterval) {
- this.recoveryTimeoutInterval = recoveryTimeoutInterval;
- }
-
- /**
- * Tracks timeout for block recovery attempt of a given block.
- */
- private static class BlockRecoveryAttempt {
- private final BlockInfo blockInfo;
- private long timeoutAt;
-
- private BlockRecoveryAttempt(BlockInfo blockInfo) {
- this(blockInfo, 0);
- }
-
- BlockRecoveryAttempt(BlockInfo blockInfo, long timeoutAt) {
- this.blockInfo = blockInfo;
- this.timeoutAt = timeoutAt;
- }
-
- boolean hasTimedOut(long currentTime) {
- return currentTime > timeoutAt;
- }
-
- void setTimeout(long newTimeoutAt) {
- this.timeoutAt = newTimeoutAt;
- }
-
- @Override
- public int hashCode() {
- return blockInfo.hashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof BlockRecoveryAttempt) {
- return this.blockInfo.equals(((BlockRecoveryAttempt) obj).blockInfo);
- }
- return false;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53bbef38/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 6a890e2..d3d9cdc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -3318,30 +3318,25 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
+ "Removed empty last block and closed file " + src);
return true;
}
- // Start recovery of the last block for this file
- // Only do so if there is no ongoing recovery for this block,
- // or the previous recovery for this block timed out.
- if (blockManager.addBlockRecoveryAttempt(lastBlock)) {
- long blockRecoveryId = nextGenerationStamp(
- blockManager.isLegacyBlock(lastBlock));
- if(copyOnTruncate) {
- lastBlock.setGenerationStamp(blockRecoveryId);
- } else if(truncateRecovery) {
- recoveryBlock.setGenerationStamp(blockRecoveryId);
- }
- uc.initializeBlockRecovery(lastBlock, blockRecoveryId, true);
-
- // Cannot close file right now, since the last block requires recovery.
- // This may potentially cause infinite loop in lease recovery
- // if there are no valid replicas on data-nodes.
- NameNode.stateChangeLog.warn(
- "DIR* NameSystem.internalReleaseLease: " +
- "File " + src + " has not been closed." +
- " Lease recovery is in progress. " +
- "RecoveryId = " + blockRecoveryId + " for block " + lastBlock);
- }
+ // start recovery of the last block for this file
+ long blockRecoveryId = nextGenerationStamp(
+ blockManager.isLegacyBlock(lastBlock));
lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile);
+ if(copyOnTruncate) {
+ lastBlock.setGenerationStamp(blockRecoveryId);
+ } else if(truncateRecovery) {
+ recoveryBlock.setGenerationStamp(blockRecoveryId);
+ }
+ uc.initializeBlockRecovery(lastBlock, blockRecoveryId, true);
leaseManager.renewLease(lease);
+ // Cannot close file right now, since the last block requires recovery.
+ // This may potentially cause infinite loop in lease recovery
+ // if there are no valid replicas on data-nodes.
+ NameNode.stateChangeLog.warn(
+ "DIR* NameSystem.internalReleaseLease: " +
+ "File " + src + " has not been closed." +
+ " Lease recovery is in progress. " +
+ "RecoveryId = " + blockRecoveryId + " for block " + lastBlock);
break;
}
return false;
@@ -3609,7 +3604,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
// If this commit does not want to close the file, persist blocks
FSDirWriteFileOp.persistBlocks(dir, src, iFile, false);
}
- blockManager.successfulBlockRecovery(storedBlock);
} finally {
writeUnlock("commitBlockSynchronization");
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53bbef38/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingRecoveryBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingRecoveryBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingRecoveryBlocks.java
deleted file mode 100644
index baad89f..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingRecoveryBlocks.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.blockmanagement;
-
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-/**
- * This class contains unit tests for PendingRecoveryBlocks.java functionality.
- */
-public class TestPendingRecoveryBlocks {
-
- private PendingRecoveryBlocks pendingRecoveryBlocks;
- private final long recoveryTimeout = 1000L;
-
- private final BlockInfo blk1 = getBlock(1);
- private final BlockInfo blk2 = getBlock(2);
- private final BlockInfo blk3 = getBlock(3);
-
- @Before
- public void setUp() {
- pendingRecoveryBlocks =
- Mockito.spy(new PendingRecoveryBlocks(recoveryTimeout));
- }
-
- BlockInfo getBlock(long blockId) {
- return new BlockInfoContiguous(new Block(blockId), (short) 0);
- }
-
- @Test
- public void testAddDifferentBlocks() {
- assertTrue(pendingRecoveryBlocks.add(blk1));
- assertTrue(pendingRecoveryBlocks.isUnderRecovery(blk1));
- assertTrue(pendingRecoveryBlocks.add(blk2));
- assertTrue(pendingRecoveryBlocks.isUnderRecovery(blk2));
- assertTrue(pendingRecoveryBlocks.add(blk3));
- assertTrue(pendingRecoveryBlocks.isUnderRecovery(blk3));
- }
-
- @Test
- public void testAddAndRemoveBlocks() {
- // Add blocks
- assertTrue(pendingRecoveryBlocks.add(blk1));
- assertTrue(pendingRecoveryBlocks.add(blk2));
-
- // Remove blk1
- pendingRecoveryBlocks.remove(blk1);
-
- // Adding back blk1 should succeed
- assertTrue(pendingRecoveryBlocks.add(blk1));
- }
-
- @Test
- public void testAddBlockWithPreviousRecoveryTimedOut() {
- // Add blk
- Mockito.doReturn(0L).when(pendingRecoveryBlocks).getTime();
- assertTrue(pendingRecoveryBlocks.add(blk1));
-
- // Should fail, has not timed out yet
- Mockito.doReturn(recoveryTimeout / 2).when(pendingRecoveryBlocks).getTime();
- assertFalse(pendingRecoveryBlocks.add(blk1));
-
- // Should succeed after timing out
- Mockito.doReturn(recoveryTimeout * 2).when(pendingRecoveryBlocks).getTime();
- assertTrue(pendingRecoveryBlocks.add(blk1));
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53bbef38/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
index 208447d..311d5a6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
@@ -18,10 +18,7 @@
package org.apache.hadoop.hdfs.server.datanode;
-import org.apache.hadoop.hdfs.AppendTestUtil;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
-
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
@@ -46,7 +43,6 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Random;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
@@ -98,7 +94,6 @@ import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.test.GenericTestUtils.SleepAnswer;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Time;
import org.apache.log4j.Level;
@@ -1040,107 +1035,4 @@ public class TestBlockRecovery {
Assert.fail("Thread failure: " + failureReason);
}
}
-
- /**
- * Test for block recovery taking longer than the heartbeat interval.
- */
- @Test(timeout = 300000L)
- public void testRecoverySlowerThanHeartbeat() throws Exception {
- tearDown(); // Stop the Mocked DN started in startup()
-
- SleepAnswer delayer = new SleepAnswer(3000, 6000);
- testRecoveryWithDatanodeDelayed(delayer);
- }
-
- /**
- * Test for block recovery timeout. All recovery attempts will be delayed
- * and the first attempt will be lost to trigger recovery timeout and retry.
- */
- @Test(timeout = 300000L)
- public void testRecoveryTimeout() throws Exception {
- tearDown(); // Stop the Mocked DN started in startup()
- final Random r = new Random();
-
- // Make sure first commitBlockSynchronization call from the DN gets lost
- // for the recovery timeout to expire and new recovery attempt
- // to be started.
- SleepAnswer delayer = new SleepAnswer(3000) {
- private final AtomicBoolean callRealMethod = new AtomicBoolean();
-
- @Override
- public Object answer(InvocationOnMock invocation) throws Throwable {
- boolean interrupted = false;
- try {
- Thread.sleep(r.nextInt(3000) + 6000);
- } catch (InterruptedException ie) {
- interrupted = true;
- }
- try {
- if (callRealMethod.get()) {
- return invocation.callRealMethod();
- }
- callRealMethod.set(true);
- return null;
- } finally {
- if (interrupted) {
- Thread.currentThread().interrupt();
- }
- }
- }
- };
- testRecoveryWithDatanodeDelayed(delayer);
- }
-
- private void testRecoveryWithDatanodeDelayed(
- GenericTestUtils.SleepAnswer recoveryDelayer) throws Exception {
- Configuration configuration = new HdfsConfiguration();
- configuration.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
- MiniDFSCluster cluster = null;
-
- try {
- cluster = new MiniDFSCluster.Builder(configuration)
- .numDataNodes(2).build();
- cluster.waitActive();
- final FSNamesystem ns = cluster.getNamesystem();
- final NameNode nn = cluster.getNameNode();
- final DistributedFileSystem dfs = cluster.getFileSystem();
- ns.getBlockManager().setBlockRecoveryTimeout(
- TimeUnit.SECONDS.toMillis(10));
-
- // Create a file and never close the output stream to trigger recovery
- FSDataOutputStream out = dfs.create(new Path("/testSlowRecovery"),
- (short) 2);
- out.write(AppendTestUtil.randomBytes(0, 4096));
- out.hsync();
-
- List<DataNode> dataNodes = cluster.getDataNodes();
- for (DataNode datanode : dataNodes) {
- DatanodeProtocolClientSideTranslatorPB nnSpy =
- InternalDataNodeTestUtils.spyOnBposToNN(datanode, nn);
-
- Mockito.doAnswer(recoveryDelayer).when(nnSpy).
- commitBlockSynchronization(
- Mockito.any(ExtendedBlock.class), Mockito.anyInt(),
- Mockito.anyLong(), Mockito.anyBoolean(),
- Mockito.anyBoolean(), Mockito.anyObject(),
- Mockito.anyObject());
- }
-
- // Make sure hard lease expires to trigger replica recovery
- cluster.setLeasePeriod(100L, 100L);
-
- // Wait for recovery to succeed
- GenericTestUtils.waitFor(new Supplier<Boolean>() {
- @Override
- public Boolean get() {
- return ns.getCompleteBlocksTotal() > 0;
- }
- }, 300, 300000);
-
- } finally {
- if (cluster != null) {
- cluster.shutdown();
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53bbef38/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java
index a565578..dc7f47a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java
@@ -25,7 +25,6 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Random;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
@@ -279,14 +278,12 @@ public class TestPipelinesFailover {
// Disable permissions so that another user can recover the lease.
conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false);
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
-
+
FSDataOutputStream stm = null;
final MiniDFSCluster cluster = newMiniCluster(conf, 3);
try {
cluster.waitActive();
cluster.transitionToActive(0);
- cluster.getNamesystem().getBlockManager().setBlockRecoveryTimeout(
- TimeUnit.SECONDS.toMillis(1));
Thread.sleep(500);
LOG.info("Starting with NN 0 active");
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[2/3] hadoop git commit: Revert "HDFS-11576. Block recovery will fail
indefinitely if recovery time > heartbeat interval. Contributed by Lukas
Majercak"
Posted by cd...@apache.org.
Revert "HDFS-11576. Block recovery will fail indefinitely if recovery time > heartbeat interval. Contributed by Lukas Majercak"
This reverts commit be664bd64d0dba3356f44605f870e988abfec932.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/60adf0b8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/60adf0b8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/60adf0b8
Branch: refs/heads/branch-3.0.0
Commit: 60adf0b8bdeaabcfe64f2fb3188af2073b369b55
Parents: be664bd
Author: Chris Douglas <cd...@apache.org>
Authored: Fri Dec 1 11:19:25 2017 -0800
Committer: Chris Douglas <cd...@apache.org>
Committed: Fri Dec 1 11:19:25 2017 -0800
----------------------------------------------------------------------
.../apache/hadoop/test/GenericTestUtils.java | 10 +-
.../server/blockmanagement/BlockManager.java | 40 ------
.../blockmanagement/PendingRecoveryBlocks.java | 143 -------------------
.../hdfs/server/namenode/FSNamesystem.java | 40 +++---
.../TestPendingRecoveryBlocks.java | 87 -----------
.../hdfs/server/datanode/TestBlockRecovery.java | 108 --------------
.../namenode/ha/TestPipelinesFailover.java | 5 +-
7 files changed, 20 insertions(+), 413 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/60adf0b8/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
index 72c8d41..4cb9f8b 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
@@ -624,16 +624,10 @@ public abstract class GenericTestUtils {
* conditions.
*/
public static class SleepAnswer implements Answer<Object> {
- private final int minSleepTime;
private final int maxSleepTime;
private static Random r = new Random();
-
+
public SleepAnswer(int maxSleepTime) {
- this(0, maxSleepTime);
- }
-
- public SleepAnswer(int minSleepTime, int maxSleepTime) {
- this.minSleepTime = minSleepTime;
this.maxSleepTime = maxSleepTime;
}
@@ -641,7 +635,7 @@ public abstract class GenericTestUtils {
public Object answer(InvocationOnMock invocation) throws Throwable {
boolean interrupted = false;
try {
- Thread.sleep(r.nextInt(maxSleepTime) + minSleepTime);
+ Thread.sleep(r.nextInt(maxSleepTime));
} catch (InterruptedException ie) {
interrupted = true;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/60adf0b8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index f440c73..bdabd81 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -164,8 +164,6 @@ public class BlockManager implements BlockStatsMXBean {
private static final String QUEUE_REASON_FUTURE_GENSTAMP =
"generation stamp is in the future";
- private static final long BLOCK_RECOVERY_TIMEOUT_MULTIPLIER = 30;
-
private final Namesystem namesystem;
private final BlockManagerSafeMode bmSafeMode;
@@ -355,9 +353,6 @@ public class BlockManager implements BlockStatsMXBean {
@VisibleForTesting
final PendingReconstructionBlocks pendingReconstruction;
- /** Stores information about block recovery attempts. */
- private final PendingRecoveryBlocks pendingRecoveryBlocks;
-
/** The maximum number of replicas allowed for a block */
public final short maxReplication;
/**
@@ -554,12 +549,6 @@ public class BlockManager implements BlockStatsMXBean {
}
this.minReplicationToBeInMaintenance = (short)minMaintenanceR;
- long heartbeatIntervalSecs = conf.getTimeDuration(
- DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
- DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS);
- long blockRecoveryTimeout = getBlockRecoveryTimeout(heartbeatIntervalSecs);
- pendingRecoveryBlocks = new PendingRecoveryBlocks(blockRecoveryTimeout);
-
this.blockReportLeaseManager = new BlockReportLeaseManager(conf);
bmSafeMode = new BlockManagerSafeMode(this, namesystem, haEnabled, conf);
@@ -4749,25 +4738,6 @@ public class BlockManager implements BlockStatsMXBean {
}
}
- /**
- * Notification of a successful block recovery.
- * @param block for which the recovery succeeded
- */
- public void successfulBlockRecovery(BlockInfo block) {
- pendingRecoveryBlocks.remove(block);
- }
-
- /**
- * Checks whether a recovery attempt has been made for the given block.
- * If so, checks whether that attempt has timed out.
- * @param b block for which recovery is being attempted
- * @return true if no recovery attempt has been made or
- * the previous attempt timed out
- */
- public boolean addBlockRecoveryAttempt(BlockInfo b) {
- return pendingRecoveryBlocks.add(b);
- }
-
@VisibleForTesting
public void flushBlockOps() throws IOException {
runBlockOp(new Callable<Void>(){
@@ -4895,14 +4865,4 @@ public class BlockManager implements BlockStatsMXBean {
}
return i;
}
-
- private static long getBlockRecoveryTimeout(long heartbeatIntervalSecs) {
- return TimeUnit.SECONDS.toMillis(heartbeatIntervalSecs *
- BLOCK_RECOVERY_TIMEOUT_MULTIPLIER);
- }
-
- @VisibleForTesting
- public void setBlockRecoveryTimeout(long blockRecoveryTimeout) {
- pendingRecoveryBlocks.setRecoveryTimeoutInterval(blockRecoveryTimeout);
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/60adf0b8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingRecoveryBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingRecoveryBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingRecoveryBlocks.java
deleted file mode 100644
index 3f5f27c..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingRecoveryBlocks.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.blockmanagement;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hdfs.util.LightWeightHashSet;
-import org.apache.hadoop.util.Time;
-import org.slf4j.Logger;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * PendingRecoveryBlocks tracks recovery attempts for each block and their
- * timeouts to ensure we do not have multiple recoveries at the same time
- * and retry only after the timeout for a recovery has expired.
- */
-class PendingRecoveryBlocks {
- private static final Logger LOG = BlockManager.LOG;
-
- /** List of recovery attempts per block and the time they expire. */
- private final LightWeightHashSet<BlockRecoveryAttempt> recoveryTimeouts =
- new LightWeightHashSet<>();
-
- /** The timeout for issuing a block recovery again.
- * (it should be larger than the time to recover a block)
- */
- private long recoveryTimeoutInterval;
-
- PendingRecoveryBlocks(long timeout) {
- this.recoveryTimeoutInterval = timeout;
- }
-
- /**
- * Remove recovery attempt for the given block.
- * @param block whose recovery attempt to remove.
- */
- synchronized void remove(BlockInfo block) {
- recoveryTimeouts.remove(new BlockRecoveryAttempt(block));
- }
-
- /**
- * Checks whether a recovery attempt has been made for the given block.
- * If so, checks whether that attempt has timed out.
- * @param block block for which recovery is being attempted
- * @return true if no recovery attempt has been made or
- * the previous attempt timed out
- */
- synchronized boolean add(BlockInfo block) {
- boolean added = false;
- long curTime = getTime();
- BlockRecoveryAttempt recoveryAttempt =
- recoveryTimeouts.getElement(new BlockRecoveryAttempt(block));
-
- if (recoveryAttempt == null) {
- BlockRecoveryAttempt newAttempt = new BlockRecoveryAttempt(
- block, curTime + recoveryTimeoutInterval);
- added = recoveryTimeouts.add(newAttempt);
- } else if (recoveryAttempt.hasTimedOut(curTime)) {
- // Previous attempt timed out, reset the timeout
- recoveryAttempt.setTimeout(curTime + recoveryTimeoutInterval);
- added = true;
- } else {
- long timeoutIn = TimeUnit.MILLISECONDS.toSeconds(
- recoveryAttempt.timeoutAt - curTime);
- LOG.info("Block recovery attempt for " + block + " rejected, as the " +
- "previous attempt times out in " + timeoutIn + " seconds.");
- }
- return added;
- }
-
- /**
- * Check whether the given block is under recovery.
- * @param b block for which to check
- * @return true if the given block is being recovered
- */
- synchronized boolean isUnderRecovery(BlockInfo b) {
- BlockRecoveryAttempt recoveryAttempt =
- recoveryTimeouts.getElement(new BlockRecoveryAttempt(b));
- return recoveryAttempt != null;
- }
-
- long getTime() {
- return Time.monotonicNow();
- }
-
- @VisibleForTesting
- synchronized void setRecoveryTimeoutInterval(long recoveryTimeoutInterval) {
- this.recoveryTimeoutInterval = recoveryTimeoutInterval;
- }
-
- /**
- * Tracks timeout for block recovery attempt of a given block.
- */
- private static class BlockRecoveryAttempt {
- private final BlockInfo blockInfo;
- private long timeoutAt;
-
- private BlockRecoveryAttempt(BlockInfo blockInfo) {
- this(blockInfo, 0);
- }
-
- BlockRecoveryAttempt(BlockInfo blockInfo, long timeoutAt) {
- this.blockInfo = blockInfo;
- this.timeoutAt = timeoutAt;
- }
-
- boolean hasTimedOut(long currentTime) {
- return currentTime > timeoutAt;
- }
-
- void setTimeout(long newTimeoutAt) {
- this.timeoutAt = newTimeoutAt;
- }
-
- @Override
- public int hashCode() {
- return blockInfo.hashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof BlockRecoveryAttempt) {
- return this.blockInfo.equals(((BlockRecoveryAttempt) obj).blockInfo);
- }
- return false;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/60adf0b8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 558fef7..0f61621 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -3299,30 +3299,25 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
+ "Removed empty last block and closed file " + src);
return true;
}
- // Start recovery of the last block for this file
- // Only do so if there is no ongoing recovery for this block,
- // or the previous recovery for this block timed out.
- if (blockManager.addBlockRecoveryAttempt(lastBlock)) {
- long blockRecoveryId = nextGenerationStamp(
- blockManager.isLegacyBlock(lastBlock));
- if(copyOnTruncate) {
- lastBlock.setGenerationStamp(blockRecoveryId);
- } else if(truncateRecovery) {
- recoveryBlock.setGenerationStamp(blockRecoveryId);
- }
- uc.initializeBlockRecovery(lastBlock, blockRecoveryId, true);
-
- // Cannot close file right now, since the last block requires recovery.
- // This may potentially cause infinite loop in lease recovery
- // if there are no valid replicas on data-nodes.
- NameNode.stateChangeLog.warn(
- "DIR* NameSystem.internalReleaseLease: " +
- "File " + src + " has not been closed." +
- " Lease recovery is in progress. " +
- "RecoveryId = " + blockRecoveryId + " for block " + lastBlock);
- }
+ // start recovery of the last block for this file
+ long blockRecoveryId = nextGenerationStamp(
+ blockManager.isLegacyBlock(lastBlock));
lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile);
+ if(copyOnTruncate) {
+ lastBlock.setGenerationStamp(blockRecoveryId);
+ } else if(truncateRecovery) {
+ recoveryBlock.setGenerationStamp(blockRecoveryId);
+ }
+ uc.initializeBlockRecovery(lastBlock, blockRecoveryId, true);
leaseManager.renewLease(lease);
+ // Cannot close file right now, since the last block requires recovery.
+ // This may potentially cause infinite loop in lease recovery
+ // if there are no valid replicas on data-nodes.
+ NameNode.stateChangeLog.warn(
+ "DIR* NameSystem.internalReleaseLease: " +
+ "File " + src + " has not been closed." +
+ " Lease recovery is in progress. " +
+ "RecoveryId = " + blockRecoveryId + " for block " + lastBlock);
break;
}
return false;
@@ -3590,7 +3585,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
// If this commit does not want to close the file, persist blocks
FSDirWriteFileOp.persistBlocks(dir, src, iFile, false);
}
- blockManager.successfulBlockRecovery(storedBlock);
} finally {
writeUnlock("commitBlockSynchronization");
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/60adf0b8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingRecoveryBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingRecoveryBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingRecoveryBlocks.java
deleted file mode 100644
index baad89f..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingRecoveryBlocks.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.blockmanagement;
-
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-/**
- * This class contains unit tests for PendingRecoveryBlocks.java functionality.
- */
-public class TestPendingRecoveryBlocks {
-
- private PendingRecoveryBlocks pendingRecoveryBlocks;
- private final long recoveryTimeout = 1000L;
-
- private final BlockInfo blk1 = getBlock(1);
- private final BlockInfo blk2 = getBlock(2);
- private final BlockInfo blk3 = getBlock(3);
-
- @Before
- public void setUp() {
- pendingRecoveryBlocks =
- Mockito.spy(new PendingRecoveryBlocks(recoveryTimeout));
- }
-
- BlockInfo getBlock(long blockId) {
- return new BlockInfoContiguous(new Block(blockId), (short) 0);
- }
-
- @Test
- public void testAddDifferentBlocks() {
- assertTrue(pendingRecoveryBlocks.add(blk1));
- assertTrue(pendingRecoveryBlocks.isUnderRecovery(blk1));
- assertTrue(pendingRecoveryBlocks.add(blk2));
- assertTrue(pendingRecoveryBlocks.isUnderRecovery(blk2));
- assertTrue(pendingRecoveryBlocks.add(blk3));
- assertTrue(pendingRecoveryBlocks.isUnderRecovery(blk3));
- }
-
- @Test
- public void testAddAndRemoveBlocks() {
- // Add blocks
- assertTrue(pendingRecoveryBlocks.add(blk1));
- assertTrue(pendingRecoveryBlocks.add(blk2));
-
- // Remove blk1
- pendingRecoveryBlocks.remove(blk1);
-
- // Adding back blk1 should succeed
- assertTrue(pendingRecoveryBlocks.add(blk1));
- }
-
- @Test
- public void testAddBlockWithPreviousRecoveryTimedOut() {
- // Add blk
- Mockito.doReturn(0L).when(pendingRecoveryBlocks).getTime();
- assertTrue(pendingRecoveryBlocks.add(blk1));
-
- // Should fail, has not timed out yet
- Mockito.doReturn(recoveryTimeout / 2).when(pendingRecoveryBlocks).getTime();
- assertFalse(pendingRecoveryBlocks.add(blk1));
-
- // Should succeed after timing out
- Mockito.doReturn(recoveryTimeout * 2).when(pendingRecoveryBlocks).getTime();
- assertTrue(pendingRecoveryBlocks.add(blk1));
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/60adf0b8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
index 208447d..311d5a6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
@@ -18,10 +18,7 @@
package org.apache.hadoop.hdfs.server.datanode;
-import org.apache.hadoop.hdfs.AppendTestUtil;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
-
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
@@ -46,7 +43,6 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Random;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
@@ -98,7 +94,6 @@ import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.test.GenericTestUtils.SleepAnswer;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Time;
import org.apache.log4j.Level;
@@ -1040,107 +1035,4 @@ public class TestBlockRecovery {
Assert.fail("Thread failure: " + failureReason);
}
}
-
- /**
- * Test for block recovery taking longer than the heartbeat interval.
- */
- @Test(timeout = 300000L)
- public void testRecoverySlowerThanHeartbeat() throws Exception {
- tearDown(); // Stop the Mocked DN started in startup()
-
- SleepAnswer delayer = new SleepAnswer(3000, 6000);
- testRecoveryWithDatanodeDelayed(delayer);
- }
-
- /**
- * Test for block recovery timeout. All recovery attempts will be delayed
- * and the first attempt will be lost to trigger recovery timeout and retry.
- */
- @Test(timeout = 300000L)
- public void testRecoveryTimeout() throws Exception {
- tearDown(); // Stop the Mocked DN started in startup()
- final Random r = new Random();
-
- // Make sure first commitBlockSynchronization call from the DN gets lost
- // for the recovery timeout to expire and new recovery attempt
- // to be started.
- SleepAnswer delayer = new SleepAnswer(3000) {
- private final AtomicBoolean callRealMethod = new AtomicBoolean();
-
- @Override
- public Object answer(InvocationOnMock invocation) throws Throwable {
- boolean interrupted = false;
- try {
- Thread.sleep(r.nextInt(3000) + 6000);
- } catch (InterruptedException ie) {
- interrupted = true;
- }
- try {
- if (callRealMethod.get()) {
- return invocation.callRealMethod();
- }
- callRealMethod.set(true);
- return null;
- } finally {
- if (interrupted) {
- Thread.currentThread().interrupt();
- }
- }
- }
- };
- testRecoveryWithDatanodeDelayed(delayer);
- }
-
- private void testRecoveryWithDatanodeDelayed(
- GenericTestUtils.SleepAnswer recoveryDelayer) throws Exception {
- Configuration configuration = new HdfsConfiguration();
- configuration.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
- MiniDFSCluster cluster = null;
-
- try {
- cluster = new MiniDFSCluster.Builder(configuration)
- .numDataNodes(2).build();
- cluster.waitActive();
- final FSNamesystem ns = cluster.getNamesystem();
- final NameNode nn = cluster.getNameNode();
- final DistributedFileSystem dfs = cluster.getFileSystem();
- ns.getBlockManager().setBlockRecoveryTimeout(
- TimeUnit.SECONDS.toMillis(10));
-
- // Create a file and never close the output stream to trigger recovery
- FSDataOutputStream out = dfs.create(new Path("/testSlowRecovery"),
- (short) 2);
- out.write(AppendTestUtil.randomBytes(0, 4096));
- out.hsync();
-
- List<DataNode> dataNodes = cluster.getDataNodes();
- for (DataNode datanode : dataNodes) {
- DatanodeProtocolClientSideTranslatorPB nnSpy =
- InternalDataNodeTestUtils.spyOnBposToNN(datanode, nn);
-
- Mockito.doAnswer(recoveryDelayer).when(nnSpy).
- commitBlockSynchronization(
- Mockito.any(ExtendedBlock.class), Mockito.anyInt(),
- Mockito.anyLong(), Mockito.anyBoolean(),
- Mockito.anyBoolean(), Mockito.anyObject(),
- Mockito.anyObject());
- }
-
- // Make sure hard lease expires to trigger replica recovery
- cluster.setLeasePeriod(100L, 100L);
-
- // Wait for recovery to succeed
- GenericTestUtils.waitFor(new Supplier<Boolean>() {
- @Override
- public Boolean get() {
- return ns.getCompleteBlocksTotal() > 0;
- }
- }, 300, 300000);
-
- } finally {
- if (cluster != null) {
- cluster.shutdown();
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/60adf0b8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java
index a565578..dc7f47a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java
@@ -25,7 +25,6 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Random;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
@@ -279,14 +278,12 @@ public class TestPipelinesFailover {
// Disable permissions so that another user can recover the lease.
conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false);
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
-
+
FSDataOutputStream stm = null;
final MiniDFSCluster cluster = newMiniCluster(conf, 3);
try {
cluster.waitActive();
cluster.transitionToActive(0);
- cluster.getNamesystem().getBlockManager().setBlockRecoveryTimeout(
- TimeUnit.SECONDS.toMillis(1));
Thread.sleep(500);
LOG.info("Starting with NN 0 active");
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org