You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2016/02/20 04:02:32 UTC

hadoop git commit: HDFS-9818. Correctly handle EC reconstruction work caused by not enough racks. Contributed by Jing Zhao.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 6eae4337d -> e54cc2931


HDFS-9818. Correctly handle EC reconstruction work caused by not enough racks. Contributed by Jing Zhao.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e54cc293
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e54cc293
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e54cc293

Branch: refs/heads/trunk
Commit: e54cc2931262bf49682a8323da9811976218c03b
Parents: 6eae433
Author: Jing Zhao <ji...@apache.org>
Authored: Fri Feb 19 19:02:23 2016 -0800
Committer: Jing Zhao <ji...@apache.org>
Committed: Fri Feb 19 19:02:23 2016 -0800

----------------------------------------------------------------------
 .../hadoop/hdfs/DFSStripedInputStream.java      |   7 +-
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../server/blockmanagement/BlockManager.java    |  40 ++--
 .../BlockPlacementPolicyDefault.java            |   5 +-
 .../BlockPlacementPolicyRackFaultTolerant.java  |  10 +-
 .../BlockPlacementStatusDefault.java            |  10 +-
 .../BlockReconstructionWork.java                |   5 +
 .../blockmanagement/ErasureCodingWork.java      | 106 ++++++++-
 .../server/blockmanagement/ReplicationWork.java |   5 +
 .../erasurecode/ErasureCodingWorker.java        |  25 ++-
 .../TestBlocksWithNotEnoughRacks.java           |  12 +-
 ...constructStripedBlocksWithRackAwareness.java | 213 +++++++++++++++++++
 12 files changed, 388 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e54cc293/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index d15e536..3483255 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -260,7 +260,12 @@ public class DFSStripedInputStream extends DFSInputStream {
 
   private void closeReader(BlockReaderInfo readerInfo) {
     if (readerInfo != null) {
-//      IOUtils.cleanup(null, readerInfo.reader);
+      if (readerInfo.reader != null) {
+        try {
+          readerInfo.reader.close();
+        } catch (Throwable ignored) {
+        }
+      }
       readerInfo.skip();
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e54cc293/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index a377243..1d0379c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -432,6 +432,9 @@ Trunk (Unreleased)
     HDFS-9794. Streamer threads may leak if failure happens when closing the
     striped outputstream. (jing9)
 
+    HDFS-9818. Correctly handle EC reconstruction work caused by not enough
+    racks. (jing9)
+
     BREAKDOWN OF HDFS-7285 SUBTASKS AND RELATED JIRAS
 
       HDFS-7347. Configurable erasure coding policy for individual files and

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e54cc293/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 67f5026..cc52b6e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -1628,7 +1628,7 @@ public class BlockManager implements BlockStatsMXBean {
       for (int i = 0 ; i < liveBlockIndices.size(); i++) {
         indices[i] = liveBlockIndices.get(i);
       }
-      return new ErasureCodingWork(block, bc, srcNodes,
+      return new ErasureCodingWork(getBlockPoolId(), block, bc, srcNodes,
           containingNodes, liveReplicaNodes, additionalReplRequired,
           priority, indices);
     } else {
@@ -1638,6 +1638,16 @@ public class BlockManager implements BlockStatsMXBean {
     }
   }
 
+  private boolean isInNewRack(DatanodeDescriptor[] srcs,
+      DatanodeDescriptor target) {
+    for (DatanodeDescriptor src : srcs) {
+      if (src.getNetworkLocation().equals(target.getNetworkLocation())) {
+        return false;
+      }
+    }
+    return true;
+  }
+
   private boolean validateReconstructionWork(BlockReconstructionWork rw) {
     BlockInfo block = rw.getBlock();
     int priority = rw.getPriority();
@@ -1665,31 +1675,14 @@ public class BlockManager implements BlockStatsMXBean {
     DatanodeStorageInfo[] targets = rw.getTargets();
     if ( (numReplicas.liveReplicas() >= requiredReplication) &&
         (!isPlacementPolicySatisfied(block)) ) {
-      if (rw.getSrcNodes()[0].getNetworkLocation().equals(
-          targets[0].getDatanodeDescriptor().getNetworkLocation())) {
-        //No use continuing, unless a new rack in this case
+      if (!isInNewRack(rw.getSrcNodes(), targets[0].getDatanodeDescriptor())) {
+        // No use continuing, unless a new rack in this case
         return false;
       }
     }
 
-    // Add block to the to be reconstructed list
-    if (block.isStriped()) {
-      assert rw instanceof ErasureCodingWork;
-      assert rw.getTargets().length > 0;
-      assert pendingNum == 0 : "Should wait the previous reconstruction"
-          + " to finish";
-      final ErasureCodingPolicy ecPolicy =
-          ((BlockInfoStriped) block).getErasureCodingPolicy();
-      assert ecPolicy != null;
-
-      rw.getTargets()[0].getDatanodeDescriptor().addBlockToBeErasureCoded(
-          new ExtendedBlock(getBlockPoolId(), block),
-          rw.getSrcNodes(), rw.getTargets(),
-          ((ErasureCodingWork) rw).getLiveBlockIndicies(), ecPolicy);
-    } else {
-      rw.getSrcNodes()[0].addBlockToBeReplicated(block, targets);
-    }
-
+    // Add block to the datanode's task list
+    rw.addTaskToDatanode();
     DatanodeStorageInfo.incrementBlocksScheduled(targets);
 
     // Move the block-replication into a "pending" state.
@@ -3973,7 +3966,8 @@ public class BlockManager implements BlockStatsMXBean {
         .getPolicy(storedBlock.isStriped());
     int numReplicas = storedBlock.isStriped() ? ((BlockInfoStriped) storedBlock)
         .getRealDataBlockNum() : storedBlock.getReplication();
-    return placementPolicy.verifyBlockPlacement(locs, numReplicas).isPlacementPolicySatisfied();
+    return placementPolicy.verifyBlockPlacement(locs, numReplicas)
+        .isPlacementPolicySatisfied();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e54cc293/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
index ee891a5..e1a47ae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
@@ -901,7 +901,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
       locs = DatanodeDescriptor.EMPTY_ARRAY;
     if (!clusterMap.hasClusterEverBeenMultiRack()) {
       // only one rack
-      return new BlockPlacementStatusDefault(1, 1);
+      return new BlockPlacementStatusDefault(1, 1, 1);
     }
     int minRacks = 2;
     minRacks = Math.min(minRacks, numberOfReplicas);
@@ -910,7 +910,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     Set<String> racks = new TreeSet<>();
     for (DatanodeInfo dn : locs)
       racks.add(dn.getNetworkLocation());
-    return new BlockPlacementStatusDefault(racks.size(), minRacks);
+    return new BlockPlacementStatusDefault(racks.size(), minRacks,
+        clusterMap.getNumOfRacks());
   }
   /**
    * Decide whether deleting the specified replica of the block still makes

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e54cc293/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java
index c803b97..c0d981c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java
@@ -160,14 +160,16 @@ public class BlockPlacementPolicyRackFaultTolerant extends BlockPlacementPolicyD
       locs = DatanodeDescriptor.EMPTY_ARRAY;
     if (!clusterMap.hasClusterEverBeenMultiRack()) {
       // only one rack
-      return new BlockPlacementStatusDefault(1, 1);
+      return new BlockPlacementStatusDefault(1, 1, 1);
     }
     // 1. Check that all locations are different.
     // 2. Count locations on different racks.
-    Set<String> racks = new TreeSet<String>();
-    for (DatanodeInfo dn : locs)
+    Set<String> racks = new TreeSet<>();
+    for (DatanodeInfo dn : locs) {
       racks.add(dn.getNetworkLocation());
-    return new BlockPlacementStatusDefault(racks.size(), numberOfReplicas);
+    }
+    return new BlockPlacementStatusDefault(racks.size(), numberOfReplicas,
+        clusterMap.getNumOfRacks());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e54cc293/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusDefault.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusDefault.java
index 0b8b965..75bb65d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusDefault.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusDefault.java
@@ -21,15 +21,18 @@ public class BlockPlacementStatusDefault implements BlockPlacementStatus {
 
   private int requiredRacks = 0;
   private int currentRacks = 0;
+  private final int totalRacks;
   
-  public BlockPlacementStatusDefault(int currentRacks, int requiredRacks){
+  public BlockPlacementStatusDefault(int currentRacks, int requiredRacks,
+      int totalRacks){
     this.requiredRacks = requiredRacks;
     this.currentRacks = currentRacks;
+    this.totalRacks = totalRacks;
   }
   
   @Override
   public boolean isPlacementPolicySatisfied() {
-    return requiredRacks <= currentRacks;
+    return requiredRacks <= currentRacks || currentRacks >= totalRacks;
   }
 
   @Override
@@ -38,7 +41,8 @@ public class BlockPlacementStatusDefault implements BlockPlacementStatus {
       return null;
     }
     return "Block should be additionally replicated on " + 
-        (requiredRacks - currentRacks) + " more rack(s).";
+        (requiredRacks - currentRacks) +
+        " more rack(s). Total number of racks in the cluster: " + totalRacks;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e54cc293/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReconstructionWork.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReconstructionWork.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReconstructionWork.java
index df9c164..c1998ee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReconstructionWork.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReconstructionWork.java
@@ -108,4 +108,9 @@ abstract class BlockReconstructionWork {
   abstract void chooseTargets(BlockPlacementPolicy blockplacement,
       BlockStoragePolicySuite storagePolicySuite,
       Set<Node> excludedNodes);
+
+  /**
+   * Add the reconstruction task to the datanode that will execute it.
+   */
+  abstract void addTaskToDatanode();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e54cc293/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java
index 85a25d5..38ad324 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java
@@ -17,15 +17,23 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.net.Node;
 
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 class ErasureCodingWork extends BlockReconstructionWork {
   private final byte[] liveBlockIndicies;
+  private final String blockPoolId;
 
-  public ErasureCodingWork(BlockInfo block,
+  public ErasureCodingWork(String blockPoolId, BlockInfo block,
       BlockCollection bc,
       DatanodeDescriptor[] srcNodes,
       List<DatanodeDescriptor> containingNodes,
@@ -34,6 +42,7 @@ class ErasureCodingWork extends BlockReconstructionWork {
       int priority, byte[] liveBlockIndicies) {
     super(block, bc, srcNodes, containingNodes,
         liveReplicaStorages, additionalReplRequired, priority);
+    this.blockPoolId = blockPoolId;
     this.liveBlockIndicies = liveBlockIndicies;
     BlockManager.LOG.debug("Creating an ErasureCodingWork to {} reconstruct ",
         block);
@@ -47,15 +56,92 @@ class ErasureCodingWork extends BlockReconstructionWork {
   void chooseTargets(BlockPlacementPolicy blockplacement,
       BlockStoragePolicySuite storagePolicySuite,
       Set<Node> excludedNodes) {
-    try {
-      // TODO: new placement policy for EC considering multiple writers
-      DatanodeStorageInfo[] chosenTargets = blockplacement.chooseTarget(
-          getBc().getName(), getAdditionalReplRequired(), getSrcNodes()[0],
-          getLiveReplicaStorages(), false, excludedNodes,
-          getBlock().getNumBytes(),
-          storagePolicySuite.getPolicy(getBc().getStoragePolicyID()));
-      setTargets(chosenTargets);
-    } finally {
+    // TODO: new placement policy for EC considering multiple writers
+    DatanodeStorageInfo[] chosenTargets = blockplacement.chooseTarget(
+        getBc().getName(), getAdditionalReplRequired(), getSrcNodes()[0],
+        getLiveReplicaStorages(), false, excludedNodes,
+        getBlock().getNumBytes(),
+        storagePolicySuite.getPolicy(getBc().getStoragePolicyID()));
+    setTargets(chosenTargets);
+  }
+
+  /**
+   * @return true if the current source nodes cover all the internal blocks.
+   * I.e., we only need to have more racks.
+   */
+  private boolean hasAllInternalBlocks() {
+    final BlockInfoStriped block = (BlockInfoStriped) getBlock();
+    if (getSrcNodes().length < block.getRealTotalBlockNum()) {
+      return false;
+    }
+    BitSet bitSet = new BitSet(block.getTotalBlockNum());
+    for (byte index : liveBlockIndicies) {
+      bitSet.set(index);
+    }
+    for (int i = 0; i < block.getRealDataBlockNum(); i++) {
+      if (!bitSet.get(i)) {
+        return false;
+      }
+    }
+    for (int i = block.getDataBlockNum(); i < block.getTotalBlockNum(); i++) {
+      if (!bitSet.get(i)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * We have all the internal blocks but not enough racks. Thus we do not need
+   * to do decoding but only simply make an extra copy of an internal block. In
+   * this scenario, use this method to choose the source datanode for simple
+   * replication.
+   * @return The index of the source datanode.
+   */
+  private int chooseSource4SimpleReplication() {
+    Map<String, List<Integer>> map = new HashMap<>();
+    for (int i = 0; i < getSrcNodes().length; i++) {
+      final String rack = getSrcNodes()[i].getNetworkLocation();
+      List<Integer> dnList = map.get(rack);
+      if (dnList == null) {
+        dnList = new ArrayList<>();
+        map.put(rack, dnList);
+      }
+      dnList.add(i);
+    }
+    List<Integer> max = null;
+    for (Map.Entry<String, List<Integer>> entry : map.entrySet()) {
+      if (max == null || entry.getValue().size() > max.size()) {
+        max = entry.getValue();
+      }
+    }
+    assert max != null;
+    return max.get(0);
+  }
+
+  @Override
+  void addTaskToDatanode() {
+    assert getTargets().length > 0;
+    BlockInfoStriped stripedBlk = (BlockInfoStriped) getBlock();
+
+    // if we already have all the internal blocks, but not enough racks,
+    // we only need to replicate one internal block to a new rack
+    if (hasAllInternalBlocks()) {
+      int sourceIndex = chooseSource4SimpleReplication();
+      final byte blockIndex = liveBlockIndicies[sourceIndex];
+      final DatanodeDescriptor source = getSrcNodes()[sourceIndex];
+      final long internBlkLen = StripedBlockUtil.getInternalBlockLength(
+          stripedBlk.getNumBytes(), stripedBlk.getCellSize(),
+          stripedBlk.getDataBlockNum(), blockIndex);
+      final Block targetBlk = new Block(
+          stripedBlk.getBlockId() + blockIndex, internBlkLen,
+          stripedBlk.getGenerationStamp());
+      source.addBlockToBeReplicated(targetBlk, getTargets());
+    } else {
+      getTargets()[0].getDatanodeDescriptor().addBlockToBeErasureCoded(
+          new ExtendedBlock(blockPoolId, stripedBlk),
+          getSrcNodes(), getTargets(), getLiveBlockIndicies(),
+          stripedBlk.getErasureCodingPolicy());
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e54cc293/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java
index b44b9b1..24601a2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java
@@ -53,4 +53,9 @@ class ReplicationWork extends BlockReconstructionWork {
       getSrcNodes()[0].decrementPendingReplicationWithoutTargets();
     }
   }
+
+  @Override
+  void addTaskToDatanode() {
+    getSrcNodes()[0].addBlockToBeReplicated(getBlock(), getTargets());
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e54cc293/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
index b08aa2e..1017e1e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
@@ -177,8 +177,14 @@ public final class ErasureCodingWorker {
       Collection<BlockECReconstructionInfo> ecTasks) {
     for (BlockECReconstructionInfo reconstructionInfo : ecTasks) {
       try {
-        EC_RECONSTRUCTION_STRIPED_BLK_THREAD_POOL
-            .submit(new ReconstructAndTransferBlock(reconstructionInfo));
+        ReconstructAndTransferBlock task =
+            new ReconstructAndTransferBlock(reconstructionInfo);
+        if (task.hasValidTargets()) {
+          EC_RECONSTRUCTION_STRIPED_BLK_THREAD_POOL.submit(task);
+        } else {
+          LOG.warn("No missing internal block. Skip reconstruction for task:{}",
+              reconstructionInfo);
+        }
       } catch (Throwable e) {
         LOG.warn("Failed to reconstruct striped block {}",
             reconstructionInfo.getExtendedBlock().getLocalBlock(), e);
@@ -292,6 +298,7 @@ public final class ErasureCodingWorker {
     private final CompletionService<Void> readService =
         new ExecutorCompletionService<>(
             EC_RECONSTRUCTION_STRIPED_READ_THREAD_POOL);
+    private final boolean hasValidTargets;
 
     ReconstructAndTransferBlock(BlockECReconstructionInfo reconstructionInfo) {
       ErasureCodingPolicy ecPolicy = reconstructionInfo
@@ -339,10 +346,14 @@ public final class ErasureCodingWorker {
         seqNo4Targets[i] = 0;
       }
 
-      getTargetIndices();
+      hasValidTargets = getTargetIndices();
       cachingStrategy = CachingStrategy.newDefaultStrategy();
     }
 
+    boolean hasValidTargets() {
+      return hasValidTargets;
+    }
+
     private ByteBuffer allocateBuffer(int length) {
       return ByteBuffer.allocate(length);
     }
@@ -505,24 +516,30 @@ public final class ErasureCodingWorker {
       }
     }
 
-    private void getTargetIndices() {
+    /**
+     * @return true if there is at least one valid target for reconstruction
+     */
+    private boolean getTargetIndices() {
       BitSet bitset = new BitSet(dataBlkNum + parityBlkNum);
       for (int i = 0; i < sources.length; i++) {
         bitset.set(liveIndices[i]);
       }
       int m = 0;
       int k = 0;
+      boolean hasValidTarget = false;
       for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
         if (!bitset.get(i)) {
           if (getBlockLen(blockGroup, i) > 0) {
             if (m < targets.length) {
               targetIndices[m++] = (short)i;
+              hasValidTarget = true;
             }
           } else {
             zeroStripeIndices[k++] = (short)i;
           }
         }
       }
+      return hasValidTarget;
     }
 
     /** the reading length should not exceed the length for reconstruction. */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e54cc293/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java
index ea994a2..d91155a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java
@@ -285,7 +285,7 @@ public class TestBlocksWithNotEnoughRacks {
     final DatanodeManager dm = ns.getBlockManager().getDatanodeManager();
 
     try {
-      // Create a file with one block with a replication factor of 2
+      // Create a file with one block with a replication factor of 3
       final FileSystem fs = cluster.getFileSystem();
       DFSTestUtil.createFile(fs, filePath, 1L, REPLICATION_FACTOR, 1L);
       ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, filePath);
@@ -315,8 +315,9 @@ public class TestBlocksWithNotEnoughRacks {
       dm.removeDatanode(dnId);
 
       // Make sure we have enough live replicas even though we are
-      // short one rack and therefore need one replica
-      DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 1);
+      // short one rack. The cluster now has only 1 rack thus we just make sure
+      // we still have 3 replicas.
+      DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 0);
     } finally {
       cluster.shutdown();
     }
@@ -357,9 +358,8 @@ public class TestBlocksWithNotEnoughRacks {
 
       // The block gets re-replicated to another datanode so it has a 
       // sufficient # replicas, but not across racks, so there should
-      // be 1 rack, and 1 needed replica (even though there are 2 hosts 
-      // available and only 2 replicas required).
-      DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 1);
+      // be 1 rack.
+      DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 0);
 
       // Start the "failed" datanode, which has a replica so the block is
       // now over-replicated and therefore a replica should be removed but

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e54cc293/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReconstructStripedBlocksWithRackAwareness.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReconstructStripedBlocksWithRackAwareness.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReconstructStripedBlocksWithRackAwareness.java
new file mode 100644
index 0000000..2164957
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReconstructStripedBlocksWithRackAwareness.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.NUM_DATA_BLOCKS;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.NUM_PARITY_BLOCKS;
+
+public class TestReconstructStripedBlocksWithRackAwareness {
+  public static final Logger LOG = LoggerFactory.getLogger(
+      TestReconstructStripedBlocksWithRackAwareness.class);
+
+  static {
+    GenericTestUtils.setLogLevel(BlockPlacementPolicy.LOG, Level.ALL);
+    GenericTestUtils.setLogLevel(BlockManager.blockLog, Level.ALL);
+  }
+
+  private static final String[] hosts = new String[]{"host1", "host2", "host3",
+      "host4", "host5", "host6", "host7", "host8", "host9", "host10"};
+  private static final String[] racks = new String[]{"/r1", "/r1", "/r2", "/r2",
+      "/r3", "/r3", "/r4", "/r4", "/r5", "/r6"};
+  private static final List<String> singleNodeRacks = Arrays.asList("host9", "host10");
+  private static final short blockNum = (short) (NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS);
+
+  private MiniDFSCluster cluster;
+  private DistributedFileSystem fs;
+  private FSNamesystem fsn;
+  private BlockManager bm;
+
+  @Before
+  public void setup() throws Exception {
+    final HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY,
+        false);
+
+    cluster = new MiniDFSCluster.Builder(conf).racks(racks).hosts(hosts)
+        .numDataNodes(hosts.length).build();
+    cluster.waitActive();
+
+    fsn = cluster.getNamesystem();
+    bm = fsn.getBlockManager();
+
+    fs = cluster.getFileSystem();
+    fs.setErasureCodingPolicy(new Path("/"), null);
+  }
+
+  @After
+  public void tearDown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * When there are all the internal blocks available but they are not placed on
+   * enough racks, NameNode should avoid normal decoding reconstruction but copy
+   * an internal block to a new rack.
+   *
+   * In this test, we first create a scenario where a striped block has all
+   * its internal blocks but they are spread over fewer than 6 racks. Then we
+   * check if the replication monitor schedules the right reconstruction work.
+   *
+   * For the 9 internal blocks + 5 racks setup, the test does the following:
+   * 1. create a 6 rack cluster with 10 datanodes, where there are 2 racks only
+   * containing 1 datanode each
+   * 2. for a striped block with 9 internal blocks, there must be one internal
+   * block located in a single-node rack. find this node and stop it
+   * 3. namenode will trigger reconstruction for the block and since the cluster
+   * has only 5 racks remaining, after the reconstruction we have 9 internal
+   * blocks distributed in 5 racks.
+   * 4. we bring the datanode back, now the cluster has 6 racks again
+   * 5. let the datanode call reportBadBlock, this will make the namenode check
+   * if the striped block is placed in at least 6 racks, and the namenode will
+   * put the block into the under-replicated queue
+   * 6. now we can check if the replication monitor works as expected
+   */
+  @Test
+  public void testReconstructForNotEnoughRacks() throws Exception {
+    final Path file = new Path("/foo");
+    DFSTestUtil.createFile(fs, file,
+        BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS * 2, (short) 1, 0L);
+    Assert.assertEquals(0, bm.numOfUnderReplicatedBlocks());
+
+    final INodeFile fileNode = fsn.getFSDirectory()
+        .getINode4Write(file.toString()).asFile();
+    BlockInfoStriped blockInfo = (BlockInfoStriped) fileNode.getLastBlock();
+
+    // find the internal block located in the single node rack
+    Block internalBlock = null;
+    String hostToStop = null;
+    for (DatanodeStorageInfo storage : blockInfo.storages) {
+      if (singleNodeRacks.contains(storage.getDatanodeDescriptor().getHostName())) {
+        hostToStop = storage.getDatanodeDescriptor().getHostName();
+        internalBlock = blockInfo.getBlockOnStorage(storage);
+      }
+    }
+    Assert.assertNotNull(internalBlock);
+    Assert.assertNotNull(hostToStop);
+
+    // delete the block on the chosen datanode
+    cluster.corruptBlockOnDataNodesByDeletingBlockFile(
+        new ExtendedBlock(bm.getBlockPoolId(), internalBlock));
+
+    // stop the chosen datanode
+    MiniDFSCluster.DataNodeProperties dnProp = null;
+    for (int i = 0; i < cluster.getDataNodes().size(); i++) {
+      DataNode dn = cluster.getDataNodes().get(i);
+      if (dn.getDatanodeId().getHostName().equals(hostToStop)) {
+        dnProp = cluster.stopDataNode(i);
+        cluster.setDataNodeDead(dn.getDatanodeId());
+        LOG.info("stop datanode " + dn.getDatanodeId().getHostName());
+      }
+    }
+    NetworkTopology topology = bm.getDatanodeManager().getNetworkTopology();
+    Assert.assertEquals(5, topology.getNumOfRacks());
+
+    // make sure the reconstruction work can finish
+    // now we have 9 internal blocks in 5 racks
+    DFSTestUtil.waitForReplication(fs, file, blockNum, 15 * 1000);
+
+    // we now should have 9 internal blocks distributed in 5 racks
+    Set<String> rackSet = new HashSet<>();
+    for (DatanodeStorageInfo storage : blockInfo.storages) {
+      rackSet.add(storage.getDatanodeDescriptor().getNetworkLocation());
+    }
+    Assert.assertEquals(5, rackSet.size());
+
+    // restart the stopped datanode
+    cluster.restartDataNode(dnProp);
+    cluster.waitActive();
+
+    // make sure we have 6 racks again
+    topology = bm.getDatanodeManager().getNetworkTopology();
+    Assert.assertEquals(hosts.length, topology.getNumOfLeaves());
+    Assert.assertEquals(6, topology.getNumOfRacks());
+
+    // pause all the heartbeats
+    DataNode badDn = null;
+    for (DataNode dn : cluster.getDataNodes()) {
+      DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
+      if (dn.getDatanodeId().getHostName().equals(hostToStop)) {
+        badDn = dn;
+      }
+    }
+    Assert.assertNotNull(badDn);
+    // let the DN report the bad block, so that the namenode will put the block
+    // into under-replicated queue. note that the block still has 9 internal
+    // blocks but in 5 racks
+    badDn.reportBadBlocks(new ExtendedBlock(bm.getBlockPoolId(), internalBlock));
+
+    // check if replication monitor correctly schedules the replication work
+    boolean scheduled = false;
+    for (int i = 0; i < 5; i++) { // retry 5 times
+      for (DatanodeStorageInfo storage : blockInfo.storages) {
+        if (storage != null) {
+          DatanodeDescriptor dn = storage.getDatanodeDescriptor();
+          Assert.assertEquals(0, dn.getNumberOfBlocksToBeErasureCoded());
+          if (dn.getNumberOfBlocksToBeReplicated() == 1) {
+            scheduled = true;
+          }
+        }
+      }
+      if (scheduled) {
+        break;
+      }
+      Thread.sleep(1000);
+    }
+    Assert.assertTrue(scheduled);
+  }
+}