You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/09/02 07:58:30 UTC

[18/50] [abbrv] hadoop git commit: Revert "HDFS-8938. Extract BlockToMarkCorrupt and ReplicationWork as standalone classes from BlockManager. Contributed by Mingliang Liu."

Revert "HDFS-8938. Extract BlockToMarkCorrupt and ReplicationWork as standalone classes from BlockManager. Contributed by Mingliang Liu."

This reverts commit 4e9307f26dd41270f95fb50166e1a091852e4d58.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/035ed261
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/035ed261
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/035ed261

Branch: refs/heads/HDFS-7285
Commit: 035ed26147f10620fc6ed3a514d9ebbcc31304b5
Parents: 4e9307f
Author: Haohui Mai <wh...@apache.org>
Authored: Thu Aug 27 16:09:35 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Thu Aug 27 16:09:35 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 -
 .../server/blockmanagement/BlockManager.java    | 368 ++++++++++++-------
 .../blockmanagement/BlockToMarkCorrupt.java     |  87 -----
 .../server/blockmanagement/ReplicationWork.java |  87 -----
 4 files changed, 226 insertions(+), 319 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/035ed261/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index c4c8c3b..9cc3326 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -853,9 +853,6 @@ Release 2.8.0 - UNRELEASED
     HDFS-8962. Clean up checkstyle warnings in o.a.h.hdfs.DfsClientConf.
     (Mingliang Liu via wheat9)
 
-    HDFS-8938. Extract BlockToMarkCorrupt and ReplicationWork as standalone
-    classes from BlockManager. (Mingliang Liu via wheat9)
-
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/035ed261/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 5a05fa7..95933d2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -1181,24 +1181,24 @@ public class BlockManager implements BlockStatsMXBean {
       DatanodeStorageInfo storageInfo,
       DatanodeDescriptor node) throws IOException {
 
-    if (b.getCorrupted().isDeleted()) {
+    if (b.corrupted.isDeleted()) {
       blockLog.debug("BLOCK markBlockAsCorrupt: {} cannot be marked as" +
           " corrupt as it does not belong to any file", b);
-      addToInvalidates(b.getCorrupted(), node);
+      addToInvalidates(b.corrupted, node);
       return;
     } 
-    short expectedReplicas = b.getCorrupted().getReplication();
+    short expectedReplicas = b.corrupted.getReplication();
 
     // Add replica to the data-node if it is not already there
     if (storageInfo != null) {
-      storageInfo.addBlock(b.getStored());
+      storageInfo.addBlock(b.stored);
     }
 
     // Add this replica to corruptReplicas Map
-    corruptReplicas.addToCorruptReplicasMap(b.getCorrupted(), node,
-        b.getReason(), b.getReasonCode());
+    corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason,
+        b.reasonCode);
 
-    NumberReplicas numberOfReplicas = countNodes(b.getStored());
+    NumberReplicas numberOfReplicas = countNodes(b.stored);
     boolean hasEnoughLiveReplicas = numberOfReplicas.liveReplicas() >=
         expectedReplicas;
     boolean minReplicationSatisfied =
@@ -1207,7 +1207,7 @@ public class BlockManager implements BlockStatsMXBean {
         (numberOfReplicas.liveReplicas() + numberOfReplicas.corruptReplicas()) >
         expectedReplicas;
     boolean corruptedDuringWrite = minReplicationSatisfied &&
-        b.isCorruptedDuringWrite();
+        (b.stored.getGenerationStamp() > b.corrupted.getGenerationStamp());
     // case 1: have enough number of live replicas
     // case 2: corrupted replicas + live replicas > Replication factor
     // case 3: Block is marked corrupt due to failure while writing. In this
@@ -1220,7 +1220,7 @@ public class BlockManager implements BlockStatsMXBean {
       invalidateBlock(b, node);
     } else if (namesystem.isPopulatingReplQueues()) {
       // add the block to neededReplication
-      updateNeededReplications(b.getStored(), -1, 0);
+      updateNeededReplications(b.stored, -1, 0);
     }
   }
 
@@ -1239,18 +1239,18 @@ public class BlockManager implements BlockStatsMXBean {
     }
 
     // Check how many copies we have of the block
-    NumberReplicas nr = countNodes(b.getStored());
+    NumberReplicas nr = countNodes(b.stored);
     if (nr.replicasOnStaleNodes() > 0) {
       blockLog.debug("BLOCK* invalidateBlocks: postponing " +
           "invalidation of {} on {} because {} replica(s) are located on " +
           "nodes with potentially out-of-date block reports", b, dn,
           nr.replicasOnStaleNodes());
-      postponeBlock(b.getCorrupted());
+      postponeBlock(b.corrupted);
       return false;
     } else if (nr.liveReplicas() >= 1) {
       // If we have at least one copy on a live node, then we can delete it.
-      addToInvalidates(b.getCorrupted(), dn);
-      removeStoredBlock(b.getStored(), node);
+      addToInvalidates(b.corrupted, dn);
+      removeStoredBlock(b.stored, node);
       blockLog.debug("BLOCK* invalidateBlocks: {} on {} listed for deletion.",
           b, dn);
       return true;
@@ -1338,18 +1338,71 @@ public class BlockManager implements BlockStatsMXBean {
    */
   @VisibleForTesting
   int computeReplicationWorkForBlocks(List<List<BlockInfo>> blocksToReplicate) {
+    int requiredReplication, numEffectiveReplicas;
+    List<DatanodeDescriptor> containingNodes;
+    DatanodeDescriptor srcNode;
+    BlockCollection bc = null;
+    int additionalReplRequired;
+
     int scheduledWork = 0;
-    final List<ReplicationWork> work = new LinkedList<>();
+    List<ReplicationWork> work = new LinkedList<ReplicationWork>();
 
     namesystem.writeLock();
     try {
       synchronized (neededReplications) {
         for (int priority = 0; priority < blocksToReplicate.size(); priority++) {
           for (BlockInfo block : blocksToReplicate.get(priority)) {
-            ReplicationWork rw = scheduleReplication(block, priority);
-            if (rw != null) {
-              work.add(rw);
+            // block should belong to a file
+            bc = getBlockCollection(block);
+            // abandoned block or block reopened for append
+            if (bc == null
+                || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
+              // remove from neededReplications
+              neededReplications.remove(block, priority);
+              continue;
+            }
+
+            requiredReplication = getExpectedReplicaNum(block);
+
+            // get a source data-node
+            containingNodes = new ArrayList<DatanodeDescriptor>();
+            List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<DatanodeStorageInfo>();
+            NumberReplicas numReplicas = new NumberReplicas();
+            srcNode = chooseSourceDatanode(
+                block, containingNodes, liveReplicaNodes, numReplicas,
+                priority);
+            if(srcNode == null) { // block can not be replicated from any node
+              LOG.debug("Block " + block + " cannot be repl from any node");
+              continue;
+            }
+
+            // liveReplicaNodes can include READ_ONLY_SHARED replicas which are 
+            // not included in the numReplicas.liveReplicas() count
+            assert liveReplicaNodes.size() >= numReplicas.liveReplicas();
+
+            // do not schedule more if enough replicas is already pending
+            numEffectiveReplicas = numReplicas.liveReplicas() +
+                                    pendingReplications.getNumReplicas(block);
+      
+            if (numEffectiveReplicas >= requiredReplication) {
+              if ( (pendingReplications.getNumReplicas(block) > 0) ||
+                   (blockHasEnoughRacks(block)) ) {
+                neededReplications.remove(block, priority); // remove from neededReplications
+                blockLog.debug("BLOCK* Removing {} from neededReplications as" +
+                        " it has enough replicas", block);
+                continue;
+              }
+            }
+
+            if (numReplicas.liveReplicas() < requiredReplication) {
+              additionalReplRequired = requiredReplication
+                  - numEffectiveReplicas;
+            } else {
+              additionalReplRequired = 1; // Needed on a new rack
             }
+            work.add(new ReplicationWork(block, bc, srcNode,
+                containingNodes, liveReplicaNodes, additionalReplRequired,
+                priority));
           }
         }
       }
@@ -1357,12 +1410,12 @@ public class BlockManager implements BlockStatsMXBean {
       namesystem.writeUnlock();
     }
 
-    final Set<Node> excludedNodes = new HashSet<>();
+    final Set<Node> excludedNodes = new HashSet<Node>();
     for(ReplicationWork rw : work){
       // Exclude all of the containing nodes from being targets.
       // This list includes decommissioning or corrupt nodes.
       excludedNodes.clear();
-      for (DatanodeDescriptor dn : rw.getContainingNodes()) {
+      for (DatanodeDescriptor dn : rw.containingNodes) {
         excludedNodes.add(dn);
       }
 
@@ -1375,15 +1428,67 @@ public class BlockManager implements BlockStatsMXBean {
     namesystem.writeLock();
     try {
       for(ReplicationWork rw : work){
-        final DatanodeStorageInfo[] targets = rw.getTargets();
+        final DatanodeStorageInfo[] targets = rw.targets;
         if(targets == null || targets.length == 0){
-          rw.resetTargets();
+          rw.targets = null;
           continue;
         }
 
         synchronized (neededReplications) {
-          if (validateReplicationWork(rw)) {
-            scheduledWork++;
+          BlockInfo block = rw.block;
+          int priority = rw.priority;
+          // Recheck since global lock was released
+          // block should belong to a file
+          bc = getBlockCollection(block);
+          // abandoned block or block reopened for append
+          if(bc == null || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
+            neededReplications.remove(block, priority); // remove from neededReplications
+            rw.targets = null;
+            continue;
+          }
+          requiredReplication = getExpectedReplicaNum(block);
+
+          // do not schedule more if enough replicas is already pending
+          NumberReplicas numReplicas = countNodes(block);
+          numEffectiveReplicas = numReplicas.liveReplicas() +
+            pendingReplications.getNumReplicas(block);
+
+          if (numEffectiveReplicas >= requiredReplication) {
+            if ( (pendingReplications.getNumReplicas(block) > 0) ||
+                 (blockHasEnoughRacks(block)) ) {
+              neededReplications.remove(block, priority); // remove from neededReplications
+              rw.targets = null;
+              blockLog.debug("BLOCK* Removing {} from neededReplications as" +
+                      " it has enough replicas", block);
+              continue;
+            }
+          }
+
+          if ( (numReplicas.liveReplicas() >= requiredReplication) &&
+               (!blockHasEnoughRacks(block)) ) {
+            if (rw.srcNode.getNetworkLocation().equals(
+                targets[0].getDatanodeDescriptor().getNetworkLocation())) {
+              //No use continuing, unless a new rack in this case
+              continue;
+            }
+          }
+
+          // Add block to the to be replicated list
+          rw.srcNode.addBlockToBeReplicated(block, targets);
+          scheduledWork++;
+          DatanodeStorageInfo.incrementBlocksScheduled(targets);
+
+          // Move the block-replication into a "pending" state.
+          // The reason we use 'pending' is so we can retry
+          // replications that fail after an appropriate amount of time.
+          pendingReplications.increment(block,
+              DatanodeStorageInfo.toDatanodeDescriptors(targets));
+          blockLog.debug("BLOCK* block {} is moved from neededReplications to "
+                  + "pendingReplications", block);
+
+          // remove from neededReplications
+          if(numEffectiveReplicas + targets.length >= requiredReplication) {
+            neededReplications.remove(block, priority); // remove from neededReplications
           }
         }
       }
@@ -1394,15 +1499,15 @@ public class BlockManager implements BlockStatsMXBean {
     if (blockLog.isInfoEnabled()) {
       // log which blocks have been scheduled for replication
       for(ReplicationWork rw : work){
-        DatanodeStorageInfo[] targets = rw.getTargets();
+        DatanodeStorageInfo[] targets = rw.targets;
         if (targets != null && targets.length != 0) {
           StringBuilder targetList = new StringBuilder("datanode(s)");
-          for (DatanodeStorageInfo target : targets) {
+          for (int k = 0; k < targets.length; k++) {
             targetList.append(' ');
-            targetList.append(target.getDatanodeDescriptor());
+            targetList.append(targets[k].getDatanodeDescriptor());
           }
-          blockLog.debug("BLOCK* ask {} to replicate {} to {}", rw.getSrcNode(),
-              rw.getBlock(), targetList);
+          blockLog.debug("BLOCK* ask {} to replicate {} to {}", rw.srcNode,
+              rw.block, targetList);
         }
       }
     }
@@ -1414,118 +1519,6 @@ public class BlockManager implements BlockStatsMXBean {
     return scheduledWork;
   }
 
-  boolean hasEnoughEffectiveReplicas(BlockInfo block,
-      NumberReplicas numReplicas, int pendingReplicaNum, int required) {
-    int numEffectiveReplicas = numReplicas.liveReplicas() + pendingReplicaNum;
-    return (numEffectiveReplicas >= required) &&
-        (pendingReplicaNum > 0 || blockHasEnoughRacks(block));
-  }
-
-  private ReplicationWork scheduleReplication(BlockInfo block, int priority) {
-    // block should belong to a file
-    BlockCollection bc = getBlockCollection(block);
-    // abandoned block or block reopened for append
-    if (bc == null
-        || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
-      // remove from neededReplications
-      neededReplications.remove(block, priority);
-      return null;
-    }
-
-    short requiredReplication = getExpectedReplicaNum(block);
-
-    // get a source data-node
-    List<DatanodeDescriptor> containingNodes = new ArrayList<>();
-    List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<>();
-    NumberReplicas numReplicas = new NumberReplicas();
-    DatanodeDescriptor srcNode = chooseSourceDatanode(block, containingNodes,
-        liveReplicaNodes, numReplicas, priority);
-    if (srcNode == null) { // block can not be replicated from any node
-      LOG.debug("Block " + block + " cannot be repl from any node");
-      return null;
-    }
-
-    // liveReplicaNodes can include READ_ONLY_SHARED replicas which are
-    // not included in the numReplicas.liveReplicas() count
-    assert liveReplicaNodes.size() >= numReplicas.liveReplicas();
-
-    int pendingNum = pendingReplications.getNumReplicas(block);
-    if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum,
-        requiredReplication)) {
-      neededReplications.remove(block, priority);
-      blockLog.debug("BLOCK* Removing {} from neededReplications as" +
-          " it has enough replicas", block);
-      return null;
-    }
-
-    final int additionalReplRequired;
-    if (numReplicas.liveReplicas() < requiredReplication) {
-      additionalReplRequired = requiredReplication - numReplicas.liveReplicas()
-          - pendingNum;
-    } else {
-      additionalReplRequired = 1; // Needed on a new rack
-    }
-    return new ReplicationWork(block, bc, srcNode, containingNodes,
-        liveReplicaNodes, additionalReplRequired, priority);
-  }
-
-  private boolean validateReplicationWork(ReplicationWork rw) {
-    BlockInfo block = rw.getBlock();
-    int priority = rw.getPriority();
-    // Recheck since global lock was released
-    // block should belong to a file
-    BlockCollection bc = getBlockCollection(block);
-    // abandoned block or block reopened for append
-    if(bc == null
-        || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
-      neededReplications.remove(block, priority);
-      rw.resetTargets();
-      return false;
-    }
-
-    // do not schedule more if enough replicas is already pending
-    final short requiredReplication = getExpectedReplicaNum(block);
-    NumberReplicas numReplicas = countNodes(block);
-    final int pendingNum = pendingReplications.getNumReplicas(block);
-    if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum,
-        requiredReplication)) {
-      neededReplications.remove(block, priority);
-      rw.resetTargets();
-      blockLog.debug("BLOCK* Removing {} from neededReplications as" +
-          " it has enough replicas", block);
-      return false;
-    }
-
-    DatanodeStorageInfo[] targets = rw.getTargets();
-    if ( (numReplicas.liveReplicas() >= requiredReplication) &&
-        (!blockHasEnoughRacks(block)) ) {
-      if (rw.getSrcNode().getNetworkLocation().equals(
-          targets[0].getDatanodeDescriptor().getNetworkLocation())) {
-        //No use continuing, unless a new rack in this case
-        return false;
-      }
-    }
-
-    // Add block to the to be replicated list
-    rw.getSrcNode().addBlockToBeReplicated(block, targets);
-    DatanodeStorageInfo.incrementBlocksScheduled(targets);
-
-    // Move the block-replication into a "pending" state.
-    // The reason we use 'pending' is so we can retry
-    // replications that fail after an appropriate amount of time.
-    pendingReplications.increment(block,
-        DatanodeStorageInfo.toDatanodeDescriptors(targets));
-    blockLog.debug("BLOCK* block {} is moved from neededReplications to "
-        + "pendingReplications", block);
-
-    int numEffectiveReplicas = numReplicas.liveReplicas() + pendingNum;
-    // remove from neededReplications
-    if(numEffectiveReplicas + targets.length >= requiredReplication) {
-      neededReplications.remove(block, priority);
-    }
-    return true;
-  }
-
   /** Choose target for WebHDFS redirection. */
   public DatanodeStorageInfo[] chooseTarget4WebHDFS(String src,
       DatanodeDescriptor clientnode, Set<Node> excludes, long blocksize) {
@@ -1772,6 +1765,52 @@ public class BlockManager implements BlockStatsMXBean {
       this.reportedState = reportedState;
     }
   }
+  
+  /**
+   * BlockToMarkCorrupt is used to build the "toCorrupt" list, which is a
+   * list of blocks that should be considered corrupt due to a block report.
+   */
+  private static class BlockToMarkCorrupt {
+    /** The corrupted block in a datanode. */
+    final BlockInfo corrupted;
+    /** The corresponding block stored in the BlockManager. */
+    final BlockInfo stored;
+    /** The reason to mark corrupt. */
+    final String reason;
+    /** The reason code to be stored */
+    final Reason reasonCode;
+
+    BlockToMarkCorrupt(BlockInfo corrupted,
+        BlockInfo stored, String reason,
+        Reason reasonCode) {
+      Preconditions.checkNotNull(corrupted, "corrupted is null");
+      Preconditions.checkNotNull(stored, "stored is null");
+
+      this.corrupted = corrupted;
+      this.stored = stored;
+      this.reason = reason;
+      this.reasonCode = reasonCode;
+    }
+
+    BlockToMarkCorrupt(BlockInfo stored, String reason,
+        Reason reasonCode) {
+      this(stored, stored, reason, reasonCode);
+    }
+
+    BlockToMarkCorrupt(BlockInfo stored, long gs, String reason,
+        Reason reasonCode) {
+      this(new BlockInfoContiguous((BlockInfoContiguous)stored), stored,
+          reason, reasonCode);
+      //the corrupted block in datanode has a different generation stamp
+      corrupted.setGenerationStamp(gs);
+    }
+
+    @Override
+    public String toString() {
+      return corrupted + "("
+          + (corrupted == stored? "same as stored": "stored=" + stored) + ")";
+    }
+  }
 
   /**
    * The given storage is reporting all its blocks.
@@ -3758,6 +3797,51 @@ public class BlockManager implements BlockStatsMXBean {
         null);
   }
 
+  private static class ReplicationWork {
+
+    private final BlockInfo block;
+    private final BlockCollection bc;
+
+    private final DatanodeDescriptor srcNode;
+    private final List<DatanodeDescriptor> containingNodes;
+    private final List<DatanodeStorageInfo> liveReplicaStorages;
+    private final int additionalReplRequired;
+
+    private DatanodeStorageInfo targets[];
+    private final int priority;
+
+    public ReplicationWork(BlockInfo block,
+        BlockCollection bc,
+        DatanodeDescriptor srcNode,
+        List<DatanodeDescriptor> containingNodes,
+        List<DatanodeStorageInfo> liveReplicaStorages,
+        int additionalReplRequired,
+        int priority) {
+      this.block = block;
+      this.bc = bc;
+      this.srcNode = srcNode;
+      this.srcNode.incrementPendingReplicationWithoutTargets();
+      this.containingNodes = containingNodes;
+      this.liveReplicaStorages = liveReplicaStorages;
+      this.additionalReplRequired = additionalReplRequired;
+      this.priority = priority;
+      this.targets = null;
+    }
+    
+    private void chooseTargets(BlockPlacementPolicy blockplacement,
+        BlockStoragePolicySuite storagePolicySuite,
+        Set<Node> excludedNodes) {
+      try {
+        targets = blockplacement.chooseTarget(bc.getName(),
+            additionalReplRequired, srcNode, liveReplicaStorages, false,
+            excludedNodes, block.getNumBytes(),
+            storagePolicySuite.getPolicy(bc.getStoragePolicyID()));
+      } finally {
+        srcNode.decrementPendingReplicationWithoutTargets();
+      }
+    }
+  }
+
   /**
    * A simple result enum for the result of
    * {@link BlockManager#processMisReplicatedBlock(BlockInfo)}.
@@ -3771,9 +3855,9 @@ public class BlockManager implements BlockStatsMXBean {
     OVER_REPLICATED,
     /** A decision can't currently be made about this block. */
     POSTPONE,
-    /** The block is under construction, so should be ignored. */
+    /** The block is under construction, so should be ignored */
     UNDER_CONSTRUCTION,
-    /** The block is properly replicated. */
+    /** The block is properly replicated */
     OK
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/035ed261/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockToMarkCorrupt.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockToMarkCorrupt.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockToMarkCorrupt.java
deleted file mode 100644
index 3842e56..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockToMarkCorrupt.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.blockmanagement;
-
-import static org.apache.hadoop.hdfs.server.blockmanagement.CorruptReplicasMap.Reason;
-
-import com.google.common.base.Preconditions;
-
-/**
- * BlockToMarkCorrupt is used to build the "toCorrupt" list, which is a
- * list of blocks that should be considered corrupt due to a block report.
- */
-class BlockToMarkCorrupt {
-  /** The corrupted block in a datanode. */
-  private final BlockInfo corrupted;
-  /** The corresponding block stored in the BlockManager. */
-  private final BlockInfo stored;
-  /** The reason to mark corrupt. */
-  private final String reason;
-  /** The reason code to be stored */
-  private final CorruptReplicasMap.Reason reasonCode;
-
-  BlockToMarkCorrupt(BlockInfo corrupted, BlockInfo stored, String reason,
-      CorruptReplicasMap.Reason reasonCode) {
-    Preconditions.checkNotNull(corrupted, "corrupted is null");
-    Preconditions.checkNotNull(stored, "stored is null");
-
-    this.corrupted = corrupted;
-    this.stored = stored;
-    this.reason = reason;
-    this.reasonCode = reasonCode;
-  }
-
-  BlockToMarkCorrupt(BlockInfo stored, String reason,
-      CorruptReplicasMap.Reason reasonCode) {
-    this(stored, stored, reason, reasonCode);
-  }
-
-  BlockToMarkCorrupt(BlockInfo stored, long gs, String reason,
-      CorruptReplicasMap.Reason reasonCode) {
-    this(new BlockInfoContiguous((BlockInfoContiguous)stored), stored,
-        reason, reasonCode);
-    //the corrupted block in datanode has a different generation stamp
-    corrupted.setGenerationStamp(gs);
-  }
-
-  public boolean isCorruptedDuringWrite() {
-    return stored.getGenerationStamp() > corrupted.getGenerationStamp();
-  }
-
-  public BlockInfo getCorrupted() {
-    return corrupted;
-  }
-
-  public BlockInfo getStored() {
-    return stored;
-  }
-
-  public String getReason() {
-    return reason;
-  }
-
-  public Reason getReasonCode() {
-    return reasonCode;
-  }
-
-  @Override
-  public String toString() {
-    return corrupted + "("
-        + (corrupted == stored ? "same as stored": "stored=" + stored) + ")";
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/035ed261/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java
deleted file mode 100644
index f8a6dad..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.blockmanagement;
-
-import org.apache.hadoop.net.Node;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-
-class ReplicationWork {
-  private final BlockInfo block;
-  private final BlockCollection bc;
-  private final DatanodeDescriptor srcNode;
-  private final int additionalReplRequired;
-  private final int priority;
-  private final List<DatanodeDescriptor> containingNodes;
-  private final List<DatanodeStorageInfo> liveReplicaStorages;
-  private DatanodeStorageInfo[] targets;
-
-  public ReplicationWork(BlockInfo block, BlockCollection bc,
-      DatanodeDescriptor srcNode, List<DatanodeDescriptor> containingNodes,
-      List<DatanodeStorageInfo> liveReplicaStorages, int additionalReplRequired,
-      int priority) {
-    this.block = block;
-    this.bc = bc;
-    this.srcNode = srcNode;
-    this.srcNode.incrementPendingReplicationWithoutTargets();
-    this.containingNodes = containingNodes;
-    this.liveReplicaStorages = liveReplicaStorages;
-    this.additionalReplRequired = additionalReplRequired;
-    this.priority = priority;
-    this.targets = null;
-  }
-
-  void chooseTargets(BlockPlacementPolicy blockplacement,
-      BlockStoragePolicySuite storagePolicySuite,
-      Set<Node> excludedNodes) {
-    try {
-      targets = blockplacement.chooseTarget(bc.getName(),
-          additionalReplRequired, srcNode, liveReplicaStorages, false,
-          excludedNodes, block.getNumBytes(),
-          storagePolicySuite.getPolicy(bc.getStoragePolicyID()));
-    } finally {
-      srcNode.decrementPendingReplicationWithoutTargets();
-    }
-  }
-
-  DatanodeStorageInfo[] getTargets() {
-    return targets;
-  }
-
-  void resetTargets() {
-    this.targets = null;
-  }
-
-  List<DatanodeDescriptor> getContainingNodes() {
-    return Collections.unmodifiableList(containingNodes);
-  }
-
-  public int getPriority() {
-    return priority;
-  }
-
-  public BlockInfo getBlock() {
-    return block;
-  }
-
-  public DatanodeDescriptor getSrcNode() {
-    return srcNode;
-  }
-}