You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by cm...@apache.org on 2013/11/21 18:12:59 UTC

svn commit: r1544252 [2/3] - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/client/ src/main/java/org/apache/hadoop/hdfs/protocol/ src/main/java/org/apache/hadoop/h...

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1544252&r1=1544251&r2=1544252&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Thu Nov 21 17:12:58 2013
@@ -38,15 +38,15 @@ import org.apache.hadoop.fs.permission.F
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.Storage.FormatConfirmable;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCachePoolOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddOp;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddPathBasedCacheDirectiveOp;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ModifyPathBasedCacheDirectiveOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCacheDirectiveInfoOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ModifyCacheDirectiveInfoOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllocateBlockIdOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllowSnapshotOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.CancelDelegationTokenOp;
@@ -63,7 +63,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.OpInstanceCache;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ReassignLeaseOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveCachePoolOp;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemovePathBasedCacheDirectiveOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveCacheDirectiveInfoOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOldOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameSnapshotOp;
@@ -954,27 +954,27 @@ public class FSEditLog implements LogsPu
     logEdit(op);
   }
   
-  void logAddPathBasedCacheDirective(PathBasedCacheDirective directive,
+  void logAddCacheDirectiveInfo(CacheDirectiveInfo directive,
       boolean toLogRpcIds) {
-    AddPathBasedCacheDirectiveOp op =
-        AddPathBasedCacheDirectiveOp.getInstance(cache.get())
+    AddCacheDirectiveInfoOp op =
+        AddCacheDirectiveInfoOp.getInstance(cache.get())
             .setDirective(directive);
     logRpcIds(op, toLogRpcIds);
     logEdit(op);
   }
 
-  void logModifyPathBasedCacheDirective(
-      PathBasedCacheDirective directive, boolean toLogRpcIds) {
-    ModifyPathBasedCacheDirectiveOp op =
-        ModifyPathBasedCacheDirectiveOp.getInstance(
+  void logModifyCacheDirectiveInfo(
+      CacheDirectiveInfo directive, boolean toLogRpcIds) {
+    ModifyCacheDirectiveInfoOp op =
+        ModifyCacheDirectiveInfoOp.getInstance(
             cache.get()).setDirective(directive);
     logRpcIds(op, toLogRpcIds);
     logEdit(op);
   }
 
-  void logRemovePathBasedCacheDirective(Long id, boolean toLogRpcIds) {
-    RemovePathBasedCacheDirectiveOp op =
-        RemovePathBasedCacheDirectiveOp.getInstance(cache.get()).setId(id);
+  void logRemoveCacheDirectiveInfo(Long id, boolean toLogRpcIds) {
+    RemoveCacheDirectiveInfoOp op =
+        RemoveCacheDirectiveInfoOp.getInstance(cache.get()).setId(id);
     logRpcIds(op, toLogRpcIds);
     logEdit(op);
   }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1544252&r1=1544251&r2=1544252&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Thu Nov 21 17:12:58 2013
@@ -36,13 +36,13 @@ import org.apache.hadoop.hdfs.protocol.H
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCachePoolOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCloseOp;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddPathBasedCacheDirectiveOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCacheDirectiveInfoOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllocateBlockIdOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllowSnapshotOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.BlockListUpdatingOp;
@@ -56,10 +56,10 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.GetDelegationTokenOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ModifyCachePoolOp;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ModifyPathBasedCacheDirectiveOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ModifyCacheDirectiveInfoOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ReassignLeaseOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveCachePoolOp;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemovePathBasedCacheDirectiveOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveCacheDirectiveInfoOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOldOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameSnapshotOp;
@@ -639,8 +639,8 @@ public class FSEditLogLoader {
       break;
     }
     case OP_ADD_PATH_BASED_CACHE_DIRECTIVE: {
-      AddPathBasedCacheDirectiveOp addOp = (AddPathBasedCacheDirectiveOp) op;
-      PathBasedCacheDirective result = fsNamesys.
+      AddCacheDirectiveInfoOp addOp = (AddCacheDirectiveInfoOp) op;
+      CacheDirectiveInfo result = fsNamesys.
           getCacheManager().addDirective(addOp.directive, null);
       if (toAddRetryCache) {
         Long id = result.getId();
@@ -649,8 +649,8 @@ public class FSEditLogLoader {
       break;
     }
     case OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE: {
-      ModifyPathBasedCacheDirectiveOp modifyOp =
-          (ModifyPathBasedCacheDirectiveOp) op;
+      ModifyCacheDirectiveInfoOp modifyOp =
+          (ModifyCacheDirectiveInfoOp) op;
       fsNamesys.getCacheManager().modifyDirective(
           modifyOp.directive, null);
       if (toAddRetryCache) {
@@ -659,8 +659,8 @@ public class FSEditLogLoader {
       break;
     }
     case OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE: {
-      RemovePathBasedCacheDirectiveOp removeOp =
-          (RemovePathBasedCacheDirectiveOp) op;
+      RemoveCacheDirectiveInfoOp removeOp =
+          (RemoveCacheDirectiveInfoOp) op;
       fsNamesys.getCacheManager().removeDirective(removeOp.id, null);
       if (toAddRetryCache) {
         fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java?rev=1544252&r1=1544251&r2=1544252&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java Thu Nov 21 17:12:58 2013
@@ -86,7 +86,7 @@ import org.apache.hadoop.hdfs.protocol.C
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
-import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.util.XMLUtils;
 import org.apache.hadoop.hdfs.util.XMLUtils.InvalidXmlException;
@@ -166,11 +166,11 @@ public abstract class FSEditLogOp {
       inst.put(OP_SET_GENSTAMP_V2, new SetGenstampV2Op());
       inst.put(OP_ALLOCATE_BLOCK_ID, new AllocateBlockIdOp());
       inst.put(OP_ADD_PATH_BASED_CACHE_DIRECTIVE,
-          new AddPathBasedCacheDirectiveOp());
+          new AddCacheDirectiveInfoOp());
       inst.put(OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE,
-          new ModifyPathBasedCacheDirectiveOp());
+          new ModifyCacheDirectiveInfoOp());
       inst.put(OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE,
-          new RemovePathBasedCacheDirectiveOp());
+          new RemoveCacheDirectiveInfoOp());
       inst.put(OP_ADD_CACHE_POOL, new AddCachePoolOp());
       inst.put(OP_MODIFY_CACHE_POOL, new ModifyCachePoolOp());
       inst.put(OP_REMOVE_CACHE_POOL, new RemoveCachePoolOp());
@@ -2868,22 +2868,22 @@ public abstract class FSEditLogOp {
 
   /**
    * {@literal @AtMostOnce} for
-   * {@link ClientProtocol#addPathBasedCacheDirective}
+   * {@link ClientProtocol#addCacheDirective}
    */
-  static class AddPathBasedCacheDirectiveOp extends FSEditLogOp {
-    PathBasedCacheDirective directive;
+  static class AddCacheDirectiveInfoOp extends FSEditLogOp {
+    CacheDirectiveInfo directive;
 
-    public AddPathBasedCacheDirectiveOp() {
+    public AddCacheDirectiveInfoOp() {
       super(OP_ADD_PATH_BASED_CACHE_DIRECTIVE);
     }
 
-    static AddPathBasedCacheDirectiveOp getInstance(OpInstanceCache cache) {
-      return (AddPathBasedCacheDirectiveOp) cache
+    static AddCacheDirectiveInfoOp getInstance(OpInstanceCache cache) {
+      return (AddCacheDirectiveInfoOp) cache
           .get(OP_ADD_PATH_BASED_CACHE_DIRECTIVE);
     }
 
-    public AddPathBasedCacheDirectiveOp setDirective(
-        PathBasedCacheDirective directive) {
+    public AddCacheDirectiveInfoOp setDirective(
+        CacheDirectiveInfo directive) {
       this.directive = directive;
       assert(directive.getId() != null);
       assert(directive.getPath() != null);
@@ -2898,7 +2898,7 @@ public abstract class FSEditLogOp {
       String path = FSImageSerialization.readString(in);
       short replication = FSImageSerialization.readShort(in);
       String pool = FSImageSerialization.readString(in);
-      directive = new PathBasedCacheDirective.Builder().
+      directive = new CacheDirectiveInfo.Builder().
           setId(id).
           setPath(new Path(path)).
           setReplication(replication).
@@ -2930,7 +2930,7 @@ public abstract class FSEditLogOp {
 
     @Override
     void fromXml(Stanza st) throws InvalidXmlException {
-      directive = new PathBasedCacheDirective.Builder().
+      directive = new CacheDirectiveInfo.Builder().
           setId(Long.parseLong(st.getValue("ID"))).
           setPath(new Path(st.getValue("PATH"))).
           setReplication(Short.parseShort(st.getValue("REPLICATION"))).
@@ -2942,7 +2942,7 @@ public abstract class FSEditLogOp {
     @Override
     public String toString() {
       StringBuilder builder = new StringBuilder();
-      builder.append("AddPathBasedCacheDirective [");
+      builder.append("AddCacheDirectiveInfo [");
       builder.append("id=" + directive.getId() + ",");
       builder.append("path=" + directive.getPath().toUri().getPath() + ",");
       builder.append("replication=" + directive.getReplication() + ",");
@@ -2955,22 +2955,22 @@ public abstract class FSEditLogOp {
 
   /**
    * {@literal @AtMostOnce} for
-   * {@link ClientProtocol#modifyPathBasedCacheDirective}
+   * {@link ClientProtocol#modifyCacheDirective}
    */
-  static class ModifyPathBasedCacheDirectiveOp extends FSEditLogOp {
-    PathBasedCacheDirective directive;
+  static class ModifyCacheDirectiveInfoOp extends FSEditLogOp {
+    CacheDirectiveInfo directive;
 
-    public ModifyPathBasedCacheDirectiveOp() {
+    public ModifyCacheDirectiveInfoOp() {
       super(OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE);
     }
 
-    static ModifyPathBasedCacheDirectiveOp getInstance(OpInstanceCache cache) {
-      return (ModifyPathBasedCacheDirectiveOp) cache
+    static ModifyCacheDirectiveInfoOp getInstance(OpInstanceCache cache) {
+      return (ModifyCacheDirectiveInfoOp) cache
           .get(OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE);
     }
 
-    public ModifyPathBasedCacheDirectiveOp setDirective(
-        PathBasedCacheDirective directive) {
+    public ModifyCacheDirectiveInfoOp setDirective(
+        CacheDirectiveInfo directive) {
       this.directive = directive;
       assert(directive.getId() != null);
       return this;
@@ -2978,8 +2978,8 @@ public abstract class FSEditLogOp {
 
     @Override
     void readFields(DataInputStream in, int logVersion) throws IOException {
-      PathBasedCacheDirective.Builder builder =
-          new PathBasedCacheDirective.Builder();
+      CacheDirectiveInfo.Builder builder =
+          new CacheDirectiveInfo.Builder();
       builder.setId(FSImageSerialization.readLong(in));
       byte flags = in.readByte();
       if ((flags & 0x1) != 0) {
@@ -2993,7 +2993,7 @@ public abstract class FSEditLogOp {
       }
       if ((flags & ~0x7) != 0) {
         throw new IOException("unknown flags set in " +
-            "ModifyPathBasedCacheDirectiveOp: " + flags);
+            "ModifyCacheDirectiveInfoOp: " + flags);
       }
       this.directive = builder.build();
       readRpcIds(in, logVersion);
@@ -3041,8 +3041,8 @@ public abstract class FSEditLogOp {
 
     @Override
     void fromXml(Stanza st) throws InvalidXmlException {
-      PathBasedCacheDirective.Builder builder =
-          new PathBasedCacheDirective.Builder();
+      CacheDirectiveInfo.Builder builder =
+          new CacheDirectiveInfo.Builder();
       builder.setId(Long.parseLong(st.getValue("ID")));
       String path = st.getValueOrNull("PATH");
       if (path != null) {
@@ -3063,7 +3063,7 @@ public abstract class FSEditLogOp {
     @Override
     public String toString() {
       StringBuilder builder = new StringBuilder();
-      builder.append("ModifyPathBasedCacheDirectiveOp[");
+      builder.append("ModifyCacheDirectiveInfoOp[");
       builder.append("id=").append(directive.getId());
       if (directive.getPath() != null) {
         builder.append(",").append("path=").append(directive.getPath());
@@ -3083,21 +3083,21 @@ public abstract class FSEditLogOp {
 
   /**
    * {@literal @AtMostOnce} for
-   * {@link ClientProtocol#removePathBasedCacheDirective}
+   * {@link ClientProtocol#removeCacheDirective}
    */
-  static class RemovePathBasedCacheDirectiveOp extends FSEditLogOp {
+  static class RemoveCacheDirectiveInfoOp extends FSEditLogOp {
     long id;
 
-    public RemovePathBasedCacheDirectiveOp() {
+    public RemoveCacheDirectiveInfoOp() {
       super(OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE);
     }
 
-    static RemovePathBasedCacheDirectiveOp getInstance(OpInstanceCache cache) {
-      return (RemovePathBasedCacheDirectiveOp) cache
+    static RemoveCacheDirectiveInfoOp getInstance(OpInstanceCache cache) {
+      return (RemoveCacheDirectiveInfoOp) cache
           .get(OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE);
     }
 
-    public RemovePathBasedCacheDirectiveOp setId(long id) {
+    public RemoveCacheDirectiveInfoOp setId(long id) {
       this.id = id;
       return this;
     }
@@ -3129,7 +3129,7 @@ public abstract class FSEditLogOp {
     @Override
     public String toString() {
       StringBuilder builder = new StringBuilder();
-      builder.append("RemovePathBasedCacheDirective [");
+      builder.append("RemoveCacheDirectiveInfo [");
       builder.append("id=" + Long.toString(id));
       appendRpcIdsToString(builder, rpcClientId, rpcCallId);
       builder.append("]");

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1544252&r1=1544251&r2=1544252&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Thu Nov 21 17:12:58 2013
@@ -151,7 +151,8 @@ import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -7064,8 +7065,8 @@ public class FSNamesystem implements Nam
     }
   }
 
-  long addPathBasedCacheDirective(
-      PathBasedCacheDirective directive) throws IOException {
+  long addCacheDirective(
+      CacheDirectiveInfo directive) throws IOException {
     checkOperation(OperationCategory.WRITE);
     final FSPermissionChecker pc = isPermissionEnabled ?
         getPermissionChecker() : null;
@@ -7081,15 +7082,15 @@ public class FSNamesystem implements Nam
       checkOperation(OperationCategory.WRITE);
       if (isInSafeMode()) {
         throw new SafeModeException(
-            "Cannot add PathBasedCache directive", safeMode);
+            "Cannot add cache directive", safeMode);
       }
       if (directive.getId() != null) {
         throw new IOException("addDirective: you cannot specify an ID " +
             "for this operation.");
       }
-      PathBasedCacheDirective effectiveDirective = 
+      CacheDirectiveInfo effectiveDirective = 
           cacheManager.addDirective(directive, pc);
-      getEditLog().logAddPathBasedCacheDirective(effectiveDirective,
+      getEditLog().logAddCacheDirectiveInfo(effectiveDirective,
           cacheEntry != null);
       result = effectiveDirective.getId();
       success = true;
@@ -7099,15 +7100,15 @@ public class FSNamesystem implements Nam
         getEditLog().logSync();
       }
       if (isAuditEnabled() && isExternalInvocation()) {
-        logAuditEvent(success, "addPathBasedCacheDirective", null, null, null);
+        logAuditEvent(success, "addCacheDirective", null, null, null);
       }
       RetryCache.setState(cacheEntry, success, result);
     }
     return result;
   }
 
-  void modifyPathBasedCacheDirective(
-      PathBasedCacheDirective directive) throws IOException {
+  void modifyCacheDirective(
+      CacheDirectiveInfo directive) throws IOException {
     checkOperation(OperationCategory.WRITE);
     final FSPermissionChecker pc = isPermissionEnabled ?
         getPermissionChecker() : null;
@@ -7121,10 +7122,10 @@ public class FSNamesystem implements Nam
       checkOperation(OperationCategory.WRITE);
       if (isInSafeMode()) {
         throw new SafeModeException(
-            "Cannot add PathBasedCache directive", safeMode);
+            "Cannot add cache directive", safeMode);
       }
       cacheManager.modifyDirective(directive, pc);
-      getEditLog().logModifyPathBasedCacheDirective(directive,
+      getEditLog().logModifyCacheDirectiveInfo(directive,
           cacheEntry != null);
       success = true;
     } finally {
@@ -7133,13 +7134,13 @@ public class FSNamesystem implements Nam
         getEditLog().logSync();
       }
       if (isAuditEnabled() && isExternalInvocation()) {
-        logAuditEvent(success, "addPathBasedCacheDirective", null, null, null);
+        logAuditEvent(success, "addCacheDirective", null, null, null);
       }
       RetryCache.setState(cacheEntry, success);
     }
   }
 
-  void removePathBasedCacheDirective(Long id) throws IOException {
+  void removeCacheDirective(Long id) throws IOException {
     checkOperation(OperationCategory.WRITE);
     final FSPermissionChecker pc = isPermissionEnabled ?
         getPermissionChecker() : null;
@@ -7153,15 +7154,15 @@ public class FSNamesystem implements Nam
       checkOperation(OperationCategory.WRITE);
       if (isInSafeMode()) {
         throw new SafeModeException(
-            "Cannot remove PathBasedCache directives", safeMode);
+            "Cannot remove cache directives", safeMode);
       }
       cacheManager.removeDirective(id, pc);
-      getEditLog().logRemovePathBasedCacheDirective(id, cacheEntry != null);
+      getEditLog().logRemoveCacheDirectiveInfo(id, cacheEntry != null);
       success = true;
     } finally {
       writeUnlock();
       if (isAuditEnabled() && isExternalInvocation()) {
-        logAuditEvent(success, "removePathBasedCacheDirective", null, null,
+        logAuditEvent(success, "removeCacheDirective", null, null,
             null);
       }
       RetryCache.setState(cacheEntry, success);
@@ -7169,23 +7170,23 @@ public class FSNamesystem implements Nam
     getEditLog().logSync();
   }
 
-  BatchedListEntries<PathBasedCacheDirective> listPathBasedCacheDirectives(
-      long startId, PathBasedCacheDirective filter) throws IOException {
+  BatchedListEntries<CacheDirectiveEntry> listCacheDirectives(
+      long startId, CacheDirectiveInfo filter) throws IOException {
     checkOperation(OperationCategory.READ);
     final FSPermissionChecker pc = isPermissionEnabled ?
         getPermissionChecker() : null;
-    BatchedListEntries<PathBasedCacheDirective> results;
+    BatchedListEntries<CacheDirectiveEntry> results;
     readLock();
     boolean success = false;
     try {
       checkOperation(OperationCategory.READ);
       results =
-          cacheManager.listPathBasedCacheDirectives(startId, filter, pc);
+          cacheManager.listCacheDirectives(startId, filter, pc);
       success = true;
     } finally {
       readUnlock();
       if (isAuditEnabled() && isExternalInvocation()) {
-        logAuditEvent(success, "listPathBasedCacheDirectives", null, null,
+        logAuditEvent(success, "listCacheDirectives", null, null,
             null);
       }
     }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1544252&r1=1544251&r2=1544252&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Thu Nov 21 17:12:58 2013
@@ -61,7 +61,8 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
-import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
@@ -1233,52 +1234,52 @@ class NameNodeRpcServer implements Namen
   }
 
   @Override
-  public long addPathBasedCacheDirective(
-      PathBasedCacheDirective path) throws IOException {
-    return namesystem.addPathBasedCacheDirective(path);
+  public long addCacheDirective(
+      CacheDirectiveInfo path) throws IOException {
+    return namesystem.addCacheDirective(path);
   }
 
   @Override
-  public void modifyPathBasedCacheDirective(
-      PathBasedCacheDirective directive) throws IOException {
-    namesystem.modifyPathBasedCacheDirective(directive);
+  public void modifyCacheDirective(
+      CacheDirectiveInfo directive) throws IOException {
+    namesystem.modifyCacheDirective(directive);
   }
 
   @Override
-  public void removePathBasedCacheDirective(long id) throws IOException {
-    namesystem.removePathBasedCacheDirective(id);
+  public void removeCacheDirective(long id) throws IOException {
+    namesystem.removeCacheDirective(id);
   }
 
-  private class ServerSidePathBasedCacheEntriesIterator
-      extends BatchedRemoteIterator<Long, PathBasedCacheDirective> {
+  private class ServerSideCacheEntriesIterator 
+      extends BatchedRemoteIterator<Long, CacheDirectiveEntry> {
 
-    private final PathBasedCacheDirective filter;
+    private final CacheDirectiveInfo filter;
     
-    public ServerSidePathBasedCacheEntriesIterator(Long firstKey, 
-        PathBasedCacheDirective filter) {
+    public ServerSideCacheEntriesIterator (Long firstKey, 
+        CacheDirectiveInfo filter) {
       super(firstKey);
       this.filter = filter;
     }
 
     @Override
-    public BatchedEntries<PathBasedCacheDirective> makeRequest(
+    public BatchedEntries<CacheDirectiveEntry> makeRequest(
         Long nextKey) throws IOException {
-      return namesystem.listPathBasedCacheDirectives(nextKey, filter);
+      return namesystem.listCacheDirectives(nextKey, filter);
     }
 
     @Override
-    public Long elementToPrevKey(PathBasedCacheDirective entry) {
-      return entry.getId();
+    public Long elementToPrevKey(CacheDirectiveEntry entry) {
+      return entry.getInfo().getId();
     }
   }
   
   @Override
-  public RemoteIterator<PathBasedCacheDirective> listPathBasedCacheDirectives(long prevId,
-      PathBasedCacheDirective filter) throws IOException {
+  public RemoteIterator<CacheDirectiveEntry> listCacheDirectives(long prevId,
+      CacheDirectiveInfo filter) throws IOException {
     if (filter == null) {
-      filter = new PathBasedCacheDirective.Builder().build();
+      filter = new CacheDirectiveInfo.Builder().build();
     }
-    return new ServerSidePathBasedCacheEntriesIterator(prevId, filter);
+    return new ServerSideCacheEntriesIterator(prevId, filter);
   }
 
   @Override

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CacheAdmin.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CacheAdmin.java?rev=1544252&r1=1544251&r2=1544252&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CacheAdmin.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CacheAdmin.java Thu Nov 21 17:12:58 2013
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hdfs.tools;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 
@@ -31,8 +30,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
-import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.server.namenode.CachePool;
 import org.apache.hadoop.hdfs.tools.TableListing.Justification;
 import org.apache.hadoop.ipc.RemoteException;
@@ -121,7 +122,7 @@ public class CacheAdmin extends Configur
     int run(Configuration conf, List<String> args) throws IOException;
   }
 
-  private static class AddPathBasedCacheDirectiveCommand implements Command {
+  private static class AddCacheDirectiveInfoCommand implements Command {
     @Override
     public String getName() {
       return "-addDirective";
@@ -144,7 +145,7 @@ public class CacheAdmin extends Configur
           "added. You must have write permission on the cache pool "
           + "in order to add new directives.");
       return getShortUsage() + "\n" +
-        "Add a new PathBasedCache directive.\n\n" +
+        "Add a new cache directive.\n\n" +
         listing.toString();
     }
 
@@ -172,14 +173,14 @@ public class CacheAdmin extends Configur
       }
         
       DistributedFileSystem dfs = getDFS(conf);
-      PathBasedCacheDirective directive = new PathBasedCacheDirective.Builder().
+      CacheDirectiveInfo directive = new CacheDirectiveInfo.Builder().
           setPath(new Path(path)).
           setReplication(replication).
           setPool(poolName).
           build();
       try {
-        long id = dfs.addPathBasedCacheDirective(directive);
-        System.out.println("Added PathBasedCache entry " + id);
+        long id = dfs.addCacheDirective(directive);
+        System.out.println("Added cache directive " + id);
       } catch (IOException e) {
         System.err.println(prettifyException(e));
         return 2;
@@ -189,7 +190,7 @@ public class CacheAdmin extends Configur
     }
   }
 
-  private static class RemovePathBasedCacheDirectiveCommand implements Command {
+  private static class RemoveCacheDirectiveInfoCommand implements Command {
     @Override
     public String getName() {
       return "-removeDirective";
@@ -206,7 +207,7 @@ public class CacheAdmin extends Configur
       listing.addRow("<id>", "The id of the cache directive to remove.  " + 
         "You must have write permission on the pool of the " +
         "directive in order to remove it.  To see a list " +
-        "of PathBasedCache directive IDs, use the -listDirectives command.");
+        "of cache directive IDs, use the -listDirectives command.");
       return getShortUsage() + "\n" +
         "Remove a cache directive.\n\n" +
         listing.toString();
@@ -239,8 +240,8 @@ public class CacheAdmin extends Configur
       }
       DistributedFileSystem dfs = getDFS(conf);
       try {
-        dfs.getClient().removePathBasedCacheDirective(id);
-        System.out.println("Removed PathBasedCache directive " + id);
+        dfs.getClient().removeCacheDirective(id);
+        System.out.println("Removed cached directive " + id);
       } catch (IOException e) {
         System.err.println(prettifyException(e));
         return 2;
@@ -249,7 +250,7 @@ public class CacheAdmin extends Configur
     }
   }
 
-  private static class ModifyPathBasedCacheDirectiveCommand implements Command {
+  private static class ModifyCacheDirectiveInfoCommand implements Command {
     @Override
     public String getName() {
       return "-modifyDirective";
@@ -274,14 +275,14 @@ public class CacheAdmin extends Configur
           "added. You must have write permission on the cache pool "
           + "in order to move a directive into it. (optional)");
       return getShortUsage() + "\n" +
-        "Modify a PathBasedCache directive.\n\n" +
+        "Modify a cache directive.\n\n" +
         listing.toString();
     }
 
     @Override
     public int run(Configuration conf, List<String> args) throws IOException {
-      PathBasedCacheDirective.Builder builder =
-        new PathBasedCacheDirective.Builder();
+      CacheDirectiveInfo.Builder builder =
+        new CacheDirectiveInfo.Builder();
       boolean modified = false;
       String idString = StringUtils.popOptionWithArgument("-id", args);
       if (idString == null) {
@@ -317,8 +318,8 @@ public class CacheAdmin extends Configur
       }
       DistributedFileSystem dfs = getDFS(conf);
       try {
-        dfs.modifyPathBasedCacheDirective(builder.build());
-        System.out.println("Modified PathBasedCache entry " + idString);
+        dfs.modifyCacheDirective(builder.build());
+        System.out.println("Modified cache directive " + idString);
       } catch (IOException e) {
         System.err.println(prettifyException(e));
         return 2;
@@ -327,7 +328,7 @@ public class CacheAdmin extends Configur
     }
   }
 
-  private static class RemovePathBasedCacheDirectivesCommand implements Command {
+  private static class RemoveCacheDirectiveInfosCommand implements Command {
     @Override
     public String getName() {
       return "-removeDirectives";
@@ -363,31 +364,31 @@ public class CacheAdmin extends Configur
         return 1;
       }
       DistributedFileSystem dfs = getDFS(conf);
-      RemoteIterator<PathBasedCacheDirective> iter =
-          dfs.listPathBasedCacheDirectives(
-              new PathBasedCacheDirective.Builder().
+      RemoteIterator<CacheDirectiveEntry> iter =
+          dfs.listCacheDirectives(
+              new CacheDirectiveInfo.Builder().
                   setPath(new Path(path)).build());
       int exitCode = 0;
       while (iter.hasNext()) {
-        PathBasedCacheDirective directive = iter.next();
+        CacheDirectiveEntry entry = iter.next();
         try {
-          dfs.removePathBasedCacheDirective(directive.getId());
-          System.out.println("Removed PathBasedCache directive " +
-              directive.getId());
+          dfs.removeCacheDirective(entry.getInfo().getId());
+          System.out.println("Removed cache directive " +
+              entry.getInfo().getId());
         } catch (IOException e) {
           System.err.println(prettifyException(e));
           exitCode = 2;
         }
       }
       if (exitCode == 0) {
-        System.out.println("Removed every PathBasedCache directive with path " +
+        System.out.println("Removed every cache directive with path " +
             path);
       }
       return exitCode;
     }
   }
 
-  private static class ListPathBasedCacheDirectiveCommand implements Command {
+  private static class ListCacheDirectiveInfoCommand implements Command {
     @Override
     public String getName() {
       return "-listDirectives";
@@ -402,21 +403,21 @@ public class CacheAdmin extends Configur
     public String getLongUsage() {
       TableListing listing = getOptionDescriptionListing();
       listing.addRow("<path>", "List only " +
-          "PathBasedCache directives with this path. " +
-          "Note that if there is a PathBasedCache directive for <path> " +
+          "cache directives with this path. " +
+          "Note that if there is a cache directive for <path> " +
           "in a cache pool that we don't have read access for, it " + 
           "will not be listed.");
       listing.addRow("<pool>", "List only path cache directives in that pool.");
       listing.addRow("-stats", "List path-based cache directive statistics.");
       return getShortUsage() + "\n" +
-        "List PathBasedCache directives.\n\n" +
+        "List cache directives.\n\n" +
         listing.toString();
     }
 
     @Override
     public int run(Configuration conf, List<String> args) throws IOException {
-      PathBasedCacheDirective.Builder builder =
-          new PathBasedCacheDirective.Builder();
+      CacheDirectiveInfo.Builder builder =
+          new CacheDirectiveInfo.Builder();
       String pathFilter = StringUtils.popOptionWithArgument("-path", args);
       if (pathFilter != null) {
         builder.setPath(new Path(pathFilter));
@@ -443,20 +444,22 @@ public class CacheAdmin extends Configur
       TableListing tableListing = tableBuilder.build();
 
       DistributedFileSystem dfs = getDFS(conf);
-      RemoteIterator<PathBasedCacheDirective> iter =
-          dfs.listPathBasedCacheDirectives(builder.build());
+      RemoteIterator<CacheDirectiveEntry> iter =
+          dfs.listCacheDirectives(builder.build());
       int numEntries = 0;
       while (iter.hasNext()) {
-        PathBasedCacheDirective directive = iter.next();
+        CacheDirectiveEntry entry = iter.next();
+        CacheDirectiveInfo directive = entry.getInfo();
+        CacheDirectiveStats stats = entry.getStats();
         List<String> row = new LinkedList<String>();
         row.add("" + directive.getId());
         row.add(directive.getPool());
         row.add("" + directive.getReplication());
         row.add(directive.getPath().toUri().getPath());
         if (printStats) {
-          row.add("" + directive.getBytesNeeded());
-          row.add("" + directive.getBytesCached());
-          row.add("" + directive.getFilesAffected());
+          row.add("" + stats.getBytesNeeded());
+          row.add("" + stats.getBytesCached());
+          row.add("" + stats.getFilesAffected());
         }
         tableListing.addRow(row.toArray(new String[0]));
         numEntries++;
@@ -838,11 +841,11 @@ public class CacheAdmin extends Configur
   }
 
   private static Command[] COMMANDS = {
-    new AddPathBasedCacheDirectiveCommand(),
-    new ModifyPathBasedCacheDirectiveCommand(),
-    new ListPathBasedCacheDirectiveCommand(),
-    new RemovePathBasedCacheDirectiveCommand(),
-    new RemovePathBasedCacheDirectivesCommand(),
+    new AddCacheDirectiveInfoCommand(),
+    new ModifyCacheDirectiveInfoCommand(),
+    new ListCacheDirectiveInfoCommand(),
+    new RemoveCacheDirectiveInfoCommand(),
+    new RemoveCacheDirectiveInfosCommand(),
     new AddCachePoolCommand(),
     new ModifyCachePoolCommand(),
     new RemoveCachePoolCommand(),

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto?rev=1544252&r1=1544251&r2=1544252&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto Thu Nov 21 17:12:58 2013
@@ -363,49 +363,53 @@ message IsFileClosedResponseProto {
   required bool result = 1;
 }
 
-message PathBasedCacheDirectiveInfoProto {
+message CacheDirectiveInfoProto {
   optional int64 id = 1;
   optional string path = 2;
   optional uint32 replication = 3;
   optional string pool = 4;
-  optional int64 bytesNeeded = 5;
-  optional int64 bytesCached = 6;
-  optional int64 filesAffected = 7;
 }
 
-message AddPathBasedCacheDirectiveRequestProto {
-  required PathBasedCacheDirectiveInfoProto info = 1;
+message CacheDirectiveStatsProto {
+  required int64 bytesNeeded = 1;
+  required int64 bytesCached = 2;
+  required int64 filesAffected = 3;
 }
 
-message AddPathBasedCacheDirectiveResponseProto {
+message AddCacheDirectiveRequestProto {
+  required CacheDirectiveInfoProto info = 1;
+}
+
+message AddCacheDirectiveResponseProto {
   required int64 id = 1;
 }
 
-message ModifyPathBasedCacheDirectiveRequestProto {
-  required PathBasedCacheDirectiveInfoProto info = 1;
+message ModifyCacheDirectiveRequestProto {
+  required CacheDirectiveInfoProto info = 1;
 }
 
-message ModifyPathBasedCacheDirectiveResponseProto {
+message ModifyCacheDirectiveResponseProto {
 }
 
-message RemovePathBasedCacheDirectiveRequestProto {
+message RemoveCacheDirectiveRequestProto {
   required int64 id = 1;
 }
 
-message RemovePathBasedCacheDirectiveResponseProto {
+message RemoveCacheDirectiveResponseProto {
 }
 
-message ListPathBasedCacheDirectivesRequestProto {
+message ListCacheDirectivesRequestProto {
   required int64 prevId = 1;
-  required PathBasedCacheDirectiveInfoProto filter = 2;
+  required CacheDirectiveInfoProto filter = 2;
 }
 
-message ListPathBasedCacheDirectivesElementProto {
-  required PathBasedCacheDirectiveInfoProto info = 1;
+message CacheDirectiveEntryProto {
+  required CacheDirectiveInfoProto info = 1;
+  required CacheDirectiveStatsProto stats = 2;
 }
 
-message ListPathBasedCacheDirectivesResponseProto {
-  repeated ListPathBasedCacheDirectivesElementProto elements = 1;
+message ListCacheDirectivesResponseProto {
+  repeated CacheDirectiveEntryProto elements = 1;
   required bool hasMore = 2;
 }
 
@@ -632,14 +636,14 @@ service ClientNamenodeProtocol {
       returns(ListCorruptFileBlocksResponseProto);
   rpc metaSave(MetaSaveRequestProto) returns(MetaSaveResponseProto);
   rpc getFileInfo(GetFileInfoRequestProto) returns(GetFileInfoResponseProto);
-  rpc addPathBasedCacheDirective(AddPathBasedCacheDirectiveRequestProto)
-      returns (AddPathBasedCacheDirectiveResponseProto);
-  rpc modifyPathBasedCacheDirective(ModifyPathBasedCacheDirectiveRequestProto)
-      returns (ModifyPathBasedCacheDirectiveResponseProto);
-  rpc removePathBasedCacheDirective(RemovePathBasedCacheDirectiveRequestProto)
-      returns (RemovePathBasedCacheDirectiveResponseProto);
-  rpc listPathBasedCacheDirectives(ListPathBasedCacheDirectivesRequestProto)
-      returns (ListPathBasedCacheDirectivesResponseProto);
+  rpc addCacheDirective(AddCacheDirectiveRequestProto)
+      returns (AddCacheDirectiveResponseProto);
+  rpc modifyCacheDirective(ModifyCacheDirectiveRequestProto)
+      returns (ModifyCacheDirectiveResponseProto);
+  rpc removeCacheDirective(RemoveCacheDirectiveRequestProto)
+      returns (RemoveCacheDirectiveResponseProto);
+  rpc listCacheDirectives(ListCacheDirectivesRequestProto)
+      returns (ListCacheDirectivesResponseProto);
   rpc addCachePool(AddCachePoolRequestProto)
       returns(AddCachePoolResponseProto);
   rpc modifyCachePool(ModifyCachePoolRequestProto)

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/CentralizedCacheManagement.apt.vm
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/CentralizedCacheManagement.apt.vm?rev=1544252&r1=1544251&r2=1544252&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/CentralizedCacheManagement.apt.vm (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/CentralizedCacheManagement.apt.vm Thu Nov 21 17:12:58 2013
@@ -118,7 +118,7 @@ Centralized Cache Management in HDFS
 
   Usage: <<<hdfs cacheadmin -addDirective -path <path> -replication <replication> -pool <pool-name> >>>
 
-  Add a new PathBasedCache directive.
+  Add a new cache directive.
 
 *--+--+
 \<path\> | A path to cache. The path can be a directory or a file.
@@ -135,7 +135,7 @@ Centralized Cache Management in HDFS
   Remove a cache directive.
 
 *--+--+
-\<id\> | The id of the cache directive to remove.  You must have write permission on the pool of the directive in order to remove it.  To see a list of PathBasedCache directive IDs, use the -listDirectives command.
+\<id\> | The id of the cache directive to remove.  You must have write permission on the pool of the directive in order to remove it.  To see a list of cache directive IDs, use the -listDirectives command.
 *--+--+
 
 *** {removeDirectives}
@@ -152,10 +152,10 @@ Centralized Cache Management in HDFS
 
   Usage: <<<hdfs cacheadmin -listDirectives [-path <path>] [-pool <pool>] >>>
 
-  List PathBasedCache directives.
+  List cache directives.
 
 *--+--+
-\<path\> | List only PathBasedCache directives with this path. Note that if there is a PathBasedCache directive for <path> in a cache pool that we don't have read access for, it will not be listed.
+\<path\> | List only cache directives with this path. Note that if there is a cache directive for <path> in a cache pool that we don't have read access for, it will not be listed.
 *--+--+
 \<pool\> | List only path cache directives in that pool.
 *--+--+

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=1544252&r1=1544251&r2=1544252&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java Thu Nov 21 17:12:58 2013
@@ -998,20 +998,20 @@ public class DFSTestUtil {
     // OP_MODIFY_CACHE_POOL
     filesystem.modifyCachePool(new CachePoolInfo("pool1").setWeight(99));
     // OP_ADD_PATH_BASED_CACHE_DIRECTIVE
-    long id = filesystem.addPathBasedCacheDirective(
-        new PathBasedCacheDirective.Builder().
+    long id = filesystem.addCacheDirective(
+        new CacheDirectiveInfo.Builder().
             setPath(new Path("/path")).
             setReplication((short)1).
             setPool("pool1").
             build());
     // OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE
-    filesystem.modifyPathBasedCacheDirective(
-        new PathBasedCacheDirective.Builder().
+    filesystem.modifyCacheDirective(
+        new CacheDirectiveInfo.Builder().
             setId(id).
             setReplication((short)2).
             build());
     // OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE
-    filesystem.removePathBasedCacheDirective(id);
+    filesystem.removeCacheDirective(id);
     // OP_REMOVE_CACHE_POOL
     filesystem.removeCachePool("pool1");
   }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java?rev=1544252&r1=1544251&r2=1544252&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java Thu Nov 21 17:12:58 2013
@@ -42,7 +42,7 @@ import org.apache.hadoop.hdfs.MiniDFSClu
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
@@ -241,20 +241,20 @@ public class OfflineEditsViewerHelper {
         .setMode(new FsPermission((short)0700))
         .setWeight(1989));
     // OP_ADD_PATH_BASED_CACHE_DIRECTIVE 33
-    long id = dfs.addPathBasedCacheDirective(
-        new PathBasedCacheDirective.Builder().
+    long id = dfs.addCacheDirective(
+        new CacheDirectiveInfo.Builder().
             setPath(new Path("/bar")).
             setReplication((short)1).
             setPool(pool).
             build());
     // OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE 38
-    dfs.modifyPathBasedCacheDirective(
-        new PathBasedCacheDirective.Builder().
+    dfs.modifyCacheDirective(
+        new CacheDirectiveInfo.Builder().
             setId(id).
             setPath(new Path("/bar2")).
             build());
     // OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE 34
-    dfs.removePathBasedCacheDirective(id);
+    dfs.removeCacheDirective(id);
     // OP_REMOVE_CACHE_POOL 37
     dfs.removeCachePool(pool);
     // sync to disk, otherwise we parse partial edits

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java?rev=1544252&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java Thu Nov 21 17:12:58 2013
@@ -0,0 +1,978 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.InvalidRequestException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
+import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
+import org.apache.hadoop.io.nativeio.NativeIO.POSIX.NoMlockCacheManipulator;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.GSet;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Supplier;
+
+public class TestCacheDirectives {
+  static final Log LOG = LogFactory.getLog(TestCacheDirectives.class);
+
+  private static final UserGroupInformation unprivilegedUser =
+      UserGroupInformation.createRemoteUser("unprivilegedUser");
+
+  static private Configuration conf;
+  static private MiniDFSCluster cluster;
+  static private DistributedFileSystem dfs;
+  static private NamenodeProtocols proto;
+  static private CacheManipulator prevCacheManipulator;
+
+  static {
+    EditLogFileOutputStream.setShouldSkipFsyncForTesting(false);
+  }
+
+  @Before
+  public void setup() throws Exception {
+    conf = new HdfsConfiguration();
+    // set low limits here for testing purposes
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES, 2);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES, 2);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+    dfs = cluster.getFileSystem();
+    proto = cluster.getNameNodeRpc();
+    prevCacheManipulator = NativeIO.POSIX.getCacheManipulator();
+    NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator());
+    LogManager.getLogger(CacheReplicationMonitor.class).setLevel(Level.TRACE);
+  }
+
+  @After
+  public void teardown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+    // Restore the original CacheManipulator
+    NativeIO.POSIX.setCacheManipulator(prevCacheManipulator);
+  }
+
+  @Test(timeout=60000)
+  public void testBasicPoolOperations() throws Exception {
+    final String poolName = "pool1";
+    CachePoolInfo info = new CachePoolInfo(poolName).
+        setOwnerName("bob").setGroupName("bobgroup").
+        setMode(new FsPermission((short)0755)).setWeight(150);
+
+    // Add a pool
+    dfs.addCachePool(info);
+
+    // Do some bad addCachePools
+    try {
+      dfs.addCachePool(info);
+      fail("added the pool with the same name twice");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("pool1 already exists", ioe);
+    }
+    try {
+      dfs.addCachePool(new CachePoolInfo(""));
+      fail("added empty pool");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("invalid empty cache pool name",
+          ioe);
+    }
+    try {
+      dfs.addCachePool(null);
+      fail("added null pool");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("CachePoolInfo is null", ioe);
+    }
+    try {
+      proto.addCachePool(new CachePoolInfo(""));
+      fail("added empty pool");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("invalid empty cache pool name",
+          ioe);
+    }
+    try {
+      proto.addCachePool(null);
+      fail("added null pool");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("CachePoolInfo is null", ioe);
+    }
+
+    // Modify the pool
+    info.setOwnerName("jane").setGroupName("janegroup")
+        .setMode(new FsPermission((short)0700)).setWeight(314);
+    dfs.modifyCachePool(info);
+
+    // Do some invalid modify pools
+    try {
+      dfs.modifyCachePool(new CachePoolInfo("fool"));
+      fail("modified non-existent cache pool");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("fool does not exist", ioe);
+    }
+    try {
+      dfs.modifyCachePool(new CachePoolInfo(""));
+      fail("modified empty pool");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("invalid empty cache pool name",
+          ioe);
+    }
+    try {
+      dfs.modifyCachePool(null);
+      fail("modified null pool");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("CachePoolInfo is null", ioe);
+    }
+    try {
+      proto.modifyCachePool(new CachePoolInfo(""));
+      fail("modified empty pool");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("invalid empty cache pool name",
+          ioe);
+    }
+    try {
+      proto.modifyCachePool(null);
+      fail("modified null pool");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("CachePoolInfo is null", ioe);
+    }
+
+    // Remove the pool
+    dfs.removeCachePool(poolName);
+    // Do some bad removePools
+    try {
+      dfs.removeCachePool("pool99");
+      fail("expected to get an exception when " +
+          "removing a non-existent pool.");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("Cannot remove " +
+          "non-existent cache pool", ioe);
+    }
+    try {
+      dfs.removeCachePool(poolName);
+      fail("expected to get an exception when " +
+          "removing a non-existent pool.");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("Cannot remove " +
+          "non-existent cache pool", ioe);
+    }
+    try {
+      dfs.removeCachePool("");
+      fail("removed empty pool");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("invalid empty cache pool name",
+          ioe);
+    }
+    try {
+      dfs.removeCachePool(null);
+      fail("removed null pool");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("invalid empty cache pool name",
+          ioe);
+    }
+    try {
+      proto.removeCachePool("");
+      fail("removed empty pool");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("invalid empty cache pool name",
+          ioe);
+    }
+    try {
+      proto.removeCachePool(null);
+      fail("removed null pool");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("invalid empty cache pool name",
+          ioe);
+    }
+
+    info = new CachePoolInfo("pool2");
+    dfs.addCachePool(info);
+  }
+
+  @Test(timeout=60000)
+  public void testCreateAndModifyPools() throws Exception {
+    String poolName = "pool1";
+    String ownerName = "abc";
+    String groupName = "123";
+    FsPermission mode = new FsPermission((short)0755);
+    int weight = 150;
+    dfs.addCachePool(new CachePoolInfo(poolName).
+        setOwnerName(ownerName).setGroupName(groupName).
+        setMode(mode).setWeight(weight));
+    
+    RemoteIterator<CachePoolInfo> iter = dfs.listCachePools();
+    CachePoolInfo info = iter.next();
+    assertEquals(poolName, info.getPoolName());
+    assertEquals(ownerName, info.getOwnerName());
+    assertEquals(groupName, info.getGroupName());
+
+    ownerName = "def";
+    groupName = "456";
+    mode = new FsPermission((short)0700);
+    weight = 151;
+    dfs.modifyCachePool(new CachePoolInfo(poolName).
+        setOwnerName(ownerName).setGroupName(groupName).
+        setMode(mode).setWeight(weight));
+
+    iter = dfs.listCachePools();
+    info = iter.next();
+    assertEquals(poolName, info.getPoolName());
+    assertEquals(ownerName, info.getOwnerName());
+    assertEquals(groupName, info.getGroupName());
+    assertEquals(mode, info.getMode());
+    assertEquals(Integer.valueOf(weight), info.getWeight());
+
+    dfs.removeCachePool(poolName);
+    iter = dfs.listCachePools();
+    assertFalse("expected no cache pools after deleting pool", iter.hasNext());
+
+    proto.listCachePools(null);
+
+    try {
+      proto.removeCachePool("pool99");
+      fail("expected to get an exception when " +
+          "removing a non-existent pool.");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("Cannot remove non-existent",
+          ioe);
+    }
+    try {
+      proto.removeCachePool(poolName);
+      fail("expected to get an exception when " +
+          "removing a non-existent pool.");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("Cannot remove non-existent",
+          ioe);
+    }
+
+    iter = dfs.listCachePools();
+    assertFalse("expected no cache pools after deleting pool", iter.hasNext());
+  }
+
+  private static void validateListAll(
+      RemoteIterator<CacheDirectiveEntry> iter,
+      Long... ids) throws Exception {
+    for (Long id: ids) {
+      assertTrue("Unexpectedly few elements", iter.hasNext());
+      assertEquals("Unexpected directive ID", id,
+          iter.next().getInfo().getId());
+    }
+    assertFalse("Unexpectedly many list elements", iter.hasNext());
+  }
+
+  private static long addAsUnprivileged(
+      final CacheDirectiveInfo directive) throws Exception {
+    return unprivilegedUser
+        .doAs(new PrivilegedExceptionAction<Long>() {
+          @Override
+          public Long run() throws IOException {
+            DistributedFileSystem myDfs =
+                (DistributedFileSystem) FileSystem.get(conf);
+            return myDfs.addCacheDirective(directive);
+          }
+        });
+  }
+
+  /**
+   * Adds, lists, modifies and removes cache directives, and checks that
+   * invalid requests (unknown pool, inaccessible pool, malformed path, empty
+   * pool name, negative or unknown directive ID) fail with the expected
+   * errors.
+   */
+  @Test(timeout=60000)
+  public void testAddRemoveDirectives() throws Exception {
+    // pool1 through pool3 get mode 0777; pool4 gets mode 0, i.e. no
+    // permissions for anyone.
+    proto.addCachePool(new CachePoolInfo("pool1").
+        setMode(new FsPermission((short)0777)));
+    proto.addCachePool(new CachePoolInfo("pool2").
+        setMode(new FsPermission((short)0777)));
+    proto.addCachePool(new CachePoolInfo("pool3").
+        setMode(new FsPermission((short)0777)));
+    proto.addCachePool(new CachePoolInfo("pool4").
+        setMode(new FsPermission((short)0)));
+
+    CacheDirectiveInfo alpha = new CacheDirectiveInfo.Builder().
+        setPath(new Path("/alpha")).
+        setPool("pool1").
+        build();
+    CacheDirectiveInfo beta = new CacheDirectiveInfo.Builder().
+        setPath(new Path("/beta")).
+        setPool("pool2").
+        build();
+    CacheDirectiveInfo delta = new CacheDirectiveInfo.Builder().
+        setPath(new Path("/delta")).
+        setPool("pool1").
+        build();
+
+    // Adding the same directive twice must hand out two distinct IDs.
+    long alphaId = addAsUnprivileged(alpha);
+    long alphaId2 = addAsUnprivileged(alpha);
+    assertFalse("Expected to get unique directives when re-adding an "
+        + "existing CacheDirectiveInfo",
+        alphaId == alphaId2);
+    long betaId = addAsUnprivileged(beta);
+
+    try {
+      addAsUnprivileged(new CacheDirectiveInfo.Builder().
+          setPath(new Path("/unicorn")).
+          setPool("no_such_pool").
+          build());
+      fail("expected an error when adding to a non-existent pool.");
+    } catch (InvalidRequestException ioe) {
+      GenericTestUtils.assertExceptionContains("Unknown pool", ioe);
+    }
+
+    try {
+      addAsUnprivileged(new CacheDirectiveInfo.Builder().
+          setPath(new Path("/blackhole")).
+          setPool("pool4").
+          build());
+      fail("expected an error when adding to a pool with " +
+          "mode 0 (no permissions for anyone).");
+    } catch (AccessControlException e) {
+      GenericTestUtils.
+          assertExceptionContains("Permission denied while accessing pool", e);
+    }
+
+    try {
+      addAsUnprivileged(new CacheDirectiveInfo.Builder().
+          setPath(new Path("/illegal:path/")).
+          setPool("pool1").
+          build());
+      fail("expected an error when adding a malformed path " +
+          "to the cache directives.");
+    } catch (IllegalArgumentException e) {
+      GenericTestUtils.assertExceptionContains("is not a valid DFS filename", e);
+    }
+
+    try {
+      addAsUnprivileged(new CacheDirectiveInfo.Builder().
+          setPath(new Path("/emptypoolname")).
+          setReplication((short)1).
+          setPool("").
+          build());
+      fail("expected an error when adding a cache " +
+          "directive with an empty pool name.");
+    } catch (InvalidRequestException e) {
+      GenericTestUtils.assertExceptionContains("Invalid empty pool name", e);
+    }
+
+    long deltaId = addAsUnprivileged(delta);
+
+    // We expect the following to succeed, because DistributedFileSystem
+    // qualifies the path.
+    long relativeId = addAsUnprivileged(
+        new CacheDirectiveInfo.Builder().
+            setPath(new Path("relative")).
+            setPool("pool1").
+            build());
+
+    // List everything, then list per pool.
+    RemoteIterator<CacheDirectiveEntry> iter;
+    iter = dfs.listCacheDirectives(null);
+    validateListAll(iter, alphaId, alphaId2, betaId, deltaId, relativeId);
+    iter = dfs.listCacheDirectives(
+        new CacheDirectiveInfo.Builder().setPool("pool3").build());
+    assertFalse(iter.hasNext());
+    iter = dfs.listCacheDirectives(
+        new CacheDirectiveInfo.Builder().setPool("pool1").build());
+    validateListAll(iter, alphaId, alphaId2, deltaId, relativeId);
+    iter = dfs.listCacheDirectives(
+        new CacheDirectiveInfo.Builder().setPool("pool2").build());
+    validateListAll(iter, betaId);
+
+    dfs.removeCacheDirective(betaId);
+    iter = dfs.listCacheDirectives(
+        new CacheDirectiveInfo.Builder().setPool("pool2").build());
+    assertFalse(iter.hasNext());
+
+    // Removing the same ID a second time must fail.
+    try {
+      dfs.removeCacheDirective(betaId);
+      fail("expected an error when removing a non-existent ID");
+    } catch (InvalidRequestException e) {
+      GenericTestUtils.assertExceptionContains("No directive with ID", e);
+    }
+
+    try {
+      proto.removeCacheDirective(-42L);
+      fail("expected an error when removing a negative ID");
+    } catch (InvalidRequestException e) {
+      GenericTestUtils.assertExceptionContains(
+          "Invalid negative ID", e);
+    }
+    try {
+      proto.removeCacheDirective(43L);
+      fail("expected an error when removing a non-existent ID");
+    } catch (InvalidRequestException e) {
+      GenericTestUtils.assertExceptionContains("No directive with ID", e);
+    }
+
+    dfs.removeCacheDirective(alphaId);
+    dfs.removeCacheDirective(alphaId2);
+    dfs.removeCacheDirective(deltaId);
+
+    // Only the "relative" directive is left; modify its replication and
+    // check that the listing reflects the change.
+    dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder().
+        setId(relativeId).
+        setReplication((short)555).
+        build());
+    iter = dfs.listCacheDirectives(null);
+    assertTrue(iter.hasNext());
+    CacheDirectiveInfo modified = iter.next().getInfo();
+    assertEquals(relativeId, modified.getId().longValue());
+    assertEquals((short)555, modified.getReplication().shortValue());
+    dfs.removeCacheDirective(relativeId);
+    iter = dfs.listCacheDirectives(null);
+    assertFalse(iter.hasNext());
+
+    // Verify that cache directives with path "." work correctly
+    CacheDirectiveInfo directive =
+        new CacheDirectiveInfo.Builder().setPath(new Path("."))
+            .setPool("pool1").build();
+    long id = dfs.addCacheDirective(directive);
+    dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder(
+        directive).setId(id).setReplication((short)2).build());
+    dfs.removeCacheDirective(id);
+  }
+
+  @Test(timeout=60000)
+  public void testCacheManagerRestart() throws Exception {
+    cluster.shutdown();
+    cluster = null;
+    HdfsConfiguration conf = createCachingConf();
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
+
+    cluster.waitActive();
+    DistributedFileSystem dfs = cluster.getFileSystem();
+
+    // Create and validate a pool
+    final String pool = "poolparty";
+    String groupName = "partygroup";
+    FsPermission mode = new FsPermission((short)0777);
+    int weight = 747;
+    dfs.addCachePool(new CachePoolInfo(pool)
+        .setGroupName(groupName)
+        .setMode(mode)
+        .setWeight(weight));
+    RemoteIterator<CachePoolInfo> pit = dfs.listCachePools();
+    assertTrue("No cache pools found", pit.hasNext());
+    CachePoolInfo info = pit.next();
+    assertEquals(pool, info.getPoolName());
+    assertEquals(groupName, info.getGroupName());
+    assertEquals(mode, info.getMode());
+    assertEquals(weight, (int)info.getWeight());
+    assertFalse("Unexpected # of cache pools found", pit.hasNext());
+  
+    // Create some cache entries
+    int numEntries = 10;
+    String entryPrefix = "/party-";
+    long prevId = -1;
+    for (int i=0; i<numEntries; i++) {
+      prevId = dfs.addCacheDirective(
+          new CacheDirectiveInfo.Builder().
+            setPath(new Path(entryPrefix + i)).setPool(pool).build());
+    }
+    RemoteIterator<CacheDirectiveEntry> dit
+        = dfs.listCacheDirectives(null);
+    for (int i=0; i<numEntries; i++) {
+      assertTrue("Unexpected # of cache entries: " + i, dit.hasNext());
+      CacheDirectiveInfo cd = dit.next().getInfo();
+      assertEquals(i+1, cd.getId().longValue());
+      assertEquals(entryPrefix + i, cd.getPath().toUri().getPath());
+      assertEquals(pool, cd.getPool());
+    }
+    assertFalse("Unexpected # of cache directives found", dit.hasNext());
+  
+    // Restart namenode
+    cluster.restartNameNode();
+  
+    // Check that state came back up
+    pit = dfs.listCachePools();
+    assertTrue("No cache pools found", pit.hasNext());
+    info = pit.next();
+    assertEquals(pool, info.getPoolName());
+    assertEquals(pool, info.getPoolName());
+    assertEquals(groupName, info.getGroupName());
+    assertEquals(mode, info.getMode());
+    assertEquals(weight, (int)info.getWeight());
+    assertFalse("Unexpected # of cache pools found", pit.hasNext());
+  
+    dit = dfs.listCacheDirectives(null);
+    for (int i=0; i<numEntries; i++) {
+      assertTrue("Unexpected # of cache entries: " + i, dit.hasNext());
+      CacheDirectiveInfo cd = dit.next().getInfo();
+      assertEquals(i+1, cd.getId().longValue());
+      assertEquals(entryPrefix + i, cd.getPath().toUri().getPath());
+      assertEquals(pool, cd.getPool());
+    }
+    assertFalse("Unexpected # of cache directives found", dit.hasNext());
+
+    long nextId = dfs.addCacheDirective(
+          new CacheDirectiveInfo.Builder().
+            setPath(new Path("/foobar")).setPool(pool).build());
+    assertEquals(prevId + 1, nextId);
+  }
+
+  /**
+   * Wait for the NameNode to have an expected number of cached blocks
+   * and replicas.
+   * @param nn NameNode
+   * @param expectedCachedBlocks
+   * @param expectedCachedReplicas
+   * @throws Exception
+   */
+  private static void waitForCachedBlocks(NameNode nn,
+      final int expectedCachedBlocks, final int expectedCachedReplicas,
+      final String logString) throws Exception {
+    final FSNamesystem namesystem = nn.getNamesystem();
+    final CacheManager cacheManager = namesystem.getCacheManager();
+    LOG.info("Waiting for " + expectedCachedBlocks + " blocks with " +
+             expectedCachedReplicas + " replicas.");
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        int numCachedBlocks = 0, numCachedReplicas = 0;
+        namesystem.readLock();
+        try {
+          GSet<CachedBlock, CachedBlock> cachedBlocks =
+              cacheManager.getCachedBlocks();
+          if (cachedBlocks != null) {
+            for (Iterator<CachedBlock> iter = cachedBlocks.iterator();
+                iter.hasNext(); ) {
+              CachedBlock cachedBlock = iter.next();
+              numCachedBlocks++;
+              numCachedReplicas += cachedBlock.getDatanodes(Type.CACHED).size();
+            }
+          }
+        } finally {
+          namesystem.readUnlock();
+        }
+        if ((numCachedBlocks == expectedCachedBlocks) && 
+            (numCachedReplicas == expectedCachedReplicas)) {
+          return true;
+        } else {
+          LOG.info(logString + " cached blocks: have " + numCachedBlocks +
+              " / " + expectedCachedBlocks + ".  " +
+              "cached replicas: have " + numCachedReplicas +
+              " / " + expectedCachedReplicas);
+          return false;
+        }
+      }
+    }, 500, 60000);
+  }
+
+  /**
+   * Asks the file system for the block locations of every given path and
+   * asserts how many blocks have at least one cached host, and how many
+   * cached hosts there are in total.
+   *
+   * @param dfs file system to query
+   * @param paths files to inspect
+   * @param expectedBlocks expected number of blocks with a cached host
+   * @param expectedReplicas expected total number of cached hosts
+   * @throws Exception if a file system call fails
+   */
+  private static void checkNumCachedReplicas(final DistributedFileSystem dfs,
+      final List<Path> paths, final int expectedBlocks,
+      final int expectedReplicas)
+      throws Exception {
+    int blocksSeen = 0;
+    int replicasSeen = 0;
+    for (Path path : paths) {
+      final FileStatus status = dfs.getFileStatus(path);
+      final long fileLen = status.getLen();
+      final long blockSize = status.getBlockSize();
+      // Ceiling division: a partial trailing block still counts as a block.
+      final long wantedLocations = (fileLen + blockSize - 1) / blockSize;
+      final BlockLocation[] locations =
+          dfs.getFileBlockLocations(path, 0, fileLen);
+      assertEquals("Unexpected number of block locations for path " + path,
+          wantedLocations, locations.length);
+      for (int i = 0; i < locations.length; i++) {
+        final int cachedHostCount = locations[i].getCachedHosts().length;
+        if (cachedHostCount > 0) {
+          blocksSeen++;
+        }
+        replicasSeen += cachedHostCount;
+      }
+    }
+    LOG.info("Found " + blocksSeen + " of " + expectedBlocks + " blocks");
+    LOG.info("Found " + replicasSeen + " of " + expectedReplicas
+        + " replicas");
+    assertEquals("Unexpected number of cached blocks", expectedBlocks,
+        blocksSeen);
+    assertEquals("Unexpected number of cached replicas", expectedReplicas,
+        replicasSeen);
+  }
+
+  private static final long BLOCK_SIZE = 512;  // bytes; DFS block size
+  private static final int NUM_DATANODES = 4;  // DataNodes per caching test
+
+  // Most Linux installs will allow non-root users to lock 64KB, so that
+  // budget is divided evenly among the DataNodes; createCachingConf() uses
+  // the result as the DataNode max locked memory setting.
+  private static final long CACHE_CAPACITY = 64 * 1024 / NUM_DATANODES;
+
+  private static HdfsConfiguration createCachingConf() {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, CACHE_CAPACITY);
+    conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    conf.setBoolean(DFS_NAMENODE_CACHING_ENABLED_KEY, true);
+    conf.setLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 1000);
+    conf.setLong(DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 1000);
+    return conf;
+  }
+
+  /**
+   * Caches files one at a time and then uncaches them one at a time,
+   * waiting after each step until the NameNode's cached block and replica
+   * counts reach the expected totals.
+   */
+  @Test(timeout=120000)
+  public void testWaitForCachedReplicas() throws Exception {
+    HdfsConfiguration conf = createCachingConf();
+    FileSystemTestHelper helper = new FileSystemTestHelper();
+    MiniDFSCluster cluster =
+      new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
+
+    try {
+      cluster.waitActive();
+      DistributedFileSystem dfs = cluster.getFileSystem();
+      NameNode namenode = cluster.getNameNode();
+      NamenodeProtocols nnRpc = namenode.getRpcServer();
+      Path rootDir = helper.getDefaultWorkingDirectory(dfs);
+      // Create the pool; the same name is used by the directives below.
+      final String pool = "friendlyPool";
+      nnRpc.addCachePool(new CachePoolInfo(pool));
+      // Create some test files
+      final int numFiles = 2;
+      final int numBlocksPerFile = 2;
+      final List<String> paths = new ArrayList<String>(numFiles);
+      for (int i=0; i<numFiles; i++) {
+        Path p = new Path(rootDir, "testCachePaths-" + i);
+        FileSystemTestHelper.createFile(dfs, p, numBlocksPerFile,
+            (int)BLOCK_SIZE);
+        paths.add(p.toUri().getPath());
+      }
+      // Check the initial statistics at the namenode
+      waitForCachedBlocks(namenode, 0, 0, "testWaitForCachedReplicas:0");
+      // Cache and check each path in sequence. No replication is set on the
+      // directives, and the test expects one cached replica per block.
+      int expected = 0;
+      for (int i=0; i<numFiles; i++) {
+        CacheDirectiveInfo directive =
+            new CacheDirectiveInfo.Builder().
+              setPath(new Path(paths.get(i))).
+              setPool(pool).
+              build();
+        nnRpc.addCacheDirective(directive);
+        expected += numBlocksPerFile;
+        waitForCachedBlocks(namenode, expected, expected,
+            "testWaitForCachedReplicas:1");
+      }
+      // Uncache and check each path in sequence
+      RemoteIterator<CacheDirectiveEntry> entries =
+          nnRpc.listCacheDirectives(0, null);
+      for (int i=0; i<numFiles; i++) {
+        // Fail with a clear message if the listing is shorter than expected.
+        assertTrue("Unexpectedly few cache directives listed",
+            entries.hasNext());
+        CacheDirectiveEntry entry = entries.next();
+        nnRpc.removeCacheDirective(entry.getInfo().getId());
+        expected -= numBlocksPerFile;
+        waitForCachedBlocks(namenode, expected, expected,
+            "testWaitForCachedReplicas:2");
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Adds cache directives on a cluster whose NameNode has caching disabled
+   * and checks that the cached block and replica counts stay at zero.
+   */
+  @Test(timeout=120000)
+  public void testAddingCacheDirectiveInfosWhenCachingIsDisabled()
+      throws Exception {
+    final HdfsConfiguration disabledConf = createCachingConf();
+    disabledConf.setBoolean(DFS_NAMENODE_CACHING_ENABLED_KEY, false);
+    final MiniDFSCluster miniCluster =
+        new MiniDFSCluster.Builder(disabledConf)
+            .numDataNodes(NUM_DATANODES)
+            .build();
+
+    try {
+      miniCluster.waitActive();
+      final DistributedFileSystem fs = miniCluster.getFileSystem();
+      final NameNode nn = miniCluster.getNameNode();
+      // Create the pool
+      final String poolName = "pool1";
+      nn.getRpcServer().addCachePool(new CachePoolInfo(poolName));
+      // Create some test files
+      final int fileCount = 2;
+      final int blocksPerFile = 2;
+      final List<String> filePaths = new ArrayList<String>(fileCount);
+      for (int n = 0; n < fileCount; n++) {
+        final Path file = new Path("/testCachePaths-" + n);
+        FileSystemTestHelper.createFile(fs, file, blocksPerFile,
+            (int)BLOCK_SIZE);
+        filePaths.add(file.toUri().getPath());
+      }
+      // Check the initial statistics at the namenode
+      waitForCachedBlocks(nn, 0, 0,
+          "testAddingCacheDirectiveInfosWhenCachingIsDisabled:0");
+      // The expectation never moves off zero while directives are added.
+      final int expectedBlocks = 0;
+      for (String filePath : filePaths) {
+        final CacheDirectiveInfo toAdd = new CacheDirectiveInfo.Builder().
+            setPath(new Path(filePath)).
+            setPool(poolName).
+            build();
+        fs.addCacheDirective(toAdd);
+        waitForCachedBlocks(nn, expectedBlocks, 0,
+          "testAddingCacheDirectiveInfosWhenCachingIsDisabled:1");
+      }
+      // Wait a while, then check the counts again.
+      Thread.sleep(20000);
+      waitForCachedBlocks(nn, expectedBlocks, 0,
+          "testAddingCacheDirectiveInfosWhenCachingIsDisabled:2");
+    } finally {
+      miniCluster.shutdown();
+    }
+  }
+
+  /**
+   * Caches a whole directory, then additionally caches one file inside it
+   * with a higher replication, and checks both the NameNode's cached counts
+   * and the per-directive statistics at each step.
+   */
+  @Test(timeout=120000)
+  public void testWaitForCachedReplicasInDirectory() throws Exception {
+    final HdfsConfiguration cachingConf = createCachingConf();
+    final MiniDFSCluster miniCluster =
+        new MiniDFSCluster.Builder(cachingConf)
+            .numDataNodes(NUM_DATANODES)
+            .build();
+
+    try {
+      miniCluster.waitActive();
+      final DistributedFileSystem fs = miniCluster.getFileSystem();
+      final NameNode nn = miniCluster.getNameNode();
+      // Create the pool
+      final String poolName = "friendlyPool";
+      fs.addCachePool(new CachePoolInfo(poolName));
+      // Create some test files: two under /foo and two under /foo2
+      final List<Path> files = new LinkedList<Path>();
+      files.add(new Path("/foo/bar"));
+      files.add(new Path("/foo/baz"));
+      files.add(new Path("/foo2/bar2"));
+      files.add(new Path("/foo2/baz2"));
+      fs.mkdir(new Path("/foo"), FsPermission.getDirDefault());
+      fs.mkdir(new Path("/foo2"), FsPermission.getDirDefault());
+      final int numBlocksPerFile = 2;
+      for (Path file : files) {
+        FileSystemTestHelper.createFile(fs, file, numBlocksPerFile,
+            (int)BLOCK_SIZE, (short)3, false);
+      }
+      waitForCachedBlocks(nn, 0, 0,
+          "testWaitForCachedReplicasInDirectory:0");
+
+      // Expected statistics: 2 files * blocks * block size * replication 2
+      // for the directory directive; 4 requested / 3 cached replicas for
+      // the single-file directive.
+      final Long dirBytes =
+          Long.valueOf(2 * numBlocksPerFile * BLOCK_SIZE * 2);
+      final Long fileBytesNeeded =
+          Long.valueOf(4 * numBlocksPerFile * BLOCK_SIZE);
+      final Long fileBytesCached =
+          Long.valueOf(3 * numBlocksPerFile * BLOCK_SIZE);
+
+      // cache entire directory
+      final long dirDirectiveId = fs.addCacheDirective(
+          new CacheDirectiveInfo.Builder().
+            setPath(new Path("/foo")).
+            setReplication((short)2).
+            setPool(poolName).
+            build());
+      waitForCachedBlocks(nn, 4, 8,
+          "testWaitForCachedReplicasInDirectory:1");
+      // Verify that listDirectives gives the stats we want.
+      RemoteIterator<CacheDirectiveEntry> listing =
+          fs.listCacheDirectives(new CacheDirectiveInfo.Builder().
+            setPath(new Path("/foo")).
+            build());
+      CacheDirectiveStats dirStats = listing.next().getStats();
+      assertEquals(Long.valueOf(2), dirStats.getFilesAffected());
+      assertEquals(dirBytes, dirStats.getBytesNeeded());
+      assertEquals(dirBytes, dirStats.getBytesCached());
+
+      final long fileDirectiveId = fs.addCacheDirective(
+          new CacheDirectiveInfo.Builder().
+            setPath(new Path("/foo/bar")).
+            setReplication((short)4).
+            setPool(poolName).
+            build());
+      // wait for an additional 2 cached replicas to come up
+      waitForCachedBlocks(nn, 4, 10,
+          "testWaitForCachedReplicasInDirectory:2");
+      // the directory directive's stats are unchanged
+      listing = fs.listCacheDirectives(
+          new CacheDirectiveInfo.Builder().
+            setPath(new Path("/foo")).
+            build());
+      dirStats = listing.next().getStats();
+      assertEquals(Long.valueOf(2), dirStats.getFilesAffected());
+      assertEquals(dirBytes, dirStats.getBytesNeeded());
+      assertEquals(dirBytes, dirStats.getBytesCached());
+      // verify /foo/bar's stats
+      listing = fs.listCacheDirectives(
+          new CacheDirectiveInfo.Builder().
+            setPath(new Path("/foo/bar")).
+            build());
+      final CacheDirectiveStats fileStats = listing.next().getStats();
+      assertEquals(Long.valueOf(1), fileStats.getFilesAffected());
+      assertEquals(fileBytesNeeded, fileStats.getBytesNeeded());
+      // only 3 because the file only has 3 replicas, not 4 as requested.
+      assertEquals(fileBytesCached, fileStats.getBytesCached());
+
+      // remove and watch numCached go to 0
+      fs.removeCacheDirective(dirDirectiveId);
+      fs.removeCacheDirective(fileDirectiveId);
+      waitForCachedBlocks(nn, 0, 0,
+          "testWaitForCachedReplicasInDirectory:3");
+    } finally {
+      miniCluster.shutdown();
+    }
+  }
+
+  /**
+   * Tests stepping the cache replication factor up and down, checking the
+   * number of cached replicas and blocks as well as the advertised locations.
+   * @throws Exception
+   */
+  @Test(timeout=120000)
+  public void testReplicationFactor() throws Exception {
+    HdfsConfiguration conf = createCachingConf();
+    MiniDFSCluster cluster =
+      new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
+
+    try {
+      cluster.waitActive();
+      DistributedFileSystem dfs = cluster.getFileSystem();
+      NameNode namenode = cluster.getNameNode();
+      // Create the pool
+      final String pool = "friendlyPool";
+      dfs.addCachePool(new CachePoolInfo(pool));
+      // Create some test files
+      final List<Path> paths = new LinkedList<Path>();
+      paths.add(new Path("/foo/bar"));
+      paths.add(new Path("/foo/baz"));
+      paths.add(new Path("/foo2/bar2"));
+      paths.add(new Path("/foo2/baz2"));
+      dfs.mkdir(new Path("/foo"), FsPermission.getDirDefault());
+      dfs.mkdir(new Path("/foo2"), FsPermission.getDirDefault());
+      final int numBlocksPerFile = 2;
+      for (Path path : paths) {
+        FileSystemTestHelper.createFile(dfs, path, numBlocksPerFile,
+            (int)BLOCK_SIZE, (short)3, false);
+      }
+      waitForCachedBlocks(namenode, 0, 0, "testReplicationFactor:0");
+      checkNumCachedReplicas(dfs, paths, 0, 0);
+      // cache directory
+      long id = dfs.addCacheDirective(
+          new CacheDirectiveInfo.Builder().
+            setPath(new Path("/foo")).
+            setReplication((short)1).
+            setPool(pool).
+            build());
+      waitForCachedBlocks(namenode, 4, 4, "testReplicationFactor:1");
+      checkNumCachedReplicas(dfs, paths, 4, 4);
+      // step up the replication factor
+      for (int i=2; i<=3; i++) {
+        dfs.modifyCacheDirective(
+            new CacheDirectiveInfo.Builder().
+            setId(id).
+            setReplication((short)i).
+            build());
+        waitForCachedBlocks(namenode, 4, 4*i, "testReplicationFactor:2");
+        checkNumCachedReplicas(dfs, paths, 4, 4*i);
+      }
+      // step it down
+      for (int i=2; i>=1; i--) {
+        dfs.modifyCacheDirective(
+            new CacheDirectiveInfo.Builder().
+            setId(id).
+            setReplication((short)i).
+            build());
+        waitForCachedBlocks(namenode, 4, 4*i, "testReplicationFactor:3");
+        checkNumCachedReplicas(dfs, paths, 4, 4*i);
+      }
+      // remove and watch numCached go to 0
+      dfs.removeCacheDirective(id);
+      waitForCachedBlocks(namenode, 0, 0, "testReplicationFactor:4");
+      checkNumCachedReplicas(dfs, paths, 0, 0);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Lists cache pools as a user other than the pool's creator: with a 0700
+   * pool only the pool name is expected to be visible, and after ownership
+   * is handed to that user every field is expected to be visible.
+   */
+  @Test(timeout=60000)
+  public void testListCachePoolPermissions() throws Exception {
+    final UserGroupInformation otherUser =
+        UserGroupInformation.createRemoteUser("myuser");
+    final DistributedFileSystem otherDfs =
+        (DistributedFileSystem)DFSTestUtil.getFileSystemAs(otherUser, conf);
+    final String poolName = "poolparty";
+    final FsPermission ownerOnly = new FsPermission((short)0700);
+    dfs.addCachePool(new CachePoolInfo(poolName).setMode(ownerOnly));
+
+    // Should only see partial info
+    RemoteIterator<CachePoolInfo> pools = otherDfs.listCachePools();
+    CachePoolInfo listed = pools.next();
+    assertFalse(pools.hasNext());
+    assertEquals("Expected pool name", poolName, listed.getPoolName());
+    assertNull("Unexpected owner name", listed.getOwnerName());
+    assertNull("Unexpected group name", listed.getGroupName());
+    assertNull("Unexpected mode", listed.getMode());
+    assertNull("Unexpected weight", listed.getWeight());
+
+    // Modify the pool so myuser is now the owner
+    final String otherUserName = otherUser.getShortUserName();
+    dfs.modifyCachePool(new CachePoolInfo(poolName)
+        .setOwnerName(otherUserName)
+        .setWeight(99));
+
+    // Should see full info
+    pools = otherDfs.listCachePools();
+    listed = pools.next();
+    assertFalse(pools.hasNext());
+    assertEquals("Expected pool name", poolName, listed.getPoolName());
+    assertEquals("Mismatched owner name", otherUserName,
+        listed.getOwnerName());
+    assertNotNull("Expected group name", listed.getGroupName());
+    assertEquals("Mismatched mode", (short) 0700,
+        listed.getMode().toShort());
+    assertEquals("Mismatched weight", 99, (int)listed.getWeight());
+  }
+}