You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2014/12/03 06:24:02 UTC

[30/50] [abbrv] hadoop git commit: HDFS-7438. Consolidate the implementation of rename() into a single class. Contributed by Haohui Mai.

HDFS-7438. Consolidate the implementation of rename() into a single class. Contributed by Haohui Mai.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/04269940
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/04269940
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/04269940

Branch: refs/heads/HDFS-EC
Commit: 042699401ebe5186fa5556a79f8f9a206e5ebcd7
Parents: 0af44ea
Author: Haohui Mai <wh...@apache.org>
Authored: Mon Dec 1 21:48:28 2014 -0800
Committer: Haohui Mai <wh...@apache.org>
Committed: Mon Dec 1 21:48:28 2014 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../hdfs/server/namenode/FSDirRenameOp.java     | 691 +++++++++++++++++++
 .../hdfs/server/namenode/FSDirectory.java       | 525 +-------------
 .../hdfs/server/namenode/FSEditLogLoader.java   |   5 +-
 .../hadoop/hdfs/server/namenode/FSImage.java    |   5 -
 .../hdfs/server/namenode/FSNamesystem.java      | 154 +----
 .../org/apache/hadoop/hdfs/DFSTestUtil.java     |   8 +
 .../apache/hadoop/hdfs/TestRenameWhileOpen.java |   2 +-
 .../hdfs/server/namenode/NameNodeAdapter.java   |   3 +-
 .../server/namenode/TestNameNodeRecovery.java   |   3 +-
 10 files changed, 743 insertions(+), 656 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/04269940/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index d5c1fe5..52a439f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -410,6 +410,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-7450. Consolidate the implementation of GetFileInfo(), GetListings() and
     GetContentSummary() into a single class. (wheat9)
 
+    HDFS-7438. Consolidate the implementation of rename() into a single class.
+    (wheat9)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/04269940/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
new file mode 100644
index 0000000..f371f05
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
@@ -0,0 +1,691 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.InvalidPathException;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.ParentNotDirectoryException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.SnapshotException;
+import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
+import org.apache.hadoop.hdfs.util.ChunkedArrayList;
+import org.apache.hadoop.hdfs.util.ReadOnlyList;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.util.Time.now;
+
+class FSDirRenameOp {
+  static RenameOldResult renameToInt(
+      FSDirectory fsd, final String srcArg, final String dstArg,
+      boolean logRetryCache)
+      throws IOException {
+    String src = srcArg;
+    String dst = dstArg;
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src +
+          " to " + dst);
+    }
+    if (!DFSUtil.isValidName(dst)) {
+      throw new IOException("Invalid name: " + dst);
+    }
+    FSPermissionChecker pc = fsd.getPermissionChecker();
+
+    byte[][] srcComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    byte[][] dstComponents = FSDirectory.getPathComponentsForReservedPath(dst);
+    HdfsFileStatus resultingStat = null;
+    src = fsd.resolvePath(pc, src, srcComponents);
+    dst = fsd.resolvePath(pc, dst, dstComponents);
+    @SuppressWarnings("deprecation")
+    final boolean status = renameToInternal(fsd, pc, src, dst, logRetryCache);
+    if (status) {
+      resultingStat = fsd.getAuditFileInfo(dst, false);
+    }
+    return new RenameOldResult(status, resultingStat);
+  }
+
+  /**
+   * Change a path name
+   *
+   * @param fsd FSDirectory
+   * @param src source path
+   * @param dst destination path
+   * @return true if rename succeeds; false otherwise
+   * @deprecated See {@link #renameToInt(FSDirectory, String, String,
+   * boolean, Options.Rename...)}
+   */
+  @Deprecated
+  static boolean unprotectedRenameTo(
+      FSDirectory fsd, String src, String dst, long timestamp)
+      throws IOException {
+    assert fsd.hasWriteLock();
+    INodesInPath srcIIP = fsd.getINodesInPath4Write(src, false);
+    final INode srcInode = srcIIP.getLastINode();
+    try {
+      validateRenameSource(src, srcIIP);
+    } catch (SnapshotException e) {
+      throw e;
+    } catch (IOException ignored) {
+      return false;
+    }
+
+    if (fsd.isDir(dst)) {
+      dst += Path.SEPARATOR + new Path(src).getName();
+    }
+
+    // validate the destination
+    if (dst.equals(src)) {
+      return true;
+    }
+
+    try {
+      validateDestination(src, dst, srcInode);
+    } catch (IOException ignored) {
+      return false;
+    }
+
+    INodesInPath dstIIP = fsd.getINodesInPath4Write(dst, false);
+    if (dstIIP.getLastINode() != null) {
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: " +
+          "failed to rename " + src + " to " + dst + " because destination " +
+          "exists");
+      return false;
+    }
+    INode dstParent = dstIIP.getINode(-2);
+    if (dstParent == null) {
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: " +
+          "failed to rename " + src + " to " + dst + " because destination's " +
+          "parent does not exist");
+      return false;
+    }
+
+    fsd.ezManager.checkMoveValidity(srcIIP, dstIIP, src);
+    // Ensure dst has quota to accommodate rename
+    fsd.verifyFsLimitsForRename(srcIIP, dstIIP);
+    fsd.verifyQuotaForRename(srcIIP.getINodes(), dstIIP.getINodes());
+
+    RenameOperation tx = new RenameOperation(fsd, src, dst, srcIIP, dstIIP);
+
+    boolean added = false;
+
+    try {
+      // remove src
+      final long removedSrc = fsd.removeLastINode(srcIIP);
+      if (removedSrc == -1) {
+        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+            + "failed to rename " + src + " to " + dst + " because the source" +
+            " can not be removed");
+        return false;
+      }
+
+      added = tx.addSourceToDestination();
+      if (added) {
+        if (NameNode.stateChangeLog.isDebugEnabled()) {
+          NameNode.stateChangeLog.debug("DIR* FSDirectory" +
+              ".unprotectedRenameTo: " + src + " is renamed to " + dst);
+        }
+
+        tx.updateMtimeAndLease(timestamp);
+        tx.updateQuotasInSourceTree();
+
+        return true;
+      }
+    } finally {
+      if (!added) {
+        tx.restoreSource();
+      }
+    }
+    NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: " +
+        "failed to rename " + src + " to " + dst);
+    return false;
+  }
+
+  /**
+   * The new rename which has the POSIX semantic.
+   */
+  static Map.Entry<BlocksMapUpdateInfo, HdfsFileStatus> renameToInt(
+      FSDirectory fsd, final String srcArg, final String dstArg,
+      boolean logRetryCache, Options.Rename... options)
+      throws IOException {
+    String src = srcArg;
+    String dst = dstArg;
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: with options -" +
+          " " + src + " to " + dst);
+    }
+    if (!DFSUtil.isValidName(dst)) {
+      throw new InvalidPathException("Invalid name: " + dst);
+    }
+    final FSPermissionChecker pc = fsd.getPermissionChecker();
+
+    byte[][] srcComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    byte[][] dstComponents = FSDirectory.getPathComponentsForReservedPath(dst);
+    BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
+    src = fsd.resolvePath(pc, src, srcComponents);
+    dst = fsd.resolvePath(pc, dst, dstComponents);
+    renameToInternal(fsd, pc, src, dst, logRetryCache, collectedBlocks,
+        options);
+    HdfsFileStatus resultingStat = fsd.getAuditFileInfo(dst, false);
+
+    return new AbstractMap.SimpleImmutableEntry<BlocksMapUpdateInfo,
+        HdfsFileStatus>(collectedBlocks, resultingStat);
+  }
+
+  /**
+   * @see #unprotectedRenameTo(FSDirectory, String, String, long,
+   * org.apache.hadoop.fs.Options.Rename...)
+   */
+  static void renameTo(
+      FSDirectory fsd, String src, String dst, long mtime,
+      BlocksMapUpdateInfo collectedBlocks, Options.Rename... options)
+      throws IOException {
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* FSDirectory.renameTo: " + src + " to "
+          + dst);
+    }
+    fsd.writeLock();
+    try {
+      if (unprotectedRenameTo(fsd, src, dst, mtime, collectedBlocks, options)) {
+        fsd.getFSNamesystem().incrDeletedFileCount(1);
+      }
+    } finally {
+      fsd.writeUnlock();
+    }
+  }
+
+  /**
+   * Rename src to dst.
+   * <br>
+   * Note: This is to be used by {@link org.apache.hadoop.hdfs.server
+   * .namenode.FSEditLog} only.
+   * <br>
+   *
+   * @param fsd       FSDirectory
+   * @param src       source path
+   * @param dst       destination path
+   * @param timestamp modification time
+   * @param options   Rename options
+   */
+  static boolean unprotectedRenameTo(
+      FSDirectory fsd, String src, String dst, long timestamp,
+      Options.Rename... options)
+      throws IOException {
+    BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
+    boolean ret = unprotectedRenameTo(fsd, src, dst, timestamp,
+        collectedBlocks, options);
+    if (!collectedBlocks.getToDeleteList().isEmpty()) {
+      fsd.getFSNamesystem().removeBlocksAndUpdateSafemodeTotal(collectedBlocks);
+    }
+    return ret;
+  }
+
+  /**
+   * Rename src to dst.
+   * See {@link DistributedFileSystem#rename(Path, Path, Options.Rename...)}
+   * for details related to rename semantics and exceptions.
+   *
+   * @param fsd             FSDirectory
+   * @param src             source path
+   * @param dst             destination path
+   * @param timestamp       modification time
+   * @param collectedBlocks blocks to be removed
+   * @param options         Rename options
+   */
+  static boolean unprotectedRenameTo(
+      FSDirectory fsd, String src, String dst, long timestamp,
+      BlocksMapUpdateInfo collectedBlocks, Options.Rename... options)
+      throws IOException {
+    assert fsd.hasWriteLock();
+    boolean overwrite = options != null
+        && Arrays.asList(options).contains(Options.Rename.OVERWRITE);
+
+    final String error;
+    final INodesInPath srcIIP = fsd.getINodesInPath4Write(src, false);
+    final INode srcInode = srcIIP.getLastINode();
+    validateRenameSource(src, srcIIP);
+
+    // validate the destination
+    if (dst.equals(src)) {
+      throw new FileAlreadyExistsException("The source " + src +
+          " and destination " + dst + " are the same");
+    }
+    validateDestination(src, dst, srcInode);
+
+    INodesInPath dstIIP = fsd.getINodesInPath4Write(dst, false);
+    if (dstIIP.getINodes().length == 1) {
+      error = "rename destination cannot be the root";
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: " +
+          error);
+      throw new IOException(error);
+    }
+
+    fsd.ezManager.checkMoveValidity(srcIIP, dstIIP, src);
+    final INode dstInode = dstIIP.getLastINode();
+    List<INodeDirectory> snapshottableDirs = new ArrayList<INodeDirectory>();
+    if (dstInode != null) { // Destination exists
+      validateOverwrite(src, dst, overwrite, srcInode, dstInode);
+      FSDirSnapshotOp.checkSnapshot(dstInode, snapshottableDirs);
+    }
+
+    INode dstParent = dstIIP.getINode(-2);
+    if (dstParent == null) {
+      error = "rename destination parent " + dst + " not found.";
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: " +
+          error);
+      throw new FileNotFoundException(error);
+    }
+    if (!dstParent.isDirectory()) {
+      error = "rename destination parent " + dst + " is a file.";
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: " +
+          error);
+      throw new ParentNotDirectoryException(error);
+    }
+
+    // Ensure dst has quota to accommodate rename
+    fsd.verifyFsLimitsForRename(srcIIP, dstIIP);
+    fsd.verifyQuotaForRename(srcIIP.getINodes(), dstIIP.getINodes());
+
+    RenameOperation tx = new RenameOperation(fsd, src, dst, srcIIP, dstIIP);
+
+    boolean undoRemoveSrc = true;
+    final long removedSrc = fsd.removeLastINode(srcIIP);
+    if (removedSrc == -1) {
+      error = "Failed to rename " + src + " to " + dst +
+          " because the source can not be removed";
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: " +
+          error);
+      throw new IOException(error);
+    }
+
+    boolean undoRemoveDst = false;
+    INode removedDst = null;
+    long removedNum = 0;
+    try {
+      if (dstInode != null) { // dst exists remove it
+        if ((removedNum = fsd.removeLastINode(dstIIP)) != -1) {
+          removedDst = dstIIP.getLastINode();
+          undoRemoveDst = true;
+        }
+      }
+
+      // add src as dst to complete rename
+      if (tx.addSourceToDestination()) {
+        undoRemoveSrc = false;
+        if (NameNode.stateChangeLog.isDebugEnabled()) {
+          NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedRenameTo: "
+              + src + " is renamed to " + dst);
+        }
+
+        tx.updateMtimeAndLease(timestamp);
+
+        // Collect the blocks and remove the lease for previous dst
+        boolean filesDeleted = false;
+        if (removedDst != null) {
+          undoRemoveDst = false;
+          if (removedNum > 0) {
+            List<INode> removedINodes = new ChunkedArrayList<INode>();
+            if (!removedDst.isInLatestSnapshot(dstIIP.getLatestSnapshotId())) {
+              removedDst.destroyAndCollectBlocks(collectedBlocks,
+                  removedINodes);
+              filesDeleted = true;
+            } else {
+              filesDeleted = removedDst.cleanSubtree(
+                  Snapshot.CURRENT_STATE_ID, dstIIP.getLatestSnapshotId(),
+                  collectedBlocks, removedINodes, true)
+                  .get(Quota.NAMESPACE) >= 0;
+            }
+            fsd.getFSNamesystem().removePathAndBlocks(src, null,
+                removedINodes, false);
+          }
+        }
+
+        if (snapshottableDirs.size() > 0) {
+          // There are snapshottable directories (without snapshots) to be
+          // deleted. Need to update the SnapshotManager.
+          fsd.getFSNamesystem().removeSnapshottableDirs(snapshottableDirs);
+        }
+
+        tx.updateQuotasInSourceTree();
+        return filesDeleted;
+      }
+    } finally {
+      if (undoRemoveSrc) {
+        tx.restoreSource();
+      }
+
+      if (undoRemoveDst) {
+        // Rename failed - restore dst
+        if (dstParent.isDirectory() &&
+            dstParent.asDirectory().isWithSnapshot()) {
+          dstParent.asDirectory().undoRename4DstParent(removedDst,
+              dstIIP.getLatestSnapshotId());
+        } else {
+          fsd.addLastINodeNoQuotaCheck(dstIIP, removedDst);
+        }
+        assert removedDst != null;
+        if (removedDst.isReference()) {
+          final INodeReference removedDstRef = removedDst.asReference();
+          final INodeReference.WithCount wc = (INodeReference.WithCount)
+              removedDstRef.getReferredINode().asReference();
+          wc.addReference(removedDstRef);
+        }
+      }
+    }
+    NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: " +
+        "failed to rename " + src + " to " + dst);
+    throw new IOException("rename from " + src + " to " + dst + " failed.");
+  }
+
+  /**
+   * @see #unprotectedRenameTo(FSDirectory, String, String, long)
+   * @deprecated Use {@link #renameToInt(FSDirectory, String, String,
+   * boolean, Options.Rename...)}
+   */
+  @Deprecated
+  @SuppressWarnings("deprecation")
+  private static boolean renameTo(
+      FSDirectory fsd, String src, String dst, long mtime)
+      throws IOException {
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* FSDirectory.renameTo: " + src + " to "
+          + dst);
+    }
+    boolean stat = false;
+    fsd.writeLock();
+    try {
+      stat = unprotectedRenameTo(fsd, src, dst, mtime);
+    } finally {
+      fsd.writeUnlock();
+    }
+    return stat;
+  }
+
+  /**
+   * @deprecated See {@link #renameTo(FSDirectory, String, String, long)}
+   */
+  @Deprecated
+  private static boolean renameToInternal(
+      FSDirectory fsd, FSPermissionChecker pc, String src, String dst,
+      boolean logRetryCache)
+      throws IOException {
+    if (fsd.isPermissionEnabled()) {
+      //We should not be doing this.  This is move() not renameTo().
+      //but for now,
+      //NOTE: yes, this is bad!  it's assuming much lower level behavior
+      //      of rewriting the dst
+      String actualdst = fsd.isDir(dst) ? dst + Path.SEPARATOR + new Path
+          (src).getName() : dst;
+      // Rename does not operates on link targets
+      // Do not resolveLink when checking permissions of src and dst
+      // Check write access to parent of src
+      fsd.checkPermission(pc, src, false, null, FsAction.WRITE, null, null,
+          false, false);
+      // Check write access to ancestor of dst
+      fsd.checkPermission(pc, actualdst, false, FsAction.WRITE, null, null,
+          null, false, false);
+    }
+
+    long mtime = now();
+    @SuppressWarnings("deprecation")
+    final boolean stat = renameTo(fsd, src, dst, mtime);
+    if (stat) {
+      fsd.getEditLog().logRename(src, dst, mtime, logRetryCache);
+      return true;
+    }
+    return false;
+  }
+
+  private static void renameToInternal(
+      FSDirectory fsd, FSPermissionChecker pc, String src, String dst,
+      boolean logRetryCache, BlocksMapUpdateInfo collectedBlocks,
+      Options.Rename... options)
+      throws IOException {
+    if (fsd.isPermissionEnabled()) {
+      // Rename does not operates on link targets
+      // Do not resolveLink when checking permissions of src and dst
+      // Check write access to parent of src
+      fsd.checkPermission(pc, src, false, null, FsAction.WRITE, null, null,
+          false, false);
+      // Check write access to ancestor of dst
+      fsd.checkPermission(pc, dst, false, FsAction.WRITE, null, null, null,
+          false, false);
+    }
+
+    long mtime = now();
+    renameTo(fsd, src, dst, mtime, collectedBlocks, options);
+    fsd.getEditLog().logRename(src, dst, mtime, logRetryCache, options);
+  }
+
+  private static void validateDestination(
+      String src, String dst, INode srcInode)
+      throws IOException {
+    String error;
+    if (srcInode.isSymlink() &&
+        dst.equals(srcInode.asSymlink().getSymlinkString())) {
+      throw new FileAlreadyExistsException("Cannot rename symlink " + src
+          + " to its target " + dst);
+    }
+    // dst cannot be a directory or a file under src
+    if (dst.startsWith(src)
+        && dst.charAt(src.length()) == Path.SEPARATOR_CHAR) {
+      error = "Rename destination " + dst
+          + " is a directory or file under source " + src;
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+          + error);
+      throw new IOException(error);
+    }
+  }
+
+  private static void validateOverwrite(
+      String src, String dst, boolean overwrite, INode srcInode, INode dstInode)
+      throws IOException {
+    String error;// It's OK to rename a file to a symlink and vice versa
+    if (dstInode.isDirectory() != srcInode.isDirectory()) {
+      error = "Source " + src + " and destination " + dst
+          + " must both be directories";
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+          + error);
+      throw new IOException(error);
+    }
+    if (!overwrite) { // If destination exists, overwrite flag must be true
+      error = "rename destination " + dst + " already exists";
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+          + error);
+      throw new FileAlreadyExistsException(error);
+    }
+    if (dstInode.isDirectory()) {
+      final ReadOnlyList<INode> children = dstInode.asDirectory()
+          .getChildrenList(Snapshot.CURRENT_STATE_ID);
+      if (!children.isEmpty()) {
+        error = "rename destination directory is not empty: " + dst;
+        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+            + error);
+        throw new IOException(error);
+      }
+    }
+  }
+
+  private static void validateRenameSource(String src, INodesInPath srcIIP)
+      throws IOException {
+    String error;
+    final INode srcInode = srcIIP.getLastINode();
+    // validate source
+    if (srcInode == null) {
+      error = "rename source " + src + " is not found.";
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+          + error);
+      throw new FileNotFoundException(error);
+    }
+    if (srcIIP.getINodes().length == 1) {
+      error = "rename source cannot be the root";
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+          + error);
+      throw new IOException(error);
+    }
+    // srcInode and its subtree cannot contain snapshottable directories with
+    // snapshots
+    FSDirSnapshotOp.checkSnapshot(srcInode, null);
+  }
+
+  private static class RenameOperation {
+    private final FSDirectory fsd;
+    private final INodesInPath srcIIP;
+    private final INodesInPath dstIIP;
+    private final String src;
+    private final String dst;
+    private final INodeReference.WithCount withCount;
+    private final int srcRefDstSnapshot;
+    private final INodeDirectory srcParent;
+    private final byte[] srcChildName;
+    private final boolean isSrcInSnapshot;
+    private final boolean srcChildIsReference;
+    private final Quota.Counts oldSrcCounts;
+    private INode srcChild;
+
+    RenameOperation(FSDirectory fsd, String src, String dst,
+                    INodesInPath srcIIP, INodesInPath dstIIP)
+        throws QuotaExceededException {
+      this.fsd = fsd;
+      this.srcIIP = srcIIP;
+      this.dstIIP = dstIIP;
+      this.src = src;
+      this.dst = dst;
+      srcChild = srcIIP.getLastINode();
+      srcChildName = srcChild.getLocalNameBytes();
+      isSrcInSnapshot = srcChild.isInLatestSnapshot(srcIIP
+          .getLatestSnapshotId());
+      srcChildIsReference = srcChild.isReference();
+      srcParent = srcIIP.getINode(-2).asDirectory();
+
+      // Record the snapshot on srcChild. After the rename, before any new
+      // snapshot is taken on the dst tree, changes will be recorded in the
+      // latest snapshot of the src tree.
+      if (isSrcInSnapshot) {
+        srcChild.recordModification(srcIIP.getLatestSnapshotId());
+      }
+
+      // check srcChild for reference
+      srcRefDstSnapshot = srcChildIsReference ?
+          srcChild.asReference().getDstSnapshotId() : Snapshot.CURRENT_STATE_ID;
+      oldSrcCounts = Quota.Counts.newInstance();
+      if (isSrcInSnapshot) {
+        final INodeReference.WithName withName =
+            srcIIP.getINode(-2).asDirectory().replaceChild4ReferenceWithName(
+                srcChild, srcIIP.getLatestSnapshotId());
+        withCount = (INodeReference.WithCount) withName.getReferredINode();
+        srcChild = withName;
+        srcIIP.setLastINode(srcChild);
+        // get the counts before rename
+        withCount.getReferredINode().computeQuotaUsage(oldSrcCounts, true);
+      } else if (srcChildIsReference) {
+        // srcChild is reference but srcChild is not in latest snapshot
+        withCount = (INodeReference.WithCount) srcChild.asReference()
+            .getReferredINode();
+      } else {
+        withCount = null;
+      }
+    }
+
+    boolean addSourceToDestination() {
+      final INode dstParent = dstIIP.getINode(-2);
+      srcChild = srcIIP.getLastINode();
+      final byte[] dstChildName = dstIIP.getLastLocalName();
+      final INode toDst;
+      if (withCount == null) {
+        srcChild.setLocalName(dstChildName);
+        toDst = srcChild;
+      } else {
+        withCount.getReferredINode().setLocalName(dstChildName);
+        int dstSnapshotId = dstIIP.getLatestSnapshotId();
+        toDst = new INodeReference.DstReference(dstParent.asDirectory(),
+            withCount, dstSnapshotId);
+      }
+      return fsd.addLastINodeNoQuotaCheck(dstIIP, toDst);
+    }
+
+    void updateMtimeAndLease(long timestamp) throws QuotaExceededException {
+      srcParent.updateModificationTime(timestamp, srcIIP.getLatestSnapshotId());
+      final INode dstParent = dstIIP.getINode(-2);
+      dstParent.updateModificationTime(timestamp, dstIIP.getLatestSnapshotId());
+      // update moved lease with new filename
+      fsd.getFSNamesystem().unprotectedChangeLease(src, dst);
+    }
+
+    void restoreSource() throws QuotaExceededException {
+      // Rename failed - restore src
+      final INode oldSrcChild = srcChild;
+      // put it back
+      if (withCount == null) {
+        srcChild.setLocalName(srcChildName);
+      } else if (!srcChildIsReference) { // src must be in snapshot
+        // the withCount node will no longer be used thus no need to update
+        // its reference number here
+        srcChild = withCount.getReferredINode();
+        srcChild.setLocalName(srcChildName);
+      } else {
+        withCount.removeReference(oldSrcChild.asReference());
+        srcChild = new INodeReference.DstReference(srcParent, withCount,
+            srcRefDstSnapshot);
+        withCount.getReferredINode().setLocalName(srcChildName);
+      }
+
+      if (isSrcInSnapshot) {
+        srcParent.undoRename4ScrParent(oldSrcChild.asReference(), srcChild);
+      } else {
+        // srcParent is not an INodeDirectoryWithSnapshot, we only need to add
+        // the srcChild back
+        fsd.addLastINodeNoQuotaCheck(srcIIP, srcChild);
+      }
+    }
+
+    void updateQuotasInSourceTree() throws QuotaExceededException {
+      // update the quota usage in src tree
+      if (isSrcInSnapshot) {
+        // get the counts after rename
+        Quota.Counts newSrcCounts = srcChild.computeQuotaUsage(
+            Quota.Counts.newInstance(), false);
+        newSrcCounts.subtract(oldSrcCounts);
+        srcParent.addSpaceConsumed(newSrcCounts.get(Quota.NAMESPACE),
+            newSrcCounts.get(Quota.DISKSPACE), false);
+      }
+    }
+  }
+
+  static class RenameOldResult {
+    final boolean success;
+    final HdfsFileStatus auditStat;
+
+    RenameOldResult(boolean success, HdfsFileStatus auditStat) {
+      this.success = success;
+      this.auditStat = auditStat;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/04269940/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index 0c0b2af..7d656f4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -42,9 +42,6 @@ import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileEncryptionInfo;
-import org.apache.hadoop.fs.Options;
-import org.apache.hadoop.fs.Options.Rename;
-import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathIsNotDirectoryException;
 import org.apache.hadoop.fs.UnresolvedLinkException;
@@ -57,7 +54,6 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.XAttrHelper;
 import org.apache.hadoop.hdfs.protocol.AclException;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -80,7 +76,6 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
-import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithCount;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.util.ByteArray;
 import org.apache.hadoop.hdfs.util.ChunkedArrayList;
@@ -476,398 +471,6 @@ public class FSDirectory implements Closeable {
   }
 
   /**
-   * @throws SnapshotAccessControlException 
-   * @see #unprotectedRenameTo(String, String, long)
-   * @deprecated Use {@link #renameTo(String, String, long,
-   *                                  BlocksMapUpdateInfo, Rename...)}
-   */
-  @Deprecated
-  boolean renameTo(String src, String dst, long mtime)
-      throws QuotaExceededException, UnresolvedLinkException, 
-      FileAlreadyExistsException, SnapshotAccessControlException, IOException {
-    if (NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* FSDirectory.renameTo: "
-          +src+" to "+dst);
-    }
-    writeLock();
-    try {
-      if (!unprotectedRenameTo(src, dst, mtime))
-        return false;
-    } finally {
-      writeUnlock();
-    }
-    return true;
-  }
-
-  /**
-   * @see #unprotectedRenameTo(String, String, long, Options.Rename...)
-   */
-  void renameTo(String src, String dst, long mtime,
-      BlocksMapUpdateInfo collectedBlocks, Options.Rename... options)
-      throws FileAlreadyExistsException, FileNotFoundException,
-      ParentNotDirectoryException, QuotaExceededException,
-      UnresolvedLinkException, IOException {
-    if (NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* FSDirectory.renameTo: " + src
-          + " to " + dst);
-    }
-    writeLock();
-    try {
-      if (unprotectedRenameTo(src, dst, mtime, collectedBlocks, options)) {
-        namesystem.incrDeletedFileCount(1);
-      }
-    } finally {
-      writeUnlock();
-    }
-  }
-
-  /**
-   * Change a path name
-   * 
-   * @param src source path
-   * @param dst destination path
-   * @return true if rename succeeds; false otherwise
-   * @throws QuotaExceededException if the operation violates any quota limit
-   * @throws FileAlreadyExistsException if the src is a symlink that points to dst
-   * @throws SnapshotAccessControlException if path is in RO snapshot
-   * @deprecated See {@link #renameTo(String, String, long, BlocksMapUpdateInfo, Rename...)}
-   */
-  @Deprecated
-  boolean unprotectedRenameTo(String src, String dst, long timestamp)
-    throws QuotaExceededException, UnresolvedLinkException, 
-    FileAlreadyExistsException, SnapshotAccessControlException, IOException {
-    assert hasWriteLock();
-    INodesInPath srcIIP = getINodesInPath4Write(src, false);
-    final INode srcInode = srcIIP.getLastINode();
-    try {
-      validateRenameSource(src, srcIIP);
-    } catch (SnapshotException e) {
-      throw e;
-    } catch (IOException ignored) {
-      return false;
-    }
-
-    if (isDir(dst)) {
-      dst += Path.SEPARATOR + new Path(src).getName();
-    }
-
-    // validate the destination
-    if (dst.equals(src)) {
-      return true;
-    }
-
-    try {
-      validateRenameDestination(src, dst, srcInode);
-    } catch (IOException ignored) {
-      return false;
-    }
-
-    INodesInPath dstIIP = getINodesInPath4Write(dst, false);
-    if (dstIIP.getLastINode() != null) {
-      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-                                   +"failed to rename "+src+" to "+dst+ 
-                                   " because destination exists");
-      return false;
-    }
-    INode dstParent = dstIIP.getINode(-2);
-    if (dstParent == null) {
-      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-          +"failed to rename "+src+" to "+dst+ 
-          " because destination's parent does not exist");
-      return false;
-    }
-    
-    ezManager.checkMoveValidity(srcIIP, dstIIP, src);
-    // Ensure dst has quota to accommodate rename
-    verifyFsLimitsForRename(srcIIP, dstIIP);
-    verifyQuotaForRename(srcIIP.getINodes(), dstIIP.getINodes());
-
-    RenameOperation tx = new RenameOperation(src, dst, srcIIP, dstIIP);
-
-    boolean added = false;
-
-    try {
-      // remove src
-      final long removedSrc = removeLastINode(srcIIP);
-      if (removedSrc == -1) {
-        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-            + "failed to rename " + src + " to " + dst
-            + " because the source can not be removed");
-        return false;
-      }
-
-      added = tx.addSourceToDestination();
-      if (added) {
-        if (NameNode.stateChangeLog.isDebugEnabled()) {
-          NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedRenameTo: " 
-              + src + " is renamed to " + dst);
-        }
-
-        tx.updateMtimeAndLease(timestamp);
-        tx.updateQuotasInSourceTree();
-        
-        return true;
-      }
-    } finally {
-      if (!added) {
-        tx.restoreSource();
-      }
-    }
-    NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-        +"failed to rename "+src+" to "+dst);
-    return false;
-  }
-
-  /**
-   * Rename src to dst.
-   * <br>
-   * Note: This is to be used by {@link FSEditLog} only.
-   * <br>
-   * 
-   * @param src source path
-   * @param dst destination path
-   * @param timestamp modification time
-   * @param options Rename options
-   */
-  boolean unprotectedRenameTo(String src, String dst, long timestamp,
-      Options.Rename... options) throws FileAlreadyExistsException, 
-      FileNotFoundException, ParentNotDirectoryException, 
-      QuotaExceededException, UnresolvedLinkException, IOException {
-    BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
-    boolean ret = unprotectedRenameTo(src, dst, timestamp, 
-        collectedBlocks, options);
-    if (!collectedBlocks.getToDeleteList().isEmpty()) {
-      getFSNamesystem().removeBlocksAndUpdateSafemodeTotal(collectedBlocks);
-    }
-    return ret;
-  }
-  
-  /**
-   * Rename src to dst.
-   * See {@link DistributedFileSystem#rename(Path, Path, Options.Rename...)}
-   * for details related to rename semantics and exceptions.
-   * 
-   * @param src source path
-   * @param dst destination path
-   * @param timestamp modification time
-   * @param collectedBlocks blocks to be removed
-   * @param options Rename options
-   */
-  boolean unprotectedRenameTo(String src, String dst, long timestamp,
-      BlocksMapUpdateInfo collectedBlocks, Options.Rename... options) 
-      throws FileAlreadyExistsException, FileNotFoundException, 
-      ParentNotDirectoryException, QuotaExceededException, 
-      UnresolvedLinkException, IOException {
-    assert hasWriteLock();
-    boolean overwrite = options != null && Arrays.asList(options).contains
-            (Rename.OVERWRITE);
-
-    final String error;
-    final INodesInPath srcIIP = getINodesInPath4Write(src, false);
-    final INode srcInode = srcIIP.getLastINode();
-    validateRenameSource(src, srcIIP);
-
-    // validate the destination
-    if (dst.equals(src)) {
-      throw new FileAlreadyExistsException(
-          "The source "+src+" and destination "+dst+" are the same");
-    }
-    validateRenameDestination(src, dst, srcInode);
-
-    INodesInPath dstIIP = getINodesInPath4Write(dst, false);
-    if (dstIIP.getINodes().length == 1) {
-      error = "rename destination cannot be the root";
-      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-          + error);
-      throw new IOException(error);
-    }
-
-    ezManager.checkMoveValidity(srcIIP, dstIIP, src);
-    final INode dstInode = dstIIP.getLastINode();
-    List<INodeDirectory> snapshottableDirs = new ArrayList<INodeDirectory>();
-    if (dstInode != null) { // Destination exists
-      validateRenameOverwrite(src, dst, overwrite, srcInode, dstInode);
-      FSDirSnapshotOp.checkSnapshot(dstInode, snapshottableDirs);
-    }
-
-    INode dstParent = dstIIP.getINode(-2);
-    if (dstParent == null) {
-      error = "rename destination parent " + dst + " not found.";
-      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-          + error);
-      throw new FileNotFoundException(error);
-    }
-    if (!dstParent.isDirectory()) {
-      error = "rename destination parent " + dst + " is a file.";
-      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-          + error);
-      throw new ParentNotDirectoryException(error);
-    }
-
-    // Ensure dst has quota to accommodate rename
-    verifyFsLimitsForRename(srcIIP, dstIIP);
-    verifyQuotaForRename(srcIIP.getINodes(), dstIIP.getINodes());
-
-    RenameOperation tx = new RenameOperation(src, dst, srcIIP, dstIIP);
-
-    boolean undoRemoveSrc = true;
-    final long removedSrc = removeLastINode(srcIIP);
-    if (removedSrc == -1) {
-      error = "Failed to rename " + src + " to " + dst
-          + " because the source can not be removed";
-      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-          + error);
-      throw new IOException(error);
-    }
-    
-    boolean undoRemoveDst = false;
-    INode removedDst = null;
-    long removedNum = 0;
-    try {
-      if (dstInode != null) { // dst exists remove it
-        if ((removedNum = removeLastINode(dstIIP)) != -1) {
-          removedDst = dstIIP.getLastINode();
-          undoRemoveDst = true;
-        }
-      }
-
-      // add src as dst to complete rename
-      if (tx.addSourceToDestination()) {
-        undoRemoveSrc = false;
-        if (NameNode.stateChangeLog.isDebugEnabled()) {
-          NameNode.stateChangeLog.debug(
-              "DIR* FSDirectory.unprotectedRenameTo: " + src
-              + " is renamed to " + dst);
-        }
-
-        tx.updateMtimeAndLease(timestamp);
-
-        // Collect the blocks and remove the lease for previous dst
-        boolean filesDeleted = false;
-        if (removedDst != null) {
-          undoRemoveDst = false;
-          if (removedNum > 0) {
-            List<INode> removedINodes = new ChunkedArrayList<INode>();
-            if (!removedDst.isInLatestSnapshot(dstIIP.getLatestSnapshotId())) {
-              removedDst.destroyAndCollectBlocks(collectedBlocks, removedINodes);
-              filesDeleted = true;
-            } else {
-              filesDeleted = removedDst.cleanSubtree(Snapshot.CURRENT_STATE_ID,
-                  dstIIP.getLatestSnapshotId(), collectedBlocks, removedINodes,
-                  true).get(Quota.NAMESPACE) >= 0;
-            }
-            getFSNamesystem().removePathAndBlocks(src, null, 
-                removedINodes, false);
-          }
-        }
-
-        if (snapshottableDirs.size() > 0) {
-          // There are snapshottable directories (without snapshots) to be
-          // deleted. Need to update the SnapshotManager.
-          namesystem.removeSnapshottableDirs(snapshottableDirs);
-        }
-
-        tx.updateQuotasInSourceTree();
-        return filesDeleted;
-      }
-    } finally {
-      if (undoRemoveSrc) {
-        tx.restoreSource();
-      }
-
-      if (undoRemoveDst) {
-        // Rename failed - restore dst
-        if (dstParent.isDirectory() && dstParent.asDirectory().isWithSnapshot()) {
-          dstParent.asDirectory().undoRename4DstParent(removedDst,
-              dstIIP.getLatestSnapshotId());
-        } else {
-          addLastINodeNoQuotaCheck(dstIIP, removedDst);
-        }
-        if (removedDst.isReference()) {
-          final INodeReference removedDstRef = removedDst.asReference();
-          final INodeReference.WithCount wc = 
-              (WithCount) removedDstRef.getReferredINode().asReference();
-          wc.addReference(removedDstRef);
-        }
-      }
-    }
-    NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-        + "failed to rename " + src + " to " + dst);
-    throw new IOException("rename from " + src + " to " + dst + " failed.");
-  }
-
-  private static void validateRenameOverwrite(String src, String dst,
-                                              boolean overwrite,
-                                              INode srcInode, INode dstInode)
-          throws IOException {
-    String error;// It's OK to rename a file to a symlink and vice versa
-    if (dstInode.isDirectory() != srcInode.isDirectory()) {
-      error = "Source " + src + " and destination " + dst
-          + " must both be directories";
-      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-          + error);
-      throw new IOException(error);
-    }
-    if (!overwrite) { // If destination exists, overwrite flag must be true
-      error = "rename destination " + dst + " already exists";
-      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-          + error);
-      throw new FileAlreadyExistsException(error);
-    }
-    if (dstInode.isDirectory()) {
-      final ReadOnlyList<INode> children = dstInode.asDirectory()
-          .getChildrenList(Snapshot.CURRENT_STATE_ID);
-      if (!children.isEmpty()) {
-        error = "rename destination directory is not empty: " + dst;
-        NameNode.stateChangeLog.warn(
-            "DIR* FSDirectory.unprotectedRenameTo: " + error);
-        throw new IOException(error);
-      }
-    }
-  }
-
-  private static void validateRenameDestination(String src, String dst, INode srcInode)
-          throws IOException {
-    String error;
-    if (srcInode.isSymlink() &&
-        dst.equals(srcInode.asSymlink().getSymlinkString())) {
-      throw new FileAlreadyExistsException(
-          "Cannot rename symlink "+src+" to its target "+dst);
-    }
-    // dst cannot be a directory or a file under src
-    if (dst.startsWith(src) &&
-        dst.charAt(src.length()) == Path.SEPARATOR_CHAR) {
-      error = "Rename destination " + dst
-          + " is a directory or file under source " + src;
-      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-          + error);
-      throw new IOException(error);
-    }
-  }
-
-  private static void validateRenameSource(String src, INodesInPath srcIIP)
-          throws IOException {
-    String error;
-    final INode srcInode = srcIIP.getLastINode();
-    // validate source
-    if (srcInode == null) {
-      error = "rename source " + src + " is not found.";
-      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-          + error);
-      throw new FileNotFoundException(error);
-    }
-    if (srcIIP.getINodes().length == 1) {
-      error = "rename source cannot be the root";
-      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-          + error);
-      throw new IOException(error);
-    }
-    // srcInode and its subtree cannot contain snapshottable directories with
-    // snapshots
-    FSDirSnapshotOp.checkSnapshot(srcInode, null);
-  }
-
-  /**
    * This is a wrapper for resolvePath(). If the path passed
    * is prefixed with /.reserved/raw, then it checks to ensure that the caller
    * has super user has super user privileges.
@@ -890,126 +493,6 @@ public class FSDirectory implements Closeable {
     return resolvePath(path, pathComponents, this);
   }
 
-  private class RenameOperation {
-    private final INodesInPath srcIIP;
-    private final INodesInPath dstIIP;
-    private final String src;
-    private final String dst;
-
-    private INode srcChild;
-    private final INodeReference.WithCount withCount;
-    private final int srcRefDstSnapshot;
-    private final INodeDirectory srcParent;
-    private final byte[] srcChildName;
-    private final boolean isSrcInSnapshot;
-    private final boolean srcChildIsReference;
-    private final Quota.Counts oldSrcCounts;
-
-    private RenameOperation(String src, String dst, INodesInPath srcIIP, INodesInPath dstIIP)
-            throws QuotaExceededException {
-      this.srcIIP = srcIIP;
-      this.dstIIP = dstIIP;
-      this.src = src;
-      this.dst = dst;
-      srcChild = srcIIP.getLastINode();
-      srcChildName = srcChild.getLocalNameBytes();
-      isSrcInSnapshot = srcChild.isInLatestSnapshot(
-              srcIIP.getLatestSnapshotId());
-      srcChildIsReference = srcChild.isReference();
-      srcParent = srcIIP.getINode(-2).asDirectory();
-
-      // Record the snapshot on srcChild. After the rename, before any new
-      // snapshot is taken on the dst tree, changes will be recorded in the latest
-      // snapshot of the src tree.
-      if (isSrcInSnapshot) {
-        srcChild.recordModification(srcIIP.getLatestSnapshotId());
-      }
-
-      // check srcChild for reference
-      srcRefDstSnapshot = srcChildIsReference ? srcChild.asReference()
-              .getDstSnapshotId() : Snapshot.CURRENT_STATE_ID;
-      oldSrcCounts = Quota.Counts.newInstance();
-      if (isSrcInSnapshot) {
-        final INodeReference.WithName withName = srcIIP.getINode(-2).asDirectory()
-                .replaceChild4ReferenceWithName(srcChild, srcIIP.getLatestSnapshotId());
-        withCount = (INodeReference.WithCount) withName.getReferredINode();
-        srcChild = withName;
-        srcIIP.setLastINode(srcChild);
-        // get the counts before rename
-        withCount.getReferredINode().computeQuotaUsage(oldSrcCounts, true);
-      } else if (srcChildIsReference) {
-        // srcChild is reference but srcChild is not in latest snapshot
-        withCount = (WithCount) srcChild.asReference().getReferredINode();
-      } else {
-        withCount = null;
-      }
-    }
-
-    boolean addSourceToDestination() {
-      final INode dstParent = dstIIP.getINode(-2);
-      srcChild = srcIIP.getLastINode();
-      final byte[] dstChildName = dstIIP.getLastLocalName();
-      final INode toDst;
-      if (withCount == null) {
-        srcChild.setLocalName(dstChildName);
-        toDst = srcChild;
-      } else {
-        withCount.getReferredINode().setLocalName(dstChildName);
-        int dstSnapshotId = dstIIP.getLatestSnapshotId();
-        toDst = new INodeReference.DstReference(
-                dstParent.asDirectory(), withCount, dstSnapshotId);
-      }
-      return addLastINodeNoQuotaCheck(dstIIP, toDst);
-    }
-
-    void updateMtimeAndLease(long timestamp) throws QuotaExceededException {
-      srcParent.updateModificationTime(timestamp, srcIIP.getLatestSnapshotId());
-      final INode dstParent = dstIIP.getINode(-2);
-      dstParent.updateModificationTime(timestamp, dstIIP.getLatestSnapshotId());
-      // update moved lease with new filename
-      getFSNamesystem().unprotectedChangeLease(src, dst);
-    }
-
-    void restoreSource() throws QuotaExceededException {
-      // Rename failed - restore src
-      final INode oldSrcChild = srcChild;
-      // put it back
-      if (withCount == null) {
-        srcChild.setLocalName(srcChildName);
-      } else if (!srcChildIsReference) { // src must be in snapshot
-        // the withCount node will no longer be used thus no need to update
-        // its reference number here
-        srcChild = withCount.getReferredINode();
-        srcChild.setLocalName(srcChildName);
-      } else {
-        withCount.removeReference(oldSrcChild.asReference());
-        srcChild = new INodeReference.DstReference(
-                srcParent, withCount, srcRefDstSnapshot);
-        withCount.getReferredINode().setLocalName(srcChildName);
-      }
-
-      if (isSrcInSnapshot) {
-        srcParent.undoRename4ScrParent(oldSrcChild.asReference(), srcChild);
-      } else {
-        // srcParent is not an INodeDirectoryWithSnapshot, we only need to add
-        // the srcChild back
-        addLastINodeNoQuotaCheck(srcIIP, srcChild);
-      }
-    }
-
-    void updateQuotasInSourceTree() throws QuotaExceededException {
-      // update the quota usage in src tree
-      if (isSrcInSnapshot) {
-        // get the counts after rename
-        Quota.Counts newSrcCounts = srcChild.computeQuotaUsage(
-                Quota.Counts.newInstance(), false);
-        newSrcCounts.subtract(oldSrcCounts);
-        srcParent.addSpaceConsumed(newSrcCounts.get(Quota.NAMESPACE),
-                newSrcCounts.get(Quota.DISKSPACE), false);
-      }
-    }
-  }
-
   /**
    * Set file replication
    * 
@@ -1695,7 +1178,7 @@ public class FSDirectory implements Closeable {
    * @param dst directory to where node is moved to.
    * @throws QuotaExceededException if quota limit is exceeded.
    */
-  private void verifyQuotaForRename(INode[] src, INode[] dst)
+  void verifyQuotaForRename(INode[] src, INode[] dst)
       throws QuotaExceededException {
     if (!namesystem.isImageLoaded() || skipQuotaCheck) {
       // Do not check quota if edits log is still being processed
@@ -1725,7 +1208,7 @@ public class FSDirectory implements Closeable {
    * @throws PathComponentTooLongException child's name is too long.
    * @throws MaxDirectoryItemsExceededException too many children.
    */
-  private void verifyFsLimitsForRename(INodesInPath srcIIP, INodesInPath dstIIP)
+  void verifyFsLimitsForRename(INodesInPath srcIIP, INodesInPath dstIIP)
       throws PathComponentTooLongException, MaxDirectoryItemsExceededException {
     byte[] dstChildName = dstIIP.getLastLocalName();
     INode[] dstInodes = dstIIP.getINodes();
@@ -1884,7 +1367,7 @@ public class FSDirectory implements Closeable {
     return added;
   }
   
-  private boolean addLastINodeNoQuotaCheck(INodesInPath inodesInPath, INode i) {
+  boolean addLastINodeNoQuotaCheck(INodesInPath inodesInPath, INode i) {
     try {
       return addLastINode(inodesInPath, i, false);
     } catch (QuotaExceededException e) {
@@ -1901,7 +1384,7 @@ public class FSDirectory implements Closeable {
    *            reference nodes;
    *         >0 otherwise. 
    */
-  private long removeLastINode(final INodesInPath iip)
+  long removeLastINode(final INodesInPath iip)
       throws QuotaExceededException {
     final int latestSnapshot = iip.getLatestSnapshotId();
     final INode last = iip.getLastINode();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/04269940/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index d57d9b8..1c89849 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -508,8 +508,7 @@ public class FSEditLogLoader {
       RenameOldOp renameOp = (RenameOldOp)op;
       final String src = renameReservedPathsOnUpgrade(renameOp.src, logVersion);
       final String dst = renameReservedPathsOnUpgrade(renameOp.dst, logVersion);
-      fsDir.unprotectedRenameTo(src, dst,
-                                renameOp.timestamp);
+      FSDirRenameOp.unprotectedRenameTo(fsDir, src, dst, renameOp.timestamp);
       
       if (toAddRetryCache) {
         fsNamesys.addCacheEntry(renameOp.rpcClientId, renameOp.rpcCallId);
@@ -601,7 +600,7 @@ public class FSEditLogLoader {
     }
     case OP_RENAME: {
       RenameOp renameOp = (RenameOp)op;
-      fsDir.unprotectedRenameTo(
+      FSDirRenameOp.unprotectedRenameTo(fsDir,
           renameReservedPathsOnUpgrade(renameOp.src, logVersion),
           renameReservedPathsOnUpgrade(renameOp.dst, logVersion),
           renameOp.timestamp, renameOp.options);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/04269940/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
index 4624a70..8ac6926 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
@@ -554,11 +554,6 @@ public class FSImage implements Closeable {
     return editLog;
   }
 
-  @VisibleForTesting
-  public void setEditLogForTesting(FSEditLog newLog) {
-    editLog = newLog;
-  }
-
   void openEditLogForWrite() throws IOException {
     assert editLog != null : "editLog must be initialized";
     editLog.openForWrite();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/04269940/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 543f47a..47d6455 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -149,7 +149,6 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.InvalidPathException;
 import org.apache.hadoop.fs.Options;
-import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
@@ -3561,155 +3560,62 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    */
   @Deprecated
   boolean renameTo(String src, String dst, boolean logRetryCache)
-      throws IOException, UnresolvedLinkException {
-    boolean ret = false;
-    try {
-      ret = renameToInt(src, dst, logRetryCache);
-    } catch (AccessControlException e) {
-      logAuditEvent(false, "rename", src, dst, null);
-      throw e;
-    }
-    return ret;
-  }
-
-  private boolean renameToInt(final String srcArg, final String dstArg,
-    boolean logRetryCache)
-    throws IOException, UnresolvedLinkException {
-    String src = srcArg;
-    String dst = dstArg;
-    if (NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src +
-          " to " + dst);
-    }
-    if (!DFSUtil.isValidName(dst)) {
-      throw new IOException("Invalid name: " + dst);
-    }
-    FSPermissionChecker pc = getPermissionChecker();
+      throws IOException {
+    waitForLoadingFSImage();
     checkOperation(OperationCategory.WRITE);
-    byte[][] srcComponents = FSDirectory.getPathComponentsForReservedPath(src);
-    byte[][] dstComponents = FSDirectory.getPathComponentsForReservedPath(dst);
-    boolean status = false;
-    HdfsFileStatus resultingStat = null;
+    FSDirRenameOp.RenameOldResult ret = null;
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot rename " + src);
-      waitForLoadingFSImage();
-      src = dir.resolvePath(pc, src, srcComponents);
-      dst = dir.resolvePath(pc, dst, dstComponents);
-      checkOperation(OperationCategory.WRITE);
-      status = renameToInternal(pc, src, dst, logRetryCache);
-      if (status) {
-        resultingStat = getAuditFileInfo(dst, false);
-      }
+      ret = FSDirRenameOp.renameToInt(dir, src, dst, logRetryCache);
+    } catch (AccessControlException e)  {
+      logAuditEvent(false, "rename", src, dst, null);
+      throw e;
     } finally {
       writeUnlock();
     }
-    getEditLog().logSync();
-    if (status) {
-      logAuditEvent(true, "rename", srcArg, dstArg, resultingStat);
-    }
-    return status;
-  }
-
-  /** @deprecated See {@link #renameTo(String, String, boolean)} */
-  @Deprecated
-  private boolean renameToInternal(FSPermissionChecker pc, String src,
-      String dst, boolean logRetryCache) throws IOException,
-      UnresolvedLinkException {
-    assert hasWriteLock();
-    if (isPermissionEnabled) {
-      //We should not be doing this.  This is move() not renameTo().
-      //but for now,
-      //NOTE: yes, this is bad!  it's assuming much lower level behavior
-      //      of rewriting the dst
-      String actualdst = dir.isDir(dst)?
-          dst + Path.SEPARATOR + new Path(src).getName(): dst;
-      // Rename does not operates on link targets
-      // Do not resolveLink when checking permissions of src and dst
-      // Check write access to parent of src
-      checkPermission(pc, src, false, null, FsAction.WRITE, null, null,
-          false, false);
-      // Check write access to ancestor of dst
-      checkPermission(pc, actualdst, false, FsAction.WRITE, null, null, null,
-          false, false);
-    }
-
-    long mtime = now();
-    if (dir.renameTo(src, dst, mtime)) {
-      getEditLog().logRename(src, dst, mtime, logRetryCache);
-      return true;
+    boolean success = ret != null && ret.success;
+    if (success) {
+      getEditLog().logSync();
     }
-    return false;
+    logAuditEvent(success, "rename", src, dst,
+        ret == null ? null : ret.auditStat);
+    return success;
   }
-  
 
-  /** Rename src to dst */
-  void renameTo(final String srcArg, final String dstArg, boolean logRetryCache,
-      Options.Rename... options) throws IOException, UnresolvedLinkException {
-    String src = srcArg;
-    String dst = dstArg;
-    if (NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: with options - "
-          + src + " to " + dst);
-    }
-    if (!DFSUtil.isValidName(dst)) {
-      throw new InvalidPathException("Invalid name: " + dst);
-    }
-    final FSPermissionChecker pc = getPermissionChecker();
-    
+  void renameTo(final String src, final String dst,
+                boolean logRetryCache, Options.Rename... options)
+      throws IOException {
+    waitForLoadingFSImage();
     checkOperation(OperationCategory.WRITE);
-
-    byte[][] srcComponents = FSDirectory.getPathComponentsForReservedPath(src);
-    byte[][] dstComponents = FSDirectory.getPathComponentsForReservedPath(dst);
-    HdfsFileStatus resultingStat = null;
+    Map.Entry<BlocksMapUpdateInfo, HdfsFileStatus> res = null;
     writeLock();
-    BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot rename " + src);
-      src = dir.resolvePath(pc, src, srcComponents);
-      dst = dir.resolvePath(pc, dst, dstComponents);
-      renameToInternal(pc, src, dst, logRetryCache, collectedBlocks, options);
-      resultingStat = getAuditFileInfo(dst, false);
+      res = FSDirRenameOp.renameToInt(dir, src, dst, logRetryCache, options);
+    } catch (AccessControlException e) {
+      logAuditEvent(false, "rename (options=" + Arrays.toString(options) +
+          ")", src, dst, null);
+      throw e;
     } finally {
       writeUnlock();
     }
+
     getEditLog().logSync();
+
+    BlocksMapUpdateInfo collectedBlocks = res.getKey();
+    HdfsFileStatus auditStat = res.getValue();
     if (!collectedBlocks.getToDeleteList().isEmpty()) {
       removeBlocks(collectedBlocks);
       collectedBlocks.clear();
     }
-    if (resultingStat != null) {
-      StringBuilder cmd = new StringBuilder("rename options=");
-      for (Rename option : options) {
-        cmd.append(option.value()).append(" ");
-      }
-      logAuditEvent(true, cmd.toString(), srcArg, dstArg, resultingStat);
-    }
-  }
-
-  private void renameToInternal(FSPermissionChecker pc, String src, 
-      String dst, boolean logRetryCache, BlocksMapUpdateInfo collectedBlocks, 
-      Options.Rename... options) throws IOException {
-    assert hasWriteLock();
-    if (isPermissionEnabled) {
-      // Rename does not operates on link targets
-      // Do not resolveLink when checking permissions of src and dst
-      // Check write access to parent of src
-      checkPermission(pc, src, false, null, FsAction.WRITE, null, null, false,
-          false);
-      // Check write access to ancestor of dst
-      checkPermission(pc, dst, false, FsAction.WRITE, null, null, null, false,
-          false);
-    }
 
-    waitForLoadingFSImage();
-    long mtime = now();
-    dir.renameTo(src, dst, mtime, collectedBlocks, options);
-    getEditLog().logRename(src, dst, mtime, logRetryCache, options);
+    logAuditEvent(true, "rename (options=" + Arrays.toString(options) +
+        ")", src, dst, auditStat);
   }
-  
+
   /**
    * Remove the indicated file from namespace.
    * 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/04269940/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 4e85311..3814ffc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeLayoutVersion;
 import org.apache.hadoop.hdfs.server.datanode.TestTransferRbw;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.ha
@@ -81,6 +82,7 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.VersionInfo;
 import org.junit.Assume;
+import org.mockito.internal.util.reflection.Whitebox;
 
 import java.io.*;
 import java.lang.reflect.Field;
@@ -195,6 +197,12 @@ public class DFSTestUtil {
         logicalName, "nn2"), "127.0.0.1:12346");
   }
 
+  public static void setEditLogForTesting(NameNode nn, FSEditLog newLog) {
+    Whitebox.setInternalState(nn.getFSImage(), "editLog", newLog);
+    Whitebox.setInternalState(nn.getNamesystem().getFSDirectory(), "editLog",
+        newLog);
+  }
+
 
   /** class MyFile contains enough information to recreate the contents of
    * a single file.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/04269940/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRenameWhileOpen.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRenameWhileOpen.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRenameWhileOpen.java
index a5d0425..2ee72e8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRenameWhileOpen.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRenameWhileOpen.java
@@ -77,7 +77,7 @@ public class TestRenameWhileOpen {
       FSEditLog spyLog =
           spy(cluster.getNameNode().getFSImage().getEditLog());
       doNothing().when(spyLog).endCurrentLogSegment(Mockito.anyBoolean());
-      cluster.getNameNode().getFSImage().setEditLogForTesting(spyLog);
+      DFSTestUtil.setEditLogForTesting(cluster.getNameNode(), spyLog);
 
       final int nnport = cluster.getNameNodePort();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/04269940/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
index 1a42e28..e3cd918 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
@@ -26,6 +26,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@@ -187,7 +188,7 @@ public class NameNodeAdapter {
   
   public static FSEditLog spyOnEditLog(NameNode nn) {
     FSEditLog spyEditLog = spy(nn.getNamesystem().getFSImage().getEditLog());
-    nn.getFSImage().setEditLogForTesting(spyEditLog);
+    DFSTestUtil.setEditLogForTesting(nn, spyEditLog);
     EditLogTailer tailer = nn.getNamesystem().getEditLogTailer();
     if (tailer != null) {
       tailer.setEditLog(spyEditLog);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/04269940/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java
index 178c127..c19c469 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -540,7 +541,7 @@ public class TestNameNodeRecovery {
         FSEditLog spyLog =
             spy(cluster.getNameNode().getFSImage().getEditLog());
         doNothing().when(spyLog).endCurrentLogSegment(true);
-        cluster.getNameNode().getFSImage().setEditLogForTesting(spyLog);
+        DFSTestUtil.setEditLogForTesting(cluster.getNameNode(), spyLog);
       }
       fileSys = cluster.getFileSystem();
       final FSNamesystem namesystem = cluster.getNamesystem();