Posted to hdfs-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:50:25 UTC

svn commit: r1619012 [11/35] - in /hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project: hadoop-hdfs-httpfs/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/ hadoop...

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Tue Aug 19 23:49:39 2014
@@ -261,13 +261,19 @@ public class BlockManager {
     this.namesystem = namesystem;
     datanodeManager = new DatanodeManager(this, namesystem, conf);
     heartbeatManager = datanodeManager.getHeartbeatManager();
-    invalidateBlocks = new InvalidateBlocks(datanodeManager);
+
+    final long pendingPeriod = conf.getLong(
+        DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_KEY,
+        DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_DEFAULT) * 1000L;
+    invalidateBlocks = new InvalidateBlocks(
+        datanodeManager.blockInvalidateLimit, pendingPeriod);
 
     // Compute the map capacity by allocating 2% of total memory
     blocksMap = new BlocksMap(
         LightWeightGSet.computeCapacity(2.0, "BlocksMap"));
     blockplacement = BlockPlacementPolicy.getInstance(
-        conf, stats, datanodeManager.getNetworkTopology());
+        conf, stats, datanodeManager.getNetworkTopology(), 
+        datanodeManager.getHost2DatanodeMap());
     pendingReplications = new PendingReplicationBlocks(conf.getInt(
       DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
       DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT) * 1000L);
@@ -423,10 +429,8 @@ public class BlockManager {
 
   public void close() {
     try {
-      if (replicationThread != null) {
-        replicationThread.interrupt();
-        replicationThread.join(3000);
-      }
+      replicationThread.interrupt();
+      replicationThread.join(3000);
     } catch (InterruptedException ie) {
     }
     datanodeManager.close();
@@ -549,7 +553,6 @@ public class BlockManager {
   }
 
   /**
-   * @param block
    * @return true if the block has minimum replicas
    */
   public boolean checkMinReplication(Block block) {
@@ -699,7 +702,7 @@ public class BlockManager {
 
     // remove this block from the list of pending blocks to be deleted. 
     for (DatanodeStorageInfo storage : targets) {
-      invalidateBlocks.remove(storage.getStorageID(), oldBlock);
+      invalidateBlocks.remove(storage.getDatanodeDescriptor(), oldBlock);
     }
     
     // Adjust safe-mode totals, since under-construction blocks don't
@@ -722,9 +725,8 @@ public class BlockManager {
     final List<DatanodeStorageInfo> locations
         = new ArrayList<DatanodeStorageInfo>(blocksMap.numNodes(block));
     for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
-      final String storageID = storage.getStorageID();
       // filter invalidate replicas
-      if(!invalidateBlocks.contains(storageID, block)) {
+      if(!invalidateBlocks.contains(storage.getDatanodeDescriptor(), block)) {
         locations.add(storage);
       }
     }
@@ -819,7 +821,7 @@ public class BlockManager {
       for(DatanodeStorageInfo storage : blocksMap.getStorages(blk)) {
         final DatanodeDescriptor d = storage.getDatanodeDescriptor();
         final boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blk, d);
-        if (isCorrupt || (!isCorrupt && !replicaCorrupt))
+        if (isCorrupt || (!replicaCorrupt))
           machines[j++] = storage;
       }
     }
@@ -943,6 +945,16 @@ public class BlockManager {
   }
 
   /**
+   * Check if a block is replicated to at least the minimum replication.
+   */
+  public boolean isSufficientlyReplicated(BlockInfo b) {
+    // Compare against the lesser of minReplication and the number of live DNs.
+    final int replication =
+        Math.min(minReplication, getDatanodeManager().getNumLiveDataNodes());
+    return countNodes(b).liveReplicas() >= replication;
+  }
+
+  /**
    * return a list of blocks & their locations on <code>datanode</code> whose
    * total size is <code>size</code>
    * 
@@ -1010,9 +1022,11 @@ public class BlockManager {
     while(it.hasNext()) {
       removeStoredBlock(it.next(), node);
     }
+    // Remove all pending DN messages referencing this DN.
+    pendingDNMessages.removeAllMessagesForDatanode(node);
 
     node.resetBlocks();
-    invalidateBlocks.remove(node.getDatanodeUuid());
+    invalidateBlocks.remove(node);
     
     // If the DN hasn't block-reported since the most recent
     // failover, then we may have been holding up on processing
@@ -1035,6 +1049,9 @@ public class BlockManager {
    * datanode and log the operation
    */
   void addToInvalidates(final Block block, final DatanodeInfo datanode) {
+    if (!namesystem.isPopulatingReplQueues()) {
+      return;
+    }
     invalidateBlocks.add(block, datanode, true);
   }
 
@@ -1043,6 +1060,9 @@ public class BlockManager {
    * datanodes.
    */
   private void addToInvalidates(Block b) {
+    if (!namesystem.isPopulatingReplQueues()) {
+      return;
+    }
     StringBuilder datanodes = new StringBuilder();
     for(DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) {
       final DatanodeDescriptor node = storage.getDatanodeDescriptor();
@@ -1059,6 +1079,7 @@ public class BlockManager {
    * Mark the block belonging to datanode as corrupt
    * @param blk Block to be marked as corrupt
    * @param dn Datanode which holds the corrupt replica
+   * @param storageID the ID of the storage holding the replica, if known; null otherwise.
    * @param reason a textual reason why the block should be marked corrupt,
    * for logging purposes
    */
@@ -1075,17 +1096,29 @@ public class BlockManager {
           + blk + " not found");
       return;
     }
-    markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock, reason,
-        Reason.CORRUPTION_REPORTED), dn, storageID);
-  }
 
-  private void markBlockAsCorrupt(BlockToMarkCorrupt b,
-      DatanodeInfo dn, String storageID) throws IOException {
     DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
     if (node == null) {
-      throw new IOException("Cannot mark " + b
-          + " as corrupt because datanode " + dn + " does not exist");
+      throw new IOException("Cannot mark " + blk
+          + " as corrupt because datanode " + dn + " (" + dn.getDatanodeUuid()
+          + ") does not exist");
     }
+    
+    markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock,
+            blk.getGenerationStamp(), reason, Reason.CORRUPTION_REPORTED),
+        storageID == null ? null : node.getStorageInfo(storageID),
+        node);
+  }
+
+  /**
+   * Mark the given block as corrupt on the given datanode.
+   * @param b the block to mark as corrupt, with the reason and reason code
+   * @param storageInfo storage that contains the block, if known; null otherwise
+   * @throws IOException
+   */
+  private void markBlockAsCorrupt(BlockToMarkCorrupt b,
+      DatanodeStorageInfo storageInfo,
+      DatanodeDescriptor node) throws IOException {
 
     BlockCollection bc = b.corrupted.getBlockCollection();
     if (bc == null) {
@@ -1096,12 +1129,32 @@ public class BlockManager {
     } 
 
     // Add replica to the data-node if it is not already there
-    node.addBlock(storageID, b.stored);
+    if (storageInfo != null) {
+      storageInfo.addBlock(b.stored);
+    }
 
     // Add this replica to corruptReplicas Map
     corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason,
         b.reasonCode);
-    if (countNodes(b.stored).liveReplicas() >= bc.getBlockReplication()) {
+
+    NumberReplicas numberOfReplicas = countNodes(b.stored);
+    boolean hasEnoughLiveReplicas = numberOfReplicas.liveReplicas() >= bc
+        .getBlockReplication();
+    boolean minReplicationSatisfied =
+        numberOfReplicas.liveReplicas() >= minReplication;
+    boolean hasMoreCorruptReplicas = minReplicationSatisfied &&
+        (numberOfReplicas.liveReplicas() + numberOfReplicas.corruptReplicas()) >
+        bc.getBlockReplication();
+    boolean corruptedDuringWrite = minReplicationSatisfied &&
+        (b.stored.getGenerationStamp() > b.corrupted.getGenerationStamp());
+    // case 1: there are enough live replicas
+    // case 2: corrupt replicas + live replicas > replication factor
+    // case 3: the block was marked corrupt due to a failure while writing;
+    //         in this case its genstamp differs from that of the valid block.
+    // In all these cases we can delete the replica.
+    // In case 3, the RBW replica is deleted and the valid block can be replicated.
+    if (hasEnoughLiveReplicas || hasMoreCorruptReplicas
+        || corruptedDuringWrite) {
       // the block is over-replicated so invalidate the replicas immediately
       invalidateBlock(b, node);
     } else if (namesystem.isPopulatingReplQueues()) {
@@ -1179,14 +1232,20 @@ public class BlockManager {
    * @return total number of block for deletion
    */
   int computeInvalidateWork(int nodesToProcess) {
-    final List<String> nodes = invalidateBlocks.getStorageIDs();
+    final List<DatanodeInfo> nodes = invalidateBlocks.getDatanodes();
     Collections.shuffle(nodes);
 
     nodesToProcess = Math.min(nodes.size(), nodesToProcess);
 
     int blockCnt = 0;
-    for(int nodeCnt = 0; nodeCnt < nodesToProcess; nodeCnt++ ) {
-      blockCnt += invalidateWorkForOneNode(nodes.get(nodeCnt));
+    for (DatanodeInfo dnInfo : nodes) {
+      int blocks = invalidateWorkForOneNode(dnInfo);
+      if (blocks > 0) {
+        blockCnt += blocks;
+        if (--nodesToProcess == 0) {
+          break;
+        }
+      }
     }
     return blockCnt;
   }
@@ -1411,7 +1470,7 @@ public class BlockManager {
    * @throws IOException
    *           if the number of targets < minimum replication.
    * @see BlockPlacementPolicy#chooseTarget(String, int, Node,
-   *      List, boolean, Set, long)
+   *      List, boolean, Set, long, StorageType)
    */
   public DatanodeStorageInfo[] chooseTarget(final String src,
       final int numOfReplicas, final DatanodeDescriptor client,
@@ -1648,7 +1707,7 @@ public class BlockManager {
    * @throws IOException
    */
   public boolean processReport(final DatanodeID nodeID,
-      final DatanodeStorage storage, final String poolId,
+      final DatanodeStorage storage,
       final BlockListAsLongs newReport) throws IOException {
     namesystem.writeLock();
     final long startTime = Time.now(); //after acquiring write lock
@@ -1680,9 +1739,9 @@ public class BlockManager {
       if (storageInfo.numBlocks() == 0) {
         // The first block report can be processed a lot more efficiently than
         // ordinary block reports.  This shortens restart times.
-        processFirstBlockReport(node, storage.getStorageID(), newReport);
+        processFirstBlockReport(storageInfo, newReport);
       } else {
-        processReport(node, storage, newReport);
+        processReport(storageInfo, newReport);
       }
       
       // Now that we have an up-to-date block report, we know that any
@@ -1708,6 +1767,7 @@ public class BlockManager {
     }
     blockLog.info("BLOCK* processReport: from storage " + storage.getStorageID()
         + " node " + nodeID + ", blocks: " + newReport.getNumberOfBlocks()
+        + ", hasStaleStorages: " + node.hasStaleStorages()
         + ", processing time: " + (endTime - startTime) + " msecs");
     return !node.hasStaleStorages();
   }
@@ -1743,9 +1803,8 @@ public class BlockManager {
     }
   }
   
-  private void processReport(final DatanodeDescriptor node,
-      final DatanodeStorage storage,
-      final BlockListAsLongs report) throws IOException {
+  private void processReport(final DatanodeStorageInfo storageInfo,
+                             final BlockListAsLongs report) throws IOException {
     // Normal case:
     // Modify the (block-->datanode) map, according to the difference
     // between the old and new block report.
@@ -1755,19 +1814,20 @@ public class BlockManager {
     Collection<Block> toInvalidate = new LinkedList<Block>();
     Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
     Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
-    reportDiff(node, storage, report,
+    reportDiff(storageInfo, report,
         toAdd, toRemove, toInvalidate, toCorrupt, toUC);
-
+   
+    DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
     // Process the blocks on each queue
     for (StatefulBlockInfo b : toUC) { 
-      addStoredBlockUnderConstruction(b, node, storage.getStorageID());
+      addStoredBlockUnderConstruction(b, storageInfo);
     }
     for (Block b : toRemove) {
       removeStoredBlock(b, node);
     }
     int numBlocksLogged = 0;
     for (BlockInfo b : toAdd) {
-      addStoredBlock(b, node, storage.getStorageID(), null, numBlocksLogged < maxNumBlocksToLog);
+      addStoredBlock(b, storageInfo, null, numBlocksLogged < maxNumBlocksToLog);
       numBlocksLogged++;
     }
     if (numBlocksLogged > maxNumBlocksToLog) {
@@ -1781,7 +1841,7 @@ public class BlockManager {
       addToInvalidates(b, node);
     }
     for (BlockToMarkCorrupt b : toCorrupt) {
-      markBlockAsCorrupt(b, node, storage.getStorageID());
+      markBlockAsCorrupt(b, storageInfo, node);
     }
   }
 
@@ -1792,16 +1852,16 @@ public class BlockManager {
    * a toRemove list (since there won't be any).  It also silently discards 
    * any invalid blocks, thereby deferring their processing until 
    * the next block report.
-   * @param node - DatanodeDescriptor of the node that sent the report
+   * @param storageInfo - DatanodeStorageInfo that sent the report
    * @param report - the initial block report, to be processed
    * @throws IOException 
    */
-  private void processFirstBlockReport(final DatanodeDescriptor node,
-      final String storageID,
+  private void processFirstBlockReport(
+      final DatanodeStorageInfo storageInfo,
       final BlockListAsLongs report) throws IOException {
     if (report == null) return;
     assert (namesystem.hasWriteLock());
-    assert (node.getStorageInfo(storageID).numBlocks() == 0);
+    assert (storageInfo.numBlocks() == 0);
     BlockReportIterator itBR = report.getBlockReportIterator();
 
     while(itBR.hasNext()) {
@@ -1810,7 +1870,7 @@ public class BlockManager {
       
       if (shouldPostponeBlocksFromFuture &&
           namesystem.isGenStampInFuture(iblk)) {
-        queueReportedBlock(node, storageID, iblk, reportedState,
+        queueReportedBlock(storageInfo, iblk, reportedState,
             QUEUE_REASON_FUTURE_GENSTAMP);
         continue;
       }
@@ -1822,15 +1882,16 @@ public class BlockManager {
       // If block is corrupt, mark it and continue to next block.
       BlockUCState ucState = storedBlock.getBlockUCState();
       BlockToMarkCorrupt c = checkReplicaCorrupt(
-          iblk, reportedState, storedBlock, ucState, node);
+          iblk, reportedState, storedBlock, ucState,
+          storageInfo.getDatanodeDescriptor());
       if (c != null) {
         if (shouldPostponeBlocksFromFuture) {
           // In the Standby, we may receive a block report for a file that we
           // just have an out-of-date gen-stamp or state for, for example.
-          queueReportedBlock(node, storageID, iblk, reportedState,
+          queueReportedBlock(storageInfo, iblk, reportedState,
               QUEUE_REASON_CORRUPT_STATE);
         } else {
-          markBlockAsCorrupt(c, node, storageID);
+          markBlockAsCorrupt(c, storageInfo, storageInfo.getDatanodeDescriptor());
         }
         continue;
       }
@@ -1838,7 +1899,7 @@ public class BlockManager {
       // If block is under construction, add this replica to its list
       if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
         ((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent(
-            node.getStorageInfo(storageID), iblk, reportedState);
+            storageInfo, iblk, reportedState);
         // OpenFileBlocks only inside snapshots also will be added to safemode
         // threshold. So we need to update such blocks to safemode
         // refer HDFS-5283
@@ -1851,12 +1912,12 @@ public class BlockManager {
       }      
       //add replica if appropriate
       if (reportedState == ReplicaState.FINALIZED) {
-        addStoredBlockImmediate(storedBlock, node, storageID);
+        addStoredBlockImmediate(storedBlock, storageInfo);
       }
     }
   }
 
-  private void reportDiff(DatanodeDescriptor dn, DatanodeStorage storage, 
+  private void reportDiff(DatanodeStorageInfo storageInfo, 
       BlockListAsLongs newReport, 
       Collection<BlockInfo> toAdd,              // add to DatanodeDescriptor
       Collection<Block> toRemove,           // remove from DatanodeDescriptor
@@ -1864,8 +1925,6 @@ public class BlockManager {
       Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
       Collection<StatefulBlockInfo> toUC) { // add to under-construction list
 
-    final DatanodeStorageInfo storageInfo = dn.getStorageInfo(storage.getStorageID());
-
     // place a delimiter in the list which separates blocks 
     // that have been reported from those that have not
     BlockInfo delimiter = new BlockInfo(new Block(), 1);
@@ -1882,7 +1941,7 @@ public class BlockManager {
     while(itBR.hasNext()) {
       Block iblk = itBR.next();
       ReplicaState iState = itBR.getCurrentReplicaState();
-      BlockInfo storedBlock = processReportedBlock(dn, storage.getStorageID(),
+      BlockInfo storedBlock = processReportedBlock(storageInfo,
           iblk, iState, toAdd, toInvalidate, toCorrupt, toUC);
 
       // move block to the head of the list
@@ -1919,7 +1978,7 @@ public class BlockManager {
    * BlockInfoUnderConstruction's list of replicas.</li>
    * </ol>
    * 
-   * @param dn descriptor for the datanode that made the report
+   * @param storageInfo DatanodeStorageInfo that sent the report.
    * @param block reported block replica
    * @param reportedState reported replica state
    * @param toAdd add to DatanodeDescriptor
@@ -1931,14 +1990,16 @@ public class BlockManager {
    * @return the up-to-date stored block, if it should be kept.
    *         Otherwise, null.
    */
-  private BlockInfo processReportedBlock(final DatanodeDescriptor dn,
-      final String storageID,
+  private BlockInfo processReportedBlock(
+      final DatanodeStorageInfo storageInfo,
       final Block block, final ReplicaState reportedState, 
       final Collection<BlockInfo> toAdd, 
       final Collection<Block> toInvalidate, 
       final Collection<BlockToMarkCorrupt> toCorrupt,
       final Collection<StatefulBlockInfo> toUC) {
     
+    DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor();
+
     if(LOG.isDebugEnabled()) {
       LOG.debug("Reported block " + block
           + " on " + dn + " size " + block.getNumBytes()
@@ -1947,7 +2008,7 @@ public class BlockManager {
   
     if (shouldPostponeBlocksFromFuture &&
         namesystem.isGenStampInFuture(block)) {
-      queueReportedBlock(dn, storageID, block, reportedState,
+      queueReportedBlock(storageInfo, block, reportedState,
           QUEUE_REASON_FUTURE_GENSTAMP);
       return null;
     }
@@ -1968,7 +2029,7 @@ public class BlockManager {
     }
 
     // Ignore replicas already scheduled to be removed from the DN
-    if(invalidateBlocks.contains(dn.getDatanodeUuid(), block)) {
+    if(invalidateBlocks.contains(dn, block)) {
       /*
        * TODO: following assertion is incorrect, see HDFS-2668 assert
        * storedBlock.findDatanode(dn) < 0 : "Block " + block +
@@ -1984,7 +2045,10 @@ public class BlockManager {
         // If the block is an out-of-date generation stamp or state,
         // but we're the standby, we shouldn't treat it as corrupt,
         // but instead just queue it for later processing.
-        queueReportedBlock(dn, storageID, storedBlock, reportedState,
+        // TODO: Pretty confident this should be s/storedBlock/block below,
+        // since we should be postponing the info of the reported block, not
+        // the stored block. See HDFS-6289 for more context.
+        queueReportedBlock(storageInfo, storedBlock, reportedState,
             QUEUE_REASON_CORRUPT_STATE);
       } else {
         toCorrupt.add(c);
@@ -2001,7 +2065,7 @@ public class BlockManager {
     // Add replica if appropriate. If the replica was previously corrupt
     // but now okay, it might need to be updated.
     if (reportedState == ReplicaState.FINALIZED
-        && (storedBlock.findDatanode(dn) < 0
+        && (!storedBlock.findDatanode(dn)
         || corruptReplicas.isReplicaCorrupt(storedBlock, dn))) {
       toAdd.add(storedBlock);
     }
@@ -2013,17 +2077,17 @@ public class BlockManager {
    * standby node. @see PendingDataNodeMessages.
    * @param reason a textual reason to report in the debug logs
    */
-  private void queueReportedBlock(DatanodeDescriptor dn, String storageID, Block block,
+  private void queueReportedBlock(DatanodeStorageInfo storageInfo, Block block,
       ReplicaState reportedState, String reason) {
     assert shouldPostponeBlocksFromFuture;
     
     if (LOG.isDebugEnabled()) {
       LOG.debug("Queueing reported block " + block +
           " in state " + reportedState + 
-          " from datanode " + dn + " for later processing " +
-          "because " + reason + ".");
+          " from datanode " + storageInfo.getDatanodeDescriptor() +
+          " for later processing because " + reason + ".");
     }
-    pendingDNMessages.enqueueReportedBlock(dn, storageID, block, reportedState);
+    pendingDNMessages.enqueueReportedBlock(storageInfo, block, reportedState);
   }
 
   /**
@@ -2046,7 +2110,7 @@ public class BlockManager {
       if (LOG.isDebugEnabled()) {
        LOG.debug("Processing previously queued message " + rbi);
       }
-      processAndHandleReportedBlock(rbi.getNode(), rbi.getStorageID(), 
+      processAndHandleReportedBlock(rbi.getStorageInfo(), 
           rbi.getBlock(), rbi.getReportedState(), null);
     }
   }
@@ -2103,6 +2167,16 @@ public class BlockManager {
         } else {
           return null; // not corrupt
         }
+      case UNDER_CONSTRUCTION:
+        if (storedBlock.getGenerationStamp() > reported.getGenerationStamp()) {
+          final long reportedGS = reported.getGenerationStamp();
+          return new BlockToMarkCorrupt(storedBlock, reportedGS, "block is "
+              + ucState + " and reported state " + reportedState
+              + ", but reported genstamp " + reportedGS
+              + " does not match genstamp in block map "
+              + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
+        }
+        return null;
       default:
         return null;
       }
@@ -2166,19 +2240,20 @@ public class BlockManager {
   }
 
   void addStoredBlockUnderConstruction(StatefulBlockInfo ucBlock,
-      DatanodeDescriptor node, String storageID) throws IOException {
+      DatanodeStorageInfo storageInfo) throws IOException {
     BlockInfoUnderConstruction block = ucBlock.storedBlock;
-    block.addReplicaIfNotPresent(node.getStorageInfo(storageID),
-        ucBlock.reportedBlock, ucBlock.reportedState);
+    block.addReplicaIfNotPresent(
+        storageInfo, ucBlock.reportedBlock, ucBlock.reportedState);
 
-    if (ucBlock.reportedState == ReplicaState.FINALIZED && block.findDatanode(node) < 0) {
-      addStoredBlock(block, node, storageID, null, true);
+    if (ucBlock.reportedState == ReplicaState.FINALIZED &&
+        !block.findDatanode(storageInfo.getDatanodeDescriptor())) {
+      addStoredBlock(block, storageInfo, null, true);
     }
   } 
 
   /**
    * Faster version of
-   * {@link #addStoredBlock(BlockInfo, DatanodeDescriptor, String, DatanodeDescriptor, boolean)}
+   * {@link #addStoredBlock(BlockInfo, DatanodeStorageInfo, DatanodeDescriptor, boolean)}
    * , intended for use with initial block report at startup. If not in startup
    * safe mode, will call standard addStoredBlock(). Assumes this method is
    * called "immediately" so there is no need to refresh the storedBlock from
@@ -2189,17 +2264,17 @@ public class BlockManager {
    * @throws IOException
    */
   private void addStoredBlockImmediate(BlockInfo storedBlock,
-      DatanodeDescriptor node, String storageID)
+      DatanodeStorageInfo storageInfo)
   throws IOException {
     assert (storedBlock != null && namesystem.hasWriteLock());
     if (!namesystem.isInStartupSafeMode() 
         || namesystem.isPopulatingReplQueues()) {
-      addStoredBlock(storedBlock, node, storageID, null, false);
+      addStoredBlock(storedBlock, storageInfo, null, false);
       return;
     }
 
     // just add it
-    node.addBlock(storageID, storedBlock);
+    storageInfo.addBlock(storedBlock);
 
     // Now check for completion of blocks and safe block count
     int numCurrentReplica = countLiveNodes(storedBlock);
@@ -2221,13 +2296,13 @@ public class BlockManager {
    * @return the block that is stored in blockMap.
    */
   private Block addStoredBlock(final BlockInfo block,
-                               DatanodeDescriptor node,
-                               String storageID,
+                               DatanodeStorageInfo storageInfo,
                                DatanodeDescriptor delNodeHint,
                                boolean logEveryBlock)
   throws IOException {
     assert block != null && namesystem.hasWriteLock();
     BlockInfo storedBlock;
+    DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
     if (block instanceof BlockInfoUnderConstruction) {
       //refresh our copy in case the block got completed in another thread
       storedBlock = blocksMap.getStoredBlock(block);
@@ -2243,12 +2318,11 @@ public class BlockManager {
       // it will happen in next block report otherwise.
       return block;
     }
-    assert storedBlock != null : "Block must be stored by now";
     BlockCollection bc = storedBlock.getBlockCollection();
     assert bc != null : "Block must belong to a file";
 
     // add block to the datanode
-    boolean added = node.addBlock(storageID, storedBlock);
+    boolean added = storageInfo.addBlock(storedBlock);
 
     int curReplicaDelta;
     if (added) {
@@ -2587,7 +2661,7 @@ public class BlockManager {
     if (addedNode == delNodeHint) {
       delNodeHint = null;
     }
-    Collection<DatanodeDescriptor> nonExcess = new ArrayList<DatanodeDescriptor>();
+    Collection<DatanodeStorageInfo> nonExcess = new ArrayList<DatanodeStorageInfo>();
     Collection<DatanodeDescriptor> corruptNodes = corruptReplicas
         .getNodes(block);
     for(DatanodeStorageInfo storage : blocksMap.getStorages(block, State.NORMAL)) {
@@ -2607,7 +2681,7 @@ public class BlockManager {
         if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
           // exclude corrupt replicas
           if (corruptNodes == null || !corruptNodes.contains(cur)) {
-            nonExcess.add(cur);
+            nonExcess.add(storage);
           }
         }
       }
@@ -2631,7 +2705,7 @@ public class BlockManager {
   * If no such node is available,
    * then pick a node with least free space
    */
-  private void chooseExcessReplicates(Collection<DatanodeDescriptor> nonExcess, 
+  private void chooseExcessReplicates(final Collection<DatanodeStorageInfo> nonExcess, 
                               Block b, short replication,
                               DatanodeDescriptor addedNode,
                               DatanodeDescriptor delNodeHint,
@@ -2639,28 +2713,33 @@ public class BlockManager {
     assert namesystem.hasWriteLock();
     // first form a rack to datanodes map and
     BlockCollection bc = getBlockCollection(b);
-    final Map<String, List<DatanodeDescriptor>> rackMap
-        = new HashMap<String, List<DatanodeDescriptor>>();
-    final List<DatanodeDescriptor> moreThanOne = new ArrayList<DatanodeDescriptor>();
-    final List<DatanodeDescriptor> exactlyOne = new ArrayList<DatanodeDescriptor>();
+
+    final Map<String, List<DatanodeStorageInfo>> rackMap
+        = new HashMap<String, List<DatanodeStorageInfo>>();
+    final List<DatanodeStorageInfo> moreThanOne = new ArrayList<DatanodeStorageInfo>();
+    final List<DatanodeStorageInfo> exactlyOne = new ArrayList<DatanodeStorageInfo>();
     
     // split nodes into two sets
     // moreThanOne contains nodes on rack with more than one replica
     // exactlyOne contains the remaining nodes
-    replicator.splitNodesWithRack(nonExcess, rackMap, moreThanOne,
-        exactlyOne);
+    replicator.splitNodesWithRack(nonExcess, rackMap, moreThanOne, exactlyOne);
     
     // pick one node to delete that favors the delete hint
     // otherwise pick one with least space from priSet if it is not empty
     // otherwise one node with least space from remains
     boolean firstOne = true;
+    final DatanodeStorageInfo delNodeHintStorage
+        = DatanodeStorageInfo.getDatanodeStorageInfo(nonExcess, delNodeHint);
+    final DatanodeStorageInfo addedNodeStorage
+        = DatanodeStorageInfo.getDatanodeStorageInfo(nonExcess, addedNode);
     while (nonExcess.size() - replication > 0) {
       // check if we can delete delNodeHint
-      final DatanodeInfo cur;
-      if (firstOne && delNodeHint !=null && nonExcess.contains(delNodeHint)
-          && (moreThanOne.contains(delNodeHint)
-              || (addedNode != null && !moreThanOne.contains(addedNode))) ) {
-        cur = delNodeHint;
+      final DatanodeStorageInfo cur;
+      if (firstOne && delNodeHintStorage != null
+          && (moreThanOne.contains(delNodeHintStorage)
+              || (addedNodeStorage != null
+                  && !moreThanOne.contains(addedNodeStorage)))) {
+        cur = delNodeHintStorage;
       } else { // regular excessive replica removal
         cur = replicator.chooseReplicaToDelete(bc, b, replication,
         		moreThanOne, exactlyOne);
@@ -2672,7 +2751,7 @@ public class BlockManager {
           exactlyOne, cur);
 
       nonExcess.remove(cur);
-      addToExcessReplicate(cur, b);
+      addToExcessReplicate(cur.getDatanodeDescriptor(), b);
 
       //
       // The 'excessblocks' tracks blocks until we get confirmation
@@ -2683,7 +2762,7 @@ public class BlockManager {
       // should be deleted.  Items are removed from the invalidate list
       // upon giving instructions to the namenode.
       //
-      addToInvalidates(b, cur);
+      addToInvalidates(b, cur.getDatanodeDescriptor());
       blockLog.info("BLOCK* chooseExcessReplicates: "
                 +"("+cur+", "+b+") is added to invalidated blocks set");
     }
@@ -2772,12 +2851,15 @@ public class BlockManager {
     } else {
       final String[] datanodeUuids = new String[locations.size()];
       final String[] storageIDs = new String[datanodeUuids.length];
+      final StorageType[] storageTypes = new StorageType[datanodeUuids.length];
       for(int i = 0; i < locations.size(); i++) {
         final DatanodeStorageInfo s = locations.get(i);
         datanodeUuids[i] = s.getDatanodeDescriptor().getDatanodeUuid();
         storageIDs[i] = s.getStorageID();
+        storageTypes[i] = s.getStorageType();
       }
-      results.add(new BlockWithLocations(block, datanodeUuids, storageIDs));
+      results.add(new BlockWithLocations(block, datanodeUuids, storageIDs,
+          storageTypes));
       return block.getNumBytes();
     }
   }
@@ -2786,8 +2868,9 @@ public class BlockManager {
    * The given node is reporting that it received a certain block.
    */
   @VisibleForTesting
-  void addBlock(DatanodeDescriptor node, String storageID, Block block, String delHint)
+  void addBlock(DatanodeStorageInfo storageInfo, Block block, String delHint)
       throws IOException {
+    DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
     // Decrement number of blocks scheduled to this datanode.
     // for a retry request (of DatanodeProtocol#blockReceivedAndDeleted with 
     // RECEIVED_BLOCK), we currently also decrease the approximate number. 
@@ -2807,12 +2890,12 @@ public class BlockManager {
     // Modify the blocks->datanode map and node's map.
     //
     pendingReplications.decrement(block, node);
-    processAndHandleReportedBlock(node, storageID, block, ReplicaState.FINALIZED,
+    processAndHandleReportedBlock(storageInfo, block, ReplicaState.FINALIZED,
         delHintNode);
   }
   
-  private void processAndHandleReportedBlock(DatanodeDescriptor node,
-      String storageID, Block block,
+  private void processAndHandleReportedBlock(
+      DatanodeStorageInfo storageInfo, Block block,
       ReplicaState reportedState, DatanodeDescriptor delHintNode)
       throws IOException {
     // blockReceived reports a finalized block
@@ -2820,7 +2903,9 @@ public class BlockManager {
     Collection<Block> toInvalidate = new LinkedList<Block>();
     Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
     Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
-    processReportedBlock(node, storageID, block, reportedState,
+    final DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
+
+    processReportedBlock(storageInfo, block, reportedState,
                               toAdd, toInvalidate, toCorrupt, toUC);
     // the block is only in one of the to-do lists
     // if it is in none then data-node already has it
@@ -2828,11 +2913,11 @@ public class BlockManager {
       : "The block should be only in one of the lists.";
 
     for (StatefulBlockInfo b : toUC) { 
-      addStoredBlockUnderConstruction(b, node, storageID);
+      addStoredBlockUnderConstruction(b, storageInfo);
     }
     long numBlocksLogged = 0;
     for (BlockInfo b : toAdd) {
-      addStoredBlock(b, node, storageID, delHintNode, numBlocksLogged < maxNumBlocksToLog);
+      addStoredBlock(b, storageInfo, delHintNode, numBlocksLogged < maxNumBlocksToLog);
       numBlocksLogged++;
     }
     if (numBlocksLogged > maxNumBlocksToLog) {
@@ -2846,7 +2931,7 @@ public class BlockManager {
       addToInvalidates(b, node);
     }
     for (BlockToMarkCorrupt b : toCorrupt) {
-      markBlockAsCorrupt(b, node, storageID);
+      markBlockAsCorrupt(b, storageInfo, node);
     }
   }
 
@@ -2873,13 +2958,15 @@ public class BlockManager {
           "Got incremental block report from unregistered or dead node");
     }
 
-    if (node.getStorageInfo(srdb.getStorage().getStorageID()) == null) {
+    DatanodeStorageInfo storageInfo =
+        node.getStorageInfo(srdb.getStorage().getStorageID());
+    if (storageInfo == null) {
       // The DataNode is reporting an unknown storage. Usually the NN learns
       // about new storages from heartbeats but during NN restart we may
       // receive a block report or incremental report before the heartbeat.
       // We must handle this for protocol compatibility. This issue was
       // uncovered by HDFS-6094.
-      node.updateStorage(srdb.getStorage());
+      storageInfo = node.updateStorage(srdb.getStorage());
     }
 
     for (ReceivedDeletedBlockInfo rdbi : srdb.getBlocks()) {
@@ -2889,14 +2976,13 @@ public class BlockManager {
         deleted++;
         break;
       case RECEIVED_BLOCK:
-        addBlock(node, srdb.getStorage().getStorageID(),
-            rdbi.getBlock(), rdbi.getDelHints());
+        addBlock(storageInfo, rdbi.getBlock(), rdbi.getDelHints());
         received++;
         break;
       case RECEIVING_BLOCK:
         receiving++;
-        processAndHandleReportedBlock(node, srdb.getStorage().getStorageID(),
-            rdbi.getBlock(), ReplicaState.RBW, null);
+        processAndHandleReportedBlock(storageInfo, rdbi.getBlock(),
+                                      ReplicaState.RBW, null);
         break;
       default:
         String msg = 
@@ -2999,10 +3085,14 @@ public class BlockManager {
   
   /**
    * On stopping decommission, check if the node has excess replicas.
-   * If there are any excess replicas, call processOverReplicatedBlock()
+   * If there are any excess replicas, call processOverReplicatedBlock().
+   * Process over replicated blocks only when active NN is out of safe mode.
    */
   void processOverReplicatedBlocksOnReCommission(
       final DatanodeDescriptor srcNode) {
+    if (!namesystem.isPopulatingReplQueues()) {
+      return;
+    }
     final Iterator<? extends Block> it = srcNode.getBlockIterator();
     int numOverReplicated = 0;
     while(it.hasNext()) {
@@ -3068,11 +3158,13 @@ public class BlockManager {
             }
           }
           if (!neededReplications.contains(block) &&
-            pendingReplications.getNumReplicas(block) == 0) {
+            pendingReplications.getNumReplicas(block) == 0 &&
+            namesystem.isPopulatingReplQueues()) {
             //
             // These blocks have been reported from the datanode
             // after the startDecommission method has been executed. These
             // blocks were in flight when the decommissioning was started.
+            // Process these blocks only when active NN is out of safe mode.
             //
             neededReplications.add(block,
                                    curReplicas,
@@ -3082,6 +3174,15 @@ public class BlockManager {
         }
       }
     }
+
+    if (!status && !srcNode.isAlive) {
+      LOG.warn("srcNode " + srcNode + " is dead " +
+          "when decommission is in progress. Continue to mark " +
+          "it as decommission in progress. In that way, when it rejoins the " +
+          "cluster it can continue the decommission process.");
+      status = true;
+    }
+
     srcNode.decommissioningStatus.set(underReplicatedBlocks,
         decommissionOnlyReplicas, 
         underReplicatedInOpenFiles);
@@ -3186,9 +3287,8 @@ public class BlockManager {
    *
    * @return number of blocks scheduled for removal during this iteration.
    */
-  private int invalidateWorkForOneNode(String nodeId) {
+  private int invalidateWorkForOneNode(DatanodeInfo dn) {
     final List<Block> toInvalidate;
-    final DatanodeDescriptor dn;
     
     namesystem.writeLock();
     try {
@@ -3197,15 +3297,13 @@ public class BlockManager {
         LOG.debug("In safemode, not computing replication work");
         return 0;
       }
-      // get blocks to invalidate for the nodeId
-      assert nodeId != null;
-      dn = datanodeManager.getDatanode(nodeId);
-      if (dn == null) {
-        invalidateBlocks.remove(nodeId);
-        return 0;
-      }
-      toInvalidate = invalidateBlocks.invalidateWork(nodeId, dn);
-      if (toInvalidate == null) {
+      try {
+        toInvalidate = invalidateBlocks.invalidateWork(datanodeManager.getDatanode(dn));
+        
+        if (toInvalidate == null) {
+          return 0;
+        }
+      } catch(UnregisteredNodeException une) {
         return 0;
       }
     } finally {
@@ -3287,12 +3385,7 @@ public class BlockManager {
   }
 
   public int getCapacity() {
-    namesystem.readLock();
-    try {
-      return blocksMap.getCapacity();
-    } finally {
-      namesystem.readUnlock();
-    }
+    return blocksMap.getCapacity();
   }
   
   /**
@@ -3344,8 +3437,11 @@ public class BlockManager {
     public void run() {
       while (namesystem.isRunning()) {
         try {
-          computeDatanodeWork();
-          processPendingReplications();
+          // Process replication work only when active NN is out of safe mode.
+          if (namesystem.isPopulatingReplQueues()) {
+            computeDatanodeWork();
+            processPendingReplications();
+          }
           Thread.sleep(replicationRecheckInterval);
         } catch (Throwable t) {
           if (!namesystem.isRunning()) {
@@ -3373,7 +3469,6 @@ public class BlockManager {
    * heartbeat.
    * 
    * @return number of blocks scheduled for replication or removal.
-   * @throws IOException
    */
   int computeDatanodeWork() {
     // Blocks should not be replicated or removed if in safe mode.
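
The markBlockAsCorrupt() hunk above replaces a single live-replica test with
a three-case decision. A minimal standalone paraphrase of that decision is
sketched below; "Replicas" and every other name here is a hypothetical
stand-in mirroring only the two NumberReplicas counters the decision reads,
not the committed Hadoop types:

    // Paraphrase (under assumed names) of the deletion decision added to
    // markBlockAsCorrupt().
    public class CorruptReplicaDecisionSketch {
      static class Replicas {
        final int live, corrupt;
        Replicas(int live, int corrupt) { this.live = live; this.corrupt = corrupt; }
      }

      static boolean shouldInvalidate(Replicas n, int replicationFactor,
          int minReplication, long storedGenStamp, long corruptGenStamp) {
        // case 1: enough live replicas already exist
        boolean hasEnoughLive = n.live >= replicationFactor;
        // cases 2 and 3 apply only once minimum replication is satisfied
        boolean minSatisfied = n.live >= minReplication;
        // case 2: live + corrupt replicas exceed the replication factor
        boolean moreCorrupt = minSatisfied
            && n.live + n.corrupt > replicationFactor;
        // case 3: replica was corrupted during a write, so the stored
        // genstamp is newer than the corrupt replica's genstamp
        boolean corruptedDuringWrite = minSatisfied
            && storedGenStamp > corruptGenStamp;
        return hasEnoughLive || moreCorrupt || corruptedDuringWrite;
      }

      public static void main(String[] args) {
        // RBW replica left by a failed write: stored genstamp is newer -> true
        System.out.println(shouldInvalidate(new Replicas(1, 1), 3, 1, 1002L, 1001L));
        // under-replicated with matching genstamps -> false (re-replicate it)
        System.out.println(shouldInvalidate(new Replicas(1, 1), 3, 1, 1001L, 1001L));
      }
    }

In the hunk itself, a true outcome leads to invalidateBlock(b, node); a false
one leaves the replica to the replication queues.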

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java Tue Aug 19 23:49:39 2014
@@ -61,7 +61,7 @@ public abstract class BlockPlacementPoli
    * @param srcPath the file to which this chooseTargets is being invoked.
    * @param numOfReplicas additional number of replicas wanted.
    * @param writer the writer's machine, null if not in the cluster.
-   * @param chosenNodes datanodes that have been chosen as targets.
+   * @param chosen datanodes that have been chosen as targets.
    * @param returnChosenNodes decide if the chosenNodes are returned.
    * @param excludedNodes datanodes that should not be considered as targets.
    * @param blocksize size of the data to be written.
@@ -78,8 +78,8 @@ public abstract class BlockPlacementPoli
                                              StorageType storageType);
   
   /**
-   * Same as {@link #chooseTarget(String, int, Node, List, boolean, 
-   * Set, long)} with added parameter {@code favoredDatanodes}
+   * Same as {@link #chooseTarget(String, int, Node, Set, long, List, StorageType)}
+   * with added parameter {@code favoredDatanodes}
    * @param favoredNodes datanodes that should be favored as targets. This
    *          is only a hint and due to cluster state, namenode may not be 
    *          able to place the blocks on these datanodes.
@@ -124,11 +124,12 @@ public abstract class BlockPlacementPoli
                    listed in the previous parameter.
    * @return the replica that is the best candidate for deletion
    */
-  abstract public DatanodeDescriptor chooseReplicaToDelete(BlockCollection srcBC,
-                                      Block block, 
-                                      short replicationFactor,
-                                      Collection<DatanodeDescriptor> existingReplicas,
-                                      Collection<DatanodeDescriptor> moreExistingReplicas);
+  abstract public DatanodeStorageInfo chooseReplicaToDelete(
+      BlockCollection srcBC,
+      Block block, 
+      short replicationFactor,
+      Collection<DatanodeStorageInfo> existingReplicas,
+      Collection<DatanodeStorageInfo> moreExistingReplicas);
 
   /**
    * Used to setup a BlockPlacementPolicy object. This should be defined by 
@@ -139,11 +140,13 @@ public abstract class BlockPlacementPoli
    * @param clusterMap cluster topology
    */
   abstract protected void initialize(Configuration conf,  FSClusterStats stats, 
-                                     NetworkTopology clusterMap);
+                                     NetworkTopology clusterMap, 
+                                     Host2NodesMap host2datanodeMap);
     
   /**
   * Get an instance of the configured Block Placement Policy based on
-   * the configuration property {@link DFS_BLOCK_REPLICATOR_CLASSNAME_KEY}.
+   * the configuration property
+   * {@link  DFSConfigKeys#DFS_BLOCK_REPLICATOR_CLASSNAME_KEY}.
    * 
    * @param conf the configuration to be used
    * @param stats an object that is used to retrieve the load on the cluster
@@ -152,14 +155,15 @@ public abstract class BlockPlacementPoli
    */
   public static BlockPlacementPolicy getInstance(Configuration conf, 
                                                  FSClusterStats stats,
-                                                 NetworkTopology clusterMap) {
+                                                 NetworkTopology clusterMap,
+                                                 Host2NodesMap host2datanodeMap) {
     final Class<? extends BlockPlacementPolicy> replicatorClass = conf.getClass(
         DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
         DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT,
         BlockPlacementPolicy.class);
     final BlockPlacementPolicy replicator = ReflectionUtils.newInstance(
         replicatorClass, conf);
-    replicator.initialize(conf, stats, clusterMap);
+    replicator.initialize(conf, stats, clusterMap, host2datanodeMap);
     return replicator;
   }
   
@@ -172,21 +176,23 @@ public abstract class BlockPlacementPoli
    * @param exactlyOne The List of replica nodes on rack with only one replica
    * @param cur current replica to remove
    */
-  public void adjustSetsWithChosenReplica(final Map<String, 
-      List<DatanodeDescriptor>> rackMap,
-      final List<DatanodeDescriptor> moreThanOne,
-      final List<DatanodeDescriptor> exactlyOne, final DatanodeInfo cur) {
+  public void adjustSetsWithChosenReplica(
+      final Map<String, List<DatanodeStorageInfo>> rackMap,
+      final List<DatanodeStorageInfo> moreThanOne,
+      final List<DatanodeStorageInfo> exactlyOne,
+      final DatanodeStorageInfo cur) {
     
-    String rack = getRack(cur);
-    final List<DatanodeDescriptor> datanodes = rackMap.get(rack);
-    datanodes.remove(cur);
-    if (datanodes.isEmpty()) {
+    final String rack = getRack(cur.getDatanodeDescriptor());
+    final List<DatanodeStorageInfo> storages = rackMap.get(rack);
+    storages.remove(cur);
+    if (storages.isEmpty()) {
       rackMap.remove(rack);
     }
     if (moreThanOne.remove(cur)) {
-      if (datanodes.size() == 1) {
-        moreThanOne.remove(datanodes.get(0));
-        exactlyOne.add(datanodes.get(0));
+      if (storages.size() == 1) {
+        final DatanodeStorageInfo remaining = storages.get(0);
+        moreThanOne.remove(remaining);
+        exactlyOne.add(remaining);
       }
     } else {
       exactlyOne.remove(cur);
@@ -195,7 +201,6 @@ public abstract class BlockPlacementPoli
 
   /**
    * Get rack string from a data node
-   * @param datanode
    * @return rack of data node
    */
   protected String getRack(final DatanodeInfo datanode) {
@@ -206,34 +211,34 @@ public abstract class BlockPlacementPoli
    * Split data nodes into two sets, one set includes nodes on rack with
    * more than one  replica, the other set contains the remaining nodes.
    * 
-   * @param dataNodes
+   * @param storages the storages to be split into two sets
    * @param rackMap a map from rack to datanodes
    * @param moreThanOne contains nodes on rack with more than one replica
   * @param exactlyOne contains the remaining nodes
    */
   public void splitNodesWithRack(
-      Collection<DatanodeDescriptor> dataNodes,
-      final Map<String, List<DatanodeDescriptor>> rackMap,
-      final List<DatanodeDescriptor> moreThanOne,
-      final List<DatanodeDescriptor> exactlyOne) {
-    for(DatanodeDescriptor node : dataNodes) {
-      final String rackName = getRack(node);
-      List<DatanodeDescriptor> datanodeList = rackMap.get(rackName);
-      if (datanodeList == null) {
-        datanodeList = new ArrayList<DatanodeDescriptor>();
-        rackMap.put(rackName, datanodeList);
+      final Iterable<DatanodeStorageInfo> storages,
+      final Map<String, List<DatanodeStorageInfo>> rackMap,
+      final List<DatanodeStorageInfo> moreThanOne,
+      final List<DatanodeStorageInfo> exactlyOne) {
+    for(DatanodeStorageInfo s: storages) {
+      final String rackName = getRack(s.getDatanodeDescriptor());
+      List<DatanodeStorageInfo> storageList = rackMap.get(rackName);
+      if (storageList == null) {
+        storageList = new ArrayList<DatanodeStorageInfo>();
+        rackMap.put(rackName, storageList);
       }
-      datanodeList.add(node);
+      storageList.add(s);
     }
     
     // split nodes into two sets
-    for(List<DatanodeDescriptor> datanodeList : rackMap.values()) {
-      if (datanodeList.size() == 1) {
+    for(List<DatanodeStorageInfo> storageList : rackMap.values()) {
+      if (storageList.size() == 1) {
         // exactlyOne contains nodes on rack with only one replica
-        exactlyOne.add(datanodeList.get(0));
+        exactlyOne.add(storageList.get(0));
       } else {
         // moreThanOne contains nodes on rack with more than one replica
-        moreThanOne.addAll(datanodeList);
+        moreThanOne.addAll(storageList);
       }
     }
   }
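
The splitNodesWithRack() rewrite above keeps the same two-set invariant while
switching the element type from DatanodeDescriptor to DatanodeStorageInfo.
The sketch below illustrates that invariant only; "Storage" and the bare rack
string are hypothetical stand-ins for DatanodeStorageInfo and getRack():

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Bucket replicas by rack, then split them into racks holding several
    // replicas vs. exactly one.
    public class RackSplitSketch {
      static class Storage {
        final String id, rack;
        Storage(String id, String rack) { this.id = id; this.rack = rack; }
        @Override public String toString() { return id; }
      }

      static void split(Iterable<Storage> storages,
          Map<String, List<Storage>> rackMap,
          List<Storage> moreThanOne, List<Storage> exactlyOne) {
        for (Storage s : storages) {
          rackMap.computeIfAbsent(s.rack, r -> new ArrayList<>()).add(s);
        }
        for (List<Storage> perRack : rackMap.values()) {
          if (perRack.size() == 1) {
            exactlyOne.add(perRack.get(0));  // lone replica on its rack
          } else {
            moreThanOne.addAll(perRack);     // rack hosts several replicas
          }
        }
      }

      public static void main(String[] args) {
        Map<String, List<Storage>> rackMap = new HashMap<>();
        List<Storage> more = new ArrayList<>(), one = new ArrayList<>();
        split(Arrays.asList(new Storage("s1", "/r1"), new Storage("s2", "/r1"),
            new Storage("s3", "/r2")), rackMap, more, one);
        System.out.println(more + " " + one); // [s1, s2] [s3]
      }
    }

chooseExcessReplicates() in the BlockManager hunk consumes these two sets:
deleting from moreThanOne never removes the last replica on a rack, so those
storages are the preferred candidates for excess-replica removal.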

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java Tue Aug 19 23:49:39 2014
@@ -70,6 +70,7 @@ public class BlockPlacementPolicyDefault
   protected boolean considerLoad; 
   private boolean preferLocalNode = true;
   protected NetworkTopology clusterMap;
+  protected Host2NodesMap host2datanodeMap;
   private FSClusterStats stats;
   protected long heartbeatInterval;   // interval for DataNode heartbeats
   private long staleInterval;   // interval used to identify stale DataNodes
@@ -80,8 +81,9 @@ public class BlockPlacementPolicyDefault
   protected int tolerateHeartbeatMultiplier;
 
   protected BlockPlacementPolicyDefault(Configuration conf, FSClusterStats stats,
-                           NetworkTopology clusterMap) {
-    initialize(conf, stats, clusterMap);
+                           NetworkTopology clusterMap, 
+                           Host2NodesMap host2datanodeMap) {
+    initialize(conf, stats, clusterMap, host2datanodeMap);
   }
 
   protected BlockPlacementPolicyDefault() {
@@ -89,11 +91,13 @@ public class BlockPlacementPolicyDefault
     
   @Override
   public void initialize(Configuration conf,  FSClusterStats stats,
-                         NetworkTopology clusterMap) {
+                         NetworkTopology clusterMap, 
+                         Host2NodesMap host2datanodeMap) {
     this.considerLoad = conf.getBoolean(
         DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true);
     this.stats = stats;
     this.clusterMap = clusterMap;
+    this.host2datanodeMap = host2datanodeMap;
     this.heartbeatInterval = conf.getLong(
         DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
         DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000;
@@ -141,14 +145,14 @@ public class BlockPlacementPolicyDefault
       List<DatanodeStorageInfo> results = new ArrayList<DatanodeStorageInfo>();
       boolean avoidStaleNodes = stats != null
           && stats.isAvoidingStaleDataNodesForWrite();
-      for (int i = 0; i < Math.min(favoredNodes.size(), numOfReplicas); i++) {
+      for (int i = 0; i < favoredNodes.size() && results.size() < numOfReplicas; i++) {
         DatanodeDescriptor favoredNode = favoredNodes.get(i);
         // Choose a single node which is local to favoredNode.
         // 'results' is updated within chooseLocalNode
         final DatanodeStorageInfo target = chooseLocalStorage(favoredNode,
             favoriteAndExcludedNodes, blocksize, 
             getMaxNodesPerRack(results.size(), numOfReplicas)[1],
-            results, avoidStaleNodes, storageType);
+            results, avoidStaleNodes, storageType, false);
         if (target == null) {
           LOG.warn("Could not find a target for file " + src
               + " with favored node " + favoredNode); 
@@ -267,7 +271,7 @@ public class BlockPlacementPolicyDefault
     try {
       if (numOfResults == 0) {
         writer = chooseLocalStorage(writer, excludedNodes, blocksize,
-            maxNodesPerRack, results, avoidStaleNodes, storageType)
+            maxNodesPerRack, results, avoidStaleNodes, storageType, true)
                 .getDatanodeDescriptor();
         if (--numOfReplicas == 0) {
           return writer;
@@ -341,12 +345,14 @@ public class BlockPlacementPolicyDefault
                                              int maxNodesPerRack,
                                              List<DatanodeStorageInfo> results,
                                              boolean avoidStaleNodes,
-                                             StorageType storageType)
+                                             StorageType storageType,
+                                             boolean fallbackToLocalRack)
       throws NotEnoughReplicasException {
     // if no local machine, randomly choose one node
-    if (localMachine == null)
+    if (localMachine == null) {
       return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
           maxNodesPerRack, results, avoidStaleNodes, storageType);
+    }
     if (preferLocalNode && localMachine instanceof DatanodeDescriptor) {
       DatanodeDescriptor localDatanode = (DatanodeDescriptor) localMachine;
       // otherwise try local machine first
@@ -359,7 +365,11 @@ public class BlockPlacementPolicyDefault
           }
         }
       } 
-    }      
+    }
+
+    if (!fallbackToLocalRack) {
+      return null;
+    }
     // try a node on local rack
     return chooseLocalRack(localMachine, excludedNodes, blocksize,
         maxNodesPerRack, results, avoidStaleNodes, storageType);
@@ -632,15 +642,11 @@ public class BlockPlacementPolicyDefault
 
     // check the communication traffic of the target machine
     if (considerLoad) {
-      double avgLoad = 0;
-      if (stats != null) {
-        int size = stats.getNumDatanodesInService();
-        if (size != 0) {
-          avgLoad = (double)stats.getTotalLoad()/size;
-        }
-      }
-      if (node.getXceiverCount() > (2.0 * avgLoad)) {
-        logNodeIsNotChosen(storage, "the node is too busy ");
+      final double maxLoad = 2.0 * stats.getInServiceXceiverAverage();
+      final int nodeLoad = node.getXceiverCount();
+      if (nodeLoad > maxLoad) {
+        logNodeIsNotChosen(storage,
+            "the node is too busy (load:"+nodeLoad+" > "+maxLoad+") ");
         return false;
       }
     }
@@ -723,31 +729,34 @@ public class BlockPlacementPolicyDefault
   }
 
   @Override
-  public DatanodeDescriptor chooseReplicaToDelete(BlockCollection bc,
+  public DatanodeStorageInfo chooseReplicaToDelete(BlockCollection bc,
       Block block, short replicationFactor,
-      Collection<DatanodeDescriptor> first,
-      Collection<DatanodeDescriptor> second) {
+      Collection<DatanodeStorageInfo> first,
+      Collection<DatanodeStorageInfo> second) {
     long oldestHeartbeat =
       now() - heartbeatInterval * tolerateHeartbeatMultiplier;
-    DatanodeDescriptor oldestHeartbeatNode = null;
+    DatanodeStorageInfo oldestHeartbeatStorage = null;
     long minSpace = Long.MAX_VALUE;
-    DatanodeDescriptor minSpaceNode = null;
+    DatanodeStorageInfo minSpaceStorage = null;
 
     // Pick the node with the oldest heartbeat or with the least free space,
    // if all heartbeats are within the tolerable heartbeat interval
-    for(DatanodeDescriptor node : pickupReplicaSet(first, second)) {
+    for(DatanodeStorageInfo storage : pickupReplicaSet(first, second)) {
+      final DatanodeDescriptor node = storage.getDatanodeDescriptor();
       long free = node.getRemaining();
       long lastHeartbeat = node.getLastUpdate();
       if(lastHeartbeat < oldestHeartbeat) {
         oldestHeartbeat = lastHeartbeat;
-        oldestHeartbeatNode = node;
+        oldestHeartbeatStorage = storage;
       }
       if (minSpace > free) {
         minSpace = free;
-        minSpaceNode = node;
+        minSpaceStorage = storage;
       }
     }
-    return oldestHeartbeatNode != null ? oldestHeartbeatNode : minSpaceNode;
+
+    return oldestHeartbeatStorage != null ? oldestHeartbeatStorage
+        : minSpaceStorage;
   }
 
   /**
@@ -756,9 +765,9 @@ public class BlockPlacementPolicyDefault
    * replica while second set contains remaining replica nodes.
    * So pick up first set if not empty. If first is empty, then pick second.
    */
-  protected Collection<DatanodeDescriptor> pickupReplicaSet(
-      Collection<DatanodeDescriptor> first,
-      Collection<DatanodeDescriptor> second) {
+  protected Collection<DatanodeStorageInfo> pickupReplicaSet(
+      Collection<DatanodeStorageInfo> first,
+      Collection<DatanodeStorageInfo> second) {
     return first.isEmpty() ? second : first;
   }
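
chooseReplicaToDelete now traffics in DatanodeStorageInfo rather than DatanodeDescriptor, but the heuristic is unchanged: a replica on a node whose heartbeat is older than the tolerated window is evicted first; failing that, the replica on the storage with the least free space. A compact sketch of the same rule over plain arrays (illustrative only):

    // Returns the index of the replica to delete: a stale heartbeat wins,
    // otherwise the node with the least remaining space.
    static int pickVictim(long[] lastUpdate, long[] remaining, long staleCutoff) {
      int stale = -1, tightest = 0;
      long oldest = staleCutoff, minSpace = Long.MAX_VALUE;
      for (int i = 0; i < lastUpdate.length; i++) {
        if (lastUpdate[i] < oldest) { oldest = lastUpdate[i]; stale = i; }
        if (remaining[i] < minSpace) { minSpace = remaining[i]; tightest = i; }
      }
      return stale >= 0 ? stale : tightest;
    }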
   

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java Tue Aug 19 23:49:39 2014
@@ -47,8 +47,8 @@ import org.apache.hadoop.net.NodeBase;
 public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefault {
 
   protected BlockPlacementPolicyWithNodeGroup(Configuration conf,  FSClusterStats stats,
-      NetworkTopology clusterMap) {
-    initialize(conf, stats, clusterMap);
+      NetworkTopology clusterMap, DatanodeManager datanodeManager) {
+    initialize(conf, stats, clusterMap, datanodeManager.getHost2DatanodeMap());
   }
 
   protected BlockPlacementPolicyWithNodeGroup() {
@@ -56,8 +56,9 @@ public class BlockPlacementPolicyWithNod
 
   @Override
   public void initialize(Configuration conf,  FSClusterStats stats,
-          NetworkTopology clusterMap) {
-    super.initialize(conf, stats, clusterMap);
+          NetworkTopology clusterMap, 
+          Host2NodesMap host2datanodeMap) {
+    super.initialize(conf, stats, clusterMap, host2datanodeMap);
   }
 
   /** choose local node of localMachine as the target.
@@ -69,7 +70,8 @@ public class BlockPlacementPolicyWithNod
   protected DatanodeStorageInfo chooseLocalStorage(Node localMachine,
       Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
       List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
-      StorageType storageType) throws NotEnoughReplicasException {
+      StorageType storageType, boolean fallbackToLocalRack
+      ) throws NotEnoughReplicasException {
     // if no local machine, randomly choose one node
     if (localMachine == null)
       return chooseRandom(NodeBase.ROOT, excludedNodes, 
@@ -96,6 +98,10 @@ public class BlockPlacementPolicyWithNod
     if (chosenStorage != null) {
       return chosenStorage;
     }
+
+    if (!fallbackToLocalRack) {
+      return null;
+    }
     // try a node on local rack
     return chooseLocalRack(localMachine, excludedNodes, 
         blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
@@ -241,6 +247,36 @@ public class BlockPlacementPolicyWithNod
         countOfExcludedNodes++;
       }
     }
+    
+    countOfExcludedNodes += addDependentNodesToExcludedNodes(
+        chosenNode, excludedNodes);
+    return countOfExcludedNodes;
+  }
+  
+  /**
+   * Add all nodes from a dependent nodes list to excludedNodes.
+   * @return number of new excluded nodes
+   */
+  private int addDependentNodesToExcludedNodes(DatanodeDescriptor chosenNode,
+      Set<Node> excludedNodes) {
+    if (this.host2datanodeMap == null) {
+      return 0;
+    }
+    int countOfExcludedNodes = 0;
+    for(String hostname : chosenNode.getDependentHostNames()) {
+      DatanodeDescriptor node =
+          this.host2datanodeMap.getDataNodeByHostName(hostname);
+      if (node != null) {
+        if (excludedNodes.add(node)) {
+          countOfExcludedNodes++;
+        }
+      } else {
+        LOG.warn("Not able to find datanode " + hostname
+            + " which has dependency with datanode "
+            + chosenNode.getHostName());
+      }
+    }
+    
     return countOfExcludedNodes;
   }
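
The dependent-host exclusion keeps further replicas off machines configured as failing together with the chosen node (for example, virtual machines sharing one hypervisor). A sketch of the effect, with hypothetical hostnames:

    // Assume d1.getDependentHostNames() returns ["d2.example.com"] and
    // host2datanodeMap can resolve that hostname to a DatanodeDescriptor.
    Set<Node> excluded = new HashSet<Node>();
    excluded.add(d1);                                // d1 was just chosen
    addDependentNodesToExcludedNodes(d1, excluded);  // d2 is excluded too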
 
@@ -255,9 +291,9 @@ public class BlockPlacementPolicyWithNod
    * If first is empty, then pick second.
    */
   @Override
-  public Collection<DatanodeDescriptor> pickupReplicaSet(
-      Collection<DatanodeDescriptor> first,
-      Collection<DatanodeDescriptor> second) {
+  public Collection<DatanodeStorageInfo> pickupReplicaSet(
+      Collection<DatanodeStorageInfo> first,
+      Collection<DatanodeStorageInfo> second) {
     // If no replica within same rack, return directly.
     if (first.isEmpty()) {
       return second;
@@ -265,25 +301,24 @@ public class BlockPlacementPolicyWithNod
     // Split data nodes in the first set into two sets, 
     // moreThanOne contains nodes on nodegroup with more than one replica
     // exactlyOne contains the remaining nodes
-    Map<String, List<DatanodeDescriptor>> nodeGroupMap = 
-        new HashMap<String, List<DatanodeDescriptor>>();
+    Map<String, List<DatanodeStorageInfo>> nodeGroupMap = 
+        new HashMap<String, List<DatanodeStorageInfo>>();
     
-    for(DatanodeDescriptor node : first) {
-      final String nodeGroupName = 
-          NetworkTopology.getLastHalf(node.getNetworkLocation());
-      List<DatanodeDescriptor> datanodeList = 
-          nodeGroupMap.get(nodeGroupName);
-      if (datanodeList == null) {
-        datanodeList = new ArrayList<DatanodeDescriptor>();
-        nodeGroupMap.put(nodeGroupName, datanodeList);
+    for(DatanodeStorageInfo storage : first) {
+      final String nodeGroupName = NetworkTopology.getLastHalf(
+          storage.getDatanodeDescriptor().getNetworkLocation());
+      List<DatanodeStorageInfo> storageList = nodeGroupMap.get(nodeGroupName);
+      if (storageList == null) {
+        storageList = new ArrayList<DatanodeStorageInfo>();
+        nodeGroupMap.put(nodeGroupName, storageList);
       }
-      datanodeList.add(node);
+      storageList.add(storage);
     }
     
-    final List<DatanodeDescriptor> moreThanOne = new ArrayList<DatanodeDescriptor>();
-    final List<DatanodeDescriptor> exactlyOne = new ArrayList<DatanodeDescriptor>();
+    final List<DatanodeStorageInfo> moreThanOne = new ArrayList<DatanodeStorageInfo>();
+    final List<DatanodeStorageInfo> exactlyOne = new ArrayList<DatanodeStorageInfo>();
     // split nodes into two sets
-    for(List<DatanodeDescriptor> datanodeList : nodeGroupMap.values()) {
+    for(List<DatanodeStorageInfo> datanodeList : nodeGroupMap.values()) {
       if (datanodeList.size() == 1 ) {
         // exactlyOne contains nodes on nodegroup with exactly one replica
         exactlyOne.add(datanodeList.get(0));
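
The bucketing above keys on the node-group component that NetworkTopology.getLastHalf() extracts from each storage's network location. A worked example under an assumed two-level /rack/nodegroup layout (s1, s2, s3 are hypothetical storages):

    List<DatanodeStorageInfo> first = Arrays.asList(s1, s2, s3);
    // s1 and s2 share a node group; s3 is alone in another.  After the
    // loop above, nodeGroupMap holds two buckets, so the split yields
    //   moreThanOne == [s1, s2]  and  exactlyOne == [s3],
    // steering deletion toward the over-represented node group.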

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java Tue Aug 19 23:49:39 2014
@@ -23,8 +23,8 @@ import org.apache.hadoop.hdfs.protocol.B
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.util.GSet;
 import org.apache.hadoop.util.LightWeightGSet;
-import org.apache.hadoop.util.LightWeightGSet.SetIterator;
 
+import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 
@@ -217,9 +217,14 @@ class BlocksMap {
     BlockInfo currentBlock = blocks.get(newBlock);
     assert currentBlock != null : "the block is not in blocksMap";
     // replace block in data-node lists
-    for(int idx = currentBlock.numNodes()-1; idx >= 0; idx--) {
-      DatanodeDescriptor dn = currentBlock.getDatanode(idx);
-      dn.replaceBlock(currentBlock, newBlock);
+    for (int i = currentBlock.numNodes() - 1; i >= 0; i--) {
+      final DatanodeDescriptor dn = currentBlock.getDatanode(i);
+      final DatanodeStorageInfo storage = currentBlock.findStorageInfo(dn);
+      final boolean removed = storage.removeBlock(currentBlock);
+      Preconditions.checkState(removed, "currentBlock not found.");
+
+      final boolean added = storage.addBlock(newBlock);
+      Preconditions.checkState(added, "newBlock already exists.");
     }
     // replace block in the map itself
     blocks.put(newBlock);
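
The rewritten loop turns a silent per-datanode swap into a checked one: each storage must actually hold currentBlock and must not already hold newBlock. The same Guava fail-fast pattern in isolation (a sketch, not HDFS code):

    import com.google.common.base.Preconditions;
    import java.util.Set;

    static <T> void replaceExactlyOnce(Set<T> set, T oldItem, T newItem) {
      // checkState throws IllegalStateException when the invariant fails.
      Preconditions.checkState(set.remove(oldItem), "oldItem not found.");
      Preconditions.checkState(set.add(newItem), "newItem already exists.");
    }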

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java Tue Aug 19 23:49:39 2014
@@ -33,8 +33,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -53,8 +51,11 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
 import org.apache.hadoop.util.GSet;
 import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 
 /**
  * Scans the namesystem, scheduling blocks to be cached as appropriate.
@@ -65,8 +66,8 @@ import com.google.common.base.Preconditi
 @InterfaceAudience.LimitedPrivate({"HDFS"})
 public class CacheReplicationMonitor extends Thread implements Closeable {
 
-  private static final Log LOG =
-      LogFactory.getLog(CacheReplicationMonitor.class);
+  private static final Logger LOG =
+      LoggerFactory.getLogger(CacheReplicationMonitor.class);
 
   private final FSNamesystem namesystem;
 
@@ -103,21 +104,21 @@ public class CacheReplicationMonitor ext
   private final Condition scanFinished;
 
   /**
-   * Whether there are pending CacheManager operations that necessitate a
-   * CacheReplicationMonitor rescan. Protected by the CRM lock.
+   * The number of rescans completed. Used to wait for scans to finish.
+   * Protected by the CacheReplicationMonitor lock.
    */
-  private boolean needsRescan = true;
+  private long completedScanCount = 0;
 
   /**
-   * Whether we are currently doing a rescan. Protected by the CRM lock.
+   * The scan we're currently performing, or -1 if no scan is in progress.
+   * Protected by the CacheReplicationMonitor lock.
    */
-  private boolean isScanning = false;
+  private long curScanCount = -1;
 
   /**
-   * The number of rescans completed. Used to wait for scans to finish.
-   * Protected by the CacheReplicationMonitor lock.
+   * The number of rescans we need to complete.  Protected by the CRM lock.
    */
-  private long scanCount = 0;
+  private long neededScanCount = 0;
 
   /**
    * True if this monitor should terminate. Protected by the CRM lock.
@@ -168,7 +169,7 @@ public class CacheReplicationMonitor ext
               LOG.info("Shutting down CacheReplicationMonitor");
               return;
             }
-            if (needsRescan) {
+            if (completedScanCount < neededScanCount) {
               LOG.info("Rescanning because of pending operations");
               break;
             }
@@ -181,8 +182,6 @@ public class CacheReplicationMonitor ext
             doRescan.await(delta, TimeUnit.MILLISECONDS);
             curTimeMs = Time.monotonicNow();
           }
-          isScanning = true;
-          needsRescan = false;
         } finally {
           lock.unlock();
         }
@@ -193,8 +192,8 @@ public class CacheReplicationMonitor ext
         // Update synchronization-related variables.
         lock.lock();
         try {
-          isScanning = false;
-          scanCount++;
+          completedScanCount = curScanCount;
+          curScanCount = -1;
           scanFinished.signalAll();
         } finally {
           lock.unlock();
@@ -207,7 +206,7 @@ public class CacheReplicationMonitor ext
       LOG.info("Shutting down CacheReplicationMonitor.");
       return;
     } catch (Throwable t) {
-      LOG.fatal("Thread exiting", t);
+      LOG.error("Thread exiting", t);
       terminate(1, t);
     }
   }
@@ -225,16 +224,15 @@ public class CacheReplicationMonitor ext
         "Must not hold the FSN write lock when waiting for a rescan.");
     Preconditions.checkArgument(lock.isHeldByCurrentThread(),
         "Must hold the CRM lock when waiting for a rescan.");
-    if (!needsRescan) {
+    if (neededScanCount <= completedScanCount) {
       return;
     }
     // If no scan is already ongoing, kick off a new one
-    if (!isScanning) {
+    if (curScanCount < 0) {
       doRescan.signal();
     }
     // Wait until the scan finishes and the count advances
-    final long startCount = scanCount;
-    while ((!shutdown) && (startCount >= scanCount)) {
+    while ((!shutdown) && (completedScanCount < neededScanCount)) {
       try {
         scanFinished.await();
       } catch (InterruptedException e) {
@@ -252,7 +250,14 @@ public class CacheReplicationMonitor ext
   public void setNeedsRescan() {
     Preconditions.checkArgument(lock.isHeldByCurrentThread(),
         "Must hold the CRM lock when setting the needsRescan bit.");
-    this.needsRescan = true;
+    if (curScanCount >= 0) {
+      // If there is a scan in progress, we need to wait for the scan after
+      // that.
+      neededScanCount = curScanCount + 1;
+    } else {
+      // If there is no scan in progress, we need to wait for the next scan.
+      neededScanCount = completedScanCount + 1;
+    }
   }
 
   /**
@@ -281,12 +286,19 @@ public class CacheReplicationMonitor ext
   private void rescan() throws InterruptedException {
     scannedDirectives = 0;
     scannedBlocks = 0;
-    namesystem.writeLock();
     try {
-      if (shutdown) {
-        throw new InterruptedException("CacheReplicationMonitor was " +
-            "shut down.");
+      namesystem.writeLock();
+      try {
+        lock.lock();
+        if (shutdown) {
+          throw new InterruptedException("CacheReplicationMonitor was " +
+              "shut down.");
+        }
+        curScanCount = completedScanCount + 1;
+      } finally {
+        lock.unlock();
       }
+
       resetStatistics();
       rescanCacheDirectives();
       rescanCachedBlockMap();
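
The needsRescan/isScanning booleans are replaced by three advancing counters, so a caller that arrives while a scan is already underway waits for the scan after it, rather than being satisfied by a scan that may have missed its change. A hedged sketch of the protocol (field names mirror the patch; the CRM locking is elided):

    long completedScanCount = 0;  // scans fully finished
    long curScanCount = -1;       // scan in progress, or -1 if none
    long neededScanCount = 0;     // highest scan number any waiter requires

    void setNeedsRescanSketch() {
      // A running scan may have missed this change, so one more scan is
      // required beyond whatever is current or already complete.
      neededScanCount = (curScanCount >= 0
          ? curScanCount : completedScanCount) + 1;
    }

    boolean rescanPending() {
      return completedScanCount < neededScanCount;
    }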
@@ -316,11 +328,8 @@ public class CacheReplicationMonitor ext
       scannedDirectives++;
       // Skip processing this entry if it has expired
       if (directive.getExpiryTime() > 0 && directive.getExpiryTime() <= now) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Directive " + directive.getId() + ": the directive " +
-              "expired at " + directive.getExpiryTime() + " (now = " +
-              now + ")");
-        }
+        LOG.debug("Directive {}: the directive expired at {} (now = {})",
+             directive.getId(), directive.getExpiryTime(), now);
         continue;
       }
       String path = directive.getPath();
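
This hunk is representative of the commons-logging to SLF4J migration throughout the file: with {} placeholders the message is only formatted when the level is enabled, so the isDebugEnabled()/isTraceEnabled() guards become unnecessary. (Argument expressions are still evaluated eagerly, but here they are cheap getters.) Illustratively:

      // Before: a guard avoids building the string when DEBUG is off.
      if (LOG.isDebugEnabled()) {
        LOG.debug("Directive " + id + " expired at " + expiryTime);
      }
      // After: SLF4J defers formatting until DEBUG is actually enabled.
      LOG.debug("Directive {} expired at {}", id, expiryTime);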
@@ -329,17 +338,14 @@ public class CacheReplicationMonitor ext
         node = fsDir.getINode(path);
       } catch (UnresolvedLinkException e) {
         // We don't cache through symlinks
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Directive " + directive.getId() +
-              ": got UnresolvedLinkException while resolving path " + path);
-        }
+        LOG.debug("Directive {}: got UnresolvedLinkException while resolving "
+                + "path {}", directive.getId(), path
+        );
         continue;
       }
       if (node == null)  {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Directive " + directive.getId() +
-              ": No inode found at " + path);
-        }
+        LOG.debug("Directive {}: No inode found at {}", directive.getId(),
+            path);
       } else if (node.isDirectory()) {
         INodeDirectory dir = node.asDirectory();
         ReadOnlyList<INode> children = dir
@@ -352,10 +358,8 @@ public class CacheReplicationMonitor ext
       } else if (node.isFile()) {
         rescanFile(directive, node.asFile());
       } else {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Directive " + directive.getId() + 
-              ": ignoring non-directive, non-file inode " + node);
-        }
+        LOG.debug("Directive {}: ignoring non-directive, non-file inode {} ",
+            directive.getId(), node);
       }
     }
   }
@@ -381,15 +385,13 @@ public class CacheReplicationMonitor ext
     // do not cache this file.
     CachePool pool = directive.getPool();
     if (pool.getBytesNeeded() > pool.getLimit()) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(String.format("Directive %d: not scanning file %s because " +
-            "bytesNeeded for pool %s is %d, but the pool's limit is %d",
-            directive.getId(),
-            file.getFullPathName(),
-            pool.getPoolName(),
-            pool.getBytesNeeded(),
-            pool.getLimit()));
-      }
+      LOG.debug("Directive {}: not scanning file {} because " +
+          "bytesNeeded for pool {} is {}, but the pool's limit is {}",
+          directive.getId(),
+          file.getFullPathName(),
+          pool.getPoolName(),
+          pool.getBytesNeeded(),
+          pool.getLimit());
       return;
     }
 
@@ -397,11 +399,10 @@ public class CacheReplicationMonitor ext
     for (BlockInfo blockInfo : blockInfos) {
       if (!blockInfo.getBlockUCState().equals(BlockUCState.COMPLETE)) {
         // We don't try to cache blocks that are under construction.
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Directive " + directive.getId() + ": can't cache " +
-              "block " + blockInfo + " because it is in state " +
-              blockInfo.getBlockUCState() + ", not COMPLETE.");
-        }
+        LOG.trace("Directive {}: can't cache block {} because it is in state "
+                + "{}, not COMPLETE.", directive.getId(), blockInfo,
+            blockInfo.getBlockUCState()
+        );
         continue;
       }
       Block block = new Block(blockInfo.getBlockId());
@@ -415,7 +416,7 @@ public class CacheReplicationMonitor ext
         // Update bytesUsed using the current replication levels.
         // Assumptions: we assume that all the blocks are the same length
         // on each datanode.  We can assume this because we're only caching
-        // blocks in state COMMITTED.
+        // blocks in state COMPLETE.
         // Note that if two directives are caching the same block(s), they will
         // both get them added to their bytesCached.
         List<DatanodeDescriptor> cachedOn =
@@ -441,21 +442,16 @@ public class CacheReplicationMonitor ext
           ocblock.setReplicationAndMark(directive.getReplication(), mark);
         }
       }
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Directive " + directive.getId() + ": setting replication " +
-                "for block " + blockInfo + " to " + ocblock.getReplication());
-      }
+      LOG.trace("Directive {}: setting replication for block {} to {}",
+          directive.getId(), blockInfo, ocblock.getReplication());
     }
     // Increment the "cached" statistics
     directive.addBytesCached(cachedTotal);
     if (cachedTotal == neededTotal) {
       directive.addFilesCached(1);
     }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Directive " + directive.getId() + ": caching " +
-          file.getFullPathName() + ": " + cachedTotal + "/" + neededTotal +
-          " bytes");
-    }
+    LOG.debug("Directive {}: caching {}: {}/{} bytes", directive.getId(),
+        file.getFullPathName(), cachedTotal, neededTotal);
   }
 
   private String findReasonForNotCaching(CachedBlock cblock, 
@@ -512,11 +508,9 @@ public class CacheReplicationMonitor ext
           iter.hasNext(); ) {
         DatanodeDescriptor datanode = iter.next();
         if (!cblock.isInList(datanode.getCached())) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("Block " + cblock.getBlockId() + ": removing from " +
-                "PENDING_UNCACHED for node " + datanode.getDatanodeUuid() +
-                "because the DataNode uncached it.");
-          }
+          LOG.trace("Block {}: removing from PENDING_UNCACHED for node {} "
+              + "because the DataNode uncached it.", cblock.getBlockId(),
+              datanode.getDatanodeUuid());
           datanode.getPendingUncached().remove(cblock);
           iter.remove();
         }
@@ -526,10 +520,8 @@ public class CacheReplicationMonitor ext
       String reason = findReasonForNotCaching(cblock, blockInfo);
       int neededCached = 0;
       if (reason != null) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Block " + cblock.getBlockId() + ": can't cache " +
-              "block because it is " + reason);
-        }
+        LOG.trace("Block {}: can't cache block because it is {}",
+            cblock.getBlockId(), reason);
       } else {
         neededCached = cblock.getReplication();
       }
@@ -541,12 +533,12 @@ public class CacheReplicationMonitor ext
           DatanodeDescriptor datanode = iter.next();
           datanode.getPendingCached().remove(cblock);
           iter.remove();
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("Block " + cblock.getBlockId() + ": removing from " +
-                "PENDING_CACHED for node " + datanode.getDatanodeUuid() +
-                "because we already have " + numCached + " cached " +
-                "replicas and we only need " + neededCached);
-          }
+          LOG.trace("Block {}: removing from PENDING_CACHED for node {}"
+                  + "because we already have {} cached replicas and we only" +
+                  " need {}",
+              cblock.getBlockId(), datanode.getDatanodeUuid(), numCached,
+              neededCached
+          );
         }
       }
       if (numCached < neededCached) {
@@ -556,12 +548,11 @@ public class CacheReplicationMonitor ext
           DatanodeDescriptor datanode = iter.next();
           datanode.getPendingUncached().remove(cblock);
           iter.remove();
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("Block " + cblock.getBlockId() + ": removing from " +
-                "PENDING_UNCACHED for node " + datanode.getDatanodeUuid() +
-                "because we only have " + numCached + " cached replicas " +
-                "and we need " + neededCached);
-          }
+          LOG.trace("Block {}: removing from PENDING_UNCACHED for node {} "
+                  + "because we only have {} cached replicas and we need " +
+                  "{}", cblock.getBlockId(), datanode.getDatanodeUuid(),
+              numCached, neededCached
+          );
         }
       }
       int neededUncached = numCached -
@@ -581,11 +572,10 @@ public class CacheReplicationMonitor ext
           pendingUncached.isEmpty() &&
           pendingCached.isEmpty()) {
         // we have nothing more to do with this block.
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Block " + cblock.getBlockId() + ": removing from " +
-              "cachedBlocks, since neededCached == 0, and " +
-              "pendingUncached and pendingCached are empty.");
-        }
+        LOG.trace("Block {}: removing from cachedBlocks, since neededCached "
+                + "== 0, and pendingUncached and pendingCached are empty.",
+            cblock.getBlockId()
+        );
         cbIter.remove();
       }
     }
@@ -643,18 +633,14 @@ public class CacheReplicationMonitor ext
     BlockInfo blockInfo = blockManager.
           getStoredBlock(new Block(cachedBlock.getBlockId()));
     if (blockInfo == null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Block " + cachedBlock.getBlockId() + ": can't add new " +
-            "cached replicas, because there is no record of this block " +
-            "on the NameNode.");
-      }
+      LOG.debug("Block {}: can't add new cached replicas," +
+          " because there is no record of this block " +
+          "on the NameNode.", cachedBlock.getBlockId());
       return;
     }
     if (!blockInfo.isComplete()) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Block " + cachedBlock.getBlockId() + ": can't cache this " +
-            "block, because it is not yet complete.");
-      }
+      LOG.debug("Block {}: can't cache this block, because it is not yet"
+          + " complete.", cachedBlock.getBlockId());
       return;
     }
     // Filter the list of replicas to only the valid targets
@@ -678,7 +664,7 @@ public class CacheReplicationMonitor ext
       if (pendingCached.contains(datanode) || cached.contains(datanode)) {
         continue;
       }
-      long pendingCapacity = datanode.getCacheRemaining();
+      long pendingBytes = 0;
       // Subtract pending cached blocks from effective capacity
       Iterator<CachedBlock> it = datanode.getPendingCached().iterator();
       while (it.hasNext()) {
@@ -686,7 +672,7 @@ public class CacheReplicationMonitor ext
         BlockInfo info =
             blockManager.getStoredBlock(new Block(cBlock.getBlockId()));
         if (info != null) {
-          pendingCapacity -= info.getNumBytes();
+          pendingBytes -= info.getNumBytes();
         }
       }
       it = datanode.getPendingUncached().iterator();
@@ -696,17 +682,17 @@ public class CacheReplicationMonitor ext
         BlockInfo info =
             blockManager.getStoredBlock(new Block(cBlock.getBlockId()));
         if (info != null) {
-          pendingCapacity += info.getNumBytes();
+          pendingBytes += info.getNumBytes();
         }
       }
+      long pendingCapacity = pendingBytes + datanode.getCacheRemaining();
       if (pendingCapacity < blockInfo.getNumBytes()) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Block " + blockInfo.getBlockId() + ": DataNode " +
-              datanode.getDatanodeUuid() + " is not a valid possibility " +
-              "because the block has size " + blockInfo.getNumBytes() + ", but " +
-              "the DataNode only has " + datanode.getCacheRemaining() + " " +
-              "bytes of cache remaining.");
-        }
+        LOG.trace("Block {}: DataNode {} is not a valid possibility " +
+            "because the block has size {}, but the DataNode only has {}" +
+            "bytes of cache remaining ({} pending bytes, {} already cached.",
+            blockInfo.getBlockId(), datanode.getDatanodeUuid(),
+            blockInfo.getNumBytes(), pendingCapacity, pendingBytes,
+            datanode.getCacheRemaining());
         outOfCapacity++;
         continue;
       }
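
The fix accumulates the pending delta first and only then adds the node's reported free cache, so queued cache work debits the effective capacity and queued uncache work credits it. Worked through with assumed values:

      // effective = cacheRemaining - bytesPendingCached + bytesPendingUncached
      // e.g. 512 MB free, 256 MB queued to cache, 128 MB queued to uncache:
      long effective = (512L - 256L + 128L) * 1024 * 1024;  // 384 MB
      // A 400 MB block would therefore be rejected on this node.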
@@ -715,22 +701,20 @@ public class CacheReplicationMonitor ext
     List<DatanodeDescriptor> chosen = chooseDatanodesForCaching(possibilities,
         neededCached, blockManager.getDatanodeManager().getStaleInterval());
     for (DatanodeDescriptor datanode : chosen) {
-      if (LOG.isTraceEnabled()) {
-          LOG.trace("Block " + blockInfo.getBlockId() + ": added to " +
-              "PENDING_CACHED on DataNode " + datanode.getDatanodeUuid());
-      }
+      LOG.trace("Block {}: added to PENDING_CACHED on DataNode {}",
+          blockInfo.getBlockId(), datanode.getDatanodeUuid());
       pendingCached.add(datanode);
       boolean added = datanode.getPendingCached().add(cachedBlock);
       assert added;
     }
     // We were unable to satisfy the requested replication factor
     if (neededCached > chosen.size()) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Block " + blockInfo.getBlockId() + ": we only have " +
-            (cachedBlock.getReplication() - neededCached + chosen.size()) +
-            " of " + cachedBlock.getReplication() + " cached replicas.  " +
-            outOfCapacity + " DataNodes have insufficient cache capacity.");
-      }
+      LOG.debug("Block {}: we only have {} of {} cached replicas."
+              + " {} DataNodes have insufficient cache capacity.",
+          blockInfo.getBlockId(),
+          (cachedBlock.getReplication() - neededCached + chosen.size()),
+          cachedBlock.getReplication(), outOfCapacity
+      );
     }
   }
 

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java Tue Aug 19 23:49:39 2014
@@ -48,18 +48,6 @@ public class CorruptReplicasMap{
 
   private final SortedMap<Block, Map<DatanodeDescriptor, Reason>> corruptReplicasMap =
     new TreeMap<Block, Map<DatanodeDescriptor, Reason>>();
-  
-  /**
-   * Mark the block belonging to datanode as corrupt.
-   *
-   * @param blk Block to be added to CorruptReplicasMap
-   * @param dn DatanodeDescriptor which holds the corrupt replica
-   * @param reason a textual reason (for logging purposes)
-   */
-  public void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn,
-      String reason) {
-    addToCorruptReplicasMap(blk, dn, reason, Reason.NONE);
-  }
 
   /**
    * Mark the block belonging to datanode as corrupt.
@@ -69,7 +57,7 @@ public class CorruptReplicasMap{
    * @param reason a textual reason (for logging purposes)
    * @param reasonCode the enum representation of the reason
    */
-  public void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn,
+  void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn,
       String reason, Reason reasonCode) {
     Map <DatanodeDescriptor, Reason> nodes = corruptReplicasMap.get(blk);
     if (nodes == null) {
@@ -127,7 +115,6 @@ public class CorruptReplicasMap{
   boolean removeFromCorruptReplicasMap(Block blk, DatanodeDescriptor datanode,
       Reason reason) {
     Map <DatanodeDescriptor, Reason> datanodes = corruptReplicasMap.get(blk);
-    boolean removed = false;
     if (datanodes==null)
       return false;
 
@@ -174,12 +161,12 @@ public class CorruptReplicasMap{
     return ((nodes != null) && (nodes.contains(node)));
   }
 
-  public int numCorruptReplicas(Block blk) {
+  int numCorruptReplicas(Block blk) {
     Collection<DatanodeDescriptor> nodes = getNodes(blk);
     return (nodes == null) ? 0 : nodes.size();
   }
   
-  public int size() {
+  int size() {
     return corruptReplicasMap.size();
   }