You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2017/08/22 00:17:32 UTC
[02/45] hadoop git commit: HDFS-12072. Provide fairness between EC
and non-EC recovery tasks. Contributed by Eddy Xu.
HDFS-12072. Provide fairness between EC and non-EC recovery tasks. Contributed by Eddy Xu.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b2989488
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b2989488
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b2989488
Branch: refs/heads/YARN-3926
Commit: b29894889742dda654cd88a7ce72a4e51fccb328
Parents: ab1a8ae
Author: Andrew Wang <wa...@apache.org>
Authored: Thu Aug 17 15:26:11 2017 -0700
Committer: Andrew Wang <wa...@apache.org>
Committed: Thu Aug 17 15:26:11 2017 -0700
----------------------------------------------------------------------
.../blockmanagement/DatanodeDescriptor.java | 6 +-
.../server/blockmanagement/DatanodeManager.java | 45 ++++++---
.../blockmanagement/TestDatanodeManager.java | 96 +++++++++++++++-----
3 files changed, 108 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2989488/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 2bd4a20..d35894c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -661,7 +661,11 @@ public class DatanodeDescriptor extends DatanodeInfo {
return erasurecodeBlocks.size();
}
- public List<BlockTargetPair> getReplicationCommand(int maxTransfers) {
+ int getNumberOfReplicateBlocks() {
+ return replicateBlocks.size();
+ }
+
+ List<BlockTargetPair> getReplicationCommand(int maxTransfers) {
return replicateBlocks.poll(maxTransfers);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2989488/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 78783ca..c75bcea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -1663,21 +1663,38 @@ public class DatanodeManager {
}
final List<DatanodeCommand> cmds = new ArrayList<>();
- // check pending replication
- List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
- maxTransfers);
- if (pendingList != null) {
- cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
- pendingList));
- maxTransfers -= pendingList.size();
- }
- // check pending erasure coding tasks
- List<BlockECReconstructionInfo> pendingECList = nodeinfo
- .getErasureCodeCommand(maxTransfers);
- if (pendingECList != null) {
- cmds.add(new BlockECReconstructionCommand(
- DNA_ERASURE_CODING_RECONSTRUCTION, pendingECList));
+ // Allocate _approximately_ maxTransfers pending tasks to DataNode.
+ // NN chooses pending tasks based on the ratio between the lengths of
+ // replication and erasure-coded block queues.
+ int totalReplicateBlocks = nodeinfo.getNumberOfReplicateBlocks();
+ int totalECBlocks = nodeinfo.getNumberOfBlocksToBeErasureCoded();
+ int totalBlocks = totalReplicateBlocks + totalECBlocks;
+ if (totalBlocks > 0) {
+ int numReplicationTasks = (int) Math.ceil(
+ (double) (totalReplicateBlocks * maxTransfers) / totalBlocks);
+ int numECTasks = (int) Math.ceil(
+ (double) (totalECBlocks * maxTransfers) / totalBlocks);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Pending replication tasks: " + numReplicationTasks
+ + " erasure-coded tasks: " + numECTasks);
+ }
+ // check pending replication tasks
+ List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
+ numReplicationTasks);
+ if (pendingList != null && !pendingList.isEmpty()) {
+ cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
+ pendingList));
+ }
+ // check pending erasure coding tasks
+ List<BlockECReconstructionInfo> pendingECList = nodeinfo
+ .getErasureCodeCommand(numECTasks);
+ if (pendingECList != null && !pendingECList.isEmpty()) {
+ cmds.add(new BlockECReconstructionCommand(
+ DNA_ERASURE_CODING_RECONSTRUCTION, pendingECList));
+ }
}
+
// check block invalidation
Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
if (blks != null) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2989488/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
index de002f4..286f4a4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
@@ -25,6 +25,7 @@ import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -500,46 +501,93 @@ public class TestDatanodeManager {
"127.0.0.1:23456", bothAgain.get(1).getInfoAddr());
}
- @Test
- public void testPendingRecoveryTasks() throws IOException {
+ /**
+ * Verify the correctness of pending recovery process.
+ *
+ * @param numReplicationBlocks the number of replication blocks in the queue.
+ * @param numECBlocks number of EC blocks in the queue.
+ * @param maxTransfers the maxTransfer value.
+ * @param numReplicationTasks the number of replication tasks polled from
+ * the queue.
+ * @param numECTasks the number of EC tasks polled from the queue.
+ *
+ * @throws IOException
+ */
+ private void verifyPendingRecoveryTasks(
+ int numReplicationBlocks, int numECBlocks,
+ int maxTransfers, int numReplicationTasks, int numECTasks)
+ throws IOException {
FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
Mockito.when(fsn.hasWriteLock()).thenReturn(true);
Configuration conf = new Configuration();
DatanodeManager dm = Mockito.spy(mockDatanodeManager(fsn, conf));
- int maxTransfers = 20;
- int numPendingTasks = 7;
- int numECTasks = maxTransfers - numPendingTasks;
-
DatanodeDescriptor nodeInfo = Mockito.mock(DatanodeDescriptor.class);
Mockito.when(nodeInfo.isRegistered()).thenReturn(true);
Mockito.when(nodeInfo.getStorageInfos())
.thenReturn(new DatanodeStorageInfo[0]);
- List<BlockTargetPair> pendingList =
- Collections.nCopies(numPendingTasks, new BlockTargetPair(null, null));
- Mockito.when(nodeInfo.getReplicationCommand(maxTransfers))
- .thenReturn(pendingList);
- List<BlockECReconstructionInfo> ecPendingList =
- Collections.nCopies(numECTasks, null);
+ if (numReplicationBlocks > 0) {
+ Mockito.when(nodeInfo.getNumberOfReplicateBlocks())
+ .thenReturn(numReplicationBlocks);
+
+ List<BlockTargetPair> tasks =
+ Collections.nCopies(
+ Math.min(numReplicationTasks, numReplicationBlocks),
+ new BlockTargetPair(null, null));
+ Mockito.when(nodeInfo.getReplicationCommand(numReplicationTasks))
+ .thenReturn(tasks);
+ }
+
+ if (numECBlocks > 0) {
+ Mockito.when(nodeInfo.getNumberOfBlocksToBeErasureCoded())
+ .thenReturn(numECBlocks);
+
+ List<BlockECReconstructionInfo> tasks =
+ Collections.nCopies(numECTasks, null);
+ Mockito.when(nodeInfo.getErasureCodeCommand(numECTasks))
+ .thenReturn(tasks);
+ }
- Mockito.when(nodeInfo.getErasureCodeCommand(numECTasks))
- .thenReturn(ecPendingList);
DatanodeRegistration dnReg = Mockito.mock(DatanodeRegistration.class);
Mockito.when(dm.getDatanode(dnReg)).thenReturn(nodeInfo);
-
DatanodeCommand[] cmds = dm.handleHeartbeat(
dnReg, new StorageReport[1], "bp-123", 0, 0, 10, maxTransfers, 0, null,
SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT);
- assertEquals(2, cmds.length);
- assertTrue(cmds[0] instanceof BlockCommand);
- BlockCommand replicaCmd = (BlockCommand) cmds[0];
- assertEquals(numPendingTasks, replicaCmd.getBlocks().length);
- assertEquals(numPendingTasks, replicaCmd.getTargets().length);
- assertTrue(cmds[1] instanceof BlockECReconstructionCommand);
- BlockECReconstructionCommand ecRecoveryCmd =
- (BlockECReconstructionCommand) cmds[1];
- assertEquals(numECTasks, ecRecoveryCmd.getECTasks().size());
+ long expectedNumCmds = Arrays.stream(
+ new int[]{numReplicationTasks, numECTasks})
+ .filter(x -> x > 0)
+ .count();
+ assertEquals(expectedNumCmds, cmds.length);
+
+ int idx = 0;
+ if (numReplicationTasks > 0) {
+ assertTrue(cmds[idx] instanceof BlockCommand);
+ BlockCommand cmd = (BlockCommand) cmds[0];
+ assertEquals(numReplicationTasks, cmd.getBlocks().length);
+ assertEquals(numReplicationTasks, cmd.getTargets().length);
+ idx++;
+ }
+
+ if (numECTasks > 0) {
+ assertTrue(cmds[idx] instanceof BlockECReconstructionCommand);
+ BlockECReconstructionCommand cmd =
+ (BlockECReconstructionCommand) cmds[idx];
+ assertEquals(numECTasks, cmd.getECTasks().size());
+ }
+
+ Mockito.verify(nodeInfo).getReplicationCommand(numReplicationTasks);
+ Mockito.verify(nodeInfo).getErasureCodeCommand(numECTasks);
+ }
+
+ @Test
+ public void testPendingRecoveryTasks() throws IOException {
+ // Tasks are split according to the ratio between queue lengths.
+ verifyPendingRecoveryTasks(20, 20, 20, 10, 10);
+ verifyPendingRecoveryTasks(40, 10, 20, 16, 4);
+
+ // Tasks are allocated approximately if the ratio between queue lengths is large.
+ verifyPendingRecoveryTasks(400, 1, 20, 20, 1);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org