Posted to hdfs-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:50:25 UTC

svn commit: r1619012 [21/35] - in /hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project: hadoop-hdfs-httpfs/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/ hadoop...

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java Tue Aug 19 23:49:39 2014
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.namenode.snapshot;
 
 import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -29,9 +30,11 @@ import java.util.concurrent.atomic.Atomi
 import javax.management.ObjectName;
 
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshotException;
 import org.apache.hadoop.hdfs.protocol.SnapshotInfo;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
+import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
 import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
 import org.apache.hadoop.hdfs.server.namenode.FSImageFormat;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@@ -39,9 +42,10 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
 import org.apache.hadoop.hdfs.server.namenode.INodesInPath;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable.SnapshotDiffInfo;
 import org.apache.hadoop.metrics2.util.MBeans;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Manage snapshottable directories and their snapshots.
  * 
@@ -64,8 +68,8 @@ public class SnapshotManager implements 
   private int snapshotCounter = 0;
   
   /** All snapshottable directories in the namesystem. */
-  private final Map<Long, INodeDirectorySnapshottable> snapshottables
-      = new HashMap<Long, INodeDirectorySnapshottable>();
+  private final Map<Long, INodeDirectory> snapshottables =
+      new HashMap<Long, INodeDirectory>();
 
   public SnapshotManager(final FSDirectory fsdir) {
     this.fsdir = fsdir;
@@ -82,7 +86,7 @@ public class SnapshotManager implements 
       return;
     }
 
-    for(INodeDirectorySnapshottable s : snapshottables.values()) {
+    for(INodeDirectory s : snapshottables.values()) {
       if (s.isAncestorDirectory(dir)) {
         throw new SnapshotException(
             "Nested snapshottable directories not allowed: path=" + path
@@ -110,33 +114,30 @@ public class SnapshotManager implements 
       checkNestedSnapshottable(d, path);
     }
 
-
-    final INodeDirectorySnapshottable s;
     if (d.isSnapshottable()) {
       //The directory is already a snapshottable directory.
-      s = (INodeDirectorySnapshottable)d; 
-      s.setSnapshotQuota(INodeDirectorySnapshottable.SNAPSHOT_LIMIT);
+      d.setSnapshotQuota(DirectorySnapshottableFeature.SNAPSHOT_LIMIT);
     } else {
-      s = d.replaceSelf4INodeDirectorySnapshottable(iip.getLatestSnapshotId(),
-          fsdir.getINodeMap());
+      d.addSnapshottableFeature();
     }
-    addSnapshottable(s);
+    addSnapshottable(d);
   }
   
   /** Add the given snapshottable directory to {@link #snapshottables}. */
-  public void addSnapshottable(INodeDirectorySnapshottable dir) {
+  public void addSnapshottable(INodeDirectory dir) {
+    Preconditions.checkArgument(dir.isSnapshottable());
     snapshottables.put(dir.getId(), dir);
   }
 
   /** Remove the given snapshottable directory from {@link #snapshottables}. */
-  private void removeSnapshottable(INodeDirectorySnapshottable s) {
+  private void removeSnapshottable(INodeDirectory s) {
     snapshottables.remove(s.getId());
   }
   
   /** Remove snapshottable directories from {@link #snapshottables} */
-  public void removeSnapshottable(List<INodeDirectorySnapshottable> toRemove) {
+  public void removeSnapshottable(List<INodeDirectory> toRemove) {
     if (toRemove != null) {
-      for (INodeDirectorySnapshottable s : toRemove) {
+      for (INodeDirectory s : toRemove) {
         removeSnapshottable(s);
       }
     }
@@ -150,22 +151,22 @@ public class SnapshotManager implements 
   public void resetSnapshottable(final String path) throws IOException {
     final INodesInPath iip = fsdir.getINodesInPath4Write(path);
     final INodeDirectory d = INodeDirectory.valueOf(iip.getLastINode(), path);
-    if (!d.isSnapshottable()) {
+    DirectorySnapshottableFeature sf = d.getDirectorySnapshottableFeature();
+    if (sf == null) {
       // the directory is already non-snapshottable
       return;
     }
-    final INodeDirectorySnapshottable s = (INodeDirectorySnapshottable) d;
-    if (s.getNumSnapshots() > 0) {
+    if (sf.getNumSnapshots() > 0) {
       throw new SnapshotException("The directory " + path + " has snapshot(s). "
           + "Please redo the operation after removing all the snapshots.");
     }
 
-    if (s == fsdir.getRoot()) {
-      s.setSnapshotQuota(0); 
+    if (d == fsdir.getRoot()) {
+      d.setSnapshotQuota(0);
     } else {
-      s.replaceSelf(iip.getLatestSnapshotId(), fsdir.getINodeMap());
+      d.removeSnapshottableFeature();
     }
-    removeSnapshottable(s);
+    removeSnapshottable(d);
   }
 
   /**
@@ -178,10 +179,15 @@ public class SnapshotManager implements 
   *           Throw IOException when the given path does not lead to an
   *           existing snapshottable directory.
   */
-  public INodeDirectorySnapshottable getSnapshottableRoot(final String path
-      ) throws IOException {
-    final INodesInPath i = fsdir.getINodesInPath4Write(path);
-    return INodeDirectorySnapshottable.valueOf(i.getLastINode(), path);
+  public INodeDirectory getSnapshottableRoot(final String path)
+      throws IOException {
+    final INodeDirectory dir = INodeDirectory.valueOf(fsdir
+        .getINodesInPath4Write(path).getLastINode(), path);
+    if (!dir.isSnapshottable()) {
+      throw new SnapshotException(
+          "Directory is not a snapshottable directory: " + path);
+    }
+    return dir;
   }
 
   /**
@@ -200,7 +206,7 @@ public class SnapshotManager implements 
    */
   public String createSnapshot(final String path, String snapshotName
       ) throws IOException {
-    INodeDirectorySnapshottable srcRoot = getSnapshottableRoot(path);
+    INodeDirectory srcRoot = getSnapshottableRoot(path);
 
     if (snapshotCounter == getMaxSnapshotID()) {
       // We have reached the maximum allowable snapshot ID and since we don't
@@ -233,7 +239,7 @@ public class SnapshotManager implements 
     // parse the path, and check if the path is a snapshot path
     // the INodeDirectorySnapshottable#valueOf method will throw Exception 
     // if the path is not for a snapshottable directory
-    INodeDirectorySnapshottable srcRoot = getSnapshottableRoot(path);
+    INodeDirectory srcRoot = getSnapshottableRoot(path);
     srcRoot.removeSnapshot(snapshotName, collectedBlocks, removedINodes);
     numSnapshots.getAndDecrement();
   }
@@ -256,8 +262,7 @@ public class SnapshotManager implements 
       final String newSnapshotName) throws IOException {
     // Find the source root directory path where the snapshot was taken.
     // All the checks on the path are included in the valueOf method.
-    final INodeDirectorySnapshottable srcRoot
-        = INodeDirectorySnapshottable.valueOf(fsdir.getINode(path), path);
+    final INodeDirectory srcRoot = getSnapshottableRoot(path);
     // Note that renameSnapshot and createSnapshot are synchronized externally
     // through FSNamesystem's write lock
     srcRoot.renameSnapshot(path, oldSnapshotName, newSnapshotName);
@@ -283,9 +288,26 @@ public class SnapshotManager implements 
     snapshotCounter = counter;
   }
 
-  INodeDirectorySnapshottable[] getSnapshottableDirs() {
+  INodeDirectory[] getSnapshottableDirs() {
     return snapshottables.values().toArray(
-        new INodeDirectorySnapshottable[snapshottables.size()]);
+        new INodeDirectory[snapshottables.size()]);
+  }
+
+  /**
+   * Write {@link #snapshotCounter}, {@link #numSnapshots},
+   * and all snapshots to the DataOutput.
+   */
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(snapshotCounter);
+    out.writeInt(numSnapshots.get());
+
+    // write all snapshots.
+    for(INodeDirectory snapshottableDir : snapshottables.values()) {
+      for (Snapshot s : snapshottableDir.getDirectorySnapshottableFeature()
+          .getSnapshotList()) {
+        s.write(out);
+      }
+    }
   }
   
   /**
@@ -321,16 +343,16 @@ public class SnapshotManager implements 
     
     List<SnapshottableDirectoryStatus> statusList = 
         new ArrayList<SnapshottableDirectoryStatus>();
-    for (INodeDirectorySnapshottable dir : snapshottables.values()) {
+    for (INodeDirectory dir : snapshottables.values()) {
       if (userName == null || userName.equals(dir.getUserName())) {
         SnapshottableDirectoryStatus status = new SnapshottableDirectoryStatus(
             dir.getModificationTime(), dir.getAccessTime(),
             dir.getFsPermission(), dir.getUserName(), dir.getGroupName(),
             dir.getLocalNameBytes(), dir.getId(), 
             dir.getChildrenNum(Snapshot.CURRENT_STATE_ID),
-            dir.getNumSnapshots(),
-            dir.getSnapshotQuota(), dir.getParent() == null ? 
-                DFSUtil.EMPTY_BYTES : 
+            dir.getDirectorySnapshottableFeature().getNumSnapshots(),
+            dir.getDirectorySnapshottableFeature().getSnapshotQuota(),
+            dir.getParent() == null ? DFSUtil.EMPTY_BYTES :
                 DFSUtil.string2Bytes(dir.getParent().getFullPathName()));
         statusList.add(status);
       }
@@ -344,21 +366,22 @@ public class SnapshotManager implements 
    * Compute the difference between two snapshots of a directory, or between a
    * snapshot of the directory and its current tree.
    */
-  public SnapshotDiffInfo diff(final String path, final String from,
+  public SnapshotDiffReport diff(final String path, final String from,
       final String to) throws IOException {
+    // Find the source root directory path where the snapshots were taken.
+    // All checks on the path are performed inside getSnapshottableRoot.
+    final INodeDirectory snapshotRoot = getSnapshottableRoot(path);
+
     if ((from == null || from.isEmpty())
         && (to == null || to.isEmpty())) {
       // both fromSnapshot and toSnapshot indicate the current tree
-      return null;
+      return new SnapshotDiffReport(path, from, to,
+          Collections.<DiffReportEntry> emptyList());
     }
-
-    // Find the source root directory path where the snapshots were taken.
-    // All the check for path has been included in the valueOf method.
-    INodesInPath inodesInPath = fsdir.getINodesInPath4Write(path.toString());
-    final INodeDirectorySnapshottable snapshotRoot = INodeDirectorySnapshottable
-        .valueOf(inodesInPath.getLastINode(), path);
-    
-    return snapshotRoot.computeDiff(from, to);
+    final SnapshotDiffInfo diffs = snapshotRoot
+        .getDirectorySnapshottableFeature().computeDiff(snapshotRoot, from, to);
+    return diffs != null ? diffs.generateReport() : new SnapshotDiffReport(
+        path, from, to, Collections.<DiffReportEntry> emptyList());
   }
   
   public void clearSnapshottableDirs() {
@@ -391,7 +414,7 @@ public class SnapshotManager implements 
     getSnapshottableDirectories() {
     List<SnapshottableDirectoryStatus.Bean> beans =
         new ArrayList<SnapshottableDirectoryStatus.Bean>();
-    for (INodeDirectorySnapshottable d : getSnapshottableDirs()) {
+    for (INodeDirectory d : getSnapshottableDirs()) {
       beans.add(toBean(d));
     }
     return beans.toArray(new SnapshottableDirectoryStatus.Bean[beans.size()]);
@@ -400,20 +423,19 @@ public class SnapshotManager implements 
   @Override // SnapshotStatsMXBean
   public SnapshotInfo.Bean[] getSnapshots() {
     List<SnapshotInfo.Bean> beans = new ArrayList<SnapshotInfo.Bean>();
-    for (INodeDirectorySnapshottable d : getSnapshottableDirs()) {
-      for (Snapshot s : d.getSnapshotList()) {
+    for (INodeDirectory d : getSnapshottableDirs()) {
+      for (Snapshot s : d.getDirectorySnapshottableFeature().getSnapshotList()) {
         beans.add(toBean(s));
       }
     }
     return beans.toArray(new SnapshotInfo.Bean[beans.size()]);
   }
 
-  public static SnapshottableDirectoryStatus.Bean toBean(
-      INodeDirectorySnapshottable d) {
+  public static SnapshottableDirectoryStatus.Bean toBean(INodeDirectory d) {
     return new SnapshottableDirectoryStatus.Bean(
         d.getFullPathName(),
-        d.getNumSnapshots(),
-        d.getSnapshotQuota(),
+        d.getDirectorySnapshottableFeature().getNumSnapshots(),
+        d.getDirectorySnapshottableFeature().getSnapshotQuota(),
         d.getModificationTime(),
         Short.valueOf(Integer.toOctalString(
             d.getFsPermissionShort())),
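
The SnapshotManager changes above swap the INodeDirectorySnapshottable subclass out for a DirectorySnapshottableFeature attached to a plain INodeDirectory, so a directory can gain or lose snapshot support in place (addSnapshottableFeature/removeSnapshottableFeature) instead of being replaced by a different inode object via replaceSelf4INodeDirectorySnapshottable. A minimal sketch of that pattern, with hypothetical Feature/FeaturedDir types standing in for the real HDFS classes:

    import java.util.ArrayList;
    import java.util.List;

    interface Feature {}

    // Stand-in for DirectorySnapshottableFeature.
    class SnapshottableFeature implements Feature {
      private final List<String> snapshots = new ArrayList<String>();
      int getNumSnapshots() { return snapshots.size(); }
    }

    // Stand-in for INodeDirectory: capabilities are features, not subtypes.
    class FeaturedDir {
      private final List<Feature> features = new ArrayList<Feature>();

      boolean isSnapshottable() {
        return getFeature(SnapshottableFeature.class) != null;
      }

      void addSnapshottableFeature() {
        if (!isSnapshottable()) {
          features.add(new SnapshottableFeature());
        }
      }

      void removeSnapshottableFeature() {
        features.remove(getFeature(SnapshottableFeature.class));
      }

      <T extends Feature> T getFeature(Class<T> clazz) {
        for (Feature f : features) {
          if (clazz.isInstance(f)) {
            return clazz.cast(f);
          }
        }
        return null;
      }
    }

This is why the snapshottables map can hold INodeDirectory directly, with the Preconditions.checkArgument(dir.isSnapshottable()) guard replacing the old compile-time type check.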

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java Tue Aug 19 23:49:39 2014
@@ -28,6 +28,8 @@ import java.net.URISyntaxException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
 
 import javax.servlet.ServletContext;
 import javax.servlet.http.HttpServletRequest;
@@ -53,8 +55,11 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.XAttr;
 import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.XAttrHelper;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -81,6 +86,7 @@ import org.apache.hadoop.hdfs.web.resour
 import org.apache.hadoop.hdfs.web.resources.DeleteOpParam;
 import org.apache.hadoop.hdfs.web.resources.DestinationParam;
 import org.apache.hadoop.hdfs.web.resources.DoAsParam;
+import org.apache.hadoop.hdfs.web.resources.ExcludeDatanodesParam;
 import org.apache.hadoop.hdfs.web.resources.GetOpParam;
 import org.apache.hadoop.hdfs.web.resources.GroupParam;
 import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
@@ -88,6 +94,7 @@ import org.apache.hadoop.hdfs.web.resour
 import org.apache.hadoop.hdfs.web.resources.ModificationTimeParam;
 import org.apache.hadoop.hdfs.web.resources.NamenodeAddressParam;
 import org.apache.hadoop.hdfs.web.resources.OffsetParam;
+import org.apache.hadoop.hdfs.web.resources.OldSnapshotNameParam;
 import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
 import org.apache.hadoop.hdfs.web.resources.OwnerParam;
 import org.apache.hadoop.hdfs.web.resources.Param;
@@ -98,20 +105,30 @@ import org.apache.hadoop.hdfs.web.resour
 import org.apache.hadoop.hdfs.web.resources.RenameOptionSetParam;
 import org.apache.hadoop.hdfs.web.resources.RenewerParam;
 import org.apache.hadoop.hdfs.web.resources.ReplicationParam;
+import org.apache.hadoop.hdfs.web.resources.SnapshotNameParam;
 import org.apache.hadoop.hdfs.web.resources.TokenArgumentParam;
 import org.apache.hadoop.hdfs.web.resources.UriFsPathParam;
 import org.apache.hadoop.hdfs.web.resources.UserParam;
+import org.apache.hadoop.hdfs.web.resources.XAttrEncodingParam;
+import org.apache.hadoop.hdfs.web.resources.XAttrNameParam;
+import org.apache.hadoop.hdfs.web.resources.XAttrSetFlagParam;
+import org.apache.hadoop.hdfs.web.resources.XAttrValueParam;
+import org.apache.hadoop.hdfs.web.resources.FsActionParam;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RetriableException;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
+import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
 import com.sun.jersey.spi.container.ResourceFilters;
 
 /** Web-hdfs NameNode implementation. */
@@ -162,22 +179,44 @@ public class NamenodeWebHdfsMethods {
 
     //clear content type
     response.setContentType(null);
+    
+    // set the remote address; if coming in via a trusted proxy server then
+    // the address will be that of the proxied client
+    REMOTE_ADDRESS.set(JspHelper.getRemoteAddr(request));
   }
 
+  private void reset() {
+    REMOTE_ADDRESS.set(null);
+  }
+  
   private static NamenodeProtocols getRPCServer(NameNode namenode)
       throws IOException {
      final NamenodeProtocols np = namenode.getRpcServer();
      if (np == null) {
-       throw new IOException("Namenode is in startup mode");
+       throw new RetriableException("Namenode is in startup mode");
      }
      return np;
   }
-
+  
   @VisibleForTesting
   static DatanodeInfo chooseDatanode(final NameNode namenode,
       final String path, final HttpOpParam.Op op, final long openOffset,
-      final long blocksize) throws IOException {
+      final long blocksize, final String excludeDatanodes) throws IOException {
     final BlockManager bm = namenode.getNamesystem().getBlockManager();
+    
+    HashSet<Node> excludes = new HashSet<Node>();
+    if (excludeDatanodes != null) {
+      for (String host : StringUtils
+          .getTrimmedStringCollection(excludeDatanodes)) {
+        int idx = host.indexOf(":");
+        if (idx != -1) {          
+          excludes.add(bm.getDatanodeManager().getDatanodeByXferAddr(
+              host.substring(0, idx), Integer.parseInt(host.substring(idx + 1))));
+        } else {
+          excludes.add(bm.getDatanodeManager().getDatanodeByHost(host));
+        }
+      }
+    }
 
     if (op == PutOpParam.Op.CREATE) {
       //choose a datanode near to client 
@@ -186,7 +225,7 @@ public class NamenodeWebHdfsMethods {
       if (clientNode != null) {
         final DatanodeStorageInfo[] storages = bm.getBlockPlacementPolicy()
             .chooseTarget(path, 1, clientNode,
-                new ArrayList<DatanodeStorageInfo>(), false, null, blocksize,
+                new ArrayList<DatanodeStorageInfo>(), false, excludes, blocksize,
                 // TODO: get storage type from the file
                 StorageType.DEFAULT);
         if (storages.length > 0) {
@@ -215,7 +254,7 @@ public class NamenodeWebHdfsMethods {
         final LocatedBlocks locations = np.getBlockLocations(path, offset, 1);
         final int count = locations.locatedBlockCount();
         if (count > 0) {
-          return bestNode(locations.get(0).getLocations());
+          return bestNode(locations.get(0).getLocations(), excludes);
         }
       }
     } 
@@ -229,11 +268,14 @@ public class NamenodeWebHdfsMethods {
    * sorted based on availability and network distances, thus it is sufficient
    * to return the first element of the node here.
    */
-  private static DatanodeInfo bestNode(DatanodeInfo[] nodes) throws IOException {
-    if (nodes.length == 0 || nodes[0].isDecommissioned()) {
-      throw new IOException("No active nodes contain this block");
+  private static DatanodeInfo bestNode(DatanodeInfo[] nodes,
+      HashSet<Node> excludes) throws IOException {
+    for (DatanodeInfo dn: nodes) {
+      if (!dn.isDecommissioned() && !excludes.contains(dn)) {
+        return dn;
+      }
     }
-    return nodes[0];
+    throw new IOException("No active nodes contain this block");
   }
 
   private Token<? extends TokenIdentifier> generateDelegationToken(
@@ -252,11 +294,12 @@ public class NamenodeWebHdfsMethods {
       final UserGroupInformation ugi, final DelegationParam delegation,
       final UserParam username, final DoAsParam doAsUser,
       final String path, final HttpOpParam.Op op, final long openOffset,
-      final long blocksize,
+      final long blocksize, final String excludeDatanodes,
       final Param<?, ?>... parameters) throws URISyntaxException, IOException {
     final DatanodeInfo dn;
     try {
-      dn = chooseDatanode(namenode, path, op, openOffset, blocksize);
+      dn = chooseDatanode(namenode, path, op, openOffset, blocksize,
+          excludeDatanodes);
     } catch (InvalidTopologyException ite) {
       throw new IOException("Failed to find datanode, suggest to check cluster health.", ite);
     }
@@ -333,12 +376,25 @@ public class NamenodeWebHdfsMethods {
       @QueryParam(TokenArgumentParam.NAME) @DefaultValue(TokenArgumentParam.DEFAULT)
           final TokenArgumentParam delegationTokenArgument,
       @QueryParam(AclPermissionParam.NAME) @DefaultValue(AclPermissionParam.DEFAULT) 
-          final AclPermissionParam aclPermission
-          )throws IOException, InterruptedException {
+          final AclPermissionParam aclPermission,
+      @QueryParam(XAttrNameParam.NAME) @DefaultValue(XAttrNameParam.DEFAULT) 
+          final XAttrNameParam xattrName,
+      @QueryParam(XAttrValueParam.NAME) @DefaultValue(XAttrValueParam.DEFAULT) 
+          final XAttrValueParam xattrValue,
+      @QueryParam(XAttrSetFlagParam.NAME) @DefaultValue(XAttrSetFlagParam.DEFAULT) 
+          final XAttrSetFlagParam xattrSetFlag,
+      @QueryParam(SnapshotNameParam.NAME) @DefaultValue(SnapshotNameParam.DEFAULT)
+          final SnapshotNameParam snapshotName,
+      @QueryParam(OldSnapshotNameParam.NAME) @DefaultValue(OldSnapshotNameParam.DEFAULT)
+          final OldSnapshotNameParam oldSnapshotName,
+      @QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
+          final ExcludeDatanodesParam excludeDatanodes
+      ) throws IOException, InterruptedException {
     return put(ugi, delegation, username, doAsUser, ROOT, op, destination,
         owner, group, permission, overwrite, bufferSize, replication,
         blockSize, modificationTime, accessTime, renameOptions, createParent,
-        delegationTokenArgument,aclPermission);
+        delegationTokenArgument, aclPermission, xattrName, xattrValue,
+        xattrSetFlag, snapshotName, oldSnapshotName, excludeDatanodes);
   }
 
   /** Handle HTTP PUT request. */
@@ -384,25 +440,39 @@ public class NamenodeWebHdfsMethods {
       @QueryParam(TokenArgumentParam.NAME) @DefaultValue(TokenArgumentParam.DEFAULT)
           final TokenArgumentParam delegationTokenArgument,
       @QueryParam(AclPermissionParam.NAME) @DefaultValue(AclPermissionParam.DEFAULT) 
-          final AclPermissionParam aclPermission
+          final AclPermissionParam aclPermission,
+      @QueryParam(XAttrNameParam.NAME) @DefaultValue(XAttrNameParam.DEFAULT) 
+          final XAttrNameParam xattrName,
+      @QueryParam(XAttrValueParam.NAME) @DefaultValue(XAttrValueParam.DEFAULT) 
+          final XAttrValueParam xattrValue,
+      @QueryParam(XAttrSetFlagParam.NAME) @DefaultValue(XAttrSetFlagParam.DEFAULT) 
+          final XAttrSetFlagParam xattrSetFlag,
+      @QueryParam(SnapshotNameParam.NAME) @DefaultValue(SnapshotNameParam.DEFAULT)
+          final SnapshotNameParam snapshotName,
+      @QueryParam(OldSnapshotNameParam.NAME) @DefaultValue(OldSnapshotNameParam.DEFAULT)
+          final OldSnapshotNameParam oldSnapshotName,
+      @QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
+          final ExcludeDatanodesParam excludeDatanodes
       ) throws IOException, InterruptedException {
 
     init(ugi, delegation, username, doAsUser, path, op, destination, owner,
         group, permission, overwrite, bufferSize, replication, blockSize,
-        modificationTime, accessTime, renameOptions, delegationTokenArgument,aclPermission);
+        modificationTime, accessTime, renameOptions, delegationTokenArgument,
+        aclPermission, xattrName, xattrValue, xattrSetFlag, snapshotName,
+        oldSnapshotName, excludeDatanodes);
 
     return ugi.doAs(new PrivilegedExceptionAction<Response>() {
       @Override
       public Response run() throws IOException, URISyntaxException {
-        REMOTE_ADDRESS.set(request.getRemoteAddr());
         try {
           return put(ugi, delegation, username, doAsUser,
               path.getAbsolutePath(), op, destination, owner, group,
               permission, overwrite, bufferSize, replication, blockSize,
               modificationTime, accessTime, renameOptions, createParent,
-              delegationTokenArgument,aclPermission);
+              delegationTokenArgument, aclPermission, xattrName, xattrValue,
+              xattrSetFlag, snapshotName, oldSnapshotName, excludeDatanodes);
         } finally {
-          REMOTE_ADDRESS.set(null);
+          reset();
         }
       }
     });
@@ -428,7 +498,13 @@ public class NamenodeWebHdfsMethods {
       final RenameOptionSetParam renameOptions,
       final CreateParentParam createParent,
       final TokenArgumentParam delegationTokenArgument,
-      final AclPermissionParam aclPermission
+      final AclPermissionParam aclPermission,
+      final XAttrNameParam xattrName,
+      final XAttrValueParam xattrValue, 
+      final XAttrSetFlagParam xattrSetFlag,
+      final SnapshotNameParam snapshotName,
+      final OldSnapshotNameParam oldSnapshotName,
+      final ExcludeDatanodesParam exclDatanodes
       ) throws IOException, URISyntaxException {
 
     final Configuration conf = (Configuration)context.getAttribute(JspHelper.CURRENT_CONF);
@@ -438,9 +514,10 @@ public class NamenodeWebHdfsMethods {
     switch(op.getValue()) {
     case CREATE:
     {
-      final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
-          fullpath, op.getValue(), -1L, blockSize.getValue(conf),
-          permission, overwrite, bufferSize, replication, blockSize);
+      final URI uri = redirectURI(namenode, ugi, delegation, username,
+          doAsUser, fullpath, op.getValue(), -1L, blockSize.getValue(conf),
+          exclDatanodes.getValue(), permission, overwrite, bufferSize,
+          replication, blockSize);
       return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
     } 
     case MKDIRS:
@@ -528,6 +605,28 @@ public class NamenodeWebHdfsMethods {
       np.setAcl(fullpath, aclPermission.getAclPermission(true));
       return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
     }
+    case SETXATTR: {
+      np.setXAttr(
+          fullpath,
+          XAttrHelper.buildXAttr(xattrName.getXAttrName(),
+              xattrValue.getXAttrValue()), xattrSetFlag.getFlag());
+      return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
+    }
+    case REMOVEXATTR: {
+      np.removeXAttr(fullpath, XAttrHelper.buildXAttr(xattrName.getXAttrName()));
+      return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
+    }
+    case CREATESNAPSHOT: {
+      String snapshotPath = np.createSnapshot(fullpath, snapshotName.getValue());
+      final String js = JsonUtil.toJsonString(
+          org.apache.hadoop.fs.Path.class.getSimpleName(), snapshotPath);
+      return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
+    }
+    case RENAMESNAPSHOT: {
+      np.renameSnapshot(fullpath, oldSnapshotName.getValue(),
+          snapshotName.getValue());
+      return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
+    }
     default:
       throw new UnsupportedOperationException(op + " is not supported");
     }
@@ -551,9 +650,12 @@ public class NamenodeWebHdfsMethods {
       @QueryParam(ConcatSourcesParam.NAME) @DefaultValue(ConcatSourcesParam.DEFAULT)
           final ConcatSourcesParam concatSrcs,
       @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
-          final BufferSizeParam bufferSize
+          final BufferSizeParam bufferSize,
+      @QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
+          final ExcludeDatanodesParam excludeDatanodes
       ) throws IOException, InterruptedException {
-    return post(ugi, delegation, username, doAsUser, ROOT, op, concatSrcs, bufferSize);
+    return post(ugi, delegation, username, doAsUser, ROOT, op, concatSrcs,
+        bufferSize, excludeDatanodes);
   }
 
   /** Handle HTTP POST request. */
@@ -575,20 +677,23 @@ public class NamenodeWebHdfsMethods {
       @QueryParam(ConcatSourcesParam.NAME) @DefaultValue(ConcatSourcesParam.DEFAULT)
           final ConcatSourcesParam concatSrcs,
       @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
-          final BufferSizeParam bufferSize
+          final BufferSizeParam bufferSize,
+      @QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
+          final ExcludeDatanodesParam excludeDatanodes
       ) throws IOException, InterruptedException {
 
-    init(ugi, delegation, username, doAsUser, path, op, concatSrcs, bufferSize);
+    init(ugi, delegation, username, doAsUser, path, op, concatSrcs, bufferSize,
+        excludeDatanodes);
 
     return ugi.doAs(new PrivilegedExceptionAction<Response>() {
       @Override
       public Response run() throws IOException, URISyntaxException {
-        REMOTE_ADDRESS.set(request.getRemoteAddr());
         try {
           return post(ugi, delegation, username, doAsUser,
-              path.getAbsolutePath(), op, concatSrcs, bufferSize);
+              path.getAbsolutePath(), op, concatSrcs, bufferSize,
+              excludeDatanodes);
         } finally {
-          REMOTE_ADDRESS.set(null);
+          reset();
         }
       }
     });
@@ -602,15 +707,17 @@ public class NamenodeWebHdfsMethods {
       final String fullpath,
       final PostOpParam op,
       final ConcatSourcesParam concatSrcs,
-      final BufferSizeParam bufferSize
+      final BufferSizeParam bufferSize,
+      final ExcludeDatanodesParam excludeDatanodes
       ) throws IOException, URISyntaxException {
     final NameNode namenode = (NameNode)context.getAttribute("name.node");
 
     switch(op.getValue()) {
     case APPEND:
     {
-      final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
-          fullpath, op.getValue(), -1L, -1L, bufferSize);
+      final URI uri = redirectURI(namenode, ugi, delegation, username,
+          doAsUser, fullpath, op.getValue(), -1L, -1L,
+          excludeDatanodes.getValue(), bufferSize);
       return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
     }
     case CONCAT:
@@ -644,10 +751,18 @@ public class NamenodeWebHdfsMethods {
       @QueryParam(RenewerParam.NAME) @DefaultValue(RenewerParam.DEFAULT)
           final RenewerParam renewer,
       @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
-          final BufferSizeParam bufferSize
+          final BufferSizeParam bufferSize,
+      @QueryParam(XAttrNameParam.NAME) @DefaultValue(XAttrNameParam.DEFAULT) 
+          final List<XAttrNameParam> xattrNames,
+      @QueryParam(XAttrEncodingParam.NAME) @DefaultValue(XAttrEncodingParam.DEFAULT) 
+          final XAttrEncodingParam xattrEncoding,
+      @QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
+          final ExcludeDatanodesParam excludeDatanodes,
+      @QueryParam(FsActionParam.NAME) @DefaultValue(FsActionParam.DEFAULT)
+          final FsActionParam fsAction
       ) throws IOException, InterruptedException {
-    return get(ugi, delegation, username, doAsUser, ROOT, op,
-        offset, length, renewer, bufferSize);
+    return get(ugi, delegation, username, doAsUser, ROOT, op, offset, length,
+        renewer, bufferSize, xattrNames, xattrEncoding, excludeDatanodes, fsAction);
   }
 
   /** Handle HTTP GET request. */
@@ -672,21 +787,29 @@ public class NamenodeWebHdfsMethods {
       @QueryParam(RenewerParam.NAME) @DefaultValue(RenewerParam.DEFAULT)
           final RenewerParam renewer,
       @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
-          final BufferSizeParam bufferSize
+          final BufferSizeParam bufferSize,
+      @QueryParam(XAttrNameParam.NAME) @DefaultValue(XAttrNameParam.DEFAULT) 
+          final List<XAttrNameParam> xattrNames,
+      @QueryParam(XAttrEncodingParam.NAME) @DefaultValue(XAttrEncodingParam.DEFAULT) 
+          final XAttrEncodingParam xattrEncoding,
+      @QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
+          final ExcludeDatanodesParam excludeDatanodes,
+      @QueryParam(FsActionParam.NAME) @DefaultValue(FsActionParam.DEFAULT)
+          final FsActionParam fsAction
       ) throws IOException, InterruptedException {
 
-    init(ugi, delegation, username, doAsUser, path, op,
-        offset, length, renewer, bufferSize);
+    init(ugi, delegation, username, doAsUser, path, op, offset, length,
+        renewer, bufferSize, xattrEncoding, excludeDatanodes, fsAction);
 
     return ugi.doAs(new PrivilegedExceptionAction<Response>() {
       @Override
       public Response run() throws IOException, URISyntaxException {
-        REMOTE_ADDRESS.set(request.getRemoteAddr());
         try {
           return get(ugi, delegation, username, doAsUser,
-              path.getAbsolutePath(), op, offset, length, renewer, bufferSize);
+              path.getAbsolutePath(), op, offset, length, renewer, bufferSize,
+              xattrNames, xattrEncoding, excludeDatanodes, fsAction);
         } finally {
-          REMOTE_ADDRESS.set(null);
+          reset();
         }
       }
     });
@@ -702,7 +825,11 @@ public class NamenodeWebHdfsMethods {
       final OffsetParam offset,
       final LengthParam length,
       final RenewerParam renewer,
-      final BufferSizeParam bufferSize
+      final BufferSizeParam bufferSize,
+      final List<XAttrNameParam> xattrNames,
+      final XAttrEncodingParam xattrEncoding,
+      final ExcludeDatanodesParam excludeDatanodes,
+      final FsActionParam fsAction
       ) throws IOException, URISyntaxException {
     final NameNode namenode = (NameNode)context.getAttribute("name.node");
     final NamenodeProtocols np = getRPCServer(namenode);
@@ -710,8 +837,9 @@ public class NamenodeWebHdfsMethods {
     switch(op.getValue()) {
     case OPEN:
     {
-      final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
-          fullpath, op.getValue(), offset.getValue(), -1L, offset, length, bufferSize);
+      final URI uri = redirectURI(namenode, ugi, delegation, username,
+          doAsUser, fullpath, op.getValue(), offset.getValue(), -1L,
+          excludeDatanodes.getValue(), offset, length, bufferSize);
       return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
     }
     case GET_BLOCK_LOCATIONS:
@@ -747,7 +875,7 @@ public class NamenodeWebHdfsMethods {
     case GETFILECHECKSUM:
     {
       final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
-          fullpath, op.getValue(), -1L, -1L);
+          fullpath, op.getValue(), -1L, -1L, null);
       return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
     }
     case GETDELEGATIONTOKEN:
@@ -777,6 +905,31 @@ public class NamenodeWebHdfsMethods {
       final String js = JsonUtil.toJsonString(status);
       return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
     }
+    case GETXATTRS: {
+      List<String> names = null;
+      if (xattrNames != null) {
+        names = Lists.newArrayListWithCapacity(xattrNames.size());
+        for (XAttrNameParam xattrName : xattrNames) {
+          if (xattrName.getXAttrName() != null) {
+            names.add(xattrName.getXAttrName());
+          }
+        }
+      }
+      List<XAttr> xAttrs = np.getXAttrs(fullpath, (names != null && 
+          !names.isEmpty()) ? XAttrHelper.buildXAttrs(names) : null);
+      final String js = JsonUtil.toJsonString(xAttrs,
+          xattrEncoding.getEncoding());
+      return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
+    }
+    case LISTXATTRS: {
+      final List<XAttr> xAttrs = np.listXAttrs(fullpath);
+      final String js = JsonUtil.toJsonString(xAttrs);
+      return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
+    }
+    case CHECKACCESS: {
+      np.checkAccess(fullpath, FsAction.getFsAction(fsAction.getValue()));
+      return Response.ok().build();
+    }
     default:
       throw new UnsupportedOperationException(op + " is not supported");
     }
@@ -860,9 +1013,12 @@ public class NamenodeWebHdfsMethods {
       @QueryParam(DeleteOpParam.NAME) @DefaultValue(DeleteOpParam.DEFAULT)
           final DeleteOpParam op,
       @QueryParam(RecursiveParam.NAME) @DefaultValue(RecursiveParam.DEFAULT)
-          final RecursiveParam recursive
+          final RecursiveParam recursive,
+      @QueryParam(SnapshotNameParam.NAME) @DefaultValue(SnapshotNameParam.DEFAULT)
+          final SnapshotNameParam snapshotName
       ) throws IOException, InterruptedException {
-    return delete(ugi, delegation, username, doAsUser, ROOT, op, recursive);
+    return delete(ugi, delegation, username, doAsUser, ROOT, op, recursive,
+        snapshotName);
   }
 
   /** Handle HTTP DELETE request. */
@@ -881,20 +1037,21 @@ public class NamenodeWebHdfsMethods {
       @QueryParam(DeleteOpParam.NAME) @DefaultValue(DeleteOpParam.DEFAULT)
           final DeleteOpParam op,
       @QueryParam(RecursiveParam.NAME) @DefaultValue(RecursiveParam.DEFAULT)
-          final RecursiveParam recursive
+          final RecursiveParam recursive,
+      @QueryParam(SnapshotNameParam.NAME) @DefaultValue(SnapshotNameParam.DEFAULT)
+          final SnapshotNameParam snapshotName
       ) throws IOException, InterruptedException {
 
-    init(ugi, delegation, username, doAsUser, path, op, recursive);
+    init(ugi, delegation, username, doAsUser, path, op, recursive, snapshotName);
 
     return ugi.doAs(new PrivilegedExceptionAction<Response>() {
       @Override
       public Response run() throws IOException {
-        REMOTE_ADDRESS.set(request.getRemoteAddr());
         try {
           return delete(ugi, delegation, username, doAsUser,
-              path.getAbsolutePath(), op, recursive);
+              path.getAbsolutePath(), op, recursive, snapshotName);
         } finally {
-          REMOTE_ADDRESS.set(null);
+          reset();
         }
       }
     });
@@ -907,17 +1064,22 @@ public class NamenodeWebHdfsMethods {
       final DoAsParam doAsUser,
       final String fullpath,
       final DeleteOpParam op,
-      final RecursiveParam recursive
+      final RecursiveParam recursive,
+      final SnapshotNameParam snapshotName
       ) throws IOException {
     final NameNode namenode = (NameNode)context.getAttribute("name.node");
+    final NamenodeProtocols np = getRPCServer(namenode);
 
     switch(op.getValue()) {
-    case DELETE:
-    {
-      final boolean b = getRPCServer(namenode).delete(fullpath, recursive.getValue());
+    case DELETE: {
+      final boolean b = np.delete(fullpath, recursive.getValue());
       final String js = JsonUtil.toJsonString("boolean", b);
       return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
     }
+    case DELETESNAPSHOT: {
+      np.deleteSnapshot(fullpath, snapshotName.getValue());
+      return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
+    }
     default:
       throw new UnsupportedOperationException(op + " is not supported");
     }
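
The new PUT ops (CREATESNAPSHOT, RENAMESNAPSHOT) and DELETE op (DELETESNAPSHOT) line up with the snapshot methods FileSystem already exposes, so a client can drive them through the webhdfs:// scheme without building URLs by hand. A sketch, with a made-up namenode address and path; the directory must already have been made snapshottable (hdfs dfsadmin -allowSnapshot):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class WebHdfsSnapshotExample {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(
            URI.create("webhdfs://namenode:50070"), new Configuration());
        Path dir = new Path("/user/alice/data");

        // roughly: PUT /webhdfs/v1/user/alice/data?op=CREATESNAPSHOT&snapshotname=s1
        Path snap = fs.createSnapshot(dir, "s1");
        System.out.println("created " + snap);

        // roughly: PUT ...?op=RENAMESNAPSHOT&oldsnapshotname=s1&snapshotname=s2
        fs.renameSnapshot(dir, "s1", "s2");

        // roughly: DELETE ...?op=DELETESNAPSHOT&snapshotname=s2
        fs.deleteSnapshot(dir, "s2");
      }
    }

The ExcludeDatanodesParam added to OPEN, CREATE and APPEND serves a different purpose: a client that failed against a particular datanode can retry the redirect while skipping that node, since chooseDatanode and bestNode now honor the exclude set.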

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java Tue Aug 19 23:49:39 2014
@@ -21,6 +21,7 @@ import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
@@ -50,6 +51,7 @@ public class BlockCommand extends Datano
   final String poolId;
   final Block[] blocks;
   final DatanodeInfo[][] targets;
+  final StorageType[][] targetStorageTypes;
   final String[][] targetStorageIDs;
 
   /**
@@ -62,17 +64,20 @@ public class BlockCommand extends Datano
     this.poolId = poolId;
     blocks = new Block[blocktargetlist.size()]; 
     targets = new DatanodeInfo[blocks.length][];
+    targetStorageTypes = new StorageType[blocks.length][];
     targetStorageIDs = new String[blocks.length][];
 
     for(int i = 0; i < blocks.length; i++) {
       BlockTargetPair p = blocktargetlist.get(i);
       blocks[i] = p.block;
       targets[i] = DatanodeStorageInfo.toDatanodeInfos(p.targets);
+      targetStorageTypes[i] = DatanodeStorageInfo.toStorageTypes(p.targets);
       targetStorageIDs[i] = DatanodeStorageInfo.toStorageIDs(p.targets);
     }
   }
 
   private static final DatanodeInfo[][] EMPTY_TARGET_DATANODES = {};
+  private static final StorageType[][] EMPTY_TARGET_STORAGE_TYPES = {};
   private static final String[][] EMPTY_TARGET_STORAGEIDS = {};
 
   /**
@@ -81,7 +86,7 @@ public class BlockCommand extends Datano
    */
   public BlockCommand(int action, String poolId, Block blocks[]) {
     this(action, poolId, blocks, EMPTY_TARGET_DATANODES,
-        EMPTY_TARGET_STORAGEIDS);
+        EMPTY_TARGET_STORAGE_TYPES, EMPTY_TARGET_STORAGEIDS);
   }
 
   /**
@@ -89,11 +94,13 @@ public class BlockCommand extends Datano
    * @param blocks blocks related to the action
    */
   public BlockCommand(int action, String poolId, Block[] blocks,
-      DatanodeInfo[][] targets, String[][] targetStorageIDs) {
+      DatanodeInfo[][] targets, StorageType[][] targetStorageTypes,
+      String[][] targetStorageIDs) {
     super(action);
     this.poolId = poolId;
     this.blocks = blocks;
     this.targets = targets;
+    this.targetStorageTypes = targetStorageTypes;
     this.targetStorageIDs = targetStorageIDs;
   }
   
@@ -109,6 +116,10 @@ public class BlockCommand extends Datano
     return targets;
   }
 
+  public StorageType[][] getTargetStorageTypes() {
+    return targetStorageTypes;
+  }
+
   public String[][] getTargetStorageIDs() {
     return targetStorageIDs;
   }
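
The new targetStorageTypes field is a third parallel array: for block i, replica j is to be transferred to targets[i][j] and placed on the storage with ID targetStorageIDs[i][j] and type targetStorageTypes[i][j]. A hedged consumer-side sketch of walking such a command (logTransfers is a made-up name; the getters are those shown above, plus the existing getBlocks() accessor):

    void logTransfers(BlockCommand cmd) {
      Block[] blocks = cmd.getBlocks();
      DatanodeInfo[][] targets = cmd.getTargets();
      StorageType[][] types = cmd.getTargetStorageTypes();
      String[][] storageIDs = cmd.getTargetStorageIDs();
      for (int i = 0; i < blocks.length; i++) {
        for (int j = 0; j < targets[i].length; j++) {
          // Transfer blocks[i] to targets[i][j], placing the replica on
          // storage storageIDs[i][j] of type types[i][j].
          System.out.println(blocks[i] + " -> " + targets[i][j]
              + " [" + types[i][j] + "] " + storageIDs[i][j]);
        }
      }
    }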

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockIdCommand.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockIdCommand.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockIdCommand.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockIdCommand.java Tue Aug 19 23:49:39 2014
@@ -32,7 +32,6 @@ public class BlockIdCommand extends Data
 
   /**
    * Create BlockCommand for the given action
-   * @param blocks blocks related to the action
    */
   public BlockIdCommand(int action, String poolId, long[] blockIds) {
     super(action);

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksWithLocations.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksWithLocations.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksWithLocations.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksWithLocations.java Tue Aug 19 23:49:39 2014
@@ -17,10 +17,9 @@
  */
 package org.apache.hadoop.hdfs.server.protocol;
 
-import java.util.Arrays;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
 
 /**
@@ -39,12 +38,15 @@ public class BlocksWithLocations {
     final Block block;
     final String[] datanodeUuids;
     final String[] storageIDs;
+    final StorageType[] storageTypes;
     
     /** constructor */
-    public BlockWithLocations(Block block, String[] datanodeUuids, String[] storageIDs) {
+    public BlockWithLocations(Block block, String[] datanodeUuids,
+        String[] storageIDs, StorageType[] storageTypes) {
       this.block = block;
       this.datanodeUuids = datanodeUuids;
       this.storageIDs = storageIDs;
+      this.storageTypes = storageTypes;
     }
     
     /** get the block */
@@ -61,7 +63,12 @@ public class BlocksWithLocations {
     public String[] getStorageIDs() {
       return storageIDs;
     }
-    
+
+    /** @return the storage types */
+    public StorageType[] getStorageTypes() {
+      return storageTypes;
+    }
+
     @Override
     public String toString() {
       final StringBuilder b = new StringBuilder();
@@ -70,12 +77,18 @@ public class BlocksWithLocations {
         return b.append("[]").toString();
       }
       
-      b.append(storageIDs[0]).append('@').append(datanodeUuids[0]);
+      appendString(0, b.append("["));
       for(int i = 1; i < datanodeUuids.length; i++) {
-        b.append(", ").append(storageIDs[i]).append("@").append(datanodeUuids[i]);
+        appendString(i, b.append(","));
       }
       return b.append("]").toString();
     }
+    
+    private StringBuilder appendString(int i, StringBuilder b) {
+      return b.append("[").append(storageTypes[i]).append("]")
+              .append(storageIDs[i])
+              .append("@").append(datanodeUuids[i]);
+    }
   }
 
   private final BlockWithLocations[] blocks;
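
With the appendString helper, each location in toString now carries its storage type as a [TYPE] prefix, so the location list for a block with two replicas renders roughly as (identifiers invented for illustration):

    [[DISK]DS-1@dn-uuid-1,[SSD]DS-2@dn-uuid-2]

Previously the same list printed each location as storageID@datanodeUuid with no type information.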

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java Tue Aug 19 23:49:39 2014
@@ -119,9 +119,9 @@ public interface DatanodeProtocol {
    * and should be deleted.  This function is meant to upload *all*
    * the locally-stored blocks.  It's invoked upon startup and then
    * infrequently afterwards.
-   * @param registration
-   * @param poolId - the block pool ID for the blocks
-   * @param reports - report of blocks per storage
+   * @param registration datanode registration
+   * @param poolId the block pool ID for the blocks
+   * @param reports report of blocks per storage
    *     Each finalized block is represented as 3 longs. Each under-
    *     construction replica is represented as 4 longs.
    *     This is done instead of Block[] to reduce memory used by block reports.
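
The "3 longs per finalized block, 4 per under-construction replica" encoding keeps full block reports far smaller than a Block[] would be. A hedged sketch of the idea (the real encoder is BlockListAsLongs, whose exact layout also includes header words and a delimiter; the Replica interface here is hypothetical):

    interface Replica {
      long getBlockId();
      long getNumBytes();
      long getGenerationStamp();
      long getStateOrdinal(); // replica state, only meaningful under construction
    }

    static long[] encode(java.util.List<Replica> finalized,
        java.util.List<Replica> underConstruction) {
      long[] out = new long[2 + 3 * finalized.size() + 4 * underConstruction.size()];
      int i = 0;
      out[i++] = finalized.size();
      out[i++] = underConstruction.size();
      for (Replica r : finalized) {            // 3 longs each
        out[i++] = r.getBlockId();
        out[i++] = r.getNumBytes();
        out[i++] = r.getGenerationStamp();
      }
      for (Replica r : underConstruction) {    // 4 longs each; the extra long
        out[i++] = r.getBlockId();             // carries the replica state
        out[i++] = r.getNumBytes();
        out[i++] = r.getGenerationStamp();
        out[i++] = r.getStateOrdinal();
      }
      return out;
    }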

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeStorage.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeStorage.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeStorage.java Tue Aug 19 23:49:39 2014
@@ -48,8 +48,6 @@ public class DatanodeStorage {
 
   /**
    * Create a storage with {@link State#NORMAL} and {@link StorageType#DEFAULT}.
-   *
-   * @param storageID
    */
   public DatanodeStorage(String storageID) {
     this(storageID, State.NORMAL, StorageType.DEFAULT);
@@ -84,6 +82,11 @@ public class DatanodeStorage {
   }
 
   @Override
+  public String toString() {
+    return "DatanodeStorage["+ storageID + "," + storageType + "," + state +"]";
+  }
+  
+  @Override
   public boolean equals(Object other){
     if (other == this) {
       return true;
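
The added toString makes log output for storage reports self-describing; a storage with an invented ID would print as:

    DatanodeStorage[DS-f1e2d3,DISK,NORMAL]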

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java Tue Aug 19 23:49:39 2014
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdfs.protocol.C
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
 import org.apache.hadoop.security.RefreshUserMappingsProtocol;
 import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
+import org.apache.hadoop.ipc.GenericRefreshProtocol;
 import org.apache.hadoop.tools.GetUserMappingsProtocol;
 
 /** The full set of RPC methods implemented by the Namenode.  */
@@ -35,6 +36,7 @@ public interface NamenodeProtocols
           RefreshAuthorizationPolicyProtocol,
           RefreshUserMappingsProtocol,
           RefreshCallQueueProtocol,
+          GenericRefreshProtocol,
           GetUserMappingsProtocol,
           HAServiceProtocol {
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RegisterCommand.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RegisterCommand.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RegisterCommand.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RegisterCommand.java Tue Aug 19 23:49:39 2014
@@ -22,6 +22,9 @@ import org.apache.hadoop.classification.
 
 /**
  * A RegisterCommand is an instruction to a datanode to register with the namenode.
+ * This command can't be combined with other commands in the same response.
+ * This is because after the datanode processes RegisterCommand, it will skip
+ * the rest of the DatanodeCommands in the same HeartbeatResponse.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/ServerCommand.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/ServerCommand.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/ServerCommand.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/ServerCommand.java Tue Aug 19 23:49:39 2014
@@ -39,7 +39,7 @@ public abstract class ServerCommand {
    * 
    * @see DatanodeProtocol
    * @see NamenodeProtocol
-   * @param action
+   * @param action protocol specific action
    */
   public ServerCommand(int action) {
     this.action = action;

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java Tue Aug 19 23:49:39 2014
@@ -32,11 +32,16 @@ import com.google.common.base.Preconditi
  * DfsClientShm is a subclass of ShortCircuitShm which is used by the
  * DfsClient.
  * When the UNIX domain socket associated with this shared memory segment
- * closes unexpectedly, we mark the slots inside this segment as stale.
- * ShortCircuitReplica objects that contain stale slots are themselves stale,
+ * closes unexpectedly, we mark the slots inside this segment as disconnected.
+ * ShortCircuitReplica objects that contain disconnected slots are stale,
  * and will not be used to service new reads or mmap operations.
  * However, in-progress read or mmap operations will continue to proceed.
  * Once the last slot is deallocated, the segment can be safely munmapped.
+ *
+ * Slots may also become stale because the associated replica has been deleted
+ * on the DataNode.  In this case, the DataNode will clear the 'valid' bit.
+ * The client will then see these slots as stale (see
+ * {@link ShortCircuitReplica#isStale}).
  */
 public class DfsClientShm extends ShortCircuitShm
     implements DomainSocketWatcher.Handler {
@@ -58,7 +63,7 @@ public class DfsClientShm extends ShortC
    *
    * {@link DfsClientShm#handle} sets this to true.
    */
-  private boolean stale = false;
+  private boolean disconnected = false;
 
   DfsClientShm(ShmId shmId, FileInputStream stream, EndpointShmManager manager,
       DomainPeer peer) throws IOException {
@@ -76,14 +81,14 @@ public class DfsClientShm extends ShortC
   }
 
   /**
-   * Determine if the shared memory segment is stale.
+   * Determine if the shared memory segment is disconnected from the DataNode.
    *
    * This must be called with the DfsClientShmManager lock held.
    *
   * @return   True if the shared memory segment is disconnected.
    */
-  public synchronized boolean isStale() {
-    return stale;
+  public synchronized boolean isDisconnected() {
+    return disconnected;
   }
 
   /**
@@ -97,8 +102,8 @@ public class DfsClientShm extends ShortC
   public boolean handle(DomainSocket sock) {
     manager.unregisterShm(getShmId());
     synchronized (this) {
-      Preconditions.checkState(!stale);
-      stale = true;
+      Preconditions.checkState(!disconnected);
+      disconnected = true;
       boolean hadSlots = false;
       for (Iterator<Slot> iter = slotIterator(); iter.hasNext(); ) {
         Slot slot = iter.next();

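The distinction the new Javadoc draws (a disconnected segment vs. a slot
whose 'valid' bit the DataNode cleared) can be sketched as a combined
client-side check; the exact combination shown here is an assumption,
not the ShortCircuitReplica code itself:

    // Sketch: a slot is unusable if its segment lost its UNIX domain
    // socket (disconnected) or the DataNode cleared its 'valid' bit
    // after deleting the replica.
    boolean slotIsUsable(DfsClientShm shm, ShortCircuitShm.Slot slot) {
      if (shm.isDisconnected()) {
        return false;          // socket closed unexpectedly
      }
      return slot.isValid();   // cleared by the DataNode on delete
    }
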
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java Tue Aug 19 23:49:39 2014
@@ -271,12 +271,12 @@ public class DfsClientShmManager impleme
             loading = false;
             finishedLoading.signalAll();
           }
-          if (shm.isStale()) {
+          if (shm.isDisconnected()) {
             // If the peer closed immediately after the shared memory segment
             // was created, the DomainSocketWatcher callback might already have
-            // fired and marked the shm as stale.  In this case, we obviously
-            // don't want to add the SharedMemorySegment to our list of valid
-            // not-full segments.
+            // fired and marked the shm as disconnected.  In this case, we
+            // obviously don't want to add the SharedMemorySegment to our list
+            // of valid not-full segments.
             if (LOG.isDebugEnabled()) {
               LOG.debug(this + ": the UNIX domain socket associated with " +
                   "this short-circuit memory closed before we could make " +
@@ -299,7 +299,7 @@ public class DfsClientShmManager impleme
     void freeSlot(Slot slot) {
       DfsClientShm shm = (DfsClientShm)slot.getShm();
       shm.unregisterSlot(slot.getSlotIdx());
-      if (shm.isStale()) {
+      if (shm.isDisconnected()) {
        // Disconnected shared memory segments should not be tracked here.
         Preconditions.checkState(!full.containsKey(shm.getShmId()));
         Preconditions.checkState(!notFull.containsKey(shm.getShmId()));

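The race the updated comment describes (the peer closing immediately
after segment creation) amounts to a post-allocation check; a minimal
sketch, assuming a requestNewShm() helper and the notFull map from the
surrounding class:

    // Sketch: if the DomainSocketWatcher callback already fired and
    // marked the segment disconnected, do not track it as a valid
    // not-full segment.
    DfsClientShm shm = requestNewShm(clientName, peer);  // hypothetical
    if (!shm.isDisconnected()) {
      notFull.put(shm.getShmId(), shm);
    }
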
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java Tue Aug 19 23:49:39 2014
@@ -111,7 +111,7 @@ public class ShortCircuitCache implement
         Long evictionTimeNs = Long.valueOf(0);
         while (true) {
           Entry<Long, ShortCircuitReplica> entry = 
-              evictableMmapped.ceilingEntry(evictionTimeNs);
+              evictable.ceilingEntry(evictionTimeNs);
           if (entry == null) break;
           evictionTimeNs = entry.getKey();
           long evictionTimeMs = 
@@ -384,10 +384,6 @@ public class ShortCircuitCache implement
     this.shmManager = shmManager;
   }
 
-  public long getMmapRetryTimeoutMs() {
-    return mmapRetryTimeoutMs;
-  }
-
   public long getStaleThresholdMs() {
     return staleThresholdMs;
   }
@@ -437,11 +433,22 @@ public class ShortCircuitCache implement
   void unref(ShortCircuitReplica replica) {
     lock.lock();
     try {
-      // If the replica is stale, but we haven't purged it yet, let's do that.
-      // It would be a shame to evict a non-stale replica so that we could put
-      // a stale one into the cache.
-      if ((!replica.purged) && replica.isStale()) {
-        purge(replica);
+      // If the replica is stale or unusable, but we haven't purged it yet,
+      // let's do that.  It would be a shame to evict a non-stale replica so
+      // that we could put a stale or unusable one into the cache.
+      if (!replica.purged) {
+        String purgeReason = null;
+        if (!replica.getDataStream().getChannel().isOpen()) {
+          purgeReason = "purging replica because its data channel is closed.";
+        } else if (!replica.getMetaStream().getChannel().isOpen()) {
+          purgeReason = "purging replica because its meta channel is closed.";
+        } else if (replica.isStale()) {
+          purgeReason = "purging replica because it is stale.";
+        }
+        if (purgeReason != null) {
+          LOG.debug(this + ": " + purgeReason);
+          purge(replica);
+        }
       }
       String addedString = "";
       boolean shouldTrimEvictionMaps = false;
@@ -836,7 +843,7 @@ public class ShortCircuitCache implement
         } else if (replica.mmapData instanceof Long) {
           long lastAttemptTimeMs = (Long)replica.mmapData;
           long delta = Time.monotonicNow() - lastAttemptTimeMs;
-          if (delta < staleThresholdMs) {
+          if (delta < mmapRetryTimeoutMs) {
             if (LOG.isTraceEnabled()) {
               LOG.trace(this + ": can't create client mmap for " +
                   replica + " because we failed to " +

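The last hunk makes the mmap retry window use mmapRetryTimeoutMs rather
than staleThresholdMs. A simplified sketch of the intended pattern,
assuming (as the instanceof check suggests) that mmapData holds the time
of the last failed attempt as a Long:

    // Sketch: after a failed mmap, remember when it failed ...
    replica.mmapData = Long.valueOf(Time.monotonicNow());
    // ... and on a later request, retry only once the timeout expires.
    long delta = Time.monotonicNow() - (Long) replica.mmapData;
    if (delta < mmapRetryTimeoutMs) {
      return null;   // too soon to retry the mmap
    }
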
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java Tue Aug 19 23:49:39 2014
@@ -306,6 +306,13 @@ public class ShortCircuitShm {
           (slotAddress - baseAddress) / BYTES_PER_SLOT);
     }
 
+    /**
+     * Clear the slot.
+     */
+    void clear() {
+      unsafe.putLongVolatile(null, this.slotAddress, 0);
+    }
+
     private boolean isSet(long flag) {
       long prev = unsafe.getLongVolatile(null, this.slotAddress);
       return (prev & flag) != 0;
@@ -535,6 +542,7 @@ public class ShortCircuitShm {
     }
     allocatedSlots.set(idx, true);
     Slot slot = new Slot(calculateSlotAddress(idx), blockId);
+    slot.clear();
     slot.makeValid();
     slots[idx] = slot;
     if (LOG.isTraceEnabled()) {
@@ -583,7 +591,7 @@ public class ShortCircuitShm {
     Slot slot = new Slot(calculateSlotAddress(slotIdx), blockId);
     if (!slot.isValid()) {
       throw new InvalidRequestException(this + ": slot " + slotIdx +
-          " has not been allocated.");
+          " is not marked as valid.");
     }
     slots[slotIdx] = slot;
     allocatedSlots.set(slotIdx, true);

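The new clear() zeroes the slot's flag word before makeValid() sets the
valid bit, so a reused slot cannot inherit flags left over from a prior
occupant. A sketch of the flag layout this implies; the specific bit
positions and the setFlag() helper are assumptions (isSet() appears in
the hunk above):

    // Sketch: slot state is a single volatile long at slotAddress.
    private static final long VALID_FLAG      = 1L << 63;  // assumed
    private static final long ANCHORABLE_FLAG = 1L << 62;  // assumed

    void clear()      { unsafe.putLongVolatile(null, slotAddress, 0); }
    void makeValid()  { setFlag(VALID_FLAG); }       // assumed CAS helper
    boolean isValid() { return isSet(VALID_FLAG); }  // test the bit
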
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CacheAdmin.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CacheAdmin.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CacheAdmin.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CacheAdmin.java Tue Aug 19 23:49:39 2014
@@ -40,7 +40,8 @@ import org.apache.hadoop.hdfs.protocol.C
 import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.CachePoolStats;
-import org.apache.hadoop.hdfs.tools.TableListing.Justification;
+import org.apache.hadoop.tools.TableListing;
+import org.apache.hadoop.tools.TableListing.Justification;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 
@@ -503,19 +504,21 @@ public class CacheAdmin extends Configur
 
     @Override
     public String getShortUsage() {
-      return "[" + getName() + " [-stats] [-path <path>] [-pool <pool>]]\n";
+      return "[" + getName()
+          + " [-stats] [-path <path>] [-pool <pool>] [-id <id>]\n";
     }
 
     @Override
     public String getLongUsage() {
       TableListing listing = getOptionDescriptionListing();
+      listing.addRow("-stats", "List path-based cache directive statistics.");
       listing.addRow("<path>", "List only " +
           "cache directives with this path. " +
           "Note that if there is a cache directive for <path> " +
           "in a cache pool that we don't have read access for, it " + 
           "will not be listed.");
       listing.addRow("<pool>", "List only path cache directives in that pool.");
-      listing.addRow("-stats", "List path-based cache directive statistics.");
+      listing.addRow("<id>", "List the cache directive with this id.");
       return getShortUsage() + "\n" +
         "List cache directives.\n\n" +
         listing.toString();
@@ -534,6 +537,10 @@ public class CacheAdmin extends Configur
         builder.setPool(poolFilter);
       }
       boolean printStats = StringUtils.popOption("-stats", args);
+      String idFilter = StringUtils.popOptionWithArgument("-id", args);
+      if (idFilter != null) {
+        builder.setId(Long.parseLong(idFilter));
+      }
       if (!args.isEmpty()) {
         System.err.println("Can't understand argument: " + args.get(0));
         return 1;