You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/07/07 01:39:56 UTC

hadoop git commit: HDFS-8461. Erasure coding: fix priority level of UnderReplicatedBlocks for striped block. Contributed by Walter Su.

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7285 ee01a0950 -> 2c494a843


HDFS-8461. Erasure coding: fix priority level of UnderReplicatedBlocks for striped block. Contributed by Walter Su.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2c494a84
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2c494a84
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2c494a84

Branch: refs/heads/HDFS-7285
Commit: 2c494a843699b478039f41336cf47bd4c635eb76
Parents: ee01a09
Author: Jing Zhao <ji...@apache.org>
Authored: Mon Jul 6 16:39:47 2015 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Mon Jul 6 16:39:47 2015 -0700

----------------------------------------------------------------------
 .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt        |   3 +
 .../blockmanagement/UnderReplicatedBlocks.java  |  53 ++++++-
 .../blockmanagement/BlockManagerTestUtil.java   |   8 +
 .../TestUnderReplicatedBlockQueues.java         |  62 ++++++++
 .../namenode/TestRecoverStripedBlocks.java      | 151 ++++++++++++-------
 5 files changed, 216 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c494a84/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 8f720fc..58b91b6 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -329,3 +329,6 @@
 
     HDFS-8684. Erasure Coding: fix some block number calculation for striped
     block. (yliu)
+
+    HDFS-8461. Erasure coding: fix priority level of UnderReplicatedBlocks for
+    striped block. (Walter Su via jing9)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c494a84/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
index f9bce26..47afb05 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
  *
  * <p/>
  * The policy for choosing which priority to give added blocks
- * is implemented in {@link #getPriority(int, int, int)}.
+ * is implemented in {@link #getPriority(BlockInfo, int, int, int)}.
  * </p>
  * <p>The queue order is as follows:</p>
  * <ol>
@@ -144,14 +144,28 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
    * @param expectedReplicas expected number of replicas of the block
    * @return the priority for the blocks, between 0 and ({@link #LEVEL}-1)
    */
-  private int getPriority(int curReplicas,
+  private int getPriority(BlockInfo block,
+                          int curReplicas,
                           int decommissionedReplicas,
                           int expectedReplicas) {
     assert curReplicas >= 0 : "Negative replicas!";
     if (curReplicas >= expectedReplicas) {
       // Block has enough copies, but not enough racks
       return QUEUE_REPLICAS_BADLY_DISTRIBUTED;
-    } else if (curReplicas == 0) {
+    }
+    if (block.isStriped()) {
+      BlockInfoStriped sblk = (BlockInfoStriped) block;
+      return getPriorityStriped(curReplicas, decommissionedReplicas,
+          sblk.getRealDataBlockNum(), sblk.getParityBlockNum());
+    } else {
+      return getPriorityContiguous(curReplicas, decommissionedReplicas,
+          expectedReplicas);
+    }
+  }
+
+  private int getPriorityContiguous(int curReplicas, int decommissionedReplicas,
+      int expectedReplicas) {
+    if (curReplicas == 0) {
       // If there are zero non-decommissioned replicas but there are
       // some decommissioned replicas, then assign them highest priority
       if (decommissionedReplicas > 0) {
@@ -160,7 +174,7 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
       //all we have are corrupt blocks
       return QUEUE_WITH_CORRUPT_BLOCKS;
     } else if (curReplicas == 1) {
-      //only on replica -risk of loss
+      // only one replica, highest risk of loss
       // highest priority
       return QUEUE_HIGHEST_PRIORITY;
     } else if ((curReplicas * 3) < expectedReplicas) {
@@ -173,6 +187,27 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
     }
   }
 
+  private int getPriorityStriped(int curReplicas, int decommissionedReplicas,
+      short dataBlkNum, short parityBlkNum) {
+    if (curReplicas < dataBlkNum) {
+      // There are some replicas on decommissioned nodes so it's not corrupted
+      if (curReplicas + decommissionedReplicas >= dataBlkNum) {
+        return QUEUE_HIGHEST_PRIORITY;
+      }
+      return QUEUE_WITH_CORRUPT_BLOCKS;
+    } else if (curReplicas == dataBlkNum) {
+      // highest risk of loss, highest priority
+      return QUEUE_HIGHEST_PRIORITY;
+    } else if ((curReplicas - dataBlkNum) * 3 < parityBlkNum + 1) {
+      // can only afford one replica loss
+      // this is considered very under-replicated
+      return QUEUE_VERY_UNDER_REPLICATED;
+    } else {
+      // add to the normal queue for under replicated blocks
+      return QUEUE_UNDER_REPLICATED;
+    }
+  }
+
   /** add a block to a under replication queue according to its priority
    * @param block a under replication block
    * @param curReplicas current number of replicas of the block
@@ -185,7 +220,7 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
                            int decomissionedReplicas,
                            int expectedReplicas) {
     assert curReplicas >= 0 : "Negative replicas!";
-    int priLevel = getPriority(curReplicas, decomissionedReplicas,
+    int priLevel = getPriority(block, curReplicas, decomissionedReplicas,
                                expectedReplicas);
     if(priorityQueues.get(priLevel).add(block)) {
       if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS &&
@@ -208,7 +243,7 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
                               int oldReplicas, 
                               int decommissionedReplicas,
                               int oldExpectedReplicas) {
-    int priLevel = getPriority(oldReplicas,
+    int priLevel = getPriority(block, oldReplicas,
                                decommissionedReplicas,
                                oldExpectedReplicas);
     boolean removedBlock = remove(block, priLevel);
@@ -282,8 +317,10 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
                            int curReplicasDelta, int expectedReplicasDelta) {
     int oldReplicas = curReplicas-curReplicasDelta;
     int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
-    int curPri = getPriority(curReplicas, decommissionedReplicas, curExpectedReplicas);
-    int oldPri = getPriority(oldReplicas, decommissionedReplicas, oldExpectedReplicas);
+    int curPri = getPriority(block, curReplicas, decommissionedReplicas,
+        curExpectedReplicas);
+    int oldPri = getPriority(block, oldReplicas, decommissionedReplicas,
+        oldExpectedReplicas);
     if(NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("UnderReplicationBlocks.update " + 
         block +

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c494a84/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
index e25ee31..64d80bd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
@@ -306,4 +306,12 @@ public class BlockManagerTestUtil {
       throws ExecutionException, InterruptedException {
     dm.getDecomManager().runMonitor();
   }
+
+  /**
+   * add block to the replicateBlocks queue of the Datanode
+   */
+  public static void addBlockToBeReplicated(DatanodeDescriptor node,
+      Block block, DatanodeStorageInfo[] targets) {
+    node.addBlockToBeReplicated(block, targets);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c494a84/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java
index de36e07..0f419ef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java
@@ -19,6 +19,9 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.namenode.ErasureCodingSchemaManager;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -28,10 +31,21 @@ import static org.junit.Assert.fail;
 
 public class TestUnderReplicatedBlockQueues {
 
+  private final ECSchema ecSchema =
+      ErasureCodingSchemaManager.getSystemDefaultSchema();
+  private final int CELLSIZE = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+
   private BlockInfo genBlockInfo(long id) {
     return new BlockInfoContiguous(new Block(id), (short) 3);
   }
 
+  private BlockInfo genStripedBlockInfo(long id, long numBytes) {
+    BlockInfoStriped sblk =  new BlockInfoStriped(new Block(id), ecSchema,
+        CELLSIZE);
+    sblk.setNumBytes(numBytes);
+    return sblk;
+  }
+
   /**
    * Test that adding blocks with different replication counts puts them
    * into different queues
@@ -85,6 +99,54 @@ public class TestUnderReplicatedBlockQueues {
     assertEquals(2, queues.getCorruptReplOneBlockSize());
   }
 
+  @Test
+  public void testStripedBlockPriorities() throws Throwable {
+    int dataBlkNum = ecSchema.getNumDataUnits();
+    int parityBlkNUm = ecSchema.getNumParityUnits();
+    doTestStripedBlockPriorities(1, parityBlkNUm);
+    doTestStripedBlockPriorities(dataBlkNum, parityBlkNUm);
+  }
+
+  private void doTestStripedBlockPriorities(int dataBlkNum, int parityBlkNum)
+      throws Throwable {
+    int groupSize = dataBlkNum + parityBlkNum;
+    long numBytes = CELLSIZE * dataBlkNum;
+    UnderReplicatedBlocks queues = new UnderReplicatedBlocks();
+
+    // add a striped block which been left NUM_DATA_BLOCKS internal blocks
+    BlockInfo block1 = genStripedBlockInfo(-100, numBytes);
+    assertAdded(queues, block1, dataBlkNum, 0, groupSize);
+    assertEquals(1, queues.getUnderReplicatedBlockCount());
+    assertEquals(1, queues.size());
+    assertInLevel(queues, block1, UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY);
+
+    // add a striped block which been left NUM_DATA_BLOCKS+1 internal blocks
+    BlockInfo block2 = genStripedBlockInfo(-200, numBytes);
+    assertAdded(queues, block2, dataBlkNum + 1, 0, groupSize);
+    assertEquals(2, queues.getUnderReplicatedBlockCount());
+    assertEquals(2, queues.size());
+    assertInLevel(queues, block2,
+        UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED);
+
+    // add a striped block which been left NUM_DATA_BLOCKS+2 internal blocks
+    BlockInfo block3 = genStripedBlockInfo(-300, numBytes);
+    assertAdded(queues, block3, dataBlkNum + 2, 0, groupSize);
+    assertEquals(3, queues.getUnderReplicatedBlockCount());
+    assertEquals(3, queues.size());
+    assertInLevel(queues, block3,
+        UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED);
+
+    // add a corrupted block
+    BlockInfo block_corrupt = genStripedBlockInfo(-400, numBytes);
+    assertEquals(0, queues.getCorruptBlockSize());
+    assertAdded(queues, block_corrupt, dataBlkNum - 1, 0, groupSize);
+    assertEquals(4, queues.size());
+    assertEquals(3, queues.getUnderReplicatedBlockCount());
+    assertEquals(1, queues.getCorruptBlockSize());
+    assertInLevel(queues, block_corrupt,
+        UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
+  }
+
   private void assertAdded(UnderReplicatedBlocks queues,
                            BlockInfo block,
                            int curReplicas,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c494a84/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
index ca4fbbc..3134373 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
@@ -32,29 +32,26 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
-
-import java.io.IOException;
 import java.util.List;
 
 import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
 import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_PARITY_BLOCKS;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class TestRecoverStripedBlocks {
   private final short GROUP_SIZE =
-      NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS;
+      NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS;
   private MiniDFSCluster cluster;
   private final Path dirPath = new Path("/dir");
   private Path filePath = new Path(dirPath, "file");
+  private int maxReplicationStreams =
+      DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT;
 
-  @Before
-  public void setup() throws IOException {
-    final Configuration conf = new HdfsConfiguration();
+  private void initConf(Configuration conf) {
     // Large value to make sure the pending replication request can stay in
     // DatanodeDescriptor.replicateBlocks before test timeout.
     conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 100);
@@ -62,63 +59,111 @@ public class TestRecoverStripedBlocks {
     // chooseUnderReplicatedBlocks at once.
     conf.setInt(
         DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION, 5);
+  }
 
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(GROUP_SIZE + 1)
-        .build();
-    cluster.waitActive();
+  @Test
+  public void testMissingStripedBlock() throws Exception {
+    doTestMissingStripedBlock(1, 0);
   }
 
-  @After
-  public void tearDown() throws Exception {
-    if (cluster != null) {
-      cluster.shutdown();
-    }
+  @Test
+  public void testMissingStripedBlockWithBusyNode1() throws Exception {
+    doTestMissingStripedBlock(2, 1);
   }
 
   @Test
-  public void testMissingStripedBlock() throws Exception {
-    final int numBlocks = 4;
-    DFSTestUtil.createStripedFile(cluster, filePath,
-        dirPath, numBlocks, 1, true);
+  public void testMissingStripedBlockWithBusyNode2() throws Exception {
+    doTestMissingStripedBlock(3, 1);
+  }
 
-    // make sure the file is complete in NN
-    final INodeFile fileNode = cluster.getNamesystem().getFSDirectory()
-        .getINode4Write(filePath.toString()).asFile();
-    assertFalse(fileNode.isUnderConstruction());
-    assertTrue(fileNode.isStriped());
-    BlockInfo[] blocks = fileNode.getBlocks();
-    assertEquals(numBlocks, blocks.length);
-    for (BlockInfo blk : blocks) {
-      assertTrue(blk.isStriped());
-      assertTrue(blk.isComplete());
-      assertEquals(BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS, blk.getNumBytes());
-      final BlockInfoStriped sb = (BlockInfoStriped) blk;
-      assertEquals(GROUP_SIZE, sb.numNodes());
-    }
+  /**
+   * Start GROUP_SIZE + 1 datanodes.
+   * Inject striped blocks to first GROUP_SIZE datanodes.
+   * Then make numOfBusy datanodes busy, make numOfMissed datanodes missed.
+   * Then trigger BlockManager to compute recovery works. (so all recovery work
+   * will be scheduled to the last datanode)
+   * Finally, verify the recovery work of the last datanode.
+   */
+  private void doTestMissingStripedBlock(int numOfMissed, int numOfBusy)
+      throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(GROUP_SIZE + 1)
+        .build();
+
+    try {
+      cluster.waitActive();
+      final int numBlocks = 4;
+      DFSTestUtil.createStripedFile(cluster, filePath,
+          dirPath, numBlocks, 1, true);
+      // all blocks will be located at first GROUP_SIZE DNs, the last DN is
+      // empty because of the util function createStripedFile
 
-    final BlockManager bm = cluster.getNamesystem().getBlockManager();
-    BlockInfo firstBlock = fileNode.getBlocks()[0];
-    DatanodeStorageInfo[] storageInfos = bm.getStorages(firstBlock);
+      // make sure the file is complete in NN
+      final INodeFile fileNode = cluster.getNamesystem().getFSDirectory()
+          .getINode4Write(filePath.toString()).asFile();
+      assertFalse(fileNode.isUnderConstruction());
+      assertTrue(fileNode.isStriped());
+      BlockInfo[] blocks = fileNode.getBlocks();
+      assertEquals(numBlocks, blocks.length);
+      for (BlockInfo blk : blocks) {
+        assertTrue(blk.isStriped());
+        assertTrue(blk.isComplete());
+        assertEquals(BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS,
+            blk.getNumBytes());
+        final BlockInfoStriped sb = (BlockInfoStriped) blk;
+        assertEquals(GROUP_SIZE, sb.numNodes());
+      }
 
-    DatanodeDescriptor secondDn = storageInfos[1].getDatanodeDescriptor();
-    assertEquals(numBlocks, secondDn.numBlocks());
+      final BlockManager bm = cluster.getNamesystem().getBlockManager();
+      BlockInfo firstBlock = fileNode.getBlocks()[0];
+      DatanodeStorageInfo[] storageInfos = bm.getStorages(firstBlock);
 
-    bm.getDatanodeManager().removeDatanode(secondDn);
+      // make numOfBusy nodes busy
+      int i = 0;
+      for (; i < numOfBusy; i++) {
+        DatanodeDescriptor busyNode = storageInfos[i].getDatanodeDescriptor();
+        for (int j = 0; j < maxReplicationStreams + 1; j++) {
+          BlockManagerTestUtil.addBlockToBeReplicated(busyNode, new Block(j),
+              new DatanodeStorageInfo[]{storageInfos[0]});
+        }
+      }
 
-    BlockManagerTestUtil.getComputedDatanodeWork(bm);
+      // make numOfMissed internal blocks missed
+      for (; i < numOfBusy + numOfMissed; i++) {
+        DatanodeDescriptor missedNode = storageInfos[i].getDatanodeDescriptor();
+        assertEquals(numBlocks, missedNode.numBlocks());
+        bm.getDatanodeManager().removeDatanode(missedNode);
+      }
 
-    // all the recovery work will be scheduled on the last DN
-    DataNode lastDn = cluster.getDataNodes().get(GROUP_SIZE);
-    DatanodeDescriptor last =
+      BlockManagerTestUtil.getComputedDatanodeWork(bm);
+
+      // all the recovery work will be scheduled on the last DN
+      DataNode lastDn = cluster.getDataNodes().get(GROUP_SIZE);
+      DatanodeDescriptor last =
           bm.getDatanodeManager().getDatanode(lastDn.getDatanodeId());
-    assertEquals("Counting the number of outstanding EC tasks", numBlocks,
-        last.getNumberOfBlocksToBeErasureCoded());
-    List<BlockECRecoveryInfo> recovery = last.getErasureCodeCommand(numBlocks);
-    for (BlockECRecoveryInfo info : recovery) {
-      assertEquals(1, info.getTargetDnInfos().length);
-      assertEquals(last, info.getTargetDnInfos()[0]);
-      assertEquals(GROUP_SIZE - 1, info.getSourceDnInfos().length);
-      assertEquals(GROUP_SIZE - 1, info.getLiveBlockIndices().length);
+      assertEquals("Counting the number of outstanding EC tasks", numBlocks,
+          last.getNumberOfBlocksToBeErasureCoded());
+      List<BlockECRecoveryInfo> recovery =
+          last.getErasureCodeCommand(numBlocks);
+      for (BlockECRecoveryInfo info : recovery) {
+        assertEquals(1, info.getTargetDnInfos().length);
+        assertEquals(last, info.getTargetDnInfos()[0]);
+        assertEquals(info.getSourceDnInfos().length,
+            info.getLiveBlockIndices().length);
+        if (GROUP_SIZE - numOfMissed == NUM_DATA_BLOCKS) {
+          // It's a QUEUE_HIGHEST_PRIORITY block, so the busy DNs will be chosen
+          // to make sure we have NUM_DATA_BLOCKS DNs to do recovery work.
+          assertEquals(NUM_DATA_BLOCKS, info.getSourceDnInfos().length);
+        } else {
+          // The block has no highest priority, so we don't use the busy DNs as
+          // sources
+          assertEquals(GROUP_SIZE - numOfMissed - numOfBusy,
+              info.getSourceDnInfos().length);
+        }
+      }
+    } finally {
+      cluster.shutdown();
     }
   }
 }