You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ka...@apache.org on 2014/12/15 19:35:58 UTC

[07/50] [abbrv] hadoop git commit: HDFS-7498. Simplify the logic in INodesInPath. Contributed by Jing Zhao.

HDFS-7498. Simplify the logic in INodesInPath. Contributed by Jing Zhao.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5776a41d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5776a41d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5776a41d

Branch: refs/heads/YARN-2139
Commit: 5776a41da08af653206bb94d7c76c9c4dcce059a
Parents: d08fc9a
Author: Jing Zhao <ji...@apache.org>
Authored: Tue Dec 9 11:37:39 2014 -0800
Committer: Jing Zhao <ji...@apache.org>
Committed: Tue Dec 9 11:37:39 2014 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/hadoop/fs/Path.java    |   1 -
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   2 +
 .../java/org/apache/hadoop/hdfs/DFSUtil.java    |  15 +-
 .../server/namenode/EncryptionZoneManager.java  |  10 +-
 .../hdfs/server/namenode/FSDirConcatOp.java     |   5 +-
 .../hdfs/server/namenode/FSDirMkdirOp.java      |  42 ++--
 .../hdfs/server/namenode/FSDirRenameOp.java     |  51 ++--
 .../hdfs/server/namenode/FSDirSnapshotOp.java   |   2 +-
 .../server/namenode/FSDirStatAndListingOp.java  |  10 +-
 .../hdfs/server/namenode/FSDirectory.java       |  95 ++++----
 .../hdfs/server/namenode/FSEditLogLoader.java   |   4 +-
 .../hdfs/server/namenode/FSNamesystem.java      |   8 +-
 .../server/namenode/FSPermissionChecker.java    |  51 ++--
 .../hdfs/server/namenode/INodesInPath.java      | 240 +++++++++----------
 .../server/namenode/TestSnapshotPathINodes.java | 134 +++++------
 15 files changed, 320 insertions(+), 350 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5776a41d/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Path.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Path.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Path.java
index 54ddeda..caeb7a1 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Path.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Path.java
@@ -60,7 +60,6 @@ public class Path implements Comparable {
 
   /**
    * Pathnames with scheme and relative path are illegal.
-   * @param path to be checked
    */
   void checkNotSchemeWithRelative() {
     if (toUri().isAbsolute() && !isUriPathAbsolute()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5776a41d/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 626d90a..9398429 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -447,6 +447,8 @@ Release 2.7.0 - UNRELEASED
     HDFS-7486. Consolidate XAttr-related implementation into a single class.
     (wheat9)
 
+    HDFS-7498. Simplify the logic in INodesInPath. (jing9)
+
   OPTIMIZATIONS
 
     HDFS-7454. Reduce memory footprint for AclEntries in NameNode.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5776a41d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
index f1bfcb4..8b3f512 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
@@ -341,15 +341,20 @@ public class DFSUtil {
   /**
    * Given a list of path components returns a path as a UTF8 String
    */
-  public static String byteArray2PathString(byte[][] pathComponents) {
+  public static String byteArray2PathString(byte[][] pathComponents,
+      int offset, int length) {
     if (pathComponents.length == 0) {
       return "";
-    } else if (pathComponents.length == 1
+    }
+    Preconditions.checkArgument(offset >= 0 && offset < pathComponents.length);
+    Preconditions.checkArgument(length >= 0 && offset + length <=
+        pathComponents.length);
+    if (pathComponents.length == 1
         && (pathComponents[0] == null || pathComponents[0].length == 0)) {
       return Path.SEPARATOR;
     }
     StringBuilder result = new StringBuilder();
-    for (int i = 0; i < pathComponents.length; i++) {
+    for (int i = offset; i < offset + length; i++) {
       result.append(new String(pathComponents[i], Charsets.UTF_8));
       if (i < pathComponents.length - 1) {
         result.append(Path.SEPARATOR_CHAR);
@@ -358,6 +363,10 @@ public class DFSUtil {
     return result.toString();
   }
 
+  public static String byteArray2PathString(byte[][] pathComponents) {
+    return byteArray2PathString(pathComponents, 0, pathComponents.length);
+  }
+
   /**
    * Converts a list of path components into a path using Path.SEPARATOR.
    * 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5776a41d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java
index faab1f0..5c4f39d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java
@@ -199,9 +199,9 @@ public class EncryptionZoneManager {
   private EncryptionZoneInt getEncryptionZoneForPath(INodesInPath iip) {
     assert dir.hasReadLock();
     Preconditions.checkNotNull(iip);
-    final INode[] inodes = iip.getINodes();
-    for (int i = inodes.length - 1; i >= 0; i--) {
-      final INode inode = inodes[i];
+    List<INode> inodes = iip.getReadOnlyINodes();
+    for (int i = inodes.size() - 1; i >= 0; i--) {
+      final INode inode = inodes.get(i);
       if (inode != null) {
         final EncryptionZoneInt ezi = encryptionZones.get(inode.getId());
         if (ezi != null) {
@@ -259,9 +259,7 @@ public class EncryptionZoneManager {
       }
     }
 
-    if (srcInEZ || dstInEZ) {
-      Preconditions.checkState(srcEZI != null, "couldn't find src EZ?");
-      Preconditions.checkState(dstEZI != null, "couldn't find dst EZ?");
+    if (srcInEZ) {
       if (srcEZI != dstEZI) {
         final String srcEZPath = getFullPathName(srcEZI);
         final String dstEZPath = getFullPathName(dstEZI);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5776a41d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
index c2e0f08..f7e57be 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
@@ -187,9 +187,8 @@ class FSDirConcatOp {
     // do the move
 
     final INodesInPath trgIIP = fsd.getINodesInPath4Write(target, true);
-    final INode[] trgINodes = trgIIP.getINodes();
     final INodeFile trgInode = trgIIP.getLastINode().asFile();
-    INodeDirectory trgParent = trgINodes[trgINodes.length-2].asDirectory();
+    INodeDirectory trgParent = trgIIP.getINode(-2).asDirectory();
     final int trgLatestSnapshot = trgIIP.getLatestSnapshotId();
 
     final INodeFile [] allSrcInodes = new INodeFile[srcs.length];
@@ -229,6 +228,6 @@ class FSDirConcatOp {
     trgInode.setModificationTime(timestamp, trgLatestSnapshot);
     trgParent.updateModificationTime(timestamp, trgLatestSnapshot);
     // update quota on the parent directory ('count' files removed, 0 space)
-    FSDirectory.unprotectedUpdateCount(trgIIP, trgINodes.length - 1, -count, 0);
+    FSDirectory.unprotectedUpdateCount(trgIIP, trgIIP.length() - 1, -count, 0);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5776a41d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
index af9e925..c8c5cb2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
@@ -85,12 +85,11 @@ class FSDirMkdirOp {
       throws QuotaExceededException, UnresolvedLinkException, AclException {
     assert fsd.hasWriteLock();
     byte[][] components = INode.getPathComponents(src);
-    INodesInPath iip = fsd.getExistingPathINodes(components);
-    INode[] inodes = iip.getINodes();
-    final int pos = inodes.length - 1;
-    unprotectedMkdir(fsd, inodeId, iip, pos, components[pos], permissions,
-        aclEntries, timestamp);
-    return inodes[pos];
+    final INodesInPath iip = fsd.getExistingPathINodes(components);
+    final int pos = iip.length() - 1;
+    final INodesInPath newiip = unprotectedMkdir(fsd, inodeId, iip, pos,
+        components[pos], permissions, aclEntries, timestamp);
+    return newiip.getINode(pos);
   }
 
   /**
@@ -129,17 +128,17 @@ class FSDirMkdirOp {
         throw new SnapshotAccessControlException(
                 "Modification on RO snapshot is disallowed");
       }
-      INode[] inodes = iip.getINodes();
-
+      final int length = iip.length();
       // find the index of the first null in inodes[]
       StringBuilder pathbuilder = new StringBuilder();
       int i = 1;
-      for(; i < inodes.length && inodes[i] != null; i++) {
+      INode curNode;
+      for(; i < length && (curNode = iip.getINode(i)) != null; i++) {
         pathbuilder.append(Path.SEPARATOR).append(names[i]);
-        if (!inodes[i].isDirectory()) {
+        if (!curNode.isDirectory()) {
           throw new FileAlreadyExistsException(
                   "Parent path is not a directory: "
-                  + pathbuilder + " "+inodes[i].getLocalName());
+                  + pathbuilder + " " + curNode.getLocalName());
         }
       }
 
@@ -152,8 +151,8 @@ class FSDirMkdirOp {
         // if inheriting (ie. creating a file or symlink), use the parent dir,
         // else the supplied permissions
         // NOTE: the permissions of the auto-created directories violate posix
-        FsPermission parentFsPerm = inheritPermission
-                ? inodes[i-1].getFsPermission() : permissions.getPermission();
+        FsPermission parentFsPerm = inheritPermission ?
+            iip.getINode(i-1).getFsPermission() : permissions.getPermission();
 
         // ensure that the permissions allow user write+execute
         if (!parentFsPerm.getUserAction().implies(FsAction.WRITE_EXECUTE)) {
@@ -176,11 +175,12 @@ class FSDirMkdirOp {
       }
 
       // create directories beginning from the first null index
-      for(; i < inodes.length; i++) {
+      for(; i < length; i++) {
         pathbuilder.append(Path.SEPARATOR).append(names[i]);
-        unprotectedMkdir(fsd, fsd.allocateNewInodeId(), iip, i, components[i],
-            (i < lastInodeIndex) ? parentPermissions : permissions, null, now);
-        if (inodes[i] == null) {
+        iip = unprotectedMkdir(fsd, fsd.allocateNewInodeId(), iip, i,
+            components[i], (i < lastInodeIndex) ? parentPermissions :
+                permissions, null, now);
+        if (iip.getINode(i) == null) {
           return false;
         }
         // Directory creation also count towards FilesCreated
@@ -188,7 +188,7 @@ class FSDirMkdirOp {
         NameNode.getNameNodeMetrics().incrFilesCreated();
 
         final String cur = pathbuilder.toString();
-        fsd.getEditLog().logMkDir(cur, inodes[i]);
+        fsd.getEditLog().logMkDir(cur, iip.getINode(i));
         if(NameNode.stateChangeLog.isDebugEnabled()) {
           NameNode.stateChangeLog.debug(
                   "mkdirs: created directory " + cur);
@@ -219,7 +219,7 @@ class FSDirMkdirOp {
    * The parent path to the directory is at [0, pos-1].
    * All ancestors exist. Newly created one stored at index pos.
    */
-  private static void unprotectedMkdir(
+  private static INodesInPath unprotectedMkdir(
       FSDirectory fsd, long inodeId, INodesInPath inodesInPath, int pos,
       byte[] name, PermissionStatus permission, List<AclEntry> aclEntries,
       long timestamp)
@@ -231,7 +231,9 @@ class FSDirMkdirOp {
       if (aclEntries != null) {
         AclStorage.updateINodeAcl(dir, aclEntries, Snapshot.CURRENT_STATE_ID);
       }
-      inodesInPath.setINode(pos, dir);
+      return INodesInPath.replace(inodesInPath, pos, dir);
+    } else {
+      return inodesInPath;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5776a41d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
index 9f3983a..511de7a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.protocol.FSLimitException;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.SnapshotException;
@@ -42,6 +41,8 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.hadoop.hdfs.protocol.FSLimitException.MaxDirectoryItemsExceededException;
+import static org.apache.hadoop.hdfs.protocol.FSLimitException.PathComponentTooLongException;
 import static org.apache.hadoop.util.Time.now;
 
 class FSDirRenameOp {
@@ -77,44 +78,40 @@ class FSDirRenameOp {
    * Verify quota for rename operation where srcInodes[srcInodes.length-1] moves
    * dstInodes[dstInodes.length-1]
    */
-  static void verifyQuotaForRename(FSDirectory fsd,
-      INode[] src, INode[] dst)
-      throws QuotaExceededException {
+  private static void verifyQuotaForRename(FSDirectory fsd, INodesInPath src,
+      INodesInPath dst) throws QuotaExceededException {
     if (!fsd.getFSNamesystem().isImageLoaded() || fsd.shouldSkipQuotaChecks()) {
       // Do not check quota if edits log is still being processed
       return;
     }
     int i = 0;
-    while(src[i] == dst[i]) { i++; }
+    while(src.getINode(i) == dst.getINode(i)) { i++; }
     // src[i - 1] is the last common ancestor.
 
-    final Quota.Counts delta = src[src.length - 1].computeQuotaUsage();
+    final Quota.Counts delta = src.getLastINode().computeQuotaUsage();
 
     // Reduce the required quota by dst that is being removed
-    final int dstIndex = dst.length - 1;
-    if (dst[dstIndex] != null) {
-      delta.subtract(dst[dstIndex].computeQuotaUsage());
+    final INode dstINode = dst.getLastINode();
+    if (dstINode != null) {
+      delta.subtract(dstINode.computeQuotaUsage());
     }
-    FSDirectory.verifyQuota(dst, dstIndex, delta.get(Quota.NAMESPACE),
-        delta.get(Quota.DISKSPACE), src[i - 1]);
+    FSDirectory.verifyQuota(dst, dst.length() - 1, delta.get(Quota.NAMESPACE),
+        delta.get(Quota.DISKSPACE), src.getINode(i - 1));
   }
 
   /**
    * Checks file system limits (max component length and max directory items)
    * during a rename operation.
    */
-  static void verifyFsLimitsForRename(FSDirectory fsd,
-      INodesInPath srcIIP,
+  static void verifyFsLimitsForRename(FSDirectory fsd, INodesInPath srcIIP,
       INodesInPath dstIIP)
-      throws FSLimitException.PathComponentTooLongException,
-          FSLimitException.MaxDirectoryItemsExceededException {
+      throws PathComponentTooLongException, MaxDirectoryItemsExceededException {
     byte[] dstChildName = dstIIP.getLastLocalName();
-    INode[] dstInodes = dstIIP.getINodes();
-    int pos = dstInodes.length - 1;
-    fsd.verifyMaxComponentLength(dstChildName, dstInodes, pos);
+    final String parentPath = dstIIP.getParentPath();
+    fsd.verifyMaxComponentLength(dstChildName, parentPath);
     // Do not enforce max directory items if renaming within same directory.
     if (srcIIP.getINode(-2) != dstIIP.getINode(-2)) {
-      fsd.verifyMaxDirItems(dstInodes, pos);
+      fsd.verifyMaxDirItems(dstIIP.getINode(-2).asDirectory(), parentPath);
     }
   }
 
@@ -176,7 +173,7 @@ class FSDirRenameOp {
     fsd.ezManager.checkMoveValidity(srcIIP, dstIIP, src);
     // Ensure dst has quota to accommodate rename
     verifyFsLimitsForRename(fsd, srcIIP, dstIIP);
-    verifyQuotaForRename(fsd, srcIIP.getINodes(), dstIIP.getINodes());
+    verifyQuotaForRename(fsd, srcIIP, dstIIP);
 
     RenameOperation tx = new RenameOperation(fsd, src, dst, srcIIP, dstIIP);
 
@@ -184,7 +181,7 @@ class FSDirRenameOp {
 
     try {
       // remove src
-      final long removedSrc = fsd.removeLastINode(srcIIP);
+      final long removedSrc = fsd.removeLastINode(tx.srcIIP);
       if (removedSrc == -1) {
         NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
             + "failed to rename " + src + " to " + dst + " because the source" +
@@ -326,7 +323,7 @@ class FSDirRenameOp {
     validateDestination(src, dst, srcInode);
 
     INodesInPath dstIIP = fsd.getINodesInPath4Write(dst, false);
-    if (dstIIP.getINodes().length == 1) {
+    if (dstIIP.length() == 1) {
       error = "rename destination cannot be the root";
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: " +
           error);
@@ -357,12 +354,12 @@ class FSDirRenameOp {
 
     // Ensure dst has quota to accommodate rename
     verifyFsLimitsForRename(fsd, srcIIP, dstIIP);
-    verifyQuotaForRename(fsd, srcIIP.getINodes(), dstIIP.getINodes());
+    verifyQuotaForRename(fsd, srcIIP, dstIIP);
 
     RenameOperation tx = new RenameOperation(fsd, src, dst, srcIIP, dstIIP);
 
     boolean undoRemoveSrc = true;
-    final long removedSrc = fsd.removeLastINode(srcIIP);
+    final long removedSrc = fsd.removeLastINode(tx.srcIIP);
     if (removedSrc == -1) {
       error = "Failed to rename " + src + " to " + dst +
           " because the source can not be removed";
@@ -594,7 +591,7 @@ class FSDirRenameOp {
           + error);
       throw new FileNotFoundException(error);
     }
-    if (srcIIP.getINodes().length == 1) {
+    if (srcIIP.length() == 1) {
       error = "rename source cannot be the root";
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
           + error);
@@ -624,7 +621,6 @@ class FSDirRenameOp {
                     INodesInPath srcIIP, INodesInPath dstIIP)
         throws QuotaExceededException {
       this.fsd = fsd;
-      this.srcIIP = srcIIP;
       this.dstIIP = dstIIP;
       this.src = src;
       this.dst = dst;
@@ -652,7 +648,7 @@ class FSDirRenameOp {
                 srcChild, srcIIP.getLatestSnapshotId());
         withCount = (INodeReference.WithCount) withName.getReferredINode();
         srcChild = withName;
-        srcIIP.setLastINode(srcChild);
+        srcIIP = INodesInPath.replace(srcIIP, srcIIP.length() - 1, srcChild);
         // get the counts before rename
         withCount.getReferredINode().computeQuotaUsage(oldSrcCounts, true);
       } else if (srcChildIsReference) {
@@ -662,6 +658,7 @@ class FSDirRenameOp {
       } else {
         withCount = null;
       }
+      this.srcIIP = srcIIP;
     }
 
     boolean addSourceToDestination() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5776a41d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSnapshotOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSnapshotOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSnapshotOp.java
index f295e06..ea7dc24 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSnapshotOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSnapshotOp.java
@@ -45,7 +45,7 @@ class FSDirSnapshotOp {
     }
     final byte[] bytes = DFSUtil.string2Bytes(snapshotName);
     fsd.verifyINodeName(bytes);
-    fsd.verifyMaxComponentLength(bytes, path, 0);
+    fsd.verifyMaxComponentLength(bytes, path);
   }
 
   /** Allow snapshot on a directory. */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5776a41d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
index a8c3c16..2e7ed6b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
@@ -122,9 +122,7 @@ class FSDirStatAndListingOp {
     if (fsd.isPermissionEnabled()) {
       fsd.checkTraverse(pc, iip);
     }
-    INode[] inodes = iip.getINodes();
-    return !INodeFile.valueOf(inodes[inodes.length - 1],
-        src).isUnderConstruction();
+    return !INodeFile.valueOf(iip.getLastINode(), src).isUnderConstruction();
   }
 
   static ContentSummary getContentSummary(
@@ -167,9 +165,8 @@ class FSDirStatAndListingOp {
         return getSnapshotsListing(fsd, srcs, startAfter);
       }
       final INodesInPath inodesInPath = fsd.getINodesInPath(srcs, true);
-      final INode[] inodes = inodesInPath.getINodes();
       final int snapshot = inodesInPath.getPathSnapshotId();
-      final INode targetNode = inodes[inodes.length - 1];
+      final INode targetNode = inodesInPath.getLastINode();
       if (targetNode == null)
         return null;
       byte parentStoragePolicy = isSuperUser ?
@@ -278,8 +275,7 @@ class FSDirStatAndListingOp {
         return getFileInfo4DotSnapshot(fsd, srcs);
       }
       final INodesInPath inodesInPath = fsd.getINodesInPath(srcs, resolveLink);
-      final INode[] inodes = inodesInPath.getINodes();
-      final INode i = inodes[inodes.length - 1];
+      final INode i = inodesInPath.getLastINode();
       byte policyId = includeStoragePolicy && i != null && !i.isSymlink() ?
           i.getStoragePolicyID() : BlockStoragePolicySuite.ID_UNSPECIFIED;
       return i == null ? null : createFileStatus(fsd,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5776a41d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index e802627..81b0eb6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -642,8 +642,7 @@ public class FSDirectory implements Closeable {
    * @param path the file path
    * @return the block size of the file. 
    */
-  long getPreferredBlockSize(String path) throws UnresolvedLinkException,
-      FileNotFoundException, IOException {
+  long getPreferredBlockSize(String path) throws IOException {
     readLock();
     try {
       return INodeFile.valueOf(getNode(path, false), path
@@ -740,15 +739,13 @@ public class FSDirectory implements Closeable {
   
   private static boolean deleteAllowed(final INodesInPath iip,
       final String src) {
-    final INode[] inodes = iip.getINodes(); 
-    if (inodes == null || inodes.length == 0
-        || inodes[inodes.length - 1] == null) {
+    if (iip.length() < 1 || iip.getLastINode() == null) {
       if(NameNode.stateChangeLog.isDebugEnabled()) {
         NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
             + "failed to remove " + src + " because it does not exist");
       }
       return false;
-    } else if (inodes.length == 1) { // src is the root
+    } else if (iip.length() == 1) { // src is the root
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedDelete: "
           + "failed to remove " + src
           + " because the root is not allowed to be deleted");
@@ -763,8 +760,7 @@ public class FSDirectory implements Closeable {
   boolean isNonEmptyDirectory(INodesInPath inodesInPath) {
     readLock();
     try {
-      final INode[] inodes = inodesInPath.getINodes();
-      final INode inode = inodes[inodes.length - 1];
+      final INode inode = inodesInPath.getLastINode();
       if (inode == null || !inode.isDirectory()) {
         //not found or not a directory
         return false;
@@ -991,7 +987,7 @@ public class FSDirectory implements Closeable {
   
   private void updateCount(INodesInPath iip, long nsDelta, long dsDelta,
       boolean checkQuota) throws QuotaExceededException {
-    updateCount(iip, iip.getINodes().length - 1, nsDelta, dsDelta, checkQuota);
+    updateCount(iip, iip.length() - 1, nsDelta, dsDelta, checkQuota);
   }
 
   /** update count of each inode with quota
@@ -1011,12 +1007,11 @@ public class FSDirectory implements Closeable {
       //still initializing. do not check or update quotas.
       return;
     }
-    final INode[] inodes = iip.getINodes();
-    if (numOfINodes > inodes.length) {
-      numOfINodes = inodes.length;
+    if (numOfINodes > iip.length()) {
+      numOfINodes = iip.length();
     }
     if (checkQuota && !skipQuotaCheck) {
-      verifyQuota(inodes, numOfINodes, nsDelta, dsDelta, null);
+      verifyQuota(iip, numOfINodes, nsDelta, dsDelta, null);
     }
     unprotectedUpdateCount(iip, numOfINodes, nsDelta, dsDelta);
   }
@@ -1039,11 +1034,11 @@ public class FSDirectory implements Closeable {
    * updates quota without verification
    * callers responsibility is to make sure quota is not exceeded
    */
-  static void unprotectedUpdateCount(INodesInPath inodesInPath, int numOfINodes, long nsDelta, long dsDelta) {
-    final INode[] inodes = inodesInPath.getINodes();
+  static void unprotectedUpdateCount(INodesInPath inodesInPath,
+      int numOfINodes, long nsDelta, long dsDelta) {
     for(int i=0; i < numOfINodes; i++) {
-      if (inodes[i].isQuotaSet()) { // a directory with quota
-        inodes[i].asDirectory().getDirectoryWithQuotaFeature()
+      if (inodesInPath.getINode(i).isQuotaSet()) { // a directory with quota
+        inodesInPath.getINode(i).asDirectory().getDirectoryWithQuotaFeature()
             .addSpaceConsumed2Cache(nsDelta, dsDelta);
       }
     }
@@ -1105,14 +1100,15 @@ public class FSDirectory implements Closeable {
    * @param src The full path name of the child node.
    * @throws QuotaExceededException is thrown if it violates quota limit
    */
-  private boolean addINode(String src, INode child
-      ) throws QuotaExceededException, UnresolvedLinkException {
+  private boolean addINode(String src, INode child)
+      throws QuotaExceededException, UnresolvedLinkException {
     byte[][] components = INode.getPathComponents(src);
     child.setLocalName(components[components.length-1]);
     cacheName(child);
     writeLock();
     try {
-      return addLastINode(getExistingPathINodes(components), child, true);
+      final INodesInPath iip = getExistingPathINodes(components);
+      return addLastINode(iip, child, true);
     } finally {
       writeUnlock();
     }
@@ -1122,7 +1118,7 @@ public class FSDirectory implements Closeable {
    * Verify quota for adding or moving a new INode with required 
    * namespace and diskspace to a given position.
    *  
-   * @param inodes INodes corresponding to a path
+   * @param iip INodes corresponding to a path
    * @param pos position where a new INode will be added
    * @param nsDelta needed namespace
    * @param dsDelta needed diskspace
@@ -1131,7 +1127,7 @@ public class FSDirectory implements Closeable {
    *          Pass null if a node is not being moved.
    * @throws QuotaExceededException if quota limit is exceeded.
    */
-  static void verifyQuota(INode[] inodes, int pos, long nsDelta,
+  static void verifyQuota(INodesInPath iip, int pos, long nsDelta,
       long dsDelta, INode commonAncestor) throws QuotaExceededException {
     if (nsDelta <= 0 && dsDelta <= 0) {
       // if quota is being freed or not being consumed
@@ -1139,18 +1135,20 @@ public class FSDirectory implements Closeable {
     }
 
     // check existing components in the path
-    for(int i = (pos > inodes.length? inodes.length: pos) - 1; i >= 0; i--) {
-      if (commonAncestor == inodes[i]) {
+    for(int i = (pos > iip.length() ? iip.length(): pos) - 1; i >= 0; i--) {
+      if (commonAncestor == iip.getINode(i)) {
         // Stop checking for quota when common ancestor is reached
         return;
       }
       final DirectoryWithQuotaFeature q
-          = inodes[i].asDirectory().getDirectoryWithQuotaFeature();
+          = iip.getINode(i).asDirectory().getDirectoryWithQuotaFeature();
       if (q != null) { // a directory with quota
         try {
           q.verifyQuota(nsDelta, dsDelta);
         } catch (QuotaExceededException e) {
-          e.setPathName(getFullPathName(inodes, i));
+          List<INode> inodes = iip.getReadOnlyINodes();
+          final String path = getFullPathName(inodes.toArray(new INode[inodes.size()]), i);
+          e.setPathName(path);
           throw e;
         }
       }
@@ -1172,22 +1170,20 @@ public class FSDirectory implements Closeable {
    * Verify child's name for fs limit.
    *
    * @param childName byte[] containing new child name
-   * @param parentPath Object either INode[] or String containing parent path
-   * @param pos int position of new child in path
+   * @param parentPath String containing parent path
    * @throws PathComponentTooLongException child's name is too long.
    */
-  void verifyMaxComponentLength(byte[] childName, Object parentPath,
-      int pos) throws PathComponentTooLongException {
+  void verifyMaxComponentLength(byte[] childName, String parentPath)
+      throws PathComponentTooLongException {
     if (maxComponentLength == 0) {
       return;
     }
 
     final int length = childName.length;
     if (length > maxComponentLength) {
-      final String p = parentPath instanceof INode[]?
-          getFullPathName((INode[])parentPath, pos - 1): (String)parentPath;
       final PathComponentTooLongException e = new PathComponentTooLongException(
-          maxComponentLength, length, p, DFSUtil.bytes2String(childName));
+          maxComponentLength, length, parentPath,
+          DFSUtil.bytes2String(childName));
       if (namesystem.isImageLoaded()) {
         throw e;
       } else {
@@ -1200,20 +1196,16 @@ public class FSDirectory implements Closeable {
   /**
    * Verify children size for fs limit.
    *
-   * @param pathComponents INode[] containing full path of inodes to new child
-   * @param pos int position of new child in pathComponents
    * @throws MaxDirectoryItemsExceededException too many children.
    */
-  void verifyMaxDirItems(INode[] pathComponents, int pos)
+  void verifyMaxDirItems(INodeDirectory parent, String parentPath)
       throws MaxDirectoryItemsExceededException {
-
-    final INodeDirectory parent = pathComponents[pos-1].asDirectory();
     final int count = parent.getChildrenList(Snapshot.CURRENT_STATE_ID).size();
     if (count >= maxDirItems) {
       final MaxDirectoryItemsExceededException e
           = new MaxDirectoryItemsExceededException(maxDirItems, count);
       if (namesystem.isImageLoaded()) {
-        e.setPathName(getFullPathName(pathComponents, pos - 1));
+        e.setPathName(parentPath);
         throw e;
       } else {
         // Do not throw if edits log is still being processed
@@ -1227,9 +1219,9 @@ public class FSDirectory implements Closeable {
    * The same as {@link #addChild(INodesInPath, int, INode, boolean)}
    * with pos = length - 1.
    */
-  private boolean addLastINode(INodesInPath inodesInPath,
-      INode inode, boolean checkQuota) throws QuotaExceededException {
-    final int pos = inodesInPath.getINodes().length - 1;
+  private boolean addLastINode(INodesInPath inodesInPath, INode inode,
+      boolean checkQuota) throws QuotaExceededException {
+    final int pos = inodesInPath.length() - 1;
     return addChild(inodesInPath, pos, inode, checkQuota);
   }
 
@@ -1241,18 +1233,18 @@ public class FSDirectory implements Closeable {
    */
   boolean addChild(INodesInPath iip, int pos, INode child, boolean checkQuota)
       throws QuotaExceededException {
-    final INode[] inodes = iip.getINodes();
     // Disallow creation of /.reserved. This may be created when loading
     // editlog/fsimage during upgrade since /.reserved was a valid name in older
     // release. This may also be called when a user tries to create a file
     // or directory /.reserved.
-    if (pos == 1 && inodes[0] == rootDir && isReservedName(child)) {
+    if (pos == 1 && iip.getINode(0) == rootDir && isReservedName(child)) {
       throw new HadoopIllegalArgumentException(
           "File name \"" + child.getLocalName() + "\" is reserved and cannot "
               + "be created. If this is during upgrade change the name of the "
               + "existing file or directory to another name before upgrading "
               + "to the new release.");
     }
+    final INodeDirectory parent = iip.getINode(pos-1).asDirectory();
     // The filesystem limits are not really quotas, so this check may appear
     // odd. It's because a rename operation deletes the src, tries to add
     // to the dest, if that fails, re-adds the src from whence it came.
@@ -1260,8 +1252,9 @@ public class FSDirectory implements Closeable {
     // original location becase a quota violation would cause the the item
     // to go "poof".  The fs limits must be bypassed for the same reason.
     if (checkQuota) {
-      verifyMaxComponentLength(child.getLocalNameBytes(), inodes, pos);
-      verifyMaxDirItems(inodes, pos);
+      final String parentPath = iip.getPath(pos - 1);
+      verifyMaxComponentLength(child.getLocalNameBytes(), parentPath);
+      verifyMaxDirItems(parent, parentPath);
     }
     // always verify inode name
     verifyINodeName(child.getLocalNameBytes());
@@ -1270,7 +1263,6 @@ public class FSDirectory implements Closeable {
     updateCount(iip, pos,
         counts.get(Quota.NAMESPACE), counts.get(Quota.DISKSPACE), checkQuota);
     boolean isRename = (child.getParent() != null);
-    final INodeDirectory parent = inodes[pos-1].asDirectory();
     boolean added;
     try {
       added = parent.addChild(child, true, iip.getLatestSnapshotId());
@@ -1283,7 +1275,6 @@ public class FSDirectory implements Closeable {
       updateCountNoQuotaCheck(iip, pos,
           -counts.get(Quota.NAMESPACE), -counts.get(Quota.DISKSPACE));
     } else {
-      iip.setINode(pos - 1, child.getParent());
       if (!isRename) {
         AclStorage.copyINodeDefaultAcl(child);
       }
@@ -1320,7 +1311,7 @@ public class FSDirectory implements Closeable {
     
     if (!last.isInLatestSnapshot(latestSnapshot)) {
       final Quota.Counts counts = last.computeQuotaUsage();
-      updateCountNoQuotaCheck(iip, iip.getINodes().length - 1,
+      updateCountNoQuotaCheck(iip, iip.length() - 1,
           -counts.get(Quota.NAMESPACE), -counts.get(Quota.DISKSPACE));
 
       if (INodeReference.tryRemoveReference(last) > 0) {
@@ -1715,10 +1706,10 @@ public class FSDirectory implements Closeable {
 
   static INode resolveLastINode(String src, INodesInPath iip)
       throws FileNotFoundException {
-    INode[] inodes = iip.getINodes();
-    INode inode = inodes[inodes.length - 1];
-    if (inode == null)
+    INode inode = iip.getLastINode();
+    if (inode == null) {
       throw new FileNotFoundException("cannot find " + src);
+    }
     return inode;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5776a41d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index d12ae15..2721f85 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -343,9 +343,7 @@ public class FSEditLogLoader {
 
       // See if the file already exists (persistBlocks call)
       final INodesInPath iip = fsDir.getINodesInPath(path, true);
-      final INode[] inodes = iip.getINodes();
-      INodeFile oldFile = INodeFile.valueOf(
-          inodes[inodes.length - 1], path, true);
+      INodeFile oldFile = INodeFile.valueOf(iip.getLastINode(), path, true);
       if (oldFile != null && addCloseOp.overwrite) {
         // This is OP_ADD with overwrite
         fsDir.unprotectedDelete(path, addCloseOp.mtime);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5776a41d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 2b530fa..30ac941 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -1861,9 +1861,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
           doAccessTime = false;
         }
 
-        final INode[] inodes = iip.getINodes();
-        final INodeFile inode = INodeFile.valueOf(
-            inodes[inodes.length - 1], src);
+        final INodeFile inode = INodeFile.valueOf(iip.getLastINode(), src);
         if (isPermissionEnabled) {
           checkUnreadableBySuperuser(pc, inode, iip.getPathSnapshotId());
         }
@@ -8027,8 +8025,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       checkOperation(OperationCategory.READ);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       final INodesInPath iip = dir.getINodesInPath(src, true);
-      INode[] inodes = iip.getINodes();
-      if (inodes[inodes.length - 1] == null) {
+      INode inode = iip.getLastINode();
+      if (inode == null) {
         throw new FileNotFoundException("Path not found");
       }
       if (isPermissionEnabled) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5776a41d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java
index a0455dc..8de8c54 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import java.util.Stack;
 
@@ -144,22 +145,25 @@ class FSPermissionChecker {
     // check if (parentAccess != null) && file exists, then check sb
     // If resolveLink, the check is performed on the link target.
     final int snapshotId = inodesInPath.getPathSnapshotId();
-    final INode[] inodes = inodesInPath.getINodes();
-    int ancestorIndex = inodes.length - 2;
-    for(; ancestorIndex >= 0 && inodes[ancestorIndex] == null;
-        ancestorIndex--);
-    checkTraverse(inodes, ancestorIndex, snapshotId);
+    final int length = inodesInPath.length();
+    final INode last = length > 0 ? inodesInPath.getLastINode() : null;
+    final INode parent = length > 1 ? inodesInPath.getINode(-2) : null;
+
+    checkTraverse(inodesInPath, snapshotId);
 
-    final INode last = inodes[inodes.length - 1];
     if (parentAccess != null && parentAccess.implies(FsAction.WRITE)
-        && inodes.length > 1 && last != null) {
-      checkStickyBit(inodes[inodes.length - 2], last, snapshotId);
+        && length > 1 && last != null) {
+      checkStickyBit(parent, last, snapshotId);
     }
-    if (ancestorAccess != null && inodes.length > 1) {
-      check(inodes, ancestorIndex, snapshotId, ancestorAccess);
+    if (ancestorAccess != null && length > 1) {
+      List<INode> inodes = inodesInPath.getReadOnlyINodes();
+      INode ancestor = null;
+      for (int i = inodes.size() - 2; i >= 0 && (ancestor = inodes.get(i)) ==
+          null; i--);
+      check(ancestor, snapshotId, ancestorAccess);
     }
-    if (parentAccess != null && inodes.length > 1) {
-      check(inodes, inodes.length - 2, snapshotId, parentAccess);
+    if (parentAccess != null && length > 1 && parent != null) {
+      check(parent, snapshotId, parentAccess);
     }
     if (access != null) {
       check(last, snapshotId, access);
@@ -184,10 +188,15 @@ class FSPermissionChecker {
   }
 
   /** Guarded by {@link FSNamesystem#readLock()} */
-  private void checkTraverse(INode[] inodes, int last, int snapshotId
-      ) throws AccessControlException {
-    for(int j = 0; j <= last; j++) {
-      check(inodes[j], snapshotId, FsAction.EXECUTE);
+  private void checkTraverse(INodesInPath iip, int snapshotId)
+      throws AccessControlException {
+    List<INode> inodes = iip.getReadOnlyINodes();
+    for (int i = 0; i < inodes.size() - 1; i++) {
+      INode inode = inodes.get(i);
+      if (inode == null) {
+        break;
+      }
+      check(inode, snapshotId, FsAction.EXECUTE);
     }
   }
 
@@ -215,14 +224,8 @@ class FSPermissionChecker {
   }
 
   /** Guarded by {@link FSNamesystem#readLock()} */
-  private void check(INode[] inodes, int i, int snapshotId, FsAction access
-      ) throws AccessControlException {
-    check(i >= 0? inodes[i]: null, snapshotId, access);
-  }
-
-  /** Guarded by {@link FSNamesystem#readLock()} */
-  private void check(INode inode, int snapshotId, FsAction access
-      ) throws AccessControlException {
+  private void check(INode inode, int snapshotId, FsAction access)
+      throws AccessControlException {
     if (inode == null) {
       return;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5776a41d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodesInPath.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodesInPath.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodesInPath.java
index 58f5f3d..1501fce 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodesInPath.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodesInPath.java
@@ -18,6 +18,9 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.NoSuchElementException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -31,6 +34,9 @@ import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 
 import com.google.common.base.Preconditions;
 
+import static org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.CURRENT_STATE_ID;
+import static org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.ID_INTEGER_COMPARATOR;
+
 /**
  * Contains INodes information resolved from a given path.
  */
@@ -54,7 +60,6 @@ public class INodesInPath {
     }
     final byte[][] path = new byte[depth][];
     final INode[] inodes = new INode[depth];
-    final INodesInPath iip = new INodesInPath(path, depth);
     tmp = inode;
     index = depth;
     while (tmp != null) {
@@ -63,8 +68,7 @@ public class INodesInPath {
       inodes[index] = tmp;
       tmp = tmp.getParent();
     }
-    iip.setINodes(inodes);
-    return iip;
+    return new INodesInPath(inodes, path);
   }
 
   /**
@@ -134,30 +138,34 @@ public class INodesInPath {
    * @return the specified number of existing INodes in the path
    */
   static INodesInPath resolve(final INodeDirectory startingDir,
-      final byte[][] components, final int numOfINodes, 
+      final byte[][] components, final int numOfINodes,
       final boolean resolveLink) throws UnresolvedLinkException {
     Preconditions.checkArgument(startingDir.compareTo(components[0]) == 0);
 
     INode curNode = startingDir;
-    final INodesInPath existing = new INodesInPath(components, numOfINodes);
     int count = 0;
-    int index = numOfINodes - components.length;
-    if (index > 0) {
-      index = 0;
-    }
+    int index = numOfINodes <= components.length ?
+        numOfINodes - components.length : 0;
+    int inodeNum = 0;
+    int capacity = numOfINodes;
+    INode[] inodes = new INode[numOfINodes];
+    boolean isSnapshot = false;
+    int snapshotId = CURRENT_STATE_ID;
+
     while (count < components.length && curNode != null) {
       final boolean lastComp = (count == components.length - 1);      
       if (index >= 0) {
-        existing.addNode(curNode);
+        inodes[inodeNum++] = curNode;
       }
       final boolean isRef = curNode.isReference();
       final boolean isDir = curNode.isDirectory();
       final INodeDirectory dir = isDir? curNode.asDirectory(): null;  
       if (!isRef && isDir && dir.isWithSnapshot()) {
         //if the path is a non-snapshot path, update the latest snapshot.
-        if (!existing.isSnapshot()) {
-          existing.updateLatestSnapshotId(dir.getDirectoryWithSnapshotFeature()
-              .getLastSnapshotId());
+        if (!isSnapshot && shouldUpdateLatestId(
+            dir.getDirectoryWithSnapshotFeature().getLastSnapshotId(),
+            snapshotId)) {
+          snapshotId = dir.getDirectoryWithSnapshotFeature().getLastSnapshotId();
         }
       } else if (isRef && isDir && !lastComp) {
         // If the curNode is a reference node, need to check its dstSnapshot:
@@ -170,19 +178,18 @@ public class INodesInPath {
         // the latest snapshot if lastComp is true. In case of the operation is
         // a modification operation, we do a similar check in corresponding 
         // recordModification method.
-        if (!existing.isSnapshot()) {
+        if (!isSnapshot) {
           int dstSnapshotId = curNode.asReference().getDstSnapshotId();
-          int latest = existing.getLatestSnapshotId();
-          if (latest == Snapshot.CURRENT_STATE_ID || // no snapshot in dst tree of rename
-              (dstSnapshotId != Snapshot.CURRENT_STATE_ID && 
-                dstSnapshotId >= latest)) { // the above scenario 
-            int lastSnapshot = Snapshot.CURRENT_STATE_ID;
+          if (snapshotId == CURRENT_STATE_ID || // no snapshot in dst tree of rename
+              (dstSnapshotId != CURRENT_STATE_ID &&
+               dstSnapshotId >= snapshotId)) { // the above scenario
+            int lastSnapshot = CURRENT_STATE_ID;
             DirectoryWithSnapshotFeature sf;
             if (curNode.isDirectory() && 
                 (sf = curNode.asDirectory().getDirectoryWithSnapshotFeature()) != null) {
               lastSnapshot = sf.getLastSnapshotId();
             }
-            existing.setSnapshotId(lastSnapshot);
+            snapshotId = lastSnapshot;
           }
         }
       }
@@ -211,9 +218,9 @@ public class INodesInPath {
         // skip the ".snapshot" in components
         count++;
         index++;
-        existing.isSnapshot = true;
+        isSnapshot = true;
         if (index >= 0) { // decrease the capacity by 1 to account for .snapshot
-          existing.capacity--;
+          capacity--;
         }
         // check if ".snapshot" is the last element of components
         if (count == components.length - 1) {
@@ -222,65 +229,82 @@ public class INodesInPath {
         // Resolve snapshot root
         final Snapshot s = dir.getSnapshot(components[count + 1]);
         if (s == null) {
-          //snapshot not found
-          curNode = null;
+          curNode = null; // snapshot not found
         } else {
           curNode = s.getRoot();
-          existing.setSnapshotId(s.getId());
-        }
-        if (index >= -1) {
-          existing.snapshotRootIndex = existing.numNonNull;
+          snapshotId = s.getId();
         }
       } else {
         // normal case, and also for resolving file/dir under snapshot root
-        curNode = dir.getChild(childName, existing.getPathSnapshotId());
+        curNode = dir.getChild(childName,
+            isSnapshot ? snapshotId : CURRENT_STATE_ID);
       }
       count++;
       index++;
     }
-    return existing;
+    if (isSnapshot && capacity < numOfINodes &&
+        !isDotSnapshotDir(components[components.length - 1])) {
+      // for snapshot path shrink the inode array. however, for path ending with
+      // .snapshot, still keep the last null inode in the array
+      INode[] newNodes = new INode[capacity];
+      System.arraycopy(inodes, 0, newNodes, 0, capacity);
+      inodes = newNodes;
+    }
+    return new INodesInPath(inodes, components, isSnapshot, snapshotId);
+  }
+
+  private static boolean shouldUpdateLatestId(int sid, int snapshotId) {
+    return snapshotId == CURRENT_STATE_ID || (sid != CURRENT_STATE_ID &&
+        ID_INTEGER_COMPARATOR.compare(snapshotId, sid) < 0);
   }
 
-  private final byte[][] path;
-  /**
-   * Array with the specified number of INodes resolved for a given path.
-   */
-  private INode[] inodes;
   /**
-   * Indicate the number of non-null elements in {@link #inodes}
+   * Replace an inode of the given INodesInPath in the given position. We do a
+   * copy of the INode array; the INode objects themselves are not cloned.
+   * @param pos the position of the replacement
+   * @param inode the new inode
+   * @return a new INodesInPath instance
    */
-  private int numNonNull;
+  public static INodesInPath replace(INodesInPath iip, int pos, INode inode) {
+    Preconditions.checkArgument(iip.length() > 0 && pos > 0 // not for root
+        && pos < iip.length());
+    if (iip.getINode(pos) == null) {
+      Preconditions.checkState(iip.getINode(pos - 1) != null);
+    }
+    INode[] inodes = new INode[iip.inodes.length];
+    System.arraycopy(iip.inodes, 0, inodes, 0, inodes.length);
+    inodes[pos] = inode;
+    return new INodesInPath(inodes, iip.path, iip.isSnapshot, iip.snapshotId);
+  }
+
+  private final byte[][] path;
   /**
-   * The path for a snapshot file/dir contains the .snapshot thus makes the
-   * length of the path components larger the number of inodes. We use
-   * the capacity to control this special case.
+   * Array with the specified number of INodes resolved for a given path.
    */
-  private int capacity;
+  private final INode[] inodes;
   /**
    * true if this path corresponds to a snapshot
    */
-  private boolean isSnapshot;
-  /**
-   * index of the {@link Snapshot.Root} node in the inodes array,
-   * -1 for non-snapshot paths.
-   */
-  private int snapshotRootIndex;
+  private final boolean isSnapshot;
   /**
    * For snapshot paths, it is the id of the snapshot; or 
    * {@link Snapshot#CURRENT_STATE_ID} if the snapshot does not exist. For 
    * non-snapshot paths, it is the id of the latest snapshot found in the path;
    * or {@link Snapshot#CURRENT_STATE_ID} if no snapshot is found.
    */
-  private int snapshotId = Snapshot.CURRENT_STATE_ID; 
+  private final int snapshotId;
 
-  private INodesInPath(byte[][] path, int number) {
+  private INodesInPath(INode[] inodes, byte[][] path, boolean isSnapshot,
+      int snapshotId) {
+    Preconditions.checkArgument(inodes != null && path != null);
+    this.inodes = inodes;
     this.path = path;
-    assert (number >= 0);
-    inodes = new INode[number];
-    capacity = number;
-    numNonNull = 0;
-    isSnapshot = false;
-    snapshotRootIndex = -1;
+    this.isSnapshot = isSnapshot;
+    this.snapshotId = snapshotId;
+  }
+
+  private INodesInPath(INode[] inodes, byte[][] path) {
+    this(inodes, path, false, CURRENT_STATE_ID);
   }
 
   /**
@@ -296,49 +320,28 @@ public class INodesInPath {
    * For non-snapshot paths, return {@link Snapshot#CURRENT_STATE_ID}.
    */
   public int getPathSnapshotId() {
-    return isSnapshot ? snapshotId : Snapshot.CURRENT_STATE_ID;
-  }
-
-  private void setSnapshotId(int sid) {
-    snapshotId = sid;
+    return isSnapshot ? snapshotId : CURRENT_STATE_ID;
   }
-  
-  private void updateLatestSnapshotId(int sid) {
-    if (snapshotId == Snapshot.CURRENT_STATE_ID
-        || (sid != Snapshot.CURRENT_STATE_ID && Snapshot.ID_INTEGER_COMPARATOR
-            .compare(snapshotId, sid) < 0)) {
-      snapshotId = sid;
-    }
-  }
-
-  /**
-   * @return a new array of inodes excluding the null elements introduced by
-   * snapshot path elements. E.g., after resolving path "/dir/.snapshot",
-   * {@link #inodes} is {/, dir, null}, while the returned array only contains
-   * inodes of "/" and "dir". Note the length of the returned array is always
-   * equal to {@link #capacity}.
-   */
-  INode[] getINodes() {
-    if (capacity == inodes.length) {
-      return inodes;
-    }
 
-    INode[] newNodes = new INode[capacity];
-    System.arraycopy(inodes, 0, newNodes, 0, capacity);
-    return newNodes;
-  }
-  
   /**
    * @return the i-th inode if i >= 0;
    *         otherwise, i < 0, return the (length + i)-th inode.
    */
   public INode getINode(int i) {
-    return inodes[i >= 0? i: inodes.length + i];
+    if (inodes == null || inodes.length == 0) {
+      throw new NoSuchElementException("inodes is null or empty");
+    }
+    int index = i >= 0 ? i : inodes.length + i;
+    if (index < inodes.length && index >= 0) {
+      return inodes[index];
+    } else {
+      throw new NoSuchElementException("inodes.length == " + inodes.length);
+    }
   }
   
   /** @return the last inode. */
   public INode getLastINode() {
-    return inodes[inodes.length - 1];
+    return getINode(-1);
   }
 
   byte[] getLastLocalName() {
@@ -350,48 +353,29 @@ public class INodesInPath {
     return DFSUtil.byteArray2PathString(path);
   }
 
-  /**
-   * @return index of the {@link Snapshot.Root} node in the inodes array,
-   * -1 for non-snapshot paths.
-   */
-  int getSnapshotRootIndex() {
-    return this.snapshotRootIndex;
-  }
-  
-  /**
-   * @return isSnapshot true for a snapshot path
-   */
-  boolean isSnapshot() {
-    return this.isSnapshot;
-  }
-  
-  /**
-   * Add an INode at the end of the array
-   */
-  private void addNode(INode node) {
-    inodes[numNonNull++] = node;
+  public String getParentPath() {
+    return getPath(path.length - 1);
   }
 
-  private void setINodes(INode inodes[]) {
-    this.inodes = inodes;
-    this.numNonNull = this.inodes.length;
+  public String getPath(int pos) {
+    return DFSUtil.byteArray2PathString(path, 0, pos);
   }
-  
-  void setINode(int i, INode inode) {
-    inodes[i >= 0? i: inodes.length + i] = inode;
+
+  public int length() {
+    return inodes.length;
   }
-  
-  void setLastINode(INode last) {
-    inodes[inodes.length - 1] = last;
+
+  public List<INode> getReadOnlyINodes() {
+    return Collections.unmodifiableList(Arrays.asList(inodes));
   }
-  
+
   /**
-   * @return The number of non-null elements
+   * @return isSnapshot true for a snapshot path
    */
-  int getNumNonNull() {
-    return numNonNull;
+  boolean isSnapshot() {
+    return this.isSnapshot;
   }
-  
+
   private static String toString(INode inode) {
     return inode == null? null: inode.getLocalName();
   }
@@ -420,20 +404,16 @@ public class INodesInPath {
       }
       b.append("], length=").append(inodes.length);
     }
-    b.append("\n  numNonNull = ").append(numNonNull)
-     .append("\n  capacity   = ").append(capacity)
-     .append("\n  isSnapshot        = ").append(isSnapshot)
-     .append("\n  snapshotRootIndex = ").append(snapshotRootIndex)
+    b.append("\n  isSnapshot        = ").append(isSnapshot)
      .append("\n  snapshotId        = ").append(snapshotId);
     return b.toString();
   }
 
   void validate() {
-    // check parent up to snapshotRootIndex or numNonNull
-    final int n = snapshotRootIndex >= 0? snapshotRootIndex + 1: numNonNull;  
+    // check the parent of every inode in the array, up to inodes.length
     int i = 0;
     if (inodes[i] != null) {
-      for(i++; i < n && inodes[i] != null; i++) {
+      for(i++; i < inodes.length && inodes[i] != null; i++) {
         final INodeDirectory parent_i = inodes[i].getParent();
         final INodeDirectory parent_i_1 = inodes[i-1].getParent();
         if (parent_i != inodes[i-1] &&
@@ -447,8 +427,8 @@ public class INodesInPath {
         }
       }
     }
-    if (i != n) {
-      throw new AssertionError("i = " + i + " != " + n
+    if (i != inodes.length) {
+      throw new AssertionError("i = " + i + " != " + inodes.length
           + ", this=" + toString(false));
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5776a41d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSnapshotPathINodes.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSnapshotPathINodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSnapshotPathINodes.java
index d1a2377..354bff1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSnapshotPathINodes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSnapshotPathINodes.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.FileNotFoundException;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -104,19 +105,18 @@ public class TestSnapshotPathINodes {
     }
   }
   
-  static Snapshot getSnapshot(INodesInPath inodesInPath, String name) {
+  static Snapshot getSnapshot(INodesInPath inodesInPath, String name,
+      int index) {
     if (name == null) {
       return null;
     }
-    final int i = inodesInPath.getSnapshotRootIndex() - 1;
-    final INode inode = inodesInPath.getINodes()[i];
+    final INode inode = inodesInPath.getINode(index - 1);
     return inode.asDirectory().getSnapshot(DFSUtil.string2Bytes(name));
   }
 
   static void assertSnapshot(INodesInPath inodesInPath, boolean isSnapshot,
       final Snapshot snapshot, int index) {
     assertEquals(isSnapshot, inodesInPath.isSnapshot());
-    assertEquals(index, inodesInPath.getSnapshotRootIndex());
     assertEquals(Snapshot.getSnapshotId(isSnapshot ? snapshot : null),
         inodesInPath.getPathSnapshotId());
     if (!isSnapshot) {
@@ -124,7 +124,7 @@ public class TestSnapshotPathINodes {
           inodesInPath.getLatestSnapshotId());
     }
     if (isSnapshot && index >= 0) {
-      assertEquals(Snapshot.Root.class, inodesInPath.getINodes()[index].getClass());
+      assertEquals(Snapshot.Root.class, inodesInPath.getINode(index).getClass());
     }
   }
 
@@ -142,38 +142,35 @@ public class TestSnapshotPathINodes {
     String[] names = INode.getPathNames(file1.toString());
     byte[][] components = INode.getPathComponents(names);
     INodesInPath nodesInPath = INodesInPath.resolve(fsdir.rootDir, components);
-    INode[] inodes = nodesInPath.getINodes();
     // The number of inodes should be equal to components.length
-    assertEquals(inodes.length, components.length);
+    assertEquals(nodesInPath.length(), components.length);
     // The returned nodesInPath should be non-snapshot
     assertSnapshot(nodesInPath, false, null, -1);
 
     // The last INode should be associated with file1
     assertTrue("file1=" + file1 + ", nodesInPath=" + nodesInPath,
-        inodes[components.length - 1] != null);
-    assertEquals(inodes[components.length - 1].getFullPathName(),
+        nodesInPath.getINode(components.length - 1) != null);
+    assertEquals(nodesInPath.getINode(components.length - 1).getFullPathName(),
         file1.toString());
-    assertEquals(inodes[components.length - 2].getFullPathName(),
+    assertEquals(nodesInPath.getINode(components.length - 2).getFullPathName(),
         sub1.toString());
-    assertEquals(inodes[components.length - 3].getFullPathName(),
+    assertEquals(nodesInPath.getINode(components.length - 3).getFullPathName(),
         dir.toString());
     
     // Call getExistingPathINodes and request only one INode. This is used
     // when identifying the INode for a given path.
     nodesInPath = INodesInPath.resolve(fsdir.rootDir, components, 1, false);
-    inodes = nodesInPath.getINodes();
-    assertEquals(inodes.length, 1);
+    assertEquals(nodesInPath.length(), 1);
     assertSnapshot(nodesInPath, false, null, -1);
-    assertEquals(inodes[0].getFullPathName(), file1.toString());
+    assertEquals(nodesInPath.getINode(0).getFullPathName(), file1.toString());
     
     // Call getExistingPathINodes and request 2 INodes. This is usually used
     // when identifying the parent INode of a given path.
     nodesInPath = INodesInPath.resolve(fsdir.rootDir, components, 2, false);
-    inodes = nodesInPath.getINodes();
-    assertEquals(inodes.length, 2);
+    assertEquals(nodesInPath.length(), 2);
     assertSnapshot(nodesInPath, false, null, -1);
-    assertEquals(inodes[1].getFullPathName(), file1.toString());
-    assertEquals(inodes[0].getFullPathName(), sub1.toString());
+    assertEquals(nodesInPath.getINode(1).getFullPathName(), file1.toString());
+    assertEquals(nodesInPath.getINode(0).getFullPathName(), sub1.toString());
   }
   
   /** 
@@ -191,53 +188,49 @@ public class TestSnapshotPathINodes {
     String[] names = INode.getPathNames(snapshotPath);
     byte[][] components = INode.getPathComponents(names);
     INodesInPath nodesInPath = INodesInPath.resolve(fsdir.rootDir, components);
-    INode[] inodes = nodesInPath.getINodes();
     // Length of inodes should be (components.length - 1), since we will ignore
     // ".snapshot" 
-    assertEquals(inodes.length, components.length - 1);
+    assertEquals(nodesInPath.length(), components.length - 1);
     // SnapshotRootIndex should be 3: {root, Testsnapshot, sub1, s1, file1}
-    final Snapshot snapshot = getSnapshot(nodesInPath, "s1");
+    final Snapshot snapshot = getSnapshot(nodesInPath, "s1", 3);
     assertSnapshot(nodesInPath, true, snapshot, 3);
     // Check the INode for file1 (snapshot file)
-    INode snapshotFileNode = inodes[inodes.length - 1]; 
+    INode snapshotFileNode = nodesInPath.getLastINode();
     assertINodeFile(snapshotFileNode, file1);
     assertTrue(snapshotFileNode.getParent().isWithSnapshot());
     
     // Call getExistingPathINodes and request only one INode.
     nodesInPath = INodesInPath.resolve(fsdir.rootDir, components, 1, false);
-    inodes = nodesInPath.getINodes();
-    assertEquals(inodes.length, 1);
+    assertEquals(nodesInPath.length(), 1);
     // The snapshotroot (s1) is not included in inodes. Thus the
     // snapshotRootIndex should be -1.
     assertSnapshot(nodesInPath, true, snapshot, -1);
     // Check the INode for file1 (snapshot file)
-    assertINodeFile(inodes[inodes.length - 1], file1);
+    assertINodeFile(nodesInPath.getLastINode(), file1);
     
     // Call getExistingPathINodes and request 2 INodes.
     nodesInPath = INodesInPath.resolve(fsdir.rootDir, components, 2, false);
-    inodes = nodesInPath.getINodes();
-    assertEquals(inodes.length, 2);
+    assertEquals(nodesInPath.length(), 2);
     // There should be two INodes in inodes: s1 and snapshot of file1. Thus the
     // SnapshotRootIndex should be 0.
     assertSnapshot(nodesInPath, true, snapshot, 0);
-    assertINodeFile(inodes[inodes.length - 1], file1);
+    assertINodeFile(nodesInPath.getLastINode(), file1);
     
     // Resolve the path "/TestSnapshot/sub1/.snapshot"  
     String dotSnapshotPath = sub1.toString() + "/.snapshot";
     names = INode.getPathNames(dotSnapshotPath);
     components = INode.getPathComponents(names);
     nodesInPath = INodesInPath.resolve(fsdir.rootDir, components);
-    inodes = nodesInPath.getINodes();
-    // The number of INodes returned should be components.length - 1 since we
-    // will ignore ".snapshot"
-    assertEquals(inodes.length, components.length - 1);
+    // The number of INodes returned should still be components.length
+    // since we put a null in the inode array for ".snapshot"
+    assertEquals(nodesInPath.length(), components.length);
 
     // No SnapshotRoot dir is included in the resolved inodes  
     assertSnapshot(nodesInPath, true, snapshot, -1);
-    // The last INode should be the INode for sub1
-    final INode last = inodes[inodes.length - 1];
-    assertEquals(last.getFullPathName(), sub1.toString());
-    assertFalse(last instanceof INodeFile);
+    // The last INode should be null, the last but 1 should be sub1
+    assertNull(nodesInPath.getLastINode());
+    assertEquals(nodesInPath.getINode(-2).getFullPathName(), sub1.toString());
+    assertTrue(nodesInPath.getINode(-2).isDirectory());
     
     String[] invalidPathComponent = {"invalidDir", "foo", ".snapshot", "bar"};
     Path invalidPath = new Path(invalidPathComponent[0]);
@@ -275,16 +268,15 @@ public class TestSnapshotPathINodes {
       String[] names = INode.getPathNames(snapshotPath);
       byte[][] components = INode.getPathComponents(names);
       INodesInPath nodesInPath = INodesInPath.resolve(fsdir.rootDir, components);
-      INode[] inodes = nodesInPath.getINodes();
       // Length of inodes should be (components.length - 1), since we will ignore
       // ".snapshot" 
-      assertEquals(inodes.length, components.length - 1);
+      assertEquals(nodesInPath.length(), components.length - 1);
       // SnapshotRootIndex should be 3: {root, Testsnapshot, sub1, s2, file1}
-      snapshot = getSnapshot(nodesInPath, "s2");
+      snapshot = getSnapshot(nodesInPath, "s2", 3);
       assertSnapshot(nodesInPath, true, snapshot, 3);
   
       // Check the INode for file1 (snapshot file)
-      final INode inode = inodes[inodes.length - 1];
+      final INode inode = nodesInPath.getLastINode();
       assertEquals(file1.getName(), inode.getLocalName());
       assertTrue(inode.asFile().isWithSnapshot());
     }
@@ -293,25 +285,34 @@ public class TestSnapshotPathINodes {
     String[] names = INode.getPathNames(file1.toString());
     byte[][] components = INode.getPathComponents(names);
     INodesInPath nodesInPath = INodesInPath.resolve(fsdir.rootDir, components);
-    INode[] inodes = nodesInPath.getINodes();
     // The length of inodes should be equal to components.length
-    assertEquals(inodes.length, components.length);
+    assertEquals(nodesInPath.length(), components.length);
     // The number of non-null elements should be components.length - 1 since
     // file1 has been deleted
-    assertEquals(nodesInPath.getNumNonNull(), components.length - 1);
+    assertEquals(getNumNonNull(nodesInPath), components.length - 1);
     // The returned nodesInPath should be non-snapshot
     assertSnapshot(nodesInPath, false, snapshot, -1);
     // The last INode should be null, and the one before should be associated
     // with sub1
-    assertNull(inodes[components.length - 1]);
-    assertEquals(inodes[components.length - 2].getFullPathName(),
+    assertNull(nodesInPath.getINode(components.length - 1));
+    assertEquals(nodesInPath.getINode(components.length - 2).getFullPathName(),
         sub1.toString());
-    assertEquals(inodes[components.length - 3].getFullPathName(),
+    assertEquals(nodesInPath.getINode(components.length - 3).getFullPathName(),
         dir.toString());
     hdfs.deleteSnapshot(sub1, "s2");
     hdfs.disallowSnapshot(sub1);
   }
 
+  private int getNumNonNull(INodesInPath iip) { // 1 + index of last non-null inode in iip
+    List<INode> inodes = iip.getReadOnlyINodes();
+    for (int i = inodes.size() - 1; i >= 0; i--) { // scan from the tail of the path
+      if (inodes.get(i) != null) {
+        return i+1; // nulls before index i, if any, are still counted
+      }
+    }
+    return 0; // empty list, or every entry is null
+  }
+
   /**
    * for snapshot file while adding a new file after snapshot.
    */
@@ -333,39 +334,37 @@ public class TestSnapshotPathINodes {
       String[] names = INode.getPathNames(snapshotPath);
       byte[][] components = INode.getPathComponents(names);
       INodesInPath nodesInPath = INodesInPath.resolve(fsdir.rootDir, components);
-      INode[] inodes = nodesInPath.getINodes();
       // Length of inodes should be (components.length - 1), since we will ignore
       // ".snapshot" 
-      assertEquals(inodes.length, components.length - 1);
+      assertEquals(nodesInPath.length(), components.length - 1);
       // The number of non-null inodes should be components.length - 2, since
       // snapshot of file3 does not exist
-      assertEquals(nodesInPath.getNumNonNull(), components.length - 2);
-      s4 = getSnapshot(nodesInPath, "s4");
+      assertEquals(getNumNonNull(nodesInPath), components.length - 2);
+      s4 = getSnapshot(nodesInPath, "s4", 3);
 
       // SnapshotRootIndex should still be 3: {root, Testsnapshot, sub1, s4, null}
       assertSnapshot(nodesInPath, true, s4, 3);
   
       // Check the last INode in inodes, which should be null
-      assertNull(inodes[inodes.length - 1]);
+      assertNull(nodesInPath.getINode(nodesInPath.length() - 1));
     }
 
     // Check the inodes for /TestSnapshot/sub1/file3
     String[] names = INode.getPathNames(file3.toString());
     byte[][] components = INode.getPathComponents(names);
     INodesInPath nodesInPath = INodesInPath.resolve(fsdir.rootDir, components);
-    INode[] inodes = nodesInPath.getINodes();
     // The number of inodes should be equal to components.length
-    assertEquals(inodes.length, components.length);
+    assertEquals(nodesInPath.length(), components.length);
 
     // The returned nodesInPath should be non-snapshot
     assertSnapshot(nodesInPath, false, s4, -1);
 
     // The last INode should be associated with file3
-    assertEquals(inodes[components.length - 1].getFullPathName(),
+    assertEquals(nodesInPath.getINode(components.length - 1).getFullPathName(),
         file3.toString());
-    assertEquals(inodes[components.length - 2].getFullPathName(),
+    assertEquals(nodesInPath.getINode(components.length - 2).getFullPathName(),
         sub1.toString());
-    assertEquals(inodes[components.length - 3].getFullPathName(),
+    assertEquals(nodesInPath.getINode(components.length - 3).getFullPathName(),
         dir.toString());
     hdfs.deleteSnapshot(sub1, "s4");
     hdfs.disallowSnapshot(sub1);
@@ -380,15 +379,15 @@ public class TestSnapshotPathINodes {
     String[] names = INode.getPathNames(file1.toString());
     byte[][] components = INode.getPathComponents(names);
     INodesInPath nodesInPath = INodesInPath.resolve(fsdir.rootDir, components);
-    INode[] inodes = nodesInPath.getINodes();
     // The number of inodes should be equal to components.length
-    assertEquals(inodes.length, components.length);
+    assertEquals(nodesInPath.length(), components.length);
 
     // The last INode should be associated with file1
-    assertEquals(inodes[components.length - 1].getFullPathName(),
+    assertEquals(nodesInPath.getINode(components.length - 1).getFullPathName(),
         file1.toString());
     // record the modification time of the inode
-    final long modTime = inodes[inodes.length - 1].getModificationTime();
+    final long modTime = nodesInPath.getINode(nodesInPath.length() - 1)
+        .getModificationTime();
     
     // Create a snapshot for the dir, and check the inodes for the path
     // pointing to a snapshot file
@@ -403,14 +402,13 @@ public class TestSnapshotPathINodes {
     names = INode.getPathNames(snapshotPath);
     components = INode.getPathComponents(names);
     INodesInPath ssNodesInPath = INodesInPath.resolve(fsdir.rootDir, components);
-    INode[] ssInodes = ssNodesInPath.getINodes();
     // Length of ssInodes should be (components.length - 1), since we will
     // ignore ".snapshot" 
-    assertEquals(ssInodes.length, components.length - 1);
-    final Snapshot s3 = getSnapshot(ssNodesInPath, "s3");
+    assertEquals(ssNodesInPath.length(), components.length - 1);
+    final Snapshot s3 = getSnapshot(ssNodesInPath, "s3", 3);
     assertSnapshot(ssNodesInPath, true, s3, 3);
     // Check the INode for snapshot of file1
-    INode snapshotFileNode = ssInodes[ssInodes.length - 1]; 
+    INode snapshotFileNode = ssNodesInPath.getLastINode();
     assertEquals(snapshotFileNode.getLocalName(), file1.getName());
     assertTrue(snapshotFileNode.asFile().isWithSnapshot());
     // The modification time of the snapshot INode should be the same with the
@@ -423,14 +421,14 @@ public class TestSnapshotPathINodes {
     components = INode.getPathComponents(names);
     INodesInPath newNodesInPath = INodesInPath.resolve(fsdir.rootDir, components);
     assertSnapshot(newNodesInPath, false, s3, -1);
-    INode[] newInodes = newNodesInPath.getINodes();
     // The number of inodes should be equal to components.length
-    assertEquals(newInodes.length, components.length);
+    assertEquals(newNodesInPath.length(), components.length);
     // The last INode should be associated with file1
     final int last = components.length - 1;
-    assertEquals(newInodes[last].getFullPathName(), file1.toString());
+    assertEquals(newNodesInPath.getINode(last).getFullPathName(),
+        file1.toString());
     // The modification time of the INode for file3 should have been changed
-    Assert.assertFalse(modTime == newInodes[last].getModificationTime());
+    Assert.assertFalse(modTime == newNodesInPath.getINode(last).getModificationTime());
     hdfs.deleteSnapshot(sub1, "s3");
     hdfs.disallowSnapshot(sub1);
   }