Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2014/12/24 20:35:59 UTC

[45/50] hadoop git commit: HDFS-7484. Make FSDirectory#addINode take existing INodes as its parameter. Contributed by Jing Zhao.

HDFS-7484. Make FSDirectory#addINode take existing INodes as its parameter. Contributed by Jing Zhao.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a060e6f0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a060e6f0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a060e6f0

Branch: refs/heads/HDFS-EC
Commit: a060e6f03f983b6fa4f34650a570519f3967abbe
Parents: 7efc73c
Author: Jing Zhao <ji...@apache.org>
Authored: Mon Dec 22 23:19:20 2014 -0800
Committer: Zhe Zhang <zh...@cloudera.com>
Committed: Wed Dec 24 11:22:19 2014 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/fs/SymlinkBaseTest.java   |   3 +-
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../hdfs/server/namenode/FSDirMkdirOp.java      | 325 ++++++++++---------
 .../hdfs/server/namenode/FSDirRenameOp.java     | 178 ++++++----
 .../hdfs/server/namenode/FSDirSymlinkOp.java    |  32 +-
 .../hdfs/server/namenode/FSDirectory.java       | 143 ++++----
 .../hdfs/server/namenode/FSEditLogLoader.java   |  37 ++-
 .../hdfs/server/namenode/FSNamesystem.java      |  35 +-
 .../hdfs/server/namenode/INodesInPath.java      |  64 ++--
 .../snapshot/TestRenameWithSnapshots.java       |   8 +-
 10 files changed, 444 insertions(+), 384 deletions(-)
----------------------------------------------------------------------
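
The heart of the change is the FSDirectory#addINode family: instead of taking
an INodesInPath with null elements for the components still to be created and
returning a boolean, the methods now take the INodesInPath of the existing
ancestors and return a new INodesInPath containing the added child, or null on
failure:

    // before
    boolean addINode(INodesInPath iip, INode child)
    // after
    INodesInPath addINode(INodesInPath existing, INode child)

Callers in mkdirs, rename, symlink creation and edit-log replay are updated to
thread the returned INodesInPath through instead of indexing into a shared
instance.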


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a060e6f0/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/SymlinkBaseTest.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/SymlinkBaseTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/SymlinkBaseTest.java
index 1f5b863..4d738f7 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/SymlinkBaseTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/SymlinkBaseTest.java
@@ -577,7 +577,8 @@ public abstract class SymlinkBaseTest {
     } catch (IOException e) {
       // Expected
       if (wrapper instanceof FileContextTestWrapper) {
-        assertEquals("No AbstractFileSystem for scheme: null", e.getMessage());
+        GenericTestUtils.assertExceptionContains(
+            "No AbstractFileSystem configured for scheme: null", e);
       } else if (wrapper instanceof FileSystemTestWrapper) {
         assertEquals("No FileSystem for scheme: null", e.getMessage());
       }
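
The switch from assertEquals on the full message to
GenericTestUtils.assertExceptionContains keeps the test stable across wording
tweaks such as the "No AbstractFileSystem configured for scheme" change above.
The substring-matching idea, as a minimal standalone sketch (a hypothetical
helper, not the Hadoop implementation):

    static void assertExceptionContains(String fragment, Throwable t) {
      String msg = t.getMessage();
      if (msg == null || !msg.contains(fragment)) {
        throw new AssertionError("Expected a message containing \""
            + fragment + "\" but got: " + msg, t);
      }
    }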

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a060e6f0/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 66b3a8d..8525402 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -476,6 +476,9 @@ Release 2.7.0 - UNRELEASED
 
     HDFS-7530. Allow renaming of encryption zone roots. (Charles Lamb via wang)
 
+    HDFS-7484. Make FSDirectory#addINode take existing INodes as its parameter.
+    (jing9)
+
   OPTIMIZATIONS
 
     HDFS-7454. Reduce memory footprint for AclEntries in NameNode.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a060e6f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
index 4ea77e6..f51427f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
@@ -17,9 +17,10 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import com.google.common.base.Preconditions;
+import org.apache.commons.io.Charsets;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.InvalidPathException;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.FsAction;
@@ -29,18 +30,19 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.AclException;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
-import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 
 import java.io.IOException;
+import java.util.AbstractMap;
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.hadoop.util.Time.now;
 
 class FSDirMkdirOp {
-  static HdfsFileStatus mkdirs(
-      FSNamesystem fsn, String src, PermissionStatus permissions,
-      boolean createParent) throws IOException {
+
+  static HdfsFileStatus mkdirs(FSNamesystem fsn, String src,
+      PermissionStatus permissions, boolean createParent) throws IOException {
     FSDirectory fsd = fsn.getFSDirectory();
     if(NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src);
@@ -50,188 +52,193 @@ class FSDirMkdirOp {
     }
     FSPermissionChecker pc = fsd.getPermissionChecker();
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
-    src = fsd.resolvePath(pc, src, pathComponents);
-    INodesInPath iip = fsd.getINodesInPath4Write(src);
-    if (fsd.isPermissionEnabled()) {
-      fsd.checkTraverse(pc, iip);
-    }
-
-    if (!isDirMutable(fsd, iip)) {
+    fsd.writeLock();
+    try {
+      src = fsd.resolvePath(pc, src, pathComponents);
+      INodesInPath iip = fsd.getINodesInPath4Write(src);
       if (fsd.isPermissionEnabled()) {
-        fsd.checkAncestorAccess(pc, iip, FsAction.WRITE);
+        fsd.checkTraverse(pc, iip);
       }
 
-      if (!createParent) {
-        fsd.verifyParentDir(iip, src);
+      final INode lastINode = iip.getLastINode();
+      if (lastINode != null && lastINode.isFile()) {
+        throw new FileAlreadyExistsException("Path is not a directory: " + src);
       }
 
-      // validate that we have enough inodes. This is, at best, a
-      // heuristic because the mkdirs() operation might need to
-      // create multiple inodes.
-      fsn.checkFsObjectLimit();
-      iip = mkdirsRecursively(fsd, iip, permissions, false, now());
-      if (iip == null) {
-        throw new IOException("Failed to create directory: " + src);
+      INodesInPath existing = lastINode != null ? iip : iip.getExistingINodes();
+      if (lastINode == null) {
+        if (fsd.isPermissionEnabled()) {
+          fsd.checkAncestorAccess(pc, iip, FsAction.WRITE);
+        }
+
+        if (!createParent) {
+          fsd.verifyParentDir(iip, src);
+        }
+
+        // validate that we have enough inodes. This is, at best, a
+        // heuristic because the mkdirs() operation might need to
+        // create multiple inodes.
+        fsn.checkFsObjectLimit();
+
+        List<String> nonExisting = iip.getPath(existing.length(),
+            iip.length() - existing.length());
+        int length = nonExisting.size();
+        if (length > 1) {
+          List<String> ancestors = nonExisting.subList(0, length - 1);
+          // Ensure that the user can traverse the path by adding implicit
+          // u+wx permission to all ancestor directories
+          existing = createChildrenDirectories(fsd, existing, ancestors,
+              addImplicitUwx(permissions, permissions));
+          if (existing == null) {
+            throw new IOException("Failed to create directory: " + src);
+          }
+        }
+
+        if ((existing = createChildrenDirectories(fsd, existing,
+            nonExisting.subList(length - 1, length), permissions)) == null) {
+          throw new IOException("Failed to create directory: " + src);
+        }
       }
+      return fsd.getAuditFileInfo(existing);
+    } finally {
+      fsd.writeUnlock();
     }
-    return fsd.getAuditFileInfo(iip);
   }
 
-  static INode unprotectedMkdir(
-      FSDirectory fsd, long inodeId, String src,
-      PermissionStatus permissions, List<AclEntry> aclEntries, long timestamp)
-      throws QuotaExceededException, UnresolvedLinkException, AclException {
-    assert fsd.hasWriteLock();
-    byte[][] components = INode.getPathComponents(src);
-    final INodesInPath iip = fsd.getExistingPathINodes(components);
-    final int pos = iip.length() - 1;
-    final INodesInPath newiip = unprotectedMkdir(fsd, inodeId, iip, pos,
-        components[pos], permissions, aclEntries, timestamp);
-    return newiip.getINode(pos);
+  /**
+   * For a given absolute path, create all ancestors as directories along the
+   * path. All ancestors inherit their parent's permission plus an implicit
+   * u+wx permission. This is used by create() and addSymlink() for
+   * implicitly creating all directories along the path.
+   *
+   * For example, given path="/foo/bar/spam" where "/foo" exists but
+   * "/foo/bar" does not exist yet, the function will create the directory bar.
+   *
+   * @return a tuple which contains both the new INodesInPath (with all the
+   * existing and newly created directories) and the last component in the
+   * relative path, or null if there are errors.
+   */
+  static Map.Entry<INodesInPath, String> createAncestorDirectories(
+      FSDirectory fsd, INodesInPath iip, PermissionStatus permission)
+      throws IOException {
+    final String last = new String(iip.getLastLocalName(), Charsets.UTF_8);
+    INodesInPath existing = iip.getExistingINodes();
+    List<String> children = iip.getPath(existing.length(),
+        iip.length() - existing.length());
+    int size = children.size();
+    if (size > 1) { // otherwise all ancestors have been created
+      List<String> directories = children.subList(0, size - 1);
+      INode parentINode = existing.getLastINode();
+      // Ensure that the user can traverse the path by adding implicit
+      // u+wx permission to all ancestor directories
+      existing = createChildrenDirectories(fsd, existing, directories,
+          addImplicitUwx(parentINode.getPermissionStatus(), permission));
+      if (existing == null) {
+        return null;
+      }
+    }
+    return new AbstractMap.SimpleImmutableEntry<>(existing, last);
   }
 
   /**
-   * Create a directory
-   * If ancestor directories do not exist, automatically create them.
-
+   * Create the directory {@code parent} / {@code children} and all ancestors
+   * along the path.
+   *
    * @param fsd FSDirectory
-   * @param iip the INodesInPath instance containing all the existing INodes
-   *            and null elements for non-existing components in the path
-   * @param permissions the permission of the directory
-   * @param inheritPermission
-   *   if the permission of the directory should inherit from its parent or not.
-   *   u+wx is implicitly added to the automatically created directories,
-   *   and to the given directory if inheritPermission is true
-   * @param now creation time
-   * @return non-null INodesInPath instance if operation succeeds
-   * @throws QuotaExceededException if directory creation violates
-   *                                any quota limit
-   * @throws UnresolvedLinkException if a symlink is encountered in src.
-   * @throws SnapshotAccessControlException if path is in RO snapshot
+   * @param existing The INodesInPath instance containing all the existing
+   *                 ancestral INodes
+   * @param children The relative path from the parent towards children,
+   *                 starting with "/"
+   * @param perm the permission of the directory. Note that all ancestors
+   *             created along the path have implicit {@code u+wx} permissions.
+   *
+   * @return {@link INodesInPath} which contains all inodes to the
+   * target directory. After the execution, parentPath points to the path of
+   * the returned INodesInPath. The function returns null if the operation has
+   * failed.
    */
-  static INodesInPath mkdirsRecursively(FSDirectory fsd, INodesInPath iip,
-      PermissionStatus permissions, boolean inheritPermission, long now)
-      throws FileAlreadyExistsException, QuotaExceededException,
-             UnresolvedLinkException, SnapshotAccessControlException,
-             AclException {
-    final int lastInodeIndex = iip.length() - 1;
-    final byte[][] components = iip.getPathComponents();
-    final String[] names = new String[components.length];
-    for (int i = 0; i < components.length; i++) {
-      names[i] = DFSUtil.bytes2String(components[i]);
-    }
+  private static INodesInPath createChildrenDirectories(FSDirectory fsd,
+      INodesInPath existing, List<String> children, PermissionStatus perm)
+      throws IOException {
+    assert fsd.hasWriteLock();
 
-    fsd.writeLock();
-    try {
-      if (iip.isSnapshot()) {
-        throw new SnapshotAccessControlException(
-                "Modification on RO snapshot is disallowed");
-      }
-      final int length = iip.length();
-      // find the index of the first null in inodes[]
-      StringBuilder pathbuilder = new StringBuilder();
-      int i = 1;
-      INode curNode;
-      for(; i < length && (curNode = iip.getINode(i)) != null; i++) {
-        pathbuilder.append(Path.SEPARATOR).append(names[i]);
-        if (!curNode.isDirectory()) {
-          throw new FileAlreadyExistsException("Parent path is not a directory: "
-                  + pathbuilder + " " + curNode.getLocalName());
-        }
+    for (String component : children) {
+      existing = createSingleDirectory(fsd, existing, component, perm);
+      if (existing == null) {
+        return null;
       }
+    }
+    return existing;
+  }
 
-      // default to creating parent dirs with the given perms
-      PermissionStatus parentPermissions = permissions;
-
-      // if not inheriting and it's the last inode, there's no use in
-      // computing perms that won't be used
-      if (inheritPermission || (i < lastInodeIndex)) {
-        // if inheriting (ie. creating a file or symlink), use the parent dir,
-        // else the supplied permissions
-        // NOTE: the permissions of the auto-created directories violate posix
-        FsPermission parentFsPerm = inheritPermission ?
-            iip.getINode(i-1).getFsPermission() : permissions.getPermission();
-
-        // ensure that the permissions allow user write+execute
-        if (!parentFsPerm.getUserAction().implies(FsAction.WRITE_EXECUTE)) {
-          parentFsPerm = new FsPermission(
-                  parentFsPerm.getUserAction().or(FsAction.WRITE_EXECUTE),
-                  parentFsPerm.getGroupAction(),
-                  parentFsPerm.getOtherAction()
-          );
-        }
+  static void mkdirForEditLog(FSDirectory fsd, long inodeId, String src,
+      PermissionStatus permissions, List<AclEntry> aclEntries, long timestamp)
+      throws QuotaExceededException, UnresolvedLinkException, AclException,
+      FileAlreadyExistsException {
+    assert fsd.hasWriteLock();
+    INodesInPath iip = fsd.getINodesInPath(src, false);
+    final byte[] localName = iip.getLastLocalName();
+    final INodesInPath existing = iip.getParentINodesInPath();
+    Preconditions.checkState(existing.getLastINode() != null);
+    unprotectedMkdir(fsd, inodeId, existing, localName, permissions, aclEntries,
+        timestamp);
+  }
 
-        if (!parentPermissions.getPermission().equals(parentFsPerm)) {
-          parentPermissions = new PermissionStatus(
-                  parentPermissions.getUserName(),
-                  parentPermissions.getGroupName(),
-                  parentFsPerm
-          );
-          // when inheriting, use same perms for entire path
-          if (inheritPermission) permissions = parentPermissions;
-        }
-      }
+  private static INodesInPath createSingleDirectory(FSDirectory fsd,
+      INodesInPath existing, String localName, PermissionStatus perm)
+      throws IOException {
+    assert fsd.hasWriteLock();
+    existing = unprotectedMkdir(fsd, fsd.allocateNewInodeId(), existing,
+        localName.getBytes(Charsets.UTF_8), perm, null, now());
+    if (existing == null) {
+      return null;
+    }
 
-      // create directories beginning from the first null index
-      for(; i < length; i++) {
-        pathbuilder.append(Path.SEPARATOR).append(names[i]);
-        iip = unprotectedMkdir(fsd, fsd.allocateNewInodeId(), iip, i,
-            components[i], (i < lastInodeIndex) ? parentPermissions :
-                permissions, null, now);
-        if (iip.getINode(i) == null) {
-          return null;
-        }
-        // Directory creation also count towards FilesCreated
-        // to match count of FilesDeleted metric.
-        NameNode.getNameNodeMetrics().incrFilesCreated();
-
-        final String cur = pathbuilder.toString();
-        fsd.getEditLog().logMkDir(cur, iip.getINode(i));
-        if(NameNode.stateChangeLog.isDebugEnabled()) {
-          NameNode.stateChangeLog.debug(
-                  "mkdirs: created directory " + cur);
-        }
-      }
-    } finally {
-      fsd.writeUnlock();
+    final INode newNode = existing.getLastINode();
+    // Directory creation also counts towards FilesCreated
+    // to match the count of the FilesDeleted metric.
+    NameNode.getNameNodeMetrics().incrFilesCreated();
+
+    String cur = existing.getPath();
+    fsd.getEditLog().logMkDir(cur, newNode);
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("mkdirs: created directory " + cur);
     }
-    return iip;
+    return existing;
   }
 
-  /**
-   * Check whether the path specifies a directory
-   * @throws SnapshotAccessControlException if path is in RO snapshot
-   */
-  private static boolean isDirMutable(FSDirectory fsd, INodesInPath iip)
-      throws SnapshotAccessControlException {
-    fsd.readLock();
-    try {
-      INode node = iip.getLastINode();
-      return node != null && node.isDirectory();
-    } finally {
-      fsd.readUnlock();
-    }
+  private static PermissionStatus addImplicitUwx(PermissionStatus parentPerm,
+      PermissionStatus perm) {
+    FsPermission p = parentPerm.getPermission();
+    FsPermission ancestorPerm = new FsPermission(
+        p.getUserAction().or(FsAction.WRITE_EXECUTE),
+        p.getGroupAction(),
+        p.getOtherAction());
+    return new PermissionStatus(perm.getUserName(), perm.getGroupName(),
+        ancestorPerm);
   }
 
-  /** create a directory at index pos.
-   * The parent path to the directory is at [0, pos-1].
-   * All ancestors exist. Newly created one stored at index pos.
+  /**
+   * Create a directory at the path specified by parent.
    */
-  private static INodesInPath unprotectedMkdir(
-      FSDirectory fsd, long inodeId, INodesInPath inodesInPath, int pos,
-      byte[] name, PermissionStatus permission, List<AclEntry> aclEntries,
-      long timestamp)
-      throws QuotaExceededException, AclException {
+  private static INodesInPath unprotectedMkdir(FSDirectory fsd, long inodeId,
+      INodesInPath parent, byte[] name, PermissionStatus permission,
+      List<AclEntry> aclEntries, long timestamp)
+      throws QuotaExceededException, AclException, FileAlreadyExistsException {
     assert fsd.hasWriteLock();
+    assert parent.getLastINode() != null;
+    if (!parent.getLastINode().isDirectory()) {
+      throw new FileAlreadyExistsException("Parent path is not a directory: " +
+          parent.getPath() + " " + DFSUtil.bytes2String(name));
+    }
     final INodeDirectory dir = new INodeDirectory(inodeId, name, permission,
         timestamp);
-    if (fsd.addChild(inodesInPath, pos, dir, true)) {
-      if (aclEntries != null) {
-        AclStorage.updateINodeAcl(dir, aclEntries, Snapshot.CURRENT_STATE_ID);
-      }
-      return INodesInPath.replace(inodesInPath, pos, dir);
-    } else {
-      return inodesInPath;
+
+    INodesInPath iip = fsd.addLastINode(parent, dir, true);
+    if (iip != null && aclEntries != null) {
+      AclStorage.updateINodeAcl(dir, aclEntries, Snapshot.CURRENT_STATE_ID);
     }
+    return iip;
   }
 }
+
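
The rewritten mkdirs resolves the path once under the write lock, splits it
into the existing prefix and the components still to be created, gives every
auto-created ancestor its parent's permission plus an implicit u+wx so the
caller can traverse into the new subtree, and applies the caller-supplied
permission only to the final directory. The u+wx rule itself is small; a
standalone sketch using plain POSIX-style mode bits instead of Hadoop's
FsPermission (hypothetical helper):

    // 0300 == owner write + execute; group/other bits are left untouched.
    static int addImplicitUwx(int parentMode) {
      return parentMode | 0300;
    }
    // e.g. a parent mode of 0455 yields 0755 for auto-created ancestors,
    // while 0755 stays 0755.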

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a060e6f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
index 4239f46..2023b1e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.InvalidPathException;
 import org.apache.hadoop.fs.Options;
@@ -124,7 +125,7 @@ class FSDirRenameOp {
    */
   @Deprecated
   @SuppressWarnings("deprecation")
-  static boolean unprotectedRenameTo(FSDirectory fsd, String src, String dst,
+  static boolean renameForEditLog(FSDirectory fsd, String src, String dst,
       long timestamp) throws IOException {
     if (fsd.isDir(dst)) {
       dst += Path.SEPARATOR + new Path(src).getName();
@@ -194,11 +195,7 @@ class FSDirRenameOp {
 
     try {
       // remove src
-      final long removedSrc = fsd.removeLastINode(tx.srcIIP);
-      if (removedSrc == -1) {
-        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-            + "failed to rename " + src + " to " + dst + " because the source" +
-            " can not be removed");
+      if (!tx.removeSrc4OldRename()) {
         return false;
       }
 
@@ -256,8 +253,8 @@ class FSDirRenameOp {
   }
 
   /**
-   * @see #unprotectedRenameTo(FSDirectory, String, String, long,
-   * org.apache.hadoop.fs.Options.Rename...)
+   * @see {@link #unprotectedRenameTo(FSDirectory, String, String, INodesInPath,
+   * INodesInPath, long, BlocksMapUpdateInfo, Options.Rename...)}
    */
   static void renameTo(FSDirectory fsd, FSPermissionChecker pc, String src,
       String dst, BlocksMapUpdateInfo collectedBlocks, boolean logRetryCache,
@@ -305,7 +302,7 @@ class FSDirRenameOp {
    * @param timestamp modification time
    * @param options   Rename options
    */
-  static boolean unprotectedRenameTo(
+  static boolean renameForEditLog(
       FSDirectory fsd, String src, String dst, long timestamp,
       Options.Rename... options)
       throws IOException {
@@ -331,6 +328,7 @@ class FSDirRenameOp {
    * @param timestamp       modification time
    * @param collectedBlocks blocks to be removed
    * @param options         Rename options
+   * @return whether a file/directory gets overwritten in the dst path
    */
   static boolean unprotectedRenameTo(FSDirectory fsd, String src, String dst,
       final INodesInPath srcIIP, final INodesInPath dstIIP, long timestamp,
@@ -387,22 +385,14 @@ class FSDirRenameOp {
     RenameOperation tx = new RenameOperation(fsd, src, dst, srcIIP, dstIIP);
 
     boolean undoRemoveSrc = true;
-    final long removedSrc = fsd.removeLastINode(tx.srcIIP);
-    if (removedSrc == -1) {
-      error = "Failed to rename " + src + " to " + dst +
-          " because the source can not be removed";
-      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: " +
-          error);
-      throw new IOException(error);
-    }
+    tx.removeSrc();
 
     boolean undoRemoveDst = false;
-    INode removedDst = null;
     long removedNum = 0;
     try {
-      if (dstInode != null) { // dst exists remove it
-        if ((removedNum = fsd.removeLastINode(tx.dstIIP)) != -1) {
-          removedDst = tx.dstIIP.getLastINode();
+      if (dstInode != null) { // dst exists, remove it
+        removedNum = tx.removeDst();
+        if (removedNum != -1) {
           undoRemoveDst = true;
         }
       }
@@ -419,22 +409,10 @@ class FSDirRenameOp {
 
         // Collect the blocks and remove the lease for previous dst
         boolean filesDeleted = false;
-        if (removedDst != null) {
+        if (undoRemoveDst) {
           undoRemoveDst = false;
           if (removedNum > 0) {
-            List<INode> removedINodes = new ChunkedArrayList<>();
-            if (!removedDst.isInLatestSnapshot(tx.dstIIP.getLatestSnapshotId())) {
-              removedDst.destroyAndCollectBlocks(collectedBlocks,
-                  removedINodes);
-              filesDeleted = true;
-            } else {
-              filesDeleted = removedDst.cleanSubtree(
-                  Snapshot.CURRENT_STATE_ID, tx.dstIIP.getLatestSnapshotId(),
-                  collectedBlocks, removedINodes, true)
-                  .get(Quota.NAMESPACE) >= 0;
-            }
-            fsd.getFSNamesystem().removePathAndBlocks(src, null,
-                removedINodes, false);
+            filesDeleted = tx.cleanDst(collectedBlocks);
           }
         }
 
@@ -451,22 +429,8 @@ class FSDirRenameOp {
       if (undoRemoveSrc) {
         tx.restoreSource();
       }
-
-      if (undoRemoveDst) {
-        // Rename failed - restore dst
-        if (dstParent.isDirectory() &&
-            dstParent.asDirectory().isWithSnapshot()) {
-          dstParent.asDirectory().undoRename4DstParent(removedDst,
-              dstIIP.getLatestSnapshotId());
-        } else {
-          fsd.addLastINodeNoQuotaCheck(tx.dstIIP, removedDst);
-        }
-        if (removedDst.isReference()) {
-          final INodeReference removedDstRef = removedDst.asReference();
-          final INodeReference.WithCount wc = (INodeReference.WithCount)
-              removedDstRef.getReferredINode().asReference();
-          wc.addReference(removedDstRef);
-        }
+      if (undoRemoveDst) { // Rename failed - restore dst
+        tx.restoreDst();
       }
     }
     NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: " +
@@ -590,8 +554,10 @@ class FSDirRenameOp {
 
   private static class RenameOperation {
     private final FSDirectory fsd;
-    private final INodesInPath srcIIP;
-    private final INodesInPath dstIIP;
+    private INodesInPath srcIIP;
+    private final INodesInPath srcParentIIP;
+    private INodesInPath dstIIP;
+    private final INodesInPath dstParentIIP;
     private final String src;
     private final String dst;
     private final INodeReference.WithCount withCount;
@@ -602,25 +568,31 @@ class FSDirRenameOp {
     private final boolean srcChildIsReference;
     private final Quota.Counts oldSrcCounts;
     private INode srcChild;
+    private INode oldDstChild;
 
     RenameOperation(FSDirectory fsd, String src, String dst,
                     INodesInPath srcIIP, INodesInPath dstIIP)
         throws QuotaExceededException {
       this.fsd = fsd;
-      this.dstIIP = dstIIP;
       this.src = src;
       this.dst = dst;
-      srcChild = srcIIP.getLastINode();
+      this.srcIIP = srcIIP;
+      this.dstIIP = dstIIP;
+      this.srcParentIIP = srcIIP.getParentINodesInPath();
+      this.dstParentIIP = dstIIP.getParentINodesInPath();
+
+      srcChild = this.srcIIP.getLastINode();
       srcChildName = srcChild.getLocalNameBytes();
-      isSrcInSnapshot = srcChild.isInLatestSnapshot(srcIIP.getLatestSnapshotId());
+      final int srcLatestSnapshotId = srcIIP.getLatestSnapshotId();
+      isSrcInSnapshot = srcChild.isInLatestSnapshot(srcLatestSnapshotId);
       srcChildIsReference = srcChild.isReference();
-      srcParent = srcIIP.getINode(-2).asDirectory();
+      srcParent = this.srcIIP.getINode(-2).asDirectory();
 
       // Record the snapshot on srcChild. After the rename, before any new
       // snapshot is taken on the dst tree, changes will be recorded in the
       // latest snapshot of the src tree.
       if (isSrcInSnapshot) {
-        srcChild.recordModification(srcIIP.getLatestSnapshotId());
+        srcChild.recordModification(srcLatestSnapshotId);
       }
 
       // check srcChild for reference
@@ -628,12 +600,12 @@ class FSDirRenameOp {
           srcChild.asReference().getDstSnapshotId() : Snapshot.CURRENT_STATE_ID;
       oldSrcCounts = Quota.Counts.newInstance();
       if (isSrcInSnapshot) {
-        final INodeReference.WithName withName =
-            srcIIP.getINode(-2).asDirectory().replaceChild4ReferenceWithName(
-                srcChild, srcIIP.getLatestSnapshotId());
+        final INodeReference.WithName withName = srcParent
+            .replaceChild4ReferenceWithName(srcChild, srcLatestSnapshotId);
         withCount = (INodeReference.WithCount) withName.getReferredINode();
         srcChild = withName;
-        srcIIP = INodesInPath.replace(srcIIP, srcIIP.length() - 1, srcChild);
+        this.srcIIP = INodesInPath.replace(srcIIP, srcIIP.length() - 1,
+            srcChild);
         // get the counts before rename
         withCount.getReferredINode().computeQuotaUsage(oldSrcCounts, true);
       } else if (srcChildIsReference) {
@@ -643,12 +615,45 @@ class FSDirRenameOp {
       } else {
         withCount = null;
       }
-      this.srcIIP = srcIIP;
+    }
+
+    long removeSrc() throws IOException {
+      long removedNum = fsd.removeLastINode(srcIIP);
+      if (removedNum == -1) {
+        String error = "Failed to rename " + src + " to " + dst +
+            " because the source can not be removed";
+        NameNode.stateChangeLog.warn("DIR* FSDirRenameOp.unprotectedRenameTo:" +
+            error);
+        throw new IOException(error);
+      }
+      srcIIP = INodesInPath.replace(srcIIP, srcIIP.length() - 1, null);
+      return removedNum;
+    }
+
+    boolean removeSrc4OldRename() throws IOException {
+      final long removedSrc = fsd.removeLastINode(srcIIP);
+      if (removedSrc == -1) {
+        NameNode.stateChangeLog.warn("DIR* FSDirRenameOp.unprotectedRenameTo: "
+            + "failed to rename " + src + " to " + dst + " because the source" +
+            " can not be removed");
+        return false;
+      } else {
+        srcIIP = INodesInPath.replace(srcIIP, srcIIP.length() - 1, null);
+        return true;
+      }
+    }
+
+    long removeDst() throws IOException {
+      long removedNum = fsd.removeLastINode(dstIIP);
+      if (removedNum != -1) {
+        oldDstChild = dstIIP.getLastINode();
+        dstIIP = INodesInPath.replace(dstIIP, dstIIP.length() - 1, null);
+      }
+      return removedNum;
     }
 
     boolean addSourceToDestination() {
-      final INode dstParent = dstIIP.getINode(-2);
-      srcChild = srcIIP.getLastINode();
+      final INode dstParent = dstParentIIP.getLastINode();
       final byte[] dstChildName = dstIIP.getLastLocalName();
       final INode toDst;
       if (withCount == null) {
@@ -656,16 +661,15 @@ class FSDirRenameOp {
         toDst = srcChild;
       } else {
         withCount.getReferredINode().setLocalName(dstChildName);
-        int dstSnapshotId = dstIIP.getLatestSnapshotId();
         toDst = new INodeReference.DstReference(dstParent.asDirectory(),
-            withCount, dstSnapshotId);
+            withCount, dstIIP.getLatestSnapshotId());
       }
-      return fsd.addLastINodeNoQuotaCheck(dstIIP, toDst);
+      return fsd.addLastINodeNoQuotaCheck(dstParentIIP, toDst) != null;
     }
 
     void updateMtimeAndLease(long timestamp) throws QuotaExceededException {
       srcParent.updateModificationTime(timestamp, srcIIP.getLatestSnapshotId());
-      final INode dstParent = dstIIP.getINode(-2);
+      final INode dstParent = dstParentIIP.getLastINode();
       dstParent.updateModificationTime(timestamp, dstIIP.getLatestSnapshotId());
       // update moved lease with new filename
       fsd.getFSNamesystem().unprotectedChangeLease(src, dst);
@@ -694,10 +698,44 @@ class FSDirRenameOp {
       } else {
         // srcParent is not an INodeDirectoryWithSnapshot, we only need to add
         // the srcChild back
-        fsd.addLastINodeNoQuotaCheck(srcIIP, srcChild);
+        fsd.addLastINodeNoQuotaCheck(srcParentIIP, srcChild);
       }
     }
 
+    void restoreDst() throws QuotaExceededException {
+      Preconditions.checkState(oldDstChild != null);
+      final INodeDirectory dstParent = dstParentIIP.getLastINode().asDirectory();
+      if (dstParent.isWithSnapshot()) {
+        dstParent.undoRename4DstParent(oldDstChild, dstIIP.getLatestSnapshotId());
+      } else {
+        fsd.addLastINodeNoQuotaCheck(dstParentIIP, oldDstChild);
+      }
+      if (oldDstChild != null && oldDstChild.isReference()) {
+        final INodeReference removedDstRef = oldDstChild.asReference();
+        final INodeReference.WithCount wc = (INodeReference.WithCount)
+            removedDstRef.getReferredINode().asReference();
+        wc.addReference(removedDstRef);
+      }
+    }
+
+    boolean cleanDst(BlocksMapUpdateInfo collectedBlocks)
+        throws QuotaExceededException {
+      Preconditions.checkState(oldDstChild != null);
+      List<INode> removedINodes = new ChunkedArrayList<>();
+      final boolean filesDeleted;
+      if (!oldDstChild.isInLatestSnapshot(dstIIP.getLatestSnapshotId())) {
+        oldDstChild.destroyAndCollectBlocks(collectedBlocks, removedINodes);
+        filesDeleted = true;
+      } else {
+        filesDeleted = oldDstChild.cleanSubtree(Snapshot.CURRENT_STATE_ID,
+            dstIIP.getLatestSnapshotId(), collectedBlocks, removedINodes,
+            true).get(Quota.NAMESPACE) >= 0;
+      }
+      fsd.getFSNamesystem().removePathAndBlocks(src, null, removedINodes,
+          false);
+      return filesDeleted;
+    }
+
     void updateQuotasInSourceTree() throws QuotaExceededException {
       // update the quota usage in src tree
       if (isSrcInSnapshot) {
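
With removeSrc, removeDst, addSourceToDestination, restoreSource, restoreDst
and cleanDst pulled into RenameOperation, unprotectedRenameTo now reads as a
small remove/add/undo transaction. Its shape, as a standalone sketch over a
plain Map standing in for the namespace (the real code additionally handles
quotas, snapshots and INode references):

    import java.util.Map;
    import java.util.function.Predicate;

    static boolean rename(Map<String, Object> ns, String src, String dst,
                          Predicate<Object> fitsAtDst) {
      Object moved = ns.remove(src);           // removeSrc()
      if (moved == null) {
        return false;                          // nothing to rename
      }
      Object oldDst = ns.remove(dst);          // removeDst()
      if (fitsAtDst.test(moved)) {             // quota/limit checks
        ns.put(dst, moved);                    // addSourceToDestination()
        return true;                           // old dst is discarded (cleanDst)
      }
      ns.put(src, moved);                      // restoreSource()
      if (oldDst != null) {
        ns.put(dst, oldDst);                   // restoreDst()
      }
      return false;
    }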

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a060e6f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSymlinkOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSymlinkOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSymlinkOp.java
index d232b87..8970fbe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSymlinkOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSymlinkOp.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 
 import java.io.IOException;
+import java.util.Map;
 
 import static org.apache.hadoop.util.Time.now;
 
@@ -80,40 +81,39 @@ class FSDirSymlinkOp {
     return fsd.getAuditFileInfo(iip);
   }
 
-  static INodeSymlink unprotectedAddSymlink(
-      FSDirectory fsd, INodesInPath iip, long id, String target, long mtime,
-      long atime, PermissionStatus perm)
+  static INodeSymlink unprotectedAddSymlink(FSDirectory fsd, INodesInPath iip,
+      byte[] localName, long id, String target, long mtime, long atime,
+      PermissionStatus perm)
       throws UnresolvedLinkException, QuotaExceededException {
     assert fsd.hasWriteLock();
     final INodeSymlink symlink = new INodeSymlink(id, null, perm, mtime, atime,
         target);
-    return fsd.addINode(iip, symlink) ? symlink : null;
+    symlink.setLocalName(localName);
+    return fsd.addINode(iip, symlink) != null ? symlink : null;
   }
 
   /**
    * Add the given symbolic link to the fs. Record it in the edits log.
    */
-  private static INodeSymlink addSymlink(
-      FSDirectory fsd, String path, INodesInPath iip, String target,
-      PermissionStatus dirPerms, boolean createParent, boolean logRetryCache)
-      throws IOException {
+  private static INodeSymlink addSymlink(FSDirectory fsd, String path,
+      INodesInPath iip, String target, PermissionStatus dirPerms,
+      boolean createParent, boolean logRetryCache) throws IOException {
     final long mtime = now();
+    final byte[] localName = iip.getLastLocalName();
     if (createParent) {
-      INodesInPath parentIIP = iip.getParentINodesInPath();
-      if (parentIIP == null || (parentIIP = FSDirMkdirOp.mkdirsRecursively(
-          fsd,
-          parentIIP, dirPerms, true, mtime)) == null) {
+      Map.Entry<INodesInPath, String> e = FSDirMkdirOp
+          .createAncestorDirectories(fsd, iip, dirPerms);
+      if (e == null) {
         return null;
-      } else {
-        iip = INodesInPath.append(parentIIP, null, iip.getLastLocalName());
       }
+      iip = INodesInPath.append(e.getKey(), null, localName);
     }
     final String userName = dirPerms.getUserName();
     long id = fsd.allocateNewInodeId();
     PermissionStatus perm = new PermissionStatus(
         userName, null, FsPermission.getDefault());
-    INodeSymlink newNode =
-        unprotectedAddSymlink(fsd, iip, id, target, mtime, mtime, perm);
+    INodeSymlink newNode = unprotectedAddSymlink(fsd, iip.getExistingINodes(),
+        localName, id, target, mtime, mtime, perm);
     if (newNode == null) {
       NameNode.stateChangeLog.info("addSymlink: failed to add " + path);
       return null;
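
createAncestorDirectories returns two values at once, the extended
INodesInPath and the final path component, packed into a Map.Entry, the stock
two-value tuple in the JDK. The same trick in isolation (hypothetical helper
with plain Strings in place of the HDFS types):

    import java.util.AbstractMap;
    import java.util.Map;

    static Map.Entry<String, String> splitParentAndLast(String path) {
      int i = path.lastIndexOf('/');
      return new AbstractMap.SimpleImmutableEntry<>(
          path.substring(0, Math.max(i, 1)),   // ancestor part
          path.substring(i + 1));              // last component
    }
    // splitParentAndLast("/foo/bar/spam") -> ("/foo/bar", "spam")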

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a060e6f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index c025e01..b39519f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.commons.io.Charsets;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -81,6 +82,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_DE
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_ENCRYPTION_ZONE;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_FILE_ENCRYPTION_INFO;
+import static org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.CURRENT_STATE_ID;
 import static org.apache.hadoop.util.Time.now;
 
 /**
@@ -370,53 +372,44 @@ public class FSDirectory implements Closeable {
 
   /**
    * Add the given filename to the fs.
-   * @throws FileAlreadyExistsException
-   * @throws QuotaExceededException
-   * @throws UnresolvedLinkException
-   * @throws SnapshotAccessControlException 
+   * @return the new INodesInPath instance that contains the new INode
    */
-  INodeFile addFile(INodesInPath iip, String path, PermissionStatus permissions,
-                    short replication, long preferredBlockSize,
-                    String clientName, String clientMachine)
+  INodesInPath addFile(INodesInPath existing, String localName, PermissionStatus
+      permissions, short replication, long preferredBlockSize,
+      String clientName, String clientMachine)
     throws FileAlreadyExistsException, QuotaExceededException,
       UnresolvedLinkException, SnapshotAccessControlException, AclException {
 
     long modTime = now();
     INodeFile newNode = newINodeFile(allocateNewInodeId(), permissions, modTime,
         modTime, replication, preferredBlockSize);
+    newNode.setLocalName(localName.getBytes(Charsets.UTF_8));
     newNode.toUnderConstruction(clientName, clientMachine);
 
-    boolean added = false;
+    INodesInPath newiip;
     writeLock();
     try {
-      added = addINode(iip, newNode);
+      newiip = addINode(existing, newNode);
     } finally {
       writeUnlock();
     }
-    if (!added) {
-      NameNode.stateChangeLog.info("DIR* addFile: failed to add " + path);
+    if (newiip == null) {
+      NameNode.stateChangeLog.info("DIR* addFile: failed to add " +
+          existing.getPath() + "/" + localName);
       return null;
     }
 
     if(NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* addFile: " + path + " is added");
-    }
-    return newNode;
-  }
-
-  INodeFile unprotectedAddFile(long id,
-                            INodesInPath iip,
-                            PermissionStatus permissions,
-                            List<AclEntry> aclEntries,
-                            List<XAttr> xAttrs,
-                            short replication,
-                            long modificationTime,
-                            long atime,
-                            long preferredBlockSize,
-                            boolean underConstruction,
-                            String clientName,
-                            String clientMachine,
-                            byte storagePolicyId) {
+      NameNode.stateChangeLog.debug("DIR* addFile: " + localName + " is added");
+    }
+    return newiip;
+  }
+
+  INodeFile addFileForEditLog(long id, INodesInPath existing, byte[] localName,
+      PermissionStatus permissions, List<AclEntry> aclEntries,
+      List<XAttr> xAttrs, short replication, long modificationTime, long atime,
+      long preferredBlockSize, boolean underConstruction, String clientName,
+      String clientMachine, byte storagePolicyId) {
     final INodeFile newNode;
     assert hasWriteLock();
     if (underConstruction) {
@@ -428,15 +421,15 @@ public class FSDirectory implements Closeable {
           replication, preferredBlockSize, storagePolicyId);
     }
 
+    newNode.setLocalName(localName);
     try {
-      if (addINode(iip, newNode)) {
+      INodesInPath iip = addINode(existing, newNode);
+      if (iip != null) {
         if (aclEntries != null) {
-          AclStorage.updateINodeAcl(newNode, aclEntries,
-            Snapshot.CURRENT_STATE_ID);
+          AclStorage.updateINodeAcl(newNode, aclEntries, CURRENT_STATE_ID);
         }
         if (xAttrs != null) {
-          XAttrStorage.updateINodeXAttrs(newNode, xAttrs,
-              Snapshot.CURRENT_STATE_ID);
+          XAttrStorage.updateINodeXAttrs(newNode, xAttrs, CURRENT_STATE_ID);
         }
         return newNode;
       }
@@ -444,7 +437,7 @@ public class FSDirectory implements Closeable {
       if(NameNode.stateChangeLog.isDebugEnabled()) {
         NameNode.stateChangeLog.debug(
             "DIR* FSDirectory.unprotectedAddFile: exception when add "
-                + iip.getPath() + " to the file system", e);
+                + existing.getPath() + " to the file system", e);
       }
     }
     return null;
@@ -683,7 +676,7 @@ public class FSDirectory implements Closeable {
     if (!targetNode.isInLatestSnapshot(latestSnapshot)) {
       targetNode.destroyAndCollectBlocks(collectedBlocks, removedINodes);
     } else {
-      Quota.Counts counts = targetNode.cleanSubtree(Snapshot.CURRENT_STATE_ID,
+      Quota.Counts counts = targetNode.cleanSubtree(CURRENT_STATE_ID,
           latestSnapshot, collectedBlocks, removedINodes, true);
       parent.addSpaceConsumed(-counts.get(Quota.NAMESPACE),
           -counts.get(Quota.DISKSPACE), true);
@@ -857,16 +850,18 @@ public class FSDirectory implements Closeable {
 
   /**
    * Add the given child to the namespace.
-   * @param iip the INodesInPath instance containing all the ancestral INodes
+   * @param existing the INodesInPath containing all the ancestral INodes
+   * @param child the new INode to add
+   * @return a new INodesInPath instance containing the new child INode, or
+   * null if adding fails.
    * @throws QuotaExceededException is thrown if it violates quota limit
    */
-  boolean addINode(INodesInPath iip, INode child)
+  INodesInPath addINode(INodesInPath existing, INode child)
       throws QuotaExceededException, UnresolvedLinkException {
-    child.setLocalName(iip.getLastLocalName());
     cacheName(child);
     writeLock();
     try {
-      return addLastINode(iip, child, true);
+      return addLastINode(existing, child, true);
     } finally {
       writeUnlock();
     }
@@ -958,7 +953,7 @@ public class FSDirectory implements Closeable {
    */
   void verifyMaxDirItems(INodeDirectory parent, String parentPath)
       throws MaxDirectoryItemsExceededException {
-    final int count = parent.getChildrenList(Snapshot.CURRENT_STATE_ID).size();
+    final int count = parent.getChildrenList(CURRENT_STATE_ID).size();
     if (count >= maxDirItems) {
       final MaxDirectoryItemsExceededException e
           = new MaxDirectoryItemsExceededException(maxDirItems, count);
@@ -974,35 +969,27 @@ public class FSDirectory implements Closeable {
   }
   
   /**
-   * The same as {@link #addChild(INodesInPath, int, INode, boolean)}
-   * with pos = length - 1.
+   * Add a child to the end of the path specified by INodesInPath.
+   * @return an INodesInPath instance containing the new INode
    */
-  private boolean addLastINode(INodesInPath inodesInPath, INode inode,
+  INodesInPath addLastINode(INodesInPath existing, INode inode,
       boolean checkQuota) throws QuotaExceededException {
-    final int pos = inodesInPath.length() - 1;
-    return addChild(inodesInPath, pos, inode, checkQuota);
-  }
+    assert existing.getLastINode() != null &&
+        existing.getLastINode().isDirectory();
 
-  /** Add a node child to the inodes at index pos. 
-   * Its ancestors are stored at [0, pos-1].
-   * @return false if the child with this name already exists; 
-   *         otherwise return true;
-   * @throws QuotaExceededException is thrown if it violates quota limit
-   */
-  boolean addChild(INodesInPath iip, int pos, INode child, boolean checkQuota)
-      throws QuotaExceededException {
+    final int pos = existing.length();
     // Disallow creation of /.reserved. This may be created when loading
     // editlog/fsimage during upgrade since /.reserved was a valid name in older
     // release. This may also be called when a user tries to create a file
     // or directory /.reserved.
-    if (pos == 1 && iip.getINode(0) == rootDir && isReservedName(child)) {
+    if (pos == 1 && existing.getINode(0) == rootDir && isReservedName(inode)) {
       throw new HadoopIllegalArgumentException(
-          "File name \"" + child.getLocalName() + "\" is reserved and cannot "
+          "File name \"" + inode.getLocalName() + "\" is reserved and cannot "
               + "be created. If this is during upgrade change the name of the "
               + "existing file or directory to another name before upgrading "
               + "to the new release.");
     }
-    final INodeDirectory parent = iip.getINode(pos-1).asDirectory();
+    final INodeDirectory parent = existing.getINode(pos - 1).asDirectory();
     // The filesystem limits are not really quotas, so this check may appear
     // odd. It's because a rename operation deletes the src, tries to add
     // to the dest, if that fails, re-adds the src from whence it came.
@@ -1010,46 +997,47 @@ public class FSDirectory implements Closeable {
     // in the original location because a quota violation would cause the item
     // to go "poof".  The fs limits must be bypassed for the same reason.
     if (checkQuota) {
-      final String parentPath = iip.getPath(pos - 1);
-      verifyMaxComponentLength(child.getLocalNameBytes(), parentPath);
+      final String parentPath = existing.getPath(pos - 1);
+      verifyMaxComponentLength(inode.getLocalNameBytes(), parentPath);
       verifyMaxDirItems(parent, parentPath);
     }
     // always verify inode name
-    verifyINodeName(child.getLocalNameBytes());
-    
-    final Quota.Counts counts = child.computeQuotaUsage();
-    updateCount(iip, pos,
+    verifyINodeName(inode.getLocalNameBytes());
+
+    final Quota.Counts counts = inode.computeQuotaUsage();
+    updateCount(existing, pos,
         counts.get(Quota.NAMESPACE), counts.get(Quota.DISKSPACE), checkQuota);
-    boolean isRename = (child.getParent() != null);
+    boolean isRename = (inode.getParent() != null);
     boolean added;
     try {
-      added = parent.addChild(child, true, iip.getLatestSnapshotId());
+      added = parent.addChild(inode, true, existing.getLatestSnapshotId());
     } catch (QuotaExceededException e) {
-      updateCountNoQuotaCheck(iip, pos,
+      updateCountNoQuotaCheck(existing, pos,
           -counts.get(Quota.NAMESPACE), -counts.get(Quota.DISKSPACE));
       throw e;
     }
     if (!added) {
-      updateCountNoQuotaCheck(iip, pos,
+      updateCountNoQuotaCheck(existing, pos,
           -counts.get(Quota.NAMESPACE), -counts.get(Quota.DISKSPACE));
+      return null;
     } else {
       if (!isRename) {
-        AclStorage.copyINodeDefaultAcl(child);
+        AclStorage.copyINodeDefaultAcl(inode);
       }
-      addToInodeMap(child);
+      addToInodeMap(inode);
     }
-    return added;
+    return INodesInPath.append(existing, inode, inode.getLocalNameBytes());
   }
-  
-  boolean addLastINodeNoQuotaCheck(INodesInPath inodesInPath, INode i) {
+
+  INodesInPath addLastINodeNoQuotaCheck(INodesInPath existing, INode i) {
     try {
-      return addLastINode(inodesInPath, i, false);
+      return addLastINode(existing, i, false);
     } catch (QuotaExceededException e) {
       NameNode.LOG.warn("FSDirectory.addChildNoQuotaCheck - unexpected", e);
     }
-    return false;
+    return null;
   }
-  
+
   /**
    * Remove the last inode in the path from the namespace.
    * Count of each ancestor with quota is also updated.
@@ -1058,8 +1046,7 @@ public class FSDirectory implements Closeable {
    *            reference nodes;
    *         >0 otherwise. 
    */
-  long removeLastINode(final INodesInPath iip)
-      throws QuotaExceededException {
+  long removeLastINode(final INodesInPath iip) throws QuotaExceededException {
     final int latestSnapshot = iip.getLatestSnapshotId();
     final INode last = iip.getLastINode();
     final INodeDirectory parent = iip.getINode(-2).asDirectory();
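
addLastINode now signals failure with null and success with
INodesInPath.append(existing, inode, name), so a caller always holds either
the old path or a complete new one, never a half-mutated array. The
copy-on-append idea behind it, sketched as a hypothetical minimal class:

    import java.util.Arrays;

    final class PathOfNames {
      private final String[] names;
      PathOfNames(String... names) { this.names = names.clone(); }
      PathOfNames append(String child) {
        String[] copy = Arrays.copyOf(names, names.length + 1);
        copy[names.length] = child;    // the old instance stays untouched
        return new PathOfNames(copy);
      }
    }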

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a060e6f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index 2ae5e03..0118926 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -347,6 +347,7 @@ public class FSEditLogLoader {
       if (oldFile != null && addCloseOp.overwrite) {
         // This is OP_ADD with overwrite
         fsDir.unprotectedDelete(path, addCloseOp.mtime);
+        iip = INodesInPath.replace(iip, iip.length() - 1, null);
         oldFile = null;
       }
       INodeFile newFile = oldFile;
@@ -358,13 +359,16 @@ public class FSEditLogLoader {
         assert addCloseOp.blocks.length == 0;
 
         // add to the file tree
-        inodeId = getAndUpdateLastInodeId(addCloseOp.inodeId, logVersion,
-            lastInodeId);
-        newFile = fsDir.unprotectedAddFile(
-            inodeId, iip, addCloseOp.permissions, addCloseOp.aclEntries,
-            addCloseOp.xAttrs, replication, addCloseOp.mtime,
-            addCloseOp.atime, addCloseOp.blockSize, true,
-            addCloseOp.clientName, addCloseOp.clientMachine,
+        inodeId = getAndUpdateLastInodeId(addCloseOp.inodeId, logVersion, lastInodeId);
+        newFile = fsDir.addFileForEditLog(inodeId, iip.getExistingINodes(),
+            iip.getLastLocalName(),
+            addCloseOp.permissions,
+            addCloseOp.aclEntries,
+            addCloseOp.xAttrs, replication,
+            addCloseOp.mtime, addCloseOp.atime,
+            addCloseOp.blockSize, true,
+            addCloseOp.clientName,
+            addCloseOp.clientMachine,
             addCloseOp.storagePolicyId);
         iip = INodesInPath.replace(iip, iip.length() - 1, newFile);
         fsNamesys.leaseManager.addLease(addCloseOp.clientName, path);
@@ -506,7 +510,7 @@ public class FSEditLogLoader {
       RenameOldOp renameOp = (RenameOldOp)op;
       final String src = renameReservedPathsOnUpgrade(renameOp.src, logVersion);
       final String dst = renameReservedPathsOnUpgrade(renameOp.dst, logVersion);
-      FSDirRenameOp.unprotectedRenameTo(fsDir, src, dst, renameOp.timestamp);
+      FSDirRenameOp.renameForEditLog(fsDir, src, dst, renameOp.timestamp);
       
       if (toAddRetryCache) {
         fsNamesys.addCacheEntry(renameOp.rpcClientId, renameOp.rpcCallId);
@@ -528,7 +532,7 @@ public class FSEditLogLoader {
       MkdirOp mkdirOp = (MkdirOp)op;
       inodeId = getAndUpdateLastInodeId(mkdirOp.inodeId, logVersion,
           lastInodeId);
-      FSDirMkdirOp.unprotectedMkdir(fsDir, inodeId,
+      FSDirMkdirOp.mkdirForEditLog(fsDir, inodeId,
           renameReservedPathsOnUpgrade(mkdirOp.path, logVersion),
           mkdirOp.permissions, mkdirOp.aclEntries, mkdirOp.timestamp);
       break;
@@ -568,9 +572,10 @@ public class FSEditLogLoader {
     }
 
     case OP_SET_QUOTA:
-      SetQuotaOp setQuotaOp = (SetQuotaOp)op;
-      FSDirAttrOp.unprotectedSetQuota(fsDir, renameReservedPathsOnUpgrade(
-          setQuotaOp.src, logVersion), setQuotaOp.nsQuota, setQuotaOp.dsQuota);
+      SetQuotaOp setQuotaOp = (SetQuotaOp) op;
+      FSDirAttrOp.unprotectedSetQuota(fsDir,
+          renameReservedPathsOnUpgrade(setQuotaOp.src, logVersion),
+          setQuotaOp.nsQuota, setQuotaOp.dsQuota);
       break;
 
     case OP_TIMES: {
@@ -587,9 +592,9 @@ public class FSEditLogLoader {
       final String path = renameReservedPathsOnUpgrade(symlinkOp.path,
           logVersion);
       final INodesInPath iip = fsDir.getINodesInPath(path, false);
-      FSDirSymlinkOp.unprotectedAddSymlink(fsDir, iip, inodeId, symlinkOp.value,
-                                           symlinkOp.mtime, symlinkOp.atime,
-                                           symlinkOp.permissionStatus);
+      FSDirSymlinkOp.unprotectedAddSymlink(fsDir, iip.getExistingINodes(),
+          iip.getLastLocalName(), inodeId, symlinkOp.value, symlinkOp.mtime,
+          symlinkOp.atime, symlinkOp.permissionStatus);
       
       if (toAddRetryCache) {
         fsNamesys.addCacheEntry(symlinkOp.rpcClientId, symlinkOp.rpcCallId);
@@ -598,7 +603,7 @@ public class FSEditLogLoader {
     }
     case OP_RENAME: {
       RenameOp renameOp = (RenameOp)op;
-      FSDirRenameOp.unprotectedRenameTo(fsDir,
+      FSDirRenameOp.renameForEditLog(fsDir,
           renameReservedPathsOnUpgrade(renameOp.src, logVersion),
           renameReservedPathsOnUpgrade(renameOp.dst, logVersion),
           renameOp.timestamp, renameOp.options);
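
Two things happen in the loader: the replay entry points are renamed
(unprotectedRenameTo becomes renameForEditLog, unprotectedMkdir becomes
mkdirForEditLog) to mark them as edit-log-only, and the OP_ADD overwrite path
nulls out the last slot of the resolved INodesInPath after deleting the old
file, since that slot still references the deleted inode. The stale-reference
hazard in miniature (hypothetical helper mirroring
INodesInPath.replace(iip, iip.length() - 1, null)):

    static String[] clearLast(String[] resolved) {
      String[] copy = resolved.clone();
      copy[copy.length - 1] = null;    // drop the stale entry before reuse
      return copy;
    }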

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a060e6f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index bb1c4ed..0e25189 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -2211,13 +2211,21 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot create file" + src);
-      src = dir.resolvePath(pc, src, pathComponents);
-      final INodesInPath iip = dir.getINodesInPath4Write(src);
-      toRemoveBlocks = startFileInternal(pc, iip, permissions, holder,
-          clientMachine, create, overwrite, createParent, replication, 
-          blockSize, isLazyPersist, suite, protocolVersion, edek, logRetryCache);
-      stat = FSDirStatAndListingOp.getFileInfo(dir, src, false,
-          FSDirectory.isReservedRawName(srcArg), true);
+      dir.writeLock();
+      try {
+        src = dir.resolvePath(pc, src, pathComponents);
+        final INodesInPath iip = dir.getINodesInPath4Write(src);
+        toRemoveBlocks = startFileInternal(
+            pc, iip, permissions, holder,
+            clientMachine, create, overwrite,
+            createParent, replication, blockSize,
+            isLazyPersist, suite, protocolVersion, edek,
+            logRetryCache);
+        stat = FSDirStatAndListingOp.getFileInfo(
+            dir, src, false, FSDirectory.isReservedRawName(srcArg), true);
+      } finally {
+        dir.writeUnlock();
+      }
     } catch (StandbyException se) {
       skipSync = true;
       throw se;
@@ -2311,6 +2319,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
           List<INode> toRemoveINodes = new ChunkedArrayList<INode>();
           long ret = dir.delete(iip, toRemoveBlocks, toRemoveINodes, now());
           if (ret >= 0) {
+            iip = INodesInPath.replace(iip, iip.length() - 1, null);
             incrDeletedFileCount(ret);
             removePathAndBlocks(src, null, toRemoveINodes, true);
           }
@@ -2326,12 +2335,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       INodeFile newNode = null;
 
       // Always do an implicit mkdirs for parent directory tree.
-      INodesInPath parentIIP = iip.getParentINodesInPath();
-      if (parentIIP != null && (parentIIP = FSDirMkdirOp.mkdirsRecursively(dir,
-          parentIIP, permissions, true, now())) != null) {
-        iip = INodesInPath.append(parentIIP, newNode, iip.getLastLocalName());
-        newNode = dir.addFile(iip, src, permissions, replication, blockSize,
-                              holder, clientMachine);
+      Map.Entry<INodesInPath, String> parent = FSDirMkdirOp
+          .createAncestorDirectories(dir, iip, permissions);
+      if (parent != null) {
+        iip = dir.addFile(parent.getKey(), parent.getValue(), permissions,
+            replication, blockSize, holder, clientMachine);
+        newNode = iip != null ? iip.getLastINode().asFile() : null;
       }
 
       if (newNode == null) {
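
Two things happen in this file. The first hunk takes the FSDirectory write
lock across path resolution and startFileInternal, so the resolved
INodesInPath cannot go stale between the two calls. The second hunk replaces
the mkdirsRecursively/append dance with createAncestorDirectories, which hands
back both the fully materialized parent and the final path component as a
Map.Entry. A toy sketch of that pair-returning contract (ToyTree, Node and
createAncestors are illustrative names, not Hadoop's):

import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;

class ToyTree {
  static class Node { final Map<String, Node> children = new HashMap<>(); }
  final Node root = new Node();

  // Create every missing ancestor and return (parent node, last component),
  // mirroring the Map.Entry contract used by createAncestorDirectories.
  Map.Entry<Node, String> createAncestors(String[] components) {
    Node current = root;
    for (int i = 0; i < components.length - 1; i++) {
      current = current.children.computeIfAbsent(components[i],
          k -> new Node());
    }
    return new AbstractMap.SimpleImmutableEntry<>(
        current, components[components.length - 1]);
  }
}

The caller can then attach the new leaf in one step, e.g.
p.getKey().children.put(p.getValue(), new ToyTree.Node()), which is the shape
of the addFile call above.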

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a060e6f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodesInPath.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodesInPath.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodesInPath.java
index cfc7a24..389b62b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodesInPath.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodesInPath.java
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.NoSuchElementException;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
@@ -352,6 +353,21 @@ public class INodesInPath {
     return DFSUtil.byteArray2PathString(path, 0, pos);
   }
 
+  /**
+   * @param offset starting index in the path (inclusive)
+   * @param length number of path components
+   * @return sub-list of the path
+   */
+  public List<String> getPath(int offset, int length) {
+    Preconditions.checkArgument(offset >= 0 && length >= 0 && offset + length
+        <= path.length);
+    ImmutableList.Builder<String> components = ImmutableList.builder();
+    for (int i = offset; i < offset + length; i++) {
+      components.add(DFSUtil.bytes2String(path[i]));
+    }
+    return components.build();
+  }
+
   public int length() {
     return inodes.length;
   }
@@ -363,27 +379,17 @@ public class INodesInPath {
   /**
    * @param length number of ancestral INodes in the returned INodesInPath
    *               instance
-   * @return the INodesInPath instance containing ancestral INodes
+   * @return the INodesInPath instance containing ancestral INodes. Note that
+   * this method only handles non-snapshot paths.
    */
   private INodesInPath getAncestorINodesInPath(int length) {
     Preconditions.checkArgument(length >= 0 && length < inodes.length);
+    Preconditions.checkState(!isSnapshot());
     final INode[] anodes = new INode[length];
-    final byte[][] apath;
-    final boolean isSnapshot;
-    final int snapshotId;
-    int dotSnapshotIndex = getDotSnapshotIndex();
-    if (this.isSnapshot && length >= dotSnapshotIndex + 1) {
-      apath = new byte[length + 1][];
-      isSnapshot = true;
-      snapshotId = this.snapshotId;
-    } else {
-      apath = new byte[length][];
-      isSnapshot = false;
-      snapshotId = this.isSnapshot ? CURRENT_STATE_ID : this.snapshotId;
-    }
+    final byte[][] apath = new byte[length][];
     System.arraycopy(this.inodes, 0, anodes, 0, length);
-    System.arraycopy(this.path, 0, apath, 0, apath.length);
-    return new INodesInPath(anodes, apath, isSnapshot, snapshotId);
+    System.arraycopy(this.path, 0, apath, 0, length);
+    return new INodesInPath(anodes, apath, false, snapshotId);
   }
 
   /**
@@ -395,19 +401,23 @@ public class INodesInPath {
         null;
   }
 
-  private int getDotSnapshotIndex() {
-    if (isSnapshot) {
-      for (int i = 0; i < path.length; i++) {
-        if (isDotSnapshotDir(path[i])) {
-          return i;
-        }
+  /**
+   * @return a new INodesInPath instance that only contains existing INodes.
+   * Note that this method only handles non-snapshot paths.
+   */
+  public INodesInPath getExistingINodes() {
+    Preconditions.checkState(!isSnapshot());
+    int i = 0;
+    for (; i < inodes.length; i++) {
+      if (inodes[i] == null) {
+        break;
       }
-      throw new IllegalStateException("The path " + getPath()
-          + " is a snapshot path but does not contain "
-          + HdfsConstants.DOT_SNAPSHOT_DIR);
-    } else {
-      return -1;
     }
+    INode[] existing = new INode[i];
+    byte[][] existingPath = new byte[i][];
+    System.arraycopy(inodes, 0, existing, 0, i);
+    System.arraycopy(path, 0, existingPath, 0, i);
+    return new INodesInPath(existing, existingPath, false, snapshotId);
   }
 
   /**
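
The new getExistingINodes contract is easy to state: when path resolution
cannot find a component, the inodes array keeps a null in that slot, and the
existing prefix is everything before the first null. A self-contained toy
(not the Hadoop class) of the same prefix scan:

import java.util.Arrays;

class ExistingPrefix {
  // Return the resolved prefix of a partially resolved inode array,
  // i.e. everything before the first null slot.
  static Object[] existingPrefix(Object[] inodes) {
    int i = 0;
    while (i < inodes.length && inodes[i] != null) {
      i++;
    }
    return Arrays.copyOf(inodes, i);
  }

  public static void main(String[] args) {
    // Resolving /a/b/c where only /a exists: [root, a, null, null].
    Object[] resolved = { "root", "a", null, null };
    System.out.println(existingPrefix(resolved).length);  // prints 2
  }
}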

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a060e6f0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java
index 62f0841..e133d06 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java
@@ -76,7 +76,7 @@ import org.mockito.Mockito;
 
 /** Testing rename with snapshots. */
 public class TestRenameWithSnapshots {
-  {
+  static {
     SnapshotTestHelper.disableLogs();
   }
   private static final Log LOG = LogFactory.getLog(TestRenameWithSnapshots.class);
@@ -2066,10 +2066,10 @@ public class TestRenameWithSnapshots {
   
   /**
    * This test demonstrates that 
-   * {@link INodeDirectory#removeChild(INode, Snapshot)}
+   * {@link INodeDirectory#removeChild}
    * and 
-   * {@link INodeDirectory#addChild(INode, boolean, Snapshot)}
-   * should use {@link INode#isInLatestSnapshot(Snapshot)} to check if the 
+   * {@link INodeDirectory#addChild}
+   * should use {@link INode#isInLatestSnapshot} to check if the
    * added/removed child should be recorded in snapshots.
    */
   @Test
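
The { ... } to static { ... } change at the top of this test class is subtle
but worth noting: an instance initializer runs for every object, a static
initializer once per class load, and JUnit instantiates the test class afresh
for each test method, so the instance form would have called
SnapshotTestHelper.disableLogs() before every test. A minimal demonstration
(hypothetical class, not from the patch):

class InitializerDemo {
  static { System.out.println("static: once, at class load"); }
  { System.out.println("instance: on every new InitializerDemo()"); }

  public static void main(String[] args) {
    new InitializerDemo();  // prints the static line, then the instance line
    new InitializerDemo();  // prints only the instance line
  }
}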