You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2014/12/13 00:13:56 UTC

[2/2] hadoop git commit: HDFS-7059. Avoid resolving path multiple times. Contributed by Jing Zhao.

HDFS-7059. Avoid resolving path multiple times. Contributed by Jing Zhao.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c78e3a7c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c78e3a7c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c78e3a7c

Branch: refs/heads/trunk
Commit: c78e3a7cdd10c40454e9acb06986ba6d8573cb19
Parents: 7784b10
Author: Jing Zhao <ji...@apache.org>
Authored: Fri Dec 12 14:15:06 2014 -0800
Committer: Jing Zhao <ji...@apache.org>
Committed: Fri Dec 12 15:13:35 2014 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   2 +
 .../hadoop/hdfs/server/namenode/FSDirAclOp.java |  27 +--
 .../hdfs/server/namenode/FSDirMkdirOp.java      |  33 ++-
 .../hdfs/server/namenode/FSDirRenameOp.java     | 173 ++++++-------
 .../server/namenode/FSDirStatAndListingOp.java  |  34 +--
 .../hdfs/server/namenode/FSDirXAttrOp.java      |  11 +-
 .../hdfs/server/namenode/FSDirectory.java       | 243 +++++++------------
 .../hdfs/server/namenode/FSEditLogLoader.java   |  37 +--
 .../hdfs/server/namenode/FSImageFormat.java     |   6 +-
 .../hdfs/server/namenode/FSNamesystem.java      | 200 ++++++++-------
 .../hdfs/server/namenode/INodesInPath.java      | 137 +++++++----
 .../hdfs/server/namenode/LeaseManager.java      |   6 +-
 .../hdfs/server/namenode/FSAclBaseTest.java     |   6 +-
 .../hadoop/hdfs/server/namenode/TestFsck.java   |   4 +-
 .../hdfs/server/namenode/TestLeaseManager.java  |   2 +
 .../server/namenode/TestSnapshotPathINodes.java |  64 ++---
 .../snapshot/TestOpenFilesWithSnapshot.java     |   3 +-
 .../snapshot/TestSnapshotReplication.java       |  11 +-
 18 files changed, 473 insertions(+), 526 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c78e3a7c/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index d635400..eeedb0d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -453,6 +453,8 @@ Release 2.7.0 - UNRELEASED
 
     HDFS-7463. Simplify FSNamesystem#getBlockLocationsUpdateTimes. (wheat9)
 
+    HDFS-7509. Avoid resolving path multiple times. (jing9)
+
   OPTIMIZATIONS
 
     HDFS-7454. Reduce memory footprint for AclEntries in NameNode.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c78e3a7c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAclOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAclOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAclOp.java
index c2dee20..0d2b34c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAclOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAclOp.java
@@ -46,7 +46,7 @@ class FSDirAclOp {
       INodesInPath iip = fsd.getINodesInPath4Write(
           FSDirectory.normalizePath(src), true);
       fsd.checkOwner(pc, iip);
-      INode inode = FSDirectory.resolveLastINode(src, iip);
+      INode inode = FSDirectory.resolveLastINode(iip);
       int snapshotId = iip.getLatestSnapshotId();
       List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode);
       List<AclEntry> newAcl = AclTransformation.mergeAclEntries(
@@ -72,7 +72,7 @@ class FSDirAclOp {
       INodesInPath iip = fsd.getINodesInPath4Write(
           FSDirectory.normalizePath(src), true);
       fsd.checkOwner(pc, iip);
-      INode inode = FSDirectory.resolveLastINode(src, iip);
+      INode inode = FSDirectory.resolveLastINode(iip);
       int snapshotId = iip.getLatestSnapshotId();
       List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode);
       List<AclEntry> newAcl = AclTransformation.filterAclEntriesByAclSpec(
@@ -97,7 +97,7 @@ class FSDirAclOp {
       INodesInPath iip = fsd.getINodesInPath4Write(
           FSDirectory.normalizePath(src), true);
       fsd.checkOwner(pc, iip);
-      INode inode = FSDirectory.resolveLastINode(src, iip);
+      INode inode = FSDirectory.resolveLastINode(iip);
       int snapshotId = iip.getLatestSnapshotId();
       List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode);
       List<AclEntry> newAcl = AclTransformation.filterDefaultAclEntries(
@@ -121,7 +121,7 @@ class FSDirAclOp {
     try {
       INodesInPath iip = fsd.getINodesInPath4Write(src);
       fsd.checkOwner(pc, iip);
-      unprotectedRemoveAcl(fsd, src);
+      unprotectedRemoveAcl(fsd, iip);
     } finally {
       fsd.writeUnlock();
     }
@@ -168,7 +168,7 @@ class FSDirAclOp {
       if (fsd.isPermissionEnabled()) {
         fsd.checkTraverse(pc, iip);
       }
-      INode inode = FSDirectory.resolveLastINode(srcs, iip);
+      INode inode = FSDirectory.resolveLastINode(iip);
       int snapshotId = iip.getPathSnapshotId();
       List<AclEntry> acl = AclStorage.readINodeAcl(inode, snapshotId);
       FsPermission fsPermission = inode.getFsPermission(snapshotId);
@@ -185,16 +185,17 @@ class FSDirAclOp {
   static List<AclEntry> unprotectedSetAcl(
       FSDirectory fsd, String src, List<AclEntry> aclSpec)
       throws IOException {
+    assert fsd.hasWriteLock();
+    final INodesInPath iip = fsd.getINodesInPath4Write(
+        FSDirectory.normalizePath(src), true);
+
     // ACL removal is logged to edits as OP_SET_ACL with an empty list.
     if (aclSpec.isEmpty()) {
-      unprotectedRemoveAcl(fsd, src);
+      unprotectedRemoveAcl(fsd, iip);
       return AclFeature.EMPTY_ENTRY_LIST;
     }
 
-    assert fsd.hasWriteLock();
-    INodesInPath iip = fsd.getINodesInPath4Write(FSDirectory.normalizePath
-        (src), true);
-    INode inode = FSDirectory.resolveLastINode(src, iip);
+    INode inode = FSDirectory.resolveLastINode(iip);
     int snapshotId = iip.getLatestSnapshotId();
     List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode);
     List<AclEntry> newAcl = AclTransformation.replaceAclEntries(existingAcl,
@@ -212,12 +213,10 @@ class FSDirAclOp {
     }
   }
 
-  private static void unprotectedRemoveAcl(FSDirectory fsd, String src)
+  private static void unprotectedRemoveAcl(FSDirectory fsd, INodesInPath iip)
       throws IOException {
     assert fsd.hasWriteLock();
-    INodesInPath iip = fsd.getINodesInPath4Write(
-        FSDirectory.normalizePath(src), true);
-    INode inode = FSDirectory.resolveLastINode(src, iip);
+    INode inode = FSDirectory.resolveLastINode(iip);
     int snapshotId = iip.getLatestSnapshotId();
     AclFeature f = inode.getAclFeature();
     if (f == null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c78e3a7c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
index c8c5cb2..7e62d2c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
@@ -50,8 +50,7 @@ class FSDirMkdirOp {
       throw new InvalidPathException(src);
     }
     FSPermissionChecker pc = fsd.getPermissionChecker();
-    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath
-        (src);
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     src = fsd.resolvePath(pc, src, pathComponents);
     INodesInPath iip = fsd.getINodesInPath4Write(src);
     if (fsd.isPermissionEnabled()) {
@@ -72,7 +71,7 @@ class FSDirMkdirOp {
       // create multiple inodes.
       fsn.checkFsObjectLimit();
 
-      if (!mkdirsRecursively(fsd, src, permissions, false, now())) {
+      if (mkdirsRecursively(fsd, iip, permissions, false, now()) == null) {
         throw new IOException("Failed to create directory: " + src);
       }
     }
@@ -97,33 +96,34 @@ class FSDirMkdirOp {
    * If ancestor directories do not exist, automatically create them.
 
    * @param fsd FSDirectory
-   * @param src string representation of the path to the directory
+   * @param iip the INodesInPath instance containing all the existing INodes
+   *            and null elements for non-existing components in the path
    * @param permissions the permission of the directory
    * @param inheritPermission
    *   if the permission of the directory should inherit from its parent or not.
    *   u+wx is implicitly added to the automatically created directories,
    *   and to the given directory if inheritPermission is true
    * @param now creation time
-   * @return true if the operation succeeds false otherwise
+   * @return non-null INodesInPath instance if operation succeeds
    * @throws QuotaExceededException if directory creation violates
    *                                any quota limit
    * @throws UnresolvedLinkException if a symlink is encountered in src.
    * @throws SnapshotAccessControlException if path is in RO snapshot
    */
-  static boolean mkdirsRecursively(
-      FSDirectory fsd, String src, PermissionStatus permissions,
-      boolean inheritPermission, long now)
+  static INodesInPath mkdirsRecursively(FSDirectory fsd, INodesInPath iip,
+      PermissionStatus permissions, boolean inheritPermission, long now)
       throws FileAlreadyExistsException, QuotaExceededException,
              UnresolvedLinkException, SnapshotAccessControlException,
              AclException {
-    src = FSDirectory.normalizePath(src);
-    String[] names = INode.getPathNames(src);
-    byte[][] components = INode.getPathComponents(names);
-    final int lastInodeIndex = components.length - 1;
+    final int lastInodeIndex = iip.length() - 1;
+    final byte[][] components = iip.getPathComponents();
+    final String[] names = new String[components.length];
+    for (int i = 0; i < components.length; i++) {
+      names[i] = DFSUtil.bytes2String(components[i]);
+    }
 
     fsd.writeLock();
     try {
-      INodesInPath iip = fsd.getExistingPathINodes(components);
       if (iip.isSnapshot()) {
         throw new SnapshotAccessControlException(
                 "Modification on RO snapshot is disallowed");
@@ -136,8 +136,7 @@ class FSDirMkdirOp {
       for(; i < length && (curNode = iip.getINode(i)) != null; i++) {
         pathbuilder.append(Path.SEPARATOR).append(names[i]);
         if (!curNode.isDirectory()) {
-          throw new FileAlreadyExistsException(
-                  "Parent path is not a directory: "
+          throw new FileAlreadyExistsException("Parent path is not a directory: "
                   + pathbuilder + " " + curNode.getLocalName());
         }
       }
@@ -181,7 +180,7 @@ class FSDirMkdirOp {
             components[i], (i < lastInodeIndex) ? parentPermissions :
                 permissions, null, now);
         if (iip.getINode(i) == null) {
-          return false;
+          return null;
         }
         // Directory creation also count towards FilesCreated
         // to match count of FilesDeleted metric.
@@ -197,7 +196,7 @@ class FSDirMkdirOp {
     } finally {
       fsd.writeUnlock();
     }
-    return true;
+    return iip;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c78e3a7c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
index c62c88e..e3020ea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.util.ChunkedArrayList;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
+import org.apache.hadoop.util.Time;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -43,9 +44,9 @@ import java.util.Map;
 
 import static org.apache.hadoop.hdfs.protocol.FSLimitException.MaxDirectoryItemsExceededException;
 import static org.apache.hadoop.hdfs.protocol.FSLimitException.PathComponentTooLongException;
-import static org.apache.hadoop.util.Time.now;
 
 class FSDirRenameOp {
+  @Deprecated
   static RenameOldResult renameToInt(
       FSDirectory fsd, final String srcArg, final String dstArg,
       boolean logRetryCache)
@@ -67,7 +68,7 @@ class FSDirRenameOp {
     src = fsd.resolvePath(pc, src, srcComponents);
     dst = fsd.resolvePath(pc, dst, dstComponents);
     @SuppressWarnings("deprecation")
-    final boolean status = renameToInternal(fsd, pc, src, dst, logRetryCache);
+    final boolean status = renameTo(fsd, pc, src, dst, logRetryCache);
     if (status) {
       resultingStat = fsd.getAuditFileInfo(dst, false);
     }
@@ -116,6 +117,22 @@ class FSDirRenameOp {
   }
 
   /**
+   * <br>
+   * Note: This is to be used by {@link FSEditLogLoader} only.
+   * <br>
+   */
+  @Deprecated
+  static boolean unprotectedRenameTo(FSDirectory fsd, String src, String dst,
+      long timestamp) throws IOException {
+    if (fsd.isDir(dst)) {
+      dst += Path.SEPARATOR + new Path(src).getName();
+    }
+    final INodesInPath srcIIP = fsd.getINodesInPath4Write(src, false);
+    final INodesInPath dstIIP = fsd.getINodesInPath4Write(dst, false);
+    return unprotectedRenameTo(fsd, src, dst, srcIIP, dstIIP, timestamp);
+  }
+
+  /**
    * Change a path name
    *
    * @param fsd FSDirectory
@@ -126,24 +143,19 @@ class FSDirRenameOp {
    * boolean, Options.Rename...)}
    */
   @Deprecated
-  static boolean unprotectedRenameTo(
-      FSDirectory fsd, String src, String dst, long timestamp)
+  static boolean unprotectedRenameTo(FSDirectory fsd, String src, String dst,
+      final INodesInPath srcIIP, final INodesInPath dstIIP, long timestamp)
       throws IOException {
     assert fsd.hasWriteLock();
-    INodesInPath srcIIP = fsd.getINodesInPath4Write(src, false);
     final INode srcInode = srcIIP.getLastINode();
     try {
-      validateRenameSource(src, srcIIP);
+      validateRenameSource(srcIIP);
     } catch (SnapshotException e) {
       throw e;
     } catch (IOException ignored) {
       return false;
     }
 
-    if (fsd.isDir(dst)) {
-      dst += Path.SEPARATOR + new Path(src).getName();
-    }
-
     // validate the destination
     if (dst.equals(src)) {
       return true;
@@ -155,7 +167,6 @@ class FSDirRenameOp {
       return false;
     }
 
-    INodesInPath dstIIP = fsd.getINodesInPath4Write(dst, false);
     if (dstIIP.getLastINode() != null) {
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: " +
           "failed to rename " + src + " to " + dst + " because destination " +
@@ -234,8 +245,7 @@ class FSDirRenameOp {
     BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
     src = fsd.resolvePath(pc, src, srcComponents);
     dst = fsd.resolvePath(pc, dst, dstComponents);
-    renameToInternal(fsd, pc, src, dst, logRetryCache, collectedBlocks,
-        options);
+    renameTo(fsd, pc, src, dst, collectedBlocks, logRetryCache, options);
     HdfsFileStatus resultingStat = fsd.getAuditFileInfo(dst, false);
 
     return new AbstractMap.SimpleImmutableEntry<BlocksMapUpdateInfo,
@@ -246,29 +256,44 @@ class FSDirRenameOp {
    * @see #unprotectedRenameTo(FSDirectory, String, String, long,
    * org.apache.hadoop.fs.Options.Rename...)
    */
-  static void renameTo(
-      FSDirectory fsd, String src, String dst, long mtime,
-      BlocksMapUpdateInfo collectedBlocks, Options.Rename... options)
-      throws IOException {
+  static void renameTo(FSDirectory fsd, FSPermissionChecker pc, String src,
+      String dst, BlocksMapUpdateInfo collectedBlocks, boolean logRetryCache,
+      Options.Rename... options) throws IOException {
+    final INodesInPath srcIIP = fsd.getINodesInPath4Write(src, false);
+    final INodesInPath dstIIP = fsd.getINodesInPath4Write(dst, false);
+    if (fsd.isPermissionEnabled()) {
+      // Rename does not operate on link targets
+      // Do not resolveLink when checking permissions of src and dst
+      // Check write access to parent of src
+      fsd.checkPermission(pc, srcIIP, false, null, FsAction.WRITE, null, null,
+          false);
+      // Check write access to ancestor of dst
+      fsd.checkPermission(pc, dstIIP, false, FsAction.WRITE, null, null, null,
+          false);
+    }
+
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* FSDirectory.renameTo: " + src + " to "
           + dst);
     }
+    final long mtime = Time.now();
     fsd.writeLock();
     try {
-      if (unprotectedRenameTo(fsd, src, dst, mtime, collectedBlocks, options)) {
+      if (unprotectedRenameTo(fsd, src, dst, srcIIP, dstIIP, mtime,
+          collectedBlocks, options)) {
         fsd.getFSNamesystem().incrDeletedFileCount(1);
       }
     } finally {
       fsd.writeUnlock();
     }
+    fsd.getEditLog().logRename(src, dst, mtime, logRetryCache, options);
   }
 
   /**
    * Rename src to dst.
    * <br>
    * Note: This is to be used by {@link org.apache.hadoop.hdfs.server
-   * .namenode.FSEditLog} only.
+   * .namenode.FSEditLogLoader} only.
    * <br>
    *
    * @param fsd       FSDirectory
@@ -282,7 +307,9 @@ class FSDirRenameOp {
       Options.Rename... options)
       throws IOException {
     BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
-    boolean ret = unprotectedRenameTo(fsd, src, dst, timestamp,
+    final INodesInPath srcIIP = fsd.getINodesInPath4Write(src, false);
+    final INodesInPath dstIIP = fsd.getINodesInPath4Write(dst, false);
+    boolean ret = unprotectedRenameTo(fsd, src, dst, srcIIP, dstIIP, timestamp,
         collectedBlocks, options);
     if (!collectedBlocks.getToDeleteList().isEmpty()) {
       fsd.getFSNamesystem().removeBlocksAndUpdateSafemodeTotal(collectedBlocks);
@@ -302,8 +329,8 @@ class FSDirRenameOp {
    * @param collectedBlocks blocks to be removed
    * @param options         Rename options
    */
-  static boolean unprotectedRenameTo(
-      FSDirectory fsd, String src, String dst, long timestamp,
+  static boolean unprotectedRenameTo(FSDirectory fsd, String src, String dst,
+      final INodesInPath srcIIP, final INodesInPath dstIIP, long timestamp,
       BlocksMapUpdateInfo collectedBlocks, Options.Rename... options)
       throws IOException {
     assert fsd.hasWriteLock();
@@ -311,9 +338,8 @@ class FSDirRenameOp {
         && Arrays.asList(options).contains(Options.Rename.OVERWRITE);
 
     final String error;
-    final INodesInPath srcIIP = fsd.getINodesInPath4Write(src, false);
     final INode srcInode = srcIIP.getLastINode();
-    validateRenameSource(src, srcIIP);
+    validateRenameSource(srcIIP);
 
     // validate the destination
     if (dst.equals(src)) {
@@ -322,7 +348,6 @@ class FSDirRenameOp {
     }
     validateDestination(src, dst, srcInode);
 
-    INodesInPath dstIIP = fsd.getINodesInPath4Write(dst, false);
     if (dstIIP.length() == 1) {
       error = "rename destination cannot be the root";
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: " +
@@ -373,8 +398,8 @@ class FSDirRenameOp {
     long removedNum = 0;
     try {
       if (dstInode != null) { // dst exists remove it
-        if ((removedNum = fsd.removeLastINode(dstIIP)) != -1) {
-          removedDst = dstIIP.getLastINode();
+        if ((removedNum = fsd.removeLastINode(tx.dstIIP)) != -1) {
+          removedDst = tx.dstIIP.getLastINode();
           undoRemoveDst = true;
         }
       }
@@ -395,13 +420,13 @@ class FSDirRenameOp {
           undoRemoveDst = false;
           if (removedNum > 0) {
             List<INode> removedINodes = new ChunkedArrayList<INode>();
-            if (!removedDst.isInLatestSnapshot(dstIIP.getLatestSnapshotId())) {
+            if (!removedDst.isInLatestSnapshot(tx.dstIIP.getLatestSnapshotId())) {
               removedDst.destroyAndCollectBlocks(collectedBlocks,
                   removedINodes);
               filesDeleted = true;
             } else {
               filesDeleted = removedDst.cleanSubtree(
-                  Snapshot.CURRENT_STATE_ID, dstIIP.getLatestSnapshotId(),
+                  Snapshot.CURRENT_STATE_ID, tx.dstIIP.getLatestSnapshotId(),
                   collectedBlocks, removedINodes, true)
                   .get(Quota.NAMESPACE) >= 0;
             }
@@ -431,7 +456,7 @@ class FSDirRenameOp {
           dstParent.asDirectory().undoRename4DstParent(removedDst,
               dstIIP.getLatestSnapshotId());
         } else {
-          fsd.addLastINodeNoQuotaCheck(dstIIP, removedDst);
+          fsd.addLastINodeNoQuotaCheck(tx.dstIIP, removedDst);
         }
         if (removedDst.isReference()) {
           final INodeReference removedDstRef = removedDst.asReference();
@@ -447,59 +472,41 @@ class FSDirRenameOp {
   }
 
   /**
-   * @see #unprotectedRenameTo(FSDirectory, String, String, long)
    * @deprecated Use {@link #renameToInt(FSDirectory, String, String,
    * boolean, Options.Rename...)}
    */
   @Deprecated
   @SuppressWarnings("deprecation")
-  private static boolean renameTo(
-      FSDirectory fsd, String src, String dst, long mtime)
-      throws IOException {
+  private static boolean renameTo(FSDirectory fsd, FSPermissionChecker pc,
+      String src, String dst, boolean logRetryCache) throws IOException {
+    // Rename does not operate on link targets
+    // Do not resolveLink when checking permissions of src and dst
+    // Check write access to parent of src
+    final INodesInPath srcIIP = fsd.getINodesInPath4Write(src, false);
+    // Note: We should not be doing this.  This is move() not renameTo().
+    final String actualDst = fsd.isDir(dst) ?
+        dst + Path.SEPARATOR + new Path(src).getName() : dst;
+    final INodesInPath dstIIP = fsd.getINodesInPath4Write(actualDst, false);
+    if (fsd.isPermissionEnabled()) {
+      fsd.checkPermission(pc, srcIIP, false, null, FsAction.WRITE, null, null,
+          false);
+      // Check write access to ancestor of dst
+      fsd.checkPermission(pc, dstIIP, false, FsAction.WRITE, null, null,
+          null, false);
+    }
+
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* FSDirectory.renameTo: " + src + " to "
           + dst);
     }
+    final long mtime = Time.now();
     boolean stat = false;
     fsd.writeLock();
     try {
-      stat = unprotectedRenameTo(fsd, src, dst, mtime);
+      stat = unprotectedRenameTo(fsd, src, actualDst, srcIIP, dstIIP, mtime);
     } finally {
       fsd.writeUnlock();
     }
-    return stat;
-  }
-
-  /**
-   * @deprecated See {@link #renameTo(FSDirectory, String, String, long)}
-   */
-  @Deprecated
-  private static boolean renameToInternal(
-      FSDirectory fsd, FSPermissionChecker pc, String src, String dst,
-      boolean logRetryCache)
-      throws IOException {
-    if (fsd.isPermissionEnabled()) {
-      //We should not be doing this.  This is move() not renameTo().
-      //but for now,
-      //NOTE: yes, this is bad!  it's assuming much lower level behavior
-      //      of rewriting the dst
-      String actualdst = fsd.isDir(dst) ? dst + Path.SEPARATOR + new Path
-          (src).getName() : dst;
-      // Rename does not operates on link targets
-      // Do not resolveLink when checking permissions of src and dst
-      // Check write access to parent of src
-      INodesInPath srcIIP = fsd.getINodesInPath(src, false);
-      fsd.checkPermission(pc, srcIIP, false, null, FsAction.WRITE, null, null,
-          false);
-      INodesInPath dstIIP = fsd.getINodesInPath(actualdst, false);
-      // Check write access to ancestor of dst
-      fsd.checkPermission(pc, dstIIP, false, FsAction.WRITE, null, null,
-          null, false);
-    }
-
-    long mtime = now();
-    @SuppressWarnings("deprecation")
-    final boolean stat = renameTo(fsd, src, dst, mtime);
     if (stat) {
       fsd.getEditLog().logRename(src, dst, mtime, logRetryCache);
       return true;
@@ -507,29 +514,6 @@ class FSDirRenameOp {
     return false;
   }
 
-  private static void renameToInternal(
-      FSDirectory fsd, FSPermissionChecker pc, String src, String dst,
-      boolean logRetryCache, BlocksMapUpdateInfo collectedBlocks,
-      Options.Rename... options)
-      throws IOException {
-    if (fsd.isPermissionEnabled()) {
-      // Rename does not operates on link targets
-      // Do not resolveLink when checking permissions of src and dst
-      // Check write access to parent of src
-      INodesInPath srcIIP = fsd.getINodesInPath(src, false);
-      fsd.checkPermission(pc, srcIIP, false, null, FsAction.WRITE, null, null,
-          false);
-      // Check write access to ancestor of dst
-      INodesInPath dstIIP = fsd.getINodesInPath(dst, false);
-      fsd.checkPermission(pc, dstIIP, false, FsAction.WRITE, null, null, null,
-          false);
-    }
-
-    long mtime = now();
-    renameTo(fsd, src, dst, mtime, collectedBlocks, options);
-    fsd.getEditLog().logRename(src, dst, mtime, logRetryCache, options);
-  }
-
   private static void validateDestination(
       String src, String dst, INode srcInode)
       throws IOException {
@@ -579,13 +563,13 @@ class FSDirRenameOp {
     }
   }
 
-  private static void validateRenameSource(String src, INodesInPath srcIIP)
+  private static void validateRenameSource(INodesInPath srcIIP)
       throws IOException {
     String error;
     final INode srcInode = srcIIP.getLastINode();
     // validate source
     if (srcInode == null) {
-      error = "rename source " + src + " is not found.";
+      error = "rename source " + srcIIP.getPath() + " is not found.";
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
           + error);
       throw new FileNotFoundException(error);
@@ -625,8 +609,7 @@ class FSDirRenameOp {
       this.dst = dst;
       srcChild = srcIIP.getLastINode();
       srcChildName = srcChild.getLocalNameBytes();
-      isSrcInSnapshot = srcChild.isInLatestSnapshot(srcIIP
-          .getLatestSnapshotId());
+      isSrcInSnapshot = srcChild.isInLatestSnapshot(srcIIP.getLatestSnapshotId());
       srcChildIsReference = srcChild.isReference();
       srcParent = srcIIP.getINode(-2).asDirectory();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c78e3a7c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
index 0f94171..5bc790e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
@@ -72,14 +72,14 @@ class FSDirStatAndListingOp {
 
     boolean isSuperUser = true;
     if (fsd.isPermissionEnabled()) {
-      if (fsd.isDir(src)) {
+      if (iip.getLastINode() != null && iip.getLastINode().isDirectory()) {
         fsd.checkPathAccess(pc, iip, FsAction.READ_EXECUTE);
       } else {
         fsd.checkTraverse(pc, iip);
       }
       isSuperUser = pc.isSuperUser();
     }
-    return getListing(fsd, src, startAfter, needLocation, isSuperUser);
+    return getListing(fsd, iip, src, startAfter, needLocation, isSuperUser);
   }
 
   /**
@@ -131,12 +131,12 @@ class FSDirStatAndListingOp {
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     FSPermissionChecker pc = fsd.getPermissionChecker();
     src = fsd.resolvePath(pc, src, pathComponents);
-    final INodesInPath iip = fsd.getINodesInPath(src, true);
+    final INodesInPath iip = fsd.getINodesInPath(src, false);
     if (fsd.isPermissionEnabled()) {
       fsd.checkPermission(pc, iip, false, null, null, null,
           FsAction.READ_EXECUTE);
     }
-    return getContentSummaryInt(fsd, src);
+    return getContentSummaryInt(fsd, iip);
   }
 
   /**
@@ -148,14 +148,15 @@ class FSDirStatAndListingOp {
    * that at least this.lsLimit block locations are in the response
    *
    * @param fsd FSDirectory
+   * @param iip the INodesInPath instance containing all the INodes along the
+   *            path
    * @param src the directory name
    * @param startAfter the name to start listing after
    * @param needLocation if block locations are returned
    * @return a partial listing starting after startAfter
    */
-  private static DirectoryListing getListing(
-      FSDirectory fsd, String src, byte[] startAfter, boolean needLocation,
-      boolean isSuperUser)
+  private static DirectoryListing getListing(FSDirectory fsd, INodesInPath iip,
+      String src, byte[] startAfter, boolean needLocation, boolean isSuperUser)
       throws IOException {
     String srcs = FSDirectory.normalizePath(src);
     final boolean isRawPath = FSDirectory.isReservedRawName(src);
@@ -165,9 +166,8 @@ class FSDirStatAndListingOp {
       if (srcs.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR)) {
         return getSnapshotsListing(fsd, srcs, startAfter);
       }
-      final INodesInPath inodesInPath = fsd.getINodesInPath(srcs, true);
-      final int snapshot = inodesInPath.getPathSnapshotId();
-      final INode targetNode = inodesInPath.getLastINode();
+      final int snapshot = iip.getPathSnapshotId();
+      final INode targetNode = iip.getLastINode();
       if (targetNode == null)
         return null;
       byte parentStoragePolicy = isSuperUser ?
@@ -178,7 +178,7 @@ class FSDirStatAndListingOp {
         return new DirectoryListing(
             new HdfsFileStatus[]{createFileStatus(fsd,
                 HdfsFileStatus.EMPTY_NAME, targetNode, needLocation,
-                parentStoragePolicy, snapshot, isRawPath, inodesInPath)}, 0);
+                parentStoragePolicy, snapshot, isRawPath, iip)}, 0);
       }
 
       final INodeDirectory dirInode = targetNode.asDirectory();
@@ -196,7 +196,8 @@ class FSDirStatAndListingOp {
             cur.getLocalStoragePolicyID():
             BlockStoragePolicySuite.ID_UNSPECIFIED;
         listing[i] = createFileStatus(fsd, cur.getLocalNameBytes(), cur,
-            needLocation, fsd.getStoragePolicyID(curPolicy, parentStoragePolicy), snapshot, isRawPath, inodesInPath);
+            needLocation, fsd.getStoragePolicyID(curPolicy,
+                parentStoragePolicy), snapshot, isRawPath, iip);
         listingCnt++;
         if (needLocation) {
             // Once we  hit lsLimit locations, stop.
@@ -453,14 +454,13 @@ class FSDirStatAndListingOp {
     return perm;
   }
 
-  private static ContentSummary getContentSummaryInt(
-      FSDirectory fsd, String src) throws IOException {
-    String srcs = FSDirectory.normalizePath(src);
+  private static ContentSummary getContentSummaryInt(FSDirectory fsd,
+      INodesInPath iip) throws IOException {
     fsd.readLock();
     try {
-      INode targetNode = fsd.getNode(srcs, false);
+      INode targetNode = iip.getLastINode();
       if (targetNode == null) {
-        throw new FileNotFoundException("File does not exist: " + srcs);
+        throw new FileNotFoundException("File does not exist: " + iip.getPath());
       }
       else {
         // Make it relinquish locks everytime contentCountLimit entries are

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c78e3a7c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
index 303b9e3..47a995d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
@@ -191,7 +191,7 @@ class FSDirXAttrOp {
     assert fsd.hasWriteLock();
     INodesInPath iip = fsd.getINodesInPath4Write(
         FSDirectory.normalizePath(src), true);
-    INode inode = FSDirectory.resolveLastINode(src, iip);
+    INode inode = FSDirectory.resolveLastINode(iip);
     int snapshotId = iip.getLatestSnapshotId();
     List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);
     List<XAttr> removedXAttrs = Lists.newArrayListWithCapacity(toRemove.size());
@@ -260,8 +260,9 @@ class FSDirXAttrOp {
       final EnumSet<XAttrSetFlag> flag)
       throws IOException {
     assert fsd.hasWriteLock();
-    INodesInPath iip = fsd.getINodesInPath4Write(FSDirectory.normalizePath(src), true);
-    INode inode = FSDirectory.resolveLastINode(src, iip);
+    INodesInPath iip = fsd.getINodesInPath4Write(FSDirectory.normalizePath(src),
+        true);
+    INode inode = FSDirectory.resolveLastINode(iip);
     int snapshotId = iip.getLatestSnapshotId();
     List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);
     List<XAttr> newXAttrs = setINodeXAttrs(fsd, existingXAttrs, xAttrs, flag);
@@ -444,8 +445,8 @@ class FSDirXAttrOp {
     String srcs = FSDirectory.normalizePath(src);
     fsd.readLock();
     try {
-      INodesInPath iip = fsd.getLastINodeInPath(srcs, true);
-      INode inode = FSDirectory.resolveLastINode(src, iip);
+      INodesInPath iip = fsd.getINodesInPath(srcs, true);
+      INode inode = FSDirectory.resolveLastINode(iip);
       int snapshotId = iip.getPathSnapshotId();
       return XAttrStorage.readINodeXAttrs(inode, snapshotId);
     } finally {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c78e3a7c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index 81b0eb6..ee9bdd0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -335,8 +335,8 @@ public class FSDirectory implements Closeable {
 
   private static INodeFile newINodeFile(long id, PermissionStatus permissions,
       long mtime, long atime, short replication, long preferredBlockSize) {
-    return newINodeFile(id, permissions, mtime, atime, replication, preferredBlockSize,
-        (byte)0);
+    return newINodeFile(id, permissions, mtime, atime, replication,
+        preferredBlockSize, (byte)0);
   }
 
   private static INodeFile newINodeFile(long id, PermissionStatus permissions,
@@ -354,20 +354,21 @@ public class FSDirectory implements Closeable {
    * @throws UnresolvedLinkException
    * @throws SnapshotAccessControlException 
    */
-  INodeFile addFile(String path, PermissionStatus permissions,
+  INodeFile addFile(INodesInPath iip, String path, PermissionStatus permissions,
                     short replication, long preferredBlockSize,
                     String clientName, String clientMachine)
     throws FileAlreadyExistsException, QuotaExceededException,
       UnresolvedLinkException, SnapshotAccessControlException, AclException {
 
     long modTime = now();
-    INodeFile newNode = newINodeFile(allocateNewInodeId(), permissions, modTime, modTime, replication, preferredBlockSize);
+    INodeFile newNode = newINodeFile(allocateNewInodeId(), permissions, modTime,
+        modTime, replication, preferredBlockSize);
     newNode.toUnderConstruction(clientName, clientMachine);
 
     boolean added = false;
     writeLock();
     try {
-      added = addINode(path, newNode);
+      added = addINode(iip, newNode);
     } finally {
       writeUnlock();
     }
@@ -382,8 +383,8 @@ public class FSDirectory implements Closeable {
     return newNode;
   }
 
-  INodeFile unprotectedAddFile( long id,
-                            String path, 
+  INodeFile unprotectedAddFile(long id,
+                            INodesInPath iip,
                             PermissionStatus permissions,
                             List<AclEntry> aclEntries,
                             List<XAttr> xAttrs,
@@ -401,14 +402,13 @@ public class FSDirectory implements Closeable {
       newNode = newINodeFile(id, permissions, modificationTime,
           modificationTime, replication, preferredBlockSize, storagePolicyId);
       newNode.toUnderConstruction(clientName, clientMachine);
-
     } else {
       newNode = newINodeFile(id, permissions, modificationTime, atime,
           replication, preferredBlockSize, storagePolicyId);
     }
 
     try {
-      if (addINode(path, newNode)) {
+      if (addINode(iip, newNode)) {
         if (aclEntries != null) {
           AclStorage.updateINodeAcl(newNode, aclEntries,
             Snapshot.CURRENT_STATE_ID);
@@ -422,8 +422,8 @@ public class FSDirectory implements Closeable {
     } catch (IOException e) {
       if(NameNode.stateChangeLog.isDebugEnabled()) {
         NameNode.stateChangeLog.debug(
-            "DIR* FSDirectory.unprotectedAddFile: exception when add " + path
-                + " to the file system", e);
+            "DIR* FSDirectory.unprotectedAddFile: exception when add "
+                + iip.getPath() + " to the file system", e);
       }
     }
     return null;
@@ -468,18 +468,18 @@ public class FSDirectory implements Closeable {
    * Remove a block from the file.
    * @return Whether the block exists in the corresponding file
    */
-  boolean removeBlock(String path, INodeFile fileNode, Block block)
-      throws IOException {
+  boolean removeBlock(String path, INodesInPath iip, INodeFile fileNode,
+      Block block) throws IOException {
     Preconditions.checkArgument(fileNode.isUnderConstruction());
     writeLock();
     try {
-      return unprotectedRemoveBlock(path, fileNode, block);
+      return unprotectedRemoveBlock(path, iip, fileNode, block);
     } finally {
       writeUnlock();
     }
   }
   
-  boolean unprotectedRemoveBlock(String path,
+  boolean unprotectedRemoveBlock(String path, INodesInPath iip,
       INodeFile fileNode, Block block) throws IOException {
     // modify file-> block and blocksMap
     // fileNode should be under construction
@@ -496,7 +496,6 @@ public class FSDirectory implements Closeable {
     }
 
     // update space consumed
-    final INodesInPath iip = getINodesInPath4Write(path, true);
     updateCount(iip, 0, -fileNode.getBlockDiskspace(), true);
     return true;
   }
@@ -638,20 +637,6 @@ public class FSDirectory implements Closeable {
     XAttrStorage.updateINodeXAttrs(inode, newXAttrs, latestSnapshotId);
   }
 
-  /**
-   * @param path the file path
-   * @return the block size of the file. 
-   */
-  long getPreferredBlockSize(String path) throws IOException {
-    readLock();
-    try {
-      return INodeFile.valueOf(getNode(path, false), path
-          ).getPreferredBlockSize();
-    } finally {
-      readUnlock();
-    }
-  }
-
   void setPermission(String src, FsPermission permission)
       throws FileNotFoundException, UnresolvedLinkException,
       QuotaExceededException, SnapshotAccessControlException {
@@ -706,28 +691,26 @@ public class FSDirectory implements Closeable {
 
   /**
    * Delete the target directory and collect the blocks under it
-   * 
-   * @param src Path of a directory to delete
+   *
+   * @param iip the INodesInPath instance containing all the INodes for the path
    * @param collectedBlocks Blocks under the deleted directory
    * @param removedINodes INodes that should be removed from {@link #inodeMap}
    * @return the number of files that have been removed
    */
-  long delete(String src, BlocksMapUpdateInfo collectedBlocks,
+  long delete(INodesInPath iip, BlocksMapUpdateInfo collectedBlocks,
               List<INode> removedINodes, long mtime) throws IOException {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* FSDirectory.delete: " + src);
+      NameNode.stateChangeLog.debug("DIR* FSDirectory.delete: " + iip.getPath());
     }
     final long filesRemoved;
     writeLock();
     try {
-      final INodesInPath inodesInPath = getINodesInPath4Write(
-          normalizePath(src), false);
-      if (!deleteAllowed(inodesInPath, src) ) {
+      if (!deleteAllowed(iip, iip.getPath()) ) {
         filesRemoved = -1;
       } else {
         List<INodeDirectory> snapshottableDirs = new ArrayList<INodeDirectory>();
-        FSDirSnapshotOp.checkSnapshot(inodesInPath.getLastINode(), snapshottableDirs);
-        filesRemoved = unprotectedDelete(inodesInPath, collectedBlocks,
+        FSDirSnapshotOp.checkSnapshot(iip.getLastINode(), snapshottableDirs);
+        filesRemoved = unprotectedDelete(iip, collectedBlocks,
             removedINodes, mtime);
         namesystem.removeSnapshottableDirs(snapshottableDirs);
       }
@@ -863,88 +846,15 @@ public class FSDirectory implements Closeable {
         parentPolicy;
   }
 
-  INode getINode4DotSnapshot(String src) throws UnresolvedLinkException {
-    Preconditions.checkArgument(
-        src.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR),
-        "%s does not end with %s", src, HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR);
-    
-    final String dirPath = normalizePath(src.substring(0,
-        src.length() - HdfsConstants.DOT_SNAPSHOT_DIR.length()));
-    
-    final INode node = this.getINode(dirPath);
-    if (node != null && node.isDirectory()
-        && node.asDirectory().isSnapshottable()) {
-      return node;
-    }
-    return null;
-  }
-
-  INodesInPath getExistingPathINodes(byte[][] components)
-      throws UnresolvedLinkException {
-    return INodesInPath.resolve(rootDir, components);
-  }
-
-  /**
-   * Get {@link INode} associated with the file / directory.
-   */
-  public INode getINode(String src) throws UnresolvedLinkException {
-    return getLastINodeInPath(src).getINode(0);
-  }
-
-  /**
-   * Get {@link INode} associated with the file / directory.
-   */
-  public INodesInPath getLastINodeInPath(String src)
-       throws UnresolvedLinkException {
-    readLock();
-    try {
-      return getLastINodeInPath(src, true);
-    } finally {
-      readUnlock();
-    }
-  }
-
-  /**
-   * Get {@link INode} associated with the file / directory.
-   */
-  public INodesInPath getINodesInPath4Write(String src
-      ) throws UnresolvedLinkException, SnapshotAccessControlException {
-    readLock();
-    try {
-      return getINodesInPath4Write(src, true);
-    } finally {
-      readUnlock();
-    }
-  }
-
-  /**
-   * Get {@link INode} associated with the file / directory.
-   * @throws SnapshotAccessControlException if path is in RO snapshot
-   */
-  public INode getINode4Write(String src) throws UnresolvedLinkException,
-      SnapshotAccessControlException {
-    readLock();
-    try {
-      return getINode4Write(src, true);
-    } finally {
-      readUnlock();
-    }
-  }
-
   /** 
    * Check whether the filepath could be created
    * @throws SnapshotAccessControlException if path is in RO snapshot
    */
-  boolean isValidToCreate(String src) throws UnresolvedLinkException,
-      SnapshotAccessControlException {
+  boolean isValidToCreate(String src, INodesInPath iip)
+      throws SnapshotAccessControlException {
     String srcs = normalizePath(src);
-    readLock();
-    try {
-      return srcs.startsWith("/") && !srcs.endsWith("/")
-              && getINode4Write(srcs, false) == null;
-    } finally {
-      readUnlock();
-    }
+    return srcs.startsWith("/") && !srcs.endsWith("/") &&
+        iip.getLastINode() == null;
   }
 
   /**
@@ -954,7 +864,7 @@ public class FSDirectory implements Closeable {
     src = normalizePath(src);
     readLock();
     try {
-      INode node = getNode(src, false);
+      INode node = getINode(src, false);
       return node != null && node.isDirectory();
     } finally {
       readUnlock();
@@ -963,21 +873,21 @@ public class FSDirectory implements Closeable {
 
   /** Updates namespace and diskspace consumed for all
    * directories until the parent directory of file represented by path.
-   * 
-   * @param path path for the file.
+   *
+   * @param iip the INodesInPath instance containing all the INodes for
+   *            updating quota usage
    * @param nsDelta the delta change of namespace
    * @param dsDelta the delta change of diskspace
    * @throws QuotaExceededException if the new count violates any quota limit
    * @throws FileNotFoundException if path does not exist.
    */
-  void updateSpaceConsumed(String path, long nsDelta, long dsDelta)
+  void updateSpaceConsumed(INodesInPath iip, long nsDelta, long dsDelta)
       throws QuotaExceededException, FileNotFoundException,
           UnresolvedLinkException, SnapshotAccessControlException {
     writeLock();
     try {
-      final INodesInPath iip = getINodesInPath4Write(path, false);
       if (iip.getLastINode() == null) {
-        throw new FileNotFoundException("Path not found: " + path);
+        throw new FileNotFoundException("Path not found: " + iip.getPath());
       }
       updateCount(iip, nsDelta, dsDelta, true);
     } finally {
@@ -1097,17 +1007,15 @@ public class FSDirectory implements Closeable {
 
   /**
    * Add the given child to the namespace.
-   * @param src The full path name of the child node.
+   * @param iip the INodesInPath instance containing all the ancestral INodes
    * @throws QuotaExceededException is thrown if it violates quota limit
    */
-  private boolean addINode(String src, INode child)
+  private boolean addINode(INodesInPath iip, INode child)
       throws QuotaExceededException, UnresolvedLinkException {
-    byte[][] components = INode.getPathComponents(src);
-    child.setLocalName(components[components.length-1]);
+    child.setLocalName(iip.getLastLocalName());
     cacheName(child);
     writeLock();
     try {
-      final INodesInPath iip = getExistingPathINodes(components);
       return addLastINode(iip, child, true);
     } finally {
       writeUnlock();
@@ -1504,7 +1412,7 @@ public class FSDirectory implements Closeable {
   boolean unprotectedSetTimes(String src, long mtime, long atime, boolean force) 
       throws UnresolvedLinkException, QuotaExceededException {
     assert hasWriteLock();
-    final INodesInPath i = getLastINodeInPath(src); 
+    final INodesInPath i = getINodesInPath(src, true);
     return unprotectedSetTimes(i.getLastINode(), mtime, atime, force,
         i.getLatestSnapshotId());
   }
@@ -1551,24 +1459,24 @@ public class FSDirectory implements Closeable {
   /**
    * Add the specified path into the namespace.
    */
-  INodeSymlink addSymlink(long id, String path, String target,
+  INodeSymlink addSymlink(INodesInPath iip, long id, String target,
                           long mtime, long atime, PermissionStatus perm)
           throws UnresolvedLinkException, QuotaExceededException {
     writeLock();
     try {
-      return unprotectedAddSymlink(id, path, target, mtime, atime, perm);
+      return unprotectedAddSymlink(iip, id, target, mtime, atime, perm);
     } finally {
       writeUnlock();
     }
   }
 
-  INodeSymlink unprotectedAddSymlink(long id, String path, String target,
+  INodeSymlink unprotectedAddSymlink(INodesInPath iip, long id, String target,
       long mtime, long atime, PermissionStatus perm)
       throws UnresolvedLinkException, QuotaExceededException {
     assert hasWriteLock();
     final INodeSymlink symlink = new INodeSymlink(id, null, perm, mtime, atime,
         target);
-    return addINode(path, symlink) ? symlink : null;
+    return addINode(iip, symlink) ? symlink : null;
   }
 
   boolean isInAnEZ(INodesInPath iip)
@@ -1704,11 +1612,10 @@ public class FSDirectory implements Closeable {
     }
   }
 
-  static INode resolveLastINode(String src, INodesInPath iip)
-      throws FileNotFoundException {
+  static INode resolveLastINode(INodesInPath iip) throws FileNotFoundException {
     INode inode = iip.getLastINode();
     if (inode == null) {
-      throw new FileNotFoundException("cannot find " + src);
+      throw new FileNotFoundException("cannot find " + iip.getPath());
     }
     return inode;
   }
@@ -1885,36 +1792,62 @@ public class FSDirectory implements Closeable {
     return path.toString();
   }
 
-  /** @return the {@link INodesInPath} containing only the last inode. */
-  INodesInPath getLastINodeInPath(
-      String path, boolean resolveLink) throws UnresolvedLinkException {
-    return INodesInPath.resolve(rootDir, INode.getPathComponents(path), 1,
-            resolveLink);
+  INode getINode4DotSnapshot(String src) throws UnresolvedLinkException {
+    Preconditions.checkArgument(
+        src.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR),
+        "%s does not end with %s", src, HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR);
+
+    final String dirPath = normalizePath(src.substring(0,
+        src.length() - HdfsConstants.DOT_SNAPSHOT_DIR.length()));
+
+    final INode node = this.getINode(dirPath);
+    if (node != null && node.isDirectory()
+        && node.asDirectory().isSnapshottable()) {
+      return node;
+    }
+    return null;
+  }
+
+  INodesInPath getExistingPathINodes(byte[][] components)
+      throws UnresolvedLinkException {
+    return INodesInPath.resolve(rootDir, components, false);
+  }
+
+  /**
+   * Get {@link INode} associated with the file / directory.
+   */
+  public INodesInPath getINodesInPath4Write(String src)
+      throws UnresolvedLinkException, SnapshotAccessControlException {
+    return getINodesInPath4Write(src, true);
+  }
+
+  /**
+   * Get {@link INode} associated with the file / directory.
+   * @throws SnapshotAccessControlException if path is in RO snapshot
+   */
+  public INode getINode4Write(String src) throws UnresolvedLinkException,
+      SnapshotAccessControlException {
+    return getINodesInPath4Write(src, true).getLastINode();
   }
 
   /** @return the {@link INodesInPath} containing all inodes in the path. */
-  INodesInPath getINodesInPath(String path, boolean resolveLink
-  ) throws UnresolvedLinkException {
+  public INodesInPath getINodesInPath(String path, boolean resolveLink)
+      throws UnresolvedLinkException {
     final byte[][] components = INode.getPathComponents(path);
-    return INodesInPath.resolve(rootDir, components, components.length,
-            resolveLink);
+    return INodesInPath.resolve(rootDir, components, resolveLink);
   }
 
   /** @return the last inode in the path. */
-  INode getNode(String path, boolean resolveLink)
-          throws UnresolvedLinkException {
-    return getLastINodeInPath(path, resolveLink).getINode(0);
+  INode getINode(String path, boolean resolveLink)
+      throws UnresolvedLinkException {
+    return getINodesInPath(path, resolveLink).getLastINode();
   }
 
   /**
-   * @return the INode of the last component in src, or null if the last
-   * component does not exist.
-   * @throws UnresolvedLinkException if symlink can't be resolved
-   * @throws SnapshotAccessControlException if path is in RO snapshot
+   * Get {@link INode} associated with the file / directory.
    */
-  INode getINode4Write(String src, boolean resolveLink)
-          throws UnresolvedLinkException, SnapshotAccessControlException {
-    return getINodesInPath4Write(src, resolveLink).getLastINode();
+  public INode getINode(String src) throws UnresolvedLinkException {
+    return getINode(src, true);
   }
 
   /**
@@ -1926,7 +1859,7 @@ public class FSDirectory implements Closeable {
           throws UnresolvedLinkException, SnapshotAccessControlException {
     final byte[][] components = INode.getPathComponents(src);
     INodesInPath inodesInPath = INodesInPath.resolve(rootDir, components,
-            components.length, resolveLink);
+        resolveLink);
     if (inodesInPath.isSnapshot()) {
       throw new SnapshotAccessControlException(
               "Modification on a read-only snapshot is disallowed");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c78e3a7c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index 2721f85..833b9db 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -342,7 +342,7 @@ public class FSEditLogLoader {
       // 3. OP_ADD to open file for append
 
       // See if the file already exists (persistBlocks call)
-      final INodesInPath iip = fsDir.getINodesInPath(path, true);
+      INodesInPath iip = fsDir.getINodesInPath(path, true);
       INodeFile oldFile = INodeFile.valueOf(iip.getLastINode(), path, true);
       if (oldFile != null && addCloseOp.overwrite) {
         // This is OP_ADD with overwrite
@@ -361,11 +361,12 @@ public class FSEditLogLoader {
         inodeId = getAndUpdateLastInodeId(addCloseOp.inodeId, logVersion,
             lastInodeId);
         newFile = fsDir.unprotectedAddFile(inodeId,
-            path, addCloseOp.permissions, addCloseOp.aclEntries,
+            iip, addCloseOp.permissions, addCloseOp.aclEntries,
             addCloseOp.xAttrs,
             replication, addCloseOp.mtime, addCloseOp.atime,
             addCloseOp.blockSize, true, addCloseOp.clientName,
             addCloseOp.clientMachine, addCloseOp.storagePolicyId);
+        iip = INodesInPath.replace(iip, iip.length() - 1, newFile);
         fsNamesys.leaseManager.addLease(addCloseOp.clientName, path);
 
         // add the op into retry cache if necessary
@@ -384,10 +385,10 @@ public class FSEditLogLoader {
             FSNamesystem.LOG.debug("Reopening an already-closed file " +
                 "for append");
           }
-          LocatedBlock lb = fsNamesys.prepareFileForWrite(path,
-              oldFile, addCloseOp.clientName, addCloseOp.clientMachine, false, iip.getLatestSnapshotId(), false);
-          newFile = INodeFile.valueOf(fsDir.getINode(path),
-              path, true);
+          // Note we do not replace the INodeFile when converting it to
+          // under-construction
+          LocatedBlock lb = fsNamesys.prepareFileForWrite(path, iip,
+              addCloseOp.clientName, addCloseOp.clientMachine, false, false);
           
           // add the op into retry cache is necessary
           if (toAddRetryCache) {
@@ -408,7 +409,7 @@ public class FSEditLogLoader {
       // Update the salient file attributes.
       newFile.setAccessTime(addCloseOp.atime, Snapshot.CURRENT_STATE_ID);
       newFile.setModificationTime(addCloseOp.mtime, Snapshot.CURRENT_STATE_ID);
-      updateBlocks(fsDir, addCloseOp, newFile);
+      updateBlocks(fsDir, addCloseOp, iip, newFile);
       break;
     }
     case OP_CLOSE: {
@@ -422,13 +423,13 @@ public class FSEditLogLoader {
             " clientMachine " + addCloseOp.clientMachine);
       }
 
-      final INodesInPath iip = fsDir.getLastINodeInPath(path);
-      final INodeFile file = INodeFile.valueOf(iip.getINode(0), path);
+      final INodesInPath iip = fsDir.getINodesInPath(path, true);
+      final INodeFile file = INodeFile.valueOf(iip.getLastINode(), path);
 
       // Update the salient file attributes.
       file.setAccessTime(addCloseOp.atime, Snapshot.CURRENT_STATE_ID);
       file.setModificationTime(addCloseOp.mtime, Snapshot.CURRENT_STATE_ID);
-      updateBlocks(fsDir, addCloseOp, file);
+      updateBlocks(fsDir, addCloseOp, iip, file);
 
       // Now close the file
       if (!file.isUnderConstruction() &&
@@ -455,10 +456,10 @@ public class FSEditLogLoader {
         FSNamesystem.LOG.debug(op.opCode + ": " + path +
             " numblocks : " + updateOp.blocks.length);
       }
-      INodeFile oldFile = INodeFile.valueOf(fsDir.getINode(path),
-          path);
+      INodesInPath iip = fsDir.getINodesInPath(path, true);
+      INodeFile oldFile = INodeFile.valueOf(iip.getLastINode(), path);
       // Update in-memory data structures
-      updateBlocks(fsDir, updateOp, oldFile);
+      updateBlocks(fsDir, updateOp, iip, oldFile);
       
       if (toAddRetryCache) {
         fsNamesys.addCacheEntry(updateOp.rpcClientId, updateOp.rpcCallId);
@@ -587,8 +588,10 @@ public class FSEditLogLoader {
       SymlinkOp symlinkOp = (SymlinkOp)op;
       inodeId = getAndUpdateLastInodeId(symlinkOp.inodeId, logVersion,
           lastInodeId);
-      fsDir.unprotectedAddSymlink(inodeId,
-          renameReservedPathsOnUpgrade(symlinkOp.path, logVersion),
+      final String path = renameReservedPathsOnUpgrade(symlinkOp.path,
+          logVersion);
+      final INodesInPath iip = fsDir.getINodesInPath(path, false);
+      fsDir.unprotectedAddSymlink(iip, inodeId,
           symlinkOp.value, symlinkOp.mtime, symlinkOp.atime,
           symlinkOp.permissionStatus);
       
@@ -922,7 +925,7 @@ public class FSEditLogLoader {
    * @throws IOException
    */
   private void updateBlocks(FSDirectory fsDir, BlockListUpdatingOp op,
-      INodeFile file) throws IOException {
+      INodesInPath iip, INodeFile file) throws IOException {
     // Update its block list
     BlockInfo[] oldBlocks = file.getBlocks();
     Block[] newBlocks = op.getBlocks();
@@ -976,7 +979,7 @@ public class FSEditLogLoader {
             + path);
       }
       Block oldBlock = oldBlocks[oldBlocks.length - 1];
-      boolean removed = fsDir.unprotectedRemoveBlock(path, file, oldBlock);
+      boolean removed = fsDir.unprotectedRemoveBlock(path, iip, file, oldBlock);
       if (!removed && !(op instanceof UpdateBlocksOp)) {
         throw new IOException("Trying to delete non-existant block " + oldBlock);
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c78e3a7c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
index e26f052..0a92054 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
@@ -596,7 +596,7 @@ public class FSImageFormat {
      // Rename .snapshot paths if we're doing an upgrade
      parentPath = renameReservedPathsOnUpgrade(parentPath, getLayoutVersion());
      final INodeDirectory parent = INodeDirectory.valueOf(
-         namesystem.dir.getNode(parentPath, true), parentPath);
+         namesystem.dir.getINode(parentPath, true), parentPath);
      return loadChildren(parent, in, counter);
    }
 
@@ -940,8 +940,8 @@ public class FSImageFormat {
           inSnapshot = true;
         } else {
           path = renameReservedPathsOnUpgrade(path, getLayoutVersion());
-          final INodesInPath iip = fsDir.getLastINodeInPath(path);
-          oldnode = INodeFile.valueOf(iip.getINode(0), path);
+          final INodesInPath iip = fsDir.getINodesInPath(path, true);
+          oldnode = INodeFile.valueOf(iip.getLastINode(), path);
         }
 
         FileUnderConstructionFeature uc = cons.getFileUnderConstructionFeature();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c78e3a7c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 5dd5920..b4b897a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -2028,7 +2028,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       if (!createParent) {
         dir.verifyParentDir(iip, link);
       }
-      if (!dir.isValidToCreate(link)) {
+      if (!dir.isValidToCreate(link, iip)) {
         throw new IOException("failed to create link " + link 
             +" either because the filename is invalid or the file exists");
       }
@@ -2039,7 +2039,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       checkFsObjectLimit();
 
       // add symbolic link to namespace
-      addSymlink(link, target, dirPerms, createParent, logRetryCache);
+      addSymlink(link, iip, target, dirPerms, createParent, logRetryCache);
       resultingStat = getAuditFileInfo(link, false);
     } finally {
       writeUnlock();
@@ -2191,11 +2191,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     try {
       checkOperation(OperationCategory.READ);
       filename = dir.resolvePath(pc, filename, pathComponents);
-      final INodesInPath iip = dir.getINodesInPath(filename, true);
+      final INodesInPath iip = dir.getINodesInPath(filename, false);
       if (isPermissionEnabled) {
         dir.checkTraverse(pc, iip);
       }
-      return dir.getPreferredBlockSize(filename);
+      return INodeFile.valueOf(iip.getLastINode(), filename)
+          .getPreferredBlockSize();
     } finally {
       readUnlock();
     }
@@ -2491,14 +2492,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         if (overwrite) {
           toRemoveBlocks = new BlocksMapUpdateInfo();
           List<INode> toRemoveINodes = new ChunkedArrayList<INode>();
-          long ret = dir.delete(src, toRemoveBlocks, toRemoveINodes, now());
+          long ret = dir.delete(iip, toRemoveBlocks, toRemoveINodes, now());
           if (ret >= 0) {
             incrDeletedFileCount(ret);
             removePathAndBlocks(src, null, toRemoveINodes, true);
           }
         } else {
           // If lease soft limit time is expired, recover the lease
-          recoverLeaseInternal(myFile, src, holder, clientMachine, false);
+          recoverLeaseInternal(iip, src, holder, clientMachine, false);
           throw new FileAlreadyExistsException(src + " for client " +
               clientMachine + " already exists");
         }
@@ -2508,10 +2509,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       INodeFile newNode = null;
 
       // Always do an implicit mkdirs for parent directory tree.
-      Path parent = new Path(src).getParent();
-      if (parent != null && FSDirMkdirOp.mkdirsRecursively(dir,
-          parent.toString(), permissions, true, now())) {
-        newNode = dir.addFile(src, permissions, replication, blockSize,
+      INodesInPath parentIIP = iip.getParentINodesInPath();
+      if (parentIIP != null && (parentIIP = FSDirMkdirOp.mkdirsRecursively(dir,
+          parentIIP, permissions, true, now())) != null) {
+        iip = INodesInPath.append(parentIIP, newNode, iip.getLastLocalName());
+        newNode = dir.addFile(iip, src, permissions, replication, blockSize,
                               holder, clientMachine);
       }
 
@@ -2621,12 +2623,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
             "Cannot append to lazy persist file " + src);
       }
       // Opening an existing file for write - may need to recover lease.
-      recoverLeaseInternal(myFile, src, holder, clientMachine, false);
+      recoverLeaseInternal(iip, src, holder, clientMachine, false);
       
-      // recoverLeaseInternal may create a new InodeFile via 
-      // finalizeINodeFileUnderConstruction so we need to refresh 
-      // the referenced file.  
-      myFile = INodeFile.valueOf(dir.getINode(src), src, true);
       final BlockInfo lastBlock = myFile.getLastBlock();
       // Check that the block has at least minimum replication.
       if(lastBlock != null && lastBlock.isComplete() &&
@@ -2634,8 +2632,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         throw new IOException("append: lastBlock=" + lastBlock +
             " of src=" + src + " is not sufficiently replicated yet.");
       }
-      return prepareFileForWrite(src, myFile, holder, clientMachine, true,
-              iip.getLatestSnapshotId(), logRetryCache);
+      return prepareFileForWrite(src, iip, holder, clientMachine, true,
+              logRetryCache);
     } catch (IOException ie) {
       NameNode.stateChangeLog.warn("DIR* NameSystem.append: " +ie.getMessage());
       throw ie;
@@ -2643,11 +2641,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   }
   
   /**
-   * Replace current node with a INodeUnderConstruction.
+   * Convert current node to under construction.
    * Recreate in-memory lease record.
    * 
    * @param src path to the file
-   * @param file existing file object
    * @param leaseHolder identifier of the lease holder on this file
    * @param clientMachine identifier of the client machine
    * @param writeToEditLog whether to persist this change to the edit log
@@ -2657,26 +2654,25 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    * @throws UnresolvedLinkException
    * @throws IOException
    */
-  LocatedBlock prepareFileForWrite(String src, INodeFile file,
-                                   String leaseHolder, String clientMachine,
-                                   boolean writeToEditLog,
-                                   int latestSnapshot, boolean logRetryCache)
-      throws IOException {
-    file.recordModification(latestSnapshot);
-    final INodeFile cons = file.toUnderConstruction(leaseHolder, clientMachine);
+  LocatedBlock prepareFileForWrite(String src, INodesInPath iip,
+      String leaseHolder, String clientMachine, boolean writeToEditLog,
+      boolean logRetryCache) throws IOException {
+    final INodeFile file = iip.getLastINode().asFile();
+    file.recordModification(iip.getLatestSnapshotId());
+    file.toUnderConstruction(leaseHolder, clientMachine);
 
-    leaseManager.addLease(cons.getFileUnderConstructionFeature()
-        .getClientName(), src);
+    leaseManager.addLease(
+        file.getFileUnderConstructionFeature().getClientName(), src);
     
-    LocatedBlock ret = blockManager.convertLastBlockToUnderConstruction(cons);
+    LocatedBlock ret = blockManager.convertLastBlockToUnderConstruction(file);
     if (ret != null) {
       // update the quota: use the preferred block size for UC block
       final long diff = file.getPreferredBlockSize() - ret.getBlockSize();
-      dir.updateSpaceConsumed(src, 0, diff * file.getBlockReplication());
+      dir.updateSpaceConsumed(iip, 0, diff * file.getBlockReplication());
     }
 
     if (writeToEditLog) {
-      getEditLog().logOpenFile(src, cons, false, logRetryCache);
+      getEditLog().logOpenFile(src, file, false, logRetryCache);
     }
     return ret;
   }
@@ -2716,7 +2712,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         dir.checkPathAccess(pc, iip, FsAction.WRITE);
       }
   
-      recoverLeaseInternal(inode, src, holder, clientMachine, true);
+      recoverLeaseInternal(iip, src, holder, clientMachine, true);
     } catch (StandbyException se) {
       skipSync = true;
       throw se;
@@ -2731,11 +2727,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     return false;
   }
 
-  private void recoverLeaseInternal(INodeFile fileInode, 
+  private void recoverLeaseInternal(INodesInPath iip,
       String src, String holder, String clientMachine, boolean force)
       throws IOException {
     assert hasWriteLock();
-    if (fileInode != null && fileInode.isUnderConstruction()) {
+    INodeFile file = iip.getLastINode().asFile();
+    if (file != null && file.isUnderConstruction()) {
       //
       // If the file is under construction , then it must be in our
       // leases. Find the appropriate lease record.
@@ -2758,7 +2755,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       //
       // Find the original holder.
       //
-      FileUnderConstructionFeature uc = fileInode.getFileUnderConstructionFeature();
+      FileUnderConstructionFeature uc = file.getFileUnderConstructionFeature();
       String clientName = uc.getClientName();
       lease = leaseManager.getLease(clientName);
       if (lease == null) {
@@ -2772,7 +2769,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         // close only the file src
         LOG.info("recoverLease: " + lease + ", src=" + src +
           " from client " + clientName);
-        internalReleaseLease(lease, src, holder);
+        internalReleaseLease(lease, src, iip, holder);
       } else {
         assert lease.getHolder().equals(clientName) :
           "Current lease holder " + lease.getHolder() +
@@ -2784,13 +2781,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         if (lease.expiredSoftLimit()) {
           LOG.info("startFile: recover " + lease + ", src=" + src + " client "
               + clientName);
-          boolean isClosed = internalReleaseLease(lease, src, null);
+          boolean isClosed = internalReleaseLease(lease, src, iip, null);
           if(!isClosed)
             throw new RecoveryInProgressException(
                 "Failed to close file " + src +
                 ". Lease recovery is in progress. Try again later.");
         } else {
-          final BlockInfo lastBlock = fileInode.getLastBlock();
+          final BlockInfo lastBlock = file.getLastBlock();
           if (lastBlock != null
               && lastBlock.getBlockUCState() == BlockUCState.UNDER_RECOVERY) {
             throw new RecoveryInProgressException("Recovery in progress, file ["
@@ -2822,10 +2819,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   }
 
   private LastBlockWithStatus appendFileInt(final String srcArg, String holder,
-      String clientMachine, boolean logRetryCache)
-      throws AccessControlException, SafeModeException,
-      FileAlreadyExistsException, FileNotFoundException,
-      ParentNotDirectoryException, IOException {
+      String clientMachine, boolean logRetryCache) throws IOException {
     String src = srcArg;
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.appendFile: src=" + src
@@ -2892,10 +2886,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    */
   LocatedBlock getAdditionalBlock(String src, long fileId, String clientName,
       ExtendedBlock previous, Set<Node> excludedNodes, 
-      List<String> favoredNodes)
-      throws LeaseExpiredException, NotReplicatedYetException,
-      QuotaExceededException, SafeModeException, UnresolvedLinkException,
-      IOException {
+      List<String> favoredNodes) throws IOException {
     final long blockSize;
     final int replication;
     final byte storagePolicyID;
@@ -2983,7 +2974,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       }
 
       // commit the last block and complete it if it has minimum replicas
-      commitOrCompleteLastBlock(pendingFile,
+      commitOrCompleteLastBlock(pendingFile, fileState.iip,
                                 ExtendedBlock.getLocalBlock(previous));
 
       // allocate new block, record block locations in INode.
@@ -3023,10 +3014,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   static class FileState {
     public final INodeFile inode;
     public final String path;
+    public final INodesInPath iip;
 
-    public FileState(INodeFile inode, String fullPath) {
+    public FileState(INodeFile inode, String fullPath, INodesInPath iip) {
       this.inode = inode;
       this.path = fullPath;
+      this.iip = iip;
     }
   }
 
@@ -3046,18 +3039,22 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     checkFsObjectLimit();
 
     Block previousBlock = ExtendedBlock.getLocalBlock(previous);
-    INode inode;
+    final INode inode;
+    final INodesInPath iip;
     if (fileId == INodeId.GRANDFATHER_INODE_ID) {
       // Older clients may not have given us an inode ID to work with.
       // In this case, we have to try to resolve the path and hope it
       // hasn't changed or been deleted since the file was opened for write.
-      final INodesInPath iip = dir.getINodesInPath4Write(src);
+      iip = dir.getINodesInPath4Write(src);
       inode = iip.getLastINode();
     } else {
       // Newer clients pass the inode ID, so we can just get the inode
       // directly.
       inode = dir.getInode(fileId);
-      if (inode != null) src = inode.getFullPathName();
+      iip = INodesInPath.fromINode(inode);
+      if (inode != null) {
+        src = iip.getPath();
+      }
     }
     final INodeFile pendingFile = checkLease(src, clientName, inode, fileId);
     BlockInfo lastBlockInFile = pendingFile.getLastBlock();
@@ -3117,7 +3114,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         onRetryBlock[0] = makeLocatedBlock(lastBlockInFile,
             ((BlockInfoUnderConstruction)lastBlockInFile).getExpectedStorageLocations(),
             offset);
-        return new FileState(pendingFile, src);
+        return new FileState(pendingFile, src, iip);
       } else {
         // Case 3
         throw new IOException("Cannot allocate block in " + src + ": " +
@@ -3130,7 +3127,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     if (!checkFileProgress(src, pendingFile, false)) {
       throw new NotReplicatedYetException("Not replicated yet: " + src);
     }
-    return new FileState(pendingFile, src);
+    return new FileState(pendingFile, src, iip);
   }
 
   LocatedBlock makeLocatedBlock(Block blk, DatanodeStorageInfo[] locs,
@@ -3208,8 +3205,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    * The client would like to let go of the given block
    */
   boolean abandonBlock(ExtendedBlock b, long fileId, String src, String holder)
-      throws LeaseExpiredException, FileNotFoundException,
-      UnresolvedLinkException, IOException {
+      throws IOException {
     if(NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: " + b
           + "of file " + src);
@@ -3225,21 +3221,24 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       src = dir.resolvePath(pc, src, pathComponents);
 
       final INode inode;
+      final INodesInPath iip;
       if (fileId == INodeId.GRANDFATHER_INODE_ID) {
         // Older clients may not have given us an inode ID to work with.
         // In this case, we have to try to resolve the path and hope it
         // hasn't changed or been deleted since the file was opened for write.
-        inode = dir.getINode(src);
+        iip = dir.getINodesInPath(src, true);
+        inode = iip.getLastINode();
       } else {
         inode = dir.getInode(fileId);
-        if (inode != null) src = inode.getFullPathName();
+        iip = INodesInPath.fromINode(inode);
+        if (inode != null) {
+          src = iip.getPath();
+        }
       }
       final INodeFile file = checkLease(src, holder, inode, fileId);
 
-      //
       // Remove the block from the pending creates list
-      //
-      boolean removed = dir.removeBlock(src, file,
+      boolean removed = dir.removeBlock(src, iip, file,
           ExtendedBlock.getLocalBlock(b));
       if (!removed) {
         return true;
@@ -3258,8 +3257,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   }
 
   private INodeFile checkLease(String src, String holder, INode inode,
-                               long fileId)
-      throws LeaseExpiredException, FileNotFoundException {
+      long fileId) throws LeaseExpiredException, FileNotFoundException {
     assert hasReadLock();
     final String ident = src + " (inode " + fileId + ")";
     if (inode == null) {
@@ -3336,29 +3334,30 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     return success;
   }
 
-  private boolean completeFileInternal(String src, 
-      String holder, Block last, long fileId) throws SafeModeException,
-      UnresolvedLinkException, IOException {
+  private boolean completeFileInternal(String src, String holder, Block last,
+      long fileId) throws IOException {
     assert hasWriteLock();
     final INodeFile pendingFile;
+    final INodesInPath iip;
+    INode inode = null;
     try {
-      final INode inode;
       if (fileId == INodeId.GRANDFATHER_INODE_ID) {
         // Older clients may not have given us an inode ID to work with.
         // In this case, we have to try to resolve the path and hope it
         // hasn't changed or been deleted since the file was opened for write.
-        final INodesInPath iip = dir.getLastINodeInPath(src);
-        inode = iip.getINode(0);
+        iip = dir.getINodesInPath(src, true);
+        inode = iip.getLastINode();
       } else {
         inode = dir.getInode(fileId);
-        if (inode != null) src = inode.getFullPathName();
+        iip = INodesInPath.fromINode(inode);
+        if (inode != null) {
+          src = iip.getPath();
+        }
       }
       pendingFile = checkLease(src, holder, inode, fileId);
     } catch (LeaseExpiredException lee) {
-      final INode inode = dir.getINode(src);
-      if (inode != null
-          && inode.isFile()
-          && !inode.asFile().isUnderConstruction()) {
+      if (inode != null && inode.isFile() &&
+          !inode.asFile().isUnderConstruction()) {
         // This could be a retry RPC - i.e the client tried to close
         // the file, but missed the RPC response. Thus, it is trying
         // again to close the file. If the file still exists and
@@ -3383,7 +3382,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     }
 
     // commit the last block and complete it if it has minimum replicas
-    commitOrCompleteLastBlock(pendingFile, last);
+    commitOrCompleteLastBlock(pendingFile, iip, last);
 
     if (!checkFileProgress(src, pendingFile, true)) {
       return false;
@@ -3618,7 +3617,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
       long mtime = now();
       // Unlink the target directory from directory tree
-      long filesRemoved = dir.delete(src, collectedBlocks, removedINodes,
+      long filesRemoved = dir.delete(iip, collectedBlocks, removedINodes,
               mtime);
       if (filesRemoved < 0) {
         return false;
@@ -3885,7 +3884,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    * @throws IOException if path does not exist
    */
   void fsync(String src, long fileId, String clientName, long lastBlockLength)
-      throws IOException, UnresolvedLinkException {
+      throws IOException {
     NameNode.stateChangeLog.info("BLOCK* fsync: " + src + " for " + clientName);
     checkOperation(OperationCategory.WRITE);
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
@@ -3933,15 +3932,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    *         false if block recovery has been initiated. Since the lease owner
    *         has been changed and logged, caller should call logSync().
    */
-  boolean internalReleaseLease(Lease lease, String src, 
-      String recoveryLeaseHolder) throws AlreadyBeingCreatedException, 
-      IOException, UnresolvedLinkException {
+  boolean internalReleaseLease(Lease lease, String src, INodesInPath iip,
+      String recoveryLeaseHolder) throws IOException {
     LOG.info("Recovering " + lease + ", src=" + src);
     assert !isInSafeMode();
     assert hasWriteLock();
 
-    final INodesInPath iip = dir.getLastINodeInPath(src);
-    final INodeFile pendingFile = iip.getINode(0).asFile();
+    final INodeFile pendingFile = iip.getLastINode().asFile();
     int nrBlocks = pendingFile.numBlocks();
     BlockInfo[] blocks = pendingFile.getBlocks();
 
@@ -4070,7 +4067,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   }
 
   private void commitOrCompleteLastBlock(final INodeFile fileINode,
-      final Block commitBlock) throws IOException {
+      final INodesInPath iip, final Block commitBlock) throws IOException {
     assert hasWriteLock();
     Preconditions.checkArgument(fileINode.isUnderConstruction());
     if (!blockManager.commitOrCompleteLastBlock(fileINode, commitBlock)) {
@@ -4081,8 +4078,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final long diff = fileINode.getPreferredBlockSize() - commitBlock.getNumBytes();    
     if (diff > 0) {
       try {
-        String path = fileINode.getFullPathName();
-        dir.updateSpaceConsumed(path, 0, -diff*fileINode.getFileReplication());
+        dir.updateSpaceConsumed(iip, 0, -diff*fileINode.getFileReplication());
       } catch (IOException e) {
         LOG.warn("Unexpected exception while updating disk space.", e);
       }
@@ -4090,8 +4086,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   }
 
   private void finalizeINodeFileUnderConstruction(String src,
-      INodeFile pendingFile, int latestSnapshot) throws IOException,
-      UnresolvedLinkException {
+      INodeFile pendingFile, int latestSnapshot) throws IOException {
     assert hasWriteLock();
 
     FileUnderConstructionFeature uc = pendingFile.getFileUnderConstructionFeature();
@@ -4103,13 +4098,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     // The file is no longer pending.
     // Create permanent INode, update blocks. No need to replace the inode here
     // since we just remove the uc feature from pendingFile
-    final INodeFile newFile = pendingFile.toCompleteFile(now());
+    pendingFile.toCompleteFile(now());
 
     waitForLoadingFSImage();
     // close file and persist block allocations for this file
-    closeFile(src, newFile);
+    closeFile(src, pendingFile);
 
-    blockManager.checkReplication(newFile);
+    blockManager.checkReplication(pendingFile);
   }
 
   @VisibleForTesting
@@ -4126,11 +4121,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       return false;
     }
 
-    INodeFile inodeUC = (INodeFile) bc;
-    String fullName = inodeUC.getName();
+    String fullName = bc.getName();
     try {
       if (fullName != null && fullName.startsWith(Path.SEPARATOR)
-          && dir.getINode(fullName) == inodeUC) {
+          && dir.getINode(fullName) == bc) {
         // If file exists in normal path then no need to look in snapshot
         return false;
       }
@@ -4139,7 +4133,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       return false;
     }
     /*
-     * 1. if bc is an instance of INodeFileUnderConstructionWithSnapshot, and
+     * 1. if bc is under construction and also with snapshot, and
      * bc is not in the current fsdirectory tree, bc must represent a snapshot
      * file. 
      * 2. if fullName is not an absolute path, bc cannot be existent in the 
@@ -4153,8 +4147,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   void commitBlockSynchronization(ExtendedBlock lastblock,
       long newgenerationstamp, long newlength,
       boolean closeFile, boolean deleteblock, DatanodeID[] newtargets,
-      String[] newtargetstorages)
-      throws IOException, UnresolvedLinkException {
+      String[] newtargetstorages) throws IOException {
     LOG.info("commitBlockSynchronization(lastblock=" + lastblock
              + ", newgenerationstamp=" + newgenerationstamp
              + ", newlength=" + newlength
@@ -4312,10 +4305,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   @VisibleForTesting
   String closeFileCommitBlocks(INodeFile pendingFile, BlockInfo storedBlock)
       throws IOException {
-    String src = pendingFile.getFullPathName();
+    final INodesInPath iip = INodesInPath.fromINode(pendingFile);
+    final String src = iip.getPath();
 
     // commit the last block and complete it if it has minimum replicas
-    commitOrCompleteLastBlock(pendingFile, storedBlock);
+    commitOrCompleteLastBlock(pendingFile, iip, storedBlock);
 
     //remove lease, close file
     finalizeINodeFileUnderConstruction(src, pendingFile,
@@ -4515,7 +4509,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   /**
    * Add the given symbolic link to the fs. Record it in the edits log.
    */
-  private INodeSymlink addSymlink(String path, String target,
+  private INodeSymlink addSymlink(String path, INodesInPath iip, String target,
                                   PermissionStatus dirPerms,
                                   boolean createParent, boolean logRetryCache)
       throws UnresolvedLinkException, FileAlreadyExistsException,
@@ -4524,15 +4518,17 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
     final long modTime = now();
     if (createParent) {
-      final String parent = new Path(path).getParent().toString();
-      if (!FSDirMkdirOp.mkdirsRecursively(dir, parent, dirPerms, true,
-          modTime)) {
+      INodesInPath parentIIP = iip.getParentINodesInPath();
+      if (parentIIP == null || (parentIIP = FSDirMkdirOp.mkdirsRecursively(dir,
+          parentIIP, dirPerms, true, modTime)) == null) {
         return null;
+      } else {
+        iip = INodesInPath.append(parentIIP, null, iip.getLastLocalName());
       }
     }
     final String userName = dirPerms.getUserName();
     long id = dir.allocateNewInodeId();
-    INodeSymlink newNode = dir.addSymlink(id, path, target, modTime, modTime,
+    INodeSymlink newNode = dir.addSymlink(iip, id, target, modTime, modTime,
             new PermissionStatus(userName, null, FsPermission.getDefault()));
     if (newNode == null) {
       NameNode.stateChangeLog.info("addSymlink: failed to add " + path);