Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/09/02 07:58:35 UTC

[23/50] [abbrv] hadoop git commit: HDFS-8938. Extract BlockToMarkCorrupt and ReplicationWork as standalone classes from BlockManager. Contributed by Mingliang Liu.

HDFS-8938. Extract BlockToMarkCorrupt and ReplicationWork as standalone classes from BlockManager. Contributed by Mingliang Liu.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6d12cd8d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6d12cd8d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6d12cd8d

Branch: refs/heads/HDFS-7285
Commit: 6d12cd8d609dec26d44cece9937c35b7d72a3cd1
Parents: cbb2495
Author: Haohui Mai <wh...@apache.org>
Authored: Fri Aug 28 14:10:40 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Fri Aug 28 14:14:32 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../server/blockmanagement/BlockManager.java    | 368 +++++++------------
 .../blockmanagement/BlockToMarkCorrupt.java     |  87 +++++
 .../server/blockmanagement/ReplicationWork.java |  87 +++++
 4 files changed, 319 insertions(+), 226 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6d12cd8d/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 67a6a6e..c6acfc8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -855,6 +855,9 @@ Release 2.8.0 - UNRELEASED
 
     HDFS-8865. Improve quota initialization performance. (kihwal)
 
+    HDFS-8938. Extract BlockToMarkCorrupt and ReplicationWork as standalone
+    classes from BlockManager. (Mingliang Liu via wheat9)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6d12cd8d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 95933d2..8f7bb55 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -1181,24 +1181,24 @@ public class BlockManager implements BlockStatsMXBean {
       DatanodeStorageInfo storageInfo,
       DatanodeDescriptor node) throws IOException {
 
-    if (b.corrupted.isDeleted()) {
+    if (b.getCorrupted().isDeleted()) {
       blockLog.debug("BLOCK markBlockAsCorrupt: {} cannot be marked as" +
           " corrupt as it does not belong to any file", b);
-      addToInvalidates(b.corrupted, node);
+      addToInvalidates(b.getCorrupted(), node);
       return;
     } 
-    short expectedReplicas = b.corrupted.getReplication();
+    short expectedReplicas = b.getCorrupted().getReplication();
 
     // Add replica to the data-node if it is not already there
     if (storageInfo != null) {
-      storageInfo.addBlock(b.stored);
+      storageInfo.addBlock(b.getStored());
     }
 
     // Add this replica to corruptReplicas Map
-    corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason,
-        b.reasonCode);
+    corruptReplicas.addToCorruptReplicasMap(b.getCorrupted(), node,
+        b.getReason(), b.getReasonCode());
 
-    NumberReplicas numberOfReplicas = countNodes(b.stored);
+    NumberReplicas numberOfReplicas = countNodes(b.getStored());
     boolean hasEnoughLiveReplicas = numberOfReplicas.liveReplicas() >=
         expectedReplicas;
     boolean minReplicationSatisfied =
@@ -1207,7 +1207,7 @@ public class BlockManager implements BlockStatsMXBean {
         (numberOfReplicas.liveReplicas() + numberOfReplicas.corruptReplicas()) >
         expectedReplicas;
     boolean corruptedDuringWrite = minReplicationSatisfied &&
-        (b.stored.getGenerationStamp() > b.corrupted.getGenerationStamp());
+        b.isCorruptedDuringWrite();
     // case 1: have enough number of live replicas
     // case 2: corrupted replicas + live replicas > Replication factor
     // case 3: Block is marked corrupt due to failure while writing. In this
@@ -1220,7 +1220,7 @@ public class BlockManager implements BlockStatsMXBean {
       invalidateBlock(b, node);
     } else if (namesystem.isPopulatingReplQueues()) {
       // add the block to neededReplication
-      updateNeededReplications(b.stored, -1, 0);
+      updateNeededReplications(b.getStored(), -1, 0);
     }
   }
 
@@ -1239,18 +1239,18 @@ public class BlockManager implements BlockStatsMXBean {
     }
 
     // Check how many copies we have of the block
-    NumberReplicas nr = countNodes(b.stored);
+    NumberReplicas nr = countNodes(b.getStored());
     if (nr.replicasOnStaleNodes() > 0) {
       blockLog.debug("BLOCK* invalidateBlocks: postponing " +
           "invalidation of {} on {} because {} replica(s) are located on " +
           "nodes with potentially out-of-date block reports", b, dn,
           nr.replicasOnStaleNodes());
-      postponeBlock(b.corrupted);
+      postponeBlock(b.getCorrupted());
       return false;
     } else if (nr.liveReplicas() >= 1) {
       // If we have at least one copy on a live node, then we can delete it.
-      addToInvalidates(b.corrupted, dn);
-      removeStoredBlock(b.stored, node);
+      addToInvalidates(b.getCorrupted(), dn);
+      removeStoredBlock(b.getStored(), node);
       blockLog.debug("BLOCK* invalidateBlocks: {} on {} listed for deletion.",
           b, dn);
       return true;
@@ -1338,71 +1338,18 @@ public class BlockManager implements BlockStatsMXBean {
    */
   @VisibleForTesting
   int computeReplicationWorkForBlocks(List<List<BlockInfo>> blocksToReplicate) {
-    int requiredReplication, numEffectiveReplicas;
-    List<DatanodeDescriptor> containingNodes;
-    DatanodeDescriptor srcNode;
-    BlockCollection bc = null;
-    int additionalReplRequired;
-
     int scheduledWork = 0;
-    List<ReplicationWork> work = new LinkedList<ReplicationWork>();
+    final List<ReplicationWork> work = new LinkedList<>();
 
     namesystem.writeLock();
     try {
       synchronized (neededReplications) {
         for (int priority = 0; priority < blocksToReplicate.size(); priority++) {
           for (BlockInfo block : blocksToReplicate.get(priority)) {
-            // block should belong to a file
-            bc = getBlockCollection(block);
-            // abandoned block or block reopened for append
-            if (bc == null
-                || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
-              // remove from neededReplications
-              neededReplications.remove(block, priority);
-              continue;
-            }
-
-            requiredReplication = getExpectedReplicaNum(block);
-
-            // get a source data-node
-            containingNodes = new ArrayList<DatanodeDescriptor>();
-            List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<DatanodeStorageInfo>();
-            NumberReplicas numReplicas = new NumberReplicas();
-            srcNode = chooseSourceDatanode(
-                block, containingNodes, liveReplicaNodes, numReplicas,
-                priority);
-            if(srcNode == null) { // block can not be replicated from any node
-              LOG.debug("Block " + block + " cannot be repl from any node");
-              continue;
-            }
-
-            // liveReplicaNodes can include READ_ONLY_SHARED replicas which are 
-            // not included in the numReplicas.liveReplicas() count
-            assert liveReplicaNodes.size() >= numReplicas.liveReplicas();
-
-            // do not schedule more if enough replicas is already pending
-            numEffectiveReplicas = numReplicas.liveReplicas() +
-                                    pendingReplications.getNumReplicas(block);
-      
-            if (numEffectiveReplicas >= requiredReplication) {
-              if ( (pendingReplications.getNumReplicas(block) > 0) ||
-                   (blockHasEnoughRacks(block)) ) {
-                neededReplications.remove(block, priority); // remove from neededReplications
-                blockLog.debug("BLOCK* Removing {} from neededReplications as" +
-                        " it has enough replicas", block);
-                continue;
-              }
-            }
-
-            if (numReplicas.liveReplicas() < requiredReplication) {
-              additionalReplRequired = requiredReplication
-                  - numEffectiveReplicas;
-            } else {
-              additionalReplRequired = 1; // Needed on a new rack
+            ReplicationWork rw = scheduleReplication(block, priority);
+            if (rw != null) {
+              work.add(rw);
             }
-            work.add(new ReplicationWork(block, bc, srcNode,
-                containingNodes, liveReplicaNodes, additionalReplRequired,
-                priority));
           }
         }
       }
@@ -1410,12 +1357,12 @@ public class BlockManager implements BlockStatsMXBean {
       namesystem.writeUnlock();
     }
 
-    final Set<Node> excludedNodes = new HashSet<Node>();
+    final Set<Node> excludedNodes = new HashSet<>();
     for(ReplicationWork rw : work){
       // Exclude all of the containing nodes from being targets.
       // This list includes decommissioning or corrupt nodes.
       excludedNodes.clear();
-      for (DatanodeDescriptor dn : rw.containingNodes) {
+      for (DatanodeDescriptor dn : rw.getContainingNodes()) {
         excludedNodes.add(dn);
       }
 
@@ -1428,67 +1375,15 @@ public class BlockManager implements BlockStatsMXBean {
     namesystem.writeLock();
     try {
       for(ReplicationWork rw : work){
-        final DatanodeStorageInfo[] targets = rw.targets;
+        final DatanodeStorageInfo[] targets = rw.getTargets();
         if(targets == null || targets.length == 0){
-          rw.targets = null;
+          rw.resetTargets();
           continue;
         }
 
         synchronized (neededReplications) {
-          BlockInfo block = rw.block;
-          int priority = rw.priority;
-          // Recheck since global lock was released
-          // block should belong to a file
-          bc = getBlockCollection(block);
-          // abandoned block or block reopened for append
-          if(bc == null || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
-            neededReplications.remove(block, priority); // remove from neededReplications
-            rw.targets = null;
-            continue;
-          }
-          requiredReplication = getExpectedReplicaNum(block);
-
-          // do not schedule more if enough replicas is already pending
-          NumberReplicas numReplicas = countNodes(block);
-          numEffectiveReplicas = numReplicas.liveReplicas() +
-            pendingReplications.getNumReplicas(block);
-
-          if (numEffectiveReplicas >= requiredReplication) {
-            if ( (pendingReplications.getNumReplicas(block) > 0) ||
-                 (blockHasEnoughRacks(block)) ) {
-              neededReplications.remove(block, priority); // remove from neededReplications
-              rw.targets = null;
-              blockLog.debug("BLOCK* Removing {} from neededReplications as" +
-                      " it has enough replicas", block);
-              continue;
-            }
-          }
-
-          if ( (numReplicas.liveReplicas() >= requiredReplication) &&
-               (!blockHasEnoughRacks(block)) ) {
-            if (rw.srcNode.getNetworkLocation().equals(
-                targets[0].getDatanodeDescriptor().getNetworkLocation())) {
-              //No use continuing, unless a new rack in this case
-              continue;
-            }
-          }
-
-          // Add block to the to be replicated list
-          rw.srcNode.addBlockToBeReplicated(block, targets);
-          scheduledWork++;
-          DatanodeStorageInfo.incrementBlocksScheduled(targets);
-
-          // Move the block-replication into a "pending" state.
-          // The reason we use 'pending' is so we can retry
-          // replications that fail after an appropriate amount of time.
-          pendingReplications.increment(block,
-              DatanodeStorageInfo.toDatanodeDescriptors(targets));
-          blockLog.debug("BLOCK* block {} is moved from neededReplications to "
-                  + "pendingReplications", block);
-
-          // remove from neededReplications
-          if(numEffectiveReplicas + targets.length >= requiredReplication) {
-            neededReplications.remove(block, priority); // remove from neededReplications
+          if (validateReplicationWork(rw)) {
+            scheduledWork++;
           }
         }
       }
@@ -1499,15 +1394,15 @@ public class BlockManager implements BlockStatsMXBean {
     if (blockLog.isInfoEnabled()) {
       // log which blocks have been scheduled for replication
       for(ReplicationWork rw : work){
-        DatanodeStorageInfo[] targets = rw.targets;
+        DatanodeStorageInfo[] targets = rw.getTargets();
         if (targets != null && targets.length != 0) {
           StringBuilder targetList = new StringBuilder("datanode(s)");
-          for (int k = 0; k < targets.length; k++) {
+          for (DatanodeStorageInfo target : targets) {
             targetList.append(' ');
-            targetList.append(targets[k].getDatanodeDescriptor());
+            targetList.append(target.getDatanodeDescriptor());
           }
-          blockLog.debug("BLOCK* ask {} to replicate {} to {}", rw.srcNode,
-              rw.block, targetList);
+          blockLog.debug("BLOCK* ask {} to replicate {} to {}", rw.getSrcNode(),
+              rw.getBlock(), targetList);
         }
       }
     }
@@ -1519,6 +1414,118 @@ public class BlockManager implements BlockStatsMXBean {
     return scheduledWork;
   }
 
+  boolean hasEnoughEffectiveReplicas(BlockInfo block,
+      NumberReplicas numReplicas, int pendingReplicaNum, int required) {
+    int numEffectiveReplicas = numReplicas.liveReplicas() + pendingReplicaNum;
+    return (numEffectiveReplicas >= required) &&
+        (pendingReplicaNum > 0 || blockHasEnoughRacks(block));
+  }
+
+  private ReplicationWork scheduleReplication(BlockInfo block, int priority) {
+    // block should belong to a file
+    BlockCollection bc = getBlockCollection(block);
+    // abandoned block or block reopened for append
+    if (bc == null
+        || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
+      // remove from neededReplications
+      neededReplications.remove(block, priority);
+      return null;
+    }
+
+    short requiredReplication = getExpectedReplicaNum(block);
+
+    // get a source data-node
+    List<DatanodeDescriptor> containingNodes = new ArrayList<>();
+    List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<>();
+    NumberReplicas numReplicas = new NumberReplicas();
+    DatanodeDescriptor srcNode = chooseSourceDatanode(block, containingNodes,
+        liveReplicaNodes, numReplicas, priority);
+    if (srcNode == null) { // block can not be replicated from any node
+      LOG.debug("Block " + block + " cannot be repl from any node");
+      return null;
+    }
+
+    // liveReplicaNodes can include READ_ONLY_SHARED replicas which are
+    // not included in the numReplicas.liveReplicas() count
+    assert liveReplicaNodes.size() >= numReplicas.liveReplicas();
+
+    int pendingNum = pendingReplications.getNumReplicas(block);
+    if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum,
+        requiredReplication)) {
+      neededReplications.remove(block, priority);
+      blockLog.debug("BLOCK* Removing {} from neededReplications as" +
+          " it has enough replicas", block);
+      return null;
+    }
+
+    final int additionalReplRequired;
+    if (numReplicas.liveReplicas() < requiredReplication) {
+      additionalReplRequired = requiredReplication - numReplicas.liveReplicas()
+          - pendingNum;
+    } else {
+      additionalReplRequired = 1; // Needed on a new rack
+    }
+    return new ReplicationWork(block, bc, srcNode, containingNodes,
+        liveReplicaNodes, additionalReplRequired, priority);
+  }
+
+  private boolean validateReplicationWork(ReplicationWork rw) {
+    BlockInfo block = rw.getBlock();
+    int priority = rw.getPriority();
+    // Recheck since global lock was released
+    // block should belong to a file
+    BlockCollection bc = getBlockCollection(block);
+    // abandoned block or block reopened for append
+    if (bc == null
+        || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
+      neededReplications.remove(block, priority);
+      rw.resetTargets();
+      return false;
+    }
+
+    // do not schedule more if enough replicas is already pending
+    final short requiredReplication = getExpectedReplicaNum(block);
+    NumberReplicas numReplicas = countNodes(block);
+    final int pendingNum = pendingReplications.getNumReplicas(block);
+    if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum,
+        requiredReplication)) {
+      neededReplications.remove(block, priority);
+      rw.resetTargets();
+      blockLog.debug("BLOCK* Removing {} from neededReplications as" +
+          " it has enough replicas", block);
+      return false;
+    }
+
+    DatanodeStorageInfo[] targets = rw.getTargets();
+    if ( (numReplicas.liveReplicas() >= requiredReplication) &&
+        (!blockHasEnoughRacks(block)) ) {
+      if (rw.getSrcNode().getNetworkLocation().equals(
+          targets[0].getDatanodeDescriptor().getNetworkLocation())) {
+        //No use continuing, unless a new rack in this case
+        return false;
+      }
+    }
+
+    // Add block to the to be replicated list
+    rw.getSrcNode().addBlockToBeReplicated(block, targets);
+    DatanodeStorageInfo.incrementBlocksScheduled(targets);
+
+    // Move the block-replication into a "pending" state.
+    // The reason we use 'pending' is so we can retry
+    // replications that fail after an appropriate amount of time.
+    pendingReplications.increment(block,
+        DatanodeStorageInfo.toDatanodeDescriptors(targets));
+    blockLog.debug("BLOCK* block {} is moved from neededReplications to "
+        + "pendingReplications", block);
+
+    int numEffectiveReplicas = numReplicas.liveReplicas() + pendingNum;
+    // remove from neededReplications
+    if(numEffectiveReplicas + targets.length >= requiredReplication) {
+      neededReplications.remove(block, priority);
+    }
+    return true;
+  }
+
   /** Choose target for WebHDFS redirection. */
   public DatanodeStorageInfo[] chooseTarget4WebHDFS(String src,
       DatanodeDescriptor clientnode, Set<Node> excludes, long blocksize) {
@@ -1765,52 +1772,6 @@ public class BlockManager implements BlockStatsMXBean {
       this.reportedState = reportedState;
     }
   }
-  
-  /**
-   * BlockToMarkCorrupt is used to build the "toCorrupt" list, which is a
-   * list of blocks that should be considered corrupt due to a block report.
-   */
-  private static class BlockToMarkCorrupt {
-    /** The corrupted block in a datanode. */
-    final BlockInfo corrupted;
-    /** The corresponding block stored in the BlockManager. */
-    final BlockInfo stored;
-    /** The reason to mark corrupt. */
-    final String reason;
-    /** The reason code to be stored */
-    final Reason reasonCode;
-
-    BlockToMarkCorrupt(BlockInfo corrupted,
-        BlockInfo stored, String reason,
-        Reason reasonCode) {
-      Preconditions.checkNotNull(corrupted, "corrupted is null");
-      Preconditions.checkNotNull(stored, "stored is null");
-
-      this.corrupted = corrupted;
-      this.stored = stored;
-      this.reason = reason;
-      this.reasonCode = reasonCode;
-    }
-
-    BlockToMarkCorrupt(BlockInfo stored, String reason,
-        Reason reasonCode) {
-      this(stored, stored, reason, reasonCode);
-    }
-
-    BlockToMarkCorrupt(BlockInfo stored, long gs, String reason,
-        Reason reasonCode) {
-      this(new BlockInfoContiguous((BlockInfoContiguous)stored), stored,
-          reason, reasonCode);
-      //the corrupted block in datanode has a different generation stamp
-      corrupted.setGenerationStamp(gs);
-    }
-
-    @Override
-    public String toString() {
-      return corrupted + "("
-          + (corrupted == stored? "same as stored": "stored=" + stored) + ")";
-    }
-  }
 
   /**
    * The given storage is reporting all its blocks.
@@ -3797,51 +3758,6 @@ public class BlockManager implements BlockStatsMXBean {
         null);
   }
 
-  private static class ReplicationWork {
-
-    private final BlockInfo block;
-    private final BlockCollection bc;
-
-    private final DatanodeDescriptor srcNode;
-    private final List<DatanodeDescriptor> containingNodes;
-    private final List<DatanodeStorageInfo> liveReplicaStorages;
-    private final int additionalReplRequired;
-
-    private DatanodeStorageInfo targets[];
-    private final int priority;
-
-    public ReplicationWork(BlockInfo block,
-        BlockCollection bc,
-        DatanodeDescriptor srcNode,
-        List<DatanodeDescriptor> containingNodes,
-        List<DatanodeStorageInfo> liveReplicaStorages,
-        int additionalReplRequired,
-        int priority) {
-      this.block = block;
-      this.bc = bc;
-      this.srcNode = srcNode;
-      this.srcNode.incrementPendingReplicationWithoutTargets();
-      this.containingNodes = containingNodes;
-      this.liveReplicaStorages = liveReplicaStorages;
-      this.additionalReplRequired = additionalReplRequired;
-      this.priority = priority;
-      this.targets = null;
-    }
-    
-    private void chooseTargets(BlockPlacementPolicy blockplacement,
-        BlockStoragePolicySuite storagePolicySuite,
-        Set<Node> excludedNodes) {
-      try {
-        targets = blockplacement.chooseTarget(bc.getName(),
-            additionalReplRequired, srcNode, liveReplicaStorages, false,
-            excludedNodes, block.getNumBytes(),
-            storagePolicySuite.getPolicy(bc.getStoragePolicyID()));
-      } finally {
-        srcNode.decrementPendingReplicationWithoutTargets();
-      }
-    }
-  }
-
   /**
    * A simple result enum for the result of
    * {@link BlockManager#processMisReplicatedBlock(BlockInfo)}.
@@ -3855,9 +3771,9 @@ public class BlockManager implements BlockStatsMXBean {
     OVER_REPLICATED,
     /** A decision can't currently be made about this block. */
     POSTPONE,
-    /** The block is under construction, so should be ignored */
+    /** The block is under construction, so should be ignored. */
     UNDER_CONSTRUCTION,
-    /** The block is properly replicated */
+    /** The block is properly replicated. */
     OK
   }
 

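For reference, the replica accounting that the new hasEnoughEffectiveReplicas helper centralizes can be restated on its own. The sketch below is a simplified standalone version using primitive parameters; the committed method (shown in the diff above) takes a BlockInfo and a NumberReplicas and consults blockHasEnoughRacks itself.

/**
 * Simplified restatement of BlockManager#hasEnoughEffectiveReplicas.
 * A block needs no further replication work when the replicas it
 * already has, plus those already scheduled, meet the replication
 * factor, and either some replication is still in flight or the
 * existing replicas already span enough racks.
 */
static boolean hasEnoughEffectiveReplicas(int liveReplicas,
    int pendingReplicas, int required, boolean hasEnoughRacks) {
  int effective = liveReplicas + pendingReplicas;
  return effective >= required
      && (pendingReplicas > 0 || hasEnoughRacks);
}

With replication factor 3, for example, two live replicas plus one pending replica count as enough (3 >= 3 and a replication is in flight), while three live replicas on a single rack do not (nothing is pending and the racks are insufficient); the latter is the case where scheduleReplication sets additionalReplRequired to 1 so that a replica can be placed on a new rack.
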
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6d12cd8d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockToMarkCorrupt.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockToMarkCorrupt.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockToMarkCorrupt.java
new file mode 100644
index 0000000..3842e56
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockToMarkCorrupt.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import static org.apache.hadoop.hdfs.server.blockmanagement.CorruptReplicasMap.Reason;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * BlockToMarkCorrupt is used to build the "toCorrupt" list, which is a
+ * list of blocks that should be considered corrupt due to a block report.
+ */
+class BlockToMarkCorrupt {
+  /** The corrupted block in a datanode. */
+  private final BlockInfo corrupted;
+  /** The corresponding block stored in the BlockManager. */
+  private final BlockInfo stored;
+  /** The reason to mark corrupt. */
+  private final String reason;
+  /** The reason code to be stored */
+  private final CorruptReplicasMap.Reason reasonCode;
+
+  BlockToMarkCorrupt(BlockInfo corrupted, BlockInfo stored, String reason,
+      CorruptReplicasMap.Reason reasonCode) {
+    Preconditions.checkNotNull(corrupted, "corrupted is null");
+    Preconditions.checkNotNull(stored, "stored is null");
+
+    this.corrupted = corrupted;
+    this.stored = stored;
+    this.reason = reason;
+    this.reasonCode = reasonCode;
+  }
+
+  BlockToMarkCorrupt(BlockInfo stored, String reason,
+      CorruptReplicasMap.Reason reasonCode) {
+    this(stored, stored, reason, reasonCode);
+  }
+
+  BlockToMarkCorrupt(BlockInfo stored, long gs, String reason,
+      CorruptReplicasMap.Reason reasonCode) {
+    this(new BlockInfoContiguous((BlockInfoContiguous)stored), stored,
+        reason, reasonCode);
+    //the corrupted block in datanode has a different generation stamp
+    corrupted.setGenerationStamp(gs);
+  }
+
+  public boolean isCorruptedDuringWrite() {
+    return stored.getGenerationStamp() > corrupted.getGenerationStamp();
+  }
+
+  public BlockInfo getCorrupted() {
+    return corrupted;
+  }
+
+  public BlockInfo getStored() {
+    return stored;
+  }
+
+  public String getReason() {
+    return reason;
+  }
+
+  public Reason getReasonCode() {
+    return reasonCode;
+  }
+
+  @Override
+  public String toString() {
+    return corrupted + "("
+        + (corrupted == stored ? "same as stored": "stored=" + stored) + ")";
+  }
+}

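Illustrative usage of the extracted class (a sketch only: BlockToMarkCorrupt is package-private, so callers live in this package; the BlockInfoContiguous and Block constructor signatures are assumed from this source tree, and Reason is the static import from the file above):

// A stored block at generation stamp 1002 whose replica was reported
// with the older stamp 1001, e.g. a replica left behind by a failed
// append pipeline. All values here are hypothetical.
BlockInfo stored =
    new BlockInfoContiguous(new Block(1024L, 512L, 1002L), (short) 3);
BlockToMarkCorrupt b = new BlockToMarkCorrupt(stored, 1001L,
    "reported generation stamp is older", Reason.GENSTAMP_MISMATCH);

// The gs-based constructor clones "stored" and rewrites the clone's
// generation stamp, so the two sides of the pair now differ:
assert b.getStored().getGenerationStamp() == 1002L;    // namenode's copy
assert b.getCorrupted().getGenerationStamp() == 1001L; // datanode's replica
// stored > corrupted: the replica went stale while the block was still
// being written, the condition markBlockAsCorrupt handles as case 3
// (invalidate the single replica rather than mark the block corrupt).
assert b.isCorruptedDuringWrite();
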
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6d12cd8d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java
new file mode 100644
index 0000000..f8a6dad
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.net.Node;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+class ReplicationWork {
+  private final BlockInfo block;
+  private final BlockCollection bc;
+  private final DatanodeDescriptor srcNode;
+  private final int additionalReplRequired;
+  private final int priority;
+  private final List<DatanodeDescriptor> containingNodes;
+  private final List<DatanodeStorageInfo> liveReplicaStorages;
+  private DatanodeStorageInfo[] targets;
+
+  public ReplicationWork(BlockInfo block, BlockCollection bc,
+      DatanodeDescriptor srcNode, List<DatanodeDescriptor> containingNodes,
+      List<DatanodeStorageInfo> liveReplicaStorages, int additionalReplRequired,
+      int priority) {
+    this.block = block;
+    this.bc = bc;
+    this.srcNode = srcNode;
+    this.srcNode.incrementPendingReplicationWithoutTargets();
+    this.containingNodes = containingNodes;
+    this.liveReplicaStorages = liveReplicaStorages;
+    this.additionalReplRequired = additionalReplRequired;
+    this.priority = priority;
+    this.targets = null;
+  }
+
+  void chooseTargets(BlockPlacementPolicy blockplacement,
+      BlockStoragePolicySuite storagePolicySuite,
+      Set<Node> excludedNodes) {
+    try {
+      targets = blockplacement.chooseTarget(bc.getName(),
+          additionalReplRequired, srcNode, liveReplicaStorages, false,
+          excludedNodes, block.getNumBytes(),
+          storagePolicySuite.getPolicy(bc.getStoragePolicyID()));
+    } finally {
+      srcNode.decrementPendingReplicationWithoutTargets();
+    }
+  }
+
+  DatanodeStorageInfo[] getTargets() {
+    return targets;
+  }
+
+  void resetTargets() {
+    this.targets = null;
+  }
+
+  List<DatanodeDescriptor> getContainingNodes() {
+    return Collections.unmodifiableList(containingNodes);
+  }
+
+  public int getPriority() {
+    return priority;
+  }
+
+  public BlockInfo getBlock() {
+    return block;
+  }
+
+  public DatanodeDescriptor getSrcNode() {
+    return srcNode;
+  }
+}
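
The class is driven by BlockManager#computeReplicationWorkForBlocks in three phases, which is why targets is the one mutable field. The following is a condensed restatement of that driver, not the committed code (the committed version appears in the BlockManager.java diff above; blockplacement, storagePolicySuite, namesystem and neededReplications are the existing BlockManager fields):

int computeWork(List<List<BlockInfo>> blocksToReplicate) {
  int scheduledWork = 0;
  final List<ReplicationWork> work = new LinkedList<>();

  // Phase 1, under the namesystem write lock: decide which blocks still
  // need replication and capture a ReplicationWork unit for each.
  namesystem.writeLock();
  try {
    synchronized (neededReplications) {
      for (int priority = 0; priority < blocksToReplicate.size(); priority++) {
        for (BlockInfo block : blocksToReplicate.get(priority)) {
          ReplicationWork rw = scheduleReplication(block, priority);
          if (rw != null) {
            work.add(rw);
          }
        }
      }
    }
  } finally {
    namesystem.writeUnlock();
  }

  // Phase 2, lock released: placement is comparatively expensive, so
  // targets are chosen outside the lock, excluding every node that
  // already holds a copy (including decommissioning or corrupt nodes).
  final Set<Node> excludedNodes = new HashSet<>();
  for (ReplicationWork rw : work) {
    excludedNodes.clear();
    excludedNodes.addAll(rw.getContainingNodes());
    rw.chooseTargets(blockplacement, storagePolicySuite, excludedNodes);
  }

  // Phase 3, lock re-acquired: the cluster may have changed while the
  // lock was down, so every unit is revalidated before its block is
  // handed to the source datanode.
  namesystem.writeLock();
  try {
    for (ReplicationWork rw : work) {
      DatanodeStorageInfo[] targets = rw.getTargets();
      if (targets == null || targets.length == 0) {
        rw.resetTargets();
        continue;
      }
      synchronized (neededReplications) {
        if (validateReplicationWork(rw)) {
          scheduledWork++;
        }
      }
    }
  } finally {
    namesystem.writeUnlock();
  }
  return scheduledWork;
}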