You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:50:25 UTC

svn commit: r1619012 [16/35] - in /hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project: hadoop-hdfs-httpfs/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/ hadoop...

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Tue Aug 19 23:49:39 2014
@@ -24,12 +24,13 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.EnumSet;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
+import java.util.ListIterator;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
@@ -39,9 +40,10 @@ import org.apache.hadoop.fs.ParentNotDir
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathIsNotDirectoryException;
 import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.fs.XAttr;
+import org.apache.hadoop.fs.XAttrSetFlag;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
-import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -53,6 +55,7 @@ import org.apache.hadoop.hdfs.protocol.C
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.FSLimitException.MaxDirectoryItemsExceededException;
 import org.apache.hadoop.hdfs.protocol.FSLimitException.PathComponentTooLongException;
+import org.apache.hadoop.hdfs.protocol.FsAclPermission;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
@@ -64,12 +67,11 @@ import org.apache.hadoop.hdfs.protocol.S
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithCount;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.Root;
 import org.apache.hadoop.hdfs.util.ByteArray;
@@ -78,18 +80,18 @@ import org.apache.hadoop.hdfs.util.ReadO
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 
-/*************************************************
- * FSDirectory stores the filesystem directory state.
- * It handles writing/loading values to disk, and logging
- * changes as we go.
- *
- * It keeps the filename->blockset mapping always-current
- * and logged to disk.
- * 
- *************************************************/
+/**
+ * Both FSDirectory and FSNamesystem manage the state of the namespace.
+ * FSDirectory is a pure in-memory data structure, all of whose operations
+ * happen entirely in memory. In contrast, FSNamesystem persists the operations
+ * to the disk.
+ * @see org.apache.hadoop.hdfs.server.namenode.FSNamesystem
+ **/
+@InterfaceAudience.Private
 public class FSDirectory implements Closeable {
-  private static INodeDirectorySnapshottable createRoot(FSNamesystem namesystem) {
+  private static INodeDirectory createRoot(FSNamesystem namesystem) {
     final INodeDirectory r = new INodeDirectory(
         INodeId.ROOT_INODE_ID,
         INodeDirectory.ROOT_NAME,
@@ -98,9 +100,9 @@ public class FSDirectory implements Clos
     r.addDirectoryWithQuotaFeature(
         DirectoryWithQuotaFeature.DEFAULT_NAMESPACE_QUOTA,
         DirectoryWithQuotaFeature.DEFAULT_DISKSPACE_QUOTA);
-    final INodeDirectorySnapshottable s = new INodeDirectorySnapshottable(r);
-    s.setSnapshotQuota(0);
-    return s;
+    r.addSnapshottableFeature();
+    r.setSnapshotQuota(0);
+    return r;
   }
 
   @VisibleForTesting
@@ -114,9 +116,7 @@ public class FSDirectory implements Clos
   public final static byte[] DOT_INODES = 
       DFSUtil.string2Bytes(DOT_INODES_STRING);
   INodeDirectory rootDir;
-  FSImage fsImage;  
   private final FSNamesystem namesystem;
-  private volatile boolean ready = false;
   private volatile boolean skipQuotaCheck = false; //skip while consuming edits
   private final int maxComponentLength;
   private final int maxDirItems;
@@ -124,10 +124,10 @@ public class FSDirectory implements Clos
   private final int contentCountLimit; // max content summary counts per run
   private final INodeMap inodeMap; // Synchronized by dirLock
   private long yieldCount = 0; // keep track of lock yield count.
+  private final int inodeXAttrsLimit; //inode xattrs max limit
 
   // lock to protect the directory and BlockMap
   private final ReentrantReadWriteLock dirLock;
-  private final Condition cond;
 
   // utility methods to acquire and release read lock and write lock
   void readLock() {
@@ -168,12 +168,10 @@ public class FSDirectory implements Clos
    */
   private final NameCache<ByteArray> nameCache;
 
-  FSDirectory(FSImage fsImage, FSNamesystem ns, Configuration conf) {
+  FSDirectory(FSNamesystem ns, Configuration conf) {
     this.dirLock = new ReentrantReadWriteLock(true); // fair
-    this.cond = dirLock.writeLock().newCondition();
     rootDir = createRoot(ns);
     inodeMap = INodeMap.newInstance(rootDir);
-    this.fsImage = fsImage;
     int configuredLimit = conf.getInt(
         DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT);
     this.lsLimit = configuredLimit>0 ?
@@ -189,6 +187,12 @@ public class FSDirectory implements Clos
     this.maxDirItems = conf.getInt(
         DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_KEY,
         DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_DEFAULT);
+    this.inodeXAttrsLimit = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_MAX_XATTRS_PER_INODE_KEY,
+        DFSConfigKeys.DFS_NAMENODE_MAX_XATTRS_PER_INODE_DEFAULT);
+    Preconditions.checkArgument(this.inodeXAttrsLimit >= 0,
+        "Cannot set a negative limit on the number of xattrs per inode (%s).",
+        DFSConfigKeys.DFS_NAMENODE_MAX_XATTRS_PER_INODE_KEY);
     // We need a maximum maximum because by default, PB limits message sizes
     // to 64MB. This means we can only store approximately 6.7 million entries
     // per directory, but let's use 6.4 million for some safety.
@@ -221,68 +225,19 @@ public class FSDirectory implements Clos
   }
 
   /**
-   * Notify that loading of this FSDirectory is complete, and
-   * it is ready for use 
+   * Shutdown the filestore
    */
-  void imageLoadComplete() {
-    Preconditions.checkState(!ready, "FSDirectory already loaded");
-    setReady();
-  }
+  @Override
+  public void close() throws IOException {}
 
-  void setReady() {
-    if(ready) return;
+  void markNameCacheInitialized() {
     writeLock();
     try {
-      setReady(true);
-      this.nameCache.initialized();
-      cond.signalAll();
+      nameCache.initialized();
     } finally {
       writeUnlock();
     }
   }
-  
-  //This is for testing purposes only
-  @VisibleForTesting
-  boolean isReady() {
-    return ready;
-  }
-
-  // exposed for unit tests
-  protected void setReady(boolean flag) {
-    ready = flag;
-  }
-
-  private void incrDeletedFileCount(long count) {
-    if (getFSNamesystem() != null)
-      NameNode.getNameNodeMetrics().incrFilesDeleted(count);
-  }
-    
-  /**
-   * Shutdown the filestore
-   */
-  @Override
-  public void close() throws IOException {
-    fsImage.close();
-  }
-
-  /**
-   * Block until the object is ready to be used.
-   */
-  void waitForReady() {
-    if (!ready) {
-      writeLock();
-      try {
-        while (!ready) {
-          try {
-            cond.await(5000, TimeUnit.MILLISECONDS);
-          } catch (InterruptedException ie) {
-          }
-        }
-      } finally {
-        writeUnlock();
-      }
-    }
-  }
 
   /** Enable quota verification */
   void enableQuotaChecks() {
@@ -302,29 +257,16 @@ public class FSDirectory implements Clos
    * @throws SnapshotAccessControlException 
    */
   INodeFile addFile(String path, PermissionStatus permissions,
-      short replication, long preferredBlockSize, String clientName,
-      String clientMachine, DatanodeDescriptor clientNode)
+                    short replication, long preferredBlockSize,
+                    String clientName, String clientMachine)
     throws FileAlreadyExistsException, QuotaExceededException,
       UnresolvedLinkException, SnapshotAccessControlException, AclException {
-    waitForReady();
 
-    // Always do an implicit mkdirs for parent directory tree.
     long modTime = now();
-    
-    Path parent = new Path(path).getParent();
-    if (parent == null) {
-      // Trying to add "/" as a file - this path has no
-      // parent -- avoids an NPE below.
-      return null;
-    }
-    
-    if (!mkdirs(parent.toString(), permissions, true, modTime)) {
-      return null;
-    }
     INodeFile newNode = new INodeFile(namesystem.allocateNewInodeId(), null,
         permissions, modTime, modTime, BlockInfo.EMPTY_ARRAY, replication,
         preferredBlockSize);
-    newNode.toUnderConstruction(clientName, clientMachine, clientNode);
+    newNode.toUnderConstruction(clientName, clientMachine);
 
     boolean added = false;
     writeLock();
@@ -348,6 +290,7 @@ public class FSDirectory implements Clos
                             String path, 
                             PermissionStatus permissions,
                             List<AclEntry> aclEntries,
+                            List<XAttr> xAttrs,
                             short replication,
                             long modificationTime,
                             long atime,
@@ -361,7 +304,7 @@ public class FSDirectory implements Clos
       newNode = new INodeFile(id, null, permissions, modificationTime,
           modificationTime, BlockInfo.EMPTY_ARRAY, replication,
           preferredBlockSize);
-      newNode.toUnderConstruction(clientName, clientMachine, null);
+      newNode.toUnderConstruction(clientName, clientMachine);
 
     } else {
       newNode = new INodeFile(id, null, permissions, modificationTime, atime,
@@ -374,6 +317,10 @@ public class FSDirectory implements Clos
           AclStorage.updateINodeAcl(newNode, aclEntries,
             Snapshot.CURRENT_STATE_ID);
         }
+        if (xAttrs != null) {
+          XAttrStorage.updateINodeXAttrs(newNode, xAttrs,
+              Snapshot.CURRENT_STATE_ID);
+        }
         return newNode;
       }
     } catch (IOException e) {
@@ -391,8 +338,6 @@ public class FSDirectory implements Clos
    */
   BlockInfo addBlock(String path, INodesInPath inodesInPath, Block block,
       DatanodeStorageInfo[] targets) throws IOException {
-    waitForReady();
-
     writeLock();
     try {
       final INodeFile fileINode = inodesInPath.getLastINode().asFile();
@@ -424,73 +369,12 @@ public class FSDirectory implements Clos
   }
 
   /**
-   * Persist the block list for the inode.
-   */
-  void persistBlocks(String path, INodeFile file, boolean logRetryCache) {
-    Preconditions.checkArgument(file.isUnderConstruction());
-    waitForReady();
-
-    writeLock();
-    try {
-      fsImage.getEditLog().logUpdateBlocks(path, file, logRetryCache);
-      if(NameNode.stateChangeLog.isDebugEnabled()) {
-        NameNode.stateChangeLog.debug("DIR* FSDirectory.persistBlocks: "
-            +path+" with "+ file.getBlocks().length 
-            +" blocks is persisted to the file system");
-      }
-    } finally {
-      writeUnlock();
-    }
-  }
-  
-  /**
-   * Persist the new block (the last block of the given file).
-   */
-  void persistNewBlock(String path, INodeFile file) {
-    Preconditions.checkArgument(file.isUnderConstruction());
-    waitForReady();
-
-    writeLock();
-    try {
-      fsImage.getEditLog().logAddBlock(path, file);
-    } finally {
-      writeUnlock();
-    }
-    if (NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* FSDirectory.persistNewBlock: "
-          + path + " with new block " + file.getLastBlock().toString()
-          + ", current total block count is " + file.getBlocks().length);
-    }
-  }
-  
-  /**
-   * Close file.
-   */
-  void closeFile(String path, INodeFile file) {
-    waitForReady();
-    writeLock();
-    try {
-      // file is closed
-      fsImage.getEditLog().logCloseFile(path, file);
-      if (NameNode.stateChangeLog.isDebugEnabled()) {
-        NameNode.stateChangeLog.debug("DIR* FSDirectory.closeFile: "
-            +path+" with "+ file.getBlocks().length 
-            +" blocks is persisted to the file system");
-      }
-    } finally {
-      writeUnlock();
-    }
-  }
-
-  /**
    * Remove a block from the file.
    * @return Whether the block exists in the corresponding file
    */
   boolean removeBlock(String path, INodeFile fileNode, Block block)
       throws IOException {
     Preconditions.checkArgument(fileNode.isUnderConstruction());
-    waitForReady();
-
     writeLock();
     try {
       return unprotectedRemoveBlock(path, fileNode, block);
@@ -516,7 +400,7 @@ public class FSDirectory implements Clos
     }
 
     // update space consumed
-    final INodesInPath iip = rootDir.getINodesInPath4Write(path, true);
+    final INodesInPath iip = getINodesInPath4Write(path, true);
     updateCount(iip, 0, -fileNode.getBlockDiskspace(), true);
     return true;
   }
@@ -524,33 +408,30 @@ public class FSDirectory implements Clos
   /**
    * @throws SnapshotAccessControlException 
    * @see #unprotectedRenameTo(String, String, long)
-   * @deprecated Use {@link #renameTo(String, String, Rename...)} instead.
+   * @deprecated Use {@link #renameTo(String, String, boolean, Rename...)}
    */
   @Deprecated
-  boolean renameTo(String src, String dst, boolean logRetryCache) 
+  boolean renameTo(String src, String dst, long mtime)
       throws QuotaExceededException, UnresolvedLinkException, 
       FileAlreadyExistsException, SnapshotAccessControlException, IOException {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* FSDirectory.renameTo: "
           +src+" to "+dst);
     }
-    waitForReady();
-    long now = now();
     writeLock();
     try {
-      if (!unprotectedRenameTo(src, dst, now))
+      if (!unprotectedRenameTo(src, dst, mtime))
         return false;
     } finally {
       writeUnlock();
     }
-    fsImage.getEditLog().logRename(src, dst, now, logRetryCache);
     return true;
   }
 
   /**
    * @see #unprotectedRenameTo(String, String, long, Options.Rename...)
    */
-  void renameTo(String src, String dst, boolean logRetryCache, 
+  void renameTo(String src, String dst, long mtime,
       Options.Rename... options)
       throws FileAlreadyExistsException, FileNotFoundException,
       ParentNotDirectoryException, QuotaExceededException,
@@ -559,17 +440,14 @@ public class FSDirectory implements Clos
       NameNode.stateChangeLog.debug("DIR* FSDirectory.renameTo: " + src
           + " to " + dst);
     }
-    waitForReady();
-    long now = now();
     writeLock();
     try {
-      if (unprotectedRenameTo(src, dst, now, options)) {
-        incrDeletedFileCount(1);
+      if (unprotectedRenameTo(src, dst, mtime, options)) {
+        namesystem.incrDeletedFileCount(1);
       }
     } finally {
       writeUnlock();
     }
-    fsImage.getEditLog().logRename(src, dst, now, logRetryCache, options);
   }
 
   /**
@@ -581,64 +459,39 @@ public class FSDirectory implements Clos
    * @throws QuotaExceededException if the operation violates any quota limit
    * @throws FileAlreadyExistsException if the src is a symlink that points to dst
    * @throws SnapshotAccessControlException if path is in RO snapshot
-   * @deprecated See {@link #renameTo(String, String)}
+   * @deprecated See {@link #renameTo(String, String, boolean, Rename...)}
    */
   @Deprecated
   boolean unprotectedRenameTo(String src, String dst, long timestamp)
     throws QuotaExceededException, UnresolvedLinkException, 
     FileAlreadyExistsException, SnapshotAccessControlException, IOException {
     assert hasWriteLock();
-    INodesInPath srcIIP = rootDir.getINodesInPath4Write(src, false);
+    INodesInPath srcIIP = getINodesInPath4Write(src, false);
     final INode srcInode = srcIIP.getLastINode();
-    
-    // check the validation of the source
-    if (srcInode == null) {
-      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-          + "failed to rename " + src + " to " + dst
-          + " because source does not exist");
-      return false;
-    } 
-    if (srcIIP.getINodes().length == 1) {
-      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-          +"failed to rename "+src+" to "+dst+ " because source is the root");
+    try {
+      validateRenameSource(src, srcIIP);
+    } catch (SnapshotException e) {
+      throw e;
+    } catch (IOException ignored) {
       return false;
     }
-    
-    // srcInode and its subtree cannot contain snapshottable directories with
-    // snapshots
-    List<INodeDirectorySnapshottable> snapshottableDirs = 
-        new ArrayList<INodeDirectorySnapshottable>();
-    checkSnapshot(srcInode, snapshottableDirs);
-    
+
     if (isDir(dst)) {
       dst += Path.SEPARATOR + new Path(src).getName();
     }
-    
-    // check the validity of the destination
+
+    // validate the destination
     if (dst.equals(src)) {
       return true;
     }
-    if (srcInode.isSymlink() && 
-        dst.equals(srcInode.asSymlink().getSymlinkString())) {
-      throw new FileAlreadyExistsException(
-          "Cannot rename symlink "+src+" to its target "+dst);
-    }
-    
-    // dst cannot be directory or a file under src
-    if (dst.startsWith(src) && 
-        dst.charAt(src.length()) == Path.SEPARATOR_CHAR) {
-      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-          + "failed to rename " + src + " to " + dst
-          + " because destination starts with src");
+
+    try {
+      validateRenameDestination(src, dst, srcInode);
+    } catch (IOException ignored) {
       return false;
     }
-    
-    byte[][] dstComponents = INode.getPathComponents(dst);
-    INodesInPath dstIIP = getExistingPathINodes(dstComponents);
-    if (dstIIP.isSnapshot()) {
-      throw new SnapshotAccessControlException(
-          "Modification on RO snapshot is disallowed");
-    }
+
+    INodesInPath dstIIP = getINodesInPath4Write(dst, false);
     if (dstIIP.getLastINode() != null) {
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
                                    +"failed to rename "+src+" to "+dst+ 
@@ -656,42 +509,10 @@ public class FSDirectory implements Clos
     // Ensure dst has quota to accommodate rename
     verifyFsLimitsForRename(srcIIP, dstIIP);
     verifyQuotaForRename(srcIIP.getINodes(), dstIIP.getINodes());
-    
+
+    RenameOperation tx = new RenameOperation(src, dst, srcIIP, dstIIP);
+
     boolean added = false;
-    INode srcChild = srcIIP.getLastINode();
-    final byte[] srcChildName = srcChild.getLocalNameBytes();
-    final boolean isSrcInSnapshot = srcChild.isInLatestSnapshot(
-        srcIIP.getLatestSnapshotId());
-    final boolean srcChildIsReference = srcChild.isReference();
-    
-    // Record the snapshot on srcChild. After the rename, before any new 
-    // snapshot is taken on the dst tree, changes will be recorded in the latest
-    // snapshot of the src tree.
-    if (isSrcInSnapshot) {
-      srcChild = srcChild.recordModification(srcIIP.getLatestSnapshotId());
-      srcIIP.setLastINode(srcChild);
-    }
-    
-    // check srcChild for reference
-    final INodeReference.WithCount withCount;
-    Quota.Counts oldSrcCounts = Quota.Counts.newInstance();
-    int srcRefDstSnapshot = srcChildIsReference ? srcChild.asReference()
-        .getDstSnapshotId() : Snapshot.CURRENT_STATE_ID;
-    if (isSrcInSnapshot) {
-      final INodeReference.WithName withName = 
-          srcIIP.getINode(-2).asDirectory().replaceChild4ReferenceWithName(
-              srcChild, srcIIP.getLatestSnapshotId()); 
-      withCount = (INodeReference.WithCount) withName.getReferredINode();
-      srcChild = withName;
-      srcIIP.setLastINode(srcChild);
-      // get the counts before rename
-      withCount.getReferredINode().computeQuotaUsage(oldSrcCounts, true);
-    } else if (srcChildIsReference) {
-      // srcChild is reference but srcChild is not in latest snapshot
-      withCount = (WithCount) srcChild.asReference().getReferredINode();
-    } else {
-      withCount = null;
-    }
 
     try {
       // remove src
@@ -702,87 +523,22 @@ public class FSDirectory implements Clos
             + " because the source can not be removed");
         return false;
       }
-      
-      if (dstParent.getParent() == null) {
-        // src and dst file/dir are in the same directory, and the dstParent has
-        // been replaced when we removed the src. Refresh the dstIIP and
-        // dstParent.
-        dstIIP = getExistingPathINodes(dstComponents);
-        dstParent = dstIIP.getINode(-2);
-      }
-      
-      // add src to the destination
-      
-      srcChild = srcIIP.getLastINode();
-      final byte[] dstChildName = dstIIP.getLastLocalName();
-      final INode toDst;
-      if (withCount == null) {
-        srcChild.setLocalName(dstChildName);
-        toDst = srcChild;
-      } else {
-        withCount.getReferredINode().setLocalName(dstChildName);
-        int dstSnapshotId = dstIIP.getLatestSnapshotId();
-        final INodeReference.DstReference ref = new INodeReference.DstReference(
-            dstParent.asDirectory(), withCount, dstSnapshotId);
-        toDst = ref;
-      }
-      
-      added = addLastINodeNoQuotaCheck(dstIIP, toDst);
+
+      added = tx.addSourceToDestination();
       if (added) {
         if (NameNode.stateChangeLog.isDebugEnabled()) {
           NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedRenameTo: " 
               + src + " is renamed to " + dst);
         }
-        // update modification time of dst and the parent of src
-        final INode srcParent = srcIIP.getINode(-2);
-        srcParent.updateModificationTime(timestamp, srcIIP.getLatestSnapshotId());
-        dstParent = dstIIP.getINode(-2); // refresh dstParent
-        dstParent.updateModificationTime(timestamp, dstIIP.getLatestSnapshotId());
-        // update moved leases with new filename
-        getFSNamesystem().unprotectedChangeLease(src, dst);     
-
-        // update the quota usage in src tree
-        if (isSrcInSnapshot) {
-          // get the counts after rename
-          Quota.Counts newSrcCounts = srcChild.computeQuotaUsage(
-              Quota.Counts.newInstance(), false);
-          newSrcCounts.subtract(oldSrcCounts);
-          srcParent.addSpaceConsumed(newSrcCounts.get(Quota.NAMESPACE),
-              newSrcCounts.get(Quota.DISKSPACE), false);
-        }
+
+        tx.updateMtimeAndLease(timestamp);
+        tx.updateQuotasInSourceTree();
         
         return true;
       }
     } finally {
       if (!added) {
-        final INodeDirectory srcParent = srcIIP.getINode(-2).asDirectory();
-        final INode oldSrcChild = srcChild;
-        // put it back
-        if (withCount == null) {
-          srcChild.setLocalName(srcChildName);
-        } else if (!srcChildIsReference) { // src must be in snapshot
-          // the withCount node will no longer be used thus no need to update
-          // its reference number here
-          final INode originalChild = withCount.getReferredINode();
-          srcChild = originalChild;
-          srcChild.setLocalName(srcChildName);
-        } else {
-          withCount.removeReference(oldSrcChild.asReference());
-          final INodeReference originalRef = new INodeReference.DstReference(
-              srcParent, withCount, srcRefDstSnapshot);
-          srcChild = originalRef;
-          withCount.getReferredINode().setLocalName(srcChildName);
-        }
-        
-        if (isSrcInSnapshot) {
-          // srcParent must have snapshot feature since isSrcInSnapshot is true
-          // and src node has been removed from srcParent 
-          srcParent.undoRename4ScrParent(oldSrcChild.asReference(), srcChild);
-        } else {
-          // original srcChild is not in latest snapshot, we only need to add
-          // the srcChild back
-          addLastINodeNoQuotaCheck(srcIIP, srcChild);
-        }
+        tx.restoreSource();
       }
     }
     NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
@@ -805,54 +561,22 @@ public class FSDirectory implements Clos
       FileNotFoundException, ParentNotDirectoryException,
       QuotaExceededException, UnresolvedLinkException, IOException {
     assert hasWriteLock();
-    boolean overwrite = false;
-    if (null != options) {
-      for (Rename option : options) {
-        if (option == Rename.OVERWRITE) {
-          overwrite = true;
-        }
-      }
-    }
-    String error = null;
-    final INodesInPath srcIIP = rootDir.getINodesInPath4Write(src, false);
+    boolean overwrite = options != null && Arrays.asList(options).contains
+            (Rename.OVERWRITE);
+
+    final String error;
+    final INodesInPath srcIIP = getINodesInPath4Write(src, false);
     final INode srcInode = srcIIP.getLastINode();
-    // validate source
-    if (srcInode == null) {
-      error = "rename source " + src + " is not found.";
-      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-          + error);
-      throw new FileNotFoundException(error);
-    }
-    if (srcIIP.getINodes().length == 1) {
-      error = "rename source cannot be the root";
-      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-          + error);
-      throw new IOException(error);
-    }
-    // srcInode and its subtree cannot contain snapshottable directories with
-    // snapshots
-    checkSnapshot(srcInode, null);
-    
+    validateRenameSource(src, srcIIP);
+
     // validate the destination
     if (dst.equals(src)) {
       throw new FileAlreadyExistsException(
           "The source "+src+" and destination "+dst+" are the same");
     }
-    if (srcInode.isSymlink() && 
-        dst.equals(srcInode.asSymlink().getSymlinkString())) {
-      throw new FileAlreadyExistsException(
-          "Cannot rename symlink "+src+" to its target "+dst);
-    }
-    // dst cannot be a directory or a file under src
-    if (dst.startsWith(src) && 
-        dst.charAt(src.length()) == Path.SEPARATOR_CHAR) {
-      error = "Rename destination " + dst
-          + " is a directory or file under source " + src;
-      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-          + error);
-      throw new IOException(error);
-    }
-    INodesInPath dstIIP = rootDir.getINodesInPath4Write(dst, false);
+    validateRenameDestination(src, dst, srcInode);
+
+    INodesInPath dstIIP = getINodesInPath4Write(dst, false);
     if (dstIIP.getINodes().length == 1) {
       error = "rename destination cannot be the root";
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
@@ -861,33 +585,9 @@ public class FSDirectory implements Clos
     }
 
     final INode dstInode = dstIIP.getLastINode();
-    List<INodeDirectorySnapshottable> snapshottableDirs = 
-        new ArrayList<INodeDirectorySnapshottable>();
+    List<INodeDirectory> snapshottableDirs = new ArrayList<INodeDirectory>();
     if (dstInode != null) { // Destination exists
-      // It's OK to rename a file to a symlink and vice versa
-      if (dstInode.isDirectory() != srcInode.isDirectory()) {
-        error = "Source " + src + " and destination " + dst
-            + " must both be directories";
-        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-            + error);
-        throw new IOException(error);
-      }
-      if (!overwrite) { // If destination exists, overwrite flag must be true
-        error = "rename destination " + dst + " already exists";
-        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-            + error);
-        throw new FileAlreadyExistsException(error);
-      }
-      if (dstInode.isDirectory()) {
-        final ReadOnlyList<INode> children = dstInode.asDirectory()
-            .getChildrenList(Snapshot.CURRENT_STATE_ID);
-        if (!children.isEmpty()) {
-          error = "rename destination directory is not empty: " + dst;
-          NameNode.stateChangeLog.warn(
-              "DIR* FSDirectory.unprotectedRenameTo: " + error);
-          throw new IOException(error);
-        }
-      }
+      validateRenameOverwrite(src, dst, overwrite, srcInode, dstInode);
       checkSnapshot(dstInode, snapshottableDirs);
     }
 
@@ -909,40 +609,8 @@ public class FSDirectory implements Clos
     verifyFsLimitsForRename(srcIIP, dstIIP);
     verifyQuotaForRename(srcIIP.getINodes(), dstIIP.getINodes());
 
-    INode srcChild = srcIIP.getLastINode();
-    final byte[] srcChildName = srcChild.getLocalNameBytes();
-    final boolean isSrcInSnapshot = srcChild.isInLatestSnapshot(
-        srcIIP.getLatestSnapshotId());
-    final boolean srcChildIsReference = srcChild.isReference();
-    
-    // Record the snapshot on srcChild. After the rename, before any new 
-    // snapshot is taken on the dst tree, changes will be recorded in the latest
-    // snapshot of the src tree.
-    if (isSrcInSnapshot) {
-      srcChild = srcChild.recordModification(srcIIP.getLatestSnapshotId());
-      srcIIP.setLastINode(srcChild);
-    }
-    
-    // check srcChild for reference
-    final INodeReference.WithCount withCount;
-    int srcRefDstSnapshot = srcChildIsReference ? srcChild.asReference()
-        .getDstSnapshotId() : Snapshot.CURRENT_STATE_ID;
-    Quota.Counts oldSrcCounts = Quota.Counts.newInstance();    
-    if (isSrcInSnapshot) {
-      final INodeReference.WithName withName = srcIIP.getINode(-2).asDirectory()
-          .replaceChild4ReferenceWithName(srcChild, srcIIP.getLatestSnapshotId()); 
-      withCount = (INodeReference.WithCount) withName.getReferredINode();
-      srcChild = withName;
-      srcIIP.setLastINode(srcChild);
-      // get the counts before rename
-      withCount.getReferredINode().computeQuotaUsage(oldSrcCounts, true);
-    } else if (srcChildIsReference) {
-      // srcChild is reference but srcChild is not in latest snapshot
-      withCount = (WithCount) srcChild.asReference().getReferredINode();
-    } else {
-      withCount = null;
-    }
-    
+    RenameOperation tx = new RenameOperation(src, dst, srcIIP, dstIIP);
+
     boolean undoRemoveSrc = true;
     final long removedSrc = removeLastINode(srcIIP);
     if (removedSrc == -1) {
@@ -953,40 +621,19 @@ public class FSDirectory implements Clos
       throw new IOException(error);
     }
     
-    if (dstParent.getParent() == null) {
-      // src and dst file/dir are in the same directory, and the dstParent has
-      // been replaced when we removed the src. Refresh the dstIIP and
-      // dstParent.
-      dstIIP = rootDir.getINodesInPath4Write(dst, false);
-    }
-    
     boolean undoRemoveDst = false;
     INode removedDst = null;
+    long removedNum = 0;
     try {
       if (dstInode != null) { // dst exists remove it
-        if (removeLastINode(dstIIP) != -1) {
+        if ((removedNum = removeLastINode(dstIIP)) != -1) {
           removedDst = dstIIP.getLastINode();
           undoRemoveDst = true;
         }
       }
-      
-      srcChild = srcIIP.getLastINode();
-
-      final byte[] dstChildName = dstIIP.getLastLocalName();
-      final INode toDst;
-      if (withCount == null) {
-        srcChild.setLocalName(dstChildName);
-        toDst = srcChild;
-      } else {
-        withCount.getReferredINode().setLocalName(dstChildName);
-        int dstSnapshotId = dstIIP.getLatestSnapshotId();
-        final INodeReference.DstReference ref = new INodeReference.DstReference(
-            dstIIP.getINode(-2).asDirectory(), withCount, dstSnapshotId);
-        toDst = ref;
-      }
 
       // add src as dst to complete rename
-      if (addLastINodeNoQuotaCheck(dstIIP, toDst)) {
+      if (tx.addSourceToDestination()) {
         undoRemoveSrc = false;
         if (NameNode.stateChangeLog.isDebugEnabled()) {
           NameNode.stateChangeLog.debug(
@@ -994,24 +641,21 @@ public class FSDirectory implements Clos
               + " is renamed to " + dst);
         }
 
-        final INode srcParent = srcIIP.getINode(-2);
-        srcParent.updateModificationTime(timestamp, srcIIP.getLatestSnapshotId());
-        dstParent = dstIIP.getINode(-2);
-        dstParent.updateModificationTime(timestamp, dstIIP.getLatestSnapshotId());
-        // update moved lease with new filename
-        getFSNamesystem().unprotectedChangeLease(src, dst);
+        tx.updateMtimeAndLease(timestamp);
 
         // Collect the blocks and remove the lease for previous dst
         long filesDeleted = -1;
         if (removedDst != null) {
           undoRemoveDst = false;
-          BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
-          List<INode> removedINodes = new ChunkedArrayList<INode>();
-          filesDeleted = removedDst.cleanSubtree(Snapshot.CURRENT_STATE_ID,
-              dstIIP.getLatestSnapshotId(), collectedBlocks, removedINodes, true)
-              .get(Quota.NAMESPACE);
-          getFSNamesystem().removePathAndBlocks(src, collectedBlocks,
-              removedINodes);
+          if (removedNum > 0) {
+            BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
+            List<INode> removedINodes = new ChunkedArrayList<INode>();
+            filesDeleted = removedDst.cleanSubtree(Snapshot.CURRENT_STATE_ID,
+                dstIIP.getLatestSnapshotId(), collectedBlocks, removedINodes,
+                true).get(Quota.NAMESPACE);
+            getFSNamesystem().removePathAndBlocks(src, collectedBlocks,
+                removedINodes, false);
+          }
         }
 
         if (snapshottableDirs.size() > 0) {
@@ -1019,49 +663,15 @@ public class FSDirectory implements Clos
           // deleted. Need to update the SnapshotManager.
           namesystem.removeSnapshottableDirs(snapshottableDirs);
         }
-        
-        // update the quota usage in src tree
-        if (isSrcInSnapshot) {
-          // get the counts after rename
-          Quota.Counts newSrcCounts = srcChild.computeQuotaUsage(
-              Quota.Counts.newInstance(), false);
-          newSrcCounts.subtract(oldSrcCounts);
-          srcParent.addSpaceConsumed(newSrcCounts.get(Quota.NAMESPACE),
-              newSrcCounts.get(Quota.DISKSPACE), false);
-        }
-        
+
+        tx.updateQuotasInSourceTree();
         return filesDeleted >= 0;
       }
     } finally {
       if (undoRemoveSrc) {
-        // Rename failed - restore src
-        final INodeDirectory srcParent = srcIIP.getINode(-2).asDirectory();
-        final INode oldSrcChild = srcChild;
-        // put it back
-        if (withCount == null) {
-          srcChild.setLocalName(srcChildName);
-        } else if (!srcChildIsReference) { // src must be in snapshot
-          // the withCount node will no longer be used thus no need to update
-          // its reference number here
-          final INode originalChild = withCount.getReferredINode();
-          srcChild = originalChild;
-          srcChild.setLocalName(srcChildName);
-        } else {
-          withCount.removeReference(oldSrcChild.asReference());
-          final INodeReference originalRef = new INodeReference.DstReference(
-              srcParent, withCount, srcRefDstSnapshot);
-          srcChild = originalRef;
-          withCount.getReferredINode().setLocalName(srcChildName);
-        }
-        
-        if (srcParent.isWithSnapshot()) {
-          srcParent.undoRename4ScrParent(oldSrcChild.asReference(), srcChild);
-        } else {
-          // srcParent is not an INodeDirectoryWithSnapshot, we only need to add
-          // the srcChild back
-          addLastINodeNoQuotaCheck(srcIIP, srcChild);
-        }
+        tx.restoreSource();
       }
+
       if (undoRemoveDst) {
         // Rename failed - restore dst
         if (dstParent.isDirectory() && dstParent.asDirectory().isWithSnapshot()) {
@@ -1082,7 +692,198 @@ public class FSDirectory implements Clos
         + "failed to rename " + src + " to " + dst);
     throw new IOException("rename from " + src + " to " + dst + " failed.");
   }
-  
+
+  private static void validateRenameOverwrite(String src, String dst,
+                                              boolean overwrite,
+                                              INode srcInode, INode dstInode)
+          throws IOException {
+    String error;// It's OK to rename a file to a symlink and vice versa
+    if (dstInode.isDirectory() != srcInode.isDirectory()) {
+      error = "Source " + src + " and destination " + dst
+          + " must both be directories";
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+          + error);
+      throw new IOException(error);
+    }
+    if (!overwrite) { // If destination exists, overwrite flag must be true
+      error = "rename destination " + dst + " already exists";
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+          + error);
+      throw new FileAlreadyExistsException(error);
+    }
+    if (dstInode.isDirectory()) {
+      final ReadOnlyList<INode> children = dstInode.asDirectory()
+          .getChildrenList(Snapshot.CURRENT_STATE_ID);
+      if (!children.isEmpty()) {
+        error = "rename destination directory is not empty: " + dst;
+        NameNode.stateChangeLog.warn(
+            "DIR* FSDirectory.unprotectedRenameTo: " + error);
+        throw new IOException(error);
+      }
+    }
+  }
+
+  private static void validateRenameDestination(String src, String dst, INode srcInode)
+          throws IOException {
+    String error;
+    if (srcInode.isSymlink() &&
+        dst.equals(srcInode.asSymlink().getSymlinkString())) {
+      throw new FileAlreadyExistsException(
+          "Cannot rename symlink "+src+" to its target "+dst);
+    }
+    // dst cannot be a directory or a file under src
+    if (dst.startsWith(src) &&
+        dst.charAt(src.length()) == Path.SEPARATOR_CHAR) {
+      error = "Rename destination " + dst
+          + " is a directory or file under source " + src;
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+          + error);
+      throw new IOException(error);
+    }
+  }
+
+  private static void validateRenameSource(String src, INodesInPath srcIIP)
+          throws IOException {
+    String error;
+    final INode srcInode = srcIIP.getLastINode();
+    // validate source
+    if (srcInode == null) {
+      error = "rename source " + src + " is not found.";
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+          + error);
+      throw new FileNotFoundException(error);
+    }
+    if (srcIIP.getINodes().length == 1) {
+      error = "rename source cannot be the root";
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+          + error);
+      throw new IOException(error);
+    }
+    // srcInode and its subtree cannot contain snapshottable directories with
+    // snapshots
+    checkSnapshot(srcInode, null);
+  }
+
+  private class RenameOperation {
+    private final INodesInPath srcIIP;
+    private final INodesInPath dstIIP;
+    private final String src;
+    private final String dst;
+
+    private INode srcChild;
+    private final INodeReference.WithCount withCount;
+    private final int srcRefDstSnapshot;
+    private final INodeDirectory srcParent;
+    private final byte[] srcChildName;
+    private final boolean isSrcInSnapshot;
+    private final boolean srcChildIsReference;
+    private final Quota.Counts oldSrcCounts;
+
+    private RenameOperation(String src, String dst, INodesInPath srcIIP, INodesInPath dstIIP)
+            throws QuotaExceededException {
+      this.srcIIP = srcIIP;
+      this.dstIIP = dstIIP;
+      this.src = src;
+      this.dst = dst;
+      srcChild = srcIIP.getLastINode();
+      srcChildName = srcChild.getLocalNameBytes();
+      isSrcInSnapshot = srcChild.isInLatestSnapshot(
+              srcIIP.getLatestSnapshotId());
+      srcChildIsReference = srcChild.isReference();
+      srcParent = srcIIP.getINode(-2).asDirectory();
+
+      // Record the snapshot on srcChild. After the rename, before any new
+      // snapshot is taken on the dst tree, changes will be recorded in the latest
+      // snapshot of the src tree.
+      if (isSrcInSnapshot) {
+        srcChild.recordModification(srcIIP.getLatestSnapshotId());
+      }
+
+      // check srcChild for reference
+      srcRefDstSnapshot = srcChildIsReference ? srcChild.asReference()
+              .getDstSnapshotId() : Snapshot.CURRENT_STATE_ID;
+      oldSrcCounts = Quota.Counts.newInstance();
+      if (isSrcInSnapshot) {
+        final INodeReference.WithName withName = srcIIP.getINode(-2).asDirectory()
+                .replaceChild4ReferenceWithName(srcChild, srcIIP.getLatestSnapshotId());
+        withCount = (INodeReference.WithCount) withName.getReferredINode();
+        srcChild = withName;
+        srcIIP.setLastINode(srcChild);
+        // get the counts before rename
+        withCount.getReferredINode().computeQuotaUsage(oldSrcCounts, true);
+      } else if (srcChildIsReference) {
+        // srcChild is reference but srcChild is not in latest snapshot
+        withCount = (WithCount) srcChild.asReference().getReferredINode();
+      } else {
+        withCount = null;
+      }
+    }
+
+    boolean addSourceToDestination() {
+      final INode dstParent = dstIIP.getINode(-2);
+      srcChild = srcIIP.getLastINode();
+      final byte[] dstChildName = dstIIP.getLastLocalName();
+      final INode toDst;
+      if (withCount == null) {
+        srcChild.setLocalName(dstChildName);
+        toDst = srcChild;
+      } else {
+        withCount.getReferredINode().setLocalName(dstChildName);
+        int dstSnapshotId = dstIIP.getLatestSnapshotId();
+        toDst = new INodeReference.DstReference(
+                dstParent.asDirectory(), withCount, dstSnapshotId);
+      }
+      return addLastINodeNoQuotaCheck(dstIIP, toDst);
+    }
+
+    void updateMtimeAndLease(long timestamp) throws QuotaExceededException {
+      srcParent.updateModificationTime(timestamp, srcIIP.getLatestSnapshotId());
+      final INode dstParent = dstIIP.getINode(-2);
+      dstParent.updateModificationTime(timestamp, dstIIP.getLatestSnapshotId());
+      // update moved lease with new filename
+      getFSNamesystem().unprotectedChangeLease(src, dst);
+    }
+
+    void restoreSource() throws QuotaExceededException {
+      // Rename failed - restore src
+      final INode oldSrcChild = srcChild;
+      // put it back
+      if (withCount == null) {
+        srcChild.setLocalName(srcChildName);
+      } else if (!srcChildIsReference) { // src must be in snapshot
+        // the withCount node will no longer be used thus no need to update
+        // its reference number here
+        srcChild = withCount.getReferredINode();
+        srcChild.setLocalName(srcChildName);
+      } else {
+        withCount.removeReference(oldSrcChild.asReference());
+        srcChild = new INodeReference.DstReference(
+                srcParent, withCount, srcRefDstSnapshot);
+        withCount.getReferredINode().setLocalName(srcChildName);
+      }
+
+      if (isSrcInSnapshot) {
+        srcParent.undoRename4ScrParent(oldSrcChild.asReference(), srcChild);
+      } else {
+        // srcParent is not an INodeDirectoryWithSnapshot, we only need to add
+        // the srcChild back
+        addLastINodeNoQuotaCheck(srcIIP, srcChild);
+      }
+    }
+
+    void updateQuotasInSourceTree() throws QuotaExceededException {
+      // update the quota usage in src tree
+      if (isSrcInSnapshot) {
+        // get the counts after rename
+        Quota.Counts newSrcCounts = srcChild.computeQuotaUsage(
+                Quota.Counts.newInstance(), false);
+        newSrcCounts.subtract(oldSrcCounts);
+        srcParent.addSpaceConsumed(newSrcCounts.get(Quota.NAMESPACE),
+                newSrcCounts.get(Quota.DISKSPACE), false);
+      }
+    }
+  }
+
   /**
    * Set file replication
    * 
@@ -1096,14 +897,9 @@ public class FSDirectory implements Clos
   Block[] setReplication(String src, short replication, short[] blockRepls)
       throws QuotaExceededException, UnresolvedLinkException,
       SnapshotAccessControlException {
-    waitForReady();
     writeLock();
     try {
-      final Block[] fileBlocks = unprotectedSetReplication(
-          src, replication, blockRepls);
-      if (fileBlocks != null)  // log replication change
-        fsImage.getEditLog().logSetReplication(src, replication);
-      return fileBlocks;
+      return unprotectedSetReplication(src, replication, blockRepls);
     } finally {
       writeUnlock();
     }
@@ -1114,7 +910,7 @@ public class FSDirectory implements Clos
       UnresolvedLinkException, SnapshotAccessControlException {
     assert hasWriteLock();
 
-    final INodesInPath iip = rootDir.getINodesInPath4Write(src, true);
+    final INodesInPath iip = getINodesInPath4Write(src, true);
     final INode inode = iip.getLastINode();
     if (inode == null || !inode.isFile()) {
       return null;
@@ -1130,8 +926,7 @@ public class FSDirectory implements Clos
       updateCount(iip, 0, dsDelta, true);
     }
 
-    file = file.setFileReplication(replication, iip.getLatestSnapshotId(),
-        inodeMap);
+    file.setFileReplication(replication, iip.getLatestSnapshotId());
     
     final short newBR = file.getBlockReplication(); 
     // check newBR < oldBR case. 
@@ -1155,27 +950,13 @@ public class FSDirectory implements Clos
       FileNotFoundException, IOException {
     readLock();
     try {
-      return INodeFile.valueOf(rootDir.getNode(path, false), path
+      return INodeFile.valueOf(getNode(path, false), path
           ).getPreferredBlockSize();
     } finally {
       readUnlock();
     }
   }
 
-  boolean exists(String src) throws UnresolvedLinkException {
-    src = normalizePath(src);
-    readLock();
-    try {
-      INode inode = rootDir.getNode(src, false);
-      if (inode == null) {
-         return false;
-      }
-      return !inode.isFile() || inode.asFile().getBlocks() != null;
-    } finally {
-      readUnlock();
-    }
-  }
-  
   void setPermission(String src, FsPermission permission)
       throws FileNotFoundException, UnresolvedLinkException,
       QuotaExceededException, SnapshotAccessControlException {
@@ -1185,14 +966,13 @@ public class FSDirectory implements Clos
     } finally {
       writeUnlock();
     }
-    fsImage.getEditLog().logSetPermissions(src, permission);
   }
   
   void unprotectedSetPermission(String src, FsPermission permissions)
       throws FileNotFoundException, UnresolvedLinkException,
       QuotaExceededException, SnapshotAccessControlException {
     assert hasWriteLock();
-    final INodesInPath inodesInPath = rootDir.getINodesInPath4Write(src, true);
+    final INodesInPath inodesInPath = getINodesInPath4Write(src, true);
     final INode inode = inodesInPath.getLastINode();
     if (inode == null) {
       throw new FileNotFoundException("File does not exist: " + src);
@@ -1210,14 +990,13 @@ public class FSDirectory implements Clos
     } finally {
       writeUnlock();
     }
-    fsImage.getEditLog().logSetOwner(src, username, groupname);
   }
 
   void unprotectedSetOwner(String src, String username, String groupname)
       throws FileNotFoundException, UnresolvedLinkException,
       QuotaExceededException, SnapshotAccessControlException {
     assert hasWriteLock();
-    final INodesInPath inodesInPath = rootDir.getINodesInPath4Write(src, true);
+    final INodesInPath inodesInPath = getINodesInPath4Write(src, true);
     INode inode = inodesInPath.getLastINode();
     if (inode == null) {
       throw new FileNotFoundException("File does not exist: " + src);
@@ -1233,18 +1012,13 @@ public class FSDirectory implements Clos
   /**
    * Concat all the blocks from srcs to trg and delete the srcs files
    */
-  void concat(String target, String [] srcs, boolean supportRetryCache) 
+  void concat(String target, String[] srcs, long timestamp)
       throws UnresolvedLinkException, QuotaExceededException,
       SnapshotAccessControlException, SnapshotException {
     writeLock();
     try {
       // actual move
-      waitForReady();
-      long timestamp = now();
       unprotectedConcat(target, srcs, timestamp);
-      // do the commit
-      fsImage.getEditLog().logConcat(target, srcs, timestamp, 
-          supportRetryCache);
     } finally {
       writeUnlock();
     }
@@ -1264,7 +1038,7 @@ public class FSDirectory implements Clos
     }
     // do the move
     
-    final INodesInPath trgIIP = rootDir.getINodesInPath4Write(target, true);
+    final INodesInPath trgIIP = getINodesInPath4Write(target, true);
     final INode[] trgINodes = trgIIP.getINodes();
     final INodeFile trgInode = trgIIP.getLastINode().asFile();
     INodeDirectory trgParent = trgINodes[trgINodes.length-2].asDirectory();
@@ -1304,9 +1078,6 @@ public class FSDirectory implements Clos
       count++;
     }
     
-    // update inodeMap
-    removeFromInodeMap(Arrays.asList(allSrcInodes));
-    
     trgInode.setModificationTime(timestamp, trgLatestSnapshot);
     trgParent.updateModificationTime(timestamp, trgLatestSnapshot);
     // update quota on the parent directory ('count' files removed, 0 space)
@@ -1319,43 +1090,31 @@ public class FSDirectory implements Clos
    * @param src Path of a directory to delete
    * @param collectedBlocks Blocks under the deleted directory
    * @param removedINodes INodes that should be removed from {@link #inodeMap}
-   * @param logRetryCache Whether to record RPC IDs in editlog to support retry
-   *                      cache rebuilding.
-   * @return true on successful deletion; else false
+   * @return the number of files that have been removed
    */
-  boolean delete(String src, BlocksMapUpdateInfo collectedBlocks,
-      List<INode> removedINodes, boolean logRetryCache) throws IOException {
+  long delete(String src, BlocksMapUpdateInfo collectedBlocks,
+              List<INode> removedINodes, long mtime) throws IOException {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* FSDirectory.delete: " + src);
     }
-    waitForReady();
-    long now = now();
     final long filesRemoved;
     writeLock();
     try {
-      final INodesInPath inodesInPath = rootDir.getINodesInPath4Write(
+      final INodesInPath inodesInPath = getINodesInPath4Write(
           normalizePath(src), false);
       if (!deleteAllowed(inodesInPath, src) ) {
         filesRemoved = -1;
       } else {
-        List<INodeDirectorySnapshottable> snapshottableDirs = 
-            new ArrayList<INodeDirectorySnapshottable>();
+        List<INodeDirectory> snapshottableDirs = new ArrayList<INodeDirectory>();
         checkSnapshot(inodesInPath.getLastINode(), snapshottableDirs);
         filesRemoved = unprotectedDelete(inodesInPath, collectedBlocks,
-            removedINodes, now);
+            removedINodes, mtime);
         namesystem.removeSnapshottableDirs(snapshottableDirs);
       }
     } finally {
       writeUnlock();
     }
-    if (filesRemoved < 0) {
-      return false;
-    }
-    fsImage.getEditLog().logDelete(src, now, logRetryCache);
-    incrDeletedFileCount(filesRemoved);
-    // Blocks/INodes will be handled later by the caller of this method
-    getFSNamesystem().removePathAndBlocks(src, null, null);
-    return true;
+    return filesRemoved;
   }
   
   private static boolean deleteAllowed(final INodesInPath iip,
@@ -1383,7 +1142,7 @@ public class FSDirectory implements Clos
   boolean isNonEmptyDirectory(String path) throws UnresolvedLinkException {
     readLock();
     try {
-      final INodesInPath inodesInPath = rootDir.getLastINodeInPath(path, false);
+      final INodesInPath inodesInPath = getLastINodeInPath(path, false);
       final INode inode = inodesInPath.getINode(0);
       if (inode == null || !inode.isDirectory()) {
         //not found or not a directory
@@ -1412,12 +1171,11 @@ public class FSDirectory implements Clos
     BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
     List<INode> removedINodes = new ChunkedArrayList<INode>();
 
-    final INodesInPath inodesInPath = rootDir.getINodesInPath4Write(
+    final INodesInPath inodesInPath = getINodesInPath4Write(
         normalizePath(src), false);
     long filesRemoved = -1;
     if (deleteAllowed(inodesInPath, src)) {
-      List<INodeDirectorySnapshottable> snapshottableDirs = 
-          new ArrayList<INodeDirectorySnapshottable>();
+      List<INodeDirectory> snapshottableDirs = new ArrayList<INodeDirectory>();
       checkSnapshot(inodesInPath.getLastINode(), snapshottableDirs);
       filesRemoved = unprotectedDelete(inodesInPath, collectedBlocks,
           removedINodes, mtime);
@@ -1426,7 +1184,7 @@ public class FSDirectory implements Clos
 
     if (filesRemoved >= 0) {
       getFSNamesystem().removePathAndBlocks(src, collectedBlocks, 
-          removedINodes);
+          removedINodes, false);
     }
   }
   
@@ -1451,8 +1209,7 @@ public class FSDirectory implements Clos
 
     // record modification
     final int latestSnapshot = iip.getLatestSnapshotId();
-    targetNode = targetNode.recordModification(latestSnapshot);
-    iip.setLastINode(targetNode);
+    targetNode.recordModification(latestSnapshot);
 
     // Remove the node from the namespace
     long removed = removeLastINode(iip);
@@ -1493,19 +1250,20 @@ public class FSDirectory implements Clos
    *                          but do not have snapshots yet
    */
   private static void checkSnapshot(INode target,
-      List<INodeDirectorySnapshottable> snapshottableDirs) throws IOException {
+      List<INodeDirectory> snapshottableDirs) throws SnapshotException {
     if (target.isDirectory()) {
       INodeDirectory targetDir = target.asDirectory();
-      if (targetDir.isSnapshottable()) {
-        INodeDirectorySnapshottable ssTargetDir = 
-            (INodeDirectorySnapshottable) targetDir;
-        if (ssTargetDir.getNumSnapshots() > 0) {
-          throw new IOException("The directory " + ssTargetDir.getFullPathName()
-              + " cannot be deleted since " + ssTargetDir.getFullPathName()
+      DirectorySnapshottableFeature sf = targetDir
+          .getDirectorySnapshottableFeature();
+      if (sf != null) {
+        if (sf.getNumSnapshots() > 0) {
+          String fullPath = targetDir.getFullPathName();
+          throw new SnapshotException("The directory " + fullPath
+              + " cannot be deleted since " + fullPath
               + " is snapshottable and already has snapshots");
         } else {
           if (snapshottableDirs != null) {
-            snapshottableDirs.add(ssTargetDir);
+            snapshottableDirs.add(targetDir);
           }
         }
       } 
@@ -1537,7 +1295,7 @@ public class FSDirectory implements Clos
       if (srcs.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR)) {
         return getSnapshotsListing(srcs, startAfter);
       }
-      final INodesInPath inodesInPath = rootDir.getLastINodeInPath(srcs, true);
+      final INodesInPath inodesInPath = getLastINodeInPath(srcs, true);
       final int snapshot = inodesInPath.getPathSnapshotId();
       final INode targetNode = inodesInPath.getINode(0);
       if (targetNode == null)
@@ -1590,16 +1348,20 @@ public class FSDirectory implements Clos
       throws UnresolvedLinkException, IOException {
     Preconditions.checkState(hasReadLock());
     Preconditions.checkArgument(
-        src.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR), 
+        src.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR),
         "%s does not end with %s", src, HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR);
-    
+
     final String dirPath = normalizePath(src.substring(0,
         src.length() - HdfsConstants.DOT_SNAPSHOT_DIR.length()));
     
     final INode node = this.getINode(dirPath);
-    final INodeDirectorySnapshottable dirNode = INodeDirectorySnapshottable
-        .valueOf(node, dirPath);
-    final ReadOnlyList<Snapshot> snapshots = dirNode.getSnapshotList();
+    final INodeDirectory dirNode = INodeDirectory.valueOf(node, dirPath);
+    final DirectorySnapshottableFeature sf = dirNode.getDirectorySnapshottableFeature();
+    if (sf == null) {
+      throw new SnapshotException(
+          "Directory is not a snapshottable directory: " + dirPath);
+    }
+    final ReadOnlyList<Snapshot> snapshots = sf.getSnapshotList();
     int skipSize = ReadOnlyList.Util.binarySearch(snapshots, startAfter);
     skipSize = skipSize < 0 ? -skipSize - 1 : skipSize + 1;
     int numOfListing = Math.min(snapshots.size() - skipSize, this.lsLimit);
@@ -1627,7 +1389,7 @@ public class FSDirectory implements Clos
       if (srcs.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR)) {
         return getFileInfo4DotSnapshot(srcs);
       }
-      final INodesInPath inodesInPath = rootDir.getLastINodeInPath(srcs, resolveLink);
+      final INodesInPath inodesInPath = getLastINodeInPath(srcs, resolveLink);
       final INode i = inodesInPath.getINode(0);
       return i == null? null: createFileStatus(HdfsFileStatus.EMPTY_NAME, i,
           inodesInPath.getPathSnapshotId());
@@ -1654,36 +1416,20 @@ public class FSDirectory implements Clos
 
   private INode getINode4DotSnapshot(String src) throws UnresolvedLinkException {
     Preconditions.checkArgument(
-        src.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR), 
+        src.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR),
         "%s does not end with %s", src, HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR);
     
     final String dirPath = normalizePath(src.substring(0,
         src.length() - HdfsConstants.DOT_SNAPSHOT_DIR.length()));
     
     final INode node = this.getINode(dirPath);
-    if (node != null
-        && node.isDirectory()
-        && node.asDirectory() instanceof INodeDirectorySnapshottable) {
+    if (node != null && node.isDirectory()
+        && node.asDirectory().isSnapshottable()) {
       return node;
     }
     return null;
   }
 
-  /**
-   * Get the blocks associated with the file.
-   */
-  Block[] getFileBlocks(String src) throws UnresolvedLinkException {
-    waitForReady();
-    readLock();
-    try {
-      final INode i = rootDir.getNode(src, false);
-      return i != null && i.isFile()? i.asFile().getBlocks(): null;
-    } finally {
-      readUnlock();
-    }
-  }
-
-
   INodesInPath getExistingPathINodes(byte[][] components)
       throws UnresolvedLinkException {
     return INodesInPath.resolve(rootDir, components);
@@ -1703,12 +1449,12 @@ public class FSDirectory implements Clos
        throws UnresolvedLinkException {
     readLock();
     try {
-      return rootDir.getLastINodeInPath(src, true);
+      return getLastINodeInPath(src, true);
     } finally {
       readUnlock();
     }
   }
-  
+
   /**
    * Get {@link INode} associated with the file / directory.
    */
@@ -1716,7 +1462,7 @@ public class FSDirectory implements Clos
       ) throws UnresolvedLinkException, SnapshotAccessControlException {
     readLock();
     try {
-      return rootDir.getINodesInPath4Write(src, true);
+      return getINodesInPath4Write(src, true);
     } finally {
       readUnlock();
     }
@@ -1730,7 +1476,7 @@ public class FSDirectory implements Clos
       SnapshotAccessControlException {
     readLock();
     try {
-      return rootDir.getINode4Write(src, true);
+      return getINode4Write(src, true);
     } finally {
       readUnlock();
     }
@@ -1745,12 +1491,8 @@ public class FSDirectory implements Clos
     String srcs = normalizePath(src);
     readLock();
     try {
-      if (srcs.startsWith("/") && !srcs.endsWith("/")
-          && rootDir.getINode4Write(srcs, false) == null) {
-        return true;
-      } else {
-        return false;
-      }
+      return srcs.startsWith("/") && !srcs.endsWith("/")
+              && getINode4Write(srcs, false) == null;
     } finally {
       readUnlock();
     }
@@ -1763,7 +1505,7 @@ public class FSDirectory implements Clos
     src = normalizePath(src);
     readLock();
     try {
-      INode node = rootDir.getNode(src, false);
+      INode node = getNode(src, false);
       return node != null && node.isDirectory();
     } finally {
       readUnlock();
@@ -1779,7 +1521,7 @@ public class FSDirectory implements Clos
     src = normalizePath(src);
     readLock();
     try {
-      INode node = rootDir.getINode4Write(src, false);
+      INode node = getINode4Write(src, false);
       return node != null && node.isDirectory();
     } finally {
       readUnlock();
@@ -1800,7 +1542,7 @@ public class FSDirectory implements Clos
           UnresolvedLinkException, SnapshotAccessControlException {
     writeLock();
     try {
-      final INodesInPath iip = rootDir.getINodesInPath4Write(path, false);
+      final INodesInPath iip = getINodesInPath4Write(path, false);
       if (iip.getLastINode() == null) {
         throw new FileNotFoundException("Path not found: " + path);
       }
@@ -1828,7 +1570,7 @@ public class FSDirectory implements Clos
                            long nsDelta, long dsDelta, boolean checkQuota)
                            throws QuotaExceededException {
     assert hasWriteLock();
-    if (!ready) {
+    if (!namesystem.isImageLoaded()) {
       //still initializing. do not check or update quotas.
       return;
     }
@@ -1844,7 +1586,7 @@ public class FSDirectory implements Clos
   
   /** 
    * update quota of each inode and check to see if quota is exceeded. 
-   * See {@link #updateCount(INode[], int, long, long, boolean)}
+   * See {@link #updateCount(INodesInPath, long, long, boolean)}
    */ 
   private void updateCountNoQuotaCheck(INodesInPath inodesInPath,
       int numOfINodes, long nsDelta, long dsDelta) {
@@ -1921,113 +1663,6 @@ public class FSDirectory implements Clos
     // inodes can be null only when its called without holding lock
     return inodes == null ? "" : getFullPathName(inodes, inodes.length - 1);
   }
-  
-  /**
-   * Create a directory 
-   * If ancestor directories do not exist, automatically create them.
-
-   * @param src string representation of the path to the directory
-   * @param permissions the permission of the directory
-   * @param isAutocreate if the permission of the directory should inherit
-   *                          from its parent or not. u+wx is implicitly added to
-   *                          the automatically created directories, and to the
-   *                          given directory if inheritPermission is true
-   * @param now creation time
-   * @return true if the operation succeeds false otherwise
-   * @throws FileNotFoundException if an ancestor or itself is a file
-   * @throws QuotaExceededException if directory creation violates 
-   *                                any quota limit
-   * @throws UnresolvedLinkException if a symlink is encountered in src.                      
-   * @throws SnapshotAccessControlException if path is in RO snapshot
-   */
-  boolean mkdirs(String src, PermissionStatus permissions,
-      boolean inheritPermission, long now)
-      throws FileAlreadyExistsException, QuotaExceededException, 
-             UnresolvedLinkException, SnapshotAccessControlException,
-             AclException {
-    src = normalizePath(src);
-    String[] names = INode.getPathNames(src);
-    byte[][] components = INode.getPathComponents(names);
-    final int lastInodeIndex = components.length - 1;
-
-    writeLock();
-    try {
-      INodesInPath iip = getExistingPathINodes(components);
-      if (iip.isSnapshot()) {
-        throw new SnapshotAccessControlException(
-            "Modification on RO snapshot is disallowed");
-      }
-      INode[] inodes = iip.getINodes();
-
-      // find the index of the first null in inodes[]
-      StringBuilder pathbuilder = new StringBuilder();
-      int i = 1;
-      for(; i < inodes.length && inodes[i] != null; i++) {
-        pathbuilder.append(Path.SEPARATOR).append(names[i]);
-        if (!inodes[i].isDirectory()) {
-          throw new FileAlreadyExistsException("Parent path is not a directory: "
-              + pathbuilder+ " "+inodes[i].getLocalName());
-        }
-      }
-
-      // default to creating parent dirs with the given perms
-      PermissionStatus parentPermissions = permissions;
-
-      // if not inheriting and it's the last inode, there's no use in
-      // computing perms that won't be used
-      if (inheritPermission || (i < lastInodeIndex)) {
-        // if inheriting (ie. creating a file or symlink), use the parent dir,
-        // else the supplied permissions
-        // NOTE: the permissions of the auto-created directories violate posix
-        FsPermission parentFsPerm = inheritPermission
-            ? inodes[i-1].getFsPermission() : permissions.getPermission();
-        
-        // ensure that the permissions allow user write+execute
-        if (!parentFsPerm.getUserAction().implies(FsAction.WRITE_EXECUTE)) {
-          parentFsPerm = new FsPermission(
-              parentFsPerm.getUserAction().or(FsAction.WRITE_EXECUTE),
-              parentFsPerm.getGroupAction(),
-              parentFsPerm.getOtherAction()
-          );
-        }
-        
-        if (!parentPermissions.getPermission().equals(parentFsPerm)) {
-          parentPermissions = new PermissionStatus(
-              parentPermissions.getUserName(),
-              parentPermissions.getGroupName(),
-              parentFsPerm
-          );
-          // when inheriting, use same perms for entire path
-          if (inheritPermission) permissions = parentPermissions;
-        }
-      }
-      
-      // create directories beginning from the first null index
-      for(; i < inodes.length; i++) {
-        pathbuilder.append(Path.SEPARATOR + names[i]);
-        unprotectedMkdir(namesystem.allocateNewInodeId(), iip, i,
-            components[i], (i < lastInodeIndex) ? parentPermissions
-                : permissions, null, now);
-        if (inodes[i] == null) {
-          return false;
-        }
-        // Directory creation also count towards FilesCreated
-        // to match count of FilesDeleted metric.
-        if (getFSNamesystem() != null)
-          NameNode.getNameNodeMetrics().incrFilesCreated();
-
-        final String cur = pathbuilder.toString();
-        fsImage.getEditLog().logMkDir(cur, inodes[i]);
-        if(NameNode.stateChangeLog.isDebugEnabled()) {
-          NameNode.stateChangeLog.debug(
-              "DIR* FSDirectory.mkdirs: created directory " + cur);
-        }
-      }
-    } finally {
-      writeUnlock();
-    }
-    return true;
-  }
 
   INode unprotectedMkdir(long inodeId, String src, PermissionStatus permissions,
                           List<AclEntry> aclEntries, long timestamp)
@@ -2046,7 +1681,7 @@ public class FSDirectory implements Clos
    * The parent path to the directory is at [0, pos-1].
    * All ancestors exist. Newly created one stored at index pos.
    */
-  private void unprotectedMkdir(long inodeId, INodesInPath inodesInPath,
+  void unprotectedMkdir(long inodeId, INodesInPath inodesInPath,
       int pos, byte[] name, PermissionStatus permission,
       List<AclEntry> aclEntries, long timestamp)
       throws QuotaExceededException, AclException {
@@ -2064,7 +1699,7 @@ public class FSDirectory implements Clos
   /**
    * Add the given child to the namespace.
    * @param src The full path name of the child node.
-   * @throw QuotaExceededException is thrown if it violates quota limit
+   * @throws QuotaExceededException is thrown if it violates quota limit
    */
   private boolean addINode(String src, INode child
       ) throws QuotaExceededException, UnresolvedLinkException {
@@ -2128,12 +1763,12 @@ public class FSDirectory implements Clos
    */
   private void verifyQuotaForRename(INode[] src, INode[] dst)
       throws QuotaExceededException {
-    if (!ready || skipQuotaCheck) {
+    if (!namesystem.isImageLoaded() || skipQuotaCheck) {
       // Do not check quota if edits log is still being processed
       return;
     }
     int i = 0;
-    for(; src[i] == dst[i]; i++);
+    while(src[i] == dst[i]) { i++; }
     // src[i - 1] is the last common ancestor.
 
     final Quota.Counts delta = src[src.length - 1].computeQuotaUsage();
@@ -2184,7 +1819,7 @@ public class FSDirectory implements Clos
   void verifyINodeName(byte[] childName) throws HadoopIllegalArgumentException {
     if (Arrays.equals(HdfsConstants.DOT_SNAPSHOT_DIR_BYTES, childName)) {
       String s = "\"" + HdfsConstants.DOT_SNAPSHOT_DIR + "\" is a reserved name.";
-      if (!ready) {
+      if (!namesystem.isImageLoaded()) {
         s += "  Please rename it before upgrade.";
       }
       throw new HadoopIllegalArgumentException(s);
@@ -2211,7 +1846,7 @@ public class FSDirectory implements Clos
           getFullPathName((INode[])parentPath, pos - 1): (String)parentPath;
       final PathComponentTooLongException e = new PathComponentTooLongException(
           maxComponentLength, length, p, DFSUtil.bytes2String(childName));
-      if (ready) {
+      if (namesystem.isImageLoaded()) {
         throw e;
       } else {
         // Do not throw if edits log is still being processed
@@ -2235,7 +1870,7 @@ public class FSDirectory implements Clos
     if (count >= maxDirItems) {
       final MaxDirectoryItemsExceededException e
           = new MaxDirectoryItemsExceededException(maxDirItems, count);
-      if (ready) {
+      if (namesystem.isImageLoaded()) {
         e.setPathName(getFullPathName(pathComponents, pos - 1));
         throw e;
       } else {
@@ -2260,7 +1895,7 @@ public class FSDirectory implements Clos
    * Its ancestors are stored at [0, pos-1].
    * @return false if the child with this name already exists; 
    *         otherwise return true;
-   * @throw QuotaExceededException is thrown if it violates quota limit
+   * @throws QuotaExceededException is thrown if it violates quota limit
    */
   private boolean addChild(INodesInPath iip, int pos,
       INode child, boolean checkQuota) throws QuotaExceededException {
@@ -2294,7 +1929,7 @@ public class FSDirectory implements Clos
         counts.get(Quota.NAMESPACE), counts.get(Quota.DISKSPACE), checkQuota);
     boolean isRename = (child.getParent() != null);
     final INodeDirectory parent = inodes[pos-1].asDirectory();
-    boolean added = false;
+    boolean added;
     try {
       added = parent.addChild(child, true, iip.getLatestSnapshotId());
     } catch (QuotaExceededException e) {
@@ -2340,10 +1975,6 @@ public class FSDirectory implements Clos
     if (!parent.removeChild(last, latestSnapshot)) {
       return -1;
     }
-    INodeDirectory newParent = last.getParent();
-    if (parent != newParent) {
-      iip.setINode(-2, newParent);
-    }
     
     if (!last.isInLatestSnapshot(latestSnapshot)) {
       final Quota.Counts counts = last.computeQuotaUsage();
@@ -2358,10 +1989,8 @@ public class FSDirectory implements Clos
     }
     return 1;
   }
-  
-  /**
-   */
-  String normalizePath(String src) {
+
+  static String normalizePath(String src) {
     if (src.length() > 1 && src.endsWith("/")) {
       src = src.substring(0, src.length() - 1);
     }
@@ -2373,7 +2002,7 @@ public class FSDirectory implements Clos
     String srcs = normalizePath(src);
     readLock();
     try {
-      INode targetNode = rootDir.getNode(srcs, false);
+      INode targetNode = getNode(srcs, false);
       if (targetNode == null) {
         throw new FileNotFoundException("File does not exist: " + srcs);
       }
@@ -2446,7 +2075,7 @@ public class FSDirectory implements Clos
   /**
    * See {@link ClientProtocol#setQuota(String, long, long)} for the contract.
    * Sets quota for for a directory.
-   * @returns INodeDirectory if any of the quotas have changed. null other wise.
+   * @return INodeDirectory if any of the quotas have changed. null otherwise.
    * @throws FileNotFoundException if the path does not exist.
    * @throws PathIsNotDirectoryException if the path is not a directory.
    * @throws QuotaExceededException if the directory tree size is 
@@ -2470,7 +2099,7 @@ public class FSDirectory implements Clos
     }
     
     String srcs = normalizePath(src);
-    final INodesInPath iip = rootDir.getINodesInPath4Write(srcs, true);
+    final INodesInPath iip = getINodesInPath4Write(srcs, true);
     INodeDirectory dirNode = INodeDirectory.valueOf(iip.getLastINode(), srcs);
     if (dirNode.isRoot() && nsQuota == HdfsConstants.QUOTA_RESET) {
       throw new IllegalArgumentException("Cannot clear namespace quota on root.");
@@ -2489,7 +2118,7 @@ public class FSDirectory implements Clos
       }
 
       final int latest = iip.getLatestSnapshotId();
-      dirNode = dirNode.recordModification(latest);
+      dirNode.recordModification(latest);
       dirNode.setQuota(nsQuota, dsQuota);
       return dirNode;
     }
@@ -2497,21 +2126,17 @@ public class FSDirectory implements Clos
   
   /**
    * See {@link ClientProtocol#setQuota(String, long, long)} for the contract.
+   * @return INodeDirectory if any of the quotas have changed. null otherwise.
    * @throws SnapshotAccessControlException if path is in RO snapshot
    * @see #unprotectedSetQuota(String, long, long)
    */
-  void setQuota(String src, long nsQuota, long dsQuota) 
+  INodeDirectory setQuota(String src, long nsQuota, long dsQuota)
       throws FileNotFoundException, PathIsNotDirectoryException,
       QuotaExceededException, UnresolvedLinkException,
       SnapshotAccessControlException {
     writeLock();
     try {
-      INodeDirectory dir = unprotectedSetQuota(src, nsQuota, dsQuota);
-      if (dir != null) {
-        final Quota.Counts q = dir.getQuotaCounts();
-        fsImage.getEditLog().logSetQuota(src,
-            q.get(Quota.NAMESPACE), q.get(Quota.DISKSPACE));
-      }
+      return unprotectedSetQuota(src, nsQuota, dsQuota);
     } finally {
       writeUnlock();
     }
@@ -2530,18 +2155,14 @@ public class FSDirectory implements Clos
   /**
    * Sets the access time on the file/directory. Logs it in the transaction log.
    */
-  void setTimes(String src, INode inode, long mtime, long atime, boolean force,
-      int latestSnapshotId) throws QuotaExceededException {
-    boolean status = false;
+  boolean setTimes(INode inode, long mtime, long atime, boolean force,
+                   int latestSnapshotId) throws QuotaExceededException {
     writeLock();
     try {
-      status = unprotectedSetTimes(inode, mtime, atime, force, latestSnapshotId);
+      return unprotectedSetTimes(inode, mtime, atime, force, latestSnapshotId);
     } finally {
       writeUnlock();
     }
-    if (status) {
-      fsImage.getEditLog().logTimes(src, mtime, atime);
-    }
   }
 
   boolean unprotectedSetTimes(String src, long mtime, long atime, boolean force) 
@@ -2581,7 +2202,6 @@ public class FSDirectory implements Clos
   void reset() {
     writeLock();
     try {
-      setReady(false);
       rootDir = createRoot(getFSNamesystem());
       inodeMap.clear();
       addToInodeMap(rootDir);
@@ -2632,7 +2252,7 @@ public class FSDirectory implements Clos
         blocksize,
         node.getModificationTime(snapshot),
         node.getAccessTime(snapshot),
-        node.getFsPermission(snapshot),
+        getPermissionForFileStatus(node, snapshot),
         node.getUserName(snapshot),
         node.getGroupName(snapshot),
         node.isSymlink() ? node.asSymlink().getSymlink() : null,
@@ -2658,7 +2278,7 @@ public class FSDirectory implements Clos
       blocksize = fileNode.getPreferredBlockSize();
 
       final boolean inSnapshot = snapshot != Snapshot.CURRENT_STATE_ID; 
-      final boolean isUc = inSnapshot ? false : fileNode.isUnderConstruction();
+      final boolean isUc = !inSnapshot && fileNode.isUnderConstruction();
       final long fileSize = !inSnapshot && isUc ? 
           fileNode.computeFileSizeNotIncludingLastUcBlock() : size;
       loc = getFSNamesystem().getBlockManager().createLocatedBlocks(
@@ -2674,7 +2294,8 @@ public class FSDirectory implements Clos
     HdfsLocatedFileStatus status =
         new HdfsLocatedFileStatus(size, node.isDirectory(), replication,
           blocksize, node.getModificationTime(snapshot),
-          node.getAccessTime(snapshot), node.getFsPermission(snapshot),
+          node.getAccessTime(snapshot),
+          getPermissionForFileStatus(node, snapshot),
           node.getUserName(snapshot), node.getGroupName(snapshot),
           node.isSymlink() ? node.asSymlink().getSymlink() : null, path,
           node.getId(), loc, childrenNum);
@@ -2688,49 +2309,37 @@ public class FSDirectory implements Clos
     return status;
   }
 
-    
   /**
-   * Add the given symbolic link to the fs. Record it in the edits log.
+   * Returns an inode's FsPermission for use in an outbound FileStatus.  If the
+   * inode has an ACL, then this method will convert to a FsAclPermission.
+   *
+   * @param node INode to check
+   * @param snapshot int snapshot ID
+   * @return FsPermission from inode, with ACL bit on if the inode has an ACL
    */
-  INodeSymlink addSymlink(String path, String target,
-      PermissionStatus dirPerms, boolean createParent, boolean logRetryCache)
-      throws UnresolvedLinkException, FileAlreadyExistsException,
-      QuotaExceededException, SnapshotAccessControlException, AclException {
-    waitForReady();
-
-    final long modTime = now();
-    if (createParent) {
-      final String parent = new Path(path).getParent().toString();
-      if (!mkdirs(parent, dirPerms, true, modTime)) {
-        return null;
-      }
+  private static FsPermission getPermissionForFileStatus(INode node,
+      int snapshot) {
+    FsPermission perm = node.getFsPermission(snapshot);
+    if (node.getAclFeature(snapshot) != null) {
+      perm = new FsAclPermission(perm);
     }
-    final String userName = dirPerms.getUserName();
-    INodeSymlink newNode  = null;
-    long id = namesystem.allocateNewInodeId();
+    return perm;
+  }
+
+  /**
+   * Add the specified path into the namespace.
+   */
+  INodeSymlink addSymlink(long id, String path, String target,
+                          long mtime, long atime, PermissionStatus perm)
+          throws UnresolvedLinkException, QuotaExceededException {
     writeLock();
     try {
-      newNode = unprotectedAddSymlink(id, path, target, modTime, modTime,
-          new PermissionStatus(userName, null, FsPermission.getDefault()));
+      return unprotectedAddSymlink(id, path, target, mtime, atime, perm);
     } finally {
       writeUnlock();
     }
-    if (newNode == null) {
-      NameNode.stateChangeLog.info("DIR* addSymlink: failed to add " + path);
-      return null;
-    }
-    fsImage.getEditLog().logSymlink(path, target, modTime, modTime, newNode,
-        logRetryCache);
-    
-    if(NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* addSymlink: " + path + " is added");
-    }
-    return newNode;
   }
 
-  /**
-   * Add the specified path into the namespace. Invoked from edit log processing.
-   */
   INodeSymlink unprotectedAddSymlink(long id, String path, String target,
       long mtime, long atime, PermissionStatus perm)
       throws UnresolvedLinkException, QuotaExceededException {
@@ -2740,11 +2349,10 @@ public class FSDirectory implements Clos
     return addINode(path, symlink) ? symlink : null;
   }
 
-  void modifyAclEntries(String src, List<AclEntry> aclSpec) throws IOException {
+  List<AclEntry> modifyAclEntries(String src, List<AclEntry> aclSpec) throws IOException {
     writeLock();
     try {
-      List<AclEntry> newAcl = unprotectedModifyAclEntries(src, aclSpec);
-      fsImage.getEditLog().logSetAcl(src, newAcl);
+      return unprotectedModifyAclEntries(src, aclSpec);
     } finally {
       writeUnlock();
     }
@@ -2753,7 +2361,7 @@ public class FSDirectory implements Clos
   private List<AclEntry> unprotectedModifyAclEntries(String src,
       List<AclEntry> aclSpec) throws IOException {
     assert hasWriteLock();
-    INodesInPath iip = rootDir.getINodesInPath4Write(normalizePath(src), true);
+    INodesInPath iip = getINodesInPath4Write(normalizePath(src), true);
     INode inode = resolveLastINode(src, iip);
     int snapshotId = iip.getLatestSnapshotId();
     List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode);
@@ -2763,11 +2371,10 @@ public class FSDirectory implements Clos
     return newAcl;
   }
 
-  void removeAclEntries(String src, List<AclEntry> aclSpec) throws IOException {
+  List<AclEntry> removeAclEntries(String src, List<AclEntry> aclSpec) throws IOException {
     writeLock();
     try {
-      List<AclEntry> newAcl = unprotectedRemoveAclEntries(src, aclSpec);
-      fsImage.getEditLog().logSetAcl(src, newAcl);
+      return unprotectedRemoveAclEntries(src, aclSpec);
     } finally {
       writeUnlock();
     }
@@ -2776,7 +2383,7 @@ public class FSDirectory implements Clos
   private List<AclEntry> unprotectedRemoveAclEntries(String src,
       List<AclEntry> aclSpec) throws IOException {
     assert hasWriteLock();
-    INodesInPath iip = rootDir.getINodesInPath4Write(normalizePath(src), true);
+    INodesInPath iip = getINodesInPath4Write(normalizePath(src), true);
     INode inode = resolveLastINode(src, iip);
     int snapshotId = iip.getLatestSnapshotId();
     List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode);
@@ -2786,11 +2393,10 @@ public class FSDirectory implements Clos
     return newAcl;
   }
 
-  void removeDefaultAcl(String src) throws IOException {
+  List<AclEntry> removeDefaultAcl(String src) throws IOException {
     writeLock();
     try {
-      List<AclEntry> newAcl = unprotectedRemoveDefaultAcl(src);
-      fsImage.getEditLog().logSetAcl(src, newAcl);
+      return unprotectedRemoveDefaultAcl(src);
     } finally {
       writeUnlock();
     }
@@ -2799,7 +2405,7 @@ public class FSDirectory implements Clos
   private List<AclEntry> unprotectedRemoveDefaultAcl(String src)
       throws IOException {
     assert hasWriteLock();
-    INodesInPath iip = rootDir.getINodesInPath4Write(normalizePath(src), true);
+    INodesInPath iip = getINodesInPath4Write(normalizePath(src), true);
     INode inode = resolveLastINode(src, iip);
     int snapshotId = iip.getLatestSnapshotId();
     List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode);
@@ -2813,7 +2419,6 @@ public class FSDirectory implements Clos
     writeLock();
     try {
       unprotectedRemoveAcl(src);
-      fsImage.getEditLog().logSetAcl(src, AclFeature.EMPTY_ENTRY_LIST);
     } finally {
       writeUnlock();
     }
@@ -2821,17 +2426,16 @@ public class FSDirectory implements Clos
 
   private void unprotectedRemoveAcl(String src) throws IOException {
     assert hasWriteLock();
-    INodesInPath iip = rootDir.getINodesInPath4Write(normalizePath(src), true);
+    INodesInPath iip = getINodesInPath4Write(normalizePath(src), true);
     INode inode = resolveLastINode(src, iip);
     int snapshotId = iip.getLatestSnapshotId();
     AclStorage.removeINodeAcl(inode, snapshotId);
   }
 
-  void setAcl(String src, List<AclEntry> aclSpec) throws IOException {
+  List<AclEntry> setAcl(String src, List<AclEntry> aclSpec) throws IOException {
     writeLock();
     try {
-      List<AclEntry> newAcl = unprotectedSetAcl(src, aclSpec);
-      fsImage.getEditLog().logSetAcl(src, newAcl);
+      return unprotectedSetAcl(src, aclSpec);
     } finally {
       writeUnlock();
     }
@@ -2846,7 +2450,7 @@ public class FSDirectory implements Clos
     }
 
     assert hasWriteLock();
-    INodesInPath iip = rootDir.getINodesInPath4Write(normalizePath(src), true);
+    INodesInPath iip = getINodesInPath4Write(normalizePath(src), true);
     INode inode = resolveLastINode(src, iip);
     int snapshotId = iip.getLatestSnapshotId();
     List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode);
@@ -2866,7 +2470,7 @@ public class FSDirectory implements Clos
           getINode4DotSnapshot(srcs) != null) {
         return new AclStatus.Builder().owner("").group("").build();
       }
-      INodesInPath iip = rootDir.getLastINodeInPath(srcs, true);
+      INodesInPath iip = getLastINodeInPath(srcs, true);
       INode inode = resolveLastINode(src, iip);
       int snapshotId = iip.getPathSnapshotId();
       List<AclEntry> acl = AclStorage.readINodeAcl(inode, snapshotId);
@@ -2879,6 +2483,195 @@ public class FSDirectory implements Clos
     }
   }
 
+  /**
+   * Removes a list of XAttrs from an inode at a path.
+   *
+   * @param src path of inode
+   * @param toRemove XAttrs to be removed
+   * @return List of XAttrs that were removed
+   * @throws IOException if the inode does not exist, if quota is exceeded
+   */
+  List<XAttr> removeXAttrs(final String src, final List<XAttr> toRemove)
+      throws IOException {
+    writeLock();
+    try {
+      return unprotectedRemoveXAttrs(src, toRemove);
+    } finally {
+      writeUnlock();
+    }
+  }
+
+  List<XAttr> unprotectedRemoveXAttrs(final String src,
+      final List<XAttr> toRemove) throws IOException {
+    assert hasWriteLock();
+    INodesInPath iip = getINodesInPath4Write(normalizePath(src), true);
+    INode inode = resolveLastINode(src, iip);
+    int snapshotId = iip.getLatestSnapshotId();
+    List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);
+    List<XAttr> removedXAttrs = Lists.newArrayListWithCapacity(toRemove.size());
+    List<XAttr> newXAttrs = filterINodeXAttrs(existingXAttrs, toRemove,
+        removedXAttrs);
+    if (existingXAttrs.size() != newXAttrs.size()) {
+      XAttrStorage.updateINodeXAttrs(inode, newXAttrs, snapshotId);
+      return removedXAttrs;
+    }
+    return null;
+  }
+
+  /**
+   * Filter XAttrs from a list of existing XAttrs. Removes matched XAttrs from
+   * toFilter and puts them into filtered. Upon completion,
+   * toFilter contains the filter XAttrs that were not found, while
+   * fitleredXAttrs contains the XAttrs that were found.
+   *
+   * @param existingXAttrs Existing XAttrs to be filtered
+   * @param toFilter XAttrs to filter from the existing XAttrs
+   * @param filtered Return parameter, XAttrs that were filtered
+   * @return List of XAttrs that does not contain filtered XAttrs
+   */
+  @VisibleForTesting
+  List<XAttr> filterINodeXAttrs(final List<XAttr> existingXAttrs,
+      final List<XAttr> toFilter, final List<XAttr> filtered) {
+    if (existingXAttrs == null || existingXAttrs.isEmpty() ||
+        toFilter == null || toFilter.isEmpty()) {
+      return existingXAttrs;
+    }
+
+    // Populate a new list with XAttrs that pass the filter
+    List<XAttr> newXAttrs =
+        Lists.newArrayListWithCapacity(existingXAttrs.size());
+    for (XAttr a : existingXAttrs) {
+      boolean add = true;
+      for (ListIterator<XAttr> it = toFilter.listIterator(); it.hasNext()
+          ;) {
+        XAttr filter = it.next();
+        if (a.equalsIgnoreValue(filter)) {
+          add = false;
+          it.remove();
+          filtered.add(filter);
+          break;
+        }
+      }
+      if (add) {
+        newXAttrs.add(a);
+      }
+    }
+
+    return newXAttrs;
+  }
+  
+  void setXAttrs(final String src, final List<XAttr> xAttrs,
+      final EnumSet<XAttrSetFlag> flag) throws IOException {
+    writeLock();
+    try {
+      unprotectedSetXAttrs(src, xAttrs, flag);
+    } finally {
+      writeUnlock();
+    }
+  }
+  
+  void unprotectedSetXAttrs(final String src, final List<XAttr> xAttrs,
+      final EnumSet<XAttrSetFlag> flag)
+      throws QuotaExceededException, IOException {
+    assert hasWriteLock();
+    INodesInPath iip = getINodesInPath4Write(normalizePath(src), true);
+    INode inode = resolveLastINode(src, iip);
+    int snapshotId = iip.getLatestSnapshotId();
+    List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);

[... 212 lines stripped ...]