Posted to hdfs-commits@hadoop.apache.org by wa...@apache.org on 2013/09/14 01:27:23 UTC

svn commit: r1523145 [1/2] - in /hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/protocolPB/ src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ sr...

Author: wang
Date: Fri Sep 13 23:27:22 2013
New Revision: 1523145

URL: http://svn.apache.org/r1523145
Log:
HDFS-5053. NameNode should invoke DataNode APIs to coordinate caching. (Andrew Wang)

Added:
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationManager.java   (with props)
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java   (with props)
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationPolicy.java   (with props)
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateStoredBlocks.java   (with props)
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReportProcessor.java   (with props)
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UncacheBlocks.java   (with props)
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCacheReplicationManager.java   (with props)
Modified:
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt?rev=1523145&r1=1523144&r2=1523145&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt Fri Sep 13 23:27:22 2013
@@ -33,6 +33,8 @@ HDFS-4949 (Unreleased)
     HDFS-5158. Add command-line support for manipulating cache directives.
     (Contributed by Colin Patrick McCabe)
 
+    HDFS-5053. NameNode should invoke DataNode APIs to coordinate caching.
+    (Andrew Wang)
 
   OPTIMIZATIONS
 

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1523145&r1=1523144&r2=1523145&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Fri Sep 13 23:27:22 2013
@@ -102,6 +102,8 @@ public class DFSConfigKeys extends Commo
   public static final long    DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT = 0;
   public static final String  DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY = "dfs.datanode.fsdatasetcache.max.threads.per.volume";
   public static final int     DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_DEFAULT = 4;
+  public static final String  DFS_NAMENODE_CACHING_ENABLED_KEY = "dfs.namenode.caching.enabled";
+  public static final boolean DFS_NAMENODE_CACHING_ENABLED_DEFAULT = false;
 
   public static final String  DFS_NAMENODE_HTTP_PORT_KEY = "dfs.http.port";
   public static final int     DFS_NAMENODE_HTTP_PORT_DEFAULT = 50070;
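
The two keys added above gate the NameNode-side caching work introduced by this change; caching coordination stays off unless dfs.namenode.caching.enabled is set to true. Below is a minimal sketch of flipping the flag from code, assuming the standard HdfsConfiguration entry point; the class name in the sketch is illustrative only and not part of this commit.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class CachingConfSketch {
      /** Build a Configuration with NameNode caching enabled (the default is false). */
      public static Configuration cachingEnabledConf() {
        Configuration conf = new HdfsConfiguration();
        // New key introduced by this commit.
        conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY, true);
        return conf;
      }
    }

The same effect can be had in a deployment by setting dfs.namenode.caching.enabled to true in hdfs-site.xml.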

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1523145&r1=1523144&r2=1523145&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Fri Sep 13 23:27:22 2013
@@ -713,6 +713,12 @@ public class PBHelper {
     case DatanodeProtocol.DNA_SHUTDOWN:
       builder.setAction(BlockCommandProto.Action.SHUTDOWN);
       break;
+    case DatanodeProtocol.DNA_CACHE:
+      builder.setAction(BlockCommandProto.Action.CACHE);
+      break;
+    case DatanodeProtocol.DNA_UNCACHE:
+      builder.setAction(BlockCommandProto.Action.UNCACHE);
+      break;
     default:
       throw new AssertionError("Invalid action");
     }
@@ -765,6 +771,8 @@ public class PBHelper {
       break;
     case DatanodeProtocol.DNA_TRANSFER:
     case DatanodeProtocol.DNA_INVALIDATE:
+    case DatanodeProtocol.DNA_CACHE:
+    case DatanodeProtocol.DNA_UNCACHE:
     case DatanodeProtocol.DNA_SHUTDOWN:
       builder.setCmdType(DatanodeCommandProto.Type.BlockCommand).setBlkCmd(
           PBHelper.convert((BlockCommand) datanodeCommand));
@@ -818,6 +826,14 @@ public class PBHelper {
     case SHUTDOWN:
       action = DatanodeProtocol.DNA_SHUTDOWN;
       break;
+    case CACHE:
+      action = DatanodeProtocol.DNA_CACHE;
+      break;
+    case UNCACHE:
+      action = DatanodeProtocol.DNA_UNCACHE;
+      break;
+    default:
+      throw new AssertionError("Unknown action type: " + blkCmd.getAction());
     }
     return new BlockCommand(action, blkCmd.getBlockPoolId(), blocks, targets);
   }
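
These hunks add a symmetric mapping for the two new datanode actions: DatanodeProtocol.DNA_CACHE and DNA_UNCACHE translate to BlockCommandProto.Action.CACHE and UNCACHE on the way to protobuf, and back again on the way in, with an AssertionError for anything unrecognized. A hedged round-trip sketch follows; the four-argument BlockCommand constructor mirrors the one used in the hunk above, while the empty block and target arrays and the pool id are assumptions made for illustration.

    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
    import org.apache.hadoop.hdfs.protocolPB.PBHelper;
    import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
    import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;

    public class CacheActionRoundTripSketch {
      public static void main(String[] args) {
        // Build a CACHE command with no blocks, convert it to protobuf and back.
        BlockCommand cache = new BlockCommand(DatanodeProtocol.DNA_CACHE,
            "example-pool", new Block[0], new DatanodeInfo[0][]);
        BlockCommand roundTripped = PBHelper.convert(PBHelper.convert(cache));
        assert roundTripped.getAction() == DatanodeProtocol.DNA_CACHE;
      }
    }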

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java?rev=1523145&r1=1523144&r2=1523145&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java Fri Sep 13 23:27:22 2013
@@ -61,6 +61,17 @@ public interface BlockCollection {
   public short getBlockReplication();
 
   /**
+   * Set cache replication factor for the collection
+   */
+  public void setCacheReplication(short cacheReplication);
+
+  /**
+   * Get cache replication factor for the collection
+   * @return cache replication value
+   */
+  public short getCacheReplication();
+
+  /**
    * Get the name of the collection.
    */
   public String getName();
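
BlockCollection gains a cache replication factor alongside the existing on-disk replication factor. As a minimal sketch of the new accessor pair, assuming a plain in-memory field with no persistence concerns (INodeFile, modified elsewhere in this commit, is the real implementor; the holder class here is illustrative only):

    /** Illustrative holder mirroring the new accessor pair; not an actual BlockCollection implementor. */
    public class CacheReplicationHolderSketch {
      private short cacheReplication = 0;

      public void setCacheReplication(short cacheReplication) {
        this.cacheReplication = cacheReplication;
      }

      public short getCacheReplication() {
        return cacheReplication;
      }
    }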

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1523145&r1=1523144&r2=1523145&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Fri Sep 13 23:27:22 2013
@@ -77,14 +77,13 @@ import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.Time;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
 
 /**
  * Keeps information related to the blocks stored in the Hadoop cluster.
  */
 @InterfaceAudience.Private
-public class BlockManager {
+public class BlockManager extends ReportProcessor {
 
   static final Log LOG = LogFactory.getLog(BlockManager.class);
   public static final Log blockLog = NameNode.blockStateChangeLog;
@@ -163,7 +162,7 @@ public class BlockManager {
   final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
 
   /** Blocks to be invalidated. */
-  private final InvalidateBlocks invalidateBlocks;
+  private final InvalidateStoredBlocks invalidateBlocks;
   
   /**
    * After a failover, over-replicated blocks may not be handled
@@ -219,7 +218,6 @@ public class BlockManager {
   final boolean encryptDataTransfer;
   
   // Max number of blocks to log info about during a block report.
-  private final long maxNumBlocksToLog;
 
   /**
    * When running inside a Standby node, the node may receive block reports
@@ -237,10 +235,11 @@ public class BlockManager {
   
   public BlockManager(final Namesystem namesystem, final FSClusterStats stats,
       final Configuration conf) throws IOException {
+    super(conf);
     this.namesystem = namesystem;
     datanodeManager = new DatanodeManager(this, namesystem, conf);
     heartbeatManager = datanodeManager.getHeartbeatManager();
-    invalidateBlocks = new InvalidateBlocks(datanodeManager);
+    invalidateBlocks = new InvalidateStoredBlocks(datanodeManager);
 
     // Compute the map capacity by allocating 2% of total memory
     blocksMap = new BlocksMap(DEFAULT_MAP_LOAD_FACTOR);
@@ -300,11 +299,7 @@ public class BlockManager {
     this.encryptDataTransfer =
         conf.getBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY,
             DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
-    
-    this.maxNumBlocksToLog =
-        conf.getLong(DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
-            DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT);
-    
+
     LOG.info("defaultReplication         = " + defaultReplication);
     LOG.info("maxReplication             = " + maxReplication);
     LOG.info("minReplication             = " + minReplication);
@@ -1004,6 +999,7 @@ public class BlockManager {
    * Adds block to list of blocks which will be invalidated on specified
    * datanode and log the operation
    */
+  @Override  // ReportProcessor
   void addToInvalidates(final Block block, final DatanodeInfo datanode) {
     invalidateBlocks.add(block, datanode, true);
   }
@@ -1049,7 +1045,8 @@ public class BlockManager {
     markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock, reason), dn);
   }
 
-  private void markBlockAsCorrupt(BlockToMarkCorrupt b,
+  @Override // ReportProcessor
+  void markBlockAsCorrupt(BlockToMarkCorrupt b,
                                   DatanodeInfo dn) throws IOException {
     DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
     if (node == null) {
@@ -1059,7 +1056,7 @@ public class BlockManager {
 
     BlockCollection bc = b.corrupted.getBlockCollection();
     if (bc == null) {
-      blockLog.info("BLOCK markBlockAsCorrupt: " + b
+      blockLogInfo("#markBlockAsCorrupt: " + b
           + " cannot be marked as corrupt as it does not belong to any file");
       addToInvalidates(b.corrupted, node);
       return;
@@ -1123,6 +1120,9 @@ public class BlockManager {
     this.shouldPostponeBlocksFromFuture  = postpone;
   }
 
+  public boolean shouldPostponeBlocksFromFuture() {
+    return this.shouldPostponeBlocksFromFuture;
+  }
 
   private void postponeBlock(Block blk) {
     if (postponedMisreplicatedBlocks.add(blk)) {
@@ -1544,61 +1544,6 @@ public class BlockManager {
        */
     }
   }
-  
-  /**
-   * StatefulBlockInfo is used to build the "toUC" list, which is a list of
-   * updates to the information about under-construction blocks.
-   * Besides the block in question, it provides the ReplicaState
-   * reported by the datanode in the block report. 
-   */
-  private static class StatefulBlockInfo {
-    final BlockInfoUnderConstruction storedBlock;
-    final ReplicaState reportedState;
-    
-    StatefulBlockInfo(BlockInfoUnderConstruction storedBlock, 
-        ReplicaState reportedState) {
-      this.storedBlock = storedBlock;
-      this.reportedState = reportedState;
-    }
-  }
-  
-  /**
-   * BlockToMarkCorrupt is used to build the "toCorrupt" list, which is a
-   * list of blocks that should be considered corrupt due to a block report.
-   */
-  private static class BlockToMarkCorrupt {
-    /** The corrupted block in a datanode. */
-    final BlockInfo corrupted;
-    /** The corresponding block stored in the BlockManager. */
-    final BlockInfo stored;
-    /** The reason to mark corrupt. */
-    final String reason;
-    
-    BlockToMarkCorrupt(BlockInfo corrupted, BlockInfo stored, String reason) {
-      Preconditions.checkNotNull(corrupted, "corrupted is null");
-      Preconditions.checkNotNull(stored, "stored is null");
-
-      this.corrupted = corrupted;
-      this.stored = stored;
-      this.reason = reason;
-    }
-
-    BlockToMarkCorrupt(BlockInfo stored, String reason) {
-      this(stored, stored, reason);
-    }
-
-    BlockToMarkCorrupt(BlockInfo stored, long gs, String reason) {
-      this(new BlockInfo(stored), stored, reason);
-      //the corrupted block in datanode has a different generation stamp
-      corrupted.setGenerationStamp(gs);
-    }
-
-    @Override
-    public String toString() {
-      return corrupted + "("
-          + (corrupted == stored? "same as stored": "stored=" + stored) + ")";
-    }
-  }
 
   /**
    * The given datanode is reporting all its blocks.
@@ -1660,15 +1605,6 @@ public class BlockManager {
   }
 
   /**
-   * The given datanode is reporting all of its cached blocks.
-   * Update the cache state of blocks in the block map.
-   */
-  public void processCacheReport(final DatanodeID nodeID, final String poolId,
-      final BlockListAsLongs newReport) throws IOException {
-    // TODO: Implement me!
-  }
-
-  /**
    * Rescan the list of blocks which were previously postponed.
    */
   private void rescanPostponedMisreplicatedBlocks() {
@@ -1699,46 +1635,6 @@ public class BlockManager {
     }
   }
   
-  private void processReport(final DatanodeDescriptor node,
-      final BlockListAsLongs report) throws IOException {
-    // Normal case:
-    // Modify the (block-->datanode) map, according to the difference
-    // between the old and new block report.
-    //
-    Collection<BlockInfo> toAdd = new LinkedList<BlockInfo>();
-    Collection<Block> toRemove = new LinkedList<Block>();
-    Collection<Block> toInvalidate = new LinkedList<Block>();
-    Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
-    Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
-    reportDiff(node, report, toAdd, toRemove, toInvalidate, toCorrupt, toUC);
-
-    // Process the blocks on each queue
-    for (StatefulBlockInfo b : toUC) { 
-      addStoredBlockUnderConstruction(b.storedBlock, node, b.reportedState);
-    }
-    for (Block b : toRemove) {
-      removeStoredBlock(b, node);
-    }
-    int numBlocksLogged = 0;
-    for (BlockInfo b : toAdd) {
-      addStoredBlock(b, node, null, numBlocksLogged < maxNumBlocksToLog);
-      numBlocksLogged++;
-    }
-    if (numBlocksLogged > maxNumBlocksToLog) {
-      blockLog.info("BLOCK* processReport: logged info for " + maxNumBlocksToLog
-          + " of " + numBlocksLogged + " reported.");
-    }
-    for (Block b : toInvalidate) {
-      blockLog.info("BLOCK* processReport: "
-          + b + " on " + node + " size " + b.getNumBytes()
-          + " does not belong to any file");
-      addToInvalidates(b, node);
-    }
-    for (BlockToMarkCorrupt b : toCorrupt) {
-      markBlockAsCorrupt(b, node);
-    }
-  }
-
   /**
    * processFirstBlockReport is intended only for processing "initial" block
    * reports, the first block report received from a DN after it registers.
@@ -1801,44 +1697,6 @@ public class BlockManager {
     }
   }
 
-  private void reportDiff(DatanodeDescriptor dn, 
-      BlockListAsLongs newReport, 
-      Collection<BlockInfo> toAdd,              // add to DatanodeDescriptor
-      Collection<Block> toRemove,           // remove from DatanodeDescriptor
-      Collection<Block> toInvalidate,       // should be removed from DN
-      Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
-      Collection<StatefulBlockInfo> toUC) { // add to under-construction list
-    // place a delimiter in the list which separates blocks 
-    // that have been reported from those that have not
-    BlockInfo delimiter = new BlockInfo(new Block(), 1);
-    boolean added = dn.addBlock(delimiter);
-    assert added : "Delimiting block cannot be present in the node";
-    int headIndex = 0; //currently the delimiter is in the head of the list
-    int curIndex;
-
-    if (newReport == null)
-      newReport = new BlockListAsLongs();
-    // scan the report and process newly reported blocks
-    BlockReportIterator itBR = newReport.getBlockReportIterator();
-    while(itBR.hasNext()) {
-      Block iblk = itBR.next();
-      ReplicaState iState = itBR.getCurrentReplicaState();
-      BlockInfo storedBlock = processReportedBlock(dn, iblk, iState,
-                                  toAdd, toInvalidate, toCorrupt, toUC);
-      // move block to the head of the list
-      if (storedBlock != null && (curIndex = storedBlock.findDatanode(dn)) >= 0) {
-        headIndex = dn.moveBlockToHead(storedBlock, curIndex, headIndex);
-      }
-    }
-    // collect blocks that have not been reported
-    // all of them are next to the delimiter
-    Iterator<? extends Block> it = new DatanodeDescriptor.BlockIterator(
-        delimiter.getNext(0), dn);
-    while(it.hasNext())
-      toRemove.add(it.next());
-    dn.removeBlock(delimiter);
-  }
-
   /**
    * Process a block replica reported by the data-node.
    * No side effects except adding to the passed-in Collections.
@@ -1870,7 +1728,8 @@ public class BlockManager {
    * @return the up-to-date stored block, if it should be kept.
    *         Otherwise, null.
    */
-  private BlockInfo processReportedBlock(final DatanodeDescriptor dn, 
+  @Override // ReportProcessor
+  BlockInfo processReportedBlock(final DatanodeDescriptor dn, 
       final Block block, final ReplicaState reportedState, 
       final Collection<BlockInfo> toAdd, 
       final Collection<Block> toInvalidate, 
@@ -2097,6 +1956,7 @@ assert storedBlock.findDatanode(dn) < 0 
     }
   }
   
+  @Override // ReportProcessor
   void addStoredBlockUnderConstruction(
       BlockInfoUnderConstruction block, 
       DatanodeDescriptor node, 
@@ -2152,7 +2012,8 @@ assert storedBlock.findDatanode(dn) < 0 
    * needed replications if this takes care of the problem.
    * @return the block that is stored in blockMap.
    */
-  private Block addStoredBlock(final BlockInfo block,
+  @Override // ReportProcessor
+  Block addStoredBlock(final BlockInfo block,
                                DatanodeDescriptor node,
                                DatanodeDescriptor delNodeHint,
                                boolean logEveryBlock)
@@ -2167,7 +2028,7 @@ assert storedBlock.findDatanode(dn) < 0 
     }
     if (storedBlock == null || storedBlock.getBlockCollection() == null) {
       // If this block does not belong to anyfile, then we are done.
-      blockLog.info("BLOCK* addStoredBlock: " + block + " on "
+      blockLogInfo("#addStoredBlock: " + block + " on "
           + node + " size " + block.getNumBytes()
           + " but it does not belong to any file");
       // we could add this block to invalidate set of this datanode.
@@ -2189,7 +2050,7 @@ assert storedBlock.findDatanode(dn) < 0 
       }
     } else {
       curReplicaDelta = 0;
-      blockLog.warn("BLOCK* addStoredBlock: "
+      blockLogWarn("#addStoredBlock: "
           + "Redundant addStoredBlock request received for " + storedBlock
           + " on " + node + " size " + storedBlock.getNumBytes());
     }
@@ -2247,20 +2108,6 @@ assert storedBlock.findDatanode(dn) < 0 
     return storedBlock;
   }
 
-  private void logAddStoredBlock(BlockInfo storedBlock, DatanodeDescriptor node) {
-    if (!blockLog.isInfoEnabled()) {
-      return;
-    }
-    
-    StringBuilder sb = new StringBuilder(500);
-    sb.append("BLOCK* addStoredBlock: blockMap updated: ")
-      .append(node)
-      .append(" is added to ");
-    storedBlock.appendStringTo(sb);
-    sb.append(" size " )
-      .append(storedBlock.getNumBytes());
-    blockLog.info(sb);
-  }
   /**
    * Invalidate corrupt replicas.
    * <p>
@@ -3282,4 +3129,21 @@ assert storedBlock.findDatanode(dn) < 0 
   public void shutdown() {
     blocksMap.close();
   }
+
+  @Override // ReportProcessor
+  int moveBlockToHead(DatanodeDescriptor dn, BlockInfo storedBlock,
+      int curIndex, int headIndex) {
+    return dn.moveBlockToHead(storedBlock, curIndex, headIndex);
+  }
+
+  @Override // ReportProcessor
+  boolean addBlock(DatanodeDescriptor dn, BlockInfo block) {
+    return dn.addBlock(block);
+  }
+
+  @Override // ReportProcessor
+  boolean removeBlock(DatanodeDescriptor dn, BlockInfo block) {
+    return dn.removeBlock(block);
+  }
+
 }
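
The net effect of this refactor is that BlockManager now extends the new ReportProcessor base class (added in part 2 of this commit) and exposes its report-handling primitives, addStoredBlock, addToInvalidates, markBlockAsCorrupt, processReportedBlock, moveBlockToHead, addBlock and removeBlock, as overridable hooks, so that CacheReplicationManager can reuse the same report-diffing loop against the cached-blocks map. ReportProcessor itself is not shown in this message; what follows is a hedged sketch of the template-method shape it suggests, with simplified types and names that are illustrative only.

    // Sketch only: the real ReportProcessor works on BlockInfo, BlockListAsLongs,
    // DatanodeDescriptor, etc.; this strips the types down to show the pattern.
    import java.util.ArrayList;
    import java.util.List;

    abstract class ReportProcessorSketch<B> {
      /** Hook: record a reported replica that should be kept. */
      abstract void addStoredBlock(B block, String datanode);
      /** Hook: schedule a reported replica for removal on the datanode. */
      abstract void addToInvalidates(B block, String datanode);
      /** Hook: decide whether a reported replica is known; null means invalidate it. */
      abstract B processReportedBlock(B reported, String datanode);

      /** Shared driver: diff a full report against current state. */
      void processReport(String datanode, List<B> report) {
        List<B> toAdd = new ArrayList<B>();
        List<B> toInvalidate = new ArrayList<B>();
        for (B reported : report) {
          B stored = processReportedBlock(reported, datanode);
          if (stored == null) {
            toInvalidate.add(reported);
          } else {
            toAdd.add(stored);
          }
        }
        for (B b : toAdd) {
          addStoredBlock(b, datanode);
        }
        for (B b : toInvalidate) {
          addToInvalidates(b, datanode);
        }
      }
    }

BlockManager plugs the on-disk blocksMap into these hooks, while CacheReplicationManager (added below) plugs in the cachedBlocksMap and the uncache queue.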

Added: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationManager.java?rev=1523145&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationManager.java (added)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationManager.java Fri Sep 13 23:27:22 2013
@@ -0,0 +1,595 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
+import org.apache.hadoop.hdfs.util.LightWeightHashSet;
+import org.apache.hadoop.util.Time;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Analogue of the BlockManager class for cached replicas. Maintains the mapping
+ * of cached blocks to datanodes via processing datanode cache reports. Based on
+ * these reports and addition and removal of caching directives in the
+ * CacheManager, the CacheReplicationManager will schedule caching and uncaching
+ * work.
+ * 
+ * The CacheReplicationManager does not have a separate lock, so depends on
+ * taking the namesystem lock as appropriate.
+ */
+@InterfaceAudience.LimitedPrivate({"HDFS"})
+public class CacheReplicationManager extends ReportProcessor {
+
+  private static final Log LOG =
+      LogFactory.getLog(CacheReplicationManager.class);
+
+  // Statistics
+  private volatile long pendingCacheBlocksCount = 0L;
+  private volatile long underCachedBlocksCount = 0L;
+  private volatile long scheduledCacheBlocksCount = 0L;
+
+  /** Used by metrics */
+  public long getPendingCacheBlocksCount() {
+    return pendingCacheBlocksCount;
+  }
+  /** Used by metrics */
+  public long getUnderCachedBlocksCount() {
+    return underCachedBlocksCount;
+  }
+  /** Used by metrics */
+  public long getScheduledCacheBlocksCount() {
+    return scheduledCacheBlocksCount;
+  }
+  /** Used by metrics */
+  public long getPendingBlocksToUncacheCount() {
+    return blocksToUncache.numBlocks();
+  }
+
+  private final Namesystem namesystem;
+  private final BlockManager blockManager;
+  private final DatanodeManager datanodeManager;
+  private final boolean isCachingEnabled;
+
+  /**
+   * Mapping of blocks to datanodes where the block is cached
+   */
+  final BlocksMap cachedBlocksMap;
+  /**
+   * Blocks to be uncached
+   */
+  private final UncacheBlocks blocksToUncache;
+  /**
+   * Blocks that need to be cached
+   */
+  private final LightWeightHashSet<Block> neededCacheBlocks;
+  /**
+   * Blocks that are being cached
+   */
+  private final PendingReplicationBlocks pendingCacheBlocks;
+
+  /**
+   * Executor for the CacheReplicationMonitor thread
+   */
+  private ExecutorService monitor = null;
+
+  private final Configuration conf;
+
+  public CacheReplicationManager(final Namesystem namesystem,
+      final BlockManager blockManager, final DatanodeManager datanodeManager,
+      final FSClusterStats stats, final Configuration conf) throws IOException {
+    super(conf);
+    this.namesystem = namesystem;
+    this.blockManager = blockManager;
+    this.datanodeManager = datanodeManager;
+    this.conf = conf;
+    isCachingEnabled = conf.getBoolean(
+        DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY,
+        DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_DEFAULT);
+    if (isCachingEnabled) {
+      cachedBlocksMap = new BlocksMap(BlockManager.DEFAULT_MAP_LOAD_FACTOR);
+      blocksToUncache = new UncacheBlocks();
+      pendingCacheBlocks = new PendingReplicationBlocks(1000 * conf.getInt(
+          DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
+          DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT));
+      neededCacheBlocks = new LightWeightHashSet<Block>();
+    } else {
+      cachedBlocksMap = null;
+      blocksToUncache = null;
+      pendingCacheBlocks = null;
+      neededCacheBlocks = null;
+    }
+  }
+
+  public void activate() {
+    if (isCachingEnabled) {
+      pendingCacheBlocks.start();
+      this.monitor = Executors.newSingleThreadExecutor(
+          new ThreadFactoryBuilder()
+          .setDaemon(true)
+          .setNameFormat(CacheReplicationMonitor.class.toString())
+          .build());
+      monitor.submit(new CacheReplicationMonitor(namesystem, blockManager,
+          datanodeManager, this, blocksToUncache, neededCacheBlocks,
+          pendingCacheBlocks, conf));
+      monitor.shutdown();
+    }
+  }
+
+  public void close() {
+    if (isCachingEnabled) {
+      monitor.shutdownNow();
+      try {
+        monitor.awaitTermination(3000, TimeUnit.MILLISECONDS);
+      } catch (InterruptedException e) {
+      }
+      pendingCacheBlocks.stop();
+      cachedBlocksMap.close();
+    }
+  }
+
+  public void clearQueues() {
+    blocksToUncache.clear();
+    synchronized (neededCacheBlocks) {
+      neededCacheBlocks.clear();
+    }
+    pendingCacheBlocks.clear();
+  }
+
+  public boolean isCachingEnabled() {
+    return isCachingEnabled;
+  }
+
+  /**
+   * @return desired cache replication factor of the block
+   */
+  short getCacheReplication(Block block) {
+    final BlockCollection bc = blockManager.blocksMap.getBlockCollection(block);
+    return bc == null ? 0 : bc.getCacheReplication();
+  }
+
+  /**
+   * Returns the number of cached replicas of a block
+   */
+  short getNumCached(Block block) {
+    Iterator<DatanodeDescriptor> it = cachedBlocksMap.nodeIterator(block);
+    short numCached = 0;
+    while (it.hasNext()) {
+      it.next();
+      numCached++;
+    }
+    return numCached;
+  }
+
+  /**
+   * The given datanode is reporting all of its cached blocks.
+   * Update the cache state of blocks in the block map.
+   */
+  public void processCacheReport(final DatanodeID nodeID, final String poolId,
+      final BlockListAsLongs newReport) throws IOException {
+    if (!isCachingEnabled) {
+      String error = "cacheReport received from datanode " + nodeID
+          + " but caching is disabled on the namenode ("
+          + DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY + ")";
+      LOG.warn(error + ", ignoring");
+      throw new IOException(error);
+    }
+    namesystem.writeLock();
+    final long startTime = Time.now(); //after acquiring write lock
+    final long endTime;
+    try {
+      final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
+      if (node == null || !node.isAlive) {
+        throw new IOException(
+            "processCacheReport from dead or unregistered node: " + nodeID);
+      }
+
+      // TODO: do an optimized initial cache report while in startup safemode
+      if (namesystem.isInStartupSafeMode()) {
+        blockLogInfo("#processCacheReport: "
+            + "discarded cache report from " + nodeID
+            + " because namenode still in startup phase");
+        return;
+      }
+
+      processReport(node, newReport);
+
+      // TODO: process postponed blocks reported while a standby
+      //rescanPostponedMisreplicatedBlocks();
+    } finally {
+      endTime = Time.now();
+      namesystem.writeUnlock();
+    }
+
+    // Log the block report processing stats from Namenode perspective
+    final NameNodeMetrics metrics = NameNode.getNameNodeMetrics();
+    if (metrics != null) {
+      metrics.addCacheBlockReport((int) (endTime - startTime));
+    }
+    blockLogInfo("#processCacheReport: from "
+        + nodeID + ", blocks: " + newReport.getNumberOfBlocks()
+        + ", processing time: " + (endTime - startTime) + " msecs");
+  }
+
+  @Override // ReportProcessor
+  void markBlockAsCorrupt(BlockToMarkCorrupt b, DatanodeInfo dn)
+      throws IOException {
+    throw new UnsupportedOperationException("Corrupt blocks should not be in"
+        + " the cache report");
+  }
+
+  @Override // ReportProcessor
+  void addToInvalidates(final Block b, final DatanodeInfo node) {
+    blocksToUncache.add(b, node, true);
+  }
+
+  @Override // ReportProcessor
+  void addStoredBlockUnderConstruction(
+      BlockInfoUnderConstruction storedBlock, DatanodeDescriptor node,
+      ReplicaState reportedState) {
+    throw new UnsupportedOperationException("Under-construction blocks"
+        + " should not be in the cache report");
+  }
+
+  @Override // ReportProcessor
+  int moveBlockToHead(DatanodeDescriptor dn, BlockInfo storedBlock,
+      int curIndex, int headIndex) {
+    return dn.moveCachedBlockToHead(storedBlock, curIndex, headIndex);
+  }
+
+  @Override // ReportProcessor
+  boolean addBlock(DatanodeDescriptor dn, BlockInfo block) {
+    return dn.addCachedBlock(block);
+  }
+
+  @Override // ReportProcessor
+  boolean removeBlock(DatanodeDescriptor dn, BlockInfo block) {
+    return dn.removeCachedBlock(block);
+  }
+
+  /**
+   * Similar to processReportedBlock. Simpler since it doesn't need to worry
+   * about under construction and corrupt replicas.
+   * 
+   * @return Updated BlockInfo for the block if it should be kept, null if
+   * it is to be invalidated.
+   */
+  @Override // ReportProcessor
+  BlockInfo processReportedBlock(final DatanodeDescriptor dn,
+      final Block block, final ReplicaState reportedState,
+      final Collection<BlockInfo> toAdd,
+      final Collection<Block> toInvalidate,
+      Collection<BlockToMarkCorrupt> toCorrupt,
+      Collection<StatefulBlockInfo> toUC) {
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Reported cached block " + block
+          + " on " + dn + " size " + block.getNumBytes()
+          + " replicaState = " + reportedState);
+    }
+
+    final boolean shouldPostponeBlocksFromFuture =
+        blockManager.shouldPostponeBlocksFromFuture();
+    if (shouldPostponeBlocksFromFuture &&
+        namesystem.isGenStampInFuture(block)) {
+      // TODO: queuing cache operations on the standby
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("processReportedBlock: block " + block + " has a "
+            + "genstamp from the future and namenode is in standby mode,"
+            + " ignoring");
+      }
+      return null;
+    }
+
+    BlockInfo storedBlock = blockManager.blocksMap.getStoredBlock(block);
+    if (storedBlock == null) {
+      // If blocksMap does not contain reported block id,
+      // the BlockManager will take care of invalidating it, and the datanode
+      // will automatically uncache at that point.
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("processReportedBlock: block " + block + " not found "
+            + "in blocksMap, ignoring");
+      }
+      return null;
+    }
+
+    BlockUCState ucState = storedBlock.getBlockUCState();
+
+    // Datanodes currently only will cache completed replicas.
+    // Let's just invalidate anything that's not completed and the right
+    // genstamp and number of bytes.
+    if (!ucState.equals(BlockUCState.COMPLETE) ||
+        block.getGenerationStamp() != storedBlock.getGenerationStamp() ||
+        block.getNumBytes() != storedBlock.getNumBytes()) {
+      if (shouldPostponeBlocksFromFuture) {
+        // TODO: queuing cache operations on the standby
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("processReportedBlock: block " + block + " has a "
+              + "mismatching genstamp or length and namenode is in standby"
+              + " mode, ignoring");
+        }
+        return null;
+      } else {
+        toInvalidate.add(block);
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("processReportedBlock: block " + block + " scheduled"
+              + " for uncaching because it is misreplicated"
+              + " or under construction.");
+        }
+        return null;
+      }
+    }
+
+    // It's a keeper
+
+    // Could be present in blocksMap and not in cachedBlocksMap, add it
+    BlockInfo cachedBlock = cachedBlocksMap.getStoredBlock(block);
+    if (cachedBlock == null) {
+      cachedBlock = new BlockInfo(block, 0);
+      cachedBlocksMap.addBlockCollection(cachedBlock,
+          storedBlock.getBlockCollection());
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("In memory blockUCState = " + ucState);
+    }
+
+    // Ignore replicas that are already scheduled for removal
+    if (blocksToUncache.contains(dn.getStorageID(), block)) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("processReportedBlock: block " + block + " is already"
+            + " scheduled to be uncached, not adding it to the cachedBlocksMap");
+      }
+      return cachedBlock;
+    }
+
+    // add replica if not already present in the cached block map
+    if (reportedState == ReplicaState.FINALIZED
+        && cachedBlock.findDatanode(dn) < 0) {
+      toAdd.add(cachedBlock);
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("processReportedBlock: block " + block + " scheduled"
+          + " to be added to cachedBlocksMap");
+    }
+    return cachedBlock;
+  }
+
+  /**
+   * Modify (cached block-->datanode) map with a newly cached block. Remove
+   * block from set of needed cache replications if this takes care of the
+   * problem.
+   * 
+   * @return the block that is stored in cachedBlockMap.
+   */
+  @Override // ReportProcessor
+  Block addStoredBlock(final BlockInfo block, DatanodeDescriptor node,
+      DatanodeDescriptor delNodeHint, boolean logEveryBlock) throws IOException {
+    assert block != null && namesystem.hasWriteLock();
+    BlockInfo cachedBlock = block;
+    if (cachedBlock == null || cachedBlock.getBlockCollection() == null) {
+      // If this block does not belong to anyfile, then we are done.
+      blockLogInfo("#addStoredBlock: " + block + " on "
+          + node + " size " + block.getNumBytes()
+          + " but it does not belong to any file");
+      // we could add this block to invalidate set of this datanode.
+      // it will happen in next block report otherwise.
+      return block;
+    }
+
+    BlockCollection bc = cachedBlock.getBlockCollection();
+
+    // add block to the datanode
+    boolean added = node.addCachedBlock(cachedBlock);
+
+    int curReplicaDelta;
+    if (added) {
+      curReplicaDelta = 1;
+      if (logEveryBlock) {
+        logAddStoredBlock(cachedBlock, node);
+      }
+    } else {
+      curReplicaDelta = 0;
+      blockLogWarn("#addStoredBlock: "
+          + "Redundant addCachedBlock request received for " + cachedBlock
+          + " on " + node + " size " + cachedBlock.getNumBytes());
+    }
+
+    // Remove it from pending list if present
+    pendingCacheBlocks.decrement(block, node);
+
+    // Now check for completion of blocks and safe block count
+    int numCachedReplicas = getNumCached(cachedBlock);
+    int numEffectiveCachedReplica = numCachedReplicas
+      + pendingCacheBlocks.getNumReplicas(cachedBlock);
+
+    // if file is under construction, then done for now
+    if (bc instanceof MutableBlockCollection) {
+      return cachedBlock;
+    }
+
+    // do not try to handle over/under-replicated blocks during first safe mode
+    if (!namesystem.isPopulatingReplQueues()) {
+      return cachedBlock;
+    }
+
+    // Under-replicated
+    short cacheReplication = bc.getCacheReplication();
+    if (numEffectiveCachedReplica >= cacheReplication) {
+      synchronized (neededCacheBlocks) {
+        neededCacheBlocks.remove(cachedBlock);
+      }
+    } else {
+      updateNeededCaching(cachedBlock, curReplicaDelta, 0);
+    }
+
+    // Over-replicated, we don't need this new replica
+    if (numEffectiveCachedReplica > cacheReplication) {
+      blocksToUncache.add(cachedBlock, node, true);
+    }
+
+    return cachedBlock;
+  }
+
+  /**
+   * Modify (cached block-->datanode) map. Possibly generate replication tasks,
+   * if the removed block is still valid.
+   */
+  @Override // ReportProcessor
+  void removeStoredBlock(Block block, DatanodeDescriptor node) {
+    blockLogDebug("#removeStoredBlock: " + block + " from " + node);
+    assert (namesystem.hasWriteLock());
+    {
+      if (!cachedBlocksMap.removeNode(block, node)) {
+        blockLogDebug("#removeStoredBlock: "
+            + block + " has already been removed from node " + node);
+        return;
+      }
+
+      // Prune the block from the map if it's the last cache replica
+      if (cachedBlocksMap.getStoredBlock(block).numNodes() == 0) {
+        cachedBlocksMap.removeBlock(block);
+      }
+
+      //
+      // It's possible that the block was removed because of a datanode
+      // failure. If the block is still valid, check if replication is
+      // necessary. In that case, put block on a possibly-will-
+      // be-replicated list.
+      //
+      BlockCollection bc = blockManager.blocksMap.getBlockCollection(block);
+      if (bc != null) {
+        updateNeededCaching(block, -1, 0);
+      }
+    }
+  }
+
+  /**
+   * Reduce cache replication factor to the new replication by randomly
+   * choosing replicas to invalidate.
+   */
+  private void processOverCachedBlock(final Block block,
+      final short replication) {
+    assert namesystem.hasWriteLock();
+    List<DatanodeDescriptor> nodes = getSafeReplicas(cachedBlocksMap, block);
+    List<DatanodeDescriptor> targets =
+        CacheReplicationPolicy.chooseTargetsToUncache(nodes, replication);
+    for (DatanodeDescriptor dn: targets) {
+      blocksToUncache.add(block, dn, true);
+    }
+  }
+
+  /** Set replication for the blocks. */
+  public void setCacheReplication(final short oldRepl, final short newRepl,
+      final String src, final Block... blocks) {
+    if (!isCachingEnabled) {
+      LOG.warn("Attempted to set cache replication for " + src + " but caching"
+          + " is disabled (" + DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY
+          + "), ignoring");
+      return;
+    }
+    if (newRepl == oldRepl) {
+      return;
+    }
+
+    // update needReplication priority queues
+    for (Block b : blocks) {
+      updateNeededCaching(b, 0, newRepl-oldRepl);
+    }
+
+    if (oldRepl > newRepl) {
+      // old replication > the new one; need to remove copies
+      LOG.info("Decreasing cache replication from " + oldRepl + " to " + newRepl
+          + " for " + src);
+      for (Block b : blocks) {
+        processOverCachedBlock(b, newRepl);
+      }
+    } else { // replication factor is increased
+      LOG.info("Increasing cache replication from " + oldRepl + " to " + newRepl
+          + " for " + src);
+    }
+  }
+
+  /** updates a block in under replicated queue */
+  private void updateNeededCaching(final Block block,
+      final int curReplicasDelta, int expectedReplicasDelta) {
+    namesystem.writeLock();
+    try {
+      if (!namesystem.isPopulatingReplQueues()) {
+        return;
+      }
+      final int numCached = getNumCached(block);
+      final int curExpectedReplicas = getCacheReplication(block);
+      if (numCached < curExpectedReplicas) {
+        neededCacheBlocks.add(block);
+      } else {
+        synchronized (neededCacheBlocks) {
+          neededCacheBlocks.remove(block);
+        }
+      }
+    } finally {
+      namesystem.writeUnlock();
+    }
+  }
+
+  /**
+   * Return the safely cached replicas of a block in a BlocksMap
+   */
+  List<DatanodeDescriptor> getSafeReplicas(BlocksMap map, Block block) {
+    List<DatanodeDescriptor> nodes = new ArrayList<DatanodeDescriptor>(3);
+    Collection<DatanodeDescriptor> corrupted =
+        blockManager.corruptReplicas.getNodes(block);
+    Iterator<DatanodeDescriptor> it = map.nodeIterator(block);
+    while (it.hasNext()) {
+      DatanodeDescriptor dn = it.next();
+      // Don't count a decommissioned or decommissioning nodes
+      if (dn.isDecommissioned() || dn.isDecommissionInProgress()) {
+        continue;
+      }
+      // Don't count a corrupted node
+      if (corrupted != null && corrupted.contains(dn)) {
+        continue;
+      }
+      nodes.add(dn);
+    }
+    return nodes;
+  }
+}

Propchange: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationManager.java
------------------------------------------------------------------------------
    svn:eol-style = native
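
The core bookkeeping in CacheReplicationManager's addStoredBlock and updateNeededCaching compares the number of safely cached replicas plus pending caching work against the collection's cache replication target, and moves the block between the needed, pending, and uncache queues accordingly. A hedged sketch of that decision, with hypothetical names standing in for the manager's internal state:

    // Sketch of the under/over-cached decision; all names here are illustrative.
    final class CacheTargetDecisionSketch {
      enum Decision { SATISFIED, NEEDS_MORE_CACHING, OVER_CACHED }

      static Decision decide(int numCached, int numPendingCache, int cacheReplication) {
        // Replicas already cached plus those scheduled for caching.
        int effective = numCached + numPendingCache;
        if (effective < cacheReplication) {
          return Decision.NEEDS_MORE_CACHING;  // block goes on neededCacheBlocks
        }
        if (effective > cacheReplication) {
          return Decision.OVER_CACHED;         // extra replicas go on blocksToUncache
        }
        return Decision.SATISFIED;             // removed from neededCacheBlocks
      }
    }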

Added: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java?rev=1523145&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java (added)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java Fri Sep 13 23:27:22 2013
@@ -0,0 +1,302 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import static org.apache.hadoop.util.ExitUtil.terminate;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.util.LightWeightHashSet;
+
+/**
+ * Periodically computes new replication work. This consists of two tasks:
+ * 
+ * 1) Assigning blocks in the neededCacheBlocks to datanodes where they will be
+ * cached. This moves them to the pendingCacheBlocks list.
+ * 
+ * 2) Placing caching tasks in pendingCacheBlocks that have timed out
+ * back into neededCacheBlocks for reassignment.
+ */
+@InterfaceAudience.LimitedPrivate({"HDFS"})
+class CacheReplicationMonitor implements Runnable {
+
+  private static final Log LOG =
+      LogFactory.getLog(CacheReplicationMonitor.class);
+
+  private static final Log blockLog = NameNode.blockStateChangeLog;
+
+  private final Namesystem namesystem;
+  private final BlockManager blockManager;
+  private final DatanodeManager datanodeManager;
+  private final CacheReplicationManager cacheReplManager;
+
+  private final UncacheBlocks blocksToUncache;
+  private final LightWeightHashSet<Block> neededCacheBlocks;
+  private final PendingReplicationBlocks pendingCacheBlocks;
+
+  /**
+   * Re-check period for computing cache replication work
+   */
+  private final long cacheReplicationRecheckInterval;
+
+  public CacheReplicationMonitor(Namesystem namesystem,
+      BlockManager blockManager, DatanodeManager datanodeManager,
+      CacheReplicationManager cacheReplManager,
+      UncacheBlocks blocksToUncache,
+      LightWeightHashSet<Block> neededCacheBlocks,
+      PendingReplicationBlocks pendingCacheBlocks,
+      Configuration conf) {
+    this.namesystem = namesystem;
+    this.blockManager = blockManager;
+    this.datanodeManager = datanodeManager;
+    this.cacheReplManager = cacheReplManager;
+
+    this.blocksToUncache = blocksToUncache;
+    this.neededCacheBlocks = neededCacheBlocks;
+    this.pendingCacheBlocks = pendingCacheBlocks;
+
+    this.cacheReplicationRecheckInterval = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 
+        DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000L;
+  }
+
+  @Override
+  public void run() {
+    LOG.info("CacheReplicationMonitor is starting");
+    while (namesystem.isRunning()) {
+      try {
+        computeCachingWork();
+        processPendingCachingWork();
+        Thread.sleep(cacheReplicationRecheckInterval);
+      } catch (Throwable t) {
+        if (!namesystem.isRunning()) {
+          LOG.info("Stopping CacheReplicationMonitor.");
+          if (!(t instanceof InterruptedException)) {
+            LOG.info("CacheReplicationMonitor received an exception"
+                + " while shutting down.", t);
+          }
+          break;
+        }
+        LOG.fatal("ReplicationMonitor thread received Runtime exception. ", t);
+        terminate(1, t);
+      }
+    }
+  }
+
+  /**
+   * Assigns under-cached blocks to new datanodes.
+   */
+  private void computeCachingWork() {
+    List<Block> blocksToCache = null;
+    namesystem.writeLock();
+    try {
+      synchronized (neededCacheBlocks) {
+        blocksToCache = neededCacheBlocks.pollAll();
+      }
+    } finally {
+      namesystem.writeUnlock();
+    }
+    computeCachingWorkForBlocks(blocksToCache);
+    computeUncacheWork();
+  }
+
+  private void computeCachingWorkForBlocks(List<Block> blocksToCache) {
+    int requiredRepl, effectiveRepl, additionalRepl;
+    List<DatanodeDescriptor> cachedNodes, storedNodes, targets;
+
+    final HashMap<Block, List<DatanodeDescriptor>> work =
+        new HashMap<Block, List<DatanodeDescriptor>>();
+    namesystem.writeLock();
+    try {
+      synchronized (neededCacheBlocks) {
+        for (Block block: blocksToCache) {
+          // Required number of cached replicas
+          requiredRepl = cacheReplManager.getCacheReplication(block);
+          // Replicas that are safely cached
+          cachedNodes = cacheReplManager.getSafeReplicas(
+              cacheReplManager.cachedBlocksMap, block);
+          // Replicas that are safely stored on disk
+          storedNodes = cacheReplManager.getSafeReplicas(
+              blockManager.blocksMap, block);
+          // "effective" replication factor which includes pending
+          // replication work
+          effectiveRepl = cachedNodes.size()
+              + pendingCacheBlocks.getNumReplicas(block);
+          if (effectiveRepl >= requiredRepl) {
+            neededCacheBlocks.remove(block);
+            blockLog.info("BLOCK* Removing " + block
+                + " from neededCacheBlocks as it has enough cached replicas");
+              continue;
+          }
+          // Choose some replicas to cache if needed
+          additionalRepl = requiredRepl - effectiveRepl;
+          targets = new ArrayList<DatanodeDescriptor>(storedNodes);
+          // Only target replicas that aren't already cached.
+          for (DatanodeDescriptor dn: storedNodes) {
+            if (!cachedNodes.contains(dn)) {
+              targets.add(dn);
+            }
+          }
+          if (targets.size() < additionalRepl) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Block " + block + " cannot be cached on additional"
+                  + " nodes because there are no more available datanodes"
+                  + " with the block on disk.");
+            }
+          }
+          targets = CacheReplicationPolicy.chooseTargetsToCache(block, targets,
+              additionalRepl);
+          if (targets.size() < additionalRepl) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Block " + block + " cannot be cached on additional"
+                  + " nodes because there is not sufficient cache space on"
+                  + " available target datanodes.");
+            }
+          }
+          // Continue if we couldn't get more cache targets
+          if (targets.size() == 0) {
+            continue;
+          }
+
+          // Update datanodes and blocks that were scheduled for caching
+          work.put(block, targets);
+          // Schedule caching on the targets
+          for (DatanodeDescriptor target: targets) {
+            target.addBlockToBeCached(block);
+          }
+          // Add block to the pending queue
+          pendingCacheBlocks.increment(block,
+              targets.toArray(new DatanodeDescriptor[] {}));
+          if (blockLog.isDebugEnabled()) {
+            blockLog.debug("BLOCK* block " + block
+                + " is moved from neededCacheBlocks to pendingCacheBlocks");
+          }
+          // Remove from needed queue if it will be fully replicated
+          if (effectiveRepl + targets.size() >= requiredRepl) {
+            neededCacheBlocks.remove(block);
+          }
+        }
+      }
+    } finally {
+      namesystem.writeUnlock();
+    }
+
+    if (blockLog.isInfoEnabled()) {
+      // log which blocks have been scheduled for replication
+      for (Entry<Block, List<DatanodeDescriptor>> item : work.entrySet()) {
+        Block block = item.getKey();
+        List<DatanodeDescriptor> nodes = item.getValue();
+        StringBuilder targetList = new StringBuilder("datanode(s)");
+        for (DatanodeDescriptor node: nodes) {
+          targetList.append(' ');
+          targetList.append(node);
+        }
+        blockLog.info("BLOCK* ask " + targetList + " to cache " + block);
+      }
+    }
+
+    if (blockLog.isDebugEnabled()) {
+        blockLog.debug(
+          "BLOCK* neededCacheBlocks = " + neededCacheBlocks.size()
+          + " pendingCacheBlocks = " + pendingCacheBlocks.size());
+    }
+  }
+
+  /**
+   * Reassign pending caching work that has timed out
+   */
+  private void processPendingCachingWork() {
+    Block[] timedOutItems = pendingCacheBlocks.getTimedOutBlocks();
+    if (timedOutItems != null) {
+      namesystem.writeLock();
+      try {
+        for (int i = 0; i < timedOutItems.length; i++) {
+          Block block = timedOutItems[i];
+          final short numCached = cacheReplManager.getNumCached(block);
+          final short cacheReplication =
+              cacheReplManager.getCacheReplication(block);
+          // Re-add to the needed queue if the block is still under-cached
+          if (numCached < cacheReplication) {
+            synchronized (neededCacheBlocks) {
+              neededCacheBlocks.add(block);
+            }
+          }
+        }
+      } finally {
+        namesystem.writeUnlock();
+      }
+    }
+  }
+
+  /**
+   * Schedule blocks for uncaching at datanodes
+   * @return total number of blocks scheduled for uncaching
+   */
+  int computeUncacheWork() {
+    final List<String> nodes = blocksToUncache.getStorageIDs();
+    int blockCnt = 0;
+    for (String node: nodes) {
+      blockCnt += uncachingWorkForOneNode(node);
+    }
+    return blockCnt;
+  }
+
+  /**
+   * Retrieves the blocks queued for uncaching at a datanode and schedules
+   * them for uncaching on that datanode.
+   * 
+   * @return number of blocks scheduled for uncaching
+   */
+  private int uncachingWorkForOneNode(String nodeId) {
+    final List<Block> toInvalidate;
+    final DatanodeDescriptor dn;
+
+    namesystem.writeLock();
+    try {
+      // get blocks to uncache for this nodeId
+      assert nodeId != null;
+      dn = datanodeManager.getDatanode(nodeId);
+      if (dn == null) {
+        blocksToUncache.remove(nodeId);
+        return 0;
+      }
+      toInvalidate = blocksToUncache.invalidateWork(nodeId, dn);
+      if (toInvalidate == null) {
+        return 0;
+      }
+    } finally {
+      namesystem.writeUnlock();
+    }
+    if (blockLog.isInfoEnabled()) {
+      blockLog.info("BLOCK* " + getClass().getSimpleName()
+          + ": ask " + dn + " to uncache " + toInvalidate);
+    }
+    return toInvalidate.size();
+  }
+}

Propchange: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
------------------------------------------------------------------------------
    svn:eol-style = native

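For reference, a minimal standalone sketch of the per-block bookkeeping that the caching-work computation above performs: the effective replication counts both confirmed and pending cache replicas, and a block only leaves neededCacheBlocks once the scheduled work would satisfy the requirement. The class and the literal values below are illustrative only and are not part of the patch.

    public class CachingWorkSketch {
      public static void main(String[] args) {
        // Illustrative numbers; in the patch these come from the block map,
        // pendingCacheBlocks and the list of datanodes holding the block on disk.
        int requiredRepl = 3;     // requested cache replication for the block
        int cachedReplicas = 1;   // replicas already cached and confirmed
        int pendingReplicas = 1;  // caching scheduled but not yet confirmed
        int availableTargets = 4; // uncached datanodes that hold the block on disk

        int effectiveRepl = cachedReplicas + pendingReplicas;
        if (effectiveRepl >= requiredRepl) {
          System.out.println("enough cached replicas, drop from neededCacheBlocks");
          return;
        }
        int additionalRepl = requiredRepl - effectiveRepl;
        // The policy may return fewer targets than requested.
        int scheduled = Math.min(additionalRepl, availableTargets);
        System.out.println("ask " + scheduled + " datanode(s) to cache the block");
        // The block stays in neededCacheBlocks unless the scheduled work
        // would satisfy the requirement; the next scan then retries it.
        boolean stillNeeded = effectiveRepl + scheduled < requiredRepl;
        System.out.println("still in neededCacheBlocks: " + stillNeeded);
      }
    }
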
Added: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationPolicy.java?rev=1523145&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationPolicy.java (added)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationPolicy.java Fri Sep 13 23:27:22 2013
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import org.apache.commons.math.random.RandomData;
+import org.apache.commons.math.random.RandomDataImpl;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.Block;
+
+/**
+ * Helper class used by the CacheReplicationManager and CacheReplicationMonitor
+ * to select datanodes where blocks should be cached or uncached.
+ */
+@InterfaceAudience.LimitedPrivate({"HDFS"})
+public class CacheReplicationPolicy {
+
+  /**
+   * @return List of datanodes with sufficient capacity to cache the block
+   */
+  private static List<DatanodeDescriptor> selectSufficientCapacity(Block block,
+      List<DatanodeDescriptor> targets) {
+    List<DatanodeDescriptor> sufficient =
+        new ArrayList<DatanodeDescriptor>(targets.size());
+    for (DatanodeDescriptor dn: targets) {
+      long remaining = dn.getCacheRemaining();
+      if (remaining >= block.getNumBytes()) {
+        sufficient.add(dn);
+      }
+    }
+    return sufficient;
+  }
+
+  /**
+   * Returns a random datanode from targets, weighted by the amount of free
+   * cache capacity on the datanode. The targets list is shuffled in place;
+   * callers are expected to prune chosen datanodes from it themselves.
+   * 
+   * @param block Block to be cached
+   * @param targets List of potential cache targets
+   * @return a random DN, or null if no datanodes are available or have enough
+   *         cache capacity.
+   */
+  private static DatanodeDescriptor randomDatanodeByRemainingCache(Block block,
+      List<DatanodeDescriptor> targets) {
+    // Hold a lottery biased by the amount of free space to decide
+    // who gets the block
+    Collections.shuffle(targets);
+    TreeMap<Long, DatanodeDescriptor> lottery =
+        new TreeMap<Long, DatanodeDescriptor>();
+    long totalCacheAvailable = 0;
+    for (DatanodeDescriptor dn: targets) {
+      long remaining = dn.getCacheRemaining();
+      totalCacheAvailable += remaining;
+      lottery.put(totalCacheAvailable, dn);
+    }
+    if (totalCacheAvailable <= 0) {
+      // No target has any free cache space
+      return null;
+    }
+    // Pick our lottery winner
+    RandomData r = new RandomDataImpl();
+    long winningTicket = r.nextLong(0, totalCacheAvailable - 1);
+    Entry<Long, DatanodeDescriptor> winner = lottery.higherEntry(winningTicket);
+    return winner.getValue();
+  }
+
+  /**
+   * Chooses numTargets new cache replicas for a block from a list of targets.
+   * Will return fewer targets than requested if not enough nodes are available.
+   * 
+   * @return List of target datanodes
+   */
+  static List<DatanodeDescriptor> chooseTargetsToCache(Block block,
+      List<DatanodeDescriptor> targets, int numTargets) {
+    List<DatanodeDescriptor> sufficient =
+        selectSufficientCapacity(block, targets);
+    List<DatanodeDescriptor> chosen =
+        new ArrayList<DatanodeDescriptor>(numTargets);
+    for (int i = 0; i < numTargets && !sufficient.isEmpty(); i++) {
+      DatanodeDescriptor target =
+          randomDatanodeByRemainingCache(block, sufficient);
+      if (target == null) {
+        break;
+      }
+      chosen.add(target);
+      // Don't pick the same datanode twice for the same block
+      sufficient.remove(target);
+    }
+    return chosen;
+  }
+
+  /**
+   * Given the list of datanodes where a block is currently cached, chooses
+   * replicas to uncache until the cache replication factor drops to the
+   * desired replication.
+   * 
+   * @param nodes list of datanodes where the block is currently cached
+   * @param replication desired replication factor
+   * @return List of datanodes to uncache
+   */
+  public static List<DatanodeDescriptor> chooseTargetsToUncache(
+      List<DatanodeDescriptor> nodes, short replication) {
+    final int effectiveReplication = nodes.size();
+    List<DatanodeDescriptor> targets =
+        new ArrayList<DatanodeDescriptor>(effectiveReplication);
+    Collections.shuffle(nodes);
+    final int additionalTargetsNeeded = effectiveReplication - replication;
+    int chosen = 0;
+    while (chosen < additionalTargetsNeeded && !nodes.isEmpty()) {
+      targets.add(nodes.get(chosen));
+      chosen++;
+    }
+    return targets;
+  }
+
+}

Propchange: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationPolicy.java
------------------------------------------------------------------------------
    svn:eol-style = native

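A standalone illustration of the weighted-lottery selection used by randomDatanodeByRemainingCache above: cumulative free-cache sums are keyed in a TreeMap, and higherEntry() maps a random ticket back to a datanode, so nodes win in proportion to their remaining cache. The class, map contents, and datanode names below are made up for the demo and are not part of the patch.

    import java.util.HashMap;
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.Random;
    import java.util.TreeMap;

    public class CacheLotterySketch {
      // Pick one node at random, weighted by its remaining cache capacity,
      // mirroring the cumulative-sum/higherEntry() technique in the policy.
      static String pickWeighted(Map<String, Long> remainingCache, Random rand) {
        TreeMap<Long, String> lottery = new TreeMap<Long, String>();
        long total = 0;
        for (Map.Entry<String, Long> e : remainingCache.entrySet()) {
          total += e.getValue();
          lottery.put(total, e.getKey());
        }
        if (total <= 0) {
          return null; // nobody has free cache space
        }
        // Ticket in [0, total); higherEntry finds the cumulative bucket owning it.
        long ticket = (long) (rand.nextDouble() * total);
        return lottery.higherEntry(ticket).getValue();
      }

      public static void main(String[] args) {
        Map<String, Long> remaining = new LinkedHashMap<String, Long>();
        remaining.put("dn1", 100L); // most free cache, should win most often
        remaining.put("dn2", 50L);
        remaining.put("dn3", 10L);
        Map<String, Integer> wins = new HashMap<String, Integer>();
        Random rand = new Random(42);
        for (int i = 0; i < 16000; i++) {
          String winner = pickWeighted(remaining, rand);
          Integer count = wins.get(winner);
          wins.put(winner, count == null ? 1 : count + 1);
        }
        System.out.println(wins); // win counts roughly proportional to 100:50:10
      }
    }
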
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java?rev=1523145&r1=1523144&r2=1523145&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java Fri Sep 13 23:27:22 2013
@@ -22,6 +22,7 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -30,6 +31,9 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.util.LightWeightHashSet;
 import org.apache.hadoop.util.Time;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
 /**
  * This class extends the DatanodeInfo class with ephemeral information (eg
  * health, capacity, what blocks are associated with the Datanode) that is
@@ -93,8 +97,24 @@ public class DatanodeDescriptor extends 
     }
   }
 
+  /**
+   * Head of the list of blocks on the datanode
+   */
   private volatile BlockInfo blockList = null;
+  /**
+   * Number of blocks on the datanode
+   */
   private int numBlocks = 0;
+
+  /**
+   * Head of the list of cached blocks on the datanode
+   */
+  private volatile BlockInfo cachedBlockList = null;
+  /**
+   * Number of cached blocks on the datanode
+   */
+  private int numCachedBlocks = 0;
+
   // isAlive == heartbeats.contains(this)
   // This is an optimization, because contains takes O(n) time on Arraylist
   public boolean isAlive = false;
@@ -134,6 +154,12 @@ public class DatanodeDescriptor extends 
   /** A set of blocks to be invalidated by this datanode */
   private LightWeightHashSet<Block> invalidateBlocks = new LightWeightHashSet<Block>();
 
+  /** A queue of blocks to be cached by this datanode */
+  private BlockQueue<Block> cacheBlocks = new BlockQueue<Block>();
+  /** A set of blocks to be uncached by this datanode */
+  private LightWeightHashSet<Block> blocksToUncache =
+      new LightWeightHashSet<Block>();
+
   /* Variables for maintaining number of blocks scheduled to be written to
    * this datanode. This count is approximate and might be slightly bigger
    * in case of errors (e.g. datanode does not report if an error occurs
@@ -261,13 +287,56 @@ public class DatanodeDescriptor extends 
   }
 
   /**
+   * Add block to the list of cached blocks on the data-node.
+   * @return true if block was successfully added, false if already present
+   */
+  public boolean addCachedBlock(BlockInfo b) {
+    if (!b.addNode(this))
+      return false;
+    // add to the head of the data-node list
+    cachedBlockList = b.listInsert(cachedBlockList, this);
+    numCachedBlocks++;
+    return true;
+  }
+
+  /**
+   * Remove block from the list of cached blocks on the data-node.
+   * @return true if block was successfully removed, false if not present
+   */
+  public boolean removeCachedBlock(BlockInfo b) {
+    cachedBlockList = b.listRemove(cachedBlockList, this);
+    if (b.removeNode(this)) {
+      numCachedBlocks--;
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Move block to the head of the list of cached blocks on the data-node.
+   * @return the index of the head of the blockList
+   */
+  int moveCachedBlockToHead(BlockInfo b, int curIndex, int headIndex) {
+    cachedBlockList = b.moveBlockToHead(cachedBlockList, this, curIndex,
+        headIndex);
+    return curIndex;
+  }
+
+  /**
    * Used for testing only
    * @return the head of the blockList
    */
+  @VisibleForTesting
   protected BlockInfo getHead(){
     return blockList;
   }
 
+  @VisibleForTesting
+  protected BlockInfo getCachedHead() {
+    return cachedBlockList;
+  }
+
   /**
    * Replace specified old block with a new one in the DataNodeDescriptor.
    *
@@ -290,7 +359,9 @@ public class DatanodeDescriptor extends 
     setDfsUsed(0);
     setXceiverCount(0);
     this.blockList = null;
+    this.cachedBlockList = null;
     this.invalidateBlocks.clear();
+    this.blocksToUncache.clear();
     this.volumeFailures = 0;
   }
   
@@ -300,12 +371,20 @@ public class DatanodeDescriptor extends 
       this.recoverBlocks.clear();
       this.replicateBlocks.clear();
     }
+    synchronized(blocksToUncache) {
+      this.blocksToUncache.clear();
+      this.cacheBlocks.clear();
+    }
   }
 
   public int numBlocks() {
     return numBlocks;
   }
 
+  public int numCachedBlocks() {
+    return numCachedBlocks;
+  }
+
   /**
    * Updates stats from datanode heartbeat.
    */
@@ -358,7 +437,11 @@ public class DatanodeDescriptor extends 
   public Iterator<BlockInfo> getBlockIterator() {
     return new BlockIterator(this.blockList, this);
   }
-  
+
+  public Iterator<BlockInfo> getCachedBlockIterator() {
+    return new BlockIterator(this.cachedBlockList, this);
+  }
+
   /**
    * Store block replication work.
    */
@@ -368,6 +451,14 @@ public class DatanodeDescriptor extends 
   }
 
   /**
+   * Store block caching work.
+   */
+  void addBlockToBeCached(Block block) {
+    assert(block != null);
+    cacheBlocks.offer(block);
+  }
+
+  /**
    * Store block recovery work.
    */
   void addBlockToBeRecovered(BlockInfoUnderConstruction block) {
@@ -390,6 +481,18 @@ public class DatanodeDescriptor extends 
       }
     }
   }
+  
+  /**
+   * Store block uncaching work.
+   */
+  void addBlocksToBeUncached(List<Block> blocklist) {
+    assert(blocklist != null && blocklist.size() > 0);
+    synchronized (blocksToUncache) {
+      for (Block blk : blocklist) {
+        blocksToUncache.add(blk);
+      }
+    }
+  }
 
   /**
    * The number of work items that are pending to be replicated
@@ -399,6 +502,13 @@ public class DatanodeDescriptor extends 
   }
 
   /**
+   * The number of pending cache work items
+   */
+  int getNumberOfBlocksToBeCached() {
+    return cacheBlocks.size();
+  }
+
+  /**
    * The number of block invalidation items that are pending to 
    * be sent to the datanode
    */
@@ -407,11 +517,24 @@ public class DatanodeDescriptor extends 
       return invalidateBlocks.size();
     }
   }
-  
+
+  /**
+   * The number of pending uncache work items
+   */
+  int getNumberOfBlocksToBeUncached() {
+    synchronized (blocksToUncache) {
+      return blocksToUncache.size();
+    }
+  }
+
   public List<BlockTargetPair> getReplicationCommand(int maxTransfers) {
     return replicateBlocks.poll(maxTransfers);
   }
 
+  public List<Block> getCacheBlocks() {
+    return cacheBlocks.poll(cacheBlocks.size());
+  }
+
   public BlockInfoUnderConstruction[] getLeaseRecoveryCommand(int maxTransfers) {
     List<BlockInfoUnderConstruction> blocks = recoverBlocks.poll(maxTransfers);
     if(blocks == null)
@@ -431,6 +554,17 @@ public class DatanodeDescriptor extends 
   }
 
   /**
+   * Remove up to the maximum number of blocks to be uncached
+   */
+  public Block[] getInvalidateCacheBlocks() {
+    synchronized (blocksToUncache) {
+      Block[] deleteList = blocksToUncache.pollToArray(
+          new Block[blocksToUncache.size()]);
+      return deleteList.length == 0 ? null : deleteList;
+    }
+  }
+
+  /**
    * @return Approximate number of blocks currently scheduled to be written 
    * to this datanode.
    */

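A standalone sketch of the drain-on-heartbeat pattern the new per-datanode structures above follow: caching work accumulates in a queue and is polled as a list, while uncache work accumulates in a synchronized set and is drained to an array, with null signalling that no command needs to be sent this round. JDK collections and long block IDs stand in for BlockQueue, LightWeightHashSet and Block; the class below is illustrative only.

    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Queue;
    import java.util.Set;

    public class CacheQueueSketch {
      private final Queue<Long> cacheBlocks = new ArrayDeque<Long>();
      private final Set<Long> blocksToUncache = new HashSet<Long>();

      void addBlockToBeCached(long blockId) {
        cacheBlocks.offer(blockId);
      }

      void addBlocksToBeUncached(List<Long> blockIds) {
        synchronized (blocksToUncache) {
          blocksToUncache.addAll(blockIds);
        }
      }

      // Heartbeat side: hand all queued caching work to the datanode.
      List<Long> getCacheBlocks() {
        List<Long> polled = new ArrayList<Long>(cacheBlocks);
        cacheBlocks.clear();
        return polled;
      }

      // Heartbeat side: null means no uncache command is needed this round.
      Long[] getInvalidateCacheBlocks() {
        synchronized (blocksToUncache) {
          Long[] drained = blocksToUncache.toArray(new Long[0]);
          blocksToUncache.clear();
          return drained.length == 0 ? null : drained;
        }
      }

      public static void main(String[] args) {
        CacheQueueSketch dn = new CacheQueueSketch();
        dn.addBlockToBeCached(1001L);
        dn.addBlocksToBeUncached(Arrays.asList(2002L, 2003L));
        System.out.println("to cache:   " + dn.getCacheBlocks());
        System.out.println("to uncache: " + Arrays.toString(dn.getInvalidateCacheBlocks()));
        System.out.println("next round: " + dn.getInvalidateCacheBlocks()); // prints null
      }
    }
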
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1523145&r1=1523144&r2=1523145&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Fri Sep 13 23:27:22 2013
@@ -1230,6 +1230,19 @@ public class DatanodeManager {
               blockPoolId, blks));
         }
         
+        // Check pending caching
+        List<Block> pendingCacheList = nodeinfo.getCacheBlocks();
+        if (pendingCacheList != null) {
+          cmds.add(new BlockCommand(DatanodeProtocol.DNA_CACHE, blockPoolId,
+              pendingCacheList.toArray(new Block[] {})));
+        }
+        // Check cached block invalidation
+        blks = nodeinfo.getInvalidateCacheBlocks();
+        if (blks != null) {
+          cmds.add(new BlockCommand(DatanodeProtocol.DNA_UNCACHE,
+              blockPoolId, blks));
+        }
+
         blockManager.addKeyUpdateCommand(cmds, nodeinfo);
 
         // check for balancer bandwidth update

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java?rev=1523145&r1=1523144&r2=1523145&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java Fri Sep 13 23:27:22 2013
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
-import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -35,24 +34,22 @@ import org.apache.hadoop.hdfs.util.Light
  * on the machine in question.
  */
 @InterfaceAudience.Private
-class InvalidateBlocks {
+abstract class InvalidateBlocks {
   /** Mapping: StorageID -> Collection of Blocks */
   private final Map<String, LightWeightHashSet<Block>> node2blocks =
       new TreeMap<String, LightWeightHashSet<Block>>();
   /** The total number of blocks in the map. */
   private long numBlocks = 0L;
 
-  private final DatanodeManager datanodeManager;
-
-  InvalidateBlocks(final DatanodeManager datanodeManager) {
-    this.datanodeManager = datanodeManager;
-  }
-
   /** @return the number of blocks to be invalidated . */
   synchronized long numBlocks() {
     return numBlocks;
   }
 
+  synchronized int numStorages() {
+    return node2blocks.size();
+  }
+
   /**
    * @return true if the given storage has the given block listed for
    * invalidation. Blocks are compared including their generation stamps:
@@ -111,22 +108,22 @@ class InvalidateBlocks {
     }
   }
 
-  /** Print the contents to out. */
-  synchronized void dump(final PrintWriter out) {
-    final int size = node2blocks.values().size();
-    out.println("Metasave: Blocks " + numBlocks 
-        + " waiting deletion from " + size + " datanodes.");
-    if (size == 0) {
-      return;
+  /**
+   * Polls up to <i>limit</i> blocks from the list of to-be-invalidated Blocks
+   * for a storage.
+   */
+  synchronized List<Block> pollNumBlocks(final String storageId, final int limit) {
+    final LightWeightHashSet<Block> set = node2blocks.get(storageId);
+    if (set == null) {
+      return null;
     }
-
-    for(Map.Entry<String,LightWeightHashSet<Block>> entry : node2blocks.entrySet()) {
-      final LightWeightHashSet<Block> blocks = entry.getValue();
-      if (blocks.size() > 0) {
-        out.println(datanodeManager.getDatanode(entry.getKey()));
-        out.println(blocks);
-      }
+    List<Block> polledBlocks = set.pollN(limit);
+    // Remove the storage if the set is now empty
+    if (set.isEmpty()) {
+      remove(storageId);
     }
+    numBlocks -= polledBlocks.size();
+    return polledBlocks;
   }
 
   /** @return a list of the storage IDs. */
@@ -134,26 +131,22 @@ class InvalidateBlocks {
     return new ArrayList<String>(node2blocks.keySet());
   }
 
-  synchronized List<Block> invalidateWork(
-      final String storageId, final DatanodeDescriptor dn) {
-    final LightWeightHashSet<Block> set = node2blocks.get(storageId);
-    if (set == null) {
-      return null;
-    }
-
-    // # blocks that can be sent in one message is limited
-    final int limit = datanodeManager.blockInvalidateLimit;
-    final List<Block> toInvalidate = set.pollN(limit);
-
-    // If we send everything in this message, remove this node entry
-    if (set.isEmpty()) {
-      remove(storageId);
-    }
-
-    dn.addBlocksToBeInvalidated(toInvalidate);
-    numBlocks -= toInvalidate.size();
-    return toInvalidate;
+  /**
+   * Return the set of to-be-invalidated blocks for a storage.
+   */
+  synchronized LightWeightHashSet<Block> getBlocks(String storageId) {
+    return node2blocks.get(storageId);
   }
+
+  /**
+   * Schedules invalidation work associated with a storage at the corresponding
+   * datanode.
+   * @param storageId Storage of blocks to be invalidated
+   * @param dn Datanode where invalidation work will be scheduled
+   * @return List of blocks scheduled for invalidation at the datanode
+   */
+  abstract List<Block> invalidateWork(final String storageId,
+      final DatanodeDescriptor dn);
   
   synchronized void clear() {
     node2blocks.clear();

Added: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateStoredBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateStoredBlocks.java?rev=1523145&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateStoredBlocks.java (added)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateStoredBlocks.java Fri Sep 13 23:27:22 2013
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.io.PrintWriter;
+import java.util.List;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.util.LightWeightHashSet;
+
+/**
+ * Subclass of InvalidateBlocks used by the BlockManager to
+ * track blocks on each storage that are scheduled to be invalidated.
+ */
+public class InvalidateStoredBlocks extends InvalidateBlocks {
+
+  private final DatanodeManager datanodeManager;
+
+  InvalidateStoredBlocks(DatanodeManager datanodeManager) {
+    this.datanodeManager = datanodeManager;
+  }
+
+  /** Print the contents to out. */
+  synchronized void dump(final PrintWriter out) {
+    final int size = numStorages();
+    out.println("Metasave: Blocks " + numBlocks() 
+        + " waiting deletion from " + size + " datanodes.");
+    if (size == 0) {
+      return;
+    }
+
+    List<String> storageIds = getStorageIDs();
+    for (String storageId: storageIds) {
+      LightWeightHashSet<Block> blocks = getBlocks(storageId);
+      if (blocks != null && !blocks.isEmpty()) {
+        out.println(datanodeManager.getDatanode(storageId));
+        out.println(blocks);
+      }
+    }
+  }
+
+  @Override
+  synchronized List<Block> invalidateWork(
+      final String storageId, final DatanodeDescriptor dn) {
+    final List<Block> toInvalidate = pollNumBlocks(storageId,
+        datanodeManager.blockInvalidateLimit);
+    if (toInvalidate != null) {
+      dn.addBlocksToBeInvalidated(toInvalidate);
+    }
+    return toInvalidate;
+  }
+}

Propchange: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateStoredBlocks.java
------------------------------------------------------------------------------
    svn:eol-style = native

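For comparison, a hedged sketch of how an uncache-side subclass of the now-abstract InvalidateBlocks might implement invalidateWork(), reusing pollNumBlocks() and the new DatanodeDescriptor.addBlocksToBeUncached(). The actual uncache-side class is not shown in this hunk; the class name and the unbounded poll limit below are assumptions for illustration only.

    // Must live in the blockmanagement package to reach the package-private
    // methods it uses; this is a sketch, not the class added by the patch.
    package org.apache.hadoop.hdfs.server.blockmanagement;

    import java.util.List;

    import org.apache.hadoop.hdfs.protocol.Block;

    class UncacheBlocksSketch extends InvalidateBlocks {
      @Override
      synchronized List<Block> invalidateWork(final String storageId,
          final DatanodeDescriptor dn) {
        // Drain everything queued for this storage; a real implementation
        // might cap this the way blockInvalidateLimit does for on-disk blocks.
        final List<Block> toUncache = pollNumBlocks(storageId, Integer.MAX_VALUE);
        if (toUncache != null && !toUncache.isEmpty()) {
          dn.addBlocksToBeUncached(toUncache);
        }
        return toUncache;
      }
    }
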
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java?rev=1523145&r1=1523144&r2=1523145&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java Fri Sep 13 23:27:22 2013
@@ -29,20 +29,27 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.util.Daemon;
 
-/***************************************************
- * PendingReplicationBlocks does the bookkeeping of all
- * blocks that are getting replicated.
- *
- * It does the following:
- * 1)  record blocks that are getting replicated at this instant.
- * 2)  a coarse grain timer to track age of replication request
- * 3)  a thread that periodically identifies replication-requests
- *     that never made it.
- *
- ***************************************************/
+/**
+ * PendingReplicationBlocks is used in the BlockManager to track blocks that are
+ * currently being replicated on disk and in the CacheReplicationManager to
+ * track blocks that are currently being cached.
+ * 
+ * <p>
+ * PendingReplicationBlocks performs the following tasks:
+ * </p>
+ * 
+ * <ol>
+ * <li>tracks in-flight replication or caching requests for a block at target
+ * datanodes.</li>
+ * <li>identifies requests that have timed out and need to be rescheduled at a
+ * different datanode.</li>
+ * </ol>
+ */
+@InterfaceAudience.LimitedPrivate({"HDFS"})
 class PendingReplicationBlocks {
   private static final Log LOG = BlockManager.LOG;