Posted to hdfs-commits@hadoop.apache.org by cm...@apache.org on 2013/10/17 00:15:34 UTC

svn commit: r1532924 [1/2] - in /hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ src/main/java/org/apache/hadoop/hdfs/server/datanod...

Author: cmccabe
Date: Wed Oct 16 22:15:33 2013
New Revision: 1532924

URL: http://svn.apache.org/r1532924
Log:
HDFS-5096. Automatically cache new data added to a cached path (contributed by Colin Patrick McCabe)

Added:
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCachedBlocksList.java
Removed:
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationManager.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationPolicy.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateStoredBlocks.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReportProcessor.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UncacheBlocks.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCacheReplicationManager.java
Modified:
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt?rev=1532924&r1=1532923&r2=1532924&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt Wed Oct 16 22:15:33 2013
@@ -66,6 +66,9 @@ HDFS-4949 (Unreleased)
     HDFS-5359. Allow LightWeightGSet#Iterator to remove elements.
     (Contributed by Colin Patrick McCabe)
 
+    HDFS-5096. Automatically cache new data added to a cached path.
+    (Contributed by Colin Patrick McCabe)
+
   OPTIMIZATIONS
     HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe)
 

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1532924&r1=1532923&r2=1532924&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Wed Oct 16 22:15:33 2013
@@ -205,6 +205,9 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_NAMENODE_LIST_CACHE_DESCRIPTORS_NUM_RESPONSES =
       "dfs.namenode.list.cache.descriptors.num.responses";
   public static final int     DFS_NAMENODE_LIST_CACHE_DESCRIPTORS_NUM_RESPONSES_DEFAULT = 100;
+  public static final String  DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS =
+      "dfs.namenode.path.based.cache.refresh.interval.ms";
+  public static final long    DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT = 300000L;
 
   // Whether to enable datanode's stale state detection and usage for reads
   public static final String DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY = "dfs.namenode.avoid.read.stale.datanode";
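
For readers trying out the new key above, here is a minimal sketch (not part of this commit) of overriding the refresh interval through a Hadoop Configuration object; it assumes only the DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS constants introduced in this diff are on the classpath. The same value can also be set in hdfs-site.xml under dfs.namenode.path.based.cache.refresh.interval.ms.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class CacheRefreshIntervalSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Rescan cached paths every 60 seconds instead of the 5-minute default.
        conf.setLong(
            DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS,
            60000L);
        long intervalMs = conf.getLong(
            DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS,
            DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT);
        System.out.println("path-based cache refresh interval = " + intervalMs + " ms");
      }
    }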

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java?rev=1532924&r1=1532923&r2=1532924&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java Wed Oct 16 22:15:33 2013
@@ -61,17 +61,6 @@ public interface BlockCollection {
   public short getBlockReplication();
 
   /**
-   * Set cache replication factor for the collection
-   */
-  public void setCacheReplication(short cacheReplication);
-
-  /**
-   * Get cache replication factor for the collection
-   * @return cache replication value
-   */
-  public short getCacheReplication();
-
-  /**
    * Get the name of the collection.
    */
   public String getName();

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java?rev=1532924&r1=1532923&r2=1532924&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java Wed Oct 16 22:15:33 2013
@@ -85,7 +85,7 @@ public class BlockInfo extends Block imp
     this.bc = bc;
   }
 
-  DatanodeDescriptor getDatanode(int index) {
+  public DatanodeDescriptor getDatanode(int index) {
     assert this.triplets != null : "BlockInfo is not initialized";
     assert index >= 0 && index*3 < triplets.length : "Index is out of bound";
     return (DatanodeDescriptor)triplets[index*3];
@@ -153,7 +153,7 @@ public class BlockInfo extends Block imp
     return info;
   }
 
-  int getCapacity() {
+  public int getCapacity() {
     assert this.triplets != null : "BlockInfo is not initialized";
     assert triplets.length % 3 == 0 : "Malformed BlockInfo";
     return triplets.length / 3;
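
For context on why getDatanode(int) and getCapacity() become public (they are used by the reworked CacheReplicationMonitor later in this patch): BlockInfo keeps replica information in a flat "triplets" Object array with three slots per replica, so capacity is triplets.length / 3 and the datanode for replica i lives at triplets[i*3]. A self-contained sketch of that indexing, with illustrative names rather than the real class:

    public class TripletsIndexingSketch {
      public static void main(String[] args) {
        // Three Object slots per replica, mirroring BlockInfo's internal layout.
        Object[] triplets = new Object[3 * 2];   // room for 2 replicas
        triplets[0] = "datanode-A";              // replica 0: datanode slot
        triplets[3] = "datanode-B";              // replica 1: datanode slot
        int capacity = triplets.length / 3;      // what getCapacity() returns
        for (int i = 0; i < capacity; i++) {
          System.out.println("replica " + i + " is on " + triplets[i * 3]);
        }
      }
    }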

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1532924&r1=1532923&r2=1532924&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Wed Oct 16 22:15:33 2013
@@ -77,13 +77,14 @@ import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.Time;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
 
 /**
  * Keeps information related to the blocks stored in the Hadoop cluster.
  */
 @InterfaceAudience.Private
-public class BlockManager extends ReportProcessor {
+public class BlockManager {
 
   static final Log LOG = LogFactory.getLog(BlockManager.class);
   public static final Log blockLog = NameNode.blockStateChangeLog;
@@ -162,7 +163,7 @@ public class BlockManager extends Report
   final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
 
   /** Blocks to be invalidated. */
-  private final InvalidateStoredBlocks invalidateBlocks;
+  private final InvalidateBlocks invalidateBlocks;
   
   /**
    * After a failover, over-replicated blocks may not be handled
@@ -218,6 +219,7 @@ public class BlockManager extends Report
   final boolean encryptDataTransfer;
   
   // Max number of blocks to log info about during a block report.
+  private final long maxNumBlocksToLog;
 
   /**
    * When running inside a Standby node, the node may receive block reports
@@ -235,11 +237,10 @@ public class BlockManager extends Report
   
   public BlockManager(final Namesystem namesystem, final FSClusterStats stats,
       final Configuration conf) throws IOException {
-    super(conf);
     this.namesystem = namesystem;
     datanodeManager = new DatanodeManager(this, namesystem, conf);
     heartbeatManager = datanodeManager.getHeartbeatManager();
-    invalidateBlocks = new InvalidateStoredBlocks(datanodeManager);
+    invalidateBlocks = new InvalidateBlocks(datanodeManager);
 
     // Compute the map capacity by allocating 2% of total memory
     blocksMap = new BlocksMap(DEFAULT_MAP_LOAD_FACTOR);
@@ -299,7 +300,11 @@ public class BlockManager extends Report
     this.encryptDataTransfer =
         conf.getBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY,
             DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
-
+    
+    this.maxNumBlocksToLog =
+        conf.getLong(DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
+            DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT);
+    
     LOG.info("defaultReplication         = " + defaultReplication);
     LOG.info("maxReplication             = " + maxReplication);
     LOG.info("minReplication             = " + minReplication);
@@ -999,7 +1004,6 @@ public class BlockManager extends Report
    * Adds block to list of blocks which will be invalidated on specified
    * datanode and log the operation
    */
-  @Override  // ReportProcessor
   void addToInvalidates(final Block block, final DatanodeInfo datanode) {
     invalidateBlocks.add(block, datanode, true);
   }
@@ -1045,8 +1049,7 @@ public class BlockManager extends Report
     markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock, reason), dn);
   }
 
-  @Override // ReportProcessor
-  void markBlockAsCorrupt(BlockToMarkCorrupt b,
+  private void markBlockAsCorrupt(BlockToMarkCorrupt b,
                                   DatanodeInfo dn) throws IOException {
     DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
     if (node == null) {
@@ -1056,7 +1059,7 @@ public class BlockManager extends Report
 
     BlockCollection bc = b.corrupted.getBlockCollection();
     if (bc == null) {
-      blockLogInfo("#markBlockAsCorrupt: " + b
+      blockLog.info("BLOCK markBlockAsCorrupt: " + b
           + " cannot be marked as corrupt as it does not belong to any file");
       addToInvalidates(b.corrupted, node);
       return;
@@ -1120,9 +1123,6 @@ public class BlockManager extends Report
     this.shouldPostponeBlocksFromFuture  = postpone;
   }
 
-  public boolean shouldPostponeBlocksFromFuture() {
-    return this.shouldPostponeBlocksFromFuture;
-  }
 
   private void postponeBlock(Block blk) {
     if (postponedMisreplicatedBlocks.add(blk)) {
@@ -1544,6 +1544,61 @@ public class BlockManager extends Report
        */
     }
   }
+  
+  /**
+   * StatefulBlockInfo is used to build the "toUC" list, which is a list of
+   * updates to the information about under-construction blocks.
+   * Besides the block in question, it provides the ReplicaState
+   * reported by the datanode in the block report. 
+   */
+  private static class StatefulBlockInfo {
+    final BlockInfoUnderConstruction storedBlock;
+    final ReplicaState reportedState;
+    
+    StatefulBlockInfo(BlockInfoUnderConstruction storedBlock, 
+        ReplicaState reportedState) {
+      this.storedBlock = storedBlock;
+      this.reportedState = reportedState;
+    }
+  }
+  
+  /**
+   * BlockToMarkCorrupt is used to build the "toCorrupt" list, which is a
+   * list of blocks that should be considered corrupt due to a block report.
+   */
+  private static class BlockToMarkCorrupt {
+    /** The corrupted block in a datanode. */
+    final BlockInfo corrupted;
+    /** The corresponding block stored in the BlockManager. */
+    final BlockInfo stored;
+    /** The reason to mark corrupt. */
+    final String reason;
+    
+    BlockToMarkCorrupt(BlockInfo corrupted, BlockInfo stored, String reason) {
+      Preconditions.checkNotNull(corrupted, "corrupted is null");
+      Preconditions.checkNotNull(stored, "stored is null");
+
+      this.corrupted = corrupted;
+      this.stored = stored;
+      this.reason = reason;
+    }
+
+    BlockToMarkCorrupt(BlockInfo stored, String reason) {
+      this(stored, stored, reason);
+    }
+
+    BlockToMarkCorrupt(BlockInfo stored, long gs, String reason) {
+      this(new BlockInfo(stored), stored, reason);
+      //the corrupted block in datanode has a different generation stamp
+      corrupted.setGenerationStamp(gs);
+    }
+
+    @Override
+    public String toString() {
+      return corrupted + "("
+          + (corrupted == stored? "same as stored": "stored=" + stored) + ")";
+    }
+  }
 
   /**
    * The given datanode is reporting all its blocks.
@@ -1635,6 +1690,46 @@ public class BlockManager extends Report
     }
   }
   
+  private void processReport(final DatanodeDescriptor node,
+      final BlockListAsLongs report) throws IOException {
+    // Normal case:
+    // Modify the (block-->datanode) map, according to the difference
+    // between the old and new block report.
+    //
+    Collection<BlockInfo> toAdd = new LinkedList<BlockInfo>();
+    Collection<Block> toRemove = new LinkedList<Block>();
+    Collection<Block> toInvalidate = new LinkedList<Block>();
+    Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
+    Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
+    reportDiff(node, report, toAdd, toRemove, toInvalidate, toCorrupt, toUC);
+
+    // Process the blocks on each queue
+    for (StatefulBlockInfo b : toUC) { 
+      addStoredBlockUnderConstruction(b.storedBlock, node, b.reportedState);
+    }
+    for (Block b : toRemove) {
+      removeStoredBlock(b, node);
+    }
+    int numBlocksLogged = 0;
+    for (BlockInfo b : toAdd) {
+      addStoredBlock(b, node, null, numBlocksLogged < maxNumBlocksToLog);
+      numBlocksLogged++;
+    }
+    if (numBlocksLogged > maxNumBlocksToLog) {
+      blockLog.info("BLOCK* processReport: logged info for " + maxNumBlocksToLog
+          + " of " + numBlocksLogged + " reported.");
+    }
+    for (Block b : toInvalidate) {
+      blockLog.info("BLOCK* processReport: "
+          + b + " on " + node + " size " + b.getNumBytes()
+          + " does not belong to any file");
+      addToInvalidates(b, node);
+    }
+    for (BlockToMarkCorrupt b : toCorrupt) {
+      markBlockAsCorrupt(b, node);
+    }
+  }
+
   /**
    * processFirstBlockReport is intended only for processing "initial" block
    * reports, the first block report received from a DN after it registers.
@@ -1697,6 +1792,44 @@ public class BlockManager extends Report
     }
   }
 
+  private void reportDiff(DatanodeDescriptor dn, 
+      BlockListAsLongs newReport, 
+      Collection<BlockInfo> toAdd,              // add to DatanodeDescriptor
+      Collection<Block> toRemove,           // remove from DatanodeDescriptor
+      Collection<Block> toInvalidate,       // should be removed from DN
+      Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
+      Collection<StatefulBlockInfo> toUC) { // add to under-construction list
+    // place a delimiter in the list which separates blocks 
+    // that have been reported from those that have not
+    BlockInfo delimiter = new BlockInfo(new Block(), 1);
+    boolean added = dn.addBlock(delimiter);
+    assert added : "Delimiting block cannot be present in the node";
+    int headIndex = 0; //currently the delimiter is in the head of the list
+    int curIndex;
+
+    if (newReport == null)
+      newReport = new BlockListAsLongs();
+    // scan the report and process newly reported blocks
+    BlockReportIterator itBR = newReport.getBlockReportIterator();
+    while(itBR.hasNext()) {
+      Block iblk = itBR.next();
+      ReplicaState iState = itBR.getCurrentReplicaState();
+      BlockInfo storedBlock = processReportedBlock(dn, iblk, iState,
+                                  toAdd, toInvalidate, toCorrupt, toUC);
+      // move block to the head of the list
+      if (storedBlock != null && (curIndex = storedBlock.findDatanode(dn)) >= 0) {
+        headIndex = dn.moveBlockToHead(storedBlock, curIndex, headIndex);
+      }
+    }
+    // collect blocks that have not been reported
+    // all of them are next to the delimiter
+    Iterator<? extends Block> it = new DatanodeDescriptor.BlockIterator(
+        delimiter.getNext(0), dn);
+    while(it.hasNext())
+      toRemove.add(it.next());
+    dn.removeBlock(delimiter);
+  }
+
   /**
    * Process a block replica reported by the data-node.
    * No side effects except adding to the passed-in Collections.
@@ -1728,8 +1861,7 @@ public class BlockManager extends Report
    * @return the up-to-date stored block, if it should be kept.
    *         Otherwise, null.
    */
-  @Override // ReportProcessor
-  BlockInfo processReportedBlock(final DatanodeDescriptor dn, 
+  private BlockInfo processReportedBlock(final DatanodeDescriptor dn, 
       final Block block, final ReplicaState reportedState, 
       final Collection<BlockInfo> toAdd, 
       final Collection<Block> toInvalidate, 
@@ -1956,7 +2088,6 @@ assert storedBlock.findDatanode(dn) < 0 
     }
   }
   
-  @Override // ReportProcessor
   void addStoredBlockUnderConstruction(
       BlockInfoUnderConstruction block, 
       DatanodeDescriptor node, 
@@ -2012,8 +2143,7 @@ assert storedBlock.findDatanode(dn) < 0 
    * needed replications if this takes care of the problem.
    * @return the block that is stored in blockMap.
    */
-  @Override // ReportProcessor
-  Block addStoredBlock(final BlockInfo block,
+  private Block addStoredBlock(final BlockInfo block,
                                DatanodeDescriptor node,
                                DatanodeDescriptor delNodeHint,
                                boolean logEveryBlock)
@@ -2028,7 +2158,7 @@ assert storedBlock.findDatanode(dn) < 0 
     }
     if (storedBlock == null || storedBlock.getBlockCollection() == null) {
       // If this block does not belong to anyfile, then we are done.
-      blockLogInfo("#addStoredBlock: " + block + " on "
+      blockLog.info("BLOCK* addStoredBlock: " + block + " on "
           + node + " size " + block.getNumBytes()
           + " but it does not belong to any file");
       // we could add this block to invalidate set of this datanode.
@@ -2050,7 +2180,7 @@ assert storedBlock.findDatanode(dn) < 0 
       }
     } else {
       curReplicaDelta = 0;
-      blockLogWarn("#addStoredBlock: "
+      blockLog.warn("BLOCK* addStoredBlock: "
           + "Redundant addStoredBlock request received for " + storedBlock
           + " on " + node + " size " + storedBlock.getNumBytes());
     }
@@ -2108,6 +2238,20 @@ assert storedBlock.findDatanode(dn) < 0 
     return storedBlock;
   }
 
+  private void logAddStoredBlock(BlockInfo storedBlock, DatanodeDescriptor node) {
+    if (!blockLog.isInfoEnabled()) {
+      return;
+    }
+    
+    StringBuilder sb = new StringBuilder(500);
+    sb.append("BLOCK* addStoredBlock: blockMap updated: ")
+      .append(node)
+      .append(" is added to ");
+    storedBlock.appendStringTo(sb);
+    sb.append(" size " )
+      .append(storedBlock.getNumBytes());
+    blockLog.info(sb);
+  }
   /**
    * Invalidate corrupt replicas.
    * <p>
@@ -2989,6 +3133,13 @@ assert storedBlock.findDatanode(dn) < 0 
         UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
   }
 
+  /**
+   * Get the replicas which are corrupt for a given block.
+   */
+  public Collection<DatanodeDescriptor> getCorruptReplicas(Block block) {
+    return corruptReplicas.getNodes(block);
+  }
+
   /** @return the size of UnderReplicatedBlocks */
   public int numOfUnderReplicatedBlocks() {
     return neededReplications.size();
@@ -3129,21 +3280,4 @@ assert storedBlock.findDatanode(dn) < 0 
   public void shutdown() {
     blocksMap.close();
   }
-
-  @Override // ReportProcessor
-  int moveBlockToHead(DatanodeDescriptor dn, BlockInfo storedBlock,
-      int curIndex, int headIndex) {
-    return dn.moveBlockToHead(storedBlock, curIndex, headIndex);
-  }
-
-  @Override // ReportProcessor
-  boolean addBlock(DatanodeDescriptor dn, BlockInfo block) {
-    return dn.addBlock(block);
-  }
-
-  @Override // ReportProcessor
-  boolean removeBlock(DatanodeDescriptor dn, BlockInfo block) {
-    return dn.removeBlock(block);
-  }
-
 }
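
The processReport()/addStoredBlock() changes above also pick up the maxNumBlocksToLog throttle: per-block detail is logged only for the first maxNumBlocksToLog blocks in a report, followed by a single summary line. A standalone sketch of that pattern, using illustrative values and plain System.out in place of blockLog:

    import java.util.Arrays;
    import java.util.List;

    public class ThrottledBlockLoggingSketch {
      public static void main(String[] args) {
        long maxNumBlocksToLog = 2;                       // illustrative limit
        List<String> toAdd = Arrays.asList("blk_1", "blk_2", "blk_3", "blk_4");
        int numBlocksLogged = 0;
        for (String b : toAdd) {
          if (numBlocksLogged < maxNumBlocksToLog) {
            System.out.println("BLOCK* addStoredBlock: " + b);  // detailed line
          }
          numBlocksLogged++;
        }
        if (numBlocksLogged > maxNumBlocksToLog) {
          System.out.println("BLOCK* processReport: logged info for "
              + maxNumBlocksToLog + " of " + numBlocksLogged + " reported.");
        }
      }
    }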

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java?rev=1532924&r1=1532923&r2=1532924&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java Wed Oct 16 22:15:33 2013
@@ -19,284 +19,435 @@ package org.apache.hadoop.hdfs.server.bl
 
 import static org.apache.hadoop.util.ExitUtil.terminate;
 
-import java.util.ArrayList;
-import java.util.HashMap;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
-import java.util.Map.Entry;
+import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.namenode.Namesystem;
-import org.apache.hadoop.hdfs.util.LightWeightHashSet;
+import org.apache.hadoop.hdfs.protocol.PathBasedCacheEntry;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
+import org.apache.hadoop.hdfs.server.namenode.CacheManager;
+import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.INode;
+import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
+import org.apache.hadoop.hdfs.util.ReadOnlyList;
+import org.apache.hadoop.util.GSet;
+import org.apache.hadoop.util.Time;
 
 /**
- * Periodically computes new replication work. This consists of two tasks:
- * 
- * 1) Assigning blocks in the neededCacheBlocks to datanodes where they will be
- * cached. This moves them to the pendingCacheBlocks list.
- * 
- * 2) Placing caching tasks in pendingCacheBlocks that have timed out
- * back into neededCacheBlocks for reassignment.
+ * Scans the namesystem, scheduling blocks to be cached as appropriate.
+ *
+ * The CacheReplicationMonitor does a full scan when the NameNode first
+ * starts up, and at configurable intervals afterwards.
  */
 @InterfaceAudience.LimitedPrivate({"HDFS"})
-class CacheReplicationMonitor implements Runnable {
+public class CacheReplicationMonitor extends Thread implements Closeable {
 
   private static final Log LOG =
       LogFactory.getLog(CacheReplicationMonitor.class);
 
-  private static final Log blockLog = NameNode.blockStateChangeLog;
+  private final FSNamesystem namesystem;
 
-  private final Namesystem namesystem;
   private final BlockManager blockManager;
-  private final DatanodeManager datanodeManager;
-  private final CacheReplicationManager cacheReplManager;
 
-  private final UncacheBlocks blocksToUncache;
-  private final LightWeightHashSet<Block> neededCacheBlocks;
-  private final PendingReplicationBlocks pendingCacheBlocks;
+  private final CacheManager cacheManager;
+
+  private final GSet<CachedBlock, CachedBlock> cachedBlocks;
+
+  /**
+   * Pseudorandom number source
+   */
+  private final Random random = new Random();
+
+  /**
+   * The interval at which we scan the namesystem for caching changes.
+   */
+  private final long intervalMs;
+
+  /**
+   * True if we should rescan immediately, regardless of how much time
+   * elapsed since the previous scan.
+   */
+  private boolean rescanImmediately;
+
+  /**
+   * The monotonic time at which the current scan started.
+   */
+  private long scanTimeMs;
 
   /**
-   * Re-check period for computing cache replication work
+   * Mark status of the current scan.
    */
-  private final long cacheReplicationRecheckInterval;
+  private boolean mark = false;
 
-  public CacheReplicationMonitor(Namesystem namesystem,
-      BlockManager blockManager, DatanodeManager datanodeManager,
-      CacheReplicationManager cacheReplManager,
-      UncacheBlocks blocksToUncache,
-      LightWeightHashSet<Block> neededCacheBlocks,
-      PendingReplicationBlocks pendingCacheBlocks,
-      Configuration conf) {
+  /**
+   * True if this monitor should terminate.
+   */
+  private boolean shutdown;
+
+  /**
+   * Cache directives found in the previous scan.
+   */
+  private int scannedDirectives;
+
+  /**
+   * Blocks found in the previous scan.
+   */
+  private long scannedBlocks;
+  
+  public CacheReplicationMonitor(FSNamesystem namesystem,
+      CacheManager cacheManager, long intervalMs) {
     this.namesystem = namesystem;
-    this.blockManager = blockManager;
-    this.datanodeManager = datanodeManager;
-    this.cacheReplManager = cacheReplManager;
-
-    this.blocksToUncache = blocksToUncache;
-    this.neededCacheBlocks = neededCacheBlocks;
-    this.pendingCacheBlocks = pendingCacheBlocks;
-
-    this.cacheReplicationRecheckInterval = conf.getInt(
-        DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 
-        DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000L;
+    this.blockManager = namesystem.getBlockManager();
+    this.cacheManager = cacheManager;
+    this.cachedBlocks = cacheManager.getCachedBlocks();
+    this.intervalMs = intervalMs;
   }
 
   @Override
   public void run() {
-    LOG.info("CacheReplicationMonitor is starting");
-    while (namesystem.isRunning()) {
-      try {
-        computeCachingWork();
-        processPendingCachingWork();
-        Thread.sleep(cacheReplicationRecheckInterval);
-      } catch (Throwable t) {
-        if (!namesystem.isRunning()) {
-          LOG.info("Stopping CacheReplicationMonitor.");
-          if (!(t instanceof InterruptedException)) {
-            LOG.info("CacheReplicationMonitor received an exception"
-                + " while shutting down.", t);
+    shutdown = false;
+    rescanImmediately = true;
+    scanTimeMs = 0;
+    LOG.info("Starting CacheReplicationMonitor with interval " +
+             intervalMs + " milliseconds");
+    try {
+      long curTimeMs = Time.monotonicNow();
+      while (true) {
+        synchronized(this) {
+          while (true) {
+            if (shutdown) {
+              LOG.info("Shutting down CacheReplicationMonitor");
+              return;
+            }
+            if (rescanImmediately) {
+              LOG.info("Rescanning on request");
+              rescanImmediately = false;
+              break;
+            }
+            long delta = (scanTimeMs + intervalMs) - curTimeMs;
+            if (delta <= 0) {
+              LOG.info("Rescanning after " + (curTimeMs - scanTimeMs) +
+                  " milliseconds");
+              break;
+            }
+            this.wait(delta);
+            curTimeMs = Time.monotonicNow();
           }
-          break;
         }
-        LOG.fatal("ReplicationMonitor thread received Runtime exception. ", t);
-        terminate(1, t);
+        scanTimeMs = curTimeMs;
+        mark = !mark;
+        rescan();
+        curTimeMs = Time.monotonicNow();
+        LOG.info("Scanned " + scannedDirectives + " directive(s) and " +
+            scannedBlocks + " block(s) in " + (curTimeMs - scanTimeMs) + " " +
+            "millisecond(s).");
       }
+    } catch (Throwable t) {
+      LOG.fatal("Thread exiting", t);
+      terminate(1, t);
     }
   }
 
   /**
-   * Assigns under-cached blocks to new datanodes.
+   * Kick the monitor thread.
+   * 
+   * If it is sleeping, it will wake up and start scanning.
+   * If it is currently scanning, it will finish the scan and immediately do 
+   * another one.
    */
-  private void computeCachingWork() {
-    List<Block> blocksToCache = null;
-    namesystem.writeLock();
+  public synchronized void kick() {
+    rescanImmediately = true;
+    this.notifyAll();
+  }
+
+  /**
+   * Shut down and join the monitor thread.
+   */
+  @Override
+  public void close() throws IOException {
+    synchronized(this) {
+      if (shutdown) return;
+      shutdown = true;
+      this.notifyAll();
+    }
     try {
-      synchronized (neededCacheBlocks) {
-        blocksToCache = neededCacheBlocks.pollAll();
+      if (this.isAlive()) {
+        this.join(60000);
       }
-    } finally {
-      namesystem.writeUnlock();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
     }
-    computeCachingWorkForBlocks(blocksToCache);
-    computeUncacheWork();
   }
 
-  private void computeCachingWorkForBlocks(List<Block> blocksToCache) {
-    int requiredRepl, effectiveRepl, additionalRepl;
-    List<DatanodeDescriptor> cachedNodes, storedNodes, targets;
-
-    final HashMap<Block, List<DatanodeDescriptor>> work =
-        new HashMap<Block, List<DatanodeDescriptor>>();
+  private void rescan() {
+    scannedDirectives = 0;
+    scannedBlocks = 0;
     namesystem.writeLock();
     try {
-      synchronized (neededCacheBlocks) {
-        for (Block block: blocksToCache) {
-          // Required number of cached replicas
-          requiredRepl = cacheReplManager.getCacheReplication(block);
-          // Replicas that are safely cached
-          cachedNodes = cacheReplManager.getSafeReplicas(
-              cacheReplManager.cachedBlocksMap, block);
-          // Replicas that are safely stored on disk
-          storedNodes = cacheReplManager.getSafeReplicas(
-              blockManager.blocksMap, block);
-          // "effective" replication factor which includes pending
-          // replication work
-          effectiveRepl = cachedNodes.size()
-              + pendingCacheBlocks.getNumReplicas(block);
-          if (effectiveRepl >= requiredRepl) {
-            neededCacheBlocks.remove(block);
-            blockLog.info("BLOCK* Removing " + block
-                + " from neededCacheBlocks as it has enough cached replicas");
-              continue;
-          }
-          // Choose some replicas to cache if needed
-          additionalRepl = requiredRepl - effectiveRepl;
-          targets = new ArrayList<DatanodeDescriptor>(storedNodes.size());
-          // Only target replicas that aren't already cached.
-          for (DatanodeDescriptor dn: storedNodes) {
-            if (!cachedNodes.contains(dn)) {
-              targets.add(dn);
-            }
-          }
-          if (targets.size() < additionalRepl) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Block " + block + " cannot be cached on additional"
-                  + " nodes because there are no more available datanodes"
-                  + " with the block on disk.");
-            }
-          }
-          targets = CacheReplicationPolicy.chooseTargetsToCache(block, targets,
-              additionalRepl);
-          if (targets.size() < additionalRepl) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Block " + block + " cannot be cached on additional"
-                  + " nodes because there is not sufficient cache space on"
-                  + " available target datanodes.");
-            }
-          }
-          // Continue if we couldn't get more cache targets
-          if (targets.size() == 0) {
-            continue;
-          }
-
-          // Update datanodes and blocks that were scheduled for caching
-          work.put(block, targets);
-          // Schedule caching on the targets
-          for (DatanodeDescriptor target: targets) {
-            target.addBlockToBeCached(block);
-          }
-          // Add block to the pending queue
-          pendingCacheBlocks.increment(block,
-              targets.toArray(new DatanodeDescriptor[] {}));
-          if (blockLog.isDebugEnabled()) {
-            blockLog.debug("BLOCK* block " + block
-                + " is moved from neededCacheBlocks to pendingCacheBlocks");
-          }
-          // Remove from needed queue if it will be fully replicated
-          if (effectiveRepl + targets.size() >= requiredRepl) {
-            neededCacheBlocks.remove(block);
-          }
-        }
-      }
+      rescanPathBasedCacheEntries();
+    } finally {
+      namesystem.writeUnlock();
+    }
+    namesystem.writeLock();
+    try {
+      rescanCachedBlockMap();
     } finally {
       namesystem.writeUnlock();
     }
+  }
 
-    if (blockLog.isInfoEnabled()) {
-      // log which blocks have been scheduled for replication
-      for (Entry<Block, List<DatanodeDescriptor>> item : work.entrySet()) {
-        Block block = item.getKey();
-        List<DatanodeDescriptor> nodes = item.getValue();
-        StringBuilder targetList = new StringBuilder("datanode(s)");
-        for (DatanodeDescriptor node: nodes) {
-          targetList.append(' ');
-          targetList.append(node);
+  /**
+   * Scan all PathBasedCacheEntries.  Use the information to figure out
+   * what cache replication factor each block should have.
+   *
+   * @param mark       Whether the current scan is setting or clearing the mark
+   */
+  private void rescanPathBasedCacheEntries() {
+    FSDirectory fsDir = namesystem.getFSDirectory();
+    for (PathBasedCacheEntry pce : cacheManager.getEntriesById().values()) {
+      scannedDirectives++;
+      String path = pce.getPath();
+      INode node;
+      try {
+        node = fsDir.getINode(path);
+      } catch (UnresolvedLinkException e) {
+        // We don't cache through symlinks
+        continue;
+      }
+      if (node == null)  {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("No inode found at " + path);
+        }
+      } else if (node.isDirectory()) {
+        INodeDirectory dir = node.asDirectory();
+        ReadOnlyList<INode> children = dir.getChildrenList(null);
+        for (INode child : children) {
+          if (child.isFile()) {
+            rescanFile(pce, child.asFile());
+          }
+        }
+      } else if (node.isFile()) {
+        rescanFile(pce, node.asFile());
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Ignoring non-directory, non-file inode " + node +
+                    " found at " + path);
         }
-        blockLog.info("BLOCK* ask " + targetList + " to cache " + block);
       }
     }
-
-    if (blockLog.isDebugEnabled()) {
-        blockLog.debug(
-          "BLOCK* neededCacheBlocks = " + neededCacheBlocks.size()
-          + " pendingCacheBlocks = " + pendingCacheBlocks.size());
+  }
+  
+  /**
+   * Apply a PathBasedCacheEntry to a file.
+   *
+   * @param pce       The PathBasedCacheEntry to apply.
+   * @param file      The file.
+   */
+  private void rescanFile(PathBasedCacheEntry pce, INodeFile file) {
+    BlockInfo[] blockInfos = file.getBlocks();
+    for (BlockInfo blockInfo : blockInfos) {
+      if (!blockInfo.getBlockUCState().equals(BlockUCState.COMPLETE)) {
+        // We don't try to cache blocks that are under construction.
+        continue;
+      }
+      Block block = new Block(blockInfo.getBlockId());
+      CachedBlock ncblock = new CachedBlock(block.getBlockId(),
+          pce.getReplication(), mark);
+      CachedBlock ocblock = cachedBlocks.get(ncblock);
+      if (ocblock == null) {
+        cachedBlocks.put(ncblock);
+      } else {
+        if (mark != ocblock.getMark()) {
+          // Mark hasn't been set in this scan, so update replication and mark.
+          ocblock.setReplicationAndMark(pce.getReplication(), mark);
+        } else {
+          // Mark already set in this scan.  Set replication to highest value in
+          // any PathBasedCacheEntry that covers this file.
+          ocblock.setReplicationAndMark((short)Math.max(
+              pce.getReplication(), ocblock.getReplication()), mark);
+        }
+      }
     }
   }
 
   /**
-   * Reassign pending caching work that has timed out
+   * Scan through the cached block map.
+   * Any blocks which are under-replicated should be assigned new Datanodes.
+   * Blocks that are over-replicated should be removed from Datanodes.
    */
-  private void processPendingCachingWork() {
-    Block[] timedOutItems = pendingCacheBlocks.getTimedOutBlocks();
-    if (timedOutItems != null) {
-      namesystem.writeLock();
-      try {
-        for (int i = 0; i < timedOutItems.length; i++) {
-          Block block = timedOutItems[i];
-          final short numCached = cacheReplManager.getNumCached(block);
-          final short cacheReplication =
-              cacheReplManager.getCacheReplication(block);
-          // Needs to be cached if under-replicated
-          if (numCached < cacheReplication) {
-            synchronized (neededCacheBlocks) {
-              neededCacheBlocks.add(block);
-            }
-          }
+  private void rescanCachedBlockMap() {
+    for (Iterator<CachedBlock> cbIter = cachedBlocks.iterator();
+        cbIter.hasNext(); ) {
+      scannedBlocks++;
+      CachedBlock cblock = cbIter.next();
+      List<DatanodeDescriptor> pendingCached =
+          cblock.getDatanodes(Type.PENDING_CACHED);
+      List<DatanodeDescriptor> cached =
+          cblock.getDatanodes(Type.CACHED);
+      List<DatanodeDescriptor> pendingUncached =
+          cblock.getDatanodes(Type.PENDING_UNCACHED);
+      // Remove nodes from PENDING_UNCACHED if they were actually uncached.
+      for (Iterator<DatanodeDescriptor> iter = pendingUncached.iterator();
+          iter.hasNext(); ) {
+        DatanodeDescriptor datanode = iter.next();
+        if (!cblock.isInList(datanode.getCached())) {
+          datanode.getPendingUncached().remove(cblock);
+          iter.remove();
         }
-      } finally {
-        namesystem.writeUnlock();
+      }
+      // If the block's mark doesn't match with the mark of this scan, that
+      // means that this block couldn't be reached during this scan.  That means
+      // it doesn't need to be cached any more.
+      int neededCached = (cblock.getMark() != mark) ?
+          0 : cblock.getReplication();
+      int numCached = cached.size();
+      if (numCached >= neededCached) {
+        // If we have enough replicas, drop all pending cached.
+        for (DatanodeDescriptor datanode : pendingCached) {
+          datanode.getPendingCached().remove(cblock);
+        }
+        pendingCached.clear();
+      }
+      if (numCached < neededCached) {
+        // If we don't have enough replicas, drop all pending uncached.
+        for (DatanodeDescriptor datanode : pendingUncached) {
+          datanode.getPendingUncached().remove(cblock);
+        }
+        pendingUncached.clear();
+      }
+      int neededUncached = numCached -
+          (pendingUncached.size() + neededCached);
+      if (neededUncached > 0) {
+        addNewPendingUncached(neededUncached, cblock, cached,
+            pendingUncached);
+      } else {
+        int additionalCachedNeeded = neededCached -
+            (numCached + pendingCached.size());
+        if (additionalCachedNeeded > 0) {
+          addNewPendingCached(additionalCachedNeeded, cblock, cached,
+              pendingCached);
+        }
+      }
+      if ((neededCached == 0) &&
+          pendingUncached.isEmpty() &&
+          pendingCached.isEmpty()) {
+        // we have nothing more to do with this block.
+        cbIter.remove();
       }
     }
   }
 
   /**
-   * Schedule blocks for uncaching at datanodes
-   * @return total number of block for deletion
+   * Add new entries to the PendingUncached list.
+   *
+   * @param neededUncached   The number of replicas that need to be uncached.
+   * @param cachedBlock      The block which needs to be uncached.
+   * @param cached           A list of DataNodes currently caching the block.
+   * @param pendingUncached  A list of DataNodes that will soon uncache the
+   *                         block.
    */
-  int computeUncacheWork() {
-    final List<String> nodes = blocksToUncache.getStorageIDs();
-    int blockCnt = 0;
-    for (String node: nodes) {
-      blockCnt += uncachingWorkForOneNode(node);
+  private void addNewPendingUncached(int neededUncached,
+      CachedBlock cachedBlock, List<DatanodeDescriptor> cached,
+      List<DatanodeDescriptor> pendingUncached) {
+    if (!cacheManager.isActive()) {
+      return;
+    }
+    // Figure out which replicas can be uncached.
+    LinkedList<DatanodeDescriptor> possibilities =
+        new LinkedList<DatanodeDescriptor>();
+    for (DatanodeDescriptor datanode : cached) {
+      if (!pendingUncached.contains(datanode)) {
+        possibilities.add(datanode);
+      }
+    }
+    while (neededUncached > 0) {
+      if (possibilities.isEmpty()) {
+        LOG.warn("Logic error: we're trying to uncache more replicas than " +
+            "actually exist for " + cachedBlock);
+        return;
+      }
+      DatanodeDescriptor datanode =
+        possibilities.remove(random.nextInt(possibilities.size()));
+      pendingUncached.add(datanode);
+      boolean added = datanode.getPendingUncached().add(cachedBlock);
+      assert added;
+      neededUncached--;
     }
-    return blockCnt;
   }
-
+  
   /**
-   * Gets the list of blocks scheduled for uncaching at a datanode and
-   * schedules them for uncaching.
-   * 
-   * @return number of blocks scheduled for removal
+   * Add new entries to the PendingCached list.
+   *
+   * @param neededCached     The number of replicas that need to be cached.
+   * @param cachedBlock      The block which needs to be cached.
+   * @param cached           A list of DataNodes currently caching the block.
+   * @param pendingCached    A list of DataNodes that will soon cache the
+   *                         block.
    */
-  private int uncachingWorkForOneNode(String nodeId) {
-    final List<Block> toInvalidate;
-    final DatanodeDescriptor dn;
-
-    namesystem.writeLock();
-    try {
-      // get blocks to invalidate for the nodeId
-      assert nodeId != null;
-      dn = datanodeManager.getDatanode(nodeId);
-      if (dn == null) {
-        blocksToUncache.remove(nodeId);
-        return 0;
-      }
-      toInvalidate = blocksToUncache.invalidateWork(nodeId, dn);
-      if (toInvalidate == null) {
-        return 0;
+  private void addNewPendingCached(int neededCached,
+      CachedBlock cachedBlock, List<DatanodeDescriptor> cached,
+      List<DatanodeDescriptor> pendingCached) {
+    if (!cacheManager.isActive()) {
+      return;
+    }
+    // To figure out which replicas can be cached, we consult the
+    // blocksMap.  We don't want to try to cache a corrupt replica, though.
+    BlockInfo blockInfo = blockManager.
+          getStoredBlock(new Block(cachedBlock.getBlockId()));
+    if (blockInfo == null) {
+      LOG.debug("Not caching block " + cachedBlock + " because it " +
+          "was deleted from all DataNodes.");
+      return;
+    }
+    if (!blockInfo.isComplete()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Not caching block " + cachedBlock + " because it " +
+            "is not yet complete.");
+      }
+      return;
+    }
+    List<DatanodeDescriptor> possibilities = new LinkedList<DatanodeDescriptor>();
+    int numReplicas = blockInfo.getCapacity();
+    Collection<DatanodeDescriptor> corrupt =
+        blockManager.getCorruptReplicas(blockInfo);
+    for (int i = 0; i < numReplicas; i++) {
+      DatanodeDescriptor datanode = blockInfo.getDatanode(i);
+      if ((datanode != null) && 
+          ((!pendingCached.contains(datanode)) &&
+          ((corrupt == null) || (!corrupt.contains(datanode))))) {
+        possibilities.add(datanode);
       }
-    } finally {
-      namesystem.writeUnlock();
     }
-    if (blockLog.isInfoEnabled()) {
-      blockLog.info("BLOCK* " + getClass().getSimpleName()
-          + ": ask " + dn + " to uncache " + toInvalidate);
+    while (neededCached > 0) {
+      if (possibilities.isEmpty()) {
+        LOG.warn("We need " + neededCached + " more replica(s) than " +
+            "actually exist to provide a cache replication of " +
+            cachedBlock.getReplication() + " for " + cachedBlock);
+        return;
+      }
+      DatanodeDescriptor datanode =
+          possibilities.remove(random.nextInt(possibilities.size()));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("AddNewPendingCached: datanode " + datanode + 
+            " will now cache block " + cachedBlock);
+      }
+      pendingCached.add(datanode);
+      boolean added = datanode.getPendingCached().add(cachedBlock);
+      assert added;
+      neededCached--;
     }
-    return toInvalidate.size();
   }
 }
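
The rewritten CacheReplicationMonitor above replaces the explicit needed/pending cache queues with a mark bit: each scan flips the boolean mark, rescanFile() stamps every block still covered by a PathBasedCacheEntry with the current mark, and rescanCachedBlockMap() treats any block left carrying the stale mark as needing zero cached replicas. A tiny self-contained sketch of that idea, using stub classes rather than the committed ones:

    import java.util.HashMap;
    import java.util.Map;

    public class MarkSweepCacheSketch {
      static class CachedBlockStub {
        short replication;
        boolean mark;
      }

      public static void main(String[] args) {
        Map<Long, CachedBlockStub> cachedBlocks = new HashMap<Long, CachedBlockStub>();
        cachedBlocks.put(1L, new CachedBlockStub());
        cachedBlocks.put(2L, new CachedBlockStub());

        boolean mark = true;                     // flipped at the start of each scan
        // This "scan" only reaches block 1, under a directive asking for 3 replicas.
        CachedBlockStub reached = cachedBlocks.get(1L);
        reached.replication = 3;
        reached.mark = mark;

        for (Map.Entry<Long, CachedBlockStub> e : cachedBlocks.entrySet()) {
          int neededCached =
              (e.getValue().mark != mark) ? 0 : e.getValue().replication;
          System.out.println("block " + e.getKey() + " neededCached=" + neededCached);
        }
      }
    }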

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java?rev=1532924&r1=1532923&r2=1532924&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java Wed Oct 16 22:15:33 2013
@@ -28,7 +28,9 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 import org.apache.hadoop.hdfs.util.LightWeightHashSet;
+import org.apache.hadoop.util.IntrusiveCollection;
 import org.apache.hadoop.util.Time;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -98,22 +100,72 @@ public class DatanodeDescriptor extends 
   }
 
   /**
-   * Head of the list of blocks on the datanode
+   * A list of CachedBlock objects on this datanode.
    */
-  private volatile BlockInfo blockList = null;
+  public static class CachedBlocksList extends IntrusiveCollection<CachedBlock> {
+    public enum Type {
+      PENDING_CACHED,
+      CACHED,
+      PENDING_UNCACHED
+    }
+
+    private final DatanodeDescriptor datanode;
+
+    private final Type type;
+
+    CachedBlocksList(DatanodeDescriptor datanode, Type type) {
+      this.datanode = datanode;
+      this.type = type;
+    }
+
+    public DatanodeDescriptor getDatanode() {
+      return datanode;
+    }
+
+    public Type getType() {
+      return type;
+    }
+  }
+
   /**
-   * Number of blocks on the datanode
+   * The blocks which we want to cache on this DataNode.
    */
-  private int numBlocks = 0;
+  private final CachedBlocksList pendingCached = 
+      new CachedBlocksList(this, CachedBlocksList.Type.PENDING_CACHED);
 
   /**
-   * Head of the list of cached blocks on the datanode
+   * The blocks which we know are cached on this datanode.
+   * This list is updated by periodic cache reports.
    */
-  private volatile BlockInfo cachedBlockList = null;
+  private final CachedBlocksList cached = 
+      new CachedBlocksList(this, CachedBlocksList.Type.CACHED);
+
   /**
-   * Number of cached blocks on the datanode
+   * The blocks which we want to uncache on this DataNode.
    */
-  private int numCachedBlocks = 0;
+  private final CachedBlocksList pendingUncached = 
+      new CachedBlocksList(this, CachedBlocksList.Type.PENDING_UNCACHED);
+
+  public CachedBlocksList getPendingCached() {
+    return pendingCached;
+  }
+
+  public CachedBlocksList getCached() {
+    return cached;
+  }
+
+  public CachedBlocksList getPendingUncached() {
+    return pendingUncached;
+  }
+
+  /**
+   * Head of the list of blocks on the datanode
+   */
+  private volatile BlockInfo blockList = null;
+  /**
+   * Number of blocks on the datanode
+   */
+  private int numBlocks = 0;
 
   // isAlive == heartbeats.contains(this)
   // This is an optimization, because contains takes O(n) time on Arraylist
@@ -154,12 +206,6 @@ public class DatanodeDescriptor extends 
   /** A set of blocks to be invalidated by this datanode */
   private LightWeightHashSet<Block> invalidateBlocks = new LightWeightHashSet<Block>();
 
-  /** A queue of blocks to be cached by this datanode */
-  private BlockQueue<Block> cacheBlocks = new BlockQueue<Block>();
-  /** A set of blocks to be uncached by this datanode */
-  private LightWeightHashSet<Block> blocksToUncache =
-      new LightWeightHashSet<Block>();
-
   /* Variables for maintaining number of blocks scheduled to be written to
    * this datanode. This count is approximate and might be slightly bigger
    * in case of errors (e.g. datanode does not report if an error occurs
@@ -287,43 +333,6 @@ public class DatanodeDescriptor extends 
   }
 
   /**
-   * Add block to the list of cached blocks on the data-node.
-   * @return true if block was successfully added, false if already present
-   */
-  public boolean addCachedBlock(BlockInfo b) {
-    if (!b.addNode(this))
-      return false;
-    // add to the head of the data-node list
-    cachedBlockList = b.listInsert(cachedBlockList, this);
-    numCachedBlocks++;
-    return true;
-  }
-
-  /**
-   * Remove block from the list of cached blocks on the data-node.
-   * @return true if block was successfully removed, false if not present
-   */
-  public boolean removeCachedBlock(BlockInfo b) {
-    cachedBlockList = b.listRemove(cachedBlockList, this);
-    if (b.removeNode(this)) {
-      numCachedBlocks--;
-      return true;
-    } else {
-      return false;
-    }
-  }
-
-  /**
-   * Move block to the head of the list of cached blocks on the data-node.
-   * @return the index of the head of the blockList
-   */
-  int moveCachedBlockToHead(BlockInfo b, int curIndex, int headIndex) {
-    cachedBlockList = b.moveBlockToHead(cachedBlockList, this, curIndex,
-        headIndex);
-    return curIndex;
-  }
-
-  /**
    * Used for testing only
    * @return the head of the blockList
    */
@@ -332,11 +341,6 @@ public class DatanodeDescriptor extends 
     return blockList;
   }
 
-  @VisibleForTesting
-  protected BlockInfo getCachedHead() {
-    return cachedBlockList;
-  }
-
   /**
    * Replace specified old block with a new one in the DataNodeDescriptor.
    *
@@ -359,10 +363,13 @@ public class DatanodeDescriptor extends 
     setDfsUsed(0);
     setXceiverCount(0);
     this.blockList = null;
-    this.cachedBlockList = null;
     this.invalidateBlocks.clear();
-    this.blocksToUncache.clear();
     this.volumeFailures = 0;
+    // pendingCached, cached, and pendingUncached are protected by the
+    // FSN lock.
+    this.pendingCached.clear();
+    this.cached.clear();
+    this.pendingUncached.clear();
   }
   
   public void clearBlockQueues() {
@@ -371,20 +378,17 @@ public class DatanodeDescriptor extends 
       this.recoverBlocks.clear();
       this.replicateBlocks.clear();
     }
-    synchronized(blocksToUncache) {
-      this.blocksToUncache.clear();
-      this.cacheBlocks.clear();
-    }
+    // pendingCached, cached, and pendingUncached are protected by the
+    // FSN lock.
+    this.pendingCached.clear();
+    this.cached.clear();
+    this.pendingUncached.clear();
   }
 
   public int numBlocks() {
     return numBlocks;
   }
 
-  public int numCachedBlocks() {
-    return numCachedBlocks;
-  }
-
   /**
    * Updates stats from datanode heartbeat.
    */
@@ -438,10 +442,6 @@ public class DatanodeDescriptor extends 
     return new BlockIterator(this.blockList, this);
   }
 
-  public Iterator<BlockInfo> getCachedBlockIterator() {
-    return new BlockIterator(this.cachedBlockList, this);
-  }
-
   /**
    * Store block replication work.
    */
@@ -451,14 +451,6 @@ public class DatanodeDescriptor extends 
   }
 
   /**
-   * Store block caching work.
-   */
-  void addBlockToBeCached(Block block) {
-    assert(block != null);
-    cacheBlocks.offer(block);
-  }
-
-  /**
    * Store block recovery work.
    */
   void addBlockToBeRecovered(BlockInfoUnderConstruction block) {
@@ -483,18 +475,6 @@ public class DatanodeDescriptor extends 
   }
   
   /**
-   * Store block uncaching work.
-   */
-  void addBlocksToBeUncached(List<Block> blocklist) {
-    assert(blocklist != null && blocklist.size() > 0);
-    synchronized (blocksToUncache) {
-      for (Block blk : blocklist) {
-        blocksToUncache.add(blk);
-      }
-    }
-  }
-
-  /**
    * The number of work items that are pending to be replicated
    */
   int getNumberOfBlocksToBeReplicated() {
@@ -502,13 +482,6 @@ public class DatanodeDescriptor extends 
   }
 
   /**
-   * The number of pending cache work items
-   */
-  int getNumberOfBlocksToBeCached() {
-    return cacheBlocks.size();
-  }
-
-  /**
    * The number of block invalidation items that are pending to 
    * be sent to the datanode
    */
@@ -518,23 +491,10 @@ public class DatanodeDescriptor extends 
     }
   }
 
-  /**
-   * The number of pending uncache work items
-   */
-  int getNumberOfBlocksToBeUncached() {
-    synchronized (blocksToUncache) {
-      return blocksToUncache.size();
-    }
-  }
-
   public List<BlockTargetPair> getReplicationCommand(int maxTransfers) {
     return replicateBlocks.poll(maxTransfers);
   }
 
-  public List<Block> getCacheBlocks() {
-    return cacheBlocks.poll(cacheBlocks.size());
-  }
-
   public BlockInfoUnderConstruction[] getLeaseRecoveryCommand(int maxTransfers) {
     List<BlockInfoUnderConstruction> blocks = recoverBlocks.poll(maxTransfers);
     if(blocks == null)
@@ -554,17 +514,6 @@ public class DatanodeDescriptor extends 
   }
 
   /**
-   * Remove up to the maximum number of blocks to be uncached
-   */
-  public Block[] getInvalidateCacheBlocks() {
-    synchronized (blocksToUncache) {
-      Block[] deleteList = blocksToUncache.pollToArray(
-          new Block[blocksToUncache.size()]);
-      return deleteList.length == 0 ? null : deleteList;
-    }
-  }
-
-  /**
    * @return Approximate number of blocks currently scheduled to be written 
    * to this datanode.
    */
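
[Editor's note, not part of the commit] For orientation, a simplified standalone sketch of the three per-datanode queues this change moves DatanodeDescriptor onto (pendingCached, cached, pendingUncached). It uses plain java.util sets in place of the intrusive CachedBlocksList, and the method names are invented for illustration; it only shows how a block is expected to move between the queues, not the real HDFS classes or their FSN-lock discipline.

import java.util.HashSet;
import java.util.Set;

class CachingQueuesSketch {
  // Blocks the NameNode has asked this DN to cache, not yet confirmed.
  final Set<Long> pendingCached = new HashSet<Long>();
  // Blocks the DN has reported as cached.
  final Set<Long> cached = new HashSet<Long>();
  // Cached blocks the NameNode wants this DN to drop.
  final Set<Long> pendingUncached = new HashSet<Long>();

  // The caching scheduler asks this DN to cache a block.
  void scheduleCache(long blockId) {
    if (!cached.contains(blockId)) {
      pendingCached.add(blockId);
    }
  }

  // A cache report from the DN confirms the block is now cached.
  void onCacheReport(long blockId) {
    pendingCached.remove(blockId);
    cached.add(blockId);
  }

  // The scheduler decides this DN no longer needs to cache the block.
  void scheduleUncache(long blockId) {
    pendingCached.remove(blockId);
    if (cached.remove(blockId)) {
      pendingUncached.add(blockId);
    }
  }
}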

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1532924&r1=1532923&r2=1532924&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Wed Oct 16 22:15:33 2013
@@ -49,6 +49,8 @@ import org.apache.hadoop.hdfs.protocol.H
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList;
+import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 import org.apache.hadoop.hdfs.server.namenode.HostFileManager;
 import org.apache.hadoop.hdfs.server.namenode.HostFileManager.Entry;
 import org.apache.hadoop.hdfs.server.namenode.HostFileManager.EntrySet;
@@ -76,6 +78,7 @@ import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.ScriptBasedMapping;
 import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.IntrusiveCollection;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Time;
 
@@ -168,6 +171,12 @@ public class DatanodeManager {
   private boolean hasClusterEverBeenMultiRack = false;
 
   /**
+   * Whether we should tell datanodes what to cache in replies to
+   * heartbeat messages.
+   */
+  private boolean sendCachingCommands = false;
+
+  /**
    * The number of datanodes for each software version. This list should change
    * during rolling upgrades.
    * Software version -> Number of datanodes with this version
@@ -1305,26 +1314,17 @@ public class DatanodeManager {
           cmds.add(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE,
               blockPoolId, blks));
         }
-        
-        // Check pending caching
-        List<Block> pendingCacheList = nodeinfo.getCacheBlocks();
-        if (pendingCacheList != null) {
-          long blockIds[] = new long[pendingCacheList.size()];
-          for (int i = 0; i < pendingCacheList.size(); i++) {
-            blockIds[i] = pendingCacheList.get(i).getBlockId();
-          }
-          cmds.add(new BlockIdCommand(DatanodeProtocol.DNA_CACHE, blockPoolId,
-              blockIds));
-        }
-        // Check cached block invalidation
-        blks = nodeinfo.getInvalidateCacheBlocks();
-        if (blks != null) {
-          long blockIds[] = new long[blks.length];
-          for (int i = 0; i < blks.length; i++) {
-            blockIds[i] = blks[i].getBlockId();
-          }
-          cmds.add(new BlockIdCommand(DatanodeProtocol.DNA_UNCACHE,
-              blockPoolId, blockIds));
+        DatanodeCommand pendingCacheCommand =
+            getCacheCommand(nodeinfo.getPendingCached(), nodeinfo,
+              DatanodeProtocol.DNA_CACHE, blockPoolId);
+        if (pendingCacheCommand != null) {
+          cmds.add(pendingCacheCommand);
+        }
+        DatanodeCommand pendingUncacheCommand =
+            getCacheCommand(nodeinfo.getPendingUncached(), nodeinfo,
+              DatanodeProtocol.DNA_UNCACHE, blockPoolId);
+        if (pendingUncacheCommand != null) {
+          cmds.add(pendingUncacheCommand);
         }
 
         blockManager.addKeyUpdateCommand(cmds, nodeinfo);
@@ -1346,6 +1346,40 @@ public class DatanodeManager {
   }
 
   /**
+   * Convert a CachedBlocksList into a DatanodeCommand with a list of blocks.
+   *
+   * @param list       The {@link CachedBlocksList}.  This function 
+   *                   clears the list.
+   * @param datanode   The datanode.
+   * @param action     The action to perform in the command.
+   * @param poolId     The block pool id.
+   * @return           A DatanodeCommand to be sent back to the DN, or null if
+   *                   there is nothing to be done.
+   */
+  private DatanodeCommand getCacheCommand(CachedBlocksList list,
+      DatanodeDescriptor datanode, int action, String poolId) {
+    int length = list.size();
+    if (length == 0) {
+      return null;
+    }
+    // Read and clear the existing cache commands.
+    long[] blockIds = new long[length];
+    int i = 0;
+    for (Iterator<CachedBlock> iter = list.iterator();
+            iter.hasNext(); ) {
+      CachedBlock cachedBlock = iter.next();
+      blockIds[i++] = cachedBlock.getBlockId();
+      iter.remove();
+    }
+    if (!sendCachingCommands) {
+      // Do not send caching commands unless the FSNamesystem told us we
+      // should.
+      return null;
+    }
+    return new BlockIdCommand(action, poolId, blockIds);
+  }
+
+  /**
    * Tell all datanodes to use a new, non-persistent bandwidth value for
    * dfs.balance.bandwidthPerSec.
    *
@@ -1393,4 +1427,8 @@ public class DatanodeManager {
   public String toString() {
     return getClass().getSimpleName() + ": " + host2DatanodeMap;
   }
+
+  public void setSendCachingCommands(boolean sendCachingCommands) {
+    this.sendCachingCommands = sendCachingCommands;
+  }
 }
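
[Editor's note, not part of the commit] A minimal standalone model of the drain-then-gate behaviour in getCacheCommand above: the pending list is always emptied while the heartbeat is processed, and a command is only produced when caching commands are enabled, so queued work is dropped rather than deferred while sending is disabled. Plain Java collections and an invented method name; this is not the HDFS API.

import java.util.Deque;

class CacheCommandSketch {
  // Mirrors getCacheCommand: drain every queued block id first, then
  // decide whether to actually emit a command for them.
  static long[] drainForCommand(Deque<Long> pending, boolean sendCachingCommands) {
    if (pending.isEmpty()) {
      return null;                      // nothing to do for this heartbeat
    }
    long[] blockIds = new long[pending.size()];
    int i = 0;
    while (!pending.isEmpty()) {
      blockIds[i++] = pending.poll();   // entries are removed as they are read
    }
    // While sending is disabled, the drained work is discarded.
    return sendCachingCommands ? blockIds : null;
  }
}

Draining before the flag check keeps the per-datanode queues from growing while commands are disabled, at the cost of dropping that round of caching work; that appears to be the intended trade-off here.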

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java?rev=1532924&r1=1532923&r2=1532924&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java Wed Oct 16 22:15:33 2013
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -34,22 +35,24 @@ import org.apache.hadoop.hdfs.util.Light
  * on the machine in question.
  */
 @InterfaceAudience.Private
-abstract class InvalidateBlocks {
+class InvalidateBlocks {
   /** Mapping: StorageID -> Collection of Blocks */
   private final Map<String, LightWeightHashSet<Block>> node2blocks =
       new TreeMap<String, LightWeightHashSet<Block>>();
   /** The total number of blocks in the map. */
   private long numBlocks = 0L;
 
+  private final DatanodeManager datanodeManager;
+
+  InvalidateBlocks(final DatanodeManager datanodeManager) {
+    this.datanodeManager = datanodeManager;
+  }
+
   /** @return the number of blocks to be invalidated . */
   synchronized long numBlocks() {
     return numBlocks;
   }
 
-  synchronized int numStorages() {
-    return node2blocks.size();
-  }
-
   /**
    * @return true if the given storage has the given block listed for
    * invalidation. Blocks are compared including their generation stamps:
@@ -108,22 +111,22 @@ abstract class InvalidateBlocks {
     }
   }
 
-  /**
-   * Polls up to <i>limit</i> blocks from the list of to-be-invalidated Blocks
-   * for a storage.
-   */
-  synchronized List<Block> pollNumBlocks(final String storageId, final int limit) {
-    final LightWeightHashSet<Block> set = node2blocks.get(storageId);
-    if (set == null) {
-      return null;
+  /** Print the contents to out. */
+  synchronized void dump(final PrintWriter out) {
+    final int size = node2blocks.values().size();
+    out.println("Metasave: Blocks " + numBlocks 
+        + " waiting deletion from " + size + " datanodes.");
+    if (size == 0) {
+      return;
     }
-    List<Block> polledBlocks = set.pollN(limit);
-    // Remove the storage if the set is now empty
-    if (set.isEmpty()) {
-      remove(storageId);
+
+    for(Map.Entry<String,LightWeightHashSet<Block>> entry : node2blocks.entrySet()) {
+      final LightWeightHashSet<Block> blocks = entry.getValue();
+      if (blocks.size() > 0) {
+        out.println(datanodeManager.getDatanode(entry.getKey()));
+        out.println(blocks);
+      }
     }
-    numBlocks -= polledBlocks.size();
-    return polledBlocks;
   }
 
   /** @return a list of the storage IDs. */
@@ -131,22 +134,26 @@ abstract class InvalidateBlocks {
     return new ArrayList<String>(node2blocks.keySet());
   }
 
-  /**
-   * Return the set of to-be-invalidated blocks for a storage.
-   */
-  synchronized LightWeightHashSet<Block> getBlocks(String storageId) {
-    return node2blocks.get(storageId);
-  }
+  synchronized List<Block> invalidateWork(
+      final String storageId, final DatanodeDescriptor dn) {
+    final LightWeightHashSet<Block> set = node2blocks.get(storageId);
+    if (set == null) {
+      return null;
+    }
 
-  /**
-   * Schedules invalidation work associated with a storage at the corresponding
-   * datanode.
-   * @param storageId Storage of blocks to be invalidated
-   * @param dn Datanode where invalidation work will be scheduled
-   * @return List of blocks scheduled for invalidation at the datanode
-   */
-  abstract List<Block> invalidateWork(final String storageId,
-      final DatanodeDescriptor dn);
+    // # blocks that can be sent in one message is limited
+    final int limit = datanodeManager.blockInvalidateLimit;
+    final List<Block> toInvalidate = set.pollN(limit);
+
+    // If we send everything in this message, remove this node entry
+    if (set.isEmpty()) {
+      remove(storageId);
+    }
+
+    dn.addBlocksToBeInvalidated(toInvalidate);
+    numBlocks -= toInvalidate.size();
+    return toInvalidate;
+  }
   
   synchronized void clear() {
     node2blocks.clear();
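
[Editor's note, not part of the commit] A standalone sketch of the per-heartbeat batching that invalidateWork now performs in-line: poll at most blockInvalidateLimit blocks for one storage, remove the storage entry once it is empty, and hand the batch back to the caller. Plain collections and an illustrative limit; only the batching pattern is the point, not the real LightWeightHashSet or DatanodeDescriptor plumbing.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

class InvalidateBatchSketch {
  final Map<String, Set<Long>> node2blocks = new TreeMap<String, Set<Long>>();
  long numBlocks = 0;
  final int blockInvalidateLimit = 1000;   // illustrative value

  List<Long> invalidateWork(String storageId) {
    Set<Long> set = node2blocks.get(storageId);
    if (set == null) {
      return null;
    }
    // Poll up to the limit, removing each block from the pending set.
    List<Long> toInvalidate = new ArrayList<Long>();
    Iterator<Long> it = set.iterator();
    while (it.hasNext() && toInvalidate.size() < blockInvalidateLimit) {
      toInvalidate.add(it.next());
      it.remove();
    }
    // Drop the storage entry once all of its pending work is drained.
    if (set.isEmpty()) {
      node2blocks.remove(storageId);
    }
    numBlocks -= toInvalidate.size();
    return toInvalidate;   // the real code also queues these on the datanode
  }
}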

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java?rev=1532924&r1=1532923&r2=1532924&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java Wed Oct 16 22:15:33 2013
@@ -29,27 +29,20 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
-import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.util.Daemon;
 
-/**
- * PendingReplicationBlocks is used in the BlockManager to track blocks that are
- * currently being replicated on disk and in the CacheReplicationManager to
- * track blocks that are currently being cached.
- * 
- * <p>
- * PendingReplicationBlocks performs the following tasks:
- * </p>
- * 
- * <ol>
- * <li>tracks in-flight replication or caching requests for a block at target
- * datanodes.</li>
- * <li>identifies requests that have timed out and need to be rescheduled at a
- * different datanode.</li>
- * </ol>
- */
-@InterfaceAudience.LimitedPrivate({"HDFS"})
+/***************************************************
+ * PendingReplicationBlocks does the bookkeeping of all
+ * blocks that are getting replicated.
+ *
+ * It does the following:
+ * 1)  record blocks that are getting replicated at this instant.
+ * 2)  a coarse grain timer to track age of replication request
+ * 3)  a thread that periodically identifies replication-requests
+ *     that never made it.
+ *
+ ***************************************************/
 class PendingReplicationBlocks {
   private static final Log LOG = BlockManager.LOG;
 

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1532924&r1=1532923&r2=1532924&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java Wed Oct 16 22:15:33 2013
@@ -628,6 +628,8 @@ class BPOfferService {
     case DatanodeProtocol.DNA_FINALIZE:
     case DatanodeProtocol.DNA_RECOVERBLOCK:
     case DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE:
+    case DatanodeProtocol.DNA_CACHE:
+    case DatanodeProtocol.DNA_UNCACHE:
       LOG.warn("Got a command from standby NN - ignoring command:" + cmd.getAction());
       break;
     default:

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java?rev=1532924&r1=1532923&r2=1532924&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java Wed Oct 16 22:15:33 2013
@@ -21,24 +21,10 @@ package org.apache.hadoop.hdfs.server.da
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletionService;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.io.IOUtils;
@@ -50,10 +36,6 @@ import org.apache.hadoop.fs.ChecksumExce
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * Manages caching for an FsDatasetImpl by using the mmap(2) and mlock(2)