Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2017/08/17 22:26:15 UTC

hadoop git commit: HDFS-12072. Provide fairness between EC and non-EC recovery tasks. Contributed by Eddy Xu.

Repository: hadoop
Updated Branches:
  refs/heads/trunk ab1a8ae85 -> b29894889


HDFS-12072. Provide fairness between EC and non-EC recovery tasks. Contributed by Eddy Xu.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b2989488
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b2989488
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b2989488

Branch: refs/heads/trunk
Commit: b29894889742dda654cd88a7ce72a4e51fccb328
Parents: ab1a8ae
Author: Andrew Wang <wa...@apache.org>
Authored: Thu Aug 17 15:26:11 2017 -0700
Committer: Andrew Wang <wa...@apache.org>
Committed: Thu Aug 17 15:26:11 2017 -0700

----------------------------------------------------------------------
 .../blockmanagement/DatanodeDescriptor.java     |  6 +-
 .../server/blockmanagement/DatanodeManager.java | 45 ++++++---
 .../blockmanagement/TestDatanodeManager.java    | 96 +++++++++++++++-----
 3 files changed, 108 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2989488/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 2bd4a20..d35894c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -661,7 +661,11 @@ public class DatanodeDescriptor extends DatanodeInfo {
     return erasurecodeBlocks.size();
   }
 
-  public List<BlockTargetPair> getReplicationCommand(int maxTransfers) {
+  int getNumberOfReplicateBlocks() {
+    return replicateBlocks.size();
+  }
+
+  List<BlockTargetPair> getReplicationCommand(int maxTransfers) {
     return replicateBlocks.poll(maxTransfers);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2989488/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 78783ca..c75bcea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -1663,21 +1663,38 @@ public class DatanodeManager {
     }
 
     final List<DatanodeCommand> cmds = new ArrayList<>();
-    // check pending replication
-    List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
-        maxTransfers);
-    if (pendingList != null) {
-      cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
-          pendingList));
-      maxTransfers -= pendingList.size();
-    }
-    // check pending erasure coding tasks
-    List<BlockECReconstructionInfo> pendingECList = nodeinfo
-        .getErasureCodeCommand(maxTransfers);
-    if (pendingECList != null) {
-      cmds.add(new BlockECReconstructionCommand(
-          DNA_ERASURE_CODING_RECONSTRUCTION, pendingECList));
+    // Allocate _approximately_ maxTransfers pending tasks to DataNode.
+    // NN chooses pending tasks based on the ratio between the lengths of
+    // replication and erasure-coded block queues.
+    int totalReplicateBlocks = nodeinfo.getNumberOfReplicateBlocks();
+    int totalECBlocks = nodeinfo.getNumberOfBlocksToBeErasureCoded();
+    int totalBlocks = totalReplicateBlocks + totalECBlocks;
+    if (totalBlocks > 0) {
+      int numReplicationTasks = (int) Math.ceil(
+          (double) (totalReplicateBlocks * maxTransfers) / totalBlocks);
+      int numECTasks = (int) Math.ceil(
+          (double) (totalECBlocks * maxTransfers) / totalBlocks);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Pending replication tasks: " + numReplicationTasks
+            + " erasure-coded tasks: " + numECTasks);
+      }
+      // check pending replication tasks
+      List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
+          numReplicationTasks);
+      if (pendingList != null && !pendingList.isEmpty()) {
+        cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
+            pendingList));
+      }
+      // check pending erasure coding tasks
+      List<BlockECReconstructionInfo> pendingECList = nodeinfo
+          .getErasureCodeCommand(numECTasks);
+      if (pendingECList != null && !pendingECList.isEmpty()) {
+        cmds.add(new BlockECReconstructionCommand(
+            DNA_ERASURE_CODING_RECONSTRUCTION, pendingECList));
+      }
     }
+
     // check block invalidation
     Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
     if (blks != null) {
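
For reference, below is a minimal standalone sketch (a hypothetical class, not part of this patch) of the proportional split computed in the hunk above; the inputs mirror the cases exercised in TestDatanodeManager further down.

import java.util.Arrays;

/**
 * Hypothetical standalone sketch reproducing the proportional split
 * from DatanodeManager#handleHeartbeat in the hunk above.
 */
public class RecoverySplitSketch {

  static int[] split(int replicateBlocks, int ecBlocks, int maxTransfers) {
    int totalBlocks = replicateBlocks + ecBlocks;
    // Same ceil()-based arithmetic as the patch: round up so that a
    // non-empty queue is never rounded down to zero tasks.
    int numReplicationTasks = (int) Math.ceil(
        (double) (replicateBlocks * maxTransfers) / totalBlocks);
    int numECTasks = (int) Math.ceil(
        (double) (ecBlocks * maxTransfers) / totalBlocks);
    return new int[] {numReplicationTasks, numECTasks};
  }

  public static void main(String[] args) {
    // Balanced queues: the budget of 20 splits evenly.
    System.out.println(Arrays.toString(split(20, 20, 20)));  // [10, 10]
    // 4:1 queue ratio: 16 replication tasks vs. 4 EC tasks.
    System.out.println(Arrays.toString(split(40, 10, 20)));  // [16, 4]
    // Heavily skewed queues: ceil() still grants the EC queue one task,
    // so 20 + 1 = 21 exceeds maxTransfers (hence "approximately").
    System.out.println(Arrays.toString(split(400, 1, 20)));  // [20, 1]
  }
}

Rounding up rather than down is what provides the fairness named in the subject line: a short queue always receives at least one task instead of being starved while the longer queue consumes the whole maxTransfers budget, at the cost of exceeding the budget by at most one task.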

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2989488/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
index de002f4..286f4a4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
@@ -25,6 +25,7 @@ import java.net.URL;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -500,46 +501,93 @@ public class TestDatanodeManager {
         "127.0.0.1:23456", bothAgain.get(1).getInfoAddr());
   }
 
-  @Test
-  public void testPendingRecoveryTasks() throws IOException {
+  /**
+   * Verify the correctness of the pending recovery process.
+   *
+   * @param numReplicationBlocks the number of replication blocks in the queue.
+   * @param numECBlocks the number of EC blocks in the queue.
+   * @param maxTransfers the maxTransfer value.
+   * @param numReplicationTasks the number of replication tasks polled from
+   *                            the queue.
+   * @param numECTasks the number of EC tasks polled from the queue.
+   *
+   * @throws IOException
+   */
+  private void verifyPendingRecoveryTasks(
+      int numReplicationBlocks, int numECBlocks,
+      int maxTransfers, int numReplicationTasks, int numECTasks)
+      throws IOException {
     FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
     Mockito.when(fsn.hasWriteLock()).thenReturn(true);
     Configuration conf = new Configuration();
     DatanodeManager dm = Mockito.spy(mockDatanodeManager(fsn, conf));
 
-    int maxTransfers = 20;
-    int numPendingTasks = 7;
-    int numECTasks = maxTransfers - numPendingTasks;
-
     DatanodeDescriptor nodeInfo = Mockito.mock(DatanodeDescriptor.class);
     Mockito.when(nodeInfo.isRegistered()).thenReturn(true);
     Mockito.when(nodeInfo.getStorageInfos())
         .thenReturn(new DatanodeStorageInfo[0]);
 
-    List<BlockTargetPair> pendingList =
-        Collections.nCopies(numPendingTasks, new BlockTargetPair(null, null));
-    Mockito.when(nodeInfo.getReplicationCommand(maxTransfers))
-        .thenReturn(pendingList);
-    List<BlockECReconstructionInfo> ecPendingList =
-        Collections.nCopies(numECTasks, null);
+    if (numReplicationBlocks > 0) {
+      Mockito.when(nodeInfo.getNumberOfReplicateBlocks())
+          .thenReturn(numReplicationBlocks);
+
+      List<BlockTargetPair> tasks =
+          Collections.nCopies(
+              Math.min(numReplicationTasks, numReplicationBlocks),
+              new BlockTargetPair(null, null));
+      Mockito.when(nodeInfo.getReplicationCommand(numReplicationTasks))
+          .thenReturn(tasks);
+    }
+
+    if (numECBlocks > 0) {
+      Mockito.when(nodeInfo.getNumberOfBlocksToBeErasureCoded())
+          .thenReturn(numECBlocks);
+
+      List<BlockECReconstructionInfo> tasks =
+          Collections.nCopies(numECTasks, null);
+      Mockito.when(nodeInfo.getErasureCodeCommand(numECTasks))
+          .thenReturn(tasks);
+    }
 
-    Mockito.when(nodeInfo.getErasureCodeCommand(numECTasks))
-        .thenReturn(ecPendingList);
     DatanodeRegistration dnReg = Mockito.mock(DatanodeRegistration.class);
     Mockito.when(dm.getDatanode(dnReg)).thenReturn(nodeInfo);
-
     DatanodeCommand[] cmds = dm.handleHeartbeat(
         dnReg, new StorageReport[1], "bp-123", 0, 0, 10, maxTransfers, 0, null,
         SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT);
 
-    assertEquals(2, cmds.length);
-    assertTrue(cmds[0] instanceof BlockCommand);
-    BlockCommand replicaCmd = (BlockCommand) cmds[0];
-    assertEquals(numPendingTasks, replicaCmd.getBlocks().length);
-    assertEquals(numPendingTasks, replicaCmd.getTargets().length);
-    assertTrue(cmds[1] instanceof BlockECReconstructionCommand);
-    BlockECReconstructionCommand ecRecoveryCmd =
-        (BlockECReconstructionCommand) cmds[1];
-    assertEquals(numECTasks, ecRecoveryCmd.getECTasks().size());
+    long expectedNumCmds = Arrays.stream(
+        new int[]{numReplicationTasks, numECTasks})
+        .filter(x -> x > 0)
+        .count();
+    assertEquals(expectedNumCmds, cmds.length);
+
+    int idx = 0;
+    if (numReplicationTasks > 0) {
+      assertTrue(cmds[idx] instanceof BlockCommand);
+      BlockCommand cmd = (BlockCommand) cmds[idx];
+      assertEquals(numReplicationTasks, cmd.getBlocks().length);
+      assertEquals(numReplicationTasks, cmd.getTargets().length);
+      idx++;
+    }
+
+    if (numECTasks > 0) {
+      assertTrue(cmds[idx] instanceof BlockECReconstructionCommand);
+      BlockECReconstructionCommand cmd =
+          (BlockECReconstructionCommand) cmds[idx];
+      assertEquals(numECTasks, cmd.getECTasks().size());
+    }
+
+    Mockito.verify(nodeInfo).getReplicationCommand(numReplicationTasks);
+    Mockito.verify(nodeInfo).getErasureCodeCommand(numECTasks);
+  }
+
+  @Test
+  public void testPendingRecoveryTasks() throws IOException {
+    // Tasks are split according to the ratio between queue lengths.
+    verifyPendingRecoveryTasks(20, 20, 20, 10, 10);
+    verifyPendingRecoveryTasks(40, 10, 20, 16, 4);
+
+    // Task allocation is only approximate when the ratio between queue lengths is large.
+    verifyPendingRecoveryTasks(400, 1, 20, 20, 1);
   }
 }

