You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wh...@apache.org on 2014/12/03 00:00:31 UTC

hadoop git commit: HDFS-7462. Consolidate implementation of mkdirs() into a single class. Contributed by Haohui Mai.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 52bcefca8 -> 185e0c7b4


HDFS-7462. Consolidate implementation of mkdirs() into a single class. Contributed by Haohui Mai.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/185e0c7b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/185e0c7b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/185e0c7b

Branch: refs/heads/trunk
Commit: 185e0c7b4c056b88f606362c71e4a22aae7076e0
Parents: 52bcefc
Author: Haohui Mai <wh...@apache.org>
Authored: Tue Dec 2 14:53:45 2014 -0800
Committer: Haohui Mai <wh...@apache.org>
Committed: Tue Dec 2 14:53:45 2014 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../hdfs/server/namenode/FSDirMkdirOp.java      | 238 ++++++++++++++++++
 .../hdfs/server/namenode/FSDirectory.java       | 111 +++++----
 .../hdfs/server/namenode/FSEditLogLoader.java   |  10 +-
 .../hdfs/server/namenode/FSImageFormat.java     |   6 +-
 .../server/namenode/FSImageFormatPBINode.java   |   4 +-
 .../server/namenode/FSImageSerialization.java   |   2 +-
 .../hdfs/server/namenode/FSNamesystem.java      | 244 ++-----------------
 .../org/apache/hadoop/hdfs/DFSTestUtil.java     |   8 +-
 .../apache/hadoop/hdfs/TestRenameWhileOpen.java |   2 +-
 .../hdfs/server/namenode/NameNodeAdapter.java   |   2 +-
 .../hdfs/server/namenode/TestEditLog.java       |   6 +-
 .../hdfs/server/namenode/TestEditLogRace.java   |   4 +-
 .../hdfs/server/namenode/TestINodeFile.java     |  20 +-
 .../server/namenode/TestNameNodeRecovery.java   |   2 +-
 .../namenode/ha/TestEditLogsDuringFailover.java |   2 +-
 .../namenode/ha/TestStandbyCheckpoints.java     |   2 +-
 17 files changed, 346 insertions(+), 320 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/185e0c7b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 196673e..4d2fb05 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -416,6 +416,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-7438. Consolidate the implementation of rename() into a single class.
     (wheat9)
 
+    HDFS-7462. Consolidate implementation of mkdirs() into a single class.
+    (wheat9)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/185e0c7b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
new file mode 100644
index 0000000..01cb57f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.InvalidPathException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.AclException;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.hadoop.util.Time.now;
+
+class FSDirMkdirOp {
+  static HdfsFileStatus mkdirs(
+      FSNamesystem fsn, String src, PermissionStatus permissions,
+      boolean createParent) throws IOException {
+    FSDirectory fsd = fsn.getFSDirectory();
+    final String srcArg = src;
+    if(NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src);
+    }
+    if (!DFSUtil.isValidName(src)) {
+      throw new InvalidPathException(src);
+    }
+    FSPermissionChecker pc = fsd.getPermissionChecker();
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath
+        (src);
+    src = fsd.resolvePath(pc, src, pathComponents);
+    if (fsd.isPermissionEnabled()) {
+      fsd.checkTraverse(pc, src);
+    }
+
+    if (!isDirMutable(fsd, src)) {
+      if (fsd.isPermissionEnabled()) {
+        fsd.checkAncestorAccess(pc, src, FsAction.WRITE);
+      }
+
+      if (!createParent) {
+        fsd.verifyParentDir(src);
+      }
+
+      // validate that we have enough inodes. This is, at best, a
+      // heuristic because the mkdirs() operation might need to
+      // create multiple inodes.
+      fsn.checkFsObjectLimit();
+
+      if (!mkdirsRecursively(fsd, src, permissions, false, now())) {
+        throw new IOException("Failed to create directory: " + src);
+      }
+    }
+    return fsd.getAuditFileInfo(srcArg, false);
+  }
+
+  static INode unprotectedMkdir(
+      FSDirectory fsd, long inodeId, String src,
+      PermissionStatus permissions, List<AclEntry> aclEntries, long timestamp)
+      throws QuotaExceededException, UnresolvedLinkException, AclException {
+    assert fsd.hasWriteLock();
+    byte[][] components = INode.getPathComponents(src);
+    INodesInPath iip = fsd.getExistingPathINodes(components);
+    INode[] inodes = iip.getINodes();
+    final int pos = inodes.length - 1;
+    unprotectedMkdir(fsd, inodeId, iip, pos, components[pos], permissions,
+        aclEntries, timestamp);
+    return inodes[pos];
+  }
+
+  /**
+   * Create a directory
+   * If ancestor directories do not exist, automatically create them.
+
+   * @param fsd FSDirectory
+   * @param src string representation of the path to the directory
+   * @param permissions the permission of the directory
+   * @param inheritPermission
+   *   if the permission of the directory should inherit from its parent or not.
+   *   u+wx is implicitly added to the automatically created directories,
+   *   and to the given directory if inheritPermission is true
+   * @param now creation time
+   * @return true if the operation succeeds false otherwise
+   * @throws QuotaExceededException if directory creation violates
+   *                                any quota limit
+   * @throws UnresolvedLinkException if a symlink is encountered in src.
+   * @throws SnapshotAccessControlException if path is in RO snapshot
+   */
+  static boolean mkdirsRecursively(
+      FSDirectory fsd, String src, PermissionStatus permissions,
+      boolean inheritPermission, long now)
+      throws FileAlreadyExistsException, QuotaExceededException,
+             UnresolvedLinkException, SnapshotAccessControlException,
+             AclException {
+    src = FSDirectory.normalizePath(src);
+    String[] names = INode.getPathNames(src);
+    byte[][] components = INode.getPathComponents(names);
+    final int lastInodeIndex = components.length - 1;
+
+    fsd.writeLock();
+    try {
+      INodesInPath iip = fsd.getExistingPathINodes(components);
+      if (iip.isSnapshot()) {
+        throw new SnapshotAccessControlException(
+                "Modification on RO snapshot is disallowed");
+      }
+      INode[] inodes = iip.getINodes();
+
+      // find the index of the first null in inodes[]
+      StringBuilder pathbuilder = new StringBuilder();
+      int i = 1;
+      for(; i < inodes.length && inodes[i] != null; i++) {
+        pathbuilder.append(Path.SEPARATOR).append(names[i]);
+        if (!inodes[i].isDirectory()) {
+          throw new FileAlreadyExistsException(
+                  "Parent path is not a directory: "
+                  + pathbuilder + " "+inodes[i].getLocalName());
+        }
+      }
+
+      // default to creating parent dirs with the given perms
+      PermissionStatus parentPermissions = permissions;
+
+      // if not inheriting and it's the last inode, there's no use in
+      // computing perms that won't be used
+      if (inheritPermission || (i < lastInodeIndex)) {
+        // if inheriting (ie. creating a file or symlink), use the parent dir,
+        // else the supplied permissions
+        // NOTE: the permissions of the auto-created directories violate posix
+        FsPermission parentFsPerm = inheritPermission
+                ? inodes[i-1].getFsPermission() : permissions.getPermission();
+
+        // ensure that the permissions allow user write+execute
+        if (!parentFsPerm.getUserAction().implies(FsAction.WRITE_EXECUTE)) {
+          parentFsPerm = new FsPermission(
+                  parentFsPerm.getUserAction().or(FsAction.WRITE_EXECUTE),
+                  parentFsPerm.getGroupAction(),
+                  parentFsPerm.getOtherAction()
+          );
+        }
+
+        if (!parentPermissions.getPermission().equals(parentFsPerm)) {
+          parentPermissions = new PermissionStatus(
+                  parentPermissions.getUserName(),
+                  parentPermissions.getGroupName(),
+                  parentFsPerm
+          );
+          // when inheriting, use same perms for entire path
+          if (inheritPermission) permissions = parentPermissions;
+        }
+      }
+
+      // create directories beginning from the first null index
+      for(; i < inodes.length; i++) {
+        pathbuilder.append(Path.SEPARATOR).append(names[i]);
+        unprotectedMkdir(fsd, fsd.allocateNewInodeId(), iip, i, components[i],
+            (i < lastInodeIndex) ? parentPermissions : permissions, null, now);
+        if (inodes[i] == null) {
+          return false;
+        }
+        // Directory creation also count towards FilesCreated
+        // to match count of FilesDeleted metric.
+        NameNode.getNameNodeMetrics().incrFilesCreated();
+
+        final String cur = pathbuilder.toString();
+        fsd.getEditLog().logMkDir(cur, inodes[i]);
+        if(NameNode.stateChangeLog.isDebugEnabled()) {
+          NameNode.stateChangeLog.debug(
+                  "mkdirs: created directory " + cur);
+        }
+      }
+    } finally {
+      fsd.writeUnlock();
+    }
+    return true;
+  }
+
+  /**
+   * Check whether the path specifies a directory
+   * @throws SnapshotAccessControlException if path is in RO snapshot
+   */
+  private static boolean isDirMutable(
+      FSDirectory fsd, String src) throws UnresolvedLinkException,
+      SnapshotAccessControlException {
+    src = FSDirectory.normalizePath(src);
+    fsd.readLock();
+    try {
+      INode node = fsd.getINode4Write(src, false);
+      return node != null && node.isDirectory();
+    } finally {
+      fsd.readUnlock();
+    }
+  }
+
+  /** create a directory at index pos.
+   * The parent path to the directory is at [0, pos-1].
+   * All ancestors exist. Newly created one stored at index pos.
+   */
+  private static void unprotectedMkdir(
+      FSDirectory fsd, long inodeId, INodesInPath inodesInPath, int pos,
+      byte[] name, PermissionStatus permission, List<AclEntry> aclEntries,
+      long timestamp)
+      throws QuotaExceededException, AclException {
+    assert fsd.hasWriteLock();
+    final INodeDirectory dir = new INodeDirectory(inodeId, name, permission,
+        timestamp);
+    if (fsd.addChild(inodesInPath, pos, dir, true)) {
+      if (aclEntries != null) {
+        AclStorage.updateINodeAcl(dir, aclEntries, Snapshot.CURRENT_STATE_ID);
+      }
+      inodesInPath.setINode(pos, dir);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/185e0c7b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index 7d656f4..ffc2653 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -39,9 +39,9 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.CipherSuite;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
-import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathIsNotDirectoryException;
 import org.apache.hadoop.fs.UnresolvedLinkException;
@@ -66,7 +66,6 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
-import org.apache.hadoop.hdfs.protocol.SnapshotException;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
@@ -83,7 +82,6 @@ import org.apache.hadoop.hdfs.util.ChunkedArrayList;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import org.apache.hadoop.hdfs.util.ReadOnlyList;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
@@ -147,6 +145,7 @@ public class FSDirectory implements Closeable {
   private final boolean isPermissionEnabled;
   private final String fsOwnerShortUserName;
   private final String supergroup;
+  private final INodeId inodeId;
 
   private final FSEditLog editLog;
 
@@ -194,6 +193,7 @@ public class FSDirectory implements Closeable {
 
   FSDirectory(FSNamesystem ns, Configuration conf) throws IOException {
     this.dirLock = new ReentrantReadWriteLock(true); // fair
+    this.inodeId = new INodeId();
     rootDir = createRoot(ns);
     inodeMap = INodeMap.newInstance(rootDir);
     this.isPermissionEnabled = conf.getBoolean(
@@ -329,8 +329,7 @@ public class FSDirectory implements Closeable {
       UnresolvedLinkException, SnapshotAccessControlException, AclException {
 
     long modTime = now();
-    INodeFile newNode = newINodeFile(namesystem.allocateNewInodeId(),
-        permissions, modTime, modTime, replication, preferredBlockSize);
+    INodeFile newNode = newINodeFile(allocateNewInodeId(), permissions, modTime, modTime, replication, preferredBlockSize);
     newNode.toUnderConstruction(clientName, clientMachine);
 
     boolean added = false;
@@ -929,22 +928,6 @@ public class FSDirectory implements Closeable {
       readUnlock();
     }
   }
-  
-  /**
-   * Check whether the path specifies a directory
-   * @throws SnapshotAccessControlException if path is in RO snapshot
-   */
-  boolean isDirMutable(String src) throws UnresolvedLinkException,
-      SnapshotAccessControlException {
-    src = normalizePath(src);
-    readLock();
-    try {
-      INode node = getINode4Write(src, false);
-      return node != null && node.isDirectory();
-    } finally {
-      readUnlock();
-    }
-  }
 
   /** Updates namespace and diskspace consumed for all
    * directories until the parent directory of file represented by path.
@@ -1081,38 +1064,6 @@ public class FSDirectory implements Closeable {
     return inodes == null ? "" : getFullPathName(inodes, inodes.length - 1);
   }
 
-  INode unprotectedMkdir(long inodeId, String src, PermissionStatus permissions,
-                          List<AclEntry> aclEntries, long timestamp)
-      throws QuotaExceededException, UnresolvedLinkException, AclException {
-    assert hasWriteLock();
-    byte[][] components = INode.getPathComponents(src);
-    INodesInPath iip = getExistingPathINodes(components);
-    INode[] inodes = iip.getINodes();
-    final int pos = inodes.length - 1;
-    unprotectedMkdir(inodeId, iip, pos, components[pos], permissions, aclEntries,
-        timestamp);
-    return inodes[pos];
-  }
-
-  /** create a directory at index pos.
-   * The parent path to the directory is at [0, pos-1].
-   * All ancestors exist. Newly created one stored at index pos.
-   */
-  void unprotectedMkdir(long inodeId, INodesInPath inodesInPath,
-      int pos, byte[] name, PermissionStatus permission,
-      List<AclEntry> aclEntries, long timestamp)
-      throws QuotaExceededException, AclException {
-    assert hasWriteLock();
-    final INodeDirectory dir = new INodeDirectory(inodeId, name, permission,
-        timestamp);
-    if (addChild(inodesInPath, pos, dir, true)) {
-      if (aclEntries != null) {
-        AclStorage.updateINodeAcl(dir, aclEntries, Snapshot.CURRENT_STATE_ID);
-      }
-      inodesInPath.setINode(pos, dir);
-    }
-  }
-  
   /**
    * Add the given child to the namespace.
    * @param src The full path name of the child node.
@@ -1314,8 +1265,8 @@ public class FSDirectory implements Closeable {
    *         otherwise return true;
    * @throws QuotaExceededException is thrown if it violates quota limit
    */
-  private boolean addChild(INodesInPath iip, int pos,
-      INode child, boolean checkQuota) throws QuotaExceededException {
+  boolean addChild(INodesInPath iip, int pos, INode child, boolean checkQuota)
+      throws QuotaExceededException {
     final INode[] inodes = iip.getINodes();
     // Disallow creation of /.reserved. This may be created when loading
     // editlog/fsimage during upgrade since /.reserved was a valid name in older
@@ -1626,6 +1577,7 @@ public class FSDirectory implements Closeable {
       inodeMap.clear();
       addToInodeMap(rootDir);
       nameCache.reset();
+      inodeId.setCurrentValue(INodeId.LAST_RESERVED_ID);
     } finally {
       writeUnlock();
     }
@@ -2381,7 +2333,7 @@ public class FSDirectory implements Closeable {
    * @throws UnresolvedLinkException if symlink can't be resolved
    * @throws SnapshotAccessControlException if path is in RO snapshot
    */
-  private INode getINode4Write(String src, boolean resolveLink)
+  INode getINode4Write(String src, boolean resolveLink)
           throws UnresolvedLinkException, SnapshotAccessControlException {
     return getINodesInPath4Write(src, resolveLink).getLastINode();
   }
@@ -2481,4 +2433,51 @@ public class FSDirectory implements Closeable {
       ? FSDirStatAndListingOp.getFileInfo(this, path, resolveSymlink, false,
         false) : null;
   }
+
+  /**
+   * Verify that parent directory of src exists.
+   */
+  void verifyParentDir(String src)
+      throws FileNotFoundException, ParentNotDirectoryException,
+             UnresolvedLinkException {
+    Path parent = new Path(src).getParent();
+    if (parent != null) {
+      final INode parentNode = getINode(parent.toString());
+      if (parentNode == null) {
+        throw new FileNotFoundException("Parent directory doesn't exist: "
+            + parent);
+      } else if (!parentNode.isDirectory() && !parentNode.isSymlink()) {
+        throw new ParentNotDirectoryException("Parent path is not a directory: "
+            + parent);
+      }
+    }
+  }
+
+  /** Allocate a new inode ID. */
+  long allocateNewInodeId() {
+    return inodeId.nextValue();
+  }
+
+  /** @return the last inode ID. */
+  public long getLastInodeId() {
+    return inodeId.getCurrentValue();
+  }
+
+  /**
+   * Set the last allocated inode id when fsimage or editlog is loaded.
+   * @param newValue
+   */
+  void resetLastInodeId(long newValue) throws IOException {
+    try {
+      inodeId.skipTo(newValue);
+    } catch(IllegalStateException ise) {
+      throw new IOException(ise);
+    }
+  }
+
+  /** Should only be used for tests to reset to any value
+   * @param newValue*/
+  void resetLastInodeIdWithoutChecking(long newValue) {
+    inodeId.setCurrentValue(newValue);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/185e0c7b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index 1c89849..c33477a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -176,7 +176,7 @@ public class FSEditLogLoader {
     prog.setTotal(Phase.LOADING_EDITS, step, numTxns);
     Counter counter = prog.getCounter(Phase.LOADING_EDITS, step);
     long lastLogTime = now();
-    long lastInodeId = fsNamesys.getLastInodeId();
+    long lastInodeId = fsNamesys.dir.getLastInodeId();
     
     try {
       while (true) {
@@ -276,7 +276,7 @@ public class FSEditLogLoader {
         }
       }
     } finally {
-      fsNamesys.resetLastInodeId(lastInodeId);
+      fsNamesys.dir.resetLastInodeId(lastInodeId);
       if(closeOnExit) {
         in.close();
       }
@@ -305,12 +305,12 @@ public class FSEditLogLoader {
         throw new IOException("The layout version " + logVersion
             + " supports inodeId but gave bogus inodeId");
       }
-      inodeId = fsNamesys.allocateNewInodeId();
+      inodeId = fsNamesys.dir.allocateNewInodeId();
     } else {
       // need to reset lastInodeId. fsnamesys gets lastInodeId firstly from
       // fsimage but editlog captures more recent inodeId allocations
       if (inodeId > lastInodeId) {
-        fsNamesys.resetLastInodeId(inodeId);
+        fsNamesys.dir.resetLastInodeId(inodeId);
       }
     }
     return inodeId;
@@ -530,7 +530,7 @@ public class FSEditLogLoader {
       MkdirOp mkdirOp = (MkdirOp)op;
       inodeId = getAndUpdateLastInodeId(mkdirOp.inodeId, logVersion,
           lastInodeId);
-      fsDir.unprotectedMkdir(inodeId,
+      FSDirMkdirOp.unprotectedMkdir(fsDir, inodeId,
           renameReservedPathsOnUpgrade(mkdirOp.path, logVersion),
           mkdirOp.permissions, mkdirOp.aclEntries, mkdirOp.timestamp);
       break;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/185e0c7b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
index c50f506..e26f052 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
@@ -379,7 +379,7 @@ public class FSImageFormat {
         if (NameNodeLayoutVersion.supports(
             LayoutVersion.Feature.ADD_INODE_ID, imgVersion)) {
           long lastInodeId = in.readLong();
-          namesystem.resetLastInodeId(lastInodeId);
+          namesystem.dir.resetLastInodeId(lastInodeId);
           if (LOG.isDebugEnabled()) {
             LOG.debug("load last allocated InodeId from fsimage:" + lastInodeId);
           }
@@ -732,7 +732,7 @@ public class FSImageFormat {
 
     long inodeId = NameNodeLayoutVersion.supports(
         LayoutVersion.Feature.ADD_INODE_ID, imgVersion) ? in.readLong()
-        : namesystem.allocateNewInodeId();
+        : namesystem.dir.allocateNewInodeId();
     
     final short replication = namesystem.getBlockManager().adjustReplication(
         in.readShort());
@@ -1260,7 +1260,7 @@ public class FSImageFormat {
         out.writeLong(sourceNamesystem.getBlockIdManager().getGenerationStampAtblockIdSwitch());
         out.writeLong(sourceNamesystem.getBlockIdManager().getLastAllocatedBlockId());
         out.writeLong(context.getTxId());
-        out.writeLong(sourceNamesystem.getLastInodeId());
+        out.writeLong(sourceNamesystem.dir.getLastInodeId());
 
 
         sourceNamesystem.getSnapshotManager().write(out);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/185e0c7b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
index 321a148..51f2606 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
@@ -211,7 +211,7 @@ public final class FSImageFormatPBINode {
 
     void loadINodeSection(InputStream in) throws IOException {
       INodeSection s = INodeSection.parseDelimitedFrom(in);
-      fsn.resetLastInodeId(s.getLastInodeId());
+      fsn.dir.resetLastInodeId(s.getLastInodeId());
       LOG.info("Loading " + s.getNumInodes() + " INodes.");
       for (int i = 0; i < s.getNumInodes(); ++i) {
         INodeSection.INode p = INodeSection.INode.parseDelimitedFrom(in);
@@ -490,7 +490,7 @@ public final class FSImageFormatPBINode {
       INodeMap inodesMap = fsn.dir.getINodeMap();
 
       INodeSection.Builder b = INodeSection.newBuilder()
-          .setLastInodeId(fsn.getLastInodeId()).setNumInodes(inodesMap.size());
+          .setLastInodeId(fsn.dir.getLastInodeId()).setNumInodes(inodesMap.size());
       INodeSection s = b.build();
       s.writeDelimitedTo(out);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/185e0c7b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
index 16636a2..1c22ee9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
@@ -120,7 +120,7 @@ public class FSImageSerialization {
     byte[] name = readBytes(in);
     long inodeId = NameNodeLayoutVersion.supports(
         LayoutVersion.Feature.ADD_INODE_ID, imgVersion) ? in.readLong()
-        : fsNamesys.allocateNewInodeId();
+        : fsNamesys.dir.allocateNewInodeId();
     short blockReplication = in.readShort();
     long modificationTime = in.readLong();
     long preferredBlockSize = in.readLong();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/185e0c7b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 47d6455..a6e88c6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -529,8 +529,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    * Whether the namenode is in the middle of starting the active service
    */
   private volatile boolean startingActiveService = false;
-    
-  private INodeId inodeId;
 
   private final RetryCache retryCache;
 
@@ -595,32 +593,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   }
 
   /**
-   * Set the last allocated inode id when fsimage or editlog is loaded. 
-   */
-  public void resetLastInodeId(long newValue) throws IOException {
-    try {
-      inodeId.skipTo(newValue);
-    } catch(IllegalStateException ise) {
-      throw new IOException(ise);
-    }
-  }
-
-  /** Should only be used for tests to reset to any value */
-  void resetLastInodeIdWithoutChecking(long newValue) {
-    inodeId.setCurrentValue(newValue);
-  }
-  
-  /** @return the last inode ID. */
-  public long getLastInodeId() {
-    return inodeId.getCurrentValue();
-  }
-
-  /** Allocate a new inode ID. */
-  public long allocateNewInodeId() {
-    return inodeId.nextValue();
-  }
-  
-  /**
    * Clear all loaded data
    */
   void clear() {
@@ -628,7 +600,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     dtSecretManager.reset();
     blockIdManager.clear();
     leaseManager.removeAllLeases();
-    inodeId.setCurrentValue(INodeId.LAST_RESERVED_ID);
     snapshotManager.clearSnapshottableDirs();
     cacheManager.clear();
     setImageLoaded(false);
@@ -852,8 +823,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       this.editLogRollerInterval = conf.getInt(
           DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS,
           DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS_DEFAULT);
-      this.inodeId = new INodeId();
-      
+
       this.lazyPersistFileScrubIntervalSec = conf.getInt(
           DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC,
           DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC_DEFAULT);
@@ -2082,7 +2052,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       checkNameNodeSafeMode("Cannot create symlink " + link);
       link = dir.resolvePath(pc, link, pathComponents);
       if (!createParent) {
-        verifyParentDir(link);
+        dir.verifyParentDir(link);
       }
       if (!dir.isValidToCreate(link)) {
         throw new IOException("failed to create link " + link 
@@ -2258,25 +2228,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   }
 
   /**
-   * Verify that parent directory of src exists.
-   */
-  private void verifyParentDir(String src) throws FileNotFoundException,
-      ParentNotDirectoryException, UnresolvedLinkException {
-    assert hasReadLock();
-    Path parent = new Path(src).getParent();
-    if (parent != null) {
-      final INode parentNode = dir.getINode(parent.toString());
-      if (parentNode == null) {
-        throw new FileNotFoundException("Parent directory doesn't exist: "
-            + parent);
-      } else if (!parentNode.isDirectory() && !parentNode.isSymlink()) {
-        throw new ParentNotDirectoryException("Parent path is not a directory: "
-            + parent);
-      }
-    }
-  }
-
-  /**
    * If the file is within an encryption zone, select the appropriate 
    * CryptoProtocolVersion from the list provided by the client. Since the
    * client may be newer, we need to handle unknown versions.
@@ -2554,7 +2505,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     }
 
     if (!createParent) {
-      verifyParentDir(src);
+      dir.verifyParentDir(src);
     }
 
     try {
@@ -2586,8 +2537,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
       // Always do an implicit mkdirs for parent directory tree.
       Path parent = new Path(src).getParent();
-      if (parent != null && mkdirsRecursively(parent.toString(),
-              permissions, true, now())) {
+      if (parent != null && FSDirMkdirOp.mkdirsRecursively(dir,
+          parent.toString(), permissions, true, now())) {
         newNode = dir.addFile(src, permissions, replication, blockSize,
                               holder, clientMachine);
       }
@@ -3875,186 +3826,22 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    * Create all the necessary directories
    */
   boolean mkdirs(String src, PermissionStatus permissions,
-      boolean createParent) throws IOException, UnresolvedLinkException {
-    boolean ret = false;
-    try {
-      ret = mkdirsInt(src, permissions, createParent);
-    } catch (AccessControlException e) {
-      logAuditEvent(false, "mkdirs", src);
-      throw e;
-    }
-    return ret;
-  }
-
-  private boolean mkdirsInt(final String srcArg, PermissionStatus permissions,
-      boolean createParent) throws IOException, UnresolvedLinkException {
-    String src = srcArg;
-    if(NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src);
-    }
-    if (!DFSUtil.isValidName(src)) {
-      throw new InvalidPathException(src);
-    }
-    FSPermissionChecker pc = getPermissionChecker();
+      boolean createParent) throws IOException {
+    HdfsFileStatus auditStat = null;
     checkOperation(OperationCategory.WRITE);
-    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
-    HdfsFileStatus resultingStat = null;
-    boolean status = false;
     writeLock();
     try {
-      checkOperation(OperationCategory.WRITE);   
+      checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot create directory " + src);
-      src = dir.resolvePath(pc, src, pathComponents);
-      status = mkdirsInternal(pc, src, permissions, createParent);
-      if (status) {
-        resultingStat = getAuditFileInfo(src, false);
-      }
+      auditStat = FSDirMkdirOp.mkdirs(this, src, permissions, createParent);
+    } catch (AccessControlException e) {
+      logAuditEvent(false, "mkdirs", src);
+      throw e;
     } finally {
       writeUnlock();
     }
     getEditLog().logSync();
-    if (status) {
-      logAuditEvent(true, "mkdirs", srcArg, null, resultingStat);
-    }
-    return status;
-  }
-    
-  /**
-   * Create all the necessary directories
-   */
-  private boolean mkdirsInternal(FSPermissionChecker pc, String src,
-      PermissionStatus permissions, boolean createParent) 
-      throws IOException, UnresolvedLinkException {
-    assert hasWriteLock();
-    if (isPermissionEnabled) {
-      checkTraverse(pc, src);
-    }
-    if (dir.isDirMutable(src)) {
-      // all the users of mkdirs() are used to expect 'true' even if
-      // a new directory is not created.
-      return true;
-    }
-    if (isPermissionEnabled) {
-      checkAncestorAccess(pc, src, FsAction.WRITE);
-    }
-    if (!createParent) {
-      verifyParentDir(src);
-    }
-
-    // validate that we have enough inodes. This is, at best, a 
-    // heuristic because the mkdirs() operation might need to 
-    // create multiple inodes.
-    checkFsObjectLimit();
-
-    if (!mkdirsRecursively(src, permissions, false, now())) {
-      throw new IOException("Failed to create directory: " + src);
-    }
-    return true;
-  }
-
-  /**
-   * Create a directory
-   * If ancestor directories do not exist, automatically create them.
-
-   * @param src string representation of the path to the directory
-   * @param permissions the permission of the directory
-   * @param inheritPermission if the permission of the directory should inherit
-   *                          from its parent or not. u+wx is implicitly added to
-   *                          the automatically created directories, and to the
-   *                          given directory if inheritPermission is true
-   * @param now creation time
-   * @return true if the operation succeeds false otherwise
-   * @throws QuotaExceededException if directory creation violates
-   *                                any quota limit
-   * @throws UnresolvedLinkException if a symlink is encountered in src.
-   * @throws SnapshotAccessControlException if path is in RO snapshot
-   */
-  private boolean mkdirsRecursively(String src, PermissionStatus permissions,
-                 boolean inheritPermission, long now)
-          throws FileAlreadyExistsException, QuotaExceededException,
-                 UnresolvedLinkException, SnapshotAccessControlException,
-                 AclException {
-    src = FSDirectory.normalizePath(src);
-    String[] names = INode.getPathNames(src);
-    byte[][] components = INode.getPathComponents(names);
-    final int lastInodeIndex = components.length - 1;
-
-    dir.writeLock();
-    try {
-      INodesInPath iip = dir.getExistingPathINodes(components);
-      if (iip.isSnapshot()) {
-        throw new SnapshotAccessControlException(
-                "Modification on RO snapshot is disallowed");
-      }
-      INode[] inodes = iip.getINodes();
-
-      // find the index of the first null in inodes[]
-      StringBuilder pathbuilder = new StringBuilder();
-      int i = 1;
-      for(; i < inodes.length && inodes[i] != null; i++) {
-        pathbuilder.append(Path.SEPARATOR).append(names[i]);
-        if (!inodes[i].isDirectory()) {
-          throw new FileAlreadyExistsException(
-                  "Parent path is not a directory: "
-                  + pathbuilder + " "+inodes[i].getLocalName());
-        }
-      }
-
-      // default to creating parent dirs with the given perms
-      PermissionStatus parentPermissions = permissions;
-
-      // if not inheriting and it's the last inode, there's no use in
-      // computing perms that won't be used
-      if (inheritPermission || (i < lastInodeIndex)) {
-        // if inheriting (ie. creating a file or symlink), use the parent dir,
-        // else the supplied permissions
-        // NOTE: the permissions of the auto-created directories violate posix
-        FsPermission parentFsPerm = inheritPermission
-                ? inodes[i-1].getFsPermission() : permissions.getPermission();
-
-        // ensure that the permissions allow user write+execute
-        if (!parentFsPerm.getUserAction().implies(FsAction.WRITE_EXECUTE)) {
-          parentFsPerm = new FsPermission(
-                  parentFsPerm.getUserAction().or(FsAction.WRITE_EXECUTE),
-                  parentFsPerm.getGroupAction(),
-                  parentFsPerm.getOtherAction()
-          );
-        }
-
-        if (!parentPermissions.getPermission().equals(parentFsPerm)) {
-          parentPermissions = new PermissionStatus(
-                  parentPermissions.getUserName(),
-                  parentPermissions.getGroupName(),
-                  parentFsPerm
-          );
-          // when inheriting, use same perms for entire path
-          if (inheritPermission) permissions = parentPermissions;
-        }
-      }
-
-      // create directories beginning from the first null index
-      for(; i < inodes.length; i++) {
-        pathbuilder.append(Path.SEPARATOR).append(names[i]);
-        dir.unprotectedMkdir(allocateNewInodeId(), iip, i, components[i],
-                (i < lastInodeIndex) ? parentPermissions : permissions, null,
-                now);
-        if (inodes[i] == null) {
-          return false;
-        }
-        // Directory creation also count towards FilesCreated
-        // to match count of FilesDeleted metric.
-        NameNode.getNameNodeMetrics().incrFilesCreated();
-
-        final String cur = pathbuilder.toString();
-        getEditLog().logMkDir(cur, inodes[i]);
-        if(NameNode.stateChangeLog.isDebugEnabled()) {
-          NameNode.stateChangeLog.debug(
-                  "mkdirs: created directory " + cur);
-        }
-      }
-    } finally {
-      dir.writeUnlock();
-    }
+    logAuditEvent(true, "mkdirs", src, null, auditStat);
     return true;
   }
 
@@ -4763,12 +4550,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final long modTime = now();
     if (createParent) {
       final String parent = new Path(path).getParent().toString();
-      if (!mkdirsRecursively(parent, dirPerms, true, modTime)) {
+      if (!FSDirMkdirOp.mkdirsRecursively(dir, parent, dirPerms, true,
+          modTime)) {
         return null;
       }
     }
     final String userName = dirPerms.getUserName();
-    long id = allocateNewInodeId();
+    long id = dir.allocateNewInodeId();
     INodeSymlink newNode = dir.addSymlink(id, path, target, modTime, modTime,
             new PermissionStatus(userName, null, FsPermission.getDefault()));
     if (newNode == null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/185e0c7b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 3814ffc..a51c42c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -197,13 +197,11 @@ public class DFSTestUtil {
         logicalName, "nn2"), "127.0.0.1:12346");
   }
 
-  public static void setEditLogForTesting(NameNode nn, FSEditLog newLog) {
-    Whitebox.setInternalState(nn.getFSImage(), "editLog", newLog);
-    Whitebox.setInternalState(nn.getNamesystem().getFSDirectory(), "editLog",
-        newLog);
+  public static void setEditLogForTesting(FSNamesystem fsn, FSEditLog newLog) {
+    Whitebox.setInternalState(fsn.getFSImage(), "editLog", newLog);
+    Whitebox.setInternalState(fsn.getFSDirectory(), "editLog", newLog);
   }
 
-
   /** class MyFile contains enough information to recreate the contents of
    * a single file.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/185e0c7b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRenameWhileOpen.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRenameWhileOpen.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRenameWhileOpen.java
index 2ee72e8..6ed675d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRenameWhileOpen.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRenameWhileOpen.java
@@ -77,7 +77,7 @@ public class TestRenameWhileOpen {
       FSEditLog spyLog =
           spy(cluster.getNameNode().getFSImage().getEditLog());
       doNothing().when(spyLog).endCurrentLogSegment(Mockito.anyBoolean());
-      DFSTestUtil.setEditLogForTesting(cluster.getNameNode(), spyLog);
+      DFSTestUtil.setEditLogForTesting(cluster.getNamesystem(), spyLog);
 
       final int nnport = cluster.getNameNodePort();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/185e0c7b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
index e3cd918..61e7f14 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
@@ -188,7 +188,7 @@ public class NameNodeAdapter {
   
   public static FSEditLog spyOnEditLog(NameNode nn) {
     FSEditLog spyEditLog = spy(nn.getNamesystem().getFSImage().getEditLog());
-    DFSTestUtil.setEditLogForTesting(nn, spyEditLog);
+    DFSTestUtil.setEditLogForTesting(nn.getNamesystem(), spyEditLog);
     EditLogTailer tailer = nn.getNamesystem().getEditLogTailer();
     if (tailer != null) {
       tailer.setEditLog(spyEditLog);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/185e0c7b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
index fef075e..2a8f289 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
@@ -204,7 +204,7 @@ public class TestEditLog {
       FSEditLog editLog = namesystem.getEditLog();
 
       for (int i = 0; i < numTransactions; i++) {
-        INodeFile inode = new INodeFile(namesystem.allocateNewInodeId(), null,
+        INodeFile inode = new INodeFile(namesystem.dir.allocateNewInodeId(), null,
             p, 0L, 0L, BlockInfo.EMPTY_ARRAY, replication, blockSize);
         inode.toUnderConstruction("", "");
 
@@ -375,7 +375,7 @@ public class TestEditLog {
       // Remember the current lastInodeId and will reset it back to test
       // loading editlog segments.The transactions in the following allocate new
       // inode id to write to editlogs but doesn't create ionde in namespace
-      long originalLastInodeId = namesystem.getLastInodeId();
+      long originalLastInodeId = namesystem.dir.getLastInodeId();
       
       // Create threads and make them run transactions concurrently.
       Thread threadId[] = new Thread[NUM_THREADS];
@@ -409,7 +409,7 @@ public class TestEditLog {
       // If there were any corruptions, it is likely that the reading in
       // of these transactions will throw an exception.
       //
-      namesystem.resetLastInodeIdWithoutChecking(originalLastInodeId);
+      namesystem.dir.resetLastInodeIdWithoutChecking(originalLastInodeId);
       for (Iterator<StorageDirectory> it = 
               fsimage.getStorage().dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
         FSEditLogLoader loader = new FSEditLogLoader(namesystem, 0);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/185e0c7b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
index 6859fdc..8b3c7ae4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
@@ -458,8 +458,8 @@ public class TestEditLogRace {
     try {
       FSImage fsimage = namesystem.getFSImage();
       FSEditLog editLog = spy(fsimage.getEditLog());
-      fsimage.editLog = editLog;
-      
+      DFSTestUtil.setEditLogForTesting(namesystem, editLog);
+
       final AtomicReference<Throwable> deferredException =
           new AtomicReference<Throwable>();
       final CountDownLatch waitToEnterSync = new CountDownLatch(1);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/185e0c7b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
index 4221ad5..f79277a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
@@ -389,7 +389,7 @@ public class TestINodeFile {
       cluster.waitActive();
 
       FSNamesystem fsn = cluster.getNamesystem();
-      long lastId = fsn.getLastInodeId();
+      long lastId = fsn.dir.getLastInodeId();
 
       // Ensure root has the correct inode ID
       // Last inode ID should be root inode ID and inode map size should be 1
@@ -404,14 +404,14 @@ public class TestINodeFile {
       FileSystem fs = cluster.getFileSystem();
       Path path = new Path("/test1");
       assertTrue(fs.mkdirs(path));
-      assertEquals(++expectedLastInodeId, fsn.getLastInodeId());
+      assertEquals(++expectedLastInodeId, fsn.dir.getLastInodeId());
       assertEquals(++inodeCount, fsn.dir.getInodeMapSize());
 
       // Create a file
       // Last inode ID and inode map size should increase by 1
       NamenodeProtocols nnrpc = cluster.getNameNodeRpc();
       DFSTestUtil.createFile(fs, new Path("/test1/file"), 1024, (short) 1, 0);
-      assertEquals(++expectedLastInodeId, fsn.getLastInodeId());
+      assertEquals(++expectedLastInodeId, fsn.dir.getLastInodeId());
       assertEquals(++inodeCount, fsn.dir.getInodeMapSize());
       
       // Ensure right inode ID is returned in file status
@@ -422,7 +422,7 @@ public class TestINodeFile {
       // Last inode ID and inode map size should not change
       Path renamedPath = new Path("/test2");
       assertTrue(fs.rename(path, renamedPath));
-      assertEquals(expectedLastInodeId, fsn.getLastInodeId());
+      assertEquals(expectedLastInodeId, fsn.dir.getLastInodeId());
       assertEquals(inodeCount, fsn.dir.getInodeMapSize());
       
       // Delete test2/file and test2 and ensure inode map size decreases
@@ -439,12 +439,12 @@ public class TestINodeFile {
       inodeCount += 3; // test1, file1 and file2 are created
       expectedLastInodeId += 3;
       assertEquals(inodeCount, fsn.dir.getInodeMapSize());
-      assertEquals(expectedLastInodeId, fsn.getLastInodeId());
+      assertEquals(expectedLastInodeId, fsn.dir.getLastInodeId());
       // Concat the /test1/file1 /test1/file2 into /test1/file2
       nnrpc.concat(file2, new String[] {file1});
       inodeCount--; // file1 and file2 are concatenated to file2
       assertEquals(inodeCount, fsn.dir.getInodeMapSize());
-      assertEquals(expectedLastInodeId, fsn.getLastInodeId());
+      assertEquals(expectedLastInodeId, fsn.dir.getLastInodeId());
       assertTrue(fs.delete(new Path("/test1"), true));
       inodeCount -= 2; // test1 and file2 is deleted
       assertEquals(inodeCount, fsn.dir.getInodeMapSize());
@@ -453,14 +453,14 @@ public class TestINodeFile {
       cluster.restartNameNode();
       cluster.waitActive();
       fsn = cluster.getNamesystem();
-      assertEquals(expectedLastInodeId, fsn.getLastInodeId());
+      assertEquals(expectedLastInodeId, fsn.dir.getLastInodeId());
       assertEquals(inodeCount, fsn.dir.getInodeMapSize());
 
       // Create two inodes test2 and test2/file2
       DFSTestUtil.createFile(fs, new Path("/test2/file2"), 1024, (short) 1, 0);
       expectedLastInodeId += 2;
       inodeCount += 2;
-      assertEquals(expectedLastInodeId, fsn.getLastInodeId());
+      assertEquals(expectedLastInodeId, fsn.dir.getLastInodeId());
       assertEquals(inodeCount, fsn.dir.getInodeMapSize());
 
       // create /test3, and /test3/file.
@@ -469,7 +469,7 @@ public class TestINodeFile {
       assertTrue(outStream != null);
       expectedLastInodeId += 2;
       inodeCount += 2;
-      assertEquals(expectedLastInodeId, fsn.getLastInodeId());
+      assertEquals(expectedLastInodeId, fsn.dir.getLastInodeId());
       assertEquals(inodeCount, fsn.dir.getInodeMapSize());
 
       // Apply editlogs to fsimage, ensure inodeUnderConstruction is handled
@@ -483,7 +483,7 @@ public class TestINodeFile {
       cluster.restartNameNode();
       cluster.waitActive();
       fsn = cluster.getNamesystem();
-      assertEquals(expectedLastInodeId, fsn.getLastInodeId());
+      assertEquals(expectedLastInodeId, fsn.dir.getLastInodeId());
       assertEquals(inodeCount, fsn.dir.getInodeMapSize());
     } finally {
       if (cluster != null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/185e0c7b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java
index c19c469..0265a4d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java
@@ -541,7 +541,7 @@ public class TestNameNodeRecovery {
         FSEditLog spyLog =
             spy(cluster.getNameNode().getFSImage().getEditLog());
         doNothing().when(spyLog).endCurrentLogSegment(true);
-        DFSTestUtil.setEditLogForTesting(cluster.getNameNode(), spyLog);
+        DFSTestUtil.setEditLogForTesting(cluster.getNamesystem(), spyLog);
       }
       fileSys = cluster.getFileSystem();
       final FSNamesystem namesystem = cluster.getNamesystem();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/185e0c7b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogsDuringFailover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogsDuringFailover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogsDuringFailover.java
index f1444a4..a8d350d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogsDuringFailover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogsDuringFailover.java
@@ -142,7 +142,7 @@ public class TestEditLogsDuringFailover {
       File sharedDir = new File(sharedUri.getPath(), "current");
       FSNamesystem fsn = cluster.getNamesystem(0);
       FSImageTestUtil.createAbortedLogWithMkdirs(sharedDir, NUM_DIRS_IN_LOG, 1,
-          fsn.getLastInodeId() + 1);
+          fsn.getFSDirectory().getLastInodeId() + 1);
       
       assertEditFiles(Collections.singletonList(sharedUri),
           NNStorage.getInProgressEditsFileName(1));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/185e0c7b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java
index 2f9b945..1d75c30 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java
@@ -251,7 +251,7 @@ public class TestStandbyCheckpoints {
         "testCheckpointCancellation-tmp");
     FSNamesystem fsn = cluster.getNamesystem(0);
     FSImageTestUtil.createAbortedLogWithMkdirs(tmpDir, NUM_DIRS_IN_LOG, 3,
-        fsn.getLastInodeId() + 1);
+        fsn.getFSDirectory().getLastInodeId() + 1);
     String fname = NNStorage.getInProgressEditsFileName(3); 
     new File(tmpDir, fname).renameTo(new File(sharedDir, fname));