Posted to common-commits@hadoop.apache.org by wh...@apache.org on 2014/12/02 06:36:35 UTC

hadoop git commit: HDFS-7450. Consolidate the implementation of GetFileInfo(), GetListings() and GetContentSummary() into a single class. Contributed by Haohui Mai.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 9fa299025 -> 0af44ea84


HDFS-7450. Consolidate the implementation of GetFileInfo(), GetListings() and GetContentSummary() into a single class. Contributed by Haohui Mai.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0af44ea8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0af44ea8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0af44ea8

Branch: refs/heads/trunk
Commit: 0af44ea8462437f8e7a8271b15a19677fd7f05a1
Parents: 9fa2990
Author: Haohui Mai <wh...@apache.org>
Authored: Mon Dec 1 15:28:10 2014 -0800
Committer: Haohui Mai <wh...@apache.org>
Committed: Mon Dec 1 21:36:25 2014 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../server/namenode/FSDirStatAndListingOp.java  | 480 +++++++++++++++++++
 .../hdfs/server/namenode/FSDirectory.java       | 362 +-------------
 .../hdfs/server/namenode/FSEditLogLoader.java   |   7 +-
 .../hdfs/server/namenode/FSNamesystem.java      | 128 ++---
 .../hdfs/server/namenode/NameNodeAdapter.java   |   3 +-
 6 files changed, 534 insertions(+), 449 deletions(-)
----------------------------------------------------------------------
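
The consolidation follows a simple pattern: per-operation read logic moves out
of FSNamesystem/FSDirectory into a class of static, package-private methods
that receive the FSDirectory as an explicit argument instead of reading its
private fields. A minimal sketch of that shape, with hypothetical stand-in
names (Directory, DirStatOp) rather than the real HDFS types:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class Directory {                        // stand-in for FSDirectory
  private final Map<String, String> files = new ConcurrentHashMap<>();
  void put(String path, String meta) { files.put(path, meta); }
  String lookup(String path) { return files.get(path); }  // package-private accessor
}

final class DirStatOp {                  // stand-in for FSDirStatAndListingOp
  private DirStatOp() {}                 // stateless: static methods only
  /** Returns the metadata for path, or null if absent. */
  static String getFileInfo(Directory dir, String path) {
    return dir.lookup(path);             // all state is reached through 'dir'
  }
}

public class Demo {
  public static void main(String[] args) {
    Directory dir = new Directory();
    dir.put("/a", "len=0,dir=false");
    System.out.println(DirStatOp.getFileInfo(dir, "/a"));  // len=0,dir=false
  }
}

Keeping the helper stateless makes the dependencies explicit: every method can
touch namesystem state only through the handle it is given, which is what lets
the code move between classes without changing behavior.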


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0af44ea8/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index dfdca8d..d5c1fe5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -407,6 +407,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-7210. Avoid two separate RPC's namenode.append() and namenode.getFileInfo() 
     for an append call from DFSClient. (Vinayakumar B via umamahesh)
 
+    HDFS-7450. Consolidate the implementation of GetFileInfo(), GetListings() and
+    GetContentSummary() into a single class. (wheat9)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0af44ea8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
new file mode 100644
index 0000000..35b3a6b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
@@ -0,0 +1,480 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.DirectoryListingStartAfterNotFoundException;
+import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.fs.InvalidPathException;
+import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.FsPermissionExtension;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.SnapshotException;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
+import org.apache.hadoop.hdfs.util.ReadOnlyList;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Arrays;
+
+class FSDirStatAndListingOp {
+  static DirectoryListing getListingInt(
+      FSDirectory fsd, final String srcArg, byte[] startAfter,
+      boolean needLocation)
+    throws IOException {
+    String src = srcArg;
+    FSPermissionChecker pc = fsd.getPermissionChecker();
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    String startAfterString = new String(startAfter);
+    src = fsd.resolvePath(pc, src, pathComponents);
+
+    // Get file name when startAfter is an INodePath
+    if (FSDirectory.isReservedName(startAfterString)) {
+      byte[][] startAfterComponents = FSDirectory
+          .getPathComponentsForReservedPath(startAfterString);
+      try {
+        String tmp = FSDirectory.resolvePath(src, startAfterComponents, fsd);
+        byte[][] regularPath = INode.getPathComponents(tmp);
+        startAfter = regularPath[regularPath.length - 1];
+      } catch (IOException e) {
+        // Possibly the inode is deleted
+        throw new DirectoryListingStartAfterNotFoundException(
+            "Can't find startAfter " + startAfterString);
+      }
+    }
+
+    boolean isSuperUser = true;
+    if (fsd.isPermissionEnabled()) {
+      if (fsd.isDir(src)) {
+        fsd.checkPathAccess(pc, src, FsAction.READ_EXECUTE);
+      } else {
+        fsd.checkTraverse(pc, src);
+      }
+      isSuperUser = pc.isSuperUser();
+    }
+    return getListing(fsd, src, startAfter, needLocation, isSuperUser);
+  }
+
+  /**
+   * Get the file info for a specific file.
+   *
+   * @param srcArg The string representation of the path to the file
+   * @param resolveLink whether to throw UnresolvedLinkException
+   *        if src refers to a symlink
+   *
+   * @return object containing information regarding the file
+   *         or null if file not found
+   */
+  static HdfsFileStatus getFileInfo(
+      FSDirectory fsd, String srcArg, boolean resolveLink)
+      throws IOException {
+    String src = srcArg;
+    if (!DFSUtil.isValidName(src)) {
+      throw new InvalidPathException("Invalid file name: " + src);
+    }
+    FSPermissionChecker pc = fsd.getPermissionChecker();
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    src = fsd.resolvePath(pc, src, pathComponents);
+    boolean isSuperUser = true;
+    if (fsd.isPermissionEnabled()) {
+      fsd.checkPermission(pc, src, false, null, null, null, null, false,
+          resolveLink);
+      isSuperUser = pc.isSuperUser();
+    }
+    return getFileInfo(fsd, src, resolveLink,
+        FSDirectory.isReservedRawName(srcArg), isSuperUser);
+  }
+
+  /**
+   * Returns true if the file is closed
+   */
+  static boolean isFileClosed(FSDirectory fsd, String src) throws IOException {
+    FSPermissionChecker pc = fsd.getPermissionChecker();
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    src = fsd.resolvePath(pc, src, pathComponents);
+    if (fsd.isPermissionEnabled()) {
+      fsd.checkTraverse(pc, src);
+    }
+    return !INodeFile.valueOf(fsd.getINode(src), src).isUnderConstruction();
+  }
+
+  static ContentSummary getContentSummary(
+      FSDirectory fsd, String src) throws IOException {
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    FSPermissionChecker pc = fsd.getPermissionChecker();
+    src = fsd.resolvePath(pc, src, pathComponents);
+    if (fsd.isPermissionEnabled()) {
+      fsd.checkPermission(pc, src, false, null, null, null,
+          FsAction.READ_EXECUTE);
+    }
+    return getContentSummaryInt(fsd, src);
+  }
+
+  /**
+   * Get a partial listing of the indicated directory
+   *
+   * We will stop when any of the following conditions is met:
+   * 1) this.lsLimit files have been added
+   * 2) needLocation is true AND enough files have been added such
+   * that at least this.lsLimit block locations are in the response
+   *
+   * @param fsd FSDirectory
+   * @param src the directory name
+   * @param startAfter the name to start listing after
+   * @param needLocation whether block locations should be returned
+   * @return a partial listing starting after startAfter
+   */
+  private static DirectoryListing getListing(
+      FSDirectory fsd, String src, byte[] startAfter, boolean needLocation,
+      boolean isSuperUser)
+      throws IOException {
+    String srcs = FSDirectory.normalizePath(src);
+    final boolean isRawPath = FSDirectory.isReservedRawName(src);
+
+    fsd.readLock();
+    try {
+      if (srcs.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR)) {
+        return getSnapshotsListing(fsd, srcs, startAfter);
+      }
+      final INodesInPath inodesInPath = fsd.getINodesInPath(srcs, true);
+      final INode[] inodes = inodesInPath.getINodes();
+      final int snapshot = inodesInPath.getPathSnapshotId();
+      final INode targetNode = inodes[inodes.length - 1];
+      if (targetNode == null)
+        return null;
+      byte parentStoragePolicy = isSuperUser ?
+          targetNode.getStoragePolicyID() : BlockStoragePolicySuite
+          .ID_UNSPECIFIED;
+
+      if (!targetNode.isDirectory()) {
+        return new DirectoryListing(
+            new HdfsFileStatus[]{createFileStatus(fsd,
+                HdfsFileStatus.EMPTY_NAME, targetNode, needLocation,
+                parentStoragePolicy, snapshot, isRawPath, inodesInPath)}, 0);
+      }
+
+      final INodeDirectory dirInode = targetNode.asDirectory();
+      final ReadOnlyList<INode> contents = dirInode.getChildrenList(snapshot);
+      int startChild = INodeDirectory.nextChild(contents, startAfter);
+      int totalNumChildren = contents.size();
+      int numOfListing = Math.min(totalNumChildren - startChild,
+          fsd.getLsLimit());
+      int locationBudget = fsd.getLsLimit();
+      int listingCnt = 0;
+      HdfsFileStatus listing[] = new HdfsFileStatus[numOfListing];
+      for (int i=0; i<numOfListing && locationBudget>0; i++) {
+        INode cur = contents.get(startChild+i);
+        byte curPolicy = isSuperUser && !cur.isSymlink()?
+            cur.getLocalStoragePolicyID():
+            BlockStoragePolicySuite.ID_UNSPECIFIED;
+        listing[i] = createFileStatus(fsd, cur.getLocalNameBytes(), cur,
+            needLocation, fsd.getStoragePolicyID(curPolicy,
+                parentStoragePolicy), snapshot, isRawPath, inodesInPath);
+        listingCnt++;
+        if (needLocation) {
+            // Once we hit lsLimit locations, stop.
+            // This helps to prevent excessively large response payloads.
+            // Approximate #locations with locatedBlockCount() * repl_factor
+            LocatedBlocks blks =
+                ((HdfsLocatedFileStatus)listing[i]).getBlockLocations();
+            locationBudget -= (blks == null) ? 0 :
+               blks.locatedBlockCount() * listing[i].getReplication();
+        }
+      }
+      // truncate return array if necessary
+      if (listingCnt < numOfListing) {
+          listing = Arrays.copyOf(listing, listingCnt);
+      }
+      return new DirectoryListing(
+          listing, totalNumChildren-startChild-listingCnt);
+    } finally {
+      fsd.readUnlock();
+    }
+  }
+
+  /**
+   * Get a listing of all the snapshots of a snapshottable directory
+   */
+  private static DirectoryListing getSnapshotsListing(
+      FSDirectory fsd, String src, byte[] startAfter)
+      throws IOException {
+    Preconditions.checkState(fsd.hasReadLock());
+    Preconditions.checkArgument(
+        src.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR),
+        "%s does not end with %s", src, HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR);
+
+    final String dirPath = FSDirectory.normalizePath(src.substring(0,
+        src.length() - HdfsConstants.DOT_SNAPSHOT_DIR.length()));
+
+    final INode node = fsd.getINode(dirPath);
+    final INodeDirectory dirNode = INodeDirectory.valueOf(node, dirPath);
+    final DirectorySnapshottableFeature sf = dirNode.getDirectorySnapshottableFeature();
+    if (sf == null) {
+      throw new SnapshotException(
+          "Directory is not a snapshottable directory: " + dirPath);
+    }
+    final ReadOnlyList<Snapshot> snapshots = sf.getSnapshotList();
+    int skipSize = ReadOnlyList.Util.binarySearch(snapshots, startAfter);
+    skipSize = skipSize < 0 ? -skipSize - 1 : skipSize + 1;
+    int numOfListing = Math.min(snapshots.size() - skipSize, fsd.getLsLimit());
+    final HdfsFileStatus listing[] = new HdfsFileStatus[numOfListing];
+    for (int i = 0; i < numOfListing; i++) {
+      Snapshot.Root sRoot = snapshots.get(i + skipSize).getRoot();
+      listing[i] = createFileStatus(fsd, sRoot.getLocalNameBytes(), sRoot,
+          BlockStoragePolicySuite.ID_UNSPECIFIED, Snapshot.CURRENT_STATE_ID,
+          false, null);
+    }
+    return new DirectoryListing(
+        listing, snapshots.size() - skipSize - numOfListing);
+  }
+
+  /** Get the file info for a specific file.
+   * @param fsd FSDirectory
+   * @param src The string representation of the path to the file
+   * @param resolveLink whether to throw UnresolvedLinkException
+   * @param isRawPath true if a /.reserved/raw pathname was passed by the user
+   * @param includeStoragePolicy whether to include storage policy
+   * @return object containing information regarding the file
+   *         or null if file not found
+   */
+  static HdfsFileStatus getFileInfo(
+      FSDirectory fsd, String src, boolean resolveLink, boolean isRawPath,
+      boolean includeStoragePolicy)
+    throws IOException {
+    String srcs = FSDirectory.normalizePath(src);
+    fsd.readLock();
+    try {
+      if (srcs.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR)) {
+        return getFileInfo4DotSnapshot(fsd, srcs);
+      }
+      final INodesInPath inodesInPath = fsd.getINodesInPath(srcs, resolveLink);
+      final INode[] inodes = inodesInPath.getINodes();
+      final INode i = inodes[inodes.length - 1];
+      byte policyId = includeStoragePolicy && i != null && !i.isSymlink() ?
+          i.getStoragePolicyID() : BlockStoragePolicySuite.ID_UNSPECIFIED;
+      return i == null ? null : createFileStatus(fsd,
+          HdfsFileStatus.EMPTY_NAME, i, policyId,
+          inodesInPath.getPathSnapshotId(), isRawPath, inodesInPath);
+    } finally {
+      fsd.readUnlock();
+    }
+  }
+
+  /**
+   * Currently we only support "ls /xxx/.snapshot" which will return all the
+   * snapshots of a directory. The FSCommand Ls will first call getFileInfo to
+   * make sure the file/directory exists (before the real getListing call).
+   * Since we do not have a real INode for ".snapshot", we return an empty
+   * non-null HdfsFileStatus here.
+   */
+  private static HdfsFileStatus getFileInfo4DotSnapshot(
+      FSDirectory fsd, String src)
+      throws UnresolvedLinkException {
+    if (fsd.getINode4DotSnapshot(src) != null) {
+      return new HdfsFileStatus(0, true, 0, 0, 0, 0, null, null, null, null,
+          HdfsFileStatus.EMPTY_NAME, -1L, 0, null,
+          BlockStoragePolicySuite.ID_UNSPECIFIED);
+    }
+    return null;
+  }
+
+  /**
+   * create an hdfs file status from an inode
+   *
+   * @param fsd FSDirectory
+   * @param path the local name
+   * @param node inode
+   * @param needLocation if block locations need to be included or not
+   * @param isRawPath true if this is being called on behalf of a path in
+   *                  /.reserved/raw
+   * @return a file status
+   * @throws java.io.IOException if any error occurs
+   */
+  static HdfsFileStatus createFileStatus(
+      FSDirectory fsd, byte[] path, INode node, boolean needLocation,
+      byte storagePolicy, int snapshot, boolean isRawPath, INodesInPath iip)
+      throws IOException {
+    if (needLocation) {
+      return createLocatedFileStatus(fsd, path, node, storagePolicy,
+          snapshot, isRawPath, iip);
+    } else {
+      return createFileStatus(fsd, path, node, storagePolicy, snapshot,
+          isRawPath, iip);
+    }
+  }
+
+  /**
+   * Create FileStatus by file INode
+   */
+  static HdfsFileStatus createFileStatus(
+      FSDirectory fsd, byte[] path, INode node, byte storagePolicy,
+      int snapshot, boolean isRawPath, INodesInPath iip) throws IOException {
+     long size = 0;     // length is zero for directories
+     short replication = 0;
+     long blocksize = 0;
+     final boolean isEncrypted;
+
+     final FileEncryptionInfo feInfo = isRawPath ? null :
+         fsd.getFileEncryptionInfo(node, snapshot, iip);
+
+     if (node.isFile()) {
+       final INodeFile fileNode = node.asFile();
+       size = fileNode.computeFileSize(snapshot);
+       replication = fileNode.getFileReplication(snapshot);
+       blocksize = fileNode.getPreferredBlockSize();
+       isEncrypted = (feInfo != null) ||
+           (isRawPath && fsd.isInAnEZ(INodesInPath.fromINode(node)));
+     } else {
+       isEncrypted = fsd.isInAnEZ(INodesInPath.fromINode(node));
+     }
+
+     int childrenNum = node.isDirectory() ?
+         node.asDirectory().getChildrenNum(snapshot) : 0;
+
+     return new HdfsFileStatus(
+        size,
+        node.isDirectory(),
+        replication,
+        blocksize,
+        node.getModificationTime(snapshot),
+        node.getAccessTime(snapshot),
+        getPermissionForFileStatus(node, snapshot, isEncrypted),
+        node.getUserName(snapshot),
+        node.getGroupName(snapshot),
+        node.isSymlink() ? node.asSymlink().getSymlink() : null,
+        path,
+        node.getId(),
+        childrenNum,
+        feInfo,
+        storagePolicy);
+  }
+
+  /**
+   * Create FileStatus with location info by file INode
+   */
+  private static HdfsLocatedFileStatus createLocatedFileStatus(
+      FSDirectory fsd, byte[] path, INode node, byte storagePolicy,
+      int snapshot, boolean isRawPath, INodesInPath iip) throws IOException {
+    assert fsd.hasReadLock();
+    long size = 0; // length is zero for directories
+    short replication = 0;
+    long blocksize = 0;
+    LocatedBlocks loc = null;
+    final boolean isEncrypted;
+    final FileEncryptionInfo feInfo = isRawPath ? null :
+        fsd.getFileEncryptionInfo(node, snapshot, iip);
+    if (node.isFile()) {
+      final INodeFile fileNode = node.asFile();
+      size = fileNode.computeFileSize(snapshot);
+      replication = fileNode.getFileReplication(snapshot);
+      blocksize = fileNode.getPreferredBlockSize();
+
+      final boolean inSnapshot = snapshot != Snapshot.CURRENT_STATE_ID;
+      final boolean isUc = !inSnapshot && fileNode.isUnderConstruction();
+      final long fileSize = !inSnapshot && isUc ?
+          fileNode.computeFileSizeNotIncludingLastUcBlock() : size;
+
+      loc = fsd.getFSNamesystem().getBlockManager().createLocatedBlocks(
+          fileNode.getBlocks(), fileSize, isUc, 0L, size, false,
+          inSnapshot, feInfo);
+      if (loc == null) {
+        loc = new LocatedBlocks();
+      }
+      isEncrypted = (feInfo != null) ||
+          (isRawPath && fsd.isInAnEZ(INodesInPath.fromINode(node)));
+    } else {
+      isEncrypted = fsd.isInAnEZ(INodesInPath.fromINode(node));
+    }
+    int childrenNum = node.isDirectory() ?
+        node.asDirectory().getChildrenNum(snapshot) : 0;
+
+    HdfsLocatedFileStatus status =
+        new HdfsLocatedFileStatus(size, node.isDirectory(), replication,
+          blocksize, node.getModificationTime(snapshot),
+          node.getAccessTime(snapshot),
+          getPermissionForFileStatus(node, snapshot, isEncrypted),
+          node.getUserName(snapshot), node.getGroupName(snapshot),
+          node.isSymlink() ? node.asSymlink().getSymlink() : null, path,
+          node.getId(), loc, childrenNum, feInfo, storagePolicy);
+    // Set caching information for the located blocks.
+    if (loc != null) {
+      CacheManager cacheManager = fsd.getFSNamesystem().getCacheManager();
+      for (LocatedBlock lb: loc.getLocatedBlocks()) {
+        cacheManager.setCachedLocations(lb);
+      }
+    }
+    return status;
+  }
+
+  /**
+   * Returns an inode's FsPermission for use in an outbound FileStatus.  If the
+   * inode has an ACL or is for an encrypted file/dir, then this method will
+   * return an FsPermissionExtension.
+   *
+   * @param node INode to check
+   * @param snapshot int snapshot ID
+   * @param isEncrypted boolean true if the file/dir is encrypted
+   * @return FsPermission from inode, with ACL bit on if the inode has an ACL
+   * and encrypted bit on if it represents an encrypted file/dir.
+   */
+  private static FsPermission getPermissionForFileStatus(
+      INode node, int snapshot, boolean isEncrypted) {
+    FsPermission perm = node.getFsPermission(snapshot);
+    boolean hasAcl = node.getAclFeature(snapshot) != null;
+    if (hasAcl || isEncrypted) {
+      perm = new FsPermissionExtension(perm, hasAcl, isEncrypted);
+    }
+    return perm;
+  }
+
+  private static ContentSummary getContentSummaryInt(
+      FSDirectory fsd, String src) throws IOException {
+    String srcs = FSDirectory.normalizePath(src);
+    fsd.readLock();
+    try {
+      INode targetNode = fsd.getNode(srcs, false);
+      if (targetNode == null) {
+        throw new FileNotFoundException("File does not exist: " + srcs);
+      }
+      else {
+        // Make it relinquish locks every time contentCountLimit entries are
+        // processed. 0 means disabled, i.e. it blocks for the entire duration.
+        ContentSummaryComputationContext cscc =
+            new ContentSummaryComputationContext(fsd, fsd.getFSNamesystem(),
+                fsd.getContentCountLimit());
+        ContentSummary cs = targetNode.computeAndConvertContentSummary(cscc);
+        fsd.addYieldCount(cscc.getYieldCount());
+        return cs;
+      }
+    } finally {
+      fsd.readUnlock();
+    }
+  }
+}
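
The listing loop above bounds the size of a single response in two ways: at
most lsLimit entries, and, when locations are requested, a locationBudget that
approximates the number of block locations per entry as locatedBlockCount() *
replication. A self-contained sketch of that accounting, using a hypothetical
Entry record in place of HdfsLocatedFileStatus (models the needLocation case):

import java.util.ArrayList;
import java.util.List;

public class ListingBudgetDemo {
  record Entry(String name, int blockCount, short replication) {}

  static List<Entry> listWithBudget(List<Entry> children, int lsLimit) {
    List<Entry> out = new ArrayList<>();
    int locationBudget = lsLimit;
    for (Entry e : children) {
      if (out.size() >= lsLimit || locationBudget <= 0) {
        break;                                  // entry cap or budget exhausted
      }
      out.add(e);
      locationBudget -= e.blockCount() * e.replication();  // approximate #locations
    }
    return out;
  }

  public static void main(String[] args) {
    List<Entry> kids = List.of(new Entry("a", 2, (short) 3),
                               new Entry("b", 1, (short) 3));
    // budget of 5 locations: "a" alone costs 6, so the listing stops after it
    System.out.println(listWithBudget(kids, 5));
  }
}

As in the real loop, an entry may overshoot the budget; the budget only stops
further entries from being added, so the cap is approximate rather than exact.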

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0af44ea8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index 07cad42..0c0b2af 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -63,16 +63,11 @@ import org.apache.hadoop.hdfs.protocol.AclException;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.hadoop.hdfs.protocol.FSLimitException.MaxDirectoryItemsExceededException;
 import org.apache.hadoop.hdfs.protocol.FSLimitException.PathComponentTooLongException;
-import org.apache.hadoop.hdfs.protocol.FsPermissionExtension;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
-import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.protocol.SnapshotException;
@@ -86,16 +81,14 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithCount;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.Root;
 import org.apache.hadoop.hdfs.util.ByteArray;
 import org.apache.hadoop.hdfs.util.ChunkedArrayList;
-import org.apache.hadoop.hdfs.util.ReadOnlyList;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import org.apache.hadoop.hdfs.util.ReadOnlyList;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
@@ -259,7 +252,7 @@ public class FSDirectory implements Closeable {
     ezManager = new EncryptionZoneManager(this, conf);
   }
     
-  private FSNamesystem getFSNamesystem() {
+  FSNamesystem getFSNamesystem() {
     return namesystem;
   }
 
@@ -276,6 +269,14 @@ public class FSDirectory implements Closeable {
     return isPermissionEnabled;
   }
 
+  int getLsLimit() {
+    return lsLimit;
+  }
+
+  int getContentCountLimit() {
+    return contentCountLimit;
+  }
+
   FSEditLog getEditLog() {
     return editLog;
   }
@@ -1343,172 +1344,12 @@ public class FSDirectory implements Closeable {
     return removed;
   }
 
-  private byte getStoragePolicyID(byte inodePolicy, byte parentPolicy) {
+  byte getStoragePolicyID(byte inodePolicy, byte parentPolicy) {
     return inodePolicy != BlockStoragePolicySuite.ID_UNSPECIFIED ? inodePolicy :
         parentPolicy;
   }
 
-  /**
-   * Get a partial listing of the indicated directory
-   *
-   * We will stop when any of the following conditions is met:
-   * 1) this.lsLimit files have been added
-   * 2) needLocation is true AND enough files have been added such
-   * that at least this.lsLimit block locations are in the response
-   *
-   * @param src the directory name
-   * @param startAfter the name to start listing after
-   * @param needLocation if block locations are returned
-   * @return a partial listing starting after startAfter
-   */
-  DirectoryListing getListing(String src, byte[] startAfter,
-      boolean needLocation, boolean isSuperUser)
-      throws UnresolvedLinkException, IOException {
-    String srcs = normalizePath(src);
-    final boolean isRawPath = isReservedRawName(src);
-
-    readLock();
-    try {
-      if (srcs.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR)) {
-        return getSnapshotsListing(srcs, startAfter);
-      }
-      final INodesInPath inodesInPath = getINodesInPath(srcs, true);
-      final INode[] inodes = inodesInPath.getINodes();
-      final int snapshot = inodesInPath.getPathSnapshotId();
-      final INode targetNode = inodes[inodes.length - 1];
-      if (targetNode == null)
-        return null;
-      byte parentStoragePolicy = isSuperUser ?
-          targetNode.getStoragePolicyID() : BlockStoragePolicySuite.ID_UNSPECIFIED;
-      
-      if (!targetNode.isDirectory()) {
-        return new DirectoryListing(
-            new HdfsFileStatus[]{createFileStatus(HdfsFileStatus.EMPTY_NAME,
-                targetNode, needLocation, parentStoragePolicy, snapshot,
-                isRawPath, inodesInPath)}, 0);
-      }
-
-      final INodeDirectory dirInode = targetNode.asDirectory();
-      final ReadOnlyList<INode> contents = dirInode.getChildrenList(snapshot);
-      int startChild = INodeDirectory.nextChild(contents, startAfter);
-      int totalNumChildren = contents.size();
-      int numOfListing = Math.min(totalNumChildren-startChild, this.lsLimit);
-      int locationBudget = this.lsLimit;
-      int listingCnt = 0;
-      HdfsFileStatus listing[] = new HdfsFileStatus[numOfListing];
-      for (int i=0; i<numOfListing && locationBudget>0; i++) {
-        INode cur = contents.get(startChild+i);
-        byte curPolicy = isSuperUser && !cur.isSymlink()?
-            cur.getLocalStoragePolicyID():
-            BlockStoragePolicySuite.ID_UNSPECIFIED;
-        listing[i] = createFileStatus(cur.getLocalNameBytes(), cur, needLocation,
-            getStoragePolicyID(curPolicy, parentStoragePolicy), snapshot,
-            isRawPath, inodesInPath);
-        listingCnt++;
-        if (needLocation) {
-            // Once we  hit lsLimit locations, stop.
-            // This helps to prevent excessively large response payloads.
-            // Approximate #locations with locatedBlockCount() * repl_factor
-            LocatedBlocks blks = 
-                ((HdfsLocatedFileStatus)listing[i]).getBlockLocations();
-            locationBudget -= (blks == null) ? 0 :
-               blks.locatedBlockCount() * listing[i].getReplication();
-        }
-      }
-      // truncate return array if necessary
-      if (listingCnt < numOfListing) {
-          listing = Arrays.copyOf(listing, listingCnt);
-      }
-      return new DirectoryListing(
-          listing, totalNumChildren-startChild-listingCnt);
-    } finally {
-      readUnlock();
-    }
-  }
-  
-  /**
-   * Get a listing of all the snapshots of a snapshottable directory
-   */
-  private DirectoryListing getSnapshotsListing(String src, byte[] startAfter)
-      throws UnresolvedLinkException, IOException {
-    Preconditions.checkState(hasReadLock());
-    Preconditions.checkArgument(
-        src.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR),
-        "%s does not end with %s", src, HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR);
-
-    final String dirPath = normalizePath(src.substring(0,
-        src.length() - HdfsConstants.DOT_SNAPSHOT_DIR.length()));
-    
-    final INode node = this.getINode(dirPath);
-    final INodeDirectory dirNode = INodeDirectory.valueOf(node, dirPath);
-    final DirectorySnapshottableFeature sf = dirNode.getDirectorySnapshottableFeature();
-    if (sf == null) {
-      throw new SnapshotException(
-          "Directory is not a snapshottable directory: " + dirPath);
-    }
-    final ReadOnlyList<Snapshot> snapshots = sf.getSnapshotList();
-    int skipSize = ReadOnlyList.Util.binarySearch(snapshots, startAfter);
-    skipSize = skipSize < 0 ? -skipSize - 1 : skipSize + 1;
-    int numOfListing = Math.min(snapshots.size() - skipSize, this.lsLimit);
-    final HdfsFileStatus listing[] = new HdfsFileStatus[numOfListing];
-    for (int i = 0; i < numOfListing; i++) {
-      Root sRoot = snapshots.get(i + skipSize).getRoot();
-      listing[i] = createFileStatus(sRoot.getLocalNameBytes(), sRoot,
-          BlockStoragePolicySuite.ID_UNSPECIFIED, Snapshot.CURRENT_STATE_ID,
-          false, null);
-    }
-    return new DirectoryListing(
-        listing, snapshots.size() - skipSize - numOfListing);
-  }
-
-  /** Get the file info for a specific file.
-   * @param src The string representation of the path to the file
-   * @param resolveLink whether to throw UnresolvedLinkException
-   * @param isRawPath true if a /.reserved/raw pathname was passed by the user
-   * @param includeStoragePolicy whether to include storage policy
-   * @return object containing information regarding the file
-   *         or null if file not found
-   */
-  HdfsFileStatus getFileInfo(String src, boolean resolveLink,
-      boolean isRawPath, boolean includeStoragePolicy)
-    throws IOException {
-    String srcs = normalizePath(src);
-    readLock();
-    try {
-      if (srcs.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR)) {
-        return getFileInfo4DotSnapshot(srcs);
-      }
-      final INodesInPath inodesInPath = getINodesInPath(srcs, resolveLink);
-      final INode[] inodes = inodesInPath.getINodes();
-      final INode i = inodes[inodes.length - 1];
-      byte policyId = includeStoragePolicy && i != null && !i.isSymlink() ?
-          i.getStoragePolicyID() : BlockStoragePolicySuite.ID_UNSPECIFIED;
-      return i == null ? null : createFileStatus(HdfsFileStatus.EMPTY_NAME, i,
-          policyId, inodesInPath.getPathSnapshotId(), isRawPath,
-          inodesInPath);
-    } finally {
-      readUnlock();
-    }
-  }
-  
-  /**
-   * Currently we only support "ls /xxx/.snapshot" which will return all the
-   * snapshots of a directory. The FSCommand Ls will first call getFileInfo to
-   * make sure the file/directory exists (before the real getListing call).
-   * Since we do not have a real INode for ".snapshot", we return an empty
-   * non-null HdfsFileStatus here.
-   */
-  private HdfsFileStatus getFileInfo4DotSnapshot(String src)
-      throws UnresolvedLinkException {
-    if (getINode4DotSnapshot(src) != null) {
-      return new HdfsFileStatus(0, true, 0, 0, 0, 0, null, null, null, null,
-          HdfsFileStatus.EMPTY_NAME, -1L, 0, null,
-          BlockStoragePolicySuite.ID_UNSPECIFIED);
-    }
-    return null;
-  }
-
-  private INode getINode4DotSnapshot(String src) throws UnresolvedLinkException {
+  INode getINode4DotSnapshot(String src) throws UnresolvedLinkException {
     Preconditions.checkArgument(
         src.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR),
         "%s does not end with %s", src, HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR);
@@ -2090,36 +1931,15 @@ public class FSDirectory implements Closeable {
     return src;
   }
 
-  ContentSummary getContentSummary(String src) 
-    throws FileNotFoundException, UnresolvedLinkException {
-    String srcs = normalizePath(src);
-    readLock();
-    try {
-      INode targetNode = getNode(srcs, false);
-      if (targetNode == null) {
-        throw new FileNotFoundException("File does not exist: " + srcs);
-      }
-      else {
-        // Make it relinquish locks everytime contentCountLimit entries are
-        // processed. 0 means disabled. I.e. blocking for the entire duration.
-        ContentSummaryComputationContext cscc =
-
-            new ContentSummaryComputationContext(this, getFSNamesystem(),
-            contentCountLimit);
-        ContentSummary cs = targetNode.computeAndConvertContentSummary(cscc);
-        yieldCount += cscc.getYieldCount();
-        return cs;
-      }
-    } finally {
-      readUnlock();
-    }
-  }
-
   @VisibleForTesting
   public long getYieldCount() {
     return yieldCount;
   }
 
+  void addYieldCount(long value) {
+    yieldCount += value;
+  }
+
   public INodeMap getINodeMap() {
     return inodeMap;
   }
@@ -2329,153 +2149,6 @@ public class FSDirectory implements Closeable {
   }
 
   /**
-   * create an hdfs file status from an inode
-   * 
-   * @param path the local name
-   * @param node inode
-   * @param needLocation if block locations need to be included or not
-   * @param isRawPath true if this is being called on behalf of a path in
-   *                  /.reserved/raw
-   * @return a file status
-   * @throws IOException if any error occurs
-   */
-  private HdfsFileStatus createFileStatus(byte[] path, INode node,
-      boolean needLocation, byte storagePolicy, int snapshot,
-      boolean isRawPath, INodesInPath iip)
-      throws IOException {
-    if (needLocation) {
-      return createLocatedFileStatus(path, node, storagePolicy, snapshot,
-          isRawPath, iip);
-    } else {
-      return createFileStatus(path, node, storagePolicy, snapshot,
-          isRawPath, iip);
-    }
-  }
-
-  /**
-   * Create FileStatus by file INode 
-   */
-  HdfsFileStatus createFileStatus(byte[] path, INode node, byte storagePolicy,
-      int snapshot, boolean isRawPath, INodesInPath iip) throws IOException {
-     long size = 0;     // length is zero for directories
-     short replication = 0;
-     long blocksize = 0;
-     final boolean isEncrypted;
-
-     final FileEncryptionInfo feInfo = isRawPath ? null :
-         getFileEncryptionInfo(node, snapshot, iip);
-
-     if (node.isFile()) {
-       final INodeFile fileNode = node.asFile();
-       size = fileNode.computeFileSize(snapshot);
-       replication = fileNode.getFileReplication(snapshot);
-       blocksize = fileNode.getPreferredBlockSize();
-       isEncrypted = (feInfo != null) ||
-           (isRawPath && isInAnEZ(INodesInPath.fromINode(node)));
-     } else {
-       isEncrypted = isInAnEZ(INodesInPath.fromINode(node));
-     }
-
-     int childrenNum = node.isDirectory() ? 
-         node.asDirectory().getChildrenNum(snapshot) : 0;
-
-     return new HdfsFileStatus(
-        size, 
-        node.isDirectory(), 
-        replication, 
-        blocksize,
-        node.getModificationTime(snapshot),
-        node.getAccessTime(snapshot),
-        getPermissionForFileStatus(node, snapshot, isEncrypted),
-        node.getUserName(snapshot),
-        node.getGroupName(snapshot),
-        node.isSymlink() ? node.asSymlink().getSymlink() : null,
-        path,
-        node.getId(),
-        childrenNum,
-        feInfo,
-        storagePolicy);
-  }
-
-  /**
-   * Create FileStatus with location info by file INode
-   */
-  private HdfsLocatedFileStatus createLocatedFileStatus(byte[] path, INode node,
-      byte storagePolicy, int snapshot, boolean isRawPath,
-      INodesInPath iip) throws IOException {
-    assert hasReadLock();
-    long size = 0; // length is zero for directories
-    short replication = 0;
-    long blocksize = 0;
-    LocatedBlocks loc = null;
-    final boolean isEncrypted;
-    final FileEncryptionInfo feInfo = isRawPath ? null :
-        getFileEncryptionInfo(node, snapshot, iip);
-    if (node.isFile()) {
-      final INodeFile fileNode = node.asFile();
-      size = fileNode.computeFileSize(snapshot);
-      replication = fileNode.getFileReplication(snapshot);
-      blocksize = fileNode.getPreferredBlockSize();
-
-      final boolean inSnapshot = snapshot != Snapshot.CURRENT_STATE_ID; 
-      final boolean isUc = !inSnapshot && fileNode.isUnderConstruction();
-      final long fileSize = !inSnapshot && isUc ? 
-          fileNode.computeFileSizeNotIncludingLastUcBlock() : size;
-
-      loc = getFSNamesystem().getBlockManager().createLocatedBlocks(
-          fileNode.getBlocks(), fileSize, isUc, 0L, size, false,
-          inSnapshot, feInfo);
-      if (loc == null) {
-        loc = new LocatedBlocks();
-      }
-      isEncrypted = (feInfo != null) ||
-          (isRawPath && isInAnEZ(INodesInPath.fromINode(node)));
-    } else {
-      isEncrypted = isInAnEZ(INodesInPath.fromINode(node));
-    }
-    int childrenNum = node.isDirectory() ? 
-        node.asDirectory().getChildrenNum(snapshot) : 0;
-
-    HdfsLocatedFileStatus status =
-        new HdfsLocatedFileStatus(size, node.isDirectory(), replication,
-          blocksize, node.getModificationTime(snapshot),
-          node.getAccessTime(snapshot),
-          getPermissionForFileStatus(node, snapshot, isEncrypted),
-          node.getUserName(snapshot), node.getGroupName(snapshot),
-          node.isSymlink() ? node.asSymlink().getSymlink() : null, path,
-          node.getId(), loc, childrenNum, feInfo, storagePolicy);
-    // Set caching information for the located blocks.
-    if (loc != null) {
-      CacheManager cacheManager = namesystem.getCacheManager();
-      for (LocatedBlock lb: loc.getLocatedBlocks()) {
-        cacheManager.setCachedLocations(lb);
-      }
-    }
-    return status;
-  }
-
-  /**
-   * Returns an inode's FsPermission for use in an outbound FileStatus.  If the
-   * inode has an ACL or is for an encrypted file/dir, then this method will
-   * return an FsPermissionExtension.
-   *
-   * @param node INode to check
-   * @param snapshot int snapshot ID
-   * @param isEncrypted boolean true if the file/dir is encrypted
-   * @return FsPermission from inode, with ACL bit on if the inode has an ACL
-   * and encrypted bit on if it represents an encrypted file/dir.
-   */
-  private static FsPermission getPermissionForFileStatus(INode node,
-      int snapshot, boolean isEncrypted) {
-    FsPermission perm = node.getFsPermission(snapshot);
-    boolean hasAcl = node.getAclFeature(snapshot) != null;
-    if (hasAcl || isEncrypted) {
-      perm = new FsPermissionExtension(perm, hasAcl, isEncrypted);
-    }
-    return perm;
-  }
-
-  /**
    * Add the specified path into the namespace.
    */
   INodeSymlink addSymlink(long id, String path, String target,
@@ -3322,6 +2995,7 @@ public class FSDirectory implements Closeable {
   HdfsFileStatus getAuditFileInfo(String path, boolean resolveSymlink)
     throws IOException {
     return (namesystem.isAuditEnabled() && namesystem.isExternalInvocation())
-      ? getFileInfo(path, resolveSymlink, false, false) : null;
+      ? FSDirStatAndListingOp.getFileInfo(this, path, resolveSymlink, false,
+        false) : null;
   }
 }
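
The FSDirectory side of the move is mostly deletions plus a few package-private
accessors (getFSNamesystem(), getLsLimit(), getContentCountLimit(),
addYieldCount()) so the relocated code can reach formerly private state. One
piece of logic that stays behind, getStoragePolicyID(), resolves an unspecified
inode policy to the parent's policy. A sketch of that fallback, assuming a
sentinel byte for "no policy set" (0 here, standing in for
BlockStoragePolicySuite.ID_UNSPECIFIED):

public class PolicyFallbackDemo {
  static final byte ID_UNSPECIFIED = 0;  // assumed sentinel for illustration

  static byte getStoragePolicyID(byte inodePolicy, byte parentPolicy) {
    return inodePolicy != ID_UNSPECIFIED ? inodePolicy : parentPolicy;
  }

  public static void main(String[] args) {
    System.out.println(getStoragePolicyID((byte) 0, (byte) 7)); // 7: inherited
    System.out.println(getStoragePolicyID((byte) 5, (byte) 7)); // 5: own policy wins
  }
}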

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0af44ea8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index 1a51b87..d57d9b8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -372,8 +372,8 @@ public class FSEditLogLoader {
 
         // add the op into retry cache if necessary
         if (toAddRetryCache) {
-          HdfsFileStatus stat = fsNamesys.dir.createFileStatus(
-              HdfsFileStatus.EMPTY_NAME, newFile,
+          HdfsFileStatus stat = FSDirStatAndListingOp.createFileStatus(
+              fsNamesys.dir, HdfsFileStatus.EMPTY_NAME, newFile,
               BlockStoragePolicySuite.ID_UNSPECIFIED, Snapshot.CURRENT_STATE_ID,
               false, iip);
           fsNamesys.addCacheEntryWithPayload(addCloseOp.rpcClientId,
@@ -393,7 +393,8 @@ public class FSEditLogLoader {
           
          // add the op into retry cache if necessary
           if (toAddRetryCache) {
-            HdfsFileStatus stat = fsNamesys.dir.createFileStatus(
+            HdfsFileStatus stat = FSDirStatAndListingOp.createFileStatus(
+                fsNamesys.dir,
                 HdfsFileStatus.EMPTY_NAME, newFile,
                 BlockStoragePolicySuite.ID_UNSPECIFIED,
                 Snapshot.CURRENT_STATE_ID, false, iip);
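
For context, the retry-cache calls around the changed lines store the
constructed HdfsFileStatus as a payload keyed by the client's RPC ids, so a
retried non-idempotent call can be answered with the original result instead of
re-applying the edit. A simplified sketch of that idea (not the real
org.apache.hadoop.ipc.RetryCache API; String/int keys stand in for the real
byte[] client id and call id):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class RetryCacheDemo {
  record Key(String clientId, int callId) {}   // equals/hashCode from record

  private final Map<Key, Object> cache = new ConcurrentHashMap<>();

  void addCacheEntryWithPayload(String clientId, int callId, Object payload) {
    cache.put(new Key(clientId, callId), payload);
  }

  Object lookup(String clientId, int callId) {
    return cache.get(new Key(clientId, callId));
  }

  public static void main(String[] args) {
    RetryCacheDemo rc = new RetryCacheDemo();
    rc.addCacheEntryWithPayload("client-1", 42, "stat{len=0,dir=false}");
    // a retry of call 42 short-circuits to the cached file status
    System.out.println(rc.lookup("client-1", 42));
  }
}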

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0af44ea8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 390bc20..543f47a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -143,7 +143,6 @@ import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
 import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.fs.DirectoryListingStartAfterNotFoundException;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.fs.FileStatus;
@@ -2474,7 +2473,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       toRemoveBlocks = startFileInternal(pc, src, permissions, holder, 
           clientMachine, create, overwrite, createParent, replication, 
           blockSize, isLazyPersist, suite, protocolVersion, edek, logRetryCache);
-      stat = dir.getFileInfo(src, false,
+      stat = FSDirStatAndListingOp.getFileInfo(dir, src, false,
           FSDirectory.isReservedRawName(srcArg), true);
     } catch (StandbyException se) {
       skipSync = true;
@@ -2923,8 +2922,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       checkNameNodeSafeMode("Cannot append to file" + src);
       src = dir.resolvePath(pc, src, pathComponents);
       lb = appendFileInternal(pc, src, holder, clientMachine, logRetryCache);
-      stat = dir.getFileInfo(src, false, FSDirectory.isReservedRawName(srcArg),
-          true);
+      stat = FSDirStatAndListingOp.getFileInfo(dir, src, false,
+          FSDirectory.isReservedRawName(srcArg), true);
     } catch (StandbyException se) {
       skipSync = true;
       throw se;
@@ -3921,7 +3920,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    * Get the file info for a specific file.
    *
    * @param srcArg The string representation of the path to the file
-   * @param resolveLink whether to throw UnresolvedLinkException 
+   * @param resolveLink whether to throw UnresolvedLinkException
    *        if src refers to a symlink
    *
    * @throws AccessControlException if access is denied
@@ -3929,63 +3928,37 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    *
    * @return object containing information regarding the file
    *         or null if file not found
-   * @throws StandbyException 
+   * @throws StandbyException
    */
-  HdfsFileStatus getFileInfo(final String srcArg, boolean resolveLink)
-    throws AccessControlException, UnresolvedLinkException,
-           StandbyException, IOException {
-    String src = srcArg;
-    if (!DFSUtil.isValidName(src)) {
-      throw new InvalidPathException("Invalid file name: " + src);
-    }
-    HdfsFileStatus stat = null;
-    FSPermissionChecker pc = getPermissionChecker();
+  HdfsFileStatus getFileInfo(final String src, boolean resolveLink)
+    throws IOException {
     checkOperation(OperationCategory.READ);
-    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    HdfsFileStatus stat = null;
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-      src = dir.resolvePath(pc, src, pathComponents);
-      boolean isSuperUser = true;
-      if (isPermissionEnabled) {
-        checkPermission(pc, src, false, null, null, null, null, false,
-            resolveLink);
-        isSuperUser = pc.isSuperUser();
-      }
-      stat = dir.getFileInfo(src, resolveLink,
-          FSDirectory.isReservedRawName(srcArg), isSuperUser);
+      stat = FSDirStatAndListingOp.getFileInfo(dir, src, resolveLink);
     } catch (AccessControlException e) {
-      logAuditEvent(false, "getfileinfo", srcArg);
+      logAuditEvent(false, "getfileinfo", src);
       throw e;
     } finally {
       readUnlock();
     }
-    logAuditEvent(true, "getfileinfo", srcArg);
+    logAuditEvent(true, "getfileinfo", src);
     return stat;
   }
-  
+
   /**
    * Returns true if the file is closed
    */
-  boolean isFileClosed(final String srcArg)
-      throws AccessControlException, UnresolvedLinkException,
-      StandbyException, IOException {
-    String src = srcArg;
-    FSPermissionChecker pc = getPermissionChecker();  
+  boolean isFileClosed(final String src) throws IOException {
     checkOperation(OperationCategory.READ);
-    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     readLock();
     try {
-      src = dir.resolvePath(pc, src, pathComponents);
       checkOperation(OperationCategory.READ);
-      if (isPermissionEnabled) {
-        checkTraverse(pc, src);
-      }
-      return !INodeFile.valueOf(dir.getINode(src), src).isUnderConstruction();
+      return FSDirStatAndListingOp.isFileClosed(dir, src);
     } catch (AccessControlException e) {
-      if (isAuditEnabled() && isExternalInvocation()) {
-        logAuditEvent(false, "isFileClosed", srcArg);
-      }
+      logAuditEvent(false, "isFileClosed", src);
       throw e;
     } finally {
       readUnlock();
@@ -4182,7 +4155,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   /**
    * Get the content summary for a specific file/dir.
    *
-   * @param srcArg The string representation of the path to the file
+   * @param src The string representation of the path to the file
    *
    * @throws AccessControlException if access is denied
    * @throws UnresolvedLinkException if a symlink is encountered.
@@ -4193,27 +4166,17 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    * @return object containing information regarding the file
    *         or null if file not found
    */
-  ContentSummary getContentSummary(final String srcArg) throws IOException {
-    String src = srcArg;
-    FSPermissionChecker pc = getPermissionChecker();
-    checkOperation(OperationCategory.READ);
-    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+  ContentSummary getContentSummary(final String src) throws IOException {
     readLock();
     boolean success = true;
     try {
-      checkOperation(OperationCategory.READ);
-      src = dir.resolvePath(pc, src, pathComponents);
-      if (isPermissionEnabled) {
-        checkPermission(pc, src, false, null, null, null, FsAction.READ_EXECUTE);
-      }
-      return dir.getContentSummary(src);
-
+      return FSDirStatAndListingOp.getContentSummary(dir, src);
     } catch (AccessControlException ace) {
       success = false;
       throw ace;
     } finally {
       readUnlock();
-      logAuditEvent(success, "contentSummary", srcArg);
+      logAuditEvent(success, "contentSummary", src);
     }
   }
 
@@ -4722,58 +4685,21 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    */
   DirectoryListing getListing(String src, byte[] startAfter,
       boolean needLocation) 
-      throws AccessControlException, UnresolvedLinkException, IOException {
+      throws IOException {
+    checkOperation(OperationCategory.READ);
+    DirectoryListing dl = null;
+    readLock();
     try {
-      return getListingInt(src, startAfter, needLocation);
+      checkOperation(NameNode.OperationCategory.READ);
+      dl = FSDirStatAndListingOp.getListingInt(dir, src, startAfter,
+          needLocation);
     } catch (AccessControlException e) {
       logAuditEvent(false, "listStatus", src);
       throw e;
-    }
-  }
-
-  private DirectoryListing getListingInt(final String srcArg, byte[] startAfter,
-      boolean needLocation)
-    throws AccessControlException, UnresolvedLinkException, IOException {
-    String src = srcArg;
-    DirectoryListing dl;
-    FSPermissionChecker pc = getPermissionChecker();
-    checkOperation(OperationCategory.READ);
-    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
-    String startAfterString = new String(startAfter);
-    readLock();
-    try {
-      checkOperation(OperationCategory.READ);
-      src = dir.resolvePath(pc, src, pathComponents);
-
-      // Get file name when startAfter is an INodePath
-      if (FSDirectory.isReservedName(startAfterString)) {
-        byte[][] startAfterComponents = FSDirectory
-            .getPathComponentsForReservedPath(startAfterString);
-        try {
-          String tmp = FSDirectory.resolvePath(src, startAfterComponents, dir);
-          byte[][] regularPath = INode.getPathComponents(tmp);
-          startAfter = regularPath[regularPath.length - 1];
-        } catch (IOException e) {
-          // Possibly the inode is deleted
-          throw new DirectoryListingStartAfterNotFoundException(
-              "Can't find startAfter " + startAfterString);
-        }
-      }
-
-      boolean isSuperUser = true;
-      if (isPermissionEnabled) {
-        if (dir.isDir(src)) {
-          checkPathAccess(pc, src, FsAction.READ_EXECUTE);
-        } else {
-          checkTraverse(pc, src);
-        }
-        isSuperUser = pc.isSuperUser();
-      }
-      logAuditEvent(true, "listStatus", srcArg);
-      dl = dir.getListing(src, startAfter, needLocation, isSuperUser);
     } finally {
       readUnlock();
     }
+    logAuditEvent(true, "listStatus", src);
     return dl;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0af44ea8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
index c32ed67..1a42e28 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
@@ -70,7 +70,8 @@ public class NameNodeAdapter {
   public static HdfsFileStatus getFileInfo(NameNode namenode, String src,
       boolean resolveLink) throws AccessControlException, UnresolvedLinkException,
         StandbyException, IOException {
-    return namenode.getNamesystem().getFileInfo(src, resolveLink);
+    return FSDirStatAndListingOp.getFileInfo(namenode.getNamesystem()
+            .getFSDirectory(), src, resolveLink);
   }
   
   public static boolean mkdirs(NameNode namenode, String src,