You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wh...@apache.org on 2016/01/05 20:52:04 UTC

[04/50] [abbrv] hadoop git commit: [partial-ns] Implement rename() and rename2().

[partial-ns] Implement rename() and rename2().


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7a82dbd3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7a82dbd3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7a82dbd3

Branch: refs/heads/feature-HDFS-8286
Commit: 7a82dbd33bcdbe76c6f65e71fdabdf5ad9653f5d
Parents: 707775c
Author: Haohui Mai <wh...@apache.org>
Authored: Thu Apr 30 17:22:39 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Fri Jun 12 13:56:55 2015 -0700

----------------------------------------------------------------------
 .../hdfs/server/namenode/FSDirRenameOp.java     | 696 +++++++------------
 .../server/namenode/FSDirStatAndListingOp.java  |   3 +-
 .../server/namenode/TestGetBlockLocations.java  |   4 +-
 3 files changed, 264 insertions(+), 439 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a82dbd3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
index b69bb42..f322186 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.InvalidPathException;
 import org.apache.hadoop.fs.Options;
@@ -29,12 +29,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.SnapshotException;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
-import org.apache.hadoop.hdfs.util.ReadOnlyList;
-import org.apache.hadoop.util.ChunkedArrayList;
-import org.apache.hadoop.util.Time;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -46,6 +41,7 @@ import java.util.Map;
 
 import static org.apache.hadoop.hdfs.protocol.FSLimitException.MaxDirectoryItemsExceededException;
 import static org.apache.hadoop.hdfs.protocol.FSLimitException.PathComponentTooLongException;
+import static org.apache.hadoop.util.Time.now;
 
 class FSDirRenameOp {
   @Deprecated
@@ -63,59 +59,67 @@ class FSDirRenameOp {
       throw new IOException("Invalid name: " + dst);
     }
     FSPermissionChecker pc = fsd.getPermissionChecker();
-
-    byte[][] srcComponents = FSDirectory.getPathComponentsForReservedPath(src);
-    byte[][] dstComponents = FSDirectory.getPathComponentsForReservedPath(dst);
     HdfsFileStatus resultingStat = null;
-    src = fsd.resolvePath(pc, src, srcComponents);
-    dst = fsd.resolvePath(pc, dst, dstComponents);
-    @SuppressWarnings("deprecation")
-    final boolean status = renameTo(fsd, pc, src, dst, logRetryCache);
-    if (status) {
-      INodesInPath dstIIP = fsd.getINodesInPath(dst, false);
-      resultingStat = fsd.getAuditFileInfo(dstIIP);
-    }
-    return new RenameOldResult(status, resultingStat);
+    try (RWTransaction tx = fsd.newRWTransaction().begin()) {
+      // Rename does not operate on link targets
+      // Do not resolveLink when checking permissions of src and dst
+      // Check write access to parent of src
+      Resolver.Result srcPaths = Resolver.resolveNoSymlink(tx, src);
+      Resolver.Result dstPaths = Resolver.resolve(tx, dst);
+      @SuppressWarnings("deprecation")
+      final boolean status = renameTo(tx,
+          fsd, pc, srcPaths, dstPaths, logRetryCache);
+      if (status) {
+        dstPaths = Resolver.resolve(tx, dst);
+        resultingStat = fsd.getAuditFileInfo(dstPaths.inodesInPath());
+      }
+      tx.commit();
+      return new RenameOldResult(status, resultingStat);
+    }
   }
 
   /**
    * Verify quota for rename operation where srcInodes[srcInodes.length-1] moves
    * dstInodes[dstInodes.length-1]
    */
-  private static void verifyQuotaForRename(FSDirectory fsd, INodesInPath src,
-      INodesInPath dst) throws QuotaExceededException {
-    if (!fsd.getFSNamesystem().isImageLoaded() || fsd.shouldSkipQuotaChecks()) {
-      // Do not check quota if edits log is still being processed
-      return;
-    }
-    int i = 0;
-    while(src.getINode(i) == dst.getINode(i)) { i++; }
-    // src[i - 1] is the last common ancestor.
-    BlockStoragePolicySuite bsps = fsd.getBlockStoragePolicySuite();
-    final QuotaCounts delta = src.getLastINode().computeQuotaUsage(bsps);
-
-    // Reduce the required quota by dst that is being removed
-    final INode dstINode = dst.getLastINode();
-    if (dstINode != null) {
-      delta.subtract(dstINode.computeQuotaUsage(bsps));
-    }
-    FSDirectory.verifyQuota(dst, dst.length() - 1, delta, src.getINode(i - 1));
+  private static void verifyQuotaForRename(
+      FSDirectory fsd, FlatINodesInPath src,
+      FlatINodesInPath dst) throws QuotaExceededException {
+    // TODO
+//    if (!fsd.getFSNamesystem().isImageLoaded() || fsd.shouldSkipQuotaChecks()) {
+//      // Do not check quota if edits log is still being processed
+//      return;
+//    }
+//    int i = 0;
+//    while(src.getINode(i) == dst.getINode(i)) { i++; }
+//    // src[i - 1] is the last common ancestor.
+//    BlockStoragePolicySuite bsps = fsd.getBlockStoragePolicySuite();
+//    final QuotaCounts delta = src.getLastINode().computeQuotaUsage(bsps);
+//
+//    // Reduce the required quota by dst that is being removed
+//    final INode dstINode = dst.getLastINode();
+//    if (dstINode != null) {
+//      delta.subtract(dstINode.computeQuotaUsage(bsps));
+//    }
+//    FSDirectory.verifyQuota(dst, dst.length() - 1, delta, src.getINode(i - 1));
   }
 
   /**
    * Checks file system limits (max component length and max directory items)
    * during a rename operation.
    */
-  static void verifyFsLimitsForRename(FSDirectory fsd, INodesInPath srcIIP,
-      INodesInPath dstIIP)
+  static void verifyFsLimitsForRename(
+      FSDirectory fsd, FlatINodesInPath src,
+      FlatINodesInPath dst)
       throws PathComponentTooLongException, MaxDirectoryItemsExceededException {
-    byte[] dstChildName = dstIIP.getLastLocalName();
-    final String parentPath = dstIIP.getParentPath();
-    fsd.verifyMaxComponentLength(dstChildName, parentPath);
-    // Do not enforce max directory items if renaming within same directory.
-    if (srcIIP.getINode(-2) != dstIIP.getINode(-2)) {
-      fsd.verifyMaxDirItems(dstIIP.getINode(-2).asDirectory(), parentPath);
-    }
+    // TODO
+//    byte[] dstChildName = dstIIP.getLastLocalName();
+//    final String parentPath = dstIIP.getParentPath();
+//    fsd.verifyMaxComponentLength(dstChildName, parentPath);
+//    // Do not enforce max directory items if renaming within same directory.
+//    if (srcIIP.getINode(-2) != dstIIP.getINode(-2)) {
+//      fsd.verifyMaxDirItems(dstIIP.getINode(-2).asDirectory(), parentPath);
+//    }
   }
 
   /**
@@ -130,9 +134,11 @@ class FSDirRenameOp {
     if (fsd.isDir(dst)) {
       dst += Path.SEPARATOR + new Path(src).getName();
     }
-    final INodesInPath srcIIP = fsd.getINodesInPath4Write(src, false);
-    final INodesInPath dstIIP = fsd.getINodesInPath4Write(dst, false);
-    return unprotectedRenameTo(fsd, src, dst, srcIIP, dstIIP, timestamp);
+    try (ReplayTransaction tx = fsd.newReplayTransaction().begin()) {
+      Resolver.Result srcPaths = Resolver.resolveNoSymlink(tx, src);
+      Resolver.Result dstPaths = Resolver.resolveNoSymlink(tx, dst);
+      return unprotectedRenameTo(tx, fsd, srcPaths, dstPaths, timestamp);
+    }
   }
 
   /**
@@ -146,13 +152,13 @@ class FSDirRenameOp {
    * boolean, Options.Rename...)}
    */
   @Deprecated
-  static boolean unprotectedRenameTo(FSDirectory fsd, String src, String dst,
-      final INodesInPath srcIIP, final INodesInPath dstIIP, long timestamp)
+  static boolean unprotectedRenameTo(RWTransaction ntx,
+      FSDirectory fsd, Resolver.Result src, Resolver.Result dst,
+      long timestamp)
       throws IOException {
     assert fsd.hasWriteLock();
-    final INode srcInode = srcIIP.getLastINode();
     try {
-      validateRenameSource(srcIIP);
+      validateRenameSource(src);
     } catch (SnapshotException e) {
       throw e;
     } catch (IOException ignored) {
@@ -165,59 +171,45 @@ class FSDirRenameOp {
     }
 
     try {
-      validateDestination(src, dst, srcInode);
+      validateDestination(src, dst);
     } catch (IOException ignored) {
       return false;
     }
 
-    if (dstIIP.getLastINode() != null) {
+    if (dst.ok()) {
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: " +
           "failed to rename " + src + " to " + dst + " because destination " +
           "exists");
       return false;
     }
-    INode dstParent = dstIIP.getINode(-2);
-    if (dstParent == null) {
+    if (FlatNSUtil.hasNextLevelInPath(dst.src, dst.offset)) {
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: " +
-          "failed to rename " + src + " to " + dst + " because destination's " +
-          "parent does not exist");
+          "failed to rename " + src.src + " to " + dst.src + " because " +
+          "destination's parent does not exist");
       return false;
     }
 
-    fsd.ezManager.checkMoveValidity(srcIIP, dstIIP, src);
+    // TODO: Handle encryption
+    // fsd.ezManager.checkMoveValidity(src, dst, src);
     // Ensure dst has quota to accommodate rename
-    verifyFsLimitsForRename(fsd, srcIIP, dstIIP);
-    verifyQuotaForRename(fsd, srcIIP, dstIIP);
-
-    RenameOperation tx = new RenameOperation(fsd, src, dst, srcIIP, dstIIP);
+    verifyFsLimitsForRename(fsd, src.inodesInPath(), dst.inodesInPath());
+    verifyQuotaForRename(fsd, src.inodesInPath(), dst.inodesInPath());
 
-    boolean added = false;
+    RenameOperation tx = new RenameOperation(ntx, src, dst);
 
-    try {
-      // remove src
-      if (!tx.removeSrc4OldRename()) {
-        return false;
-      }
+    boolean added;
 
-      added = tx.addSourceToDestination();
-      if (added) {
-        if (NameNode.stateChangeLog.isDebugEnabled()) {
-          NameNode.stateChangeLog.debug("DIR* FSDirectory" +
-              ".unprotectedRenameTo: " + src + " is renamed to " + dst);
-        }
-
-        tx.updateMtimeAndLease(timestamp);
-        tx.updateQuotasInSourceTree(fsd.getBlockStoragePolicySuite());
+    // remove src
+    if (!tx.removeSrc4OldRename(timestamp)) {
+      return false;
+    }
 
-        return true;
-      }
-    } finally {
-      if (!added) {
-        tx.restoreSource();
-      }
+    added = tx.addSourceToDestination(timestamp);
+    if (added) {
+      return true;
     }
     NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: " +
-        "failed to rename " + src + " to " + dst);
+        "failed to rename " + src.src + " to " + dst.src);
     return false;
   }
 
@@ -232,61 +224,55 @@ class FSDirRenameOp {
     String dst = dstArg;
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: with options -" +
-          " " + src + " to " + dst);
+                                        " " + src + " to " + dst);
     }
     if (!DFSUtil.isValidName(dst)) {
       throw new InvalidPathException("Invalid name: " + dst);
     }
     final FSPermissionChecker pc = fsd.getPermissionChecker();
 
-    byte[][] srcComponents = FSDirectory.getPathComponentsForReservedPath(src);
-    byte[][] dstComponents = FSDirectory.getPathComponentsForReservedPath(dst);
     BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
-    src = fsd.resolvePath(pc, src, srcComponents);
-    dst = fsd.resolvePath(pc, dst, dstComponents);
-    renameTo(fsd, pc, src, dst, collectedBlocks, logRetryCache, options);
-    INodesInPath dstIIP = fsd.getINodesInPath(dst, false);
-    HdfsFileStatus resultingStat = fsd.getAuditFileInfo(dstIIP);
-
-    return new AbstractMap.SimpleImmutableEntry<>(
-        collectedBlocks, resultingStat);
+    try (RWTransaction tx = fsd.newRWTransaction().begin()) {
+      Resolver.Result srcPaths = Resolver.resolve(tx, src);
+      Resolver.Result dstPaths = Resolver.resolve(tx, dst);
+      renameTo(tx, fsd, pc, srcPaths, dstPaths, collectedBlocks, logRetryCache,
+               options);
+      dstPaths = Resolver.resolve(tx, dst);
+      HdfsFileStatus resultingStat =
+          fsd.getAuditFileInfo(dstPaths.inodesInPath());
+      tx.commit();
+      return new AbstractMap.SimpleImmutableEntry<>(
+          collectedBlocks, resultingStat);
+    }
   }
 
-  /**
-   * @see {@link #unprotectedRenameTo(FSDirectory, String, String, INodesInPath,
-   * INodesInPath, long, BlocksMapUpdateInfo, Options.Rename...)}
-   */
-  static void renameTo(FSDirectory fsd, FSPermissionChecker pc, String src,
-      String dst, BlocksMapUpdateInfo collectedBlocks, boolean logRetryCache,
+  static void renameTo(RWTransaction tx, FSDirectory fsd, FSPermissionChecker
+      pc, Resolver.Result src, Resolver.Result dst,
+      BlocksMapUpdateInfo collectedBlocks, boolean logRetryCache,
       Options.Rename... options) throws IOException {
-    final INodesInPath srcIIP = fsd.getINodesInPath4Write(src, false);
-    final INodesInPath dstIIP = fsd.getINodesInPath4Write(dst, false);
+    validateRenameSource(src);
     if (fsd.isPermissionEnabled()) {
       // Rename does not operate on link targets
       // Do not resolveLink when checking permissions of src and dst
       // Check write access to parent of src
-      fsd.checkPermission(pc, srcIIP, false, null, FsAction.WRITE, null, null,
-          false);
-      // Check write access to ancestor of dst
-      fsd.checkPermission(pc, dstIIP, false, FsAction.WRITE, null, null, null,
-          false);
+      fsd.checkPermission(pc, src.inodesInPath(), false, null, FsAction.WRITE, null,
+                          null, false);
+      // TODO: Check write access to ancestor of dst
+      fsd.checkPermission(pc, dst.inodesInPath(), false, FsAction.WRITE, null,
+                          null, null, false);
     }
 
     if (NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* FSDirectory.renameTo: " + src + " to "
-          + dst);
+      NameNode.stateChangeLog.debug(
+          "DIR* FSDirectory.renameTo: " + src + " to " + dst);
     }
-    final long mtime = Time.now();
-    fsd.writeLock();
-    try {
-      if (unprotectedRenameTo(fsd, src, dst, srcIIP, dstIIP, mtime,
-          collectedBlocks, options)) {
-        FSDirDeleteOp.incrDeletedFileCount(1);
-      }
-    } finally {
-      fsd.writeUnlock();
+    final long mtime = now();
+
+    if (unprotectedRenameTo(tx, fsd, src, dst, mtime, collectedBlocks,
+                            options)) {
+      FSDirDeleteOp.incrDeletedFileCount(1);
     }
-    fsd.getEditLog().logRename(src, dst, mtime, logRetryCache, options);
+    tx.logRename(src.src, dst.src, mtime, logRetryCache, options);
   }
 
   /**
@@ -307,12 +293,15 @@ class FSDirRenameOp {
       Options.Rename... options)
       throws IOException {
     BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
-    final INodesInPath srcIIP = fsd.getINodesInPath4Write(src, false);
-    final INodesInPath dstIIP = fsd.getINodesInPath4Write(dst, false);
-    unprotectedRenameTo(fsd, src, dst, srcIIP, dstIIP, timestamp,
-        collectedBlocks, options);
-    if (!collectedBlocks.getToDeleteList().isEmpty()) {
-      fsd.getFSNamesystem().removeBlocksAndUpdateSafemodeTotal(collectedBlocks);
+    boolean ret;
+    try (ReplayTransaction tx = fsd.newReplayTransaction().begin()) {
+      Resolver.Result srcPaths = Resolver.resolveNoSymlink(tx, src);
+      Resolver.Result dstPaths = Resolver.resolveNoSymlink(tx, dst);
+      ret = unprotectedRenameTo(tx, fsd, srcPaths, dstPaths,
+                                timestamp, collectedBlocks, options);
+      if (!collectedBlocks.getToDeleteList().isEmpty()) {
+        fsd.getFSNamesystem().removeBlocksAndUpdateSafemodeTotal(collectedBlocks);
+      }
     }
   }
 
@@ -329,48 +318,48 @@ class FSDirRenameOp {
    * @param options         Rename options
    * @return whether a file/directory gets overwritten in the dst path
    */
-  static boolean unprotectedRenameTo(FSDirectory fsd, String src, String dst,
-      final INodesInPath srcIIP, final INodesInPath dstIIP, long timestamp,
+  static boolean unprotectedRenameTo(
+      RWTransaction ntx, FSDirectory fsd, final Resolver.Result src,
+      final Resolver.Result dst, long timestamp,
       BlocksMapUpdateInfo collectedBlocks, Options.Rename... options)
       throws IOException {
-    assert fsd.hasWriteLock();
     boolean overwrite = options != null
         && Arrays.asList(options).contains(Options.Rename.OVERWRITE);
 
-    final String error;
-    final INode srcInode = srcIIP.getLastINode();
-    validateRenameSource(srcIIP);
-
     // validate the destination
     if (dst.equals(src)) {
       throw new FileAlreadyExistsException("The source " + src +
           " and destination " + dst + " are the same");
     }
-    validateDestination(src, dst, srcInode);
 
-    if (dstIIP.length() == 1) {
+    validateDestination(src, dst);
+
+    String error;
+    if (dst.ok() && dst.inodesInPath().length() == 1) {
       error = "rename destination cannot be the root";
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: " +
           error);
       throw new IOException(error);
     }
 
-    BlockStoragePolicySuite bsps = fsd.getBlockStoragePolicySuite();
-    fsd.ezManager.checkMoveValidity(srcIIP, dstIIP, src);
-    final INode dstInode = dstIIP.getLastINode();
-    List<INodeDirectory> snapshottableDirs = new ArrayList<>();
-    if (dstInode != null) { // Destination exists
-      validateOverwrite(src, dst, overwrite, srcInode, dstInode);
-      FSDirSnapshotOp.checkSnapshot(dstInode, snapshottableDirs);
+    // TODO: Handle encryption zone
+    // fsd.ezManager.checkMoveValidity(srcIIP, dstIIP, src);
+
+    if (dst.ok()) { // Destination exists
+      // TODO: Validate overwrite
+      // validateOverwrite(ntx, src, dst, overwrite, srcInode, dstInode);
+      // TODO: Check snapshot
+      // FSDirSnapshotOp.checkSnapshot(dstInode, snapshottableDirs);
     }
 
-    INode dstParent = dstIIP.getINode(-2);
-    if (dstParent == null) {
+    if (FlatNSUtil.hasNextLevelInPath(dst.src, dst.offset)) {
       error = "rename destination parent " + dst + " not found.";
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: " +
           error);
       throw new FileNotFoundException(error);
     }
+
+    FlatINode dstParent = dst.getLastINode(-2);
     if (!dstParent.isDirectory()) {
       error = "rename destination parent " + dst + " is a file.";
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: " +
@@ -378,61 +367,36 @@ class FSDirRenameOp {
       throw new ParentNotDirectoryException(error);
     }
 
-    // Ensure dst has quota to accommodate rename
-    verifyFsLimitsForRename(fsd, srcIIP, dstIIP);
-    verifyQuotaForRename(fsd, srcIIP, dstIIP);
+    // TODO: Ensure dst has quota to accommodate rename
+    // verifyFsLimitsForRename(fsd, srcIIP, dstIIP);
+    // verifyQuotaForRename(fsd, srcIIP, dstIIP);
 
-    RenameOperation tx = new RenameOperation(fsd, src, dst, srcIIP, dstIIP);
+    RenameOperation tx = new RenameOperation(ntx, src, dst);
 
-    boolean undoRemoveSrc = true;
-    tx.removeSrc();
+    tx.removeSrc(timestamp);
 
-    boolean undoRemoveDst = false;
-    long removedNum = 0;
-    try {
-      if (dstInode != null) { // dst exists, remove it
-        removedNum = tx.removeDst();
-        if (removedNum != -1) {
-          undoRemoveDst = true;
-        }
-      }
-
-      // add src as dst to complete rename
-      if (tx.addSourceToDestination()) {
-        undoRemoveSrc = false;
-        if (NameNode.stateChangeLog.isDebugEnabled()) {
-          NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedRenameTo: "
-              + src + " is renamed to " + dst);
-        }
-
-        tx.updateMtimeAndLease(timestamp);
+    List<Long> removedINodes = new ArrayList<>();
+    List<Long> removedUCFiles = new ArrayList<>();
+    if (dst.ok()) { // dst exists, remove it
+      tx.removeDst(collectedBlocks, removedINodes, removedUCFiles);
+    }
 
-        // Collect the blocks and remove the lease for previous dst
-        boolean filesDeleted = false;
-        if (undoRemoveDst) {
-          undoRemoveDst = false;
-          if (removedNum > 0) {
-            filesDeleted = tx.cleanDst(bsps, collectedBlocks);
-          }
-        }
+    // add src as dst to complete rename
+    if (tx.addSourceToDestination(timestamp)) {
+      if (NameNode.stateChangeLog.isDebugEnabled()) {
+        NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedRenameTo: "
+                                          + src + " is renamed to " + dst);
+      }
 
-        if (snapshottableDirs.size() > 0) {
-          // There are snapshottable directories (without snapshots) to be
-          // deleted. Need to update the SnapshotManager.
-          fsd.getFSNamesystem().removeSnapshottableDirs(snapshottableDirs);
-        }
+      // Collect the blocks and remove the lease for previous dst
+      boolean filesDeleted = false;
+      // TODO: Handle snapshots
 
-        tx.updateQuotasInSourceTree(bsps);
-        return filesDeleted;
-      }
-    } finally {
-      if (undoRemoveSrc) {
-        tx.restoreSource();
-      }
-      if (undoRemoveDst) { // Rename failed - restore dst
-        tx.restoreDst(bsps);
-      }
+      FSNamesystem fsn = fsd.getFSNamesystem();
+      fsn.removeLeases(removedUCFiles);
+      return filesDeleted;
     }
+
     NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: " +
         "failed to rename " + src + " to " + dst);
     throw new IOException("rename from " + src + " to " + dst + " failed.");
@@ -444,315 +408,177 @@ class FSDirRenameOp {
    */
   @Deprecated
   @SuppressWarnings("deprecation")
-  private static boolean renameTo(FSDirectory fsd, FSPermissionChecker pc,
-      String src, String dst, boolean logRetryCache) throws IOException {
-    // Rename does not operate on link targets
-    // Do not resolveLink when checking permissions of src and dst
-    // Check write access to parent of src
-    final INodesInPath srcIIP = fsd.getINodesInPath4Write(src, false);
+  private static boolean renameTo(
+      RWTransaction tx, FSDirectory fsd,
+      FSPermissionChecker pc, Resolver.Result src, Resolver.Result dst,
+      boolean logRetryCache)
+      throws IOException {
+
+    Resolver.Result dstArg = dst;
     // Note: We should not be doing this.  This is move() not renameTo().
-    final String actualDst = fsd.isDir(dst) ?
-        dst + Path.SEPARATOR + new Path(src).getName() : dst;
-    final INodesInPath dstIIP = fsd.getINodesInPath4Write(actualDst, false);
+    if (dst.ok() && dst.inodesInPath().getLastINode().isDirectory()) {
+      String actualDst = dst.src + Path.SEPARATOR + new Path(src.src).getName();
+      dst = Resolver.resolve(tx, actualDst);
+    }
+
     if (fsd.isPermissionEnabled()) {
-      fsd.checkPermission(pc, srcIIP, false, null, FsAction.WRITE, null, null,
-          false);
+      fsd.checkPermission(pc, src.inodesInPath(),
+                          false, null, FsAction.WRITE, null, null, false);
       // Check write access to ancestor of dst
-      fsd.checkPermission(pc, dstIIP, false, FsAction.WRITE, null, null,
-          null, false);
+      fsd.checkPermission(pc, dst.inodesInPath(), false, FsAction.WRITE,
+                          null, null, null, false);
     }
 
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* FSDirectory.renameTo: " + src + " to "
           + dst);
     }
-    final long mtime = Time.now();
-    boolean stat = false;
-    fsd.writeLock();
-    try {
-      stat = unprotectedRenameTo(fsd, src, actualDst, srcIIP, dstIIP, mtime);
-    } finally {
-      fsd.writeUnlock();
-    }
+    final long mtime = now();
+    boolean stat = unprotectedRenameTo(tx, fsd, src, dst, mtime);
     if (stat) {
-      fsd.getEditLog().logRename(src, actualDst, mtime, logRetryCache);
+      fsd.getEditLog().logRename(src.src, dstArg.src, mtime, logRetryCache);
       return true;
     }
     return false;
   }
 
-  private static void validateDestination(
-      String src, String dst, INode srcInode)
-      throws IOException {
+  private static void validateDestination(Resolver.Result src, Resolver
+      .Result dst) throws IOException {
     String error;
-    if (srcInode.isSymlink() &&
-        dst.equals(srcInode.asSymlink().getSymlinkString())) {
-      throw new FileAlreadyExistsException("Cannot rename symlink " + src
-          + " to its target " + dst);
-    }
+    // TODO: Handle symlink
+//    if (srcInode.isSymlink() &&
+//        dst.equals(srcInode.asSymlink().getSymlinkString())) {
+//      throw new FileAlreadyExistsException("Cannot rename symlink " + src
+//                                               + " to its target " + dst);
+//    }
+
     // dst cannot be a directory or a file under src
-    if (dst.startsWith(src)
-        && dst.charAt(src.length()) == Path.SEPARATOR_CHAR) {
+    if (dst.inodesInPath().length() > src.inodesInPath().length()) {
+      List<Map.Entry<ByteString, FlatINode>> srcINodes = src.inodesInPath()
+          .inodes();
+      List<Map.Entry<ByteString, FlatINode>> dstINodes = dst.inodesInPath()
+          .inodes();
+      for (int i = 0, e = srcINodes.size(); i < e; ++i) {
+        if (srcINodes.get(i).getValue().id() != dstINodes.get(i).getValue()
+            .id()) {
+          return;
+        }
+      }
       error = "Rename destination " + dst
           + " is a directory or file under source " + src;
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-          + error);
+                                       + error);
       throw new IOException(error);
     }
   }
 
-  private static void validateOverwrite(
-      String src, String dst, boolean overwrite, INode srcInode, INode dstInode)
+  private static void validateOverwrite(RWTransaction tx,
+      String src, String dst, boolean overwrite,
+      FlatINode srcInode, FlatINode dstInode)
       throws IOException {
     String error;// It's OK to rename a file to a symlink and vice versa
     if (dstInode.isDirectory() != srcInode.isDirectory()) {
       error = "Source " + src + " and destination " + dst
           + " must both be directories";
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-          + error);
+                                       + error);
       throw new IOException(error);
     }
     if (!overwrite) { // If destination exists, overwrite flag must be true
       error = "rename destination " + dst + " already exists";
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-          + error);
+                                       + error);
       throw new FileAlreadyExistsException(error);
     }
     if (dstInode.isDirectory()) {
-      final ReadOnlyList<INode> children = dstInode.asDirectory()
-          .getChildrenList(Snapshot.CURRENT_STATE_ID);
-      if (!children.isEmpty()) {
+      boolean hasChildren = !tx.childrenView(dstInode.id()).isEmpty();
+      if (hasChildren) {
         error = "rename destination directory is not empty: " + dst;
         NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-            + error);
+                                         + error);
         throw new IOException(error);
       }
     }
   }
 
-  private static void validateRenameSource(INodesInPath srcIIP)
+  private static void validateRenameSource(Resolver.Result paths)
       throws IOException {
     String error;
-    final INode srcInode = srcIIP.getLastINode();
-    // validate source
-    if (srcInode == null) {
-      error = "rename source " + srcIIP.getPath() + " is not found.";
+    if (paths.invalidPath()) {
+      throw new InvalidPathException(paths.src);
+    } else if (paths.notFound()) {
+      error = "rename source " + paths.src + " is not found.";
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-          + error);
+                                       + error);
       throw new FileNotFoundException(error);
-    }
-    if (srcIIP.length() == 1) {
+    } else if (paths.inodesInPath().length() == 1) {
       error = "rename source cannot be the root";
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-          + error);
+                                       + error);
       throw new IOException(error);
     }
-    // srcInode and its subtree cannot contain snapshottable directories with
-    // snapshots
-    FSDirSnapshotOp.checkSnapshot(srcInode, null);
+    // TODO: srcInode and its subtree cannot contain snapshottable
+    // directories with snapshots
   }
 
   private static class RenameOperation {
-    private final FSDirectory fsd;
-    private INodesInPath srcIIP;
-    private final INodesInPath srcParentIIP;
-    private INodesInPath dstIIP;
-    private final INodesInPath dstParentIIP;
-    private final String src;
-    private final String dst;
-    private final INodeReference.WithCount withCount;
-    private final int srcRefDstSnapshot;
-    private final INodeDirectory srcParent;
-    private final byte[] srcChildName;
-    private final boolean isSrcInSnapshot;
-    private final boolean srcChildIsReference;
-    private final QuotaCounts oldSrcCounts;
-    private INode srcChild;
-    private INode oldDstChild;
-
-    RenameOperation(FSDirectory fsd, String src, String dst,
-                    INodesInPath srcIIP, INodesInPath dstIIP)
-        throws QuotaExceededException {
-      this.fsd = fsd;
+    private final RWTransaction tx;
+    private final Resolver.Result src;
+    private final Resolver.Result dst;
+    private final ByteString srclocalName;
+
+    RenameOperation(
+        RWTransaction tx, Resolver.Result src,
+        Resolver.Result dst) {
+      this.tx = tx;
       this.src = src;
       this.dst = dst;
-      this.srcIIP = srcIIP;
-      this.dstIIP = dstIIP;
-      this.srcParentIIP = srcIIP.getParentINodesInPath();
-      this.dstParentIIP = dstIIP.getParentINodesInPath();
-
-      BlockStoragePolicySuite bsps = fsd.getBlockStoragePolicySuite();
-      srcChild = this.srcIIP.getLastINode();
-      srcChildName = srcChild.getLocalNameBytes();
-      final int srcLatestSnapshotId = srcIIP.getLatestSnapshotId();
-      isSrcInSnapshot = srcChild.isInLatestSnapshot(srcLatestSnapshotId);
-      srcChildIsReference = srcChild.isReference();
-      srcParent = this.srcIIP.getINode(-2).asDirectory();
-
-      // Record the snapshot on srcChild. After the rename, before any new
-      // snapshot is taken on the dst tree, changes will be recorded in the
-      // latest snapshot of the src tree.
-      if (isSrcInSnapshot) {
-        srcChild.recordModification(srcLatestSnapshotId);
-      }
-
-      // check srcChild for reference
-      srcRefDstSnapshot = srcChildIsReference ?
-          srcChild.asReference().getDstSnapshotId() : Snapshot.CURRENT_STATE_ID;
-      oldSrcCounts = new QuotaCounts.Builder().build();
-      if (isSrcInSnapshot) {
-        final INodeReference.WithName withName = srcParent
-            .replaceChild4ReferenceWithName(srcChild, srcLatestSnapshotId);
-        withCount = (INodeReference.WithCount) withName.getReferredINode();
-        srcChild = withName;
-        this.srcIIP = INodesInPath.replace(srcIIP, srcIIP.length() - 1,
-            srcChild);
-        // get the counts before rename
-        oldSrcCounts.add(withCount.getReferredINode().computeQuotaUsage(bsps));
-      } else if (srcChildIsReference) {
-        // srcChild is reference but srcChild is not in latest snapshot
-        withCount = (INodeReference.WithCount) srcChild.asReference()
-            .getReferredINode();
-      } else {
-        withCount = null;
-      }
+      srclocalName = src.inodesInPath().inodes().get(
+          src.inodesInPath().length() - 1).getKey();
     }
 
-    long removeSrc() throws IOException {
-      long removedNum = fsd.removeLastINode(srcIIP);
-      if (removedNum == -1) {
-        String error = "Failed to rename " + src + " to " + dst +
-            " because the source can not be removed";
-        NameNode.stateChangeLog.warn("DIR* FSDirRenameOp.unprotectedRenameTo:" +
-            error);
-        throw new IOException(error);
-      } else {
-        // update the quota count if necessary
-        fsd.updateCountForDelete(srcChild, srcIIP);
-        srcIIP = INodesInPath.replace(srcIIP, srcIIP.length() - 1, null);
-        return removedNum;
-      }
+    long removeSrc(long mtime) throws IOException {
+      FlatINode parent = src.inodesInPath().getLastINode(-2);
+      ByteString newParent = new FlatINode.Builder()
+          .mergeFrom(parent).mtime(mtime).build();
+      tx.deleteChild(parent.id(), srclocalName.asReadOnlyByteBuffer());
+      tx.putINode(parent.id(), newParent);
+      return 1;
+      // TODO: Handle quota
     }
 
-    boolean removeSrc4OldRename() {
-      final long removedSrc = fsd.removeLastINode(srcIIP);
-      if (removedSrc == -1) {
-        NameNode.stateChangeLog.warn("DIR* FSDirRenameOp.unprotectedRenameTo: "
-            + "failed to rename " + src + " to " + dst + " because the source" +
-            " can not be removed");
-        return false;
-      } else {
-        // update the quota count if necessary
-        fsd.updateCountForDelete(srcChild, srcIIP);
-        srcIIP = INodesInPath.replace(srcIIP, srcIIP.length() - 1, null);
+    boolean removeSrc4OldRename(long mtime) {
+      try {
+        removeSrc(mtime);
         return true;
+      } catch (IOException ignored) {
+        return false;
       }
     }
 
-    long removeDst() {
-      long removedNum = fsd.removeLastINode(dstIIP);
-      if (removedNum != -1) {
-        oldDstChild = dstIIP.getLastINode();
-        // update the quota count if necessary
-        fsd.updateCountForDelete(oldDstChild, dstIIP);
-        dstIIP = INodesInPath.replace(dstIIP, dstIIP.length() - 1, null);
-      }
-      return removedNum;
-    }
-
-    boolean addSourceToDestination() {
-      final INode dstParent = dstParentIIP.getLastINode();
-      final byte[] dstChildName = dstIIP.getLastLocalName();
-      final INode toDst;
-      if (withCount == null) {
-        srcChild.setLocalName(dstChildName);
-        toDst = srcChild;
-      } else {
-        withCount.getReferredINode().setLocalName(dstChildName);
-        toDst = new INodeReference.DstReference(dstParent.asDirectory(),
-            withCount, dstIIP.getLatestSnapshotId());
-      }
-      return fsd.addLastINodeNoQuotaCheck(dstParentIIP, toDst) != null;
-    }
-
-    void updateMtimeAndLease(long timestamp) throws QuotaExceededException {
-      srcParent.updateModificationTime(timestamp, srcIIP.getLatestSnapshotId());
-      final INode dstParent = dstParentIIP.getLastINode();
-      dstParent.updateModificationTime(timestamp, dstIIP.getLatestSnapshotId());
-    }
-
-    void restoreSource() throws QuotaExceededException {
-      // Rename failed - restore src
-      final INode oldSrcChild = srcChild;
-      // put it back
-      if (withCount == null) {
-        srcChild.setLocalName(srcChildName);
-      } else if (!srcChildIsReference) { // src must be in snapshot
-        // the withCount node will no longer be used thus no need to update
-        // its reference number here
-        srcChild = withCount.getReferredINode();
-        srcChild.setLocalName(srcChildName);
-      } else {
-        withCount.removeReference(oldSrcChild.asReference());
-        srcChild = new INodeReference.DstReference(srcParent, withCount,
-            srcRefDstSnapshot);
-        withCount.getReferredINode().setLocalName(srcChildName);
-      }
-
-      if (isSrcInSnapshot) {
-        srcParent.undoRename4ScrParent(oldSrcChild.asReference(), srcChild);
-      } else {
-        // srcParent is not an INodeDirectoryWithSnapshot, we only need to add
-        // the srcChild back
-        fsd.addLastINodeNoQuotaCheck(srcParentIIP, srcChild);
-      }
-    }
-
-    void restoreDst(BlockStoragePolicySuite bsps) throws QuotaExceededException {
-      Preconditions.checkState(oldDstChild != null);
-      final INodeDirectory dstParent = dstParentIIP.getLastINode().asDirectory();
-      if (dstParent.isWithSnapshot()) {
-        dstParent.undoRename4DstParent(bsps, oldDstChild, dstIIP.getLatestSnapshotId());
-      } else {
-        fsd.addLastINodeNoQuotaCheck(dstParentIIP, oldDstChild);
-      }
-      if (oldDstChild != null && oldDstChild.isReference()) {
-        final INodeReference removedDstRef = oldDstChild.asReference();
-        final INodeReference.WithCount wc = (INodeReference.WithCount)
-            removedDstRef.getReferredINode().asReference();
-        wc.addReference(removedDstRef);
-      }
-    }
-
-    boolean cleanDst(BlockStoragePolicySuite bsps, BlocksMapUpdateInfo collectedBlocks)
-        throws QuotaExceededException {
-      Preconditions.checkState(oldDstChild != null);
-      List<INode> removedINodes = new ChunkedArrayList<>();
-      List<Long> removedUCFiles = new ChunkedArrayList<>();
-      INode.ReclaimContext context = new INode.ReclaimContext(bsps,
-          collectedBlocks, removedINodes, removedUCFiles);
-      final boolean filesDeleted;
-      if (!oldDstChild.isInLatestSnapshot(dstIIP.getLatestSnapshotId())) {
-        oldDstChild.destroyAndCollectBlocks(context);
-        filesDeleted = true;
-      } else {
-        oldDstChild.cleanSubtree(context, Snapshot.CURRENT_STATE_ID,
-            dstIIP.getLatestSnapshotId());
-        filesDeleted = context.quotaDelta().getNsDelta() >= 0;
-      }
-      fsd.getFSNamesystem().removeLeasesAndINodes(
-          removedUCFiles, removedINodes, false);
-      return filesDeleted;
-    }
-
-    void updateQuotasInSourceTree(BlockStoragePolicySuite bsps) throws QuotaExceededException {
-      // update the quota usage in src tree
-      if (isSrcInSnapshot) {
-        // get the counts after rename
-        QuotaCounts newSrcCounts = srcChild.computeQuotaUsage(bsps, false);
-        newSrcCounts.subtract(oldSrcCounts);
-        srcParent.addSpaceConsumed(newSrcCounts, false);
+    long removeDst(BlocksMapUpdateInfo removedBlocks,
+        List<Long> removedINodes, List<Long> removedUCFiles) {
+      try {
+        long deleted = FSDirDeleteOp.delete(
+            tx, dst, removedBlocks, removedUCFiles, now());
+        return deleted;
+      } catch (IOException ignored) {
       }
+      return -1;
+    }
+
+    boolean addSourceToDestination(long mtime) {
+      FlatINode parent = dst.ok()
+          ? dst.inodesInPath().getLastINode(-2)
+          : dst.inodesInPath().getLastINode();
+      ByteString localName = ByteString.copyFromUtf8(
+          FlatNSUtil.getNextComponent(dst.src, dst.offset));
+      ByteString newParent = new FlatINode.Builder()
+          .mergeFrom(parent).mtime(mtime).build();
+      tx.putINode(parent.id(), newParent);
+      tx.putChild(parent.id(), localName.asReadOnlyByteBuffer(),
+                  src.inodesInPath().getLastINode().id());
+      return true;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a82dbd3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
index e3b2eba..0e274b3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
@@ -417,7 +417,8 @@ class FSDirStatAndListingOp {
 
     if (node.isFile()) {
       FlatINodeFileFeature f = node.feature(FlatINodeFileFeature.class);
-      size = f.fileSize();
+      size =
+          f.fileSize();
       replication = f.replication();
       blocksize = f.blockSize();
       isEncrypted = false;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a82dbd3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGetBlockLocations.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGetBlockLocations.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGetBlockLocations.java
index ce5b1dd..24e97cc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGetBlockLocations.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGetBlockLocations.java
@@ -93,9 +93,7 @@ public class TestGetBlockLocations {
       public Void answer(InvocationOnMock invocation) throws Throwable {
         invocation.callRealMethod();
         if (!renamed[0]) {
-          FSDirRenameOp.renameTo(fsd, fsd.getPermissionChecker(), FILE_PATH,
-                                 DST_PATH, new INode.BlocksMapUpdateInfo(),
-                                 false);
+          FSDirRenameOp.renameToInt(fsd, FILE_PATH, DST_PATH, false);
           renamed[0] = true;
         }
         return null;