You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:50:25 UTC

svn commit: r1619012 [18/35] - in /hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project: hadoop-hdfs-httpfs/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/ hadoop...

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Tue Aug 19 23:49:39 2014
@@ -62,6 +62,8 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY;
@@ -83,14 +85,13 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SUPPORT_APPEND_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SUPPORT_APPEND_KEY;
 import static org.apache.hadoop.util.Time.now;
 
 import java.io.BufferedWriter;
 import java.io.ByteArrayInputStream;
 import java.io.DataInput;
 import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
@@ -115,6 +116,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -141,7 +143,10 @@ import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
 import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.fs.XAttr;
+import org.apache.hadoop.fs.XAttrSetFlag;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
@@ -154,6 +159,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.protocol.AclException;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
@@ -175,8 +181,8 @@ import org.apache.hadoop.hdfs.protocol.Q
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeException;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
+import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
-import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
 import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
@@ -213,8 +219,6 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.ha.StandbyCheckpointer;
 import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable.SnapshotDiffInfo;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotManager;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
@@ -226,6 +230,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
@@ -446,7 +451,6 @@ public class FSNamesystem implements Nam
   NameNodeResourceChecker nnResourceChecker;
 
   private final FsServerDefaults serverDefaults;
-  private final boolean supportAppends;
   private final ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure;
 
   private volatile SafeModeInfo safeMode;  // safe mode information
@@ -520,7 +524,65 @@ public class FSNamesystem implements Nam
   
   private final RetryCache retryCache;
 
-  private final AclConfigFlag aclConfigFlag;
+  private final NNConf nnConf;
+
+  private volatile boolean imageLoaded = false;
+  private final Condition cond;
+
+  private final FSImage fsImage;
+
+  private boolean randomizeBlockLocationsPerBlock;
+
+  /**
+   * Notify that loading of the FSImage is complete, and
+   * that the namesystem is ready for use
+   */
+  void imageLoadComplete() {
+    Preconditions.checkState(!imageLoaded, "FSDirectory already loaded");
+    setImageLoaded();
+  }
+
+  void setImageLoaded() {
+    if(imageLoaded) return;
+    writeLock();
+    try {
+      setImageLoaded(true);
+      dir.markNameCacheInitialized();
+      cond.signalAll();
+    } finally {
+      writeUnlock();
+    }
+  }
+
+  //This is for testing purposes only
+  @VisibleForTesting
+  boolean isImageLoaded() {
+    return imageLoaded;
+  }
+
+  // exposed for unit tests
+  protected void setImageLoaded(boolean flag) {
+    imageLoaded = flag;
+  }
+
+  /**
+   * Block until the FSImage has finished loading.
+   */
+  void waitForLoadingFSImage() {
+    if (!imageLoaded) {
+      writeLock();
+      try {
+        while (!imageLoaded) {
+          try {
+            cond.await(5000, TimeUnit.MILLISECONDS);
+          } catch (InterruptedException ignored) {
+          }
+        }
+      } finally {
+        writeUnlock();
+      }
+    }
+  }
 
   /**
    * Set the last allocated inode id when fsimage or editlog is loaded. 
@@ -563,6 +625,7 @@ public class FSNamesystem implements Nam
     inodeId.setCurrentValue(INodeId.LAST_RESERVED_ID);
     snapshotManager.clearSnapshottableDirs();
     cacheManager.clear();
+    setImageLoaded(false);
   }
 
   @VisibleForTesting
@@ -690,6 +753,8 @@ public class FSNamesystem implements Nam
     boolean fair = conf.getBoolean("dfs.namenode.fslock.fair", true);
     LOG.info("fsLock is fair:" + fair);
     fsLock = new FSNamesystemLock(fair);
+    cond = fsLock.writeLock().newCondition();
+    this.fsImage = fsImage;
     try {
       resourceRecheckInterval = conf.getLong(
           DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY,
@@ -754,8 +819,6 @@ public class FSNamesystem implements Nam
           DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_DEFAULT);
       this.accessTimePrecision = conf.getLong(DFS_NAMENODE_ACCESSTIME_PRECISION_KEY,
           DFS_NAMENODE_ACCESSTIME_PRECISION_DEFAULT);
-      this.supportAppends = conf.getBoolean(DFS_SUPPORT_APPEND_KEY, DFS_SUPPORT_APPEND_DEFAULT);
-      LOG.info("Append Enabled: " + supportAppends);
 
       this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);
       
@@ -779,9 +842,13 @@ public class FSNamesystem implements Nam
       alwaysUseDelegationTokensForTests = conf.getBoolean(
           DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
           DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_DEFAULT);
+      
+      this.randomizeBlockLocationsPerBlock = conf.getBoolean(
+          DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK,
+          DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK_DEFAULT);
 
       this.dtSecretManager = createDelegationTokenSecretManager(conf);
-      this.dir = new FSDirectory(fsImage, this, conf);
+      this.dir = new FSDirectory(this, conf);
       this.snapshotManager = new SnapshotManager(dir);
       this.cacheManager = new CacheManager(this, conf, blockManager);
       this.safeMode = new SafeModeInfo(conf);
@@ -789,7 +856,7 @@ public class FSNamesystem implements Nam
       this.isDefaultAuditLogger = auditLoggers.size() == 1 &&
         auditLoggers.get(0) instanceof DefaultAuditLogger;
       this.retryCache = ignoreRetryCache ? null : initRetryCache(conf);
-      this.aclConfigFlag = new AclConfigFlag(conf);
+      this.nnConf = new NNConf(conf);
     } catch(IOException e) {
       LOG.error(getClass().getSimpleName() + " initialization failed.", e);
       close();
@@ -921,7 +988,8 @@ public class FSNamesystem implements Nam
       }
       // This will start a new log segment and write to the seen_txid file, so
       // we shouldn't do it when coming up in standby state
-      if (!haEnabled || (haEnabled && startOpt == StartupOption.UPGRADE)) {
+      if (!haEnabled || (haEnabled && startOpt == StartupOption.UPGRADE)
+          || (haEnabled && startOpt == StartupOption.UPGRADEONLY)) {
         fsImage.openEditLogForWrite();
       }
       success = true;
@@ -931,7 +999,7 @@ public class FSNamesystem implements Nam
       }
       writeUnlock();
     }
-    dir.imageLoadComplete();
+    imageLoadComplete();
   }
 
   private void startSecretManager() {
@@ -1009,7 +1077,7 @@ public class FSNamesystem implements Nam
     LOG.info("Starting services required for active state");
     writeLock();
     try {
-      FSEditLog editLog = dir.fsImage.getEditLog();
+      FSEditLog editLog = getFSImage().getEditLog();
       
       if (!editLog.isOpenForWrite()) {
         // During startup, we're already open for write during initialization.
@@ -1038,12 +1106,12 @@ public class FSNamesystem implements Nam
               metaSaveAsString());
         }
         
-        long nextTxId = dir.fsImage.getLastAppliedTxId() + 1;
+        long nextTxId = getFSImage().getLastAppliedTxId() + 1;
         LOG.info("Will take over writing edit logs at txnid " + 
             nextTxId);
         editLog.setNextTxId(nextTxId);
 
-        dir.fsImage.editLog.openForWrite();
+        getFSImage().editLog.openForWrite();
       }
 
       // Enable quota checks.
@@ -1109,9 +1177,7 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       stopSecretManager();
-      if (leaseManager != null) {
-        leaseManager.stopMonitor();
-      }
+      leaseManager.stopMonitor();
       if (nnrmthread != null) {
         ((NameNodeResourceMonitor) nnrmthread.getRunnable()).stopMonitor();
         nnrmthread.interrupt();
@@ -1120,16 +1186,18 @@ public class FSNamesystem implements Nam
         ((NameNodeEditLogRoller)nnEditLogRoller.getRunnable()).stop();
         nnEditLogRoller.interrupt();
       }
-      if (dir != null && dir.fsImage != null) {
-        if (dir.fsImage.editLog != null) {
-          dir.fsImage.editLog.close();
+      if (dir != null && getFSImage() != null) {
+        if (getFSImage().editLog != null) {
+          getFSImage().editLog.close();
         }
         // Update the fsimage with the last txid that we wrote
         // so that the tailer starts from the right spot.
-        dir.fsImage.updateLastAppliedTxIdFromWritten();
+        getFSImage().updateLastAppliedTxIdFromWritten();
+      }
+      if (cacheManager != null) {
+        cacheManager.stopMonitorThread();
+        cacheManager.clearDirectiveStats();
       }
-      cacheManager.stopMonitorThread();
-      cacheManager.clearDirectiveStats();
       blockManager.getDatanodeManager().clearPendingCachingCommands();
       blockManager.getDatanodeManager().setShouldSendCachingCommands(false);
       // Don't want to keep replication queues when not in Active.
@@ -1147,9 +1215,9 @@ public class FSNamesystem implements Nam
    */
   void startStandbyServices(final Configuration conf) throws IOException {
     LOG.info("Starting services required for standby state");
-    if (!dir.fsImage.editLog.isOpenForRead()) {
+    if (!getFSImage().editLog.isOpenForRead()) {
       // During startup, we're already open for read.
-      dir.fsImage.editLog.initSharedJournalsForRead();
+      getFSImage().editLog.initSharedJournalsForRead();
     }
     
     blockManager.setPostponeBlocksFromFuture(true);
@@ -1196,8 +1264,8 @@ public class FSNamesystem implements Nam
     if (editLogTailer != null) {
       editLogTailer.stop();
     }
-    if (dir != null && dir.fsImage != null && dir.fsImage.editLog != null) {
-      dir.fsImage.editLog.close();
+    if (dir != null && getFSImage() != null && getFSImage().editLog != null) {
+      getFSImage().editLog.close();
     }
   }
   
@@ -1353,7 +1421,7 @@ public class FSNamesystem implements Nam
   /**
    * Returns edit directories that are shared between primary and secondary.
    * @param conf configuration
-   * @return Collection of edit directories.
+   * @return collection of edit directories from {@code conf}
    */
   public static List<URI> getSharedEditsDirs(Configuration conf) {
     // don't use getStorageDirs here, because we want an empty default
@@ -1440,9 +1508,9 @@ public class FSNamesystem implements Nam
    * Version of @see #getNamespaceInfo() that is not protected by a lock.
    */
   NamespaceInfo unprotectedGetNamespaceInfo() {
-    return new NamespaceInfo(dir.fsImage.getStorage().getNamespaceID(),
+    return new NamespaceInfo(getFSImage().getStorage().getNamespaceID(),
         getClusterId(), getBlockPoolId(),
-        dir.fsImage.getStorage().getCTime());
+        getFSImage().getStorage().getCTime());
   }
 
   /**
@@ -1460,12 +1528,10 @@ public class FSNamesystem implements Nam
       try {
         stopActiveServices();
         stopStandbyServices();
-        if (dir != null) {
-          dir.close();
-        }
       } catch (IOException ie) {
-        LOG.error("Error closing FSDirectory", ie);
+      } finally {
         IOUtils.cleanup(LOG, dir);
+        IOUtils.cleanup(LOG, fsImage);
       }
     }
   }
@@ -1578,6 +1644,7 @@ public class FSNamesystem implements Nam
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       checkOwner(pc, src);
       dir.setPermission(src, permission);
+      getEditLog().logSetPermissions(src, permission);
       resultingStat = getAuditFileInfo(src, false);
     } finally {
       writeUnlock();
@@ -1623,6 +1690,7 @@ public class FSNamesystem implements Nam
         }
       }
       dir.setOwner(src, username, group);
+      getEditLog().logSetOwner(src, username, group);
       resultingStat = getAuditFileInfo(src, false);
     } finally {
       writeUnlock();
@@ -1641,15 +1709,17 @@ public class FSNamesystem implements Nam
     LocatedBlocks blocks = getBlockLocations(src, offset, length, true, true,
         true);
     if (blocks != null) {
-      blockManager.getDatanodeManager().sortLocatedBlocks(
-          clientMachine, blocks.getLocatedBlocks());
-      
+      blockManager.getDatanodeManager().sortLocatedBlocks(clientMachine,
+          blocks.getLocatedBlocks(), randomizeBlockLocationsPerBlock);
+
+      // lastBlock is not part of getLocatedBlocks(), might need to sort it too
       LocatedBlock lastBlock = blocks.getLastLocatedBlock();
       if (lastBlock != null) {
-        ArrayList<LocatedBlock> lastBlockList = new ArrayList<LocatedBlock>();
+        ArrayList<LocatedBlock> lastBlockList =
+            Lists.newArrayListWithCapacity(1);
         lastBlockList.add(lastBlock);
-        blockManager.getDatanodeManager().sortLocatedBlocks(
-                              clientMachine, lastBlockList);
+        blockManager.getDatanodeManager().sortLocatedBlocks(clientMachine,
+            lastBlockList, randomizeBlockLocationsPerBlock);
       }
     }
     return blocks;
@@ -1751,7 +1821,11 @@ public class FSNamesystem implements Nam
             if (isReadOp) {
               continue;
             }
-            dir.setTimes(src, inode, -1, now, false, iip.getLatestSnapshotId());
+            boolean changed = dir.setTimes(inode, -1, now, false,
+                    iip.getLatestSnapshotId());
+            if (changed) {
+              getEditLog().logTimes(src, -1, now);
+            }
           }
         }
         final long fileSize = iip.isSnapshot() ?
@@ -1789,9 +1863,9 @@ public class FSNamesystem implements Nam
    * before we start actual move.
    * 
    * This does not support ".inodes" relative path
-   * @param target target file path to concatenate into
-   * @param srcs files that are concatenated
-   * @throws IOException
+   * @param target target file to concatenate into
+   * @param srcs files that will be concatenated
+   * @throws IOException on error
    */
   void concat(String target, String [] srcs) 
       throws IOException, UnresolvedLinkException {
@@ -1842,6 +1916,7 @@ public class FSNamesystem implements Nam
     HdfsFileStatus resultingStat = null;
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
+    waitForLoadingFSImage();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -1962,7 +2037,9 @@ public class FSNamesystem implements Nam
           Arrays.toString(srcs) + " to " + target);
     }
 
-    dir.concat(target,srcs, logRetryCache);
+    long timestamp = now();
+    dir.concat(target, srcs, timestamp);
+    getEditLog().logConcat(target, srcs, timestamp, logRetryCache);
   }
   
   /**
@@ -2003,7 +2080,11 @@ public class FSNamesystem implements Nam
       final INodesInPath iip = dir.getINodesInPath4Write(src);
       final INode inode = iip.getLastINode();
       if (inode != null) {
-        dir.setTimes(src, inode, mtime, atime, true, iip.getLatestSnapshotId());
+        boolean changed = dir.setTimes(inode, mtime, atime, true,
+                iip.getLatestSnapshotId());
+        if (changed) {
+          getEditLog().logTimes(src, mtime, atime);
+        }
         resultingStat = getAuditFileInfo(src, false);
       } else {
         throw new FileNotFoundException("File/Directory " + src + " does not exist.");
@@ -2072,7 +2153,7 @@ public class FSNamesystem implements Nam
       checkFsObjectLimit();
 
       // add symbolic link to namespace
-      dir.addSymlink(link, target, dirPerms, createParent, logRetryCache);
+      addSymlink(link, target, dirPerms, createParent, logRetryCache);
       resultingStat = getAuditFileInfo(link, false);
     } finally {
       writeUnlock();
@@ -2111,6 +2192,7 @@ public class FSNamesystem implements Nam
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    waitForLoadingFSImage();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -2124,6 +2206,7 @@ public class FSNamesystem implements Nam
       final Block[] blocks = dir.setReplication(src, replication, blockRepls);
       isFile = blocks != null;
       if (isFile) {
+        getEditLog().logSetReplication(src, replication);
         blockManager.setReplication(blockRepls[0], blockRepls[1], src, blocks);
       }
     } finally {
@@ -2237,6 +2320,8 @@ public class FSNamesystem implements Nam
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     boolean create = flag.contains(CreateFlag.CREATE);
     boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
+
+    waitForLoadingFSImage();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -2321,11 +2406,16 @@ public class FSNamesystem implements Nam
       }
 
       checkFsObjectLimit();
-      final DatanodeDescriptor clientNode = 
-          blockManager.getDatanodeManager().getDatanodeByHost(clientMachine);
+      INodeFile newNode = null;
+
+      // Always do an implicit mkdirs for parent directory tree.
+      Path parent = new Path(src).getParent();
+      if (parent != null && mkdirsRecursively(parent.toString(),
+              permissions, true, now())) {
+        newNode = dir.addFile(src, permissions, replication, blockSize,
+                holder, clientMachine);
+      }
 
-      INodeFile newNode = dir.addFile(src, permissions, replication, blockSize,
-          holder, clientMachine, clientNode);
       if (newNode == null) {
         throw new IOException("Unable to add " + src +  " to namespace");
       }
@@ -2391,11 +2481,15 @@ public class FSNamesystem implements Nam
       // finalizeINodeFileUnderConstruction so we need to refresh 
       // the referenced file.  
       myFile = INodeFile.valueOf(dir.getINode(src), src, true);
-      
-      final DatanodeDescriptor clientNode = 
-          blockManager.getDatanodeManager().getDatanodeByHost(clientMachine);
-      return prepareFileForWrite(src, myFile, holder, clientMachine, clientNode,
-          true, iip.getLatestSnapshotId(), logRetryCache);
+      final BlockInfo lastBlock = myFile.getLastBlock();
+      // Check that the block has at least minimum replication.
+      if(lastBlock != null && lastBlock.isComplete() &&
+          !getBlockManager().isSufficientlyReplicated(lastBlock)) {
+        throw new IOException("append: lastBlock=" + lastBlock +
+            " of src=" + src + " is not sufficiently replicated yet.");
+      }
+      return prepareFileForWrite(src, myFile, holder, clientMachine, true,
+              iip.getLatestSnapshotId(), logRetryCache);
     } catch (IOException ie) {
       NameNode.stateChangeLog.warn("DIR* NameSystem.append: " +ie.getMessage());
       throw ie;
@@ -2410,7 +2504,6 @@ public class FSNamesystem implements Nam
    * @param file existing file object
    * @param leaseHolder identifier of the lease holder on this file
    * @param clientMachine identifier of the client machine
-   * @param clientNode if the client is collocated with a DN, that DN's descriptor
    * @param writeToEditLog whether to persist this change to the edit log
    * @param logRetryCache whether to record RPC ids in editlog for retry cache
    *                      rebuilding
@@ -2419,17 +2512,23 @@ public class FSNamesystem implements Nam
    * @throws IOException
    */
   LocatedBlock prepareFileForWrite(String src, INodeFile file,
-      String leaseHolder, String clientMachine, DatanodeDescriptor clientNode,
-      boolean writeToEditLog, int latestSnapshot, boolean logRetryCache)
+                                   String leaseHolder, String clientMachine,
+                                   boolean writeToEditLog,
+                                   int latestSnapshot, boolean logRetryCache)
       throws IOException {
-    file = file.recordModification(latestSnapshot);
-    final INodeFile cons = file.toUnderConstruction(leaseHolder, clientMachine,
-        clientNode);
+    file.recordModification(latestSnapshot);
+    final INodeFile cons = file.toUnderConstruction(leaseHolder, clientMachine);
 
     leaseManager.addLease(cons.getFileUnderConstructionFeature()
         .getClientName(), src);
     
     LocatedBlock ret = blockManager.convertLastBlockToUnderConstruction(cons);
+    if (ret != null) {
+      // update the quota: use the preferred block size for UC block
+      final long diff = file.getPreferredBlockSize() - ret.getBlockSize();
+      dir.updateSpaceConsumed(src, 0, diff * file.getBlockReplication());
+    }
+
     if (writeToEditLog) {
       getEditLog().logOpenFile(src, cons, logRetryCache);
     }
@@ -2499,10 +2598,10 @@ public class FSNamesystem implements Nam
       // We found the lease for this file. And surprisingly the original
       // holder is trying to recreate this file. This should never occur.
       //
+
       if (!force && lease != null) {
         Lease leaseFile = leaseManager.getLeaseByPath(src);
-        if ((leaseFile != null && leaseFile.equals(lease)) ||
-            lease.getHolder().equals(holder)) { 
+        if (leaseFile != null && leaseFile.equals(lease)) {
           throw new AlreadyBeingCreatedException(
             "failed to create file " + src + " for " + holder +
             " for client " + clientMachine +
@@ -2599,12 +2698,6 @@ public class FSNamesystem implements Nam
           + ", clientMachine=" + clientMachine);
     }
     boolean skipSync = false;
-    if (!supportAppends) {
-      throw new UnsupportedOperationException(
-          "Append is not enabled on this NameNode. Use the " +
-          DFS_SUPPORT_APPEND_KEY + " configuration option to enable it.");
-    }
-
     LocatedBlock lb = null;
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
@@ -2681,9 +2774,10 @@ public class FSNamesystem implements Nam
       checkOperation(OperationCategory.READ);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       LocatedBlock[] onRetryBlock = new LocatedBlock[1];
-      final INode[] inodes = analyzeFileState(
-          src, fileId, clientName, previous, onRetryBlock).getINodes();
-      final INodeFile pendingFile = inodes[inodes.length - 1].asFile();
+      FileState fileState = analyzeFileState(
+          src, fileId, clientName, previous, onRetryBlock);
+      final INodeFile pendingFile = fileState.inode;
+      src = fileState.path;
 
       if (onRetryBlock[0] != null && onRetryBlock[0].getLocations().length > 0) {
         // This is a retry. Just return the last block if having locations.
@@ -2696,7 +2790,8 @@ public class FSNamesystem implements Nam
             + maxBlocksPerFile);
       }
       blockSize = pendingFile.getPreferredBlockSize();
-      clientNode = pendingFile.getFileUnderConstructionFeature().getClientNode();
+      clientNode = blockManager.getDatanodeManager().getDatanodeByHost(
+              pendingFile.getFileUnderConstructionFeature().getClientMachine());
       replication = pendingFile.getFileReplication();
     } finally {
       readUnlock();
@@ -2711,16 +2806,17 @@ public class FSNamesystem implements Nam
     Block newBlock = null;
     long offset;
     checkOperation(OperationCategory.WRITE);
+    waitForLoadingFSImage();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       // Run the full analysis again, since things could have changed
       // while chooseTarget() was executing.
       LocatedBlock[] onRetryBlock = new LocatedBlock[1];
-      INodesInPath inodesInPath =
+      FileState fileState = 
           analyzeFileState(src, fileId, clientName, previous, onRetryBlock);
-      INode[] inodes = inodesInPath.getINodes();
-      final INodeFile pendingFile = inodes[inodes.length - 1].asFile();
+      final INodeFile pendingFile = fileState.inode;
+      src = fileState.path;
 
       if (onRetryBlock[0] != null) {
         if (onRetryBlock[0].getLocations().length > 0) {
@@ -2742,9 +2838,10 @@ public class FSNamesystem implements Nam
 
       // allocate new block, record block locations in INode.
       newBlock = createNewBlock();
+      INodesInPath inodesInPath = INodesInPath.fromINode(pendingFile);
       saveAllocatedBlock(src, inodesInPath, newBlock, targets);
 
-      dir.persistNewBlock(src, pendingFile);
+      persistNewBlock(src, pendingFile);
       offset = pendingFile.computeFileSize();
     } finally {
       writeUnlock();
@@ -2755,7 +2852,17 @@ public class FSNamesystem implements Nam
     return makeLocatedBlock(newBlock, targets, offset);
   }
 
-  INodesInPath analyzeFileState(String src,
+  static class FileState {
+    public final INodeFile inode;
+    public final String path;
+
+    public FileState(INodeFile inode, String fullPath) {
+      this.inode = inode;
+      this.path = fullPath;
+    }
+  }
+
+  FileState analyzeFileState(String src,
                                 long fileId,
                                 String clientName,
                                 ExtendedBlock previous,
@@ -2772,9 +2879,20 @@ public class FSNamesystem implements Nam
     checkFsObjectLimit();
 
     Block previousBlock = ExtendedBlock.getLocalBlock(previous);
-    final INodesInPath iip = dir.getINodesInPath4Write(src);
-    final INodeFile pendingFile
-        = checkLease(src, fileId, clientName, iip.getLastINode());
+    INode inode;
+    if (fileId == INodeId.GRANDFATHER_INODE_ID) {
+      // Older clients may not have given us an inode ID to work with.
+      // In this case, we have to try to resolve the path and hope it
+      // hasn't changed or been deleted since the file was opened for write.
+      final INodesInPath iip = dir.getINodesInPath4Write(src);
+      inode = iip.getLastINode();
+    } else {
+      // Newer clients pass the inode ID, so we can just get the inode
+      // directly.
+      inode = dir.getInode(fileId);
+      if (inode != null) src = inode.getFullPathName();
+    }
+    final INodeFile pendingFile = checkLease(src, clientName, inode, fileId);
     BlockInfo lastBlockInFile = pendingFile.getLastBlock();
     if (!Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) {
       // The block that the client claims is the current last block
@@ -2832,7 +2950,7 @@ public class FSNamesystem implements Nam
         onRetryBlock[0] = makeLocatedBlock(lastBlockInFile,
             ((BlockInfoUnderConstruction)lastBlockInFile).getExpectedStorageLocations(),
             offset);
-        return iip;
+        return new FileState(pendingFile, src);
       } else {
         // Case 3
         throw new IOException("Cannot allocate block in " + src + ": " +
@@ -2845,7 +2963,7 @@ public class FSNamesystem implements Nam
     if (!checkFileProgress(pendingFile, false)) {
       throw new NotReplicatedYetException("Not replicated yet: " + src);
     }
-    return iip;
+    return new FileState(pendingFile, src);
   }
 
   LocatedBlock makeLocatedBlock(Block blk, DatanodeStorageInfo[] locs,
@@ -2858,8 +2976,9 @@ public class FSNamesystem implements Nam
   }
 
   /** @see ClientProtocol#getAdditionalDatanode */
-  LocatedBlock getAdditionalDatanode(String src, final ExtendedBlock blk,
-      final DatanodeInfo[] existings, final String[] storageIDs,
+  LocatedBlock getAdditionalDatanode(String src, long fileId,
+      final ExtendedBlock blk, final DatanodeInfo[] existings,
+      final String[] storageIDs,
       final Set<Node> excludes,
       final int numAdditionalNodes, final String clientName
       ) throws IOException {
@@ -2879,8 +2998,20 @@ public class FSNamesystem implements Nam
       src = FSDirectory.resolvePath(src, pathComponents, dir);
 
       //check lease
-      final INodeFile file = checkLease(src, clientName);
-      clientnode = file.getFileUnderConstructionFeature().getClientNode();
+      final INode inode;
+      if (fileId == INodeId.GRANDFATHER_INODE_ID) {
+        // Older clients may not have given us an inode ID to work with.
+        // In this case, we have to try to resolve the path and hope it
+        // hasn't changed or been deleted since the file was opened for write.
+        inode = dir.getINode(src);
+      } else {
+        inode = dir.getInode(fileId);
+        if (inode != null) src = inode.getFullPathName();
+      }
+      final INodeFile file = checkLease(src, clientName, inode, fileId);
+      String clientMachine = file.getFileUnderConstructionFeature()
+              .getClientMachine();
+      clientnode = blockManager.getDatanodeManager().getDatanodeByHost(clientMachine);
       preferredblocksize = file.getPreferredBlockSize();
 
       //find datanode storages
@@ -2903,7 +3034,7 @@ public class FSNamesystem implements Nam
   /**
    * The client would like to let go of the given block
    */
-  boolean abandonBlock(ExtendedBlock b, String src, String holder)
+  boolean abandonBlock(ExtendedBlock b, long fileId, String src, String holder)
       throws LeaseExpiredException, FileNotFoundException,
       UnresolvedLinkException, IOException {
     if(NameNode.stateChangeLog.isDebugEnabled()) {
@@ -2912,16 +3043,28 @@ public class FSNamesystem implements Nam
     }
     checkOperation(OperationCategory.WRITE);
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    waitForLoadingFSImage();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      checkNameNodeSafeMode("Cannot abandon block " + b + " for fle" + src);
+      checkNameNodeSafeMode("Cannot abandon block " + b + " for file" + src);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
 
+      final INode inode;
+      if (fileId == INodeId.GRANDFATHER_INODE_ID) {
+        // Older clients may not have given us an inode ID to work with.
+        // In this case, we have to try to resolve the path and hope it
+        // hasn't changed or been deleted since the file was opened for write.
+        inode = dir.getINode(src);
+      } else {
+        inode = dir.getInode(fileId);
+        if (inode != null) src = inode.getFullPathName();
+      }
+      final INodeFile file = checkLease(src, holder, inode, fileId);
+
       //
       // Remove the block from the pending creates list
       //
-      INodeFile file = checkLease(src, holder);
       boolean removed = dir.removeBlock(src, file,
           ExtendedBlock.getLocalBlock(b));
       if (!removed) {
@@ -2931,7 +3074,7 @@ public class FSNamesystem implements Nam
         NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
                                       + b + " is removed from pendingCreates");
       }
-      dir.persistBlocks(src, file, false);
+      persistBlocks(src, file, false);
     } finally {
       writeUnlock();
     }
@@ -2939,39 +3082,45 @@ public class FSNamesystem implements Nam
 
     return true;
   }
-  
-  /** make sure that we still have the lease on this file. */
-  private INodeFile checkLease(String src, String holder)
-      throws LeaseExpiredException, UnresolvedLinkException,
-      FileNotFoundException {
-    return checkLease(src, INodeId.GRANDFATHER_INODE_ID, holder,
-        dir.getINode(src));
-  }
-  
-  private INodeFile checkLease(String src, long fileId, String holder,
-      INode inode) throws LeaseExpiredException, FileNotFoundException {
+
+  private INodeFile checkLease(String src, String holder, INode inode,
+                               long fileId)
+      throws LeaseExpiredException, FileNotFoundException {
     assert hasReadLock();
-    if (inode == null || !inode.isFile()) {
+    final String ident = src + " (inode " + fileId + ")";
+    if (inode == null) {
       Lease lease = leaseManager.getLease(holder);
       throw new LeaseExpiredException(
-          "No lease on " + src + ": File does not exist. "
+          "No lease on " + ident + ": File does not exist. "
           + (lease != null ? lease.toString()
               : "Holder " + holder + " does not have any open files."));
     }
+    if (!inode.isFile()) {
+      Lease lease = leaseManager.getLease(holder);
+      throw new LeaseExpiredException(
+          "No lease on " + ident + ": INode is not a regular file. "
+              + (lease != null ? lease.toString()
+              : "Holder " + holder + " does not have any open files."));
+    }
     final INodeFile file = inode.asFile();
     if (!file.isUnderConstruction()) {
       Lease lease = leaseManager.getLease(holder);
       throw new LeaseExpiredException(
-          "No lease on " + src + ": File is not open for writing. "
+          "No lease on " + ident + ": File is not open for writing. "
           + (lease != null ? lease.toString()
               : "Holder " + holder + " does not have any open files."));
     }
+    // No further modification is allowed on a deleted file.
+    // A file is considered deleted, if it is not in the inodeMap or is marked
+    // as deleted in the snapshot feature.
+    if (isFileDeleted(file)) {
+      throw new FileNotFoundException(src);
+    }
     String clientName = file.getFileUnderConstructionFeature().getClientName();
     if (holder != null && !clientName.equals(holder)) {
-      throw new LeaseExpiredException("Lease mismatch on " + src + " owned by "
-          + clientName + " but is accessed by " + holder);
+      throw new LeaseExpiredException("Lease mismatch on " + ident +
+          " owned by " + clientName + " but is accessed by " + holder);
     }
-    INodeId.checkId(fileId, file);
     return file;
   }
  
@@ -2992,6 +3141,7 @@ public class FSNamesystem implements Nam
     boolean success = false;
     checkOperation(OperationCategory.WRITE);
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    waitForLoadingFSImage();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -3014,10 +3164,20 @@ public class FSNamesystem implements Nam
       String holder, Block last, long fileId) throws SafeModeException,
       UnresolvedLinkException, IOException {
     assert hasWriteLock();
-    final INodesInPath iip = dir.getLastINodeInPath(src);
     final INodeFile pendingFile;
     try {
-      pendingFile = checkLease(src, fileId, holder, iip.getINode(0));
+      final INode inode;
+      if (fileId == INodeId.GRANDFATHER_INODE_ID) {
+        // Older clients may not have given us an inode ID to work with.
+        // In this case, we have to try to resolve the path and hope it
+        // hasn't changed or been deleted since the file was opened for write.
+        final INodesInPath iip = dir.getLastINodeInPath(src);
+        inode = iip.getINode(0);
+      } else {
+        inode = dir.getInode(fileId);
+        if (inode != null) src = inode.getFullPathName();
+      }
+      pendingFile = checkLease(src, holder, inode, fileId);
     } catch (LeaseExpiredException lee) {
       final INode inode = dir.getINode(src);
       if (inode != null
@@ -3032,9 +3192,9 @@ public class FSNamesystem implements Nam
         final Block realLastBlock = inode.asFile().getLastBlock();
         if (Block.matchingIdAndGenStamp(last, realLastBlock)) {
           NameNode.stateChangeLog.info("DIR* completeFile: " +
-              "request from " + holder + " to complete " + src +
-              " which is already closed. But, it appears to be an RPC " +
-              "retry. Returning success");
+              "request from " + holder + " to complete inode " + fileId +
+              "(" + src + ") which is already closed. But, it appears to be " +
+              "an RPC retry. Returning success");
           return true;
         }
       }
@@ -3054,7 +3214,7 @@ public class FSNamesystem implements Nam
     }
 
     finalizeINodeFileUnderConstruction(src, pendingFile,
-        iip.getLatestSnapshotId());
+        Snapshot.CURRENT_STATE_ID);
     return true;
   }
 
@@ -3181,6 +3341,7 @@ public class FSNamesystem implements Nam
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot rename " + src);
+      waitForLoadingFSImage();
       src = FSDirectory.resolvePath(src, srcComponents, dir);
       dst = FSDirectory.resolvePath(dst, dstComponents, dir);
       checkOperation(OperationCategory.WRITE);
@@ -3214,13 +3375,16 @@ public class FSNamesystem implements Nam
       // Rename does not operates on link targets
       // Do not resolveLink when checking permissions of src and dst
       // Check write access to parent of src
-      checkPermission(pc, src, false, null, FsAction.WRITE, null, null, false);
+      checkPermission(pc, src, false, null, FsAction.WRITE, null, null,
+          false, false);
       // Check write access to ancestor of dst
       checkPermission(pc, actualdst, false, FsAction.WRITE, null, null, null,
-          false);
+          false, false);
     }
 
-    if (dir.renameTo(src, dst, logRetryCache)) {
+    long mtime = now();
+    if (dir.renameTo(src, dst, mtime)) {
+      getEditLog().logRename(src, dst, mtime, logRetryCache);
       return true;
     }
     return false;
@@ -3278,12 +3442,17 @@ public class FSNamesystem implements Nam
       // Rename does not operates on link targets
       // Do not resolveLink when checking permissions of src and dst
       // Check write access to parent of src
-      checkPermission(pc, src, false, null, FsAction.WRITE, null, null, false);
+      checkPermission(pc, src, false, null, FsAction.WRITE, null, null, false,
+          false);
       // Check write access to ancestor of dst
-      checkPermission(pc, dst, false, FsAction.WRITE, null, null, null, false);
+      checkPermission(pc, dst, false, FsAction.WRITE, null, null, null, false,
+          false);
     }
 
-    dir.renameTo(src, dst, logRetryCache, options);
+    waitForLoadingFSImage();
+    long mtime = now();
+    dir.renameTo(src, dst, mtime, options);
+    getEditLog().logRename(src, dst, mtime, logRetryCache, options);
   }
   
   /**
@@ -3354,22 +3523,31 @@ public class FSNamesystem implements Nam
     checkOperation(OperationCategory.WRITE);
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     boolean ret = false;
+
+    waitForLoadingFSImage();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot delete " + src);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       if (!recursive && dir.isNonEmptyDirectory(src)) {
-        throw new IOException(src + " is non empty");
+        throw new PathIsNotEmptyDirectoryException(src + " is non empty");
       }
       if (enforcePermission && isPermissionEnabled) {
         checkPermission(pc, src, false, null, FsAction.WRITE, null,
-            FsAction.ALL, false);
+            FsAction.ALL, true, false);
       }
+      long mtime = now();
       // Unlink the target directory from directory tree
-      if (!dir.delete(src, collectedBlocks, removedINodes, logRetryCache)) {
+      long filesRemoved = dir.delete(src, collectedBlocks, removedINodes,
+              mtime);
+      if (filesRemoved < 0) {
         return false;
       }
+      getEditLog().logDelete(src, mtime, logRetryCache);
+      incrDeletedFileCount(filesRemoved);
+      // Blocks/INodes will be handled later
+      removePathAndBlocks(src, null, removedINodes, true);
       ret = true;
     } finally {
       writeUnlock();
@@ -3377,13 +3555,7 @@ public class FSNamesystem implements Nam
     getEditLog().logSync(); 
     removeBlocks(collectedBlocks); // Incremental deletion of blocks
     collectedBlocks.clear();
-    dir.writeLock();
-    try {
-      dir.removeFromInodeMap(removedINodes);
-    } finally {
-      dir.writeUnlock();
-    }
-    removedINodes.clear();
+
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* Namesystem.delete: "
         + src +" is removed");
@@ -3421,14 +3593,24 @@ public class FSNamesystem implements Nam
    * @param blocks Containing the list of blocks to be deleted from blocksMap
    * @param removedINodes Containing the list of inodes to be removed from 
    *                      inodesMap
+   * @param acquireINodeMapLock Whether to acquire the lock for inode removal
    */
   void removePathAndBlocks(String src, BlocksMapUpdateInfo blocks,
-      List<INode> removedINodes) {
+      List<INode> removedINodes, final boolean acquireINodeMapLock) {
     assert hasWriteLock();
     leaseManager.removeLeaseWithPrefixPath(src);
     // remove inodes from inodesMap
     if (removedINodes != null) {
-      dir.removeFromInodeMap(removedINodes);
+      if (acquireINodeMapLock) {
+        dir.writeLock();
+      }
+      try {
+        dir.removeFromInodeMap(removedINodes);
+      } finally {
+        if (acquireINodeMapLock) {
+          dir.writeUnlock();
+        }
+      }
       removedINodes.clear();
     }
     if (blocks == null) {
@@ -3516,7 +3698,8 @@ public class FSNamesystem implements Nam
       checkOperation(OperationCategory.READ);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       if (isPermissionEnabled) {
-        checkPermission(pc, src, false, null, null, null, null, resolveLink);
+        checkPermission(pc, src, false, null, null, null, null, false,
+            resolveLink);
       }
       stat = dir.getFileInfo(src, resolveLink);
     } catch (AccessControlException e) {
@@ -3537,8 +3720,10 @@ public class FSNamesystem implements Nam
       StandbyException, IOException {
     FSPermissionChecker pc = getPermissionChecker();  
     checkOperation(OperationCategory.READ);
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     readLock();
     try {
+      src = FSDirectory.resolvePath(src, pathComponents, dir);
       checkOperation(OperationCategory.READ);
       if (isPermissionEnabled) {
         checkTraverse(pc, src);
@@ -3589,7 +3774,7 @@ public class FSNamesystem implements Nam
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       status = mkdirsInternal(pc, src, permissions, createParent);
       if (status) {
-        resultingStat = dir.getFileInfo(src, false);
+        resultingStat = getAuditFileInfo(src, false);
       }
     } finally {
       writeUnlock();
@@ -3628,13 +3813,119 @@ public class FSNamesystem implements Nam
     // create multiple inodes.
     checkFsObjectLimit();
 
-    if (!dir.mkdirs(src, permissions, false, now())) {
+    if (!mkdirsRecursively(src, permissions, false, now())) {
       throw new IOException("Failed to create directory: " + src);
     }
     return true;
   }
 
   /**
+   * Create a directory.
+   * If ancestor directories do not exist, automatically create them.
+   *
+   * @param src string representation of the path to the directory
+   * @param permissions the permission of the directory
+   * @param inheritPermission if the permission of the directory should inherit
+   *                          from its parent or not. u+wx is implicitly added to
+   *                          the automatically created directories, and to the
+   *                          given directory if inheritPermission is true
+   * @param now creation time
+   * @return true if the operation succeeds, false otherwise
+   * @throws QuotaExceededException if directory creation violates
+   *                                any quota limit
+   * @throws UnresolvedLinkException if a symlink is encountered in src.
+   * @throws SnapshotAccessControlException if path is in RO snapshot
+   */
+  private boolean mkdirsRecursively(String src, PermissionStatus permissions,
+                 boolean inheritPermission, long now)
+          throws FileAlreadyExistsException, QuotaExceededException,
+                 UnresolvedLinkException, SnapshotAccessControlException,
+                 AclException {
+    src = FSDirectory.normalizePath(src);
+    String[] names = INode.getPathNames(src);
+    byte[][] components = INode.getPathComponents(names);
+    final int lastInodeIndex = components.length - 1;
+
+    dir.writeLock();
+    try {
+      INodesInPath iip = dir.getExistingPathINodes(components);
+      if (iip.isSnapshot()) {
+        throw new SnapshotAccessControlException(
+                "Modification on RO snapshot is disallowed");
+      }
+      INode[] inodes = iip.getINodes();
+
+      // find the index of the first null in inodes[]
+      StringBuilder pathbuilder = new StringBuilder();
+      int i = 1;
+      for(; i < inodes.length && inodes[i] != null; i++) {
+        pathbuilder.append(Path.SEPARATOR).append(names[i]);
+        if (!inodes[i].isDirectory()) {
+          throw new FileAlreadyExistsException(
+                  "Parent path is not a directory: "
+                  + pathbuilder + " "+inodes[i].getLocalName());
+        }
+      }
+
+      // default to creating parent dirs with the given perms
+      PermissionStatus parentPermissions = permissions;
+
+      // if not inheriting and it's the last inode, there's no use in
+      // computing perms that won't be used
+      if (inheritPermission || (i < lastInodeIndex)) {
+        // if inheriting (ie. creating a file or symlink), use the parent dir,
+        // else the supplied permissions
+        // NOTE: the permissions of the auto-created directories violate posix
+        FsPermission parentFsPerm = inheritPermission
+                ? inodes[i-1].getFsPermission() : permissions.getPermission();
+
+        // ensure that the permissions allow user write+execute
+        if (!parentFsPerm.getUserAction().implies(FsAction.WRITE_EXECUTE)) {
+          parentFsPerm = new FsPermission(
+                  parentFsPerm.getUserAction().or(FsAction.WRITE_EXECUTE),
+                  parentFsPerm.getGroupAction(),
+                  parentFsPerm.getOtherAction()
+          );
+        }
+
+        if (!parentPermissions.getPermission().equals(parentFsPerm)) {
+          parentPermissions = new PermissionStatus(
+                  parentPermissions.getUserName(),
+                  parentPermissions.getGroupName(),
+                  parentFsPerm
+          );
+          // when inheriting, use same perms for entire path
+          if (inheritPermission) permissions = parentPermissions;
+        }
+      }
+
+      // create directories beginning from the first null index
+      for(; i < inodes.length; i++) {
+        pathbuilder.append(Path.SEPARATOR).append(names[i]);
+        dir.unprotectedMkdir(allocateNewInodeId(), iip, i, components[i],
+                (i < lastInodeIndex) ? parentPermissions : permissions, null,
+                now);
+        if (inodes[i] == null) {
+          return false;
+        }
+        // Directory creation also count towards FilesCreated
+        // to match count of FilesDeleted metric.
+        NameNode.getNameNodeMetrics().incrFilesCreated();
+
+        final String cur = pathbuilder.toString();
+        getEditLog().logMkDir(cur, inodes[i]);
+        if(NameNode.stateChangeLog.isDebugEnabled()) {
+          NameNode.stateChangeLog.debug(
+                  "mkdirs: created directory " + cur);
+        }
+      }
+    } finally {
+      dir.writeUnlock();
+    }
+    return true;
+  }
+
+  /**
    * Get the content summary for a specific file/dir.
    *
    * @param src The string representation of the path to the file
@@ -3678,7 +3969,7 @@ public class FSNamesystem implements Nam
    * 
    * Note: This does not support ".inodes" relative path.
    */
-  void setQuota(String path, long nsQuota, long dsQuota) 
+  void setQuota(String path, long nsQuota, long dsQuota)
       throws IOException, UnresolvedLinkException {
     checkSuperuserPrivilege();
     checkOperation(OperationCategory.WRITE);
@@ -3686,7 +3977,12 @@ public class FSNamesystem implements Nam
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot set quota on " + path);
-      dir.setQuota(path, nsQuota, dsQuota);
+      INodeDirectory changed = dir.setQuota(path, nsQuota, dsQuota);
+      if (changed != null) {
+        final Quota.Counts q = changed.getQuotaCounts();
+        getEditLog().logSetQuota(path,
+                q.get(Quota.NAMESPACE), q.get(Quota.DISKSPACE));
+      }
     } finally {
       writeUnlock();
     }
@@ -3695,27 +3991,41 @@ public class FSNamesystem implements Nam
 
   /** Persist all metadata about this file.
    * @param src The string representation of the path
+   * @param fileId The inode ID that we're fsyncing.  Older clients will pass
+   *               INodeId.GRANDFATHER_INODE_ID here.
    * @param clientName The string representation of the client
    * @param lastBlockLength The length of the last block 
    *                        under construction reported from client.
    * @throws IOException if path does not exist
    */
-  void fsync(String src, String clientName, long lastBlockLength) 
+  void fsync(String src, long fileId, String clientName, long lastBlockLength)
       throws IOException, UnresolvedLinkException {
     NameNode.stateChangeLog.info("BLOCK* fsync: " + src + " for " + clientName);
     checkOperation(OperationCategory.WRITE);
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+
+    waitForLoadingFSImage();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot fsync file " + src);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
-      INodeFile pendingFile  = checkLease(src, clientName);
+      final INode inode;
+      if (fileId == INodeId.GRANDFATHER_INODE_ID) {
+        // Older clients may not have given us an inode ID to work with.
+        // In this case, we have to try to resolve the path and hope it
+        // hasn't changed or been deleted since the file was opened for write.
+        inode = dir.getINode(src);
+      } else {
+        inode = dir.getInode(fileId);
+        if (inode != null) src = inode.getFullPathName();
+      }
+      final INodeFile pendingFile = checkLease(src, clientName, inode, fileId);
       if (lastBlockLength > 0) {
         pendingFile.getFileUnderConstructionFeature().updateLengthOfLastBlock(
             pendingFile, lastBlockLength);
       }
-      dir.persistBlocks(src, pendingFile, false);
+      persistBlocks(src, pendingFile, false);
     } finally {
       writeUnlock();
     }
@@ -3787,20 +4097,10 @@ public class FSNamesystem implements Nam
     final BlockInfo lastBlock = pendingFile.getLastBlock();
     BlockUCState lastBlockState = lastBlock.getBlockUCState();
     BlockInfo penultimateBlock = pendingFile.getPenultimateBlock();
-    boolean penultimateBlockMinReplication;
-    BlockUCState penultimateBlockState;
-    if (penultimateBlock == null) {
-      penultimateBlockState = BlockUCState.COMPLETE;
-      // If penultimate block doesn't exist then its minReplication is met
-      penultimateBlockMinReplication = true;
-    } else {
-      penultimateBlockState = BlockUCState.COMMITTED;
-      penultimateBlockMinReplication = 
+
+    // If penultimate block doesn't exist then its minReplication is met
+    boolean penultimateBlockMinReplication = penultimateBlock == null ? true :
         blockManager.checkMinReplication(penultimateBlock);
-    }
-    assert penultimateBlockState == BlockUCState.COMPLETE ||
-           penultimateBlockState == BlockUCState.COMMITTED :
-           "Unexpected state of penultimate block in " + src;
 
     switch(lastBlockState) {
     case COMPLETE:
@@ -3906,19 +4206,21 @@ public class FSNamesystem implements Nam
       INodeFile pendingFile, int latestSnapshot) throws IOException,
       UnresolvedLinkException {
     assert hasWriteLock();
+
     FileUnderConstructionFeature uc = pendingFile.getFileUnderConstructionFeature();
     Preconditions.checkArgument(uc != null);
     leaseManager.removeLease(uc.getClientName(), src);
     
-    pendingFile = pendingFile.recordModification(latestSnapshot);
+    pendingFile.recordModification(latestSnapshot);
 
     // The file is no longer pending.
     // Create permanent INode, update blocks. No need to replace the inode here
     // since we just remove the uc feature from pendingFile
     final INodeFile newFile = pendingFile.toCompleteFile(now());
 
+    waitForLoadingFSImage();
     // close file and persist block allocations for this file
-    dir.closeFile(src, newFile);
+    closeFile(src, newFile);
 
     blockManager.checkReplication(newFile);
   }
@@ -3975,6 +4277,7 @@ public class FSNamesystem implements Nam
              + ")");
     checkOperation(OperationCategory.WRITE);
     String src = "";
+    waitForLoadingFSImage();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -3997,7 +4300,30 @@ public class FSNamesystem implements Nam
           throw new IOException("Block (=" + lastblock + ") not found");
         }
       }
-      INodeFile iFile = ((INode)storedBlock.getBlockCollection()).asFile();
+      //
+      // The implementation of delete operation (see @deleteInternal method)
+      // first removes the file paths from namespace, and delays the removal
+      // of blocks to later time for better performance. When
+      // commitBlockSynchronization (this method) is called in between, the
+      // blockCollection of storedBlock could have been assigned to null by
+      // the delete operation, throw IOException here instead of NPE; if the
+      // file path is already removed from namespace by the delete operation,
+      // throw FileNotFoundException here, so not to proceed to the end of
+      // this method to add a CloseOp to the edit log for an already deleted
+      // file (See HDFS-6825).
+      //
+      BlockCollection blockCollection = storedBlock.getBlockCollection();
+      if (blockCollection == null) {
+        throw new IOException("The blockCollection of " + storedBlock
+            + " is null, likely because the file owning this block was"
+            + " deleted and the block removal is delayed");
+      }
+      INodeFile iFile = ((INode)blockCollection).asFile();
+      if (isFileDeleted(iFile)) {
+        throw new FileNotFoundException("File not found: "
+            + iFile.getFullPathName() + ", likely due to delayed block"
+            + " removal");
+      }
       if (!iFile.isUnderConstruction() || storedBlock.isComplete()) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Unexpected block (=" + lastblock
@@ -4052,8 +4378,11 @@ public class FSNamesystem implements Nam
           // Otherwise fsck will report these blocks as MISSING, especially if the
           // blocksReceived from Datanodes take a long time to arrive.
           for (int i = 0; i < trimmedTargets.size(); i++) {
-            trimmedTargets.get(i).addBlock(
-              trimmedStorages.get(i), storedBlock);
+            DatanodeStorageInfo storageInfo =
+                trimmedTargets.get(i).getStorageInfo(trimmedStorages.get(i));
+            if (storageInfo != null) {
+              storageInfo.addBlock(storedBlock);
+            }
           }
         }
 
@@ -4069,7 +4398,8 @@ public class FSNamesystem implements Nam
         src = closeFileCommitBlocks(iFile, storedBlock);
       } else {
         // If this commit does not want to close the file, persist blocks
-        src = persistBlocks(iFile, false);
+        src = iFile.getFullPathName();
+        persistBlocks(src, iFile, false);
       }
     } finally {
       writeUnlock();
@@ -4087,11 +4417,10 @@ public class FSNamesystem implements Nam
   }
 
   /**
-   *
-   * @param pendingFile
-   * @param storedBlock
+   * @param pendingFile open file that needs to be closed
+   * @param storedBlock last block
    * @return Path of the file that was closed.
-   * @throws IOException
+   * @throws IOException on error
    */
   @VisibleForTesting
   String closeFileCommitBlocks(INodeFile pendingFile, BlockInfo storedBlock)
@@ -4109,21 +4438,6 @@ public class FSNamesystem implements Nam
   }
 
   /**
-   * Persist the block list for the given file.
-   *
-   * @param pendingFile
-   * @return Path to the given file.
-   * @throws IOException
-   */
-  @VisibleForTesting
-  String persistBlocks(INodeFile pendingFile, boolean logRetryCache)
-      throws IOException {
-    String src = pendingFile.getFullPathName();
-    dir.persistBlocks(src, pendingFile, logRetryCache);
-    return src;
-  }
-
-  /**
    * Renew the lease(s) held by the given client
    */
   void renewLease(String holder) throws IOException {
@@ -4249,7 +4563,7 @@ public class FSNamesystem implements Nam
    * @return registration ID
    */
   String getRegistrationID() {
-    return Storage.getRegistrationID(dir.fsImage.getStorage());
+    return Storage.getRegistrationID(getFSImage().getStorage());
   }
 
   /**
@@ -4299,7 +4613,6 @@ public class FSNamesystem implements Nam
 
   /**
    * Perform resource checks and cache the results.
-   * @throws IOException
    */
   void checkAvailableResources() {
     Preconditions.checkState(nnResourceChecker != null,
@@ -4308,6 +4621,85 @@ public class FSNamesystem implements Nam
   }
 
   /**
+   * Persist the block list for the inode.
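+   * @param path path of the file, recorded in the edit log entry
+   * @param file the file whose blocks are logged; must be under construction
+   * @param logRetryCache passed through to the edit log's logUpdateBlocks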
+   */
+  private void persistBlocks(String path, INodeFile file,
+                             boolean logRetryCache) {
+    assert hasWriteLock();
+    Preconditions.checkArgument(file.isUnderConstruction());
+    getEditLog().logUpdateBlocks(path, file, logRetryCache);
+    if(NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("persistBlocks: " + path
+              + " with " + file.getBlocks().length + " blocks is persisted to" +
+              " the file system");
+    }
+  }
+
+  void incrDeletedFileCount(long count) {
+    NameNode.getNameNodeMetrics().incrFilesDeleted(count);
+  }
+
+  /**
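+   * Record the close of a file in the edit log.
+   * @param path path of the file being closed
+   * @param file the file inode that is logged as closed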
+   */
+  private void closeFile(String path, INodeFile file) {
+    assert hasWriteLock();
+    waitForLoadingFSImage();
+    // file is closed
+    getEditLog().logCloseFile(path, file);
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("closeFile: "
+              +path+" with "+ file.getBlocks().length
+              +" blocks is persisted to the file system");
+    }
+  }
+
+  /**
+   * Add the given symbolic link to the fs. Record it in the edits log.
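+   * @param path path at which the symlink is created
+   * @param target target that the symlink points to
+   * @param dirPerms passed to parent creation; its user name owns the link
+   * @param createParent whether to create the parent directories first
+   * @param logRetryCache passed through to the edit log's logSymlink
+   * @return the new symlink inode, or null if parent creation or the add fails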
+   */
+  private INodeSymlink addSymlink(String path, String target,
+                                  PermissionStatus dirPerms,
+                                  boolean createParent, boolean logRetryCache)
+      throws UnresolvedLinkException, FileAlreadyExistsException,
+      QuotaExceededException, SnapshotAccessControlException, AclException {
+    waitForLoadingFSImage();
+
+    final long modTime = now();
+    if (createParent) {
+      final String parent = new Path(path).getParent().toString();
+      if (!mkdirsRecursively(parent, dirPerms, true, modTime)) {
+        return null;
+      }
+    }
+    final String userName = dirPerms.getUserName();
+    long id = allocateNewInodeId();
+    INodeSymlink newNode = dir.addSymlink(id, path, target, modTime, modTime,
+            new PermissionStatus(userName, null, FsPermission.getDefault()));
+    if (newNode == null) {
+      NameNode.stateChangeLog.info("addSymlink: failed to add " + path);
+      return null;
+    }
+    getEditLog().logSymlink(path, target, modTime, modTime, newNode,
+        logRetryCache);
+
+    if(NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("addSymlink: " + path + " is added");
+    }
+    return newNode;
+  }
+
+  /**
    * Periodically calls hasAvailableResources of NameNodeResourceChecker, and if
    * there are found to be insufficient resources available, causes the NN to
    * enter safe mode. If resources are later found to have returned to
@@ -4387,7 +4779,7 @@ public class FSNamesystem implements Nam
   }
 
   public FSImage getFSImage() {
-    return dir.fsImage;
+    return fsImage;
   }
 
   public FSEditLog getEditLog() {
@@ -4550,6 +4942,28 @@ public class FSNamesystem implements Nam
     }
   }
 
+  DatanodeStorageReport[] getDatanodeStorageReport(final DatanodeReportType type
+      ) throws AccessControlException, StandbyException {
+    checkSuperuserPrivilege();
+    checkOperation(OperationCategory.UNCHECKED);
+    readLock();
+    try {
+      checkOperation(OperationCategory.UNCHECKED); // repeated under the read lock
+      final DatanodeManager dm = getBlockManager().getDatanodeManager();      
+      final List<DatanodeDescriptor> datanodes = dm.getDatanodeListForReport(type);
+
+      DatanodeStorageReport[] reports = new DatanodeStorageReport[datanodes.size()];
+      for (int i = 0; i < reports.length; i++) { // one report per listed datanode
+        final DatanodeDescriptor d = datanodes.get(i);
+        reports[i] = new DatanodeStorageReport(new DatanodeInfo(d),
+            d.getStorageReports());
+      }
+      return reports;
+    } finally {
+      readUnlock();
+    }
+  }
+
   /**
    * Save namespace image.
    * This will save current namespace into fsimage file and empty edits file.
@@ -4640,6 +5054,21 @@ public class FSNamesystem implements Nam
   }
 
   /**
+   * Persist the new block (the last block of the given file).
+   * @param path path of the file, as passed to the edit log
+   * @param file file whose last block is logged; must be under construction
+   */
+  private void persistNewBlock(String path, INodeFile file) {
+    Preconditions.checkArgument(file.isUnderConstruction());
+    getEditLog().logAddBlock(path, file);
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("persistNewBlock: "
+              + path + " with new block " + file.getLastBlock().toString()
+              + ", current total block count is " + file.getBlocks().length);
+    }
+  }
+
+  /**
    * SafeModeInfo contains information related to the safe mode.
    * <p>
    * An instance of {@link SafeModeInfo} is created when the name node
@@ -4666,8 +5095,12 @@ public class FSNamesystem implements Nam
     private final double threshold;
     /** Safe mode minimum number of datanodes alive */
     private final int datanodeThreshold;
-    /** Safe mode extension after the threshold. */
-    private int extension;
+    /**
+     * Safe mode extension after the threshold.
+     * Make it volatile so that getSafeModeTip can read the latest value
+     * without taking a lock.
+     */
+    private volatile int extension;
     /** Min replication required by safe mode. */
     private final int safeReplication;
     /** threshold for populating needed replication queues */
@@ -4689,8 +5122,12 @@ public class FSNamesystem implements Nam
     private int blockReplQueueThreshold;
     /** time of the last status printout */
     private long lastStatusReport = 0;
-    /** Was safemode entered automatically because available resources were low. */
-    private boolean resourcesLow = false;
+    /**
+     * Was safemode entered automatically because available resources were low.
+     * Make it volatile so that getSafeModeTip can read the latest value
+     * without taking a lock.
+     */
+    private volatile boolean resourcesLow = false;
     /** Should safemode adjust its block totals as blocks come in */
     private boolean shouldIncrementallyTrackBlocks = false;
     /** counter for tracking startup progress of reported blocks */
@@ -5350,7 +5787,6 @@ public class FSNamesystem implements Nam
 
   /**
    * Leave safe mode.
-   * @throws IOException
    */
   void leaveSafeMode() {
     writeLock();
@@ -5366,14 +5802,21 @@ public class FSNamesystem implements Nam
   }
     
   String getSafeModeTip() {
-    readLock();
-    try {
-      if (!isInSafeMode()) {
-        return "";
-      }
+    // There is no need to take readLock.
+    // Don't use isInSafeMode as this.safeMode might be set to null
+    // after isInSafeMode returns.
+    boolean inSafeMode;
+    SafeModeInfo safeMode = this.safeMode;
+    if (safeMode == null) {
+      inSafeMode = false;
+    } else {
+      inSafeMode = safeMode.isOn();
+    }
+
+    if (!inSafeMode) {
+      return "";
+    } else {
       return safeMode.getTurnOffTip();
-    } finally {
-      readUnlock();
     }
   }
 
@@ -5418,7 +5861,7 @@ public class FSNamesystem implements Nam
   }
 
   public void processIncrementalBlockReport(final DatanodeID nodeID,
-      final String poolId, final StorageReceivedDeletedBlocks srdb)
+      final StorageReceivedDeletedBlocks srdb)
       throws IOException {
     writeLock();
     try {
@@ -5501,7 +5944,7 @@ public class FSNamesystem implements Nam
       FsAction parentAccess, FsAction access, FsAction subAccess)
       throws AccessControlException, UnresolvedLinkException {
         checkPermission(pc, path, doCheckOwner, ancestorAccess,
-            parentAccess, access, subAccess, true);
+            parentAccess, access, subAccess, false, true);
   }
 
   /**
@@ -5512,14 +5955,14 @@ public class FSNamesystem implements Nam
   private void checkPermission(FSPermissionChecker pc,
       String path, boolean doCheckOwner, FsAction ancestorAccess,
       FsAction parentAccess, FsAction access, FsAction subAccess,
-      boolean resolveLink)
+      boolean ignoreEmptyDir, boolean resolveLink)
       throws AccessControlException, UnresolvedLinkException {
     if (!pc.isSuperUser()) {
-      dir.waitForReady();
+      waitForLoadingFSImage();
       readLock();
       try {
-        pc.checkPermission(path, dir.rootDir, doCheckOwner, ancestorAccess,
-            parentAccess, access, subAccess, resolveLink);
+        pc.checkPermission(path, dir, doCheckOwner, ancestorAccess,
+            parentAccess, access, subAccess, ignoreEmptyDir, resolveLink);
       } finally {
         readUnlock();
       }
@@ -5549,12 +5992,9 @@ public class FSNamesystem implements Nam
   @Override // FSNamesystemMBean
   @Metric
   public long getFilesTotal() {
-    readLock();
-    try {
-      return this.dir.totalInodes();
-    } finally {
-      readUnlock();
-    }
+    // There is no need to take FSNamesystem's lock as
+    // FSDirectory has its own lock.
+    return this.dir.totalInodes();
   }
 
   @Override // FSNamesystemMBean
@@ -5671,7 +6111,6 @@ public class FSNamesystem implements Nam
       blockManager.shutdown();
     }
   }
-  
 
   @Override // FSNamesystemMBean
   public int getNumLiveDataNodes() {
@@ -5719,6 +6158,15 @@ public class FSNamesystem implements Nam
   }
 
   /**
+   * Storages are marked as "content stale" after the NN restarts or fails over
+   * and before it receives the first heartbeat followed by the first block report.
+   */
+  @Override // FSNamesystemMBean
+  public int getNumStaleStorages() {
+    return getBlockManager().getDatanodeManager().getNumStaleStorages();
+  }
+
+  /**
    * Sets the current generation stamp for legacy blocks
    */
   void setGenerationStampV1(long stamp) {
@@ -5767,7 +6215,7 @@ public class FSNamesystem implements Nam
   /**
    * Sets the generation stamp that delineates random and sequentially
    * allocated block IDs.
-   * @param stamp
+   * @param stamp set generation stamp limit to this value
    */
   void setGenerationStampV1Limit(long stamp) {
     Preconditions.checkState(generationStampV1Limit ==
@@ -5852,7 +6300,6 @@ public class FSNamesystem implements Nam
    * Determine whether the block ID was randomly generated (legacy) or
    * sequentially generated. The generation stamp value is used to
    * make the distinction.
-   * @param block
    * @return true if the block ID was randomly generated, false otherwise.
    */
   boolean isLegacyBlock(Block block) {
@@ -5871,6 +6318,35 @@ public class FSNamesystem implements Nam
     return blockId;
   }
 
+  private boolean isFileDeleted(INodeFile file) {
+    // Deleted if no longer in the inodeMap; the other two cases follow below.
+    if (dir.getInode(file.getId()) == null) {
+      return true;
+    }
+
+    // Walk up the path hierarchy: the file counts as deleted if any ancestor
+    // no longer lists its child (e.g. a parent removed by recursive deletion).
+    INode tmpChild = file;
+    INodeDirectory tmpParent = file.getParent();
+    while (true) {
+      if (tmpParent == null ||
+          tmpParent.searchChildren(tmpChild.getLocalNameBytes()) < 0) {
+        return true;
+      }
+      if (tmpParent.isRoot()) {
+        break; // reached the root with every parent/child link intact
+      }
+      tmpChild = tmpParent;
+      tmpParent = tmpParent.getParent();
+    }
+
+    if (file.isWithSnapshot() &&
+        file.getFileWithSnapshotFeature().isCurrentFileDeleted()) {
+      return true; // only kept for a snapshot; the current file is marked deleted
+    }
+    return false;
+  }
+
   private INodeFile checkUCBlock(ExtendedBlock block,
       String clientName) throws IOException {
     assert hasWriteLock();
@@ -5887,7 +6363,7 @@ public class FSNamesystem implements Nam
     
     // check file inode
     final INodeFile file = ((INode)storedBlock.getBlockCollection()).asFile();
-    if (file == null || !file.isUnderConstruction()) {
+    if (file == null || !file.isUnderConstruction() || isFileDeleted(file)) {
       throw new IOException("The file " + storedBlock + 
           " belonged to does not exist or it is not under construction.");
     }
@@ -5986,6 +6462,7 @@ public class FSNamesystem implements Nam
              + ", newNodes=" + Arrays.asList(newNodes)
              + ", clientName=" + clientName
              + ")");
+    waitForLoadingFSImage();
     writeLock();
     boolean success = false;
     try {
@@ -6037,7 +6514,7 @@ public class FSNamesystem implements Nam
     blockinfo.setExpectedLocations(storages);
 
     String src = pendingFile.getFullPathName();
-    dir.persistBlocks(src, pendingFile, logRetryCache);
+    persistBlocks(src, pendingFile, logRetryCache);
   }
 
   // rename was successful. If any part of the renamed subtree had
@@ -6048,6 +6525,42 @@ public class FSNamesystem implements Nam
   }
 
   /**
+   * Serializes leases: writes a count, then each under-construction file.
+   */
+  void saveFilesUnderConstruction(DataOutputStream out,
+      Map<Long, INodeFile> snapshotUCMap) throws IOException {
+    // This is run by an inferior thread of saveNamespace, which holds a read
+    // lock on our behalf. If we took the read lock here, we could block
+    // for fairness if a writer is waiting on the lock.
+    synchronized (leaseManager) {
+      Map<String, INodeFile> nodes = leaseManager.getINodesUnderConstruction();
+      for (Map.Entry<String, INodeFile> entry : nodes.entrySet()) {
+        // TODO: for HDFS-5428, because of rename operations, some
+        // under-construction files that are
+        // in the current fs directory can also be captured in the
+        // snapshotUCMap. We should remove them from the snapshotUCMap.
+        snapshotUCMap.remove(entry.getValue().getId());
+      }
+
+      out.writeInt(nodes.size() + snapshotUCMap.size()); // write the size
+      for (Map.Entry<String, INodeFile> entry : nodes.entrySet()) {
+        FSImageSerialization.writeINodeUnderConstruction(
+            out, entry.getValue(), entry.getKey());
+      }
+      for (Map.Entry<Long, INodeFile> entry : snapshotUCMap.entrySet()) {
+        // The files left in snapshotUCMap are written with the path
+        // "/.reserved/.inodes/<inodeid>" instead of a regular file path.
+        StringBuilder b = new StringBuilder();
+        b.append(FSDirectory.DOT_RESERVED_PATH_PREFIX)
+            .append(Path.SEPARATOR).append(FSDirectory.DOT_INODES_STRING)
+            .append(Path.SEPARATOR).append(entry.getValue().getId());
+        FSImageSerialization.writeINodeUnderConstruction(
+            out, entry.getValue(), b.toString());
+      }
+    }
+  }
+
+  /**
    * @return all the under-construction files in the lease map
    */
   Map<String, INodeFile> getFilesUnderConstruction() {
@@ -6089,7 +6602,6 @@ public class FSNamesystem implements Nam
    * Release (unregister) backup node.
    * <p>
    * Find and remove the backup stream corresponding to the node.
-   * @param registration
    * @throws IOException
    */
   void releaseBackupNode(NamenodeRegistration registration)
@@ -6136,6 +6648,23 @@ public class FSNamesystem implements Nam
   String[] cookieTab) throws IOException {
     checkSuperuserPrivilege();
     checkOperation(OperationCategory.READ);
+
+    int count = 0;
+    ArrayList<CorruptFileBlockInfo> corruptFiles =
+        new ArrayList<CorruptFileBlockInfo>();
+    if (cookieTab == null) {
+      cookieTab = new String[] { null };
+    }
+
+    // Do a quick check if there are any corrupt files without taking the lock
+    if (blockManager.getMissingBlocksCount() == 0) {
+      if (cookieTab[0] == null) {
+        cookieTab[0] = String.valueOf(getIntCookie(cookieTab[0]));
+      }
+      LOG.info("there are no corrupt file blocks.");
+      return corruptFiles;
+    }
+
     readLock();
     try {
       checkOperation(OperationCategory.READ);
@@ -6144,14 +6673,9 @@ public class FSNamesystem implements Nam
                               "replication queues have not been initialized.");
       }
       // print a limited # of corrupt files per call
-      int count = 0;
-      ArrayList<CorruptFileBlockInfo> corruptFiles = new ArrayList<CorruptFileBlockInfo>();
 
       final Iterator<Block> blkIterator = blockManager.getCorruptReplicaBlockIterator();
 
-      if (cookieTab == null) {
-        cookieTab = new String[] { null };
-      }
       int skip = getIntCookie(cookieTab[0]);
       for (int i = 0; i < skip && blkIterator.hasNext(); i++) {
         blkIterator.next();
@@ -6225,8 +6749,8 @@ public class FSNamesystem implements Nam
 
   /**
    * @param renewer Renewer information
-   * @return Token<DelegationTokenIdentifier>
-   * @throws IOException
+   * @return delegation token
+   * @throws IOException on error
    */
   Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
       throws IOException {
@@ -6267,10 +6791,10 @@ public class FSNamesystem implements Nam
 
   /**
    * 
-   * @param token delegation token
-   * @return New expiryTime of the token
-   * @throws InvalidToken
-   * @throws IOException
+   * @param token token to renew
+   * @return new expiryTime of the token
+   * @throws InvalidToken if {@code token} is invalid
+   * @throws IOException on other errors
    */
   long renewDelegationToken(Token<DelegationTokenIdentifier> token)
       throws InvalidToken, IOException {
@@ -6301,8 +6825,8 @@ public class FSNamesystem implements Nam
 
   /**
    * 
-   * @param token delegation token that needs to be canceled
-   * @throws IOException
+   * @param token token to cancel
+   * @throws IOException on error
    */
   void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
       throws IOException {
@@ -6322,6 +6846,15 @@ public class FSNamesystem implements Nam
     getEditLog().logSync();
   }
 
+  /**
+   * @param out stream that the secret manager state is saved to
+   * @param sdPath storage directory path, as a String
+   */
+  void saveSecretManagerStateCompat(DataOutputStream out, String sdPath)
+      throws IOException {
+    dtSecretManager.saveSecretManagerStateCompat(out, sdPath);
+  }
+
   SecretManagerState saveSecretManagerState() {
     return dtSecretManager.saveSecretManagerState();
   }
@@ -6630,7 +7163,7 @@ public class FSNamesystem implements Nam
 
   @Override  // NameNodeMXBean
   public String getClusterId() {
-    return dir.fsImage.getStorage().getClusterID();
+    return getFSImage().getStorage().getClusterID();
   }
   
   @Override  // NameNodeMXBean
@@ -6873,7 +7406,18 @@ public class FSNamesystem implements Nam
 
   @Override // FSClusterStats
   public int getNumDatanodesInService() {
-    return getNumLiveDataNodes() - getNumDecomLiveDataNodes();
+    return datanodeStatistics.getNumDatanodesInService();
+  }
+  
+  @Override // for block placement strategy
+  public double getInServiceXceiverAverage() {
+    double avgLoad = 0;
+    final int nodes = getNumDatanodesInService();
+    if (nodes != 0) { // with no in-service nodes, report 0 rather than divide by zero
+      final int xceivers = datanodeStatistics.getInServiceXceiverCount();
+      avgLoad = (double)xceivers/nodes; // cast first so the division is not integral
+    }
+    return avgLoad;
   }
 
   public SnapshotManager getSnapshotManager() {
@@ -7071,7 +7615,7 @@ public class FSNamesystem implements Nam
    */
   SnapshotDiffReport getSnapshotDiffReport(String path,
       String fromSnapshot, String toSnapshot) throws IOException {
-    SnapshotDiffInfo diffs = null;
+    SnapshotDiffReport diffs;
     checkOperation(OperationCategory.READ);
     final FSPermissionChecker pc = getPermissionChecker();
     readLock();
@@ -7085,13 +7629,11 @@ public class FSNamesystem implements Nam
     } finally {
       readUnlock();
     }
-    
+
     if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       logAuditEvent(true, "computeSnapshotDiff", null, null, null);
     }
-    return diffs != null ? diffs.generateReport() : new SnapshotDiffReport(
-        path, fromSnapshot, toSnapshot,
-        Collections.<DiffReportEntry> emptyList());
+    return diffs;
   }
   
   private void checkSubtreeReadPermission(final FSPermissionChecker pc,
@@ -7160,7 +7702,7 @@ public class FSNamesystem implements Nam
    * Remove a list of INodeDirectorySnapshottable from the SnapshotManager
    * @param toRemove the list of INodeDirectorySnapshottable to be removed
    */
-  void removeSnapshottableDirs(List<INodeDirectorySnapshottable> toRemove) {
+  void removeSnapshottableDirs(List<INodeDirectory> toRemove) {
     if (snapshotManager != null) {
       snapshotManager.removeSnapshottable(toRemove);
     }
@@ -7213,7 +7755,7 @@ public class FSNamesystem implements Nam
 
   /**
    * Update internal state to indicate that a rolling upgrade is in progress.
-   * @param startTime start time of the rolling upgrade
+   * @param startTime rolling upgrade start time
    */
   void startRollingUpgradeInternal(long startTime)
       throws IOException {
@@ -7306,14 +7848,21 @@ public class FSNamesystem implements Nam
 
       returnInfo = finalizeRollingUpgradeInternal(now());
       getEditLog().logFinalizeRollingUpgrade(returnInfo.getFinalizeTime());
-      getFSImage().saveNamespace(this);
+      if (haEnabled) {
+        // roll the edit log to make sure the standby NameNode can tail
+        getFSImage().rollEditLog();
+      }
+      getFSImage().updateStorageVersion();
       getFSImage().renameCheckpoint(NameNodeFile.IMAGE_ROLLBACK,
           NameNodeFile.IMAGE);
     } finally {
       writeUnlock();
     }
 
-    // getEditLog().logSync() is not needed since it does saveNamespace 
+    if (!haEnabled) {
+      // Sync not needed for HA since the edit log was rolled after logging.
+      getEditLog().logSync();
+    }
 
     if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       logAuditEvent(true, "finalizeRollingUpgrade", null, null, null);
@@ -7348,6 +7897,7 @@ public class FSNamesystem implements Nam
       cacheManager.waitForRescanIfNeeded();
     }
     writeLock();
+    String effectiveDirectiveStr = null;
     Long result = null;
     try {
       checkOperation(OperationCategory.WRITE);
@@ -7359,11 +7909,12 @@ public class FSNamesystem implements Nam
         throw new IOException("addDirective: you cannot specify an ID " +
             "for this operation.");
       }
-      CacheDirectiveInfo effectiveDirective = 
+      CacheDirectiveInfo effectiveDirective =
           cacheManager.addDirective(directive, pc, flags);
       getEditLog().logAddCacheDirectiveInfo(effectiveDirective,
           cacheEntry != null);
       result = effectiveDirective.getId();
+      effectiveDirectiveStr = effectiveDirective.toString();
       success = true;
     } finally {
       writeUnlock();
@@ -7371,7 +7922,7 @@ public class FSNamesystem implements Nam
         getEditLog().logSync();
       }
       if (isAuditEnabled() && isExternalInvocation()) {
-        logAuditEvent(success, "addCacheDirective", null, null, null);
+        logAuditEvent(success, "addCacheDirective", effectiveDirectiveStr, null, null);
       }
       RetryCache.setState(cacheEntry, success, result);
     }
@@ -7408,7 +7959,8 @@ public class FSNamesystem implements Nam
         getEditLog().logSync();
       }
       if (isAuditEnabled() && isExternalInvocation()) {
-        logAuditEvent(success, "modifyCacheDirective", null, null, null);
+        String idStr = "{id: " + directive.getId().toString() + "}";
+        logAuditEvent(success, "modifyCacheDirective", idStr, directive.toString(), null);
       }
       RetryCache.setState(cacheEntry, success);
     }
@@ -7436,7 +7988,8 @@ public class FSNamesystem implements Nam
     } finally {
       writeUnlock();
       if (isAuditEnabled() && isExternalInvocation()) {
-        logAuditEvent(success, "removeCacheDirective", null, null,
+        String idStr = "{id: " + id.toString() + "}";
+        logAuditEvent(success, "removeCacheDirective", idStr, null,
             null);
       }
       RetryCache.setState(cacheEntry, success);
@@ -7461,7 +8014,7 @@ public class FSNamesystem implements Nam
     } finally {
       readUnlock();
       if (isAuditEnabled() && isExternalInvocation()) {
-        logAuditEvent(success, "listCacheDirectives", null, null,
+        logAuditEvent(success, "listCacheDirectives", filter.toString(), null,
             null);
       }
     }
@@ -7478,6 +8031,7 @@ public class FSNamesystem implements Nam
     }
     writeLock();
     boolean success = false;
+    String poolInfoStr = null;
     try {
       checkOperation(OperationCategory.WRITE);
       if (isInSafeMode()) {
@@ -7488,12 +8042,13 @@ public class FSNamesystem implements Nam
         pc.checkSuperuserPrivilege();
       }
       CachePoolInfo info = cacheManager.addCachePool(req);
+      poolInfoStr = info.toString();
       getEditLog().logAddCachePool(info, cacheEntry != null);
       success = true;
     } finally {
       writeUnlock();
       if (isAuditEnabled() && isExternalInvocation()) {
-        logAuditEvent(success, "addCachePool", req.getPoolName(), null, null);
+        logAuditEvent(success, "addCachePool", poolInfoStr, null, null);
       }
       RetryCache.setState(cacheEntry, success);
     }
@@ -7526,7 +8081,8 @@ public class FSNamesystem implements Nam
     } finally {
       writeUnlock();
       if (isAuditEnabled() && isExternalInvocation()) {
-        logAuditEvent(success, "modifyCachePool", req.getPoolName(), null, null);
+        String poolNameStr = "{poolName: " + req.getPoolName() + "}";
+        logAuditEvent(success, "modifyCachePool", poolNameStr, req.toString(), null);
       }
       RetryCache.setState(cacheEntry, success);
     }
@@ -7559,7 +8115,8 @@ public class FSNamesystem implements Nam
     } finally {
       writeUnlock();
       if (isAuditEnabled() && isExternalInvocation()) {
-        logAuditEvent(success, "removeCachePool", cachePoolName, null, null);
+        String poolNameStr = "{poolName: " + cachePoolName + "}";
+        logAuditEvent(success, "removeCachePool", poolNameStr, null, null);
       }
       RetryCache.setState(cacheEntry, success);
     }
@@ -7590,7 +8147,7 @@ public class FSNamesystem implements Nam
   }
 
   void modifyAclEntries(String src, List<AclEntry> aclSpec) throws IOException {
-    aclConfigFlag.checkForApiCall();
+    nnConf.checkAclsConfigFlag();
     HdfsFileStatus resultingStat = null;
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
@@ -7601,7 +8158,8 @@ public class FSNamesystem implements Nam
       checkNameNodeSafeMode("Cannot modify ACL entries on " + src);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       checkOwner(pc, src);
-      dir.modifyAclEntries(src, aclSpec);
+      List<AclEntry> newAcl = dir.modifyAclEntries(src, aclSpec);
+      getEditLog().logSetAcl(src, newAcl);
       resultingStat = getAuditFileInfo(src, false);
     } finally {
       writeUnlock();
@@ -7611,7 +8169,7 @@ public class FSNamesystem implements Nam
   }
 
   void removeAclEntries(String src, List<AclEntry> aclSpec) throws IOException {
-    aclConfigFlag.checkForApiCall();
+    nnConf.checkAclsConfigFlag();
     HdfsFileStatus resultingStat = null;
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
@@ -7622,7 +8180,8 @@ public class FSNamesystem implements Nam
       checkNameNodeSafeMode("Cannot remove ACL entries on " + src);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       checkOwner(pc, src);
-      dir.removeAclEntries(src, aclSpec);
+      List<AclEntry> newAcl = dir.removeAclEntries(src, aclSpec);
+      getEditLog().logSetAcl(src, newAcl);
       resultingStat = getAuditFileInfo(src, false);
     } finally {
       writeUnlock();
@@ -7632,7 +8191,7 @@ public class FSNamesystem implements Nam
   }
 
   void removeDefaultAcl(String src) throws IOException {
-    aclConfigFlag.checkForApiCall();
+    nnConf.checkAclsConfigFlag();
     HdfsFileStatus resultingStat = null;
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
@@ -7643,7 +8202,8 @@ public class FSNamesystem implements Nam
       checkNameNodeSafeMode("Cannot remove default ACL entries on " + src);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       checkOwner(pc, src);
-      dir.removeDefaultAcl(src);
+      List<AclEntry> newAcl = dir.removeDefaultAcl(src);
+      getEditLog().logSetAcl(src, newAcl);
       resultingStat = getAuditFileInfo(src, false);
     } finally {
       writeUnlock();
@@ -7653,7 +8213,7 @@ public class FSNamesystem implements Nam
   }
 
   void removeAcl(String src) throws IOException {
-    aclConfigFlag.checkForApiCall();
+    nnConf.checkAclsConfigFlag();
     HdfsFileStatus resultingStat = null;
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
@@ -7665,6 +8225,7 @@ public class FSNamesystem implements Nam

[... 326 lines stripped ...]