Posted to hdfs-commits@hadoop.apache.org by cm...@apache.org on 2013/10/17 00:15:34 UTC

svn commit: r1532924 [2/2] - in /hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ src/main/java/org/apache/hadoop/hdfs/server/datanod...

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java?rev=1532924&r1=1532923&r2=1532924&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java Wed Oct 16 22:15:33 2013
@@ -17,53 +17,97 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DESCRIPTORS_NUM_RESPONSES;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DESCRIPTORS_NUM_RESPONSES_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DESCRIPTORS_NUM_RESPONSES;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DESCRIPTORS_NUM_RESPONSES_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT;
 
+import java.io.Closeable;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.SortedMap;
 import java.util.TreeMap;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.InvalidPoolNameError;
-import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.PoolWritePermissionDeniedError;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
-import org.apache.hadoop.hdfs.protocol.PathBasedCacheDescriptor;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
+import org.apache.hadoop.hdfs.protocol.PathBasedCacheDescriptor;
+import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.InvalidPoolNameError;
+import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.UnexpectedAddPathBasedCacheDirectiveException;
+import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.PoolWritePermissionDeniedError;
 import org.apache.hadoop.hdfs.protocol.PathBasedCacheEntry;
 import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.InvalidIdException;
 import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.NoSuchIdException;
-import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.RemovePermissionDeniedException;
 import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.UnexpectedRemovePathBasedCacheDescriptorException;
+import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.RemovePermissionDeniedException;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Counter;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.GSet;
+import org.apache.hadoop.util.LightWeightGSet;
+import org.apache.hadoop.util.Time;
 
-import com.google.common.base.Preconditions;
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * The Cache Manager handles caching on DataNodes.
+ *
+ * This class is instantiated by the FSNamesystem. It maintains the mapping
+ * of cached blocks to datanodes by processing datanode cache reports. Based
+ * on these reports and on the addition and removal of caching directives, it
+ * schedules caching and uncaching work.
  */
+@InterfaceAudience.LimitedPrivate({"HDFS"})
 public final class CacheManager {
   public static final Log LOG = LogFactory.getLog(CacheManager.class);
 
+  // TODO: add pending / underCached / schedule cached blocks stats.
+
+  /**
+   * The FSNamesystem that contains this CacheManager.
+   */
+  private final FSNamesystem namesystem;
+
+  /**
+   * The BlockManager associated with the FSN that owns this CacheManager.
+   */
+  private final BlockManager blockManager;
+
   /**
    * Cache entries, sorted by ID.
    *
@@ -74,6 +118,12 @@ public final class CacheManager {
       new TreeMap<Long, PathBasedCacheEntry>();
 
   /**
+   * The entry ID to use for a new entry.  Entry IDs always increase, and are
+   * never reused.
+   */
+  private long nextEntryId;
+
+  /**
    * Cache entries, sorted by path
    */
   private final TreeMap<String, List<PathBasedCacheEntry>> entriesByPath =
@@ -86,11 +136,6 @@ public final class CacheManager {
       new TreeMap<String, CachePool>();
 
   /**
-   * The entry ID to use for a new entry.
-   */
-  private long nextEntryId;
-
-  /**
    * Maximum number of cache pools to list in one operation.
    */
   private final int maxListCachePoolsResponses;
@@ -100,44 +145,129 @@ public final class CacheManager {
    */
   private final int maxListCacheDescriptorsResponses;
 
-  final private FSNamesystem namesystem;
-  final private FSDirectory dir;
+  /**
+   * Interval between scans in milliseconds.
+   */
+  private final long scanIntervalMs;
 
-  CacheManager(FSNamesystem namesystem, FSDirectory dir, Configuration conf) {
-    clear();
+  /**
+   * Whether caching is enabled.
+   *
+   * If caching is disabled, we will not process cache reports or store
+   * information about what is cached where.  We also do not start the
+   * CacheReplicationMonitor thread.  This will save resources, but provide
+   * less functionality.
+ *
+   * Even when caching is disabled, we still store path-based cache
+   * information.  This information is stored in the edit log and fsimage.  We
+   * don't want to lose it just because a configuration setting was turned off.
+   * However, we will not act on this information if caching is disabled.
+   */
+  private final boolean enabled;
+
+  /**
+   * Whether the CacheManager is active.
+   * 
+   * When the CacheManager is active, it tells the DataNodes what to cache
+   * and uncache.  The CacheManager cannot become active if enabled = false.
+   */
+  private boolean active = false;
+
+  /**
+   * All cached blocks.
+   */
+  private final GSet<CachedBlock, CachedBlock> cachedBlocks;
+
+  /**
+   * The CacheReplicationMonitor.
+   */
+  private CacheReplicationMonitor monitor;
+
+  CacheManager(FSNamesystem namesystem, Configuration conf,
+      BlockManager blockManager) {
     this.namesystem = namesystem;
-    this.dir = dir;
-    maxListCachePoolsResponses = conf.getInt(
+    this.blockManager = blockManager;
+    this.nextEntryId = 1;
+    this.maxListCachePoolsResponses = conf.getInt(
         DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES,
         DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT);
-    maxListCacheDescriptorsResponses = conf.getInt(
+    this.maxListCacheDescriptorsResponses = conf.getInt(
         DFS_NAMENODE_LIST_CACHE_DESCRIPTORS_NUM_RESPONSES,
         DFS_NAMENODE_LIST_CACHE_DESCRIPTORS_NUM_RESPONSES_DEFAULT);
+    scanIntervalMs = conf.getLong(
+        DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS,
+        DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT);
+    this.enabled = conf.getBoolean(DFS_NAMENODE_CACHING_ENABLED_KEY,
+        DFS_NAMENODE_CACHING_ENABLED_DEFAULT);
+    this.cachedBlocks = !enabled ? null :
+        new LightWeightGSet<CachedBlock, CachedBlock>(
+            LightWeightGSet.computeCapacity(0.25, "cachedBlocks"));
   }
 
-  synchronized void clear() {
-    entriesById.clear();
-    entriesByPath.clear();
-    cachePools.clear();
-    nextEntryId = 1;
+  /**
+   * Activate the cache manager.
+   * 
+   * When the cache manager is active, it tells the datanodes where to cache files.
+   */
+  public void activate() {
+    assert namesystem.hasWriteLock();
+    if (enabled && (!active)) {
+      LOG.info("Activating CacheManager.  " +
+          "Starting replication monitor thread...");
+      active = true;
+      monitor = new CacheReplicationMonitor(namesystem, this,
+         scanIntervalMs);
+      monitor.start();
+    }
   }
 
   /**
-   * Returns the next entry ID to be used for a PathBasedCacheEntry
+   * Deactivate the cache manager.
+   * 
+   * When the cache manager is inactive, it does not tell the datanodes where to
+   * cache files.
    */
-  synchronized long getNextEntryId() {
-    Preconditions.checkArgument(nextEntryId != Long.MAX_VALUE);
-    return nextEntryId++;
+  public void deactivate() {
+    assert namesystem.hasWriteLock();
+    if (active) {
+      LOG.info("Deactivating CacheManager.  " +
+          "stopping CacheReplicationMonitor thread...");
+      active = false;
+      IOUtils.closeQuietly(monitor);
+      monitor = null;
+      LOG.info("CacheReplicationMonitor thread stopped and deactivated.");
+    }
   }
 
   /**
-   * Returns the PathBasedCacheEntry corresponding to a PathBasedCacheEntry.
-   * 
-   * @param directive Lookup directive
-   * @return Corresponding PathBasedCacheEntry, or null if not present.
+   * Return true only if the cache manager is active.
+   * Must be called under the FSN read or write lock.
    */
-  private synchronized PathBasedCacheEntry
-      findEntry(PathBasedCacheDirective directive) {
+  public boolean isActive() {
+    return active;
+  }
+
+  public TreeMap<Long, PathBasedCacheEntry> getEntriesById() {
+    assert namesystem.hasReadOrWriteLock();
+    return entriesById;
+  }
+  
+  @VisibleForTesting
+  public GSet<CachedBlock, CachedBlock> getCachedBlocks() {
+    assert namesystem.hasReadOrWriteLock();
+    return cachedBlocks;
+  }
+
+  private long getNextEntryId() throws IOException {
+    assert namesystem.hasWriteLock();
+    if (nextEntryId == Long.MAX_VALUE) {
+      throw new IOException("No more available IDs");
+    }
+    return nextEntryId++;
+  }
+
+  private PathBasedCacheEntry findEntry(PathBasedCacheDirective directive) {
+    assert namesystem.hasReadOrWriteLock();
     List<PathBasedCacheEntry> existing =
         entriesByPath.get(directive.getPath().toUri().getPath());
     if (existing == null) {
@@ -151,56 +281,10 @@ public final class CacheManager {
     return null;
   }
 
-  /**
-   * Add a new PathBasedCacheEntry, skipping any validation checks. Called
-   * directly when reloading CacheManager state from FSImage.
-   * 
-   * @throws IOException if unable to cache the entry
-   */
-  private void unprotectedAddEntry(PathBasedCacheEntry entry)
-      throws IOException {
-    assert namesystem.hasWriteLock();
-    // Add it to the various maps
-    entriesById.put(entry.getEntryId(), entry);
-    String path = entry.getPath();
-    List<PathBasedCacheEntry> entryList = entriesByPath.get(path);
-    if (entryList == null) {
-      entryList = new ArrayList<PathBasedCacheEntry>(1);
-      entriesByPath.put(path, entryList);
-    }
-    entryList.add(entry);
-    // Set the path as cached in the namesystem
-    try {
-      INode node = dir.getINode(entry.getPath());
-      if (node != null && node.isFile()) {
-        INodeFile file = node.asFile();
-        // TODO: adjustable cache replication factor
-        namesystem.setCacheReplicationInt(entry.getPath(),
-            file.getBlockReplication());
-      } else {
-        LOG.warn("Path " + entry.getPath() + " is not a file");
-      }
-    } catch (IOException ioe) {
-      LOG.info("unprotectedAddEntry " + entry +": failed to cache file: " +
-          ioe.getClass().getName() +": " + ioe.getMessage());
-      throw ioe;
-    }
-  }
-
-  /**
-   * Add a new PathBasedCacheDirective if valid, returning a corresponding
-   * PathBasedCacheDescriptor to the user.
-   * 
-   * @param directive Directive describing the cache entry being added
-   * @param pc Permission checker used to validate that the calling user has
-   *          access to the destination cache pool
-   * @return Corresponding PathBasedCacheDescriptor for the new cache entry
-   * @throws IOException if the directive is invalid or was otherwise
-   *           unsuccessful
-   */
-  public synchronized PathBasedCacheDescriptor addDirective(
+  public PathBasedCacheDescriptor addDirective(
       PathBasedCacheDirective directive, FSPermissionChecker pc)
       throws IOException {
+    assert namesystem.hasWriteLock();
     CachePool pool = cachePools.get(directive.getPool());
     if (pool == null) {
       LOG.info("addDirective " + directive + ": pool not found.");
@@ -225,47 +309,37 @@ public final class CacheManager {
           "existing directive " + existing + " in this pool.");
       return existing.getDescriptor();
     }
-
-    // Success!
-    PathBasedCacheDescriptor d = unprotectedAddDirective(directive);
-    LOG.info("addDirective " + directive + ": added cache directive "
-        + directive);
-    return d;
-  }
-
-  /**
-   * Assigns a new entry ID to a validated PathBasedCacheDirective and adds
-   * it to the CacheManager. Called directly when replaying the edit log.
-   * 
-   * @param directive Directive being added
-   * @return PathBasedCacheDescriptor for the directive
-   * @throws IOException
-   */
-  PathBasedCacheDescriptor unprotectedAddDirective(
-      PathBasedCacheDirective directive) throws IOException {
-    assert namesystem.hasWriteLock();
-    CachePool pool = cachePools.get(directive.getPool());
     // Add a new entry with the next available ID.
     PathBasedCacheEntry entry;
-    entry = new PathBasedCacheEntry(getNextEntryId(),
-        directive.getPath().toUri().getPath(),
-        directive.getReplication(), pool);
-
-    unprotectedAddEntry(entry);
+    try {
+      entry = new PathBasedCacheEntry(getNextEntryId(),
+          directive.getPath().toUri().getPath(),
+          directive.getReplication(), pool);
+    } catch (IOException ioe) {
+      throw new UnexpectedAddPathBasedCacheDirectiveException(directive);
+    }
+    LOG.info("addDirective " + directive + ": added cache directive "
+        + directive);
 
+    // Success!
+    // First, add it to the various maps
+    entriesById.put(entry.getEntryId(), entry);
+    String path = directive.getPath().toUri().getPath();
+    List<PathBasedCacheEntry> entryList = entriesByPath.get(path);
+    if (entryList == null) {
+      entryList = new ArrayList<PathBasedCacheEntry>(1);
+      entriesByPath.put(path, entryList);
+    }
+    entryList.add(entry);
+    if (monitor != null) {
+      monitor.kick();
+    }
     return entry.getDescriptor();
   }
 
-  /**
-   * Remove the PathBasedCacheEntry corresponding to a descriptor ID from
-   * the CacheManager.
-   * 
-   * @param id of the PathBasedCacheDescriptor
-   * @param pc Permissions checker used to validated the request
-   * @throws IOException
-   */
-  public synchronized void removeDescriptor(long id, FSPermissionChecker pc)
+  public void removeDescriptor(long id, FSPermissionChecker pc)
       throws IOException {
+    assert namesystem.hasWriteLock();
     // Check for invalid IDs.
     if (id <= 0) {
       LOG.info("removeDescriptor " + id + ": invalid non-positive " +
@@ -290,20 +364,6 @@ public final class CacheManager {
       throw new RemovePermissionDeniedException(id);
     }
     
-    unprotectedRemoveDescriptor(id);
-  }
-
-  /**
-   * Unchecked internal method used to remove a PathBasedCacheEntry from the
-   * CacheManager. Called directly when replaying the edit log.
-   * 
-   * @param id of the PathBasedCacheDescriptor corresponding to the entry that
-   *          is being removed
-   * @throws IOException
-   */
-  void unprotectedRemoveDescriptor(long id) throws IOException {
-    assert namesystem.hasWriteLock();
-    PathBasedCacheEntry existing = entriesById.get(id);
     // Remove the corresponding entry in entriesByPath.
     String path = existing.getDescriptor().getPath().toUri().getPath();
     List<PathBasedCacheEntry> entries = entriesByPath.get(path);
@@ -314,26 +374,16 @@ public final class CacheManager {
       entriesByPath.remove(path);
     }
     entriesById.remove(id);
-
-    // Set the path as uncached in the namesystem
-    try {
-      INode node = dir.getINode(existing.getDescriptor().getPath().toUri().
-          getPath());
-      if (node != null && node.isFile()) {
-        namesystem.setCacheReplicationInt(existing.getDescriptor().getPath().
-            toUri().getPath(), (short) 0);
-      }
-    } catch (IOException e) {
-      LOG.warn("removeDescriptor " + id + ": failure while setting cache"
-          + " replication factor", e);
-      throw e;
+    if (monitor != null) {
+      monitor.kick();
     }
     LOG.info("removeDescriptor successful for PathCacheEntry id " + id);
   }
 
-  public synchronized BatchedListEntries<PathBasedCacheDescriptor> 
+  public BatchedListEntries<PathBasedCacheDescriptor> 
         listPathBasedCacheDescriptors(long prevId, String filterPool,
             String filterPath, FSPermissionChecker pc) throws IOException {
+    assert namesystem.hasReadOrWriteLock();
     final int NUM_PRE_ALLOCATED_ENTRIES = 16;
     if (filterPath != null) {
       if (!DFSUtil.isValidName(filterPath)) {
@@ -370,12 +420,13 @@ public final class CacheManager {
    * Create a cache pool.
    * 
    * Only the superuser should be able to call this function.
-   * 
-   * @param info The info for the cache pool to create.
-   * @return the created CachePool
+   *
+   * @param info    The info for the cache pool to create.
+   * @return        Information about the cache pool we created.
    */
-  public synchronized CachePoolInfo addCachePool(CachePoolInfo info)
+  public CachePoolInfo addCachePool(CachePoolInfo info)
       throws IOException {
+    assert namesystem.hasWriteLock();
     CachePoolInfo.validate(info);
     String poolName = info.getPoolName();
     CachePool pool = cachePools.get(poolName);
@@ -384,20 +435,8 @@ public final class CacheManager {
     }
     pool = CachePool.createFromInfoAndDefaults(info);
     cachePools.put(pool.getPoolName(), pool);
-    return pool.getInfo(true);
-  }
-
-  /**
-   * Internal unchecked method used to add a CachePool. Called directly when
-   * reloading CacheManager state from the FSImage or edit log.
-   * 
-   * @param pool to be added
-   */
-  void unprotectedAddCachePool(CachePoolInfo info) {
-    assert namesystem.hasWriteLock();
-    CachePool pool = CachePool.createFromInfo(info);
-    cachePools.put(pool.getPoolName(), pool);
     LOG.info("created new cache pool " + pool);
+    return pool.getInfo(true);
   }
 
   /**
@@ -408,8 +447,9 @@ public final class CacheManager {
    * @param info
    *          The info for the cache pool to modify.
    */
-  public synchronized void modifyCachePool(CachePoolInfo info)
+  public void modifyCachePool(CachePoolInfo info)
       throws IOException {
+    assert namesystem.hasWriteLock();
     CachePoolInfo.validate(info);
     String poolName = info.getPoolName();
     CachePool pool = cachePools.get(poolName);
@@ -455,8 +495,9 @@ public final class CacheManager {
    * @param poolName
    *          The name for the cache pool to remove.
    */
-  public synchronized void removeCachePool(String poolName)
+  public void removeCachePool(String poolName)
       throws IOException {
+    assert namesystem.hasWriteLock();
     CachePoolInfo.validateName(poolName);
     CachePool pool = cachePools.remove(poolName);
     if (pool == null) {
@@ -475,10 +516,14 @@ public final class CacheManager {
         iter.remove();
       }
     }
+    if (monitor != null) {
+      monitor.kick();
+    }
   }
 
-  public synchronized BatchedListEntries<CachePoolInfo>
+  public BatchedListEntries<CachePoolInfo>
       listCachePools(FSPermissionChecker pc, String prevKey) {
+    assert namesystem.hasReadOrWriteLock();
     final int NUM_PRE_ALLOCATED_ENTRIES = 16;
     ArrayList<CachePoolInfo> results = 
         new ArrayList<CachePoolInfo>(NUM_PRE_ALLOCATED_ENTRIES);
@@ -497,9 +542,104 @@ public final class CacheManager {
     return new BatchedListEntries<CachePoolInfo>(results, false);
   }
 
-  /*
-   * FSImage related serialization and deserialization code
-   */
+  public void setCachedLocations(LocatedBlock block) {
+    if (!enabled) {
+      return;
+    }
+    CachedBlock cachedBlock =
+        new CachedBlock(block.getBlock().getBlockId(),
+            (short)0, false);
+    cachedBlock = cachedBlocks.get(cachedBlock);
+    if (cachedBlock == null) {
+      return;
+    }
+    List<DatanodeDescriptor> datanodes = cachedBlock.getDatanodes(Type.CACHED);
+    for (DatanodeDescriptor datanode : datanodes) {
+      block.addCachedLoc(datanode);
+    }
+  }
+
+  public final void processCacheReport(final DatanodeID datanodeID,
+      final BlockListAsLongs report) throws IOException {
+    if (!enabled) {
+      LOG.info("Ignoring cache report from " + datanodeID +
+          " because " + DFS_NAMENODE_CACHING_ENABLED_KEY + " = false. " +
+          "number of blocks: " + report.getNumberOfBlocks());
+      return;
+    }
+    namesystem.writeLock();
+    final long startTime = Time.monotonicNow();
+    final long endTime;
+    try {
+      final DatanodeDescriptor datanode = 
+          blockManager.getDatanodeManager().getDatanode(datanodeID);
+      if (datanode == null || !datanode.isAlive) {
+        throw new IOException(
+            "processCacheReport from dead or unregistered datanode: " + datanode);
+      }
+      processCacheReportImpl(datanode, report);
+    } finally {
+      endTime = Time.monotonicNow();
+      namesystem.writeUnlock();
+    }
+
+    // Log the block report processing stats from Namenode perspective
+    final NameNodeMetrics metrics = NameNode.getNameNodeMetrics();
+    if (metrics != null) {
+      metrics.addCacheBlockReport((int) (endTime - startTime));
+    }
+    LOG.info("Processed cache report from "
+        + datanodeID + ", blocks: " + report.getNumberOfBlocks()
+        + ", processing time: " + (endTime - startTime) + " msecs");
+  }
+
+  private void processCacheReportImpl(final DatanodeDescriptor datanode,
+      final BlockListAsLongs report) {
+    CachedBlocksList cached = datanode.getCached();
+    cached.clear();
+    BlockReportIterator itBR = report.getBlockReportIterator();
+    while (itBR.hasNext()) {
+      Block block = itBR.next();
+      ReplicaState iState = itBR.getCurrentReplicaState();
+      if (iState != ReplicaState.FINALIZED) {
+        LOG.error("Cached block report contained unfinalized block " + block);
+        continue;
+      }
+      BlockInfo blockInfo = blockManager.getStoredBlock(block);
+      if (blockInfo == null) {
+        // The NameNode does not know of any block with this ID; skip it.
+        LOG.warn("Cached block report contained unknown block " + block);
+        continue;
+      }
+      if (blockInfo.getGenerationStamp() < block.getGenerationStamp()) {
+        // The NameNode will eventually remove or update the out-of-date block.
+        // Until then, we pretend that it isn't cached.
+        LOG.warn("Genstamp in cache report disagrees with our genstamp for " +
+          block + ": expected genstamp " + blockInfo.getGenerationStamp());
+        continue;
+      }
+      Collection<DatanodeDescriptor> corruptReplicas =
+          blockManager.getCorruptReplicas(blockInfo);
+      if ((corruptReplicas != null) && corruptReplicas.contains(datanode)) {
+        // The NameNode will eventually remove or update the corrupt block.
+        // Until then, we pretend that it isn't cached.
+        LOG.warn("Ignoring cached replica on " + datanode + " of " + block +
+            " because it is corrupt.");
+        continue;
+      }
+      CachedBlock cachedBlock =
+          new CachedBlock(block.getBlockId(), (short)0, false);
+      CachedBlock prevCachedBlock = cachedBlocks.get(cachedBlock);
+      // Use the existing CachedBlock if it's present; otherwise,
+      // insert a new one.
+      if (prevCachedBlock != null) {
+        cachedBlock = prevCachedBlock;
+      } else {
+        cachedBlocks.put(cachedBlock);
+      }
+      if (!cachedBlock.isPresent(datanode.getCached())) {
+        datanode.getCached().add(cachedBlock);
+      }
+      if (cachedBlock.isPresent(datanode.getPendingCached())) {
+        datanode.getPendingCached().remove(cachedBlock);
+      }
+    }
+  }
 
   /**
    * Saves the current state of the CacheManager to the DataOutput. Used
@@ -508,7 +648,7 @@ public final class CacheManager {
    * @param sdPath path of the storage directory
    * @throws IOException
    */
-  public synchronized void saveState(DataOutput out, String sdPath)
+  public void saveState(DataOutput out, String sdPath)
       throws IOException {
     out.writeLong(nextEntryId);
     savePools(out, sdPath);
@@ -521,7 +661,8 @@ public final class CacheManager {
    * @param in DataInput from which to restore state
    * @throws IOException
    */
-  public synchronized void loadState(DataInput in) throws IOException {
+  public void loadState(DataInput in) throws IOException {
+    assert namesystem.hasWriteLock();
     nextEntryId = in.readLong();
     // pools need to be loaded first since entries point to their parent pool
     loadPools(in);
@@ -531,7 +672,7 @@ public final class CacheManager {
   /**
    * Save cache pools to fsimage
    */
-  private synchronized void savePools(DataOutput out,
+  private void savePools(DataOutput out,
       String sdPath) throws IOException {
     StartupProgress prog = NameNode.getStartupProgress();
     Step step = new Step(StepType.CACHE_POOLS, sdPath);
@@ -549,7 +690,7 @@ public final class CacheManager {
   /*
    * Save cache entries to fsimage
    */
-  private synchronized void saveEntries(DataOutput out, String sdPath)
+  private void saveEntries(DataOutput out, String sdPath)
       throws IOException {
     StartupProgress prog = NameNode.getStartupProgress();
     Step step = new Step(StepType.CACHE_ENTRIES, sdPath);
@@ -560,6 +701,7 @@ public final class CacheManager {
     for (PathBasedCacheEntry entry: entriesById.values()) {
       out.writeLong(entry.getEntryId());
       Text.writeString(out, entry.getPath());
+      out.writeShort(entry.getReplication());
       Text.writeString(out, entry.getPool().getPoolName());
       counter.increment();
     }
@@ -569,7 +711,7 @@ public final class CacheManager {
   /**
    * Load cache pools from fsimage
    */
-  private synchronized void loadPools(DataInput in)
+  private void loadPools(DataInput in)
       throws IOException {
     StartupProgress prog = NameNode.getStartupProgress();
     Step step = new Step(StepType.CACHE_POOLS);
@@ -578,8 +720,7 @@ public final class CacheManager {
     prog.setTotal(Phase.LOADING_FSIMAGE, step, numberOfPools);
     Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step);
     for (int i = 0; i < numberOfPools; i++) {
-      CachePoolInfo info = CachePoolInfo.readFrom(in);
-      unprotectedAddCachePool(info);
+      addCachePool(CachePoolInfo.readFrom(in));
       counter.increment();
     }
     prog.endStep(Phase.LOADING_FSIMAGE, step);
@@ -588,7 +729,7 @@ public final class CacheManager {
   /**
    * Load cache entries from the fsimage
    */
-  private synchronized void loadEntries(DataInput in) throws IOException {
+  private void loadEntries(DataInput in) throws IOException {
     StartupProgress prog = NameNode.getStartupProgress();
     Step step = new Step(StepType.CACHE_ENTRIES);
     prog.beginStep(Phase.LOADING_FSIMAGE, step);
@@ -602,12 +743,24 @@ public final class CacheManager {
       String poolName = Text.readString(in);
       // Get pool reference by looking it up in the map
       CachePool pool = cachePools.get(poolName);
+      if (pool == null) {
+        throw new IOException("Entry refers to pool " + poolName +
+            ", which does not exist.");
+      }
       PathBasedCacheEntry entry =
-        new PathBasedCacheEntry(entryId, path, replication, pool);
-      unprotectedAddEntry(entry);
+          new PathBasedCacheEntry(entryId, path, replication, pool);
+      if (entriesById.put(entry.getEntryId(), entry) != null) {
+        throw new IOException("An entry with ID " + entry.getEntryId() +
+            " already exists");
+      }
+      List<PathBasedCacheEntry> entries = entriesByPath.get(entry.getPath());
+      if (entries == null) {
+        entries = new LinkedList<PathBasedCacheEntry>();
+        entriesByPath.put(entry.getPath(), entries);
+      }
+      entries.add(entry);
       counter.increment();
     }
     prog.endStep(Phase.LOADING_FSIMAGE, step);
   }
-
 }

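A note on the lookup idiom in the CacheManager changes above: both setCachedLocations() and processCacheReportImpl() build a throwaway CachedBlock whose replication and mark arguments are dummies, then use it as a probe key into the cachedBlocks GSet. That only works if CachedBlock identity (equals/hashCode) is defined by the block ID alone, which this diff does not show; the sketch below makes that assumption explicit.

    // Minimal sketch of the probe-key idiom, assuming CachedBlock identity is
    // the block ID alone (not shown in this diff). The (short)0 replication
    // and false mark are placeholders the lookup ignores.
    import java.util.List;

    import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
    import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
    import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
    import org.apache.hadoop.util.GSet;

    class CachedBlockLookup {
      /** Returns the datanodes caching blockId, or null if none do. */
      static List<DatanodeDescriptor> cachedLocations(
          GSet<CachedBlock, CachedBlock> cachedBlocks, long blockId) {
        CachedBlock probe = new CachedBlock(blockId, (short)0, false);
        CachedBlock found = cachedBlocks.get(probe);
        return (found == null) ? null : found.getDatanodes(Type.CACHED);
      }
    }

Keeping a single canonical CachedBlock per block ID is also what lets the per-datanode CachedBlocksLists and the global GSet share instances, which processCacheReportImpl() relies on when it prefers prevCachedBlock over a freshly constructed object.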
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1532924&r1=1532923&r2=1532924&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Wed Oct 16 22:15:33 2013
@@ -53,6 +53,7 @@ import org.apache.hadoop.hdfs.protocol.F
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
@@ -1093,52 +1094,6 @@ public class FSDirectory implements Clos
   }
 
   /**
-   * Set cache replication for a file
-   * 
-   * @param src file name
-   * @param replication new replication
-   * @param blockRepls block replications - output parameter
-   * @return array of file blocks
-   * @throws QuotaExceededException
-   * @throws SnapshotAccessControlException
-   */
-  Block[] setCacheReplication(String src, short replication, short[] blockRepls)
-      throws QuotaExceededException, UnresolvedLinkException,
-      SnapshotAccessControlException {
-    waitForReady();
-    writeLock();
-    try {
-      return unprotectedSetCacheReplication(src, replication, blockRepls);
-    } finally {
-      writeUnlock();
-    }
-  }
-
-  Block[] unprotectedSetCacheReplication(String src, short replication,
-      short[] blockRepls) throws QuotaExceededException,
-      UnresolvedLinkException, SnapshotAccessControlException {
-    assert hasWriteLock();
-
-    final INodesInPath iip = rootDir.getINodesInPath4Write(src, true);
-    final INode inode = iip.getLastINode();
-    if (inode == null || !inode.isFile()) {
-      return null;
-    }
-    INodeFile file = inode.asFile();
-    final short oldBR = file.getCacheReplication();
-
-    // TODO: Update quotas here as repl goes up or down
-    file.setCacheReplication(replication);
-    final short newBR = file.getCacheReplication();
-
-    if (blockRepls != null) {
-      blockRepls[0] = oldBR;
-      blockRepls[1] = newBR;
-    }
-    return file.getBlocks();
-  }
-
-  /**
    * @param path the file path
    * @return the block size of the file. 
    */
@@ -2638,12 +2593,19 @@ public class FSDirectory implements Clos
     int childrenNum = node.isDirectory() ? 
         node.asDirectory().getChildrenNum(snapshot) : 0;
         
-    return new HdfsLocatedFileStatus(size, node.isDirectory(), replication,
-        blocksize, node.getModificationTime(snapshot),
-        node.getAccessTime(snapshot), node.getFsPermission(snapshot),
-        node.getUserName(snapshot), node.getGroupName(snapshot),
-        node.isSymlink() ? node.asSymlink().getSymlink() : null, path,
-        node.getId(), loc, childrenNum);
+    HdfsLocatedFileStatus status =
+        new HdfsLocatedFileStatus(size, node.isDirectory(), replication,
+          blocksize, node.getModificationTime(snapshot),
+          node.getAccessTime(snapshot), node.getFsPermission(snapshot),
+          node.getUserName(snapshot), node.getGroupName(snapshot),
+          node.isSymlink() ? node.asSymlink().getSymlink() : null, path,
+          node.getId(), loc, childrenNum);
+    // Set caching information for the located blocks.
+    CacheManager cacheManager = namesystem.getCacheManager();
+    for (LocatedBlock lb: loc.getLocatedBlocks()) {
+      cacheManager.setCachedLocations(lb);
+    }
+    return status;
   }
 
     

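With FSDirectory now stamping cached locations onto each LocatedBlock, a caller can inspect which replicas are cached. The accessor paired with addCachedLoc() is not part of this diff; the sketch below assumes a getCachedLocations() getter on LocatedBlock.

    // Hedged sketch: checking for cached replicas on the client side.
    // getCachedLocations() is assumed to exist as the counterpart of the
    // addCachedLoc() call shown above; it does not appear in this diff.
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
    import org.apache.hadoop.hdfs.protocol.LocatedBlock;
    import org.apache.hadoop.hdfs.protocol.LocatedBlocks;

    class CachedReplicaProbe {
      static boolean hasCachedReplica(LocatedBlocks blocks) {
        for (LocatedBlock lb : blocks.getLocatedBlocks()) {
          DatanodeInfo[] cached = lb.getCachedLocations();  // assumed accessor
          if (cached != null && cached.length > 0) {
            return true;  // some datanode holds this block in its cache
          }
        }
        return false;
      }
    }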
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1532924&r1=1532923&r2=1532924&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Wed Oct 16 22:15:33 2013
@@ -648,8 +648,7 @@ public class FSEditLogLoader {
           setPool(addOp.pool).
           build();
       PathBasedCacheDescriptor descriptor =
-          fsNamesys.getCacheManager().unprotectedAddDirective(d);
-
+          fsNamesys.getCacheManager().addDirective(d, null);
       if (toAddRetryCache) {
         fsNamesys.addCacheEntryWithPayload(op.rpcClientId, op.rpcCallId,
             descriptor);
@@ -659,8 +658,7 @@ public class FSEditLogLoader {
     case OP_REMOVE_PATH_BASED_CACHE_DESCRIPTOR: {
       RemovePathBasedCacheDescriptorOp removeOp =
           (RemovePathBasedCacheDescriptorOp) op;
-      fsNamesys.getCacheManager().unprotectedRemoveDescriptor(removeOp.id);
-
+      fsNamesys.getCacheManager().removeDescriptor(removeOp.id, null);
       if (toAddRetryCache) {
         fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
       }
@@ -668,8 +666,7 @@ public class FSEditLogLoader {
     }
     case OP_ADD_CACHE_POOL: {
       AddCachePoolOp addOp = (AddCachePoolOp) op;
-      fsNamesys.getCacheManager().unprotectedAddCachePool(addOp.info);
-
+      fsNamesys.getCacheManager().addCachePool(addOp.info);
       if (toAddRetryCache) {
         fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
       }
@@ -678,7 +675,6 @@ public class FSEditLogLoader {
     case OP_MODIFY_CACHE_POOL: {
       ModifyCachePoolOp modifyOp = (ModifyCachePoolOp) op;
       fsNamesys.getCacheManager().modifyCachePool(modifyOp.info);
-
       if (toAddRetryCache) {
         fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
       }
@@ -687,7 +683,6 @@ public class FSEditLogLoader {
     case OP_REMOVE_CACHE_POOL: {
       RemoveCachePoolOp removeOp = (RemoveCachePoolOp) op;
       fsNamesys.getCacheManager().removeCachePool(removeOp.poolName);
-
       if (toAddRetryCache) {
         fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
       }

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1532924&r1=1532923&r2=1532924&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Wed Oct 16 22:15:33 2013
@@ -374,7 +374,6 @@ public class FSNamesystem implements Nam
   private final BlockManager blockManager;
   private final SnapshotManager snapshotManager;
   private final CacheManager cacheManager;
-  private final CacheReplicationManager cacheReplicationManager;
   private final DatanodeStatistics datanodeStatistics;
 
   // Block pool ID used by this namenode
@@ -702,9 +701,12 @@ public class FSNamesystem implements Nam
       this.dtSecretManager = createDelegationTokenSecretManager(conf);
       this.dir = new FSDirectory(fsImage, this, conf);
       this.snapshotManager = new SnapshotManager(dir);
-      this.cacheManager = new CacheManager(this, dir, conf);
-      this.cacheReplicationManager = new CacheReplicationManager(this,
-          blockManager, blockManager.getDatanodeManager(), this, conf);
+      writeLock();
+      try {
+        this.cacheManager = new CacheManager(this, conf, blockManager);
+      } finally {
+        writeUnlock();
+      }
       this.safeMode = new SafeModeInfo(conf);
       this.auditLoggers = initAuditLoggers(conf);
       this.isDefaultAuditLogger = auditLoggers.size() == 1 &&
@@ -881,7 +883,6 @@ public class FSNamesystem implements Nam
         getCompleteBlocksTotal());
       setBlockTotal();
       blockManager.activate(conf);
-      cacheReplicationManager.activate();
     } finally {
       writeUnlock();
     }
@@ -898,7 +899,7 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       if (blockManager != null) blockManager.close();
-      if (cacheReplicationManager != null) cacheReplicationManager.close();
+      cacheManager.deactivate();
     } finally {
       writeUnlock();
     }
@@ -930,8 +931,6 @@ public class FSNamesystem implements Nam
         blockManager.clearQueues();
         blockManager.processAllPendingDNMessages();
 
-        cacheReplicationManager.clearQueues();
-
         if (!isInSafeMode() ||
             (isInSafeMode() && safeMode.isPopulatingReplQueues())) {
           LOG.info("Reprocessing replication and invalidation queues");
@@ -964,6 +963,8 @@ public class FSNamesystem implements Nam
       //ResourceMonitor required only at ActiveNN. See HDFS-2914
       this.nnrmthread = new Daemon(new NameNodeResourceMonitor());
       nnrmthread.start();
+      cacheManager.activate();
+      blockManager.getDatanodeManager().setSendCachingCommands(true);
     } finally {
       writeUnlock();
     }
@@ -998,6 +999,8 @@ public class FSNamesystem implements Nam
         // so that the tailer starts from the right spot.
         dir.fsImage.updateLastAppliedTxIdFromWritten();
       }
+      cacheManager.deactivate();
+      blockManager.getDatanodeManager().setSendCachingCommands(false);
     } finally {
       writeUnlock();
     }
@@ -1442,10 +1445,6 @@ public class FSNamesystem implements Nam
         blockManager.getDatanodeManager().sortLocatedBlocks(
                               clientMachine, lastBlockList);
       }
-      // Set caching information for the block list
-      for (LocatedBlock lb: blocks.getLocatedBlocks()) {
-        cacheReplicationManager.setCachedLocations(lb);
-      }
     }
     return blocks;
   }
@@ -1553,8 +1552,14 @@ public class FSNamesystem implements Nam
           length = Math.min(length, fileSize - offset);
           isUc = false;
         }
-        return blockManager.createLocatedBlocks(inode.getBlocks(), fileSize,
+        LocatedBlocks blocks =
+          blockManager.createLocatedBlocks(inode.getBlocks(), fileSize,
             isUc, offset, length, needBlockToken, iip.isSnapshot());
+        // Set caching information for the located blocks.
+        for (LocatedBlock lb: blocks.getLocatedBlocks()) {
+          cacheManager.setCachedLocations(lb);
+        }
+        return blocks;
       } finally {
         if (isReadOp) {
           readUnlock();
@@ -1928,42 +1933,6 @@ public class FSNamesystem implements Nam
     return isFile;
   }
 
-  boolean setCacheReplicationInt(String src, final short replication)
-      throws IOException {
-    final boolean isFile;
-    FSPermissionChecker pc = getPermissionChecker();
-    checkOperation(OperationCategory.WRITE);
-    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
-    writeLock();
-    try {
-      checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot set replication for " + src, safeMode);
-      }
-      src = FSDirectory.resolvePath(src, pathComponents, dir);
-      if (isPermissionEnabled) {
-        checkPathAccess(pc, src, FsAction.WRITE);
-      }
-
-      final short[] blockRepls = new short[2]; // 0: old, 1: new
-      final Block[] blocks = dir.setCacheReplication(src, replication,
-          blockRepls);
-      isFile = (blocks != null);
-      if (isFile) {
-        cacheReplicationManager.setCacheReplication(blockRepls[0],
-            blockRepls[1], src, blocks);
-      }
-    } finally {
-      writeUnlock();
-    }
-
-    getEditLog().logSync();
-    if (isFile) {
-      logAuditEvent(true, "setCacheReplication", src);
-    }
-    return isFile;
-  }
-
   long getPreferredBlockSize(String filename) 
       throws IOException, UnresolvedLinkException {
     FSPermissionChecker pc = getPermissionChecker();
@@ -6506,10 +6475,6 @@ public class FSNamesystem implements Nam
   public CacheManager getCacheManager() {
     return cacheManager;
   }
-  /** @return the cache replication manager. */
-  public CacheReplicationManager getCacheReplicationManager() {
-    return cacheReplicationManager;
-  }
 
   @Override  // NameNodeMXBean
   public String getCorruptFiles() {

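The FSNamesystem changes above tie the cache manager's lifecycle to the HA state transitions: startActiveServices() activates it and enables caching commands, while stopActiveServices() and close() deactivate it. Since CacheManager.activate() and deactivate() assert the namesystem write lock, every call site must hold it. A minimal sketch of that contract, with illustrative method names wrapping the exact calls from the diff:

    // Illustrative sketch of the lifecycle contract established above.
    // activate()/deactivate() assert the FSN write lock, so callers take it.
    void becomeActive() {
      writeLock();
      try {
        cacheManager.activate();  // no-op unless caching is enabled in conf
        blockManager.getDatanodeManager().setSendCachingCommands(true);
      } finally {
        writeUnlock();
      }
    }

    void becomeStandby() {
      writeLock();
      try {
        cacheManager.deactivate();  // stops the CacheReplicationMonitor thread
        blockManager.getDatanodeManager().setSendCachingCommands(false);
      } finally {
        writeUnlock();
      }
    }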
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java?rev=1532924&r1=1532923&r2=1532924&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java Wed Oct 16 22:15:33 2013
@@ -104,8 +104,6 @@ public class INodeFile extends INodeWith
 
   private BlockInfo[] blocks;
 
-  private short cacheReplication = 0;
-
   INodeFile(long id, byte[] name, PermissionStatus permissions, long mtime, long atime,
       BlockInfo[] blklist, short replication, long preferredBlockSize) {
     super(id, name, permissions, mtime, atime);
@@ -201,18 +199,6 @@ public class INodeFile extends INodeWith
     return nodeToUpdate;
   }
 
-  @Override
-  public void setCacheReplication(short cacheReplication) {
-    Preconditions.checkArgument(cacheReplication <= getBlockReplication(),
-        "Cannot set cache replication higher than block replication factor");
-    this.cacheReplication = cacheReplication;
-  }
-
-  @Override
-  public short getCacheReplication() {
-    return cacheReplication;
-  }
-
   /** @return preferred block size (in bytes) of the file. */
   @Override
   public long getPreferredBlockSize() {

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1532924&r1=1532923&r2=1532924&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Wed Oct 16 22:15:33 2013
@@ -689,8 +689,13 @@ public class NameNode implements NameNod
     try {
       initializeGenericKeys(conf, nsId, namenodeId);
       initialize(conf);
-      state.prepareToEnterState(haContext);
-      state.enterState(haContext);
+      haContext.writeLock();
+      try {
+        state.prepareToEnterState(haContext);
+        state.enterState(haContext);
+      } finally {
+        haContext.writeUnlock();
+      }
     } catch (IOException e) {
       this.stop();
       throw e;

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1532924&r1=1532923&r2=1532924&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Wed Oct 16 22:15:33 2013
@@ -997,9 +997,7 @@ class NameNodeRpcServer implements Namen
            + "from " + nodeReg + " " + blist.getNumberOfBlocks()
            + " blocks");
     }
-
-    namesystem.getCacheReplicationManager()
-        .processCacheReport(nodeReg, poolId, blist);
+    namesystem.getCacheManager().processCacheReport(nodeReg, blist);
     return null;
   }
 

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1532924&r1=1532923&r2=1532924&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Wed Oct 16 22:15:33 2013
@@ -1471,6 +1471,18 @@
 </property>
 
 <property>
+  <name>dfs.namenode.path.based.cache.refresh.interval.ms</name>
+  <value>300000</value>
+  <description>
+    The number of milliseconds between subsequent path cache rescans.  During
+    a rescan, the NameNode calculates which blocks should be cached, and on
+    which datanodes.
+
+    By default, this parameter is set to 300000, which is five minutes.
+  </description>
+</property>
+
+<property>
   <name>dfs.datanode.fsdatasetcache.max.threads.per.volume</name>
   <value>4</value>
   <description>

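For tests that should not wait five minutes between rescans, the new key can be lowered alongside enabling caching. A short sketch using the config keys imported by the tests below; the one-second value is illustrative.

    // Sketch of a test configuration using the keys documented above.
    import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY;
    import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    class CacheTestConf {
      static Configuration create() {
        Configuration conf = new HdfsConfiguration();
        // Caching must be enabled for the monitor thread to start at all.
        conf.setBoolean(DFS_NAMENODE_CACHING_ENABLED_KEY, true);
        // Rescan once per second instead of the 300000 ms (five minute) default.
        conf.setLong(DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 1000L);
        return conf;
      }
    }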
Added: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCachedBlocksList.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCachedBlocksList.java?rev=1532924&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCachedBlocksList.java (added)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCachedBlocksList.java Wed Oct 16 22:15:33 2013
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList;
+import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestCachedBlocksList {
+  public static final Log LOG = LogFactory.getLog(TestCachedBlocksList.class);
+
+  @Test(timeout=60000)
+  public void testSingleList() {
+    DatanodeDescriptor dn = new DatanodeDescriptor(
+      new DatanodeID("127.0.0.1", "localhost", "abcd", 5000, 5001, 5002));
+    CachedBlock[] blocks = new CachedBlock[] {
+          new CachedBlock(0L, (short)1, true),
+          new CachedBlock(1L, (short)1, true),
+          new CachedBlock(2L, (short)1, true),
+      };
+    // check that lists are empty
+    Assert.assertTrue("expected pending cached list to start off empty.", 
+        !dn.getPendingCached().iterator().hasNext());
+    Assert.assertTrue("expected cached list to start off empty.", 
+        !dn.getCached().iterator().hasNext());
+    Assert.assertTrue("expected pending uncached list to start off empty.", 
+        !dn.getPendingUncached().iterator().hasNext());
+    // add a block to the back
+    Assert.assertTrue(dn.getCached().add(blocks[0]));
+    Assert.assertTrue("expected pending cached list to still be empty.", 
+        !dn.getPendingCached().iterator().hasNext());
+    Assert.assertEquals("failed to insert blocks[0]", blocks[0],
+        dn.getCached().iterator().next());
+    Assert.assertTrue("expected pending uncached list to still be empty.", 
+        !dn.getPendingUncached().iterator().hasNext());
+    // add another block to the back
+    Assert.assertTrue(dn.getCached().add(blocks[1]));
+    Iterator<CachedBlock> iter = dn.getCached().iterator();
+    Assert.assertEquals(blocks[0], iter.next());
+    Assert.assertEquals(blocks[1], iter.next());
+    Assert.assertTrue(!iter.hasNext());
+    // add a block to the front
+    Assert.assertTrue(dn.getCached().addFirst(blocks[2]));
+    iter = dn.getCached().iterator();
+    Assert.assertEquals(blocks[2], iter.next());
+    Assert.assertEquals(blocks[0], iter.next());
+    Assert.assertEquals(blocks[1], iter.next());
+    Assert.assertTrue(!iter.hasNext());
+    // remove a block from the middle
+    Assert.assertTrue(dn.getCached().remove(blocks[0]));
+    iter = dn.getCached().iterator();
+    Assert.assertEquals(blocks[2], iter.next());
+    Assert.assertEquals(blocks[1], iter.next());
+    Assert.assertTrue(!iter.hasNext());
+    // remove all blocks
+    dn.getCached().clear();
+    Assert.assertTrue("expected cached list to be empty after clear.", 
+        !dn.getCached().iterator().hasNext());
+  }
+
+  private void testAddElementsToList(CachedBlocksList list,
+      CachedBlock[] blocks) {
+    Assert.assertTrue("expected list to start off empty.", 
+        !list.iterator().hasNext());
+    for (CachedBlock block : blocks) {
+      Assert.assertTrue(list.add(block));
+    }
+  }
+
+  private void testRemoveElementsFromList(Random r,
+      CachedBlocksList list, CachedBlock[] blocks) {
+    int i = 0;
+    for (Iterator<CachedBlock> iter = list.iterator(); iter.hasNext(); ) {
+      Assert.assertEquals(blocks[i], iter.next());
+      i++;
+    }
+    if (r.nextBoolean()) {
+      LOG.info("Removing via iterator");
+      for (Iterator<CachedBlock> iter = list.iterator(); iter.hasNext() ;) {
+        iter.next();
+        iter.remove();
+      }
+    } else {
+      LOG.info("Removing in pseudo-random order");
+      CachedBlock[] remainingBlocks = Arrays.copyOf(blocks, blocks.length);
+      for (int removed = 0; removed < remainingBlocks.length; ) {
+        int toRemove = r.nextInt(remainingBlocks.length);
+        if (remainingBlocks[toRemove] != null) {
+          Assert.assertTrue(list.remove(remainingBlocks[toRemove]));
+          remainingBlocks[toRemove] = null;
+          removed++;
+        }
+      }
+    }
+    Assert.assertTrue("expected list to be empty after everything " +
+        "was removed.", !list.iterator().hasNext());
+  }
+
+  @Test(timeout=60000)
+  public void testMultipleLists() {
+    DatanodeDescriptor[] datanodes = new DatanodeDescriptor[] {
+      new DatanodeDescriptor(
+        new DatanodeID("127.0.0.1", "localhost", "abcd", 5000, 5001, 5002)),
+      new DatanodeDescriptor(
+        new DatanodeID("127.0.1.1", "localhost", "efgh", 6000, 6001, 6002)),
+    };
+    CachedBlocksList[] lists = new CachedBlocksList[] {
+        datanodes[0].getPendingCached(),
+        datanodes[0].getCached(),
+        datanodes[1].getPendingCached(),
+        datanodes[1].getCached(),
+        datanodes[1].getPendingUncached(),
+    };
+    final int NUM_BLOCKS = 8000;
+    CachedBlock[] blocks = new CachedBlock[NUM_BLOCKS];
+    for (int i = 0; i < NUM_BLOCKS; i++) {
+      blocks[i] = new CachedBlock(i, (short)i, true);
+    }
+    Random r = new Random(654);
+    for (CachedBlocksList list : lists) {
+      testAddElementsToList(list, blocks);
+    }
+    for (CachedBlocksList list : lists) {
+      testRemoveElementsFromList(r, list, blocks);
+    }
+  }
+}

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java?rev=1532924&r1=1532923&r2=1532924&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java Wed Oct 16 22:15:33 2013
@@ -17,6 +17,12 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS;
 import static junit.framework.Assert.assertTrue;
 import static junit.framework.Assert.fail;
 import static org.junit.Assert.assertEquals;
@@ -24,6 +30,10 @@ import static org.junit.Assert.assertFal
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
 
 import junit.framework.Assert;
 
@@ -31,6 +41,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -46,13 +57,19 @@ import org.apache.hadoop.hdfs.protocol.P
 import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
 import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.InvalidIdException;
 import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.NoSuchIdException;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.GSet;
 import org.junit.After;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.base.Supplier;
+
 public class TestPathBasedCacheRequests {
   static final Log LOG = LogFactory.getLog(TestPathBasedCacheRequests.class);
 
@@ -83,7 +100,7 @@ public class TestPathBasedCacheRequests 
     }
   }
 
-  @Test
+  @Test(timeout=60000)
   public void testBasicPoolOperations() throws Exception {
     final String poolName = "pool1";
     CachePoolInfo info = new CachePoolInfo(poolName).
@@ -218,7 +235,7 @@ public class TestPathBasedCacheRequests 
     dfs.addCachePool(info);
   }
 
-  @Test
+  @Test(timeout=60000)
   public void testCreateAndModifyPools() throws Exception {
     String poolName = "pool1";
     String ownerName = "abc";
@@ -301,7 +318,7 @@ public class TestPathBasedCacheRequests 
         });
   }
 
-  @Test
+  @Test(timeout=60000)
   public void testAddRemoveDirectives() throws Exception {
     proto.addCachePool(new CachePoolInfo("pool1").
         setMode(new FsPermission((short)0777)));
@@ -366,6 +383,7 @@ public class TestPathBasedCacheRequests 
     try {
       addAsUnprivileged(new PathBasedCacheDirective.Builder().
           setPath(new Path("/emptypoolname")).
+          setReplication((short)1).
           setPool("").
           build());
       Assert.fail("expected an error when adding a PathBasedCache " +
@@ -424,4 +442,302 @@ public class TestPathBasedCacheRequests 
     iter = dfs.listPathBasedCacheDescriptors(null, null);
     assertFalse(iter.hasNext());
   }
+
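+  /**
+   * Verify that cache pools and directives survive a namenode restart.
+   */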
+  @Test(timeout=60000)
+  public void testCacheManagerRestart() throws Exception {
+    HdfsConfiguration conf = createCachingConf();
+    MiniDFSCluster cluster =
+      new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
+
+    try {
+      cluster.waitActive();
+      DistributedFileSystem dfs = cluster.getFileSystem();
+
+      // Create and validate a pool
+      final String pool = "poolparty";
+      String groupName = "partygroup";
+      FsPermission mode = new FsPermission((short)0777);
+      int weight = 747;
+      dfs.addCachePool(new CachePoolInfo(pool)
+          .setGroupName(groupName)
+          .setMode(mode)
+          .setWeight(weight));
+      RemoteIterator<CachePoolInfo> pit = dfs.listCachePools();
+      assertTrue("No cache pools found", pit.hasNext());
+      CachePoolInfo info = pit.next();
+      assertEquals(pool, info.getPoolName());
+      assertEquals(groupName, info.getGroupName());
+      assertEquals(mode, info.getMode());
+      assertEquals(weight, (int)info.getWeight());
+      assertFalse("Unexpected # of cache pools found", pit.hasNext());
+  
+      // Create some cache entries
+      int numEntries = 10;
+      String entryPrefix = "/party-";
+      for (int i=0; i<numEntries; i++) {
+        dfs.addPathBasedCacheDirective(
+            new PathBasedCacheDirective.Builder().
+              setPath(new Path(entryPrefix + i)).setPool(pool).build());
+      }
+      RemoteIterator<PathBasedCacheDescriptor> dit
+          = dfs.listPathBasedCacheDescriptors(null, null);
+      for (int i=0; i<numEntries; i++) {
+        assertTrue("Unexpected # of cache entries: " + i, dit.hasNext());
+        PathBasedCacheDescriptor cd = dit.next();
+        assertEquals(i+1, cd.getEntryId());
+        assertEquals(entryPrefix + i, cd.getPath().toUri().getPath());
+        assertEquals(pool, cd.getPool());
+      }
+      assertFalse("Unexpected # of cache descriptors found", dit.hasNext());
+  
+      // Restart namenode
+      cluster.restartNameNode();
+  
+      // Check that state came back up
+      pit = dfs.listCachePools();
+      assertTrue("No cache pools found", pit.hasNext());
+      info = pit.next();
+      assertEquals(pool, info.getPoolName());
+      assertEquals(groupName, info.getGroupName());
+      assertEquals(mode, info.getMode());
+      assertEquals(weight, (int)info.getWeight());
+      assertFalse("Unexpected # of cache pools found", pit.hasNext());
+  
+      dit = dfs.listPathBasedCacheDescriptors(null, null);
+      for (int i=0; i<numEntries; i++) {
+        assertTrue("Unexpected # of cache entries: " + i, dit.hasNext());
+        PathBasedCacheDescriptor cd = dit.next();
+        assertEquals(i+1, cd.getEntryId());
+        assertEquals(entryPrefix + i, cd.getPath().toUri().getPath());
+        assertEquals(pool, cd.getPool());
+      }
+      assertFalse("Unexpected # of cache descriptors found", dit.hasNext());
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
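+  /**
+   * Poll the CacheManager (under the namesystem read lock) until the
+   * expected numbers of cached blocks and cached replicas are reached,
+   * checking every 500 ms with a 60 second timeout.
+   */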
+  private static void waitForCachedBlocks(NameNode nn,
+      final int expectedCachedBlocks, final int expectedCachedReplicas) 
+          throws Exception {
+    final FSNamesystem namesystem = nn.getNamesystem();
+    final CacheManager cacheManager = namesystem.getCacheManager();
+    LOG.info("Waiting for " + expectedCachedBlocks + " blocks with " +
+             expectedCachedReplicas + " replicas.");
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        int numCachedBlocks = 0, numCachedReplicas = 0;
+        namesystem.readLock();
+        try {
+          GSet<CachedBlock, CachedBlock> cachedBlocks =
+              cacheManager.getCachedBlocks();
+          if (cachedBlocks != null) {
+            for (Iterator<CachedBlock> iter = cachedBlocks.iterator();
+                iter.hasNext(); ) {
+              CachedBlock cachedBlock = iter.next();
+              numCachedBlocks++;
+              numCachedReplicas += cachedBlock.getDatanodes(Type.CACHED).size();
+            }
+          }
+        } finally {
+          namesystem.readUnlock();
+        }
+        if ((numCachedBlocks == expectedCachedBlocks) && 
+            (numCachedReplicas == expectedCachedReplicas)) {
+          return true;
+        } else {
+          LOG.info("cached blocks: have " + numCachedBlocks +
+              " / " + expectedCachedBlocks);
+          LOG.info("cached replicas: have " + numCachedReplicas +
+              " / " + expectedCachedReplicas);
+          return false;
+        }
+      }
+    }, 500, 60000);
+  }
+
+  private static final long BLOCK_SIZE = 512;
+  private static final int NUM_DATANODES = 4;
+
+  // Most Linux installs allow non-root users to lock 64 KiB of memory,
+  // so split that budget evenly across the datanodes.
+  private static final long CACHE_CAPACITY = 64 * 1024 / NUM_DATANODES;
+
+  /**
+   * Return true if we can test DN caching.
+   */
+  private static boolean canTestDatanodeCaching() {
+    if (!NativeIO.isAvailable()) {
+      // Need NativeIO in order to cache blocks on the DN.
+      return false;
+    }
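+    // The process memlock limit must cover the per-datanode cache capacity.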
+    if (NativeIO.getMemlockLimit() < CACHE_CAPACITY) {
+      return false;
+    }
+    return true;
+  }
+
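+  /**
+   * Build a caching-enabled configuration with a small block size and
+   * short heartbeat, cache report, and refresh intervals so that tests
+   * converge quickly.
+   */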
+  private static HdfsConfiguration createCachingConf() {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, CACHE_CAPACITY);
+    conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    conf.setBoolean(DFS_NAMENODE_CACHING_ENABLED_KEY, true);
+    conf.setLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 1000);
+    conf.setLong(DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 1000);
+    return conf;
+  }
+
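+  /**
+   * Cache and then uncache a set of files one at a time, checking that
+   * the namenode's cached block and replica counts track each change.
+   */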
+  @Test(timeout=120000)
+  public void testWaitForCachedReplicas() throws Exception {
+    Assume.assumeTrue(canTestDatanodeCaching());
+    HdfsConfiguration conf = createCachingConf();
+    FileSystemTestHelper helper = new FileSystemTestHelper();
+    MiniDFSCluster cluster =
+      new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
+
+    try {
+      cluster.waitActive();
+      DistributedFileSystem dfs = cluster.getFileSystem();
+      NameNode namenode = cluster.getNameNode();
+      NamenodeProtocols nnRpc = namenode.getRpcServer();
+      Path rootDir = helper.getDefaultWorkingDirectory(dfs);
+      // Create the pool
+      final String pool = "friendlyPool";
+      nnRpc.addCachePool(new CachePoolInfo(pool));
+      // Create some test files
+      final int numFiles = 2;
+      final int numBlocksPerFile = 2;
+      final List<String> paths = new ArrayList<String>(numFiles);
+      for (int i=0; i<numFiles; i++) {
+        Path p = new Path(rootDir, "testCachePaths-" + i);
+        FileSystemTestHelper.createFile(dfs, p, numBlocksPerFile,
+            (int)BLOCK_SIZE);
+        paths.add(p.toUri().getPath());
+      }
+      // Check the initial statistics at the namenode
+      waitForCachedBlocks(namenode, 0, 0);
+      // Cache and check each path in sequence
+      int expected = 0;
+      for (int i=0; i<numFiles; i++) {
+        PathBasedCacheDirective directive =
+            new PathBasedCacheDirective.Builder().
+              setPath(new Path(paths.get(i))).
+              setPool(pool).
+              build();
+        PathBasedCacheDescriptor descriptor =
+            nnRpc.addPathBasedCacheDirective(directive);
+        assertEquals("Descriptor does not match requested path",
+            new Path(paths.get(i)), descriptor.getPath());
+        assertEquals("Descriptor does not match requested pool", pool,
+            descriptor.getPool());
+        expected += numBlocksPerFile;
+        waitForCachedBlocks(namenode, expected, expected);
+      }
+      // Uncache and check each path in sequence
+      RemoteIterator<PathBasedCacheDescriptor> entries =
+          nnRpc.listPathBasedCacheDescriptors(0, null, null);
+      for (int i=0; i<numFiles; i++) {
+        PathBasedCacheDescriptor descriptor = entries.next();
+        nnRpc.removePathBasedCacheDescriptor(descriptor.getEntryId());
+        expected -= numBlocksPerFile;
+        waitForCachedBlocks(namenode, expected, expected);
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
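+  /**
+   * With caching disabled on the namenode, adding directives should
+   * still succeed, but no replicas should ever actually be cached.
+   */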
+  @Test(timeout=120000)
+  public void testAddingPathBasedCacheDirectivesWhenCachingIsDisabled()
+      throws Exception {
+    HdfsConfiguration conf = createCachingConf();
+    conf.setBoolean(DFS_NAMENODE_CACHING_ENABLED_KEY, false);
+    MiniDFSCluster cluster =
+      new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
+
+    try {
+      cluster.waitActive();
+      DistributedFileSystem dfs = cluster.getFileSystem();
+      NameNode namenode = cluster.getNameNode();
+      // Create the pool
+      String pool = "pool1";
+      namenode.getRpcServer().addCachePool(new CachePoolInfo(pool));
+      // Create some test files
+      final int numFiles = 2;
+      final int numBlocksPerFile = 2;
+      final List<String> paths = new ArrayList<String>(numFiles);
+      for (int i=0; i<numFiles; i++) {
+        Path p = new Path("/testCachePaths-" + i);
+        FileSystemTestHelper.createFile(dfs, p, numBlocksPerFile,
+            (int)BLOCK_SIZE);
+        paths.add(p.toUri().getPath());
+      }
+      // Check the initial statistics at the namenode
+      waitForCachedBlocks(namenode, 0, 0);
+      // Cache and check each path in sequence
+      int expected = 0;
+      for (int i=0; i<numFiles; i++) {
+        PathBasedCacheDirective directive =
+            new PathBasedCacheDirective.Builder().
+              setPath(new Path(paths.get(i))).
+              setPool(pool).
+              build();
+        dfs.addPathBasedCacheDirective(directive);
+        waitForCachedBlocks(namenode, expected, 0);
+      }
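+      // Give the cluster ample time to (incorrectly) cache something
+      // before re-checking that nothing was cached.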
+      Thread.sleep(20000);
+      waitForCachedBlocks(namenode, expected, 0);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
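+  /**
+   * Cache an entire directory with a single directive and verify that
+   * every block of every file underneath it becomes cached.
+   */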
+  @Test(timeout=120000)
+  public void testWaitForCachedReplicasInDirectory() throws Exception {
+    Assume.assumeTrue(canTestDatanodeCaching());
+    HdfsConfiguration conf = createCachingConf();
+    MiniDFSCluster cluster =
+      new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
+
+    try {
+      cluster.waitActive();
+      DistributedFileSystem dfs = cluster.getFileSystem();
+      NameNode namenode = cluster.getNameNode();
+      // Create the pool
+      final String pool = "friendlyPool";
+      dfs.addCachePool(new CachePoolInfo(pool));
+      // Create some test files
+      final List<Path> paths = new LinkedList<Path>();
+      paths.add(new Path("/foo/bar"));
+      paths.add(new Path("/foo/baz"));
+      paths.add(new Path("/foo2/bar2"));
+      paths.add(new Path("/foo2/baz2"));
+      dfs.mkdir(new Path("/foo"), FsPermission.getDirDefault());
+      dfs.mkdir(new Path("/foo2"), FsPermission.getDirDefault());
+      final int numBlocksPerFile = 2;
+      for (Path path : paths) {
+        FileSystemTestHelper.createFile(dfs, path, numBlocksPerFile,
+            (int)BLOCK_SIZE, (short)3, false);
+      }
+      waitForCachedBlocks(namenode, 0, 0);
+      // cache entire directory
+      PathBasedCacheDescriptor descriptor = dfs.addPathBasedCacheDirective(
+            new PathBasedCacheDirective.Builder().
+              setPath(new Path("/foo")).
+              setReplication((short)2).
+              setPool(pool).
+              build());
+      assertEquals("Descriptor does not match requested pool", pool,
+          descriptor.getPool());
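+      // Two files of two blocks each under /foo: 4 cached blocks, and
+      // replication 2 gives 8 cached replicas.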
+      waitForCachedBlocks(namenode, 4, 8);
+      // remove and watch numCached go to 0
+      dfs.removePathBasedCacheDescriptor(descriptor);
+      waitForCachedBlocks(namenode, 0, 0);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
 }