Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2016/03/04 02:01:48 UTC

[09/50] [abbrv] hadoop git commit: HDFS-9866. BlockManager#chooseExcessReplicasStriped may weaken rack fault tolerance. Contributed by Jing Zhao.

HDFS-9866. BlockManager#chooseExcessReplicasStriped may weaken rack fault tolerance. Contributed by Jing Zhao.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/408f2c80
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/408f2c80
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/408f2c80

Branch: refs/heads/HDFS-1312
Commit: 408f2c807bbaaaa37ce1b69a5dfa9d76ed427d6e
Parents: 7634d40
Author: Jing Zhao <ji...@apache.org>
Authored: Sun Feb 28 14:54:49 2016 -0800
Committer: Jing Zhao <ji...@apache.org>
Committed: Sun Feb 28 14:54:49 2016 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../server/blockmanagement/BlockManager.java    |   6 +-
 .../blockmanagement/BlockPlacementPolicy.java   |  48 ++++---
 .../BlockPlacementPolicyDefault.java            |  20 +--
 .../blockmanagement/ErasureCodingWork.java      |   6 +
 ...constructStripedBlocksWithRackAwareness.java | 133 ++++++++++---------
 .../blockmanagement/TestReplicationPolicy.java  |  15 ++-
 .../TestReplicationPolicyWithNodeGroup.java     |   2 +-
 .../TestReplicationPolicyWithUpgradeDomain.java |  12 +-
 9 files changed, 133 insertions(+), 112 deletions(-)
----------------------------------------------------------------------
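
For context on the bug: before this patch, chooseExcessReplicasStriped handed chooseReplicasToDelete only the deletion candidates for one duplicated internal-block index, and splitNodesWithRack counted racks over that subset alone. A candidate could therefore look like the last replica on its rack even though other internal blocks of the same block group live there, so the policy could protect it and delete a genuinely rack-unique replica instead, weakening rack fault tolerance. The sketch below illustrates the miscount; it is plain Java with hypothetical names (RackCountSketch, racksOf), not the HDFS API.

    import java.util.*;

    class RackCountSketch {
      // Count how many of the given replicas live on each rack.
      static Map<String, Integer> racksOf(Collection<String> replicaRacks) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String rack : replicaRacks) {
          counts.merge(rack, 1, Integer::sum);
        }
        return counts;
      }

      public static void main(String[] args) {
        // All live replicas of a block group: two internal blocks share rack /r1.
        List<String> available = Arrays.asList("/r1", "/r1", "/r2", "/r3");
        // Only one of the /r1 replicas is an excess-deletion candidate.
        List<String> candidates = Collections.singletonList("/r1");

        // Pre-patch view: racks counted over the candidates alone make the /r1
        // candidate look like the last replica on its rack ("exactlyOne"), so
        // the policy protects it and may delete a rack-unique replica instead.
        System.out.println(racksOf(candidates)); // {/r1=1}

        // Post-patch view: counting over all available replicas shows /r1
        // holds two replicas, so this candidate is deletable ("moreThanOne").
        System.out.println(racksOf(available));  // {/r1=2, /r2=1, /r3=1}
      }
    }

The patch fixes this by threading the full set of available replicas through chooseReplicasToDelete and splitNodesWithRack, while still restricting deletion to the candidates, as the diffs below show.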


http://git-wip-us.apache.org/repos/asf/hadoop/blob/408f2c80/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 81b48c3..651f0a1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -446,6 +446,9 @@ Trunk (Unreleased)
     HDFS-9734. Refactoring of checksum failure report related codes.
     (Kai Zheng via zhz)
 
+    HDFS-9866. BlockManager#chooseExcessReplicasStriped may weaken rack fault
+    tolerance. (jing9)
+
     BREAKDOWN OF HDFS-7285 SUBTASKS AND RELATED JIRAS
 
       HDFS-7347. Configurable erasure coding policy for individual files and

http://git-wip-us.apache.org/repos/asf/hadoop/blob/408f2c80/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index f483d8e..77eea0a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -3245,7 +3245,7 @@ public class BlockManager implements BlockStatsMXBean {
       DatanodeDescriptor delNodeHint, List<StorageType> excessTypes) {
     BlockPlacementPolicy replicator = placementPolicies.getPolicy(false);
     List<DatanodeStorageInfo> replicasToDelete = replicator
-        .chooseReplicasToDelete(nonExcess, replication, excessTypes,
+        .chooseReplicasToDelete(nonExcess, nonExcess, replication, excessTypes,
             addedNode, delNodeHint);
     for (DatanodeStorageInfo choosenReplica : replicasToDelete) {
       processChosenExcessReplica(nonExcess, choosenReplica, storedBlock);
@@ -3316,8 +3316,8 @@ public class BlockManager implements BlockStatsMXBean {
       internalBlock.setBlockId(storedBlock.getBlockId() + targetIndex);
       while (candidates.size() > 1) {
         List<DatanodeStorageInfo> replicasToDelete = placementPolicy
-            .chooseReplicasToDelete(candidates, (short) 1, excessTypes, null,
-                null);
+            .chooseReplicasToDelete(nonExcess, candidates, (short) 1,
+                excessTypes, null, null);
         for (DatanodeStorageInfo chosen : replicasToDelete) {
           processChosenExcessReplica(nonExcess, chosen, storedBlock);
           candidates.remove(chosen);

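Both call sites above follow the new two-collection contract: for contiguous blocks the live replicas and the deletion candidates are the same collection, so nonExcess is passed twice; for striped blocks the candidates are the storages holding one duplicated internal-block index, a strict subset of nonExcess. A rough sketch of how such a subset arises (hypothetical names and layout, not BlockManager's actual code):

    import java.util.*;

    class StripedCandidatesSketch {
      public static void main(String[] args) {
        // storage name -> internal block index it stores (hypothetical layout)
        Map<String, Integer> storageToIndex = new LinkedHashMap<>();
        storageToIndex.put("s1", 0);
        storageToIndex.put("s2", 1);
        storageToIndex.put("s3", 1); // internal block index 1 is duplicated
        storageToIndex.put("s4", 2);

        // Group storages by the internal block index they hold.
        Map<Integer, List<String>> byIndex = new TreeMap<>();
        storageToIndex.forEach((s, idx) ->
            byIndex.computeIfAbsent(idx, k -> new ArrayList<>()).add(s));

        Set<String> allStorages = storageToIndex.keySet();
        for (Map.Entry<Integer, List<String>> e : byIndex.entrySet()) {
          List<String> candidates = e.getValue();
          if (candidates.size() > 1) {
            // In the patched API this corresponds to
            // chooseReplicasToDelete(allStorages, candidates, (short) 1, ...):
            // candidates is a strict subset of the available set.
            System.out.println("index " + e.getKey() + ": candidates="
                + candidates + " out of available=" + allStorages);
          }
        }
      }
    }
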
http://git-wip-us.apache.org/repos/asf/hadoop/blob/408f2c80/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
index 8478387..92b03d2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
@@ -77,8 +77,6 @@ public abstract class BlockPlacementPolicy {
                                              BlockStoragePolicy storagePolicy);
   
   /**
-   * Same as {@link #chooseTarget(String, int, Node, Set, long, List, StorageType)}
-   * with added parameter {@code favoredDatanodes}
    * @param favoredNodes datanodes that should be favored as targets. This
    *          is only a hint and due to cluster state, namenode may not be 
    *          able to place the blocks on these datanodes.
@@ -106,17 +104,21 @@ public abstract class BlockPlacementPolicy {
    * @param numOfReplicas replica number of file to be verified
    * @return the result of verification
    */
-  abstract public BlockPlacementStatus verifyBlockPlacement(
+  public abstract BlockPlacementStatus verifyBlockPlacement(
       DatanodeInfo[] locs, int numOfReplicas);
 
   /**
   * Select the excess replica storages for deletion, based on either
   * the delNodeHint or the excess storage types.
    *
-   * @param candidates
+   * @param availableReplicas
    *          available replicas
+   * @param delCandidates
+   *          Candidates for deletion. For normal replication, this set is the
+   *          same as availableReplicas; for striped blocks, it is a subset of
+   *          availableReplicas.
    * @param expectedNumOfReplicas
-   *          The required number of replicas for this block
+   *          The expected number of replicas remaining in the delCandidates set
    * @param excessTypes
   *          excess storage types of the storage policy
    * @param addedNode
@@ -125,10 +127,12 @@ public abstract class BlockPlacementPolicy {
    *          Hint for excess storage selection
    * @return Returns the list of excess replicas chosen for deletion
    */
-  abstract public List<DatanodeStorageInfo> chooseReplicasToDelete(
-      Collection<DatanodeStorageInfo> candidates, int expectedNumOfReplicas,
+  public abstract List<DatanodeStorageInfo> chooseReplicasToDelete(
+      Collection<DatanodeStorageInfo> availableReplicas,
+      Collection<DatanodeStorageInfo> delCandidates, int expectedNumOfReplicas,
       List<StorageType> excessTypes, DatanodeDescriptor addedNode,
       DatanodeDescriptor delNodeHint);
+
   /**
    * Used to setup a BlockPlacementPolicy object. This should be defined by 
    * all implementations of a BlockPlacementPolicy.
@@ -137,7 +141,7 @@ public abstract class BlockPlacementPolicy {
    * @param stats retrieve cluster status from here
    * @param clusterMap cluster topology
    */
-  abstract protected void initialize(Configuration conf,  FSClusterStats stats, 
+  protected abstract void initialize(Configuration conf,  FSClusterStats stats,
                                      NetworkTopology clusterMap, 
                                      Host2NodesMap host2datanodeMap);
 
@@ -149,7 +153,7 @@ public abstract class BlockPlacementPolicy {
    * @param source source replica of the move
    * @param target target replica of the move
    */
-  abstract public boolean isMovable(Collection<DatanodeInfo> candidates,
+  public abstract boolean isMovable(Collection<DatanodeInfo> candidates,
       DatanodeInfo source, DatanodeInfo target);
 
   /**
@@ -191,10 +195,8 @@ public abstract class BlockPlacementPolicy {
         "class " + datanode.getClass().getName() + " not allowed");
     if (datanode instanceof DatanodeInfo) {
       return ((DatanodeInfo)datanode);
-    } else if (datanode instanceof DatanodeStorageInfo) {
-      return ((DatanodeStorageInfo)datanode).getDatanodeDescriptor();
     } else {
-      return null;
+      return ((DatanodeStorageInfo)datanode).getDatanodeDescriptor();
     }
   }
 
@@ -209,35 +211,37 @@ public abstract class BlockPlacementPolicy {
   /**
   * Split datanodes into two sets: one set includes nodes on racks with
   * more than one replica; the other set contains the remaining nodes.
-   * 
-   * @param storagesOrDataNodes DatanodeStorageInfo/DatanodeInfo to be split
+   *
+   * @param availableSet all the available DataNodes/storages of the block
+   * @param candidates DatanodeStorageInfo/DatanodeInfo to be split
    *        into two sets
    * @param rackMap a map from rack to datanodes
   * @param moreThanOne contains nodes on racks with more than one replica
   * @param exactlyOne contains the remaining nodes
    */
   public <T> void splitNodesWithRack(
-      final Iterable<T> storagesOrDataNodes,
+      final Iterable<T> availableSet,
+      final Collection<T> candidates,
       final Map<String, List<T>> rackMap,
       final List<T> moreThanOne,
       final List<T> exactlyOne) {
-    for(T s: storagesOrDataNodes) {
+    for(T s: availableSet) {
       final String rackName = getRack(getDatanodeInfo(s));
       List<T> storageList = rackMap.get(rackName);
       if (storageList == null) {
-        storageList = new ArrayList<T>();
+        storageList = new ArrayList<>();
         rackMap.put(rackName, storageList);
       }
       storageList.add(s);
     }
-    // split nodes into two sets
-    for(List<T> storageList : rackMap.values()) {
-      if (storageList.size() == 1) {
+    for (T candidate : candidates) {
+      final String rackName = getRack(getDatanodeInfo(candidate));
+      if (rackMap.get(rackName).size() == 1) {
         // exactlyOne contains nodes on rack with only one replica
-        exactlyOne.add(storageList.get(0));
+        exactlyOne.add(candidate);
       } else {
         // moreThanOne contains nodes on rack with more than one replica
-        moreThanOne.addAll(storageList);
+        moreThanOne.add(candidate);
       }
     }
   }

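The patched splitNodesWithRack above is the heart of the fix: the rack map is populated from the full availableSet, but only the candidates are classified into moreThanOne/exactlyOne. Below is a standalone restatement with the HDFS types stripped out; it is a sketch under those simplifications (rackOf standing in for getRack/getDatanodeInfo), not the real method.

    import java.util.*;
    import java.util.function.Function;

    class SplitNodesSketch {
      static <T> void splitNodesWithRack(Iterable<T> availableSet,
          Collection<T> candidates, Function<T, String> rackOf,
          List<T> moreThanOne, List<T> exactlyOne) {
        // Racks are counted over every available replica ...
        Map<String, List<T>> rackMap = new HashMap<>();
        for (T s : availableSet) {
          rackMap.computeIfAbsent(rackOf.apply(s), r -> new ArrayList<>()).add(s);
        }
        // ... but only the deletion candidates are classified.
        for (T c : candidates) {
          if (rackMap.get(rackOf.apply(c)).size() == 1) {
            exactlyOne.add(c);  // last replica on its rack; deleting it loses the rack
          } else {
            moreThanOne.add(c); // the rack keeps a replica even after deletion
          }
        }
      }

      public static void main(String[] args) {
        List<String> available = Arrays.asList("a:/r1", "b:/r1", "c:/r2");
        List<String> candidates = Collections.singletonList("a:/r1");
        List<String> more = new ArrayList<>();
        List<String> one = new ArrayList<>();
        splitNodesWithRack(available, candidates, s -> s.split(":")[1], more, one);
        // Prints moreThanOne=[a:/r1], exactlyOne=[]: a:/r1 is safely deletable,
        // which the pre-patch subset-only counting would have gotten wrong.
        System.out.println("moreThanOne=" + more + ", exactlyOne=" + one);
      }
    }
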
http://git-wip-us.apache.org/repos/asf/hadoop/blob/408f2c80/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
index e1a47ae..f20f5fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
@@ -972,7 +972,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
 
   @Override
   public List<DatanodeStorageInfo> chooseReplicasToDelete(
-      Collection<DatanodeStorageInfo> candidates,
+      Collection<DatanodeStorageInfo> availableReplicas,
+      Collection<DatanodeStorageInfo> delCandidates,
       int expectedNumOfReplicas,
       List<StorageType> excessTypes,
       DatanodeDescriptor addedNode,
@@ -985,28 +986,29 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     final List<DatanodeStorageInfo> moreThanOne = new ArrayList<>();
     final List<DatanodeStorageInfo> exactlyOne = new ArrayList<>();
 
-    // split nodes into two sets
+    // split candidate nodes for deletion into two sets
     // moreThanOne contains nodes on rack with more than one replica
     // exactlyOne contains the remaining nodes
-    splitNodesWithRack(candidates, rackMap, moreThanOne, exactlyOne);
+    splitNodesWithRack(availableReplicas, delCandidates, rackMap, moreThanOne,
+        exactlyOne);
 
     // pick one node to delete that favors the delete hint
     // otherwise pick one with least space from priSet if it is not empty
     // otherwise one node with least space from remains
     boolean firstOne = true;
     final DatanodeStorageInfo delNodeHintStorage =
-        DatanodeStorageInfo.getDatanodeStorageInfo(candidates, delNodeHint);
+        DatanodeStorageInfo.getDatanodeStorageInfo(delCandidates, delNodeHint);
     final DatanodeStorageInfo addedNodeStorage =
-        DatanodeStorageInfo.getDatanodeStorageInfo(candidates, addedNode);
+        DatanodeStorageInfo.getDatanodeStorageInfo(delCandidates, addedNode);
 
-    while (candidates.size() - expectedNumOfReplicas > excessReplicas.size()) {
+    while (delCandidates.size() - expectedNumOfReplicas > excessReplicas.size()) {
       final DatanodeStorageInfo cur;
       if (firstOne && useDelHint(delNodeHintStorage, addedNodeStorage,
           moreThanOne, exactlyOne, excessTypes)) {
         cur = delNodeHintStorage;
       } else { // regular excessive replica removal
-        cur = chooseReplicaToDelete(moreThanOne, exactlyOne, excessTypes,
-            rackMap);
+        cur = chooseReplicaToDelete(moreThanOne, exactlyOne,
+            excessTypes, rackMap);
       }
       firstOne = false;
       if (cur == null) {
@@ -1056,7 +1058,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     final Map<String, List<DatanodeInfo>> rackMap = new HashMap<>();
     final List<DatanodeInfo> moreThanOne = new ArrayList<>();
     final List<DatanodeInfo> exactlyOne = new ArrayList<>();
-    splitNodesWithRack(locs, rackMap, moreThanOne, exactlyOne);
+    splitNodesWithRack(locs, locs, rackMap, moreThanOne, exactlyOne);
     return notReduceNumOfGroups(moreThanOne, source, target);
   }
 

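Note the loop bound in the patched chooseReplicasToDelete: deletion continues while delCandidates.size() - expectedNumOfReplicas exceeds the number of excess replicas already chosen. For the striped call site, with two candidates for a duplicated index and expectedNumOfReplicas of 1, exactly one replica is removed per pass. A trivial check of that arithmetic (plain ints, not HDFS code):

    class LoopBoundSketch {
      public static void main(String[] args) {
        // Two candidates for one duplicated index, one expected to remain.
        int delCandidates = 2;
        int expectedNumOfReplicas = 1;
        int excessChosen = 0;
        while (delCandidates - expectedNumOfReplicas > excessChosen) {
          excessChosen++; // stands in for picking one replica to delete
        }
        System.out.println(excessChosen); // 1: exactly one excess replica removed
      }
    }
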
http://git-wip-us.apache.org/repos/asf/hadoop/blob/408f2c80/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java
index 38ad324..7877c56 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.net.Node;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.BitSet;
 import java.util.HashMap;
 import java.util.List;
@@ -137,6 +138,11 @@ class ErasureCodingWork extends BlockReconstructionWork {
           stripedBlk.getBlockId() + blockIndex, internBlkLen,
           stripedBlk.getGenerationStamp());
       source.addBlockToBeReplicated(targetBlk, getTargets());
+      if (BlockManager.LOG.isDebugEnabled()) {
+        BlockManager.LOG.debug("Add replication task from source {} to " +
+            "targets {} for EC block {}", source, Arrays.toString(getTargets()),
+            targetBlk);
+      }
     } else {
       getTargets()[0].getDatanodeDescriptor().addBlockToBeErasureCoded(
           new ExtendedBlock(blockPoolId, stripedBlk),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/408f2c80/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReconstructStripedBlocksWithRackAwareness.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReconstructStripedBlocksWithRackAwareness.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReconstructStripedBlocksWithRackAwareness.java
index 2164957..d269a9d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReconstructStripedBlocksWithRackAwareness.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReconstructStripedBlocksWithRackAwareness.java
@@ -23,8 +23,9 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@@ -39,9 +40,8 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
+import java.io.IOException;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 
 import static org.apache.hadoop.hdfs.StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE;
@@ -55,14 +55,13 @@ public class TestReconstructStripedBlocksWithRackAwareness {
   static {
     GenericTestUtils.setLogLevel(BlockPlacementPolicy.LOG, Level.ALL);
     GenericTestUtils.setLogLevel(BlockManager.blockLog, Level.ALL);
+    GenericTestUtils.setLogLevel(BlockManager.LOG, Level.ALL);
   }
 
   private static final String[] hosts = new String[]{"host1", "host2", "host3",
       "host4", "host5", "host6", "host7", "host8", "host9", "host10"};
   private static final String[] racks = new String[]{"/r1", "/r1", "/r2", "/r2",
       "/r3", "/r3", "/r4", "/r4", "/r5", "/r6"};
-  private static final List<String> singleNodeRacks = Arrays.asList("host9", "host10");
-  private static final short blockNum = (short) (NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS);
 
   private MiniDFSCluster cluster;
   private DistributedFileSystem fs;
@@ -94,6 +93,20 @@ public class TestReconstructStripedBlocksWithRackAwareness {
     }
   }
 
+  private MiniDFSCluster.DataNodeProperties stopDataNode(String hostname)
+      throws IOException {
+    MiniDFSCluster.DataNodeProperties dnProp = null;
+    for (int i = 0; i < cluster.getDataNodes().size(); i++) {
+      DataNode dn = cluster.getDataNodes().get(i);
+      if (dn.getDatanodeId().getHostName().equals(hostname)) {
+        dnProp = cluster.stopDataNode(i);
+        cluster.setDataNodeDead(dn.getDatanodeId());
+        LOG.info("stop datanode " + dn.getDatanodeId().getHostName());
+      }
+    }
+    return dnProp;
+  }
+
   /**
   * When all the internal blocks are available but they are not placed on
   * enough racks, the NameNode should avoid normal decoding reconstruction but copy
@@ -102,24 +115,13 @@ public class TestReconstructStripedBlocksWithRackAwareness {
   * In this test, we first create a scenario in which a striped block has all
   * of its internal blocks but they are distributed across fewer than 6 racks.
   * Then we check whether the replication monitor correctly schedules the
   * reconstruction work for it.
-   *
-   * For the 9 internal blocks + 5 racks setup, the test does the following:
-   * 1. create a 6 rack cluster with 10 datanodes, where there are 2 racks only
-   * containing 1 datanodes each
-   * 2. for a striped block with 9 internal blocks, there must be one internal
-   * block locating in a single-node rack. find this node and stop it
-   * 3. namenode will trigger reconstruction for the block and since the cluster
-   * has only 5 racks remaining, after the reconstruction we have 9 internal
-   * blocks distributed in 5 racks.
-   * 4. we bring the datanode back, now the cluster has 6 racks again
-   * 5. let the datanode call reportBadBlock, this will make the namenode to
-   * check if the striped block is placed in >= 6 racks, and the namenode will
-   * put the block into the under-replicated queue
-   * 6. now we can check if the replication monitor works as expected
    */
   @Test
   public void testReconstructForNotEnoughRacks() throws Exception {
+    MiniDFSCluster.DataNodeProperties host10 = stopDataNode("host10");
+
     final Path file = new Path("/foo");
+    // the file's blocks are on 9 datanodes but only 5 racks
     DFSTestUtil.createFile(fs, file,
         BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS * 2, (short) 1, 0L);
     Assert.assertEquals(0, bm.numOfUnderReplicatedBlocks());
@@ -128,39 +130,6 @@ public class TestReconstructStripedBlocksWithRackAwareness {
         .getINode4Write(file.toString()).asFile();
     BlockInfoStriped blockInfo = (BlockInfoStriped) fileNode.getLastBlock();
 
-    // find the internal block located in the single node rack
-    Block internalBlock = null;
-    String hostToStop = null;
-    for (DatanodeStorageInfo storage : blockInfo.storages) {
-      if (singleNodeRacks.contains(storage.getDatanodeDescriptor().getHostName())) {
-        hostToStop = storage.getDatanodeDescriptor().getHostName();
-        internalBlock = blockInfo.getBlockOnStorage(storage);
-      }
-    }
-    Assert.assertNotNull(internalBlock);
-    Assert.assertNotNull(hostToStop);
-
-    // delete the block on the chosen datanode
-    cluster.corruptBlockOnDataNodesByDeletingBlockFile(
-        new ExtendedBlock(bm.getBlockPoolId(), internalBlock));
-
-    // stop the chosen datanode
-    MiniDFSCluster.DataNodeProperties dnProp = null;
-    for (int i = 0; i < cluster.getDataNodes().size(); i++) {
-      DataNode dn = cluster.getDataNodes().get(i);
-      if (dn.getDatanodeId().getHostName().equals(hostToStop)) {
-        dnProp = cluster.stopDataNode(i);
-        cluster.setDataNodeDead(dn.getDatanodeId());
-        LOG.info("stop datanode " + dn.getDatanodeId().getHostName());
-      }
-    }
-    NetworkTopology topology = bm.getDatanodeManager().getNetworkTopology();
-    Assert.assertEquals(5, topology.getNumOfRacks());
-
-    // make sure the reconstruction work can finish
-    // now we have 9 internal blocks in 5 racks
-    DFSTestUtil.waitForReplication(fs, file, blockNum, 15 * 1000);
-
     // we now should have 9 internal blocks distributed in 5 racks
     Set<String> rackSet = new HashSet<>();
     for (DatanodeStorageInfo storage : blockInfo.storages) {
@@ -169,27 +138,25 @@ public class TestReconstructStripedBlocksWithRackAwareness {
     Assert.assertEquals(5, rackSet.size());
 
     // restart the stopped datanode
-    cluster.restartDataNode(dnProp);
+    cluster.restartDataNode(host10);
     cluster.waitActive();
 
     // make sure we have 6 racks again
-    topology = bm.getDatanodeManager().getNetworkTopology();
+    NetworkTopology topology = bm.getDatanodeManager().getNetworkTopology();
     Assert.assertEquals(hosts.length, topology.getNumOfLeaves());
     Assert.assertEquals(6, topology.getNumOfRacks());
 
     // pause all the heartbeats
-    DataNode badDn = null;
     for (DataNode dn : cluster.getDataNodes()) {
       DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
-      if (dn.getDatanodeId().getHostName().equals(hostToStop)) {
-        badDn = dn;
-      }
     }
-    assert badDn != null;
-    // let the DN report the bad block, so that the namenode will put the block
-    // into under-replicated queue. note that the block still has 9 internal
-    // blocks but in 5 racks
-    badDn.reportBadBlocks(new ExtendedBlock(bm.getBlockPoolId(), internalBlock));
+
+    fsn.writeLock();
+    try {
+      bm.processMisReplicatedBlocks();
+    } finally {
+      fsn.writeUnlock();
+    }
 
     // check if the replication monitor correctly schedules the replication work
     boolean scheduled = false;
@@ -210,4 +177,42 @@ public class TestReconstructStripedBlocksWithRackAwareness {
     }
     Assert.assertTrue(scheduled);
   }
+
+  @Test
+  public void testChooseExcessReplicasToDelete() throws Exception {
+    MiniDFSCluster.DataNodeProperties host10 = stopDataNode("host10");
+
+    final Path file = new Path("/foo");
+    DFSTestUtil.createFile(fs, file,
+        BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS * 2, (short) 1, 0L);
+
+    // stop host1
+    MiniDFSCluster.DataNodeProperties host1 = stopDataNode("host1");
+    // bring host10 back
+    cluster.restartDataNode(host10);
+    cluster.waitActive();
+
+    // wait for reconstruction to finish
+    final short blockNum = (short) (NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS);
+    DFSTestUtil.waitForReplication(fs, file, blockNum, 15 * 1000);
+
+    // restart host1
+    cluster.restartDataNode(host1);
+    cluster.waitActive();
+    for (DataNode dn : cluster.getDataNodes()) {
+      if (dn.getDatanodeId().getHostName().equals("host1")) {
+        DataNodeTestUtils.triggerBlockReport(dn);
+        break;
+      }
+    }
+
+    // make sure the excess replica is detected, and we delete host1's replica
+    // so that we have 6 racks
+    DFSTestUtil.waitForReplication(fs, file, blockNum, 15 * 1000);
+    LocatedBlocks blks = fs.getClient().getLocatedBlocks(file.toString(), 0);
+    LocatedStripedBlock block = (LocatedStripedBlock) blks.getLastLocatedBlock();
+    for (DatanodeInfo dn : block.getLocations()) {
+      Assert.assertFalse(dn.getHostName().equals("host1"));
+    }
+  }
 }

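The new tests both lean on the fixed 10-host/6-rack topology declared near the top of the test class, where /r5 and /r6 each hold a single host. A quick standalone check of that topology math (plain Java mirroring the hosts/racks arrays from the test; not the test code itself):

    import java.util.*;

    class TestTopologySketch {
      public static void main(String[] args) {
        // Same hosts/racks layout as the test class above.
        String[] hosts = {"host1", "host2", "host3", "host4", "host5",
                          "host6", "host7", "host8", "host9", "host10"};
        String[] racks = {"/r1", "/r1", "/r2", "/r2",
                          "/r3", "/r3", "/r4", "/r4", "/r5", "/r6"};
        Map<String, List<String>> byRack = new LinkedHashMap<>();
        for (int i = 0; i < hosts.length; i++) {
          byRack.computeIfAbsent(racks[i], r -> new ArrayList<>()).add(hosts[i]);
        }
        System.out.println(byRack.size() + " racks: " + byRack); // 6 racks

        // Stopping host10 (as both tests do first) removes single-host rack
        // /r6, so the 9 internal blocks can land on at most 5 racks.
        byRack.get("/r6").remove("host10");
        byRack.values().removeIf(List::isEmpty);
        System.out.println(byRack.size() + " racks after stopping host10"); // 5
      }
    }
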
http://git-wip-us.apache.org/repos/asf/hadoop/blob/408f2c80/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
index 3259612..be63d87 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
@@ -965,7 +965,8 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
     
     List<DatanodeStorageInfo> first = new ArrayList<>();
     List<DatanodeStorageInfo> second = new ArrayList<>();
-    replicator.splitNodesWithRack(replicaList, rackMap, first, second);
+    replicator.splitNodesWithRack(replicaList, replicaList, rackMap, first,
+        second);
     // storages[0] and storages[1] are in first set as their rack has two 
     // replica nodes, while storages[2] and dataNodes[5] are in second set.
     assertEquals(2, first.size());
@@ -1018,7 +1019,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
     DatanodeDescriptor delHintNode = storages[0].getDatanodeDescriptor();
     List<StorageType> excessTypes = storagePolicy.chooseExcess((short) 3,
         DatanodeStorageInfo.toStorageTypes(nonExcess));
-    excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3,
+    excessReplicas = replicator.chooseReplicasToDelete(nonExcess, nonExcess, 3,
         excessTypes, storages[3].getDatanodeDescriptor(), delHintNode);
     assertTrue(excessReplicas.size() == 1);
     assertTrue(excessReplicas.contains(storages[0]));
@@ -1031,7 +1032,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
     nonExcess.add(excessStorage);
     excessTypes = storagePolicy.chooseExcess((short) 3,
         DatanodeStorageInfo.toStorageTypes(nonExcess));
-    excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3,
+    excessReplicas = replicator.chooseReplicasToDelete(nonExcess, nonExcess, 3,
         excessTypes, storages[3].getDatanodeDescriptor(), null);
     assertTrue(excessReplicas.contains(excessStorage));
 
@@ -1051,7 +1052,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
     nonExcess.add(storages[5]);
     excessTypes = storagePolicy.chooseExcess((short) 3,
         DatanodeStorageInfo.toStorageTypes(nonExcess));
-    excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3,
+    excessReplicas = replicator.chooseReplicasToDelete(nonExcess, nonExcess, 3,
         excessTypes, storages[3].getDatanodeDescriptor(),
         storages[5].getDatanodeDescriptor());
     assertEquals(1, excessReplicas.size());
@@ -1070,7 +1071,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
     nonExcess.add(storages[3]);
     excessTypes = storagePolicy.chooseExcess((short) 3,
         DatanodeStorageInfo.toStorageTypes(nonExcess));
-    excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3,
+    excessReplicas = replicator.chooseReplicasToDelete(nonExcess, nonExcess, 3,
         excessTypes, storages[1].getDatanodeDescriptor(),
         storages[3].getDatanodeDescriptor());
     assertEquals(1, excessReplicas.size());
@@ -1084,7 +1085,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
     nonExcess.add(storages[2]);
     excessTypes = storagePolicy.chooseExcess((short) 1,
         DatanodeStorageInfo.toStorageTypes(nonExcess));
-    excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 1,
+    excessReplicas = replicator.chooseReplicasToDelete(nonExcess, nonExcess, 1,
         excessTypes, storages[2].getDatanodeDescriptor(), null);
     assertEquals(1, excessReplicas.size());
     assertTrue(excessReplicas.contains(excessSSD));
@@ -1104,7 +1105,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
     nonExcess.add(storages[5]);
     excessTypes = storagePolicy.chooseExcess((short) 2,
         DatanodeStorageInfo.toStorageTypes(nonExcess));
-    excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 2,
+    excessReplicas = replicator.chooseReplicasToDelete(nonExcess, nonExcess, 2,
         excessTypes, null, null);
     assertEquals(0, excessReplicas.size());
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/408f2c80/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
index edcab10..ce210e6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
@@ -637,7 +637,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
 
     List<DatanodeStorageInfo> first = new ArrayList<>();
     List<DatanodeStorageInfo> second = new ArrayList<>();
-    replicator.splitNodesWithRack(
+    replicator.splitNodesWithRack(replicaList,
         replicaList, rackMap, first, second);
     assertEquals(3, first.size());
     assertEquals(1, second.size());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/408f2c80/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java
index c939220..69bc228 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java
@@ -330,7 +330,7 @@ public class TestReplicationPolicyWithUpgradeDomain
     DatanodeDescriptor delHintNode = storages[0].getDatanodeDescriptor();
     List<StorageType> excessTypes = storagePolicy.chooseExcess((short) 3,
         DatanodeStorageInfo.toStorageTypes(nonExcess));
-    excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3,
+    excessReplicas = replicator.chooseReplicasToDelete(nonExcess, nonExcess, 3,
         excessTypes, storages[3].getDatanodeDescriptor(), delHintNode);
     assertTrue(excessReplicas.size() == 1);
     assertTrue(excessReplicas.contains(storages[0]));
@@ -340,7 +340,7 @@ public class TestReplicationPolicyWithUpgradeDomain
     delHintNode = storages[1].getDatanodeDescriptor();
     excessTypes = storagePolicy.chooseExcess((short) 3,
         DatanodeStorageInfo.toStorageTypes(nonExcess));
-    excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3,
+    excessReplicas = replicator.chooseReplicasToDelete(nonExcess, nonExcess, 3,
         excessTypes, storages[3].getDatanodeDescriptor(), delHintNode);
     assertTrue(excessReplicas.size() == 1);
     assertTrue(excessReplicas.contains(storages[0]));
@@ -353,7 +353,7 @@ public class TestReplicationPolicyWithUpgradeDomain
     nonExcess.add(storages[8]);
     excessTypes = storagePolicy.chooseExcess((short) 3,
         DatanodeStorageInfo.toStorageTypes(nonExcess));
-    excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3,
+    excessReplicas = replicator.chooseReplicasToDelete(nonExcess, nonExcess, 3,
         excessTypes, storages[8].getDatanodeDescriptor(), null);
     assertTrue(excessReplicas.size() == 1);
     assertTrue(excessReplicas.contains(storages[1]));
@@ -366,7 +366,7 @@ public class TestReplicationPolicyWithUpgradeDomain
     nonExcess.add(storages[5]);
     excessTypes = storagePolicy.chooseExcess((short) 3,
         DatanodeStorageInfo.toStorageTypes(nonExcess));
-    excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3,
+    excessReplicas = replicator.chooseReplicasToDelete(nonExcess, nonExcess, 3,
         excessTypes, storages[8].getDatanodeDescriptor(), null);
     assertTrue(excessReplicas.size() == 1);
     assertTrue(excessReplicas.contains(storages[1]) ||
@@ -384,7 +384,7 @@ public class TestReplicationPolicyWithUpgradeDomain
     nonExcess.add(excessStorage);
     excessTypes = storagePolicy.chooseExcess((short) 3,
         DatanodeStorageInfo.toStorageTypes(nonExcess));
-    excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3,
+    excessReplicas = replicator.chooseReplicasToDelete(nonExcess, nonExcess, 3,
         excessTypes, storages[3].getDatanodeDescriptor(), null);
     assertTrue(excessReplicas.size() == 2);
     assertTrue(excessReplicas.contains(storages[0]));
@@ -416,7 +416,7 @@ public class TestReplicationPolicyWithUpgradeDomain
     nonExcess.add(storages[8]);
     excessTypes = storagePolicy.chooseExcess((short) 3,
         DatanodeStorageInfo.toStorageTypes(nonExcess));
-    excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3,
+    excessReplicas = replicator.chooseReplicasToDelete(nonExcess, nonExcess, 3,
         excessTypes, storages[3].getDatanodeDescriptor(),
         storages[7].getDatanodeDescriptor());
     assertEquals(1, excessReplicas.size());