You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by wa...@apache.org on 2013/09/14 01:27:23 UTC

svn commit: r1523145 [2/2] - in /hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/protocolPB/ src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ sr...

Added: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReportProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReportProcessor.java?rev=1523145&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReportProcessor.java (added)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReportProcessor.java Fri Sep 13 23:27:22 2013
@@ -0,0 +1,271 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Handles common operations of processing a block report from a datanode,
+ * generating a diff of updates to the BlocksMap, and then feeding the diff
+ * to the subclass-implemented hooks.
+ */
+@InterfaceAudience.LimitedPrivate({"HDFS"})
+public abstract class ReportProcessor {
+
+  static final Log blockLog = NameNode.blockStateChangeLog;
+  private final String className = getClass().getSimpleName();
+  // Max number of blocks to log info about during a block report.
+  final long maxNumBlocksToLog;
+
+  void blockLogDebug(String message) {
+    if (blockLog.isDebugEnabled()) {
+      blockLog.info("BLOCK* " + className + message);
+    }
+  }
+
+  void blockLogInfo(String message) {
+    if (blockLog.isInfoEnabled()) {
+      blockLog.info("BLOCK* " + className + message);
+    }
+  }
+
+  void blockLogWarn(String message) {
+    blockLog.warn("BLOCK* " + className + message);
+  }
+
+  void logAddStoredBlock(BlockInfo storedBlock, DatanodeDescriptor node) {
+    if (!blockLog.isInfoEnabled()) {
+      return;
+    }
+    StringBuilder sb = new StringBuilder(500);
+    sb.append("BLOCK* " + className + "#addStoredBlock: blockMap updated: ")
+      .append(node)
+      .append(" is added to ");
+    storedBlock.appendStringTo(sb);
+    sb.append(" size " )
+      .append(storedBlock.getNumBytes());
+    blockLog.info(sb);
+  }
+
+  public ReportProcessor(Configuration conf) {
+    this.maxNumBlocksToLog = conf.getLong(
+        DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
+        DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT);
+  }
+
+  /**
+   * Processes a block report from a datanode, updating the block to
+   * datanode mapping, adding new blocks and removing invalid ones.
+   * Also computes and queues new replication and invalidation work.
+   * @param node Datanode sending the block report
+   * @param report as list of longs
+   * @throws IOException
+   */
+  final void processReport(final DatanodeDescriptor node,
+      final BlockListAsLongs report) throws IOException {
+    // Normal case:
+    // Modify the (block-->datanode) map, according to the difference
+    // between the old and new block report.
+    //
+    Collection<BlockInfo> toAdd = new LinkedList<BlockInfo>();
+    Collection<Block> toRemove = new LinkedList<Block>();
+    Collection<Block> toInvalidate = new LinkedList<Block>();
+    Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
+    Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
+    reportDiff(node, report, toAdd, toRemove, toInvalidate, toCorrupt, toUC);
+
+    // Process the blocks on each queue
+    for (StatefulBlockInfo b : toUC) {
+      addStoredBlockUnderConstruction(b.storedBlock, node, b.reportedState);
+    }
+    for (Block b : toRemove) {
+      removeStoredBlock(b, node);
+    }
+    int numBlocksLogged = 0;
+    for (BlockInfo b : toAdd) {
+      addStoredBlock(b, node, null, numBlocksLogged < maxNumBlocksToLog);
+      numBlocksLogged++;
+    }
+
+    if (numBlocksLogged > maxNumBlocksToLog) {
+      blockLogInfo("#processReport: logged"
+          + " info for " + maxNumBlocksToLog
+          + " of " + numBlocksLogged + " reported.");
+    }
+    for (Block b : toInvalidate) {
+      blockLogInfo("#processReport: "
+          + b + " on " + node + " size " + b.getNumBytes()
+          + " does not belong to any file");
+      addToInvalidates(b, node);
+    }
+    for (BlockToMarkCorrupt b : toCorrupt) {
+      markBlockAsCorrupt(b, node);
+    }
+  }
+
+  /**
+   * Compute the difference between the current state of the datanode in the
+   * BlocksMap and the new reported state, categorizing changes into
+   * different groups (e.g. new blocks to be added, blocks that were removed,
+   * blocks that should be invalidated, etc.).
+   */
+  private void reportDiff(DatanodeDescriptor dn,
+      BlockListAsLongs newReport,
+      Collection<BlockInfo> toAdd,              // add to DatanodeDescriptor
+      Collection<Block> toRemove,           // remove from DatanodeDescriptor
+      Collection<Block> toInvalidate,       // should be removed from DN
+      Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
+      Collection<StatefulBlockInfo> toUC) { // add to under-construction list
+    // place a delimiter in the list which separates blocks
+    // that have been reported from those that have not
+    BlockInfo delimiter = new BlockInfo(new Block(), 1);
+    boolean added = addBlock(dn, delimiter);
+    assert added : "Delimiting block cannot be present in the node";
+    int headIndex = 0; //currently the delimiter is in the head of the list
+    int curIndex;
+
+    if (newReport == null) {
+      newReport = new BlockListAsLongs();
+    }
+    // scan the report and process newly reported blocks
+    BlockReportIterator itBR = newReport.getBlockReportIterator();
+    while (itBR.hasNext()) {
+      Block iblk = itBR.next();
+      ReplicaState iState = itBR.getCurrentReplicaState();
+      BlockInfo storedBlock = processReportedBlock(dn, iblk, iState,
+                                  toAdd, toInvalidate, toCorrupt, toUC);
+      // move block to the head of the list
+      if (storedBlock != null && (curIndex = storedBlock.findDatanode(dn)) >= 0) {
+        headIndex = moveBlockToHead(dn, storedBlock, curIndex, headIndex);
+      }
+    }
+    // collect blocks that have not been reported
+    // all of them are next to the delimiter
+    Iterator<? extends Block> it = new DatanodeDescriptor.BlockIterator(
+        delimiter.getNext(0), dn);
+    while (it.hasNext()) {
+      toRemove.add(it.next());
+    }
+    removeBlock(dn, delimiter);
+  }
+
+  // Operations on the blocks on a datanode
+
+  abstract int moveBlockToHead(DatanodeDescriptor dn, BlockInfo storedBlock,
+      int curIndex, int headIndex);
+
+  abstract boolean addBlock(DatanodeDescriptor dn, BlockInfo block);
+
+  abstract boolean removeBlock(DatanodeDescriptor dn, BlockInfo block);
+
+  // Cache report processing
+
+  abstract BlockInfo processReportedBlock(DatanodeDescriptor dn, Block iblk,
+      ReplicaState iState, Collection<BlockInfo> toAdd,
+      Collection<Block> toInvalidate, Collection<BlockToMarkCorrupt> toCorrupt,
+      Collection<StatefulBlockInfo> toUC);
+
+  // Hooks for processing the cache report diff
+
+  abstract Block addStoredBlock(final BlockInfo block,
+      DatanodeDescriptor node, DatanodeDescriptor delNodeHint,
+      boolean logEveryBlock) throws IOException;
+
+  abstract void removeStoredBlock(Block block, DatanodeDescriptor node);
+
+  abstract void markBlockAsCorrupt(BlockToMarkCorrupt b, DatanodeInfo dn)
+      throws IOException;
+
+  abstract void addToInvalidates(final Block b, final DatanodeInfo node);
+
+  abstract void addStoredBlockUnderConstruction(
+      BlockInfoUnderConstruction storedBlock, DatanodeDescriptor node,
+      ReplicaState reportedState) throws IOException;
+
+  /**
+   * BlockToMarkCorrupt is used to build the "toCorrupt" list, which is a
+   * list of blocks that should be considered corrupt due to a block report.
+   */
+  static class BlockToMarkCorrupt {
+    /** The corrupted block in a datanode. */
+    final BlockInfo corrupted;
+    /** The corresponding block stored in the BlockManager. */
+    final BlockInfo stored;
+    /** The reason to mark corrupt. */
+    final String reason;
+
+    BlockToMarkCorrupt(BlockInfo corrupted, BlockInfo stored, String reason) {
+      Preconditions.checkNotNull(corrupted, "corrupted is null");
+      Preconditions.checkNotNull(stored, "stored is null");
+
+      this.corrupted = corrupted;
+      this.stored = stored;
+      this.reason = reason;
+    }
+
+    BlockToMarkCorrupt(BlockInfo stored, String reason) {
+      this(stored, stored, reason);
+    }
+
+    BlockToMarkCorrupt(BlockInfo stored, long gs, String reason) {
+      this(new BlockInfo(stored), stored, reason);
+      //the corrupted block in datanode has a different generation stamp
+      corrupted.setGenerationStamp(gs);
+    }
+
+    @Override
+    public String toString() {
+      return corrupted + "("
+          + (corrupted == stored? "same as stored": "stored=" + stored) + ")";
+    }
+  }
+
+  /**
+   * StatefulBlockInfo is used to build the "toUC" list, which is a list of
+   * updates to the information about under-construction blocks.
+   * Besides the block in question, it provides the ReplicaState
+   * reported by the datanode in the block report.
+   */
+  static class StatefulBlockInfo {
+    final BlockInfoUnderConstruction storedBlock;
+    final ReplicaState reportedState;
+
+    StatefulBlockInfo(BlockInfoUnderConstruction storedBlock,
+        ReplicaState reportedState) {
+      this.storedBlock = storedBlock;
+      this.reportedState = reportedState;
+    }
+  }
+
+}

Propchange: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReportProcessor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UncacheBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UncacheBlocks.java?rev=1523145&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UncacheBlocks.java (added)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UncacheBlocks.java Fri Sep 13 23:27:22 2013
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.Block;
+
+/**
+ * Subclass of InvalidateBlocks used by the CacheReplicationManager to
+ * track blocks on each storage that are scheduled to be uncached.
+ */
+@InterfaceAudience.Private
+public class UncacheBlocks extends InvalidateBlocks {
+
+  UncacheBlocks() {
+  }
+
+  @Override
+  synchronized List<Block> invalidateWork(
+      final String storageId, final DatanodeDescriptor dn) {
+    final List<Block> toInvalidate = pollNumBlocks(storageId, Integer.MAX_VALUE);
+    if (toInvalidate != null) {
+      dn.addBlocksToBeUncached(toInvalidate);
+    }
+    return toInvalidate;
+  }
+}

Propchange: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UncacheBlocks.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1523145&r1=1523144&r2=1523145&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java Fri Sep 13 23:27:22 2013
@@ -552,10 +552,12 @@ class BPOfferService {
     case DatanodeProtocol.DNA_CACHE:
       LOG.info("DatanodeCommand action: DNA_CACHE");
       dn.getFSDataset().cache(bcmd.getBlockPoolId(), bcmd.getBlocks());
+      dn.metrics.incrBlocksCached(bcmd.getBlocks().length);
       break;
     case DatanodeProtocol.DNA_UNCACHE:
       LOG.info("DatanodeCommand action: DNA_UNCACHE");
       dn.getFSDataset().uncache(bcmd.getBlockPoolId(), bcmd.getBlocks());
+      dn.metrics.incrBlocksUncached(bcmd.getBlocks().length);
       break;
     case DatanodeProtocol.DNA_SHUTDOWN:
       // TODO: DNA_SHUTDOWN appears to be unused - the NN never sends this command

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java?rev=1523145&r1=1523144&r2=1523145&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java Fri Sep 13 23:27:22 2013
@@ -449,11 +449,24 @@ class BPServiceActor implements Runnable
     DatanodeCommand cmd = null;
     long startTime = Time.monotonicNow();
     if (startTime - lastCacheReport > dnConf.cacheReportInterval) {
-      // TODO: Implement me!
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Sending cacheReport from service actor: " + this);
+      }
+      lastCacheReport = startTime;
+
       String bpid = bpos.getBlockPoolId();
       BlockListAsLongs blocks = dn.getFSDataset().getCacheReport(bpid);
+      long createTime = Time.monotonicNow();
+
       cmd = bpNamenode.cacheReport(bpRegistration, bpid,
           blocks.getBlockListAsLongs());
+      long sendTime = Time.monotonicNow();
+      long createCost = createTime - startTime;
+      long sendCost = sendTime - createTime;
+      dn.getMetrics().addCacheReport(sendCost);
+      LOG.info("CacheReport of " + blocks.getNumberOfBlocks()
+          + " blocks took " + createCost + " msec to generate and "
+          + sendCost + " msecs for RPC and NN processing");
     }
     return cmd;
   }

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java?rev=1523145&r1=1523144&r2=1523145&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java Fri Sep 13 23:27:22 2013
@@ -23,6 +23,8 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
@@ -114,9 +116,9 @@ public class DNConf {
         DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
         DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
     this.blockReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
-    DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
-    this.cacheReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
         DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
+    this.cacheReportInterval = conf.getLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY,
+        DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT);
     
     long initBRDelay = conf.getLong(
         DFS_BLOCKREPORT_INITIAL_DELAY_KEY,

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java?rev=1523145&r1=1523144&r2=1523145&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java Fri Sep 13 23:27:22 2013
@@ -105,10 +105,10 @@ public class FsDatasetCache {
    */
   List<Block> getCachedBlocks(String bpid) {
     List<Block> blocks = new ArrayList<Block>();
-    MappableBlock mapBlock = null;
     // ConcurrentHashMap iteration doesn't see latest updates, which is okay
-    for (Iterator<MappableBlock> it = cachedBlocks.values().iterator();
-        it.hasNext(); mapBlock = it.next()) {
+    Iterator<MappableBlock> it = cachedBlocks.values().iterator();
+    while (it.hasNext()) {
+      MappableBlock mapBlock = it.next();
       if (mapBlock.getBlockPoolId().equals(bpid)) {
         blocks.add(mapBlock.getBlock());
       }
@@ -174,12 +174,15 @@ public class FsDatasetCache {
         mapBlock.getBlockPoolId().equals(bpid) &&
         mapBlock.getBlock().equals(block)) {
       mapBlock.close();
-      cachedBlocks.remove(mapBlock);
+      cachedBlocks.remove(block.getBlockId());
       long bytes = mapBlock.getNumBytes();
       long used = usedBytes.get();
       while (!usedBytes.compareAndSet(used, used - bytes)) {
         used = usedBytes.get();
       }
+      LOG.info("Successfully uncached block " + block);
+    } else {
+      LOG.info("Could not uncache block " + block + ": unknown block.");
     }
   }
 
@@ -219,6 +222,7 @@ public class FsDatasetCache {
           used = usedBytes.get();
         }
       } else {
+        LOG.info("Successfully cached block " + block.getBlock());
         cachedBlocks.put(block.getBlock().getBlockId(), block);
       }
     }

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java?rev=1523145&r1=1523144&r2=1523145&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java Fri Sep 13 23:27:22 2013
@@ -57,6 +57,8 @@ public class DataNodeMetrics {
   @Metric MutableCounterLong blocksRemoved;
   @Metric MutableCounterLong blocksVerified;
   @Metric MutableCounterLong blockVerificationFailures;
+  @Metric MutableCounterLong blocksCached;
+  @Metric MutableCounterLong blocksUncached;
   @Metric MutableCounterLong readsFromLocalClient;
   @Metric MutableCounterLong readsFromRemoteClient;
   @Metric MutableCounterLong writesFromLocalClient;
@@ -74,6 +76,7 @@ public class DataNodeMetrics {
   @Metric MutableRate replaceBlockOp;
   @Metric MutableRate heartbeats;
   @Metric MutableRate blockReports;
+  @Metric MutableRate cacheReports;
   @Metric MutableRate packetAckRoundTripTimeNanos;
   MutableQuantiles[] packetAckRoundTripTimeNanosQuantiles;
   
@@ -151,6 +154,10 @@ public class DataNodeMetrics {
     blockReports.add(latency);
   }
 
+  public void addCacheReport(long latency) {
+    cacheReports.add(latency);
+  }
+
   public void incrBlocksReplicated(int delta) {
     blocksReplicated.incr(delta);
   }
@@ -175,6 +182,15 @@ public class DataNodeMetrics {
     blocksVerified.incr();
   }
 
+
+  public void incrBlocksCached(int delta) {
+    blocksCached.incr(delta);
+  }
+
+  public void incrBlocksUncached(int delta) {
+    blocksUncached.incr(delta);
+  }
+
   public void addReadBlockOp(long latency) {
     readBlockOp.add(latency);
   }

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java?rev=1523145&r1=1523144&r2=1523145&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java Fri Sep 13 23:27:22 2013
@@ -26,9 +26,9 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map.Entry;
 import java.util.SortedMap;
 import java.util.TreeMap;
-import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -51,7 +51,7 @@ import org.apache.hadoop.util.Fallible;
 /**
  * The Cache Manager handles caching on DataNodes.
  */
-final class CacheManager {
+public final class CacheManager {
   public static final Log LOG = LogFactory.getLog(CacheManager.class);
 
   /**
@@ -70,6 +70,12 @@ final class CacheManager {
       new TreeMap<PathBasedCacheDirective, PathBasedCacheEntry>();
 
   /**
+   * Cache entries, sorted by path
+   */
+  private final TreeMap<String, List<PathBasedCacheEntry>> entriesByPath =
+      new TreeMap<String, List<PathBasedCacheEntry>>();
+
+  /**
    * Cache pools, sorted by name.
    */
   private final TreeMap<String, CachePool> cachePools =
@@ -90,9 +96,14 @@ final class CacheManager {
    */
   private final int maxListCacheDirectivesResponses;
 
-  CacheManager(FSDirectory dir, Configuration conf) {
+  final private FSNamesystem namesystem;
+  final private FSDirectory dir;
+
+  CacheManager(FSNamesystem namesystem, FSDirectory dir, Configuration conf) {
     // TODO: support loading and storing of the CacheManager state
     clear();
+    this.namesystem = namesystem;
+    this.dir = dir;
     maxListCachePoolsResponses = conf.getInt(
         DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES,
         DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT);
@@ -104,6 +115,7 @@ final class CacheManager {
   synchronized void clear() {
     entriesById.clear();
     entriesByDirective.clear();
+    entriesByPath.clear();
     cachePools.clear();
     nextEntryId = 1;
   }
@@ -131,7 +143,8 @@ final class CacheManager {
     try {
       directive.validate();
     } catch (IOException ioe) {
-      LOG.info("addDirective " + directive + ": validation failed.");
+      LOG.info("addDirective " + directive + ": validation failed: "
+          + ioe.getClass().getName() + ": " + ioe.getMessage());
       return new Fallible<PathBasedCacheEntry>(ioe);
     }
     // Check if we already have this entry.
@@ -152,8 +165,34 @@ final class CacheManager {
     }
     LOG.info("addDirective " + directive + ": added cache directive "
         + directive);
+
+    // Success!
+    // First, add it to the various maps
     entriesByDirective.put(directive, entry);
     entriesById.put(entry.getEntryId(), entry);
+    String path = directive.getPath();
+    List<PathBasedCacheEntry> entryList = entriesByPath.get(path);
+    if (entryList == null) {
+      entryList = new ArrayList<PathBasedCacheEntry>(1);
+      entriesByPath.put(path, entryList);
+    }
+    entryList.add(entry);
+
+    // Next, set the path as cached in the namesystem
+    try {
+      INode node = dir.getINode(directive.getPath());
+      if (node.isFile()) {
+        INodeFile file = node.asFile();
+        // TODO: adjustable cache replication factor
+        namesystem.setCacheReplicationInt(directive.getPath(),
+            file.getBlockReplication());
+      }
+    } catch (IOException ioe) {
+      LOG.info("addDirective " + directive +": failed to cache file: " +
+          ioe.getClass().getName() +": " + ioe.getMessage());
+      return new Fallible<PathBasedCacheEntry>(ioe);
+    }
+
     return new Fallible<PathBasedCacheEntry>(entry);
   }
 
@@ -201,7 +240,31 @@ final class CacheManager {
       return new Fallible<Long>(
           new UnexpectedRemovePathBasedCacheEntryException(entryId));
     }
+    // Remove the corresponding entry in entriesByPath.
+    String path = existing.getDirective().getPath();
+    List<PathBasedCacheEntry> entries = entriesByPath.get(path);
+    if (entries == null || !entries.remove(existing)) {
+      return new Fallible<Long>(
+          new UnexpectedRemovePathBasedCacheEntryException(entryId));
+    }
+    if (entries.size() == 0) {
+      entriesByPath.remove(path);
+    }
     entriesById.remove(entryId);
+
+    // Set the path as uncached in the namesystem
+    try {
+      INode node = dir.getINode(existing.getDirective().getPath());
+      if (node.isFile()) {
+        namesystem.setCacheReplicationInt(existing.getDirective().getPath(),
+            (short) 0);
+      }
+    } catch (IOException e) {
+      LOG.warn("removeEntry " + entryId + ": failure while setting cache"
+          + " replication factor", e);
+      return new Fallible<Long>(e);
+    }
+    LOG.info("removeEntry successful for PathCacheEntry id " + entryId);
     return new Fallible<Long>(entryId);
   }
 

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1523145&r1=1523144&r2=1523145&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Fri Sep 13 23:27:22 2013
@@ -1092,6 +1092,52 @@ public class FSDirectory implements Clos
   }
 
   /**
+   * Set cache replication for a file
+   * 
+   * @param src file name
+   * @param replication new replication
+   * @param blockRepls block replications - output parameter
+   * @return array of file blocks
+   * @throws QuotaExceededException
+   * @throws SnapshotAccessControlException
+   */
+  Block[] setCacheReplication(String src, short replication, short[] blockRepls)
+      throws QuotaExceededException, UnresolvedLinkException,
+      SnapshotAccessControlException {
+    waitForReady();
+    writeLock();
+    try {
+      return unprotectedSetCacheReplication(src, replication, blockRepls);
+    } finally {
+      writeUnlock();
+    }
+  }
+
+  Block[] unprotectedSetCacheReplication(String src, short replication,
+      short[] blockRepls) throws QuotaExceededException,
+      UnresolvedLinkException, SnapshotAccessControlException {
+    assert hasWriteLock();
+
+    final INodesInPath iip = rootDir.getINodesInPath4Write(src, true);
+    final INode inode = iip.getLastINode();
+    if (inode == null || !inode.isFile()) {
+      return null;
+    }
+    INodeFile file = inode.asFile();
+    final short oldBR = file.getCacheReplication();
+
+    // TODO: Update quotas here as repl goes up or down
+    file.setCacheReplication(replication);
+    final short newBR = file.getCacheReplication();
+
+    if (blockRepls != null) {
+      blockRepls[0] = oldBR;
+      blockRepls[1] = newBR;
+    }
+    return file.getBlocks();
+  }
+
+  /**
    * @param path the file path
    * @return the block size of the file. 
    */

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1523145&r1=1523144&r2=1523145&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Sep 13 23:27:22 2013
@@ -367,6 +367,7 @@ public class FSNamesystem implements Nam
   private final BlockManager blockManager;
   private final SnapshotManager snapshotManager;
   private final CacheManager cacheManager;
+  private final CacheReplicationManager cacheReplicationManager;
   private final DatanodeStatistics datanodeStatistics;
 
   // Block pool ID used by this namenode
@@ -694,7 +695,9 @@ public class FSNamesystem implements Nam
       this.dtSecretManager = createDelegationTokenSecretManager(conf);
       this.dir = new FSDirectory(fsImage, this, conf);
       this.snapshotManager = new SnapshotManager(dir);
-      this.cacheManager= new CacheManager(dir, conf);
+      this.cacheManager = new CacheManager(this, dir, conf);
+      this.cacheReplicationManager = new CacheReplicationManager(this,
+          blockManager, blockManager.getDatanodeManager(), this, conf);
       this.safeMode = new SafeModeInfo(conf);
       this.auditLoggers = initAuditLoggers(conf);
       this.isDefaultAuditLogger = auditLoggers.size() == 1 &&
@@ -871,6 +874,7 @@ public class FSNamesystem implements Nam
         getCompleteBlocksTotal());
       setBlockTotal();
       blockManager.activate(conf);
+      cacheReplicationManager.activate();
     } finally {
       writeUnlock();
     }
@@ -887,6 +891,7 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       if (blockManager != null) blockManager.close();
+      if (cacheReplicationManager != null) cacheReplicationManager.close();
     } finally {
       writeUnlock();
     }
@@ -917,7 +922,9 @@ public class FSNamesystem implements Nam
         blockManager.getDatanodeManager().markAllDatanodesStale();
         blockManager.clearQueues();
         blockManager.processAllPendingDNMessages();
-        
+
+        cacheReplicationManager.clearQueues();
+
         if (!isInSafeMode() ||
             (isInSafeMode() && safeMode.isPopulatingReplQueues())) {
           LOG.info("Reprocessing replication and invalidation queues");
@@ -1910,6 +1917,42 @@ public class FSNamesystem implements Nam
     return isFile;
   }
 
+  boolean setCacheReplicationInt(String src, final short replication)
+      throws IOException {
+    final boolean isFile;
+    FSPermissionChecker pc = getPermissionChecker();
+    checkOperation(OperationCategory.WRITE);
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    writeLock();
+    try {
+      checkOperation(OperationCategory.WRITE);
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot set replication for " + src, safeMode);
+      }
+      src = FSDirectory.resolvePath(src, pathComponents, dir);
+      if (isPermissionEnabled) {
+        checkPathAccess(pc, src, FsAction.WRITE);
+      }
+
+      final short[] blockRepls = new short[2]; // 0: old, 1: new
+      final Block[] blocks = dir.setCacheReplication(src, replication,
+          blockRepls);
+      isFile = (blocks != null);
+      if (isFile) {
+        cacheReplicationManager.setCacheReplication(blockRepls[0],
+            blockRepls[1], src, blocks);
+      }
+    } finally {
+      writeUnlock();
+    }
+
+    getEditLog().logSync();
+    if (isFile) {
+      logAuditEvent(true, "setReplication", src);
+    }
+    return isFile;
+  }
+
   long getPreferredBlockSize(String filename) 
       throws IOException, UnresolvedLinkException {
     FSPermissionChecker pc = getPermissionChecker();
@@ -6391,6 +6434,14 @@ public class FSNamesystem implements Nam
   public FSDirectory getFSDirectory() {
     return dir;
   }
+  /** @return the cache manager. */
+  public CacheManager getCacheManager() {
+    return cacheManager;
+  }
+  /** @return the cache replication manager. */
+  public CacheReplicationManager getCacheReplicationManager() {
+    return cacheReplicationManager;
+  }
 
   @Override  // NameNodeMXBean
   public String getCorruptFiles() {
@@ -6959,10 +7010,6 @@ public class FSNamesystem implements Nam
     return results;
   }
 
-  public CacheManager getCacheManager() {
-    return cacheManager;
-  }
-
   /**
    * Default AuditLogger implementation; used when no access logger is
    * defined in the config file. It can also be explicitly listed in the

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java?rev=1523145&r1=1523144&r2=1523145&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java Fri Sep 13 23:27:22 2013
@@ -104,6 +104,8 @@ public class INodeFile extends INodeWith
 
   private BlockInfo[] blocks;
 
+  private short cacheReplication = 0;
+
   INodeFile(long id, byte[] name, PermissionStatus permissions, long mtime, long atime,
       BlockInfo[] blklist, short replication, long preferredBlockSize) {
     super(id, name, permissions, mtime, atime);
@@ -199,6 +201,18 @@ public class INodeFile extends INodeWith
     return nodeToUpdate;
   }
 
+  @Override
+  public void setCacheReplication(short cacheReplication) {
+    Preconditions.checkArgument(cacheReplication <= getBlockReplication(),
+        "Cannot set cache replication higher than block replication factor");
+    this.cacheReplication = cacheReplication;
+  }
+
+  @Override
+  public short getCacheReplication() {
+    return cacheReplication;
+  }
+
   /** @return preferred block size (in bytes) of the file. */
   @Override
   public long getPreferredBlockSize() {

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1523145&r1=1523144&r2=1523145&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Fri Sep 13 23:27:22 2013
@@ -968,7 +968,14 @@ class NameNodeRpcServer implements Namen
       String poolId, long[] blocks) throws IOException {
     verifyRequest(nodeReg);
     BlockListAsLongs blist = new BlockListAsLongs(blocks);
-    namesystem.getBlockManager().processCacheReport(nodeReg, poolId, blist);
+    if (blockStateChangeLog.isDebugEnabled()) {
+      blockStateChangeLog.debug("*BLOCK* NameNode.cacheReport: "
+           + "from " + nodeReg + " " + blist.getNumberOfBlocks()
+           + " blocks");
+    }
+
+    namesystem.getCacheReplicationManager()
+        .processCacheReport(nodeReg, poolId, blist);
     return null;
   }
 

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java?rev=1523145&r1=1523144&r2=1523145&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java Fri Sep 13 23:27:22 2013
@@ -79,6 +79,8 @@ public class NameNodeMetrics {
   MutableCounterLong transactionsBatchedInSync;
   @Metric("Block report") MutableRate blockReport;
   MutableQuantiles[] blockReportQuantiles;
+  @Metric("Cache report") MutableRate cacheReport;
+  MutableQuantiles[] cacheReportQuantiles;
 
   @Metric("Duration in SafeMode at startup") MutableGaugeInt safeModeTime;
   @Metric("Time loading FS Image at startup") MutableGaugeInt fsImageLoadTime;
@@ -89,6 +91,7 @@ public class NameNodeMetrics {
     final int len = intervals.length;
     syncsQuantiles = new MutableQuantiles[len];
     blockReportQuantiles = new MutableQuantiles[len];
+    cacheReportQuantiles = new MutableQuantiles[len];
     
     for (int i = 0; i < len; i++) {
       int interval = intervals[i];
@@ -98,6 +101,9 @@ public class NameNodeMetrics {
       blockReportQuantiles[i] = registry.newQuantiles(
           "blockReport" + interval + "s", 
           "Block report", "ops", "latency", interval);
+      cacheReportQuantiles[i] = registry.newQuantiles(
+          "cacheReport" + interval + "s",
+          "Cache report", "ops", "latency", interval);
     }
   }
 
@@ -227,6 +233,13 @@ public class NameNodeMetrics {
     }
   }
 
+  public void addCacheBlockReport(long latency) {
+    cacheReport.add(latency);
+    for (MutableQuantiles q : cacheReportQuantiles) {
+      q.add(latency);
+    }
+  }
+
   public void setSafeModeTime(long elapsed) {
     safeModeTime.set((int) elapsed);
   }

Added: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCacheReplicationManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCacheReplicationManager.java?rev=1523145&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCacheReplicationManager.java (added)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCacheReplicationManager.java Fri Sep 13 23:27:22 2013
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assume.assumeTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
+import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
+import org.apache.hadoop.hdfs.protocol.PathBasedCacheEntry;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.util.Fallible;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestCacheReplicationManager {
+
+  // Most Linux installs allow a default of 64KB locked memory
+  private static final long CACHE_CAPACITY = 64 * 1024;
+  private static final long BLOCK_SIZE = 4096;
+
+  private static Configuration conf;
+  private static MiniDFSCluster cluster = null;
+  private static FileSystem fs;
+  private static NameNode nn;
+  private static NamenodeProtocols nnRpc;
+  private static CacheReplicationManager cacheReplManager;
+  final private static FileSystemTestHelper helper = new FileSystemTestHelper();
+  private static Path rootDir;
+
+  @Before
+  public void setUp() throws Exception {
+
+    assumeTrue(NativeIO.isAvailable());
+
+    conf = new HdfsConfiguration();
+    conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
+        CACHE_CAPACITY);
+    conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    conf.setBoolean(DFS_NAMENODE_CACHING_ENABLED_KEY, true);
+    conf.setLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 1000);
+
+    cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(1).build();
+    cluster.waitActive();
+
+    fs = cluster.getFileSystem();
+    nn = cluster.getNameNode();
+    nnRpc = nn.getRpcServer();
+    cacheReplManager = nn.getNamesystem().getCacheReplicationManager();
+    rootDir = helper.getDefaultWorkingDirectory(fs);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (fs != null) {
+      fs.close();
+    }
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  private int countNumCachedBlocks() {
+    return cacheReplManager.cachedBlocksMap.size();
+  }
+
+  private void waitForExpectedNumCachedBlocks(final int expected)
+      throws Exception {
+    int actual = countNumCachedBlocks();
+    while (expected != actual)  {
+      Thread.sleep(500);
+      actual = countNumCachedBlocks();
+    }
+  }
+
+  @Test(timeout=60000)
+  public void testCachePaths() throws Exception {
+    // Create the pool
+    final String pool = "friendlyPool";
+    nnRpc.addCachePool(new CachePoolInfo("friendlyPool"));
+    // Create some test files
+    final int numFiles = 3;
+    final int numBlocksPerFile = 2;
+    final List<String> paths = new ArrayList<String>(numFiles);
+    for (int i=0; i<numFiles; i++) {
+      Path p = new Path(rootDir, "testCachePaths-" + i);
+      FileSystemTestHelper.createFile(fs, p, numBlocksPerFile, (int)BLOCK_SIZE);
+      paths.add(p.toUri().getPath());
+    }
+    // Check the initial statistics at the namenode
+    int expected = 0;
+    waitForExpectedNumCachedBlocks(expected);
+    // Cache and check each path in sequence
+    for (int i=0; i<numFiles; i++) {
+      List<PathBasedCacheDirective> toAdd =
+          new ArrayList<PathBasedCacheDirective>();
+      toAdd.add(new PathBasedCacheDirective(paths.get(i), pool));
+      List<Fallible<PathBasedCacheEntry>> fallibles =
+          nnRpc.addPathBasedCacheDirectives(toAdd);
+      assertEquals("Unexpected number of fallibles",
+          1, fallibles.size());
+      PathBasedCacheEntry entry = fallibles.get(0).get();
+      PathBasedCacheDirective directive = entry.getDirective();
+      assertEquals("Directive does not match requested path", paths.get(i),
+          directive.getPath());
+      assertEquals("Directive does not match requested pool", pool,
+          directive.getPool());
+      expected += numBlocksPerFile;
+      waitForExpectedNumCachedBlocks(expected);
+    }
+    // Uncache and check each path in sequence
+    RemoteIterator<PathBasedCacheEntry> entries =
+        nnRpc.listPathBasedCacheEntries(0, null, null);
+    for (int i=0; i<numFiles; i++) {
+      PathBasedCacheEntry entry = entries.next();
+      List<Long> toRemove = new ArrayList<Long>();
+      toRemove.add(entry.getEntryId());
+      List<Fallible<Long>> fallibles = nnRpc.removePathBasedCacheEntries(toRemove);
+      assertEquals("Unexpected number of fallibles", 1, fallibles.size());
+      Long l = fallibles.get(0).get();
+      assertEquals("Removed entryId does not match requested",
+          entry.getEntryId(), l.longValue());
+      expected -= numBlocksPerFile;
+      waitForExpectedNumCachedBlocks(expected);
+    }
+  }
+}

Propchange: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCacheReplicationManager.java
------------------------------------------------------------------------------
    svn:eol-style = native