Posted to hdfs-commits@hadoop.apache.org by el...@apache.org on 2011/06/12 20:17:05 UTC

svn commit: r1134951 [2/3] - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/server/namenode/ src/test/hdfs/org/apache/hadoop/cli/ src/test/hdfs/org/apache/hadoop/hdfs/ src/test/hdfs/org/apache/hadoop/hdfs/ser...

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1134951&r1=1134950&r2=1134951&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Sun Jun 12 18:17:05 2011
@@ -491,6 +491,14 @@ public class FSNamesystem implements FSC
     return this.fsLock.isWriteLockedByCurrentThread();
   }
 
+  boolean hasReadLock() {
+    return this.fsLock.getReadHoldCount() > 0;
+  }
+
+  boolean hasReadOrWriteLock() {
+    return hasReadLock() || hasWriteLock();
+  }
+
   /**
    * dirs is a list of directories where the filesystem directory state 
    * is stored
@@ -601,11 +609,14 @@ public class FSNamesystem implements FSC
   }
   
   NamespaceInfo getNamespaceInfo() {
-    return new NamespaceInfo(dir.fsImage.getStorage().getNamespaceID(),
-                             getClusterId(),
-                             getBlockPoolId(),
-                             dir.fsImage.getStorage().getCTime(),
-                             getDistributedUpgradeVersion());
+    readLock();
+    try {
+      return new NamespaceInfo(dir.fsImage.getStorage().getNamespaceID(),
+          getClusterId(), getBlockPoolId(),
+          dir.fsImage.getStorage().getCTime(), getDistributedUpgradeVersion());
+    } finally {
+      readUnlock();
+    }
   }
 
   /**
@@ -654,32 +665,32 @@ public class FSNamesystem implements FSC
   void metaSave(String filename) throws IOException {
     writeLock();
     try {
-    checkSuperuserPrivilege();
-    File file = new File(System.getProperty("hadoop.log.dir"), filename);
-    PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter(file,
-        true)));
-
-    long totalInodes = this.dir.totalInodes();
-    long totalBlocks = this.getBlocksTotal();
-
-    ArrayList<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
-    ArrayList<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
-    this.DFSNodesStatus(live, dead);
-    
-    String str = totalInodes + " files and directories, " + totalBlocks
-        + " blocks = " + (totalInodes + totalBlocks) + " total";
-    out.println(str);
-    out.println("Live Datanodes: "+live.size());
-    out.println("Dead Datanodes: "+dead.size());
-    blockManager.metaSave(out);
-
-    //
-    // Dump all datanodes
-    //
-    datanodeDump(out);
-
-    out.flush();
-    out.close();
+      checkSuperuserPrivilege();
+      File file = new File(System.getProperty("hadoop.log.dir"), filename);
+      PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter(file,
+          true)));
+  
+      long totalInodes = this.dir.totalInodes();
+      long totalBlocks = this.getBlocksTotal();
+  
+      ArrayList<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
+      ArrayList<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
+      this.DFSNodesStatus(live, dead);
+      
+      String str = totalInodes + " files and directories, " + totalBlocks
+          + " blocks = " + (totalInodes + totalBlocks) + " total";
+      out.println(str);
+      out.println("Live Datanodes: "+live.size());
+      out.println("Dead Datanodes: "+dead.size());
+      blockManager.metaSave(out);
+  
+      //
+      // Dump all datanodes
+      //
+      datanodeDump(out);
+  
+      out.flush();
+      out.close();
     } finally {
       writeUnlock();
     }
@@ -717,46 +728,46 @@ public class FSNamesystem implements FSC
       throws IOException {
     readLock();
     try {
-    checkSuperuserPrivilege();
-
-    DatanodeDescriptor node = getDatanode(datanode);
-    if (node == null) {
-      NameNode.stateChangeLog.warn("BLOCK* NameSystem.getBlocks: "
-          + "Asking for blocks from an unrecorded node " + datanode.getName());
-      throw new IllegalArgumentException(
-          "Unexpected exception.  Got getBlocks message for datanode " +
-          datanode.getName() + ", but there is no info for it");
-    }
-
-    int numBlocks = node.numBlocks();
-    if(numBlocks == 0) {
-      return new BlocksWithLocations(new BlockWithLocations[0]);
-    }
-    Iterator<BlockInfo> iter = node.getBlockIterator();
-    int startBlock = r.nextInt(numBlocks); // starting from a random block
-    // skip blocks
-    for(int i=0; i<startBlock; i++) {
-      iter.next();
-    }
-    List<BlockWithLocations> results = new ArrayList<BlockWithLocations>();
-    long totalSize = 0;
-    BlockInfo curBlock;
-    while(totalSize<size && iter.hasNext()) {
-      curBlock = iter.next();
-      if(!curBlock.isComplete())  continue;
-      totalSize += addBlock(curBlock, results);
-    }
-    if(totalSize<size) {
-      iter = node.getBlockIterator(); // start from the beginning
-      for(int i=0; i<startBlock&&totalSize<size; i++) {
+      checkSuperuserPrivilege();
+  
+      DatanodeDescriptor node = getDatanode(datanode);
+      if (node == null) {
+        NameNode.stateChangeLog.warn("BLOCK* NameSystem.getBlocks: "
+            + "Asking for blocks from an unrecorded node " + datanode.getName());
+        throw new IllegalArgumentException(
+            "Unexpected exception.  Got getBlocks message for datanode " +
+            datanode.getName() + ", but there is no info for it");
+      }
+  
+      int numBlocks = node.numBlocks();
+      if(numBlocks == 0) {
+        return new BlocksWithLocations(new BlockWithLocations[0]);
+      }
+      Iterator<BlockInfo> iter = node.getBlockIterator();
+      int startBlock = r.nextInt(numBlocks); // starting from a random block
+      // skip blocks
+      for(int i=0; i<startBlock; i++) {
+        iter.next();
+      }
+      List<BlockWithLocations> results = new ArrayList<BlockWithLocations>();
+      long totalSize = 0;
+      BlockInfo curBlock;
+      while(totalSize<size && iter.hasNext()) {
         curBlock = iter.next();
         if(!curBlock.isComplete())  continue;
         totalSize += addBlock(curBlock, results);
       }
-    }
-
-    return new BlocksWithLocations(
-        results.toArray(new BlockWithLocations[results.size()]));
+      if(totalSize<size) {
+        iter = node.getBlockIterator(); // start from the beginning
+        for(int i=0; i<startBlock&&totalSize<size; i++) {
+          curBlock = iter.next();
+          if(!curBlock.isComplete())  continue;
+          totalSize += addBlock(curBlock, results);
+        }
+      }
+  
+      return new BlocksWithLocations(
+          results.toArray(new BlockWithLocations[results.size()]));
     } finally {
       readUnlock();
     }
@@ -777,6 +788,7 @@ public class FSNamesystem implements FSC
    * return the length of the added block; 0 if the block is not added
    */
   private long addBlock(Block block, List<BlockWithLocations> results) {
+    assert hasReadOrWriteLock();
     ArrayList<String> machineSet = blockManager.getValidLocations(block);
     if(machineSet.size() == 0) {
       return 0;
@@ -799,21 +811,25 @@ public class FSNamesystem implements FSC
   public void setPermission(String src, FsPermission permission)
       throws AccessControlException, FileNotFoundException, SafeModeException,
       UnresolvedLinkException, IOException {
+    HdfsFileStatus resultingStat = null;
     writeLock();
     try {
-    if (isInSafeMode())
-      throw new SafeModeException("Cannot set permission for " + src, safeMode);
-    checkOwner(src);
-    dir.setPermission(src, permission);
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot set permission for " + src, safeMode);
+      }
+      checkOwner(src);
+      dir.setPermission(src, permission);
+      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+        resultingStat = dir.getFileInfo(src, false);
+      }
+    } finally {
+      writeUnlock();
+    }
     getEditLog().logSync();
     if (auditLog.isInfoEnabled() && isExternalInvocation()) {
-      final HdfsFileStatus stat = dir.getFileInfo(src, false);
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     Server.getRemoteIp(),
-                    "setPermission", src, null, stat);
-    }
-    } finally {
-      writeUnlock();
+                    "setPermission", src, null, resultingStat);
     }
   }
 
@@ -824,30 +840,34 @@ public class FSNamesystem implements FSC
   public void setOwner(String src, String username, String group)
       throws AccessControlException, FileNotFoundException, SafeModeException,
       UnresolvedLinkException, IOException {
+    HdfsFileStatus resultingStat = null;
     writeLock();
     try {
-    if (isInSafeMode())
+      if (isInSafeMode()) {
         throw new SafeModeException("Cannot set owner for " + src, safeMode);
-    FSPermissionChecker pc = checkOwner(src);
-    if (!pc.isSuper) {
-      if (username != null && !pc.user.equals(username)) {
-        throw new AccessControlException("Non-super user cannot change owner.");
       }
-      if (group != null && !pc.containsGroup(group)) {
-        throw new AccessControlException("User does not belong to " + group
+      FSPermissionChecker pc = checkOwner(src);
+      if (!pc.isSuper) {
+        if (username != null && !pc.user.equals(username)) {
+          throw new AccessControlException("Non-super user cannot change owner.");
+        }
+        if (group != null && !pc.containsGroup(group)) {
+          throw new AccessControlException("User does not belong to " + group
             + " .");
+        }
+      }
+      dir.setOwner(src, username, group);
+      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+        resultingStat = dir.getFileInfo(src, false);
       }
+    } finally {
+      writeUnlock();
     }
-    dir.setOwner(src, username, group);
     getEditLog().logSync();
     if (auditLog.isInfoEnabled() && isExternalInvocation()) {
-      final HdfsFileStatus stat = dir.getFileInfo(src, false);
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     Server.getRemoteIp(),
-                    "setOwner", src, null, stat);
-    }
-    } finally {
-      writeUnlock();
+                    "setOwner", src, null, resultingStat);
     }
   }
 
@@ -876,7 +896,7 @@ public class FSNamesystem implements FSC
   /**
    * Get block locations within the specified range.
    * @see ClientProtocol#getBlockLocations(String, long, long)
-   * @throws FileNotFoundException
+   * @throws FileNotFoundException, UnresolvedLinkException, IOException
    */
   LocatedBlocks getBlockLocations(String src, long offset, long length,
       boolean doAccessTime, boolean needBlockToken) throws FileNotFoundException,
@@ -893,7 +913,7 @@ public class FSNamesystem implements FSC
       throw new HadoopIllegalArgumentException(
           "Negative length is not supported. File: " + src);
     }
-    final LocatedBlocks ret = getBlockLocationsInternal(src,
+    final LocatedBlocks ret = getBlockLocationsUpdateTimes(src,
         offset, length, doAccessTime, needBlockToken);  
     if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       logAuditEvent(UserGroupInformation.getCurrentUser(),
@@ -903,7 +923,11 @@ public class FSNamesystem implements FSC
     return ret;
   }
 
-  private LocatedBlocks getBlockLocationsInternal(String src,
+  /*
+   * Get block locations within the specified range, updating the
+   * access times if necessary. 
+   */
+  private LocatedBlocks getBlockLocationsUpdateTimes(String src,
                                                        long offset, 
                                                        long length,
                                                        boolean doAccessTime, 
@@ -954,8 +978,7 @@ public class FSNamesystem implements FSC
   LocatedBlocks getBlockLocationsInternal(INodeFile inode,
       long offset, long length, boolean needBlockToken)
   throws IOException {
-    readLock();
-    try {
+    assert hasReadOrWriteLock();
     final BlockInfo[] blocks = inode.getBlocks();
     if (LOG.isDebugEnabled()) {
       LOG.debug("blocks = " + java.util.Arrays.asList(blocks));
@@ -987,9 +1010,6 @@ public class FSNamesystem implements FSC
       return new LocatedBlocks(n, inode.isUnderConstruction(), locatedblocks,
           lastBlock, last.isComplete());
     }
-    } finally {
-      readUnlock();
-    }
   }
 
   /** Create a LocatedBlock. */
@@ -1021,146 +1041,152 @@ public class FSNamesystem implements FSC
    * @throws IOException
    */
   public void concat(String target, String [] srcs) 
-    throws IOException, UnresolvedLinkException {
+      throws IOException, UnresolvedLinkException {
     if(FSNamesystem.LOG.isDebugEnabled()) {
       FSNamesystem.LOG.debug("concat " + Arrays.toString(srcs) +
           " to " + target);
     }
-    // check safe mode
-    if (isInSafeMode()) {
-      throw new SafeModeException("concat: cannot concat " + target, safeMode);
-    }
     
     // verify args
     if(target.isEmpty()) {
-      throw new IllegalArgumentException("concat: trg file name is empty");
+      throw new IllegalArgumentException("Target file name is empty");
     }
     if(srcs == null || srcs.length == 0) {
-      throw new IllegalArgumentException("concat: srcs list is empty or null");
+      throw new IllegalArgumentException("No sources given");
     }
     
-    // currently we require all the files to be in the same dir
+    // We require all files be in the same directory
     String trgParent = 
       target.substring(0, target.lastIndexOf(Path.SEPARATOR_CHAR));
-    for(String s : srcs) {
+    for (String s : srcs) {
       String srcParent = s.substring(0, s.lastIndexOf(Path.SEPARATOR_CHAR));
-      if(! srcParent.equals(trgParent)) {
-        throw new IllegalArgumentException
-           ("concat:  srcs and target shoould be in same dir");
+      if (!srcParent.equals(trgParent)) {
+        throw new IllegalArgumentException(
+           "Sources and target are not in the same directory");
       }
     }
-    
+
+    HdfsFileStatus resultingStat = null;
     writeLock();
     try {
-      // write permission for the target
-      if (isPermissionEnabled) {
-        checkPathAccess(target, FsAction.WRITE);
-
-        // and srcs
-        for(String aSrc: srcs) {
-          checkPathAccess(aSrc, FsAction.READ); // read the file
-          checkParentAccess(aSrc, FsAction.WRITE); // for delete 
-        }
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot concat " + target, safeMode);
       }
+      concatInternal(target, srcs);
+      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+        resultingStat = dir.getFileInfo(target, false);
+      }
+    } finally {
+      writeUnlock();
+    }
+    getEditLog().logSync();
+    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      logAuditEvent(UserGroupInformation.getLoginUser(),
+                    Server.getRemoteIp(),
+                    "concat", Arrays.toString(srcs), target, resultingStat);
+    }
+  }
 
+  /** See {@link #concat(String, String[])} */
+  public void concatInternal(String target, String [] srcs) 
+      throws IOException, UnresolvedLinkException {
+    assert hasWriteLock();
 
-      // to make sure no two files are the same
-      Set<INode> si = new HashSet<INode>();
-
-      // we put the following prerequisite for the operation
-      // replication and blocks sizes should be the same for ALL the blocks
-      // check the target
-      INode inode = dir.getFileINode(target);
+    // write permission for the target
+    if (isPermissionEnabled) {
+      checkPathAccess(target, FsAction.WRITE);
 
-      if(inode == null) {
-        throw new IllegalArgumentException("concat: trg file doesn't exist");
-      }
-      if(inode.isUnderConstruction()) {
-        throw new IllegalArgumentException("concat: trg file is uner construction");
+      // and srcs
+      for(String aSrc: srcs) {
+        checkPathAccess(aSrc, FsAction.READ); // read the file
+        checkParentAccess(aSrc, FsAction.WRITE); // for delete 
       }
+    }
 
-      INodeFile trgInode = (INodeFile) inode;
+    // to make sure no two files are the same
+    Set<INode> si = new HashSet<INode>();
 
-      // per design trg shouldn't be empty and all the blocks same size
-      if(trgInode.blocks.length == 0) {
-        throw new IllegalArgumentException("concat: "+ target + " file is empty");
-      }
+    // we put the following prerequisite for the operation
+    // replication and blocks sizes should be the same for ALL the blocks
+    // check the target
+    INode inode = dir.getFileINode(target);
 
-      long blockSize = trgInode.getPreferredBlockSize();
+    if(inode == null) {
+      throw new IllegalArgumentException("concat: trg file doesn't exist");
+    }
+    if(inode.isUnderConstruction()) {
+      throw new IllegalArgumentException("concat: trg file is under construction");
+    }
 
-      // check the end block to be full
-      if(blockSize != trgInode.blocks[trgInode.blocks.length-1].getNumBytes()) {
-        throw new IllegalArgumentException(target + " blocks size should be the same");
-      }
+    INodeFile trgInode = (INodeFile) inode;
 
-      si.add(trgInode);
-      short repl = trgInode.getReplication();
+    // per design trg shouldn't be empty and all the blocks same size
+    if(trgInode.blocks.length == 0) {
+      throw new IllegalArgumentException("concat: "+ target + " file is empty");
+    }
 
-      // now check the srcs
-      boolean endSrc = false; // final src file doesn't have to have full end block
-      for(int i=0; i<srcs.length; i++) {
-        String src = srcs[i];
-        if(i==srcs.length-1)
-          endSrc=true;
+    long blockSize = trgInode.getPreferredBlockSize();
 
-        INodeFile srcInode = dir.getFileINode(src);
+    // check the end block to be full
+    if(blockSize != trgInode.blocks[trgInode.blocks.length-1].getNumBytes()) {
+      throw new IllegalArgumentException(target + " blocks size should be the same");
+    }
 
-        if(src.isEmpty() 
-            || srcInode == null
-            || srcInode.isUnderConstruction()
-            || srcInode.blocks.length == 0) {
-          throw new IllegalArgumentException("concat: file " + src + 
-          " is invalid or empty or underConstruction");
-        }
+    si.add(trgInode);
+    short repl = trgInode.getReplication();
 
-        // check replication and blocks size
-        if(repl != srcInode.getReplication()) {
-          throw new IllegalArgumentException(src + " and " + target + " " +
-              "should have same replication: "
-              + repl + " vs. " + srcInode.getReplication());
-        }
+    // now check the srcs
+    boolean endSrc = false; // final src file doesn't have to have full end block
+    for(int i=0; i<srcs.length; i++) {
+      String src = srcs[i];
+      if(i==srcs.length-1)
+        endSrc=true;
 
-        //boolean endBlock=false;
-        // verify that all the blocks are of the same length as target
-        // should be enough to check the end blocks
-        int idx = srcInode.blocks.length-1;
-        if(endSrc)
-          idx = srcInode.blocks.length-2; // end block of endSrc is OK not to be full
-        if(idx >= 0 && srcInode.blocks[idx].getNumBytes() != blockSize) {
-          throw new IllegalArgumentException("concat: blocks sizes of " + 
-              src + " and " + target + " should all be the same");
-        }
+      INodeFile srcInode = dir.getFileINode(src);
 
-        si.add(srcInode);
+      if(src.isEmpty() 
+          || srcInode == null
+          || srcInode.isUnderConstruction()
+          || srcInode.blocks.length == 0) {
+        throw new IllegalArgumentException("concat: file " + src + 
+        " is invalid or empty or underConstruction");
       }
 
-      // make sure no two files are the same
-      if(si.size() < srcs.length+1) { // trg + srcs
-        // it means at least two files are the same
-        throw new IllegalArgumentException("at least two files are the same");
+      // check replication and blocks size
+      if(repl != srcInode.getReplication()) {
+        throw new IllegalArgumentException(src + " and " + target + " " +
+            "should have same replication: "
+            + repl + " vs. " + srcInode.getReplication());
       }
 
-      if(NameNode.stateChangeLog.isDebugEnabled()) {
-        NameNode.stateChangeLog.debug("DIR* NameSystem.concat: " + 
-            Arrays.toString(srcs) + " to " + target);
+      //boolean endBlock=false;
+      // verify that all the blocks are of the same length as target
+      // should be enough to check the end blocks
+      int idx = srcInode.blocks.length-1;
+      if(endSrc)
+        idx = srcInode.blocks.length-2; // end block of endSrc is OK not to be full
+      if(idx >= 0 && srcInode.blocks[idx].getNumBytes() != blockSize) {
+        throw new IllegalArgumentException("concat: blocks sizes of " + 
+            src + " and " + target + " should all be the same");
       }
 
-      dir.concatInternal(target,srcs);
-    } finally {
-      writeUnlock();
+      si.add(srcInode);
     }
-    getEditLog().logSync();
-   
-    
-    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
-      final HdfsFileStatus stat = dir.getFileInfo(target, false);
-      logAuditEvent(UserGroupInformation.getLoginUser(),
-                    Server.getRemoteIp(),
-                    "concat", Arrays.toString(srcs), target, stat);
+
+    // make sure no two files are the same
+    if(si.size() < srcs.length+1) { // trg + srcs
+      // it means at least two files are the same
+      throw new IllegalArgumentException("at least two files are the same");
     }
-   
-  }
 
+    if(NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* NameSystem.concat: " + 
+          Arrays.toString(srcs) + " to " + target);
+    }
+
+    dir.concat(target,srcs);
+  }
+  
   /**
    * stores the modification and access time for this inode. 
    * The access time is precise upto an hour. The transaction, if needed, is
@@ -1174,23 +1200,22 @@ public class FSNamesystem implements FSC
     }
     writeLock();
     try {
-    //
-    // The caller needs to have write access to set access & modification times.
-    if (isPermissionEnabled) {
-      checkPathAccess(src, FsAction.WRITE);
-    }
-    INodeFile inode = dir.getFileINode(src);
-    if (inode != null) {
-      dir.setTimes(src, inode, mtime, atime, true);
-      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
-        final HdfsFileStatus stat = dir.getFileInfo(src, false);
-        logAuditEvent(UserGroupInformation.getCurrentUser(),
-                      Server.getRemoteIp(),
-                      "setTimes", src, null, stat);
+      // Write access is required to set access and modification times
+      if (isPermissionEnabled) {
+        checkPathAccess(src, FsAction.WRITE);
+      }
+      INodeFile inode = dir.getFileINode(src);
+      if (inode != null) {
+        dir.setTimes(src, inode, mtime, atime, true);
+        if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+          final HdfsFileStatus stat = dir.getFileInfo(src, false);
+          logAuditEvent(UserGroupInformation.getCurrentUser(),
+                        Server.getRemoteIp(),
+                        "setTimes", src, null, stat);
+        }
+      } else {
+        throw new FileNotFoundException("File " + src + " does not exist.");
       }
-    } else {
-      throw new FileNotFoundException("File " + src + " does not exist.");
-    }
     } finally {
       writeUnlock();
     }
@@ -1202,21 +1227,24 @@ public class FSNamesystem implements FSC
   public void createSymlink(String target, String link,
       PermissionStatus dirPerms, boolean createParent) 
       throws IOException, UnresolvedLinkException {
+    HdfsFileStatus resultingStat = null;
     writeLock();
     try {
-    if (!createParent) {
-      verifyParentDir(link);
-    }
-    createSymlinkInternal(target, link, dirPerms, createParent);
+      if (!createParent) {
+        verifyParentDir(link);
+      }
+      createSymlinkInternal(target, link, dirPerms, createParent);
+      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+        resultingStat = dir.getFileInfo(link, false);
+      }
     } finally {
       writeUnlock();
     }
     getEditLog().logSync();
     if (auditLog.isInfoEnabled() && isExternalInvocation()) {
-      final HdfsFileStatus stat = dir.getFileInfo(link, false);
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     Server.getRemoteIp(),
-                    "createSymlink", link, target, stat);
+                    "createSymlink", link, target, resultingStat);
     }
   }
 
@@ -1226,13 +1254,11 @@ public class FSNamesystem implements FSC
   private void createSymlinkInternal(String target, String link,
       PermissionStatus dirPerms, boolean createParent)
       throws IOException, UnresolvedLinkException {
-    writeLock();
-    try {
+    assert hasWriteLock();
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.createSymlink: target=" + 
         target + " link=" + link);
     }
-
     if (isInSafeMode()) {
       throw new SafeModeException("Cannot create symlink " + link, safeMode);
     }
@@ -1251,9 +1277,6 @@ public class FSNamesystem implements FSC
 
     // add symbolic link to namespace
     dir.addSymlink(link, target, dirPerms, createParent);
-    } finally {
-      writeUnlock();
-    }
   }
 
   /**
@@ -1271,7 +1294,16 @@ public class FSNamesystem implements FSC
    */
   public boolean setReplication(String src, short replication) 
     throws IOException, UnresolvedLinkException {
-    boolean status = setReplicationInternal(src, replication);
+    boolean status = false;
+    writeLock();
+    try {
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot set replication for " + src, safeMode);
+      }
+      status = setReplicationInternal(src, replication);
+    } finally {
+      writeUnlock();
+    }
     getEditLog().logSync();
     if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
       logAuditEvent(UserGroupInformation.getCurrentUser(),
@@ -1284,10 +1316,7 @@ public class FSNamesystem implements FSC
   private boolean setReplicationInternal(String src,
       short replication) throws AccessControlException, QuotaExceededException,
       SafeModeException, UnresolvedLinkException, IOException {
-    writeLock();
-    try {
-    if (isInSafeMode())
-      throw new SafeModeException("Cannot set replication for " + src, safeMode);
+    assert hasWriteLock();
     blockManager.verifyReplication(src, replication, null);
     if (isPermissionEnabled) {
       checkPathAccess(src, FsAction.WRITE);
@@ -1317,17 +1346,19 @@ public class FSNamesystem implements FSC
           + ". New replication is " + replication);
     }
     return true;
-    } finally {
-      writeUnlock();
-    }
   }
     
   long getPreferredBlockSize(String filename) 
-    throws IOException, UnresolvedLinkException {
-    if (isPermissionEnabled) {
-      checkTraverse(filename);
+      throws IOException, UnresolvedLinkException {
+    readLock();
+    try {
+      if (isPermissionEnabled) {
+        checkTraverse(filename);
+      }
+      return dir.getPreferredBlockSize(filename);
+    } finally {
+      readUnlock();
     }
-    return dir.getPreferredBlockSize(filename);
   }
 
   /*
@@ -1335,6 +1366,7 @@ public class FSNamesystem implements FSC
    */
   private void verifyParentDir(String src) throws FileNotFoundException,
       ParentNotDirectoryException, UnresolvedLinkException {
+    assert hasReadOrWriteLock();
     Path parent = new Path(src).getParent();
     if (parent != null) {
       INode[] pathINodes = dir.getExistingPathINodes(parent.toString());
@@ -1360,8 +1392,13 @@ public class FSNamesystem implements FSC
       short replication, long blockSize) throws AccessControlException,
       SafeModeException, FileAlreadyExistsException, UnresolvedLinkException,
       FileNotFoundException, ParentNotDirectoryException, IOException {
-    startFileInternal(src, permissions, holder, clientMachine, flag,
-        createParent, replication, blockSize);
+    writeLock();
+    try {
+      startFileInternal(src, permissions, holder, clientMachine, flag,
+          createParent, replication, blockSize);
+    } finally {
+      writeUnlock();
+    }
     getEditLog().logSync();
     if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       final HdfsFileStatus stat = dir.getFileInfo(src, false);
@@ -1393,6 +1430,7 @@ public class FSNamesystem implements FSC
       long blockSize) throws SafeModeException, FileAlreadyExistsException,
       AccessControlException, UnresolvedLinkException, FileNotFoundException,
       ParentNotDirectoryException, IOException {
+    assert hasWriteLock();
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: src=" + src
           + ", holder=" + holder
@@ -1401,10 +1439,9 @@ public class FSNamesystem implements FSC
           + ", replication=" + replication
           + ", createFlag=" + flag.toString());
     }
-    writeLock();
-    try {
-    if (isInSafeMode())
+    if (isInSafeMode()) {
       throw new SafeModeException("Cannot create file" + src, safeMode);
+    }
     if (!DFSUtil.isValidName(src)) {
       throw new InvalidPathException(src);
     }
@@ -1512,9 +1549,6 @@ public class FSNamesystem implements FSC
                                    +ie.getMessage());
       throw ie;
     }
-    } finally {
-      writeUnlock();
-    }
     return null;
   }
 
@@ -1529,35 +1563,41 @@ public class FSNamesystem implements FSC
    * @return true if the file is already closed
    * @throws IOException
    */
-  synchronized boolean recoverLease(String src, String holder, String clientMachine)
-  throws IOException {
-    if (isInSafeMode()) {
-      throw new SafeModeException(
-          "Cannot recover the lease of " + src, safeMode);
-    }
-    if (!DFSUtil.isValidName(src)) {
-      throw new IOException("Invalid file name: " + src);
-    }
-
-    INode inode = dir.getFileINode(src);
-    if (inode == null) {
-      throw new FileNotFoundException("File not found " + src);
-    }
-
-    if (!inode.isUnderConstruction()) {
-      return true;
-    }
-    if (isPermissionEnabled) {
-      checkPathAccess(src, FsAction.WRITE);
+  boolean recoverLease(String src, String holder, String clientMachine)
+      throws IOException {
+    writeLock();
+    try {
+      if (isInSafeMode()) {
+        throw new SafeModeException(
+            "Cannot recover the lease of " + src, safeMode);
+      }
+      if (!DFSUtil.isValidName(src)) {
+        throw new IOException("Invalid file name: " + src);
+      }
+  
+      INode inode = dir.getFileINode(src);
+      if (inode == null) {
+        throw new FileNotFoundException("File not found " + src);
+      }
+  
+      if (!inode.isUnderConstruction()) {
+        return true;
+      }
+      if (isPermissionEnabled) {
+        checkPathAccess(src, FsAction.WRITE);
+      }
+  
+      recoverLeaseInternal(inode, src, holder, clientMachine, true);
+    } finally {
+      writeUnlock();
     }
-
-    recoverLeaseInternal(inode, src, holder, clientMachine, true);
     return false;
   }
 
   private void recoverLeaseInternal(INode fileInode, 
       String src, String holder, String clientMachine, boolean force)
-  throws IOException {
+      throws IOException {
+    assert hasWriteLock();
     if (fileInode != null && fileInode.isUnderConstruction()) {
       INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) fileInode;
       //
@@ -1643,12 +1683,16 @@ public class FSNamesystem implements FSC
       throw new UnsupportedOperationException("Append to hdfs not supported." +
                             " Please refer to dfs.support.append configuration parameter.");
     }
-    LocatedBlock lb = 
-      startFileInternal(src, null, holder, clientMachine, 
+    LocatedBlock lb = null;
+    writeLock();
+    try {
+      lb = startFileInternal(src, null, holder, clientMachine, 
                         EnumSet.of(CreateFlag.APPEND), 
                         false, (short)blockManager.maxReplication, (long)0);
+    } finally {
+      writeUnlock();
+    }
     getEditLog().logSync();
-
     if (lb != null) {
       if (NameNode.stateChangeLog.isDebugEnabled()) {
         NameNode.stateChangeLog.debug("DIR* NameSystem.appendFile: file "
@@ -1657,7 +1701,6 @@ public class FSNamesystem implements FSC
             +" block size " + lb.getBlock().getNumBytes());
       }
     }
-
     if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     Server.getRemoteIp(),
@@ -1749,6 +1792,9 @@ public class FSNamesystem implements FSC
     // Allocate a new block and record it in the INode. 
     writeLock();
     try {
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot add block to " + src, safeMode);
+      }
       INode[] pathINodes = dir.getExistingPathINodes(src);
       int inodesLen = pathINodes.length;
       checkLease(src, clientName, pathINodes[inodesLen-1]);
@@ -1768,7 +1814,7 @@ public class FSNamesystem implements FSC
     } finally {
       writeUnlock();
     }
-        
+
     // Create next block
     LocatedBlock b = new LocatedBlock(getExtendedBlock(newBlock), targets, fileLength);
     if (isBlockTokenEnabled) {
@@ -1788,6 +1834,7 @@ public class FSNamesystem implements FSC
 
     final DatanodeDescriptor clientnode;
     final long preferredblocksize;
+    final List<DatanodeDescriptor> chosen;
     readLock();
     try {
       //check safe mode
@@ -1800,17 +1847,17 @@ public class FSNamesystem implements FSC
       final INodeFileUnderConstruction file = checkLease(src, clientName);
       clientnode = file.getClientNode();
       preferredblocksize = file.getPreferredBlockSize();
-    } finally {
-      readUnlock();
-    }
 
-    //find datanode descriptors
-    final List<DatanodeDescriptor> chosen = new ArrayList<DatanodeDescriptor>();
-    for(DatanodeInfo d : existings) {
-      final DatanodeDescriptor descriptor = getDatanode(d);
-      if (descriptor != null) {
-        chosen.add(descriptor);
+      //find datanode descriptors
+      chosen = new ArrayList<DatanodeDescriptor>();
+      for(DatanodeInfo d : existings) {
+        final DatanodeDescriptor descriptor = getDatanode(d);
+        if (descriptor != null) {
+          chosen.add(descriptor);
+        }
       }
+    } finally {
+      readUnlock();
     }
 
     // choose new datanodes.
@@ -1833,21 +1880,24 @@ public class FSNamesystem implements FSC
       UnresolvedLinkException, IOException {
     writeLock();
     try {
-    //
-    // Remove the block from the pending creates list
-    //
-    if(NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
-                                    +b+"of file "+src);
-    }
-    INodeFileUnderConstruction file = checkLease(src, holder);
-    dir.removeBlock(src, file, ExtendedBlock.getLocalBlock(b));
-    if(NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
-                                    + b
-                                    + " is removed from pendingCreates");
-    }
-    return true;
+      //
+      // Remove the block from the pending creates list
+      //
+      if(NameNode.stateChangeLog.isDebugEnabled()) {
+        NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
+                                      +b+"of file "+src);
+      }
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot abandon block " + b +
+                                    " for file " + src, safeMode);
+      }
+      INodeFileUnderConstruction file = checkLease(src, holder);
+      dir.removeBlock(src, file, ExtendedBlock.getLocalBlock(b));
+      if(NameNode.stateChangeLog.isDebugEnabled()) {
+        NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
+                                      + b + " is removed from pendingCreates");
+      }
+      return true;
     } finally {
       writeUnlock();
     }
@@ -1855,7 +1905,8 @@ public class FSNamesystem implements FSC
   
   // make sure that we still have the lease on this file.
   private INodeFileUnderConstruction checkLease(String src, String holder) 
-    throws LeaseExpiredException, UnresolvedLinkException {
+      throws LeaseExpiredException, UnresolvedLinkException {
+    assert hasReadOrWriteLock();
     INodeFile file = dir.getFileINode(src);
     checkLease(src, holder, file);
     return (INodeFileUnderConstruction)file;
@@ -1863,7 +1914,7 @@ public class FSNamesystem implements FSC
 
   private void checkLease(String src, String holder, INode file)
       throws LeaseExpiredException {
-
+    assert hasReadOrWriteLock();
     if (file == null || file.isDirectory()) {
       Lease lease = leaseManager.getLease(holder);
       throw new LeaseExpiredException("No lease on " + src +
@@ -1896,23 +1947,29 @@ public class FSNamesystem implements FSC
   public boolean completeFile(String src, String holder, ExtendedBlock last) 
     throws SafeModeException, UnresolvedLinkException, IOException {
     checkBlock(last);
-    boolean success = completeFileInternal(src, holder, 
+    boolean success = false;
+    writeLock();
+    try {
+      success = completeFileInternal(src, holder, 
         ExtendedBlock.getLocalBlock(last));
+    } finally {
+      writeUnlock();
+    }
     getEditLog().logSync();
-    return success ;
+    return success;
   }
 
   private boolean completeFileInternal(String src, 
       String holder, Block last) throws SafeModeException,
       UnresolvedLinkException, IOException {
-    writeLock();
-    try {
-    if(NameNode.stateChangeLog.isDebugEnabled()) {
+    assert hasWriteLock();
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " +
           src + " for " + holder);
     }
-    if (isInSafeMode())
+    if (isInSafeMode()) {
       throw new SafeModeException("Cannot complete file " + src, safeMode);
+    }
 
     INodeFileUnderConstruction pendingFile = checkLease(src, holder);
     // commit the last block and complete it if it has minimum replicas
@@ -1927,9 +1984,6 @@ public class FSNamesystem implements FSC
     NameNode.stateChangeLog.info("DIR* NameSystem.completeFile: file " + src
                                   + " is closed by " + holder);
     return true;
-    } finally {
-      writeUnlock();
-    }
   }
 
   /** 
@@ -1958,6 +2012,7 @@ public class FSNamesystem implements FSC
    */
   private Block allocateBlock(String src, INode[] inodes,
       DatanodeDescriptor targets[]) throws QuotaExceededException {
+    assert hasWriteLock();
     Block b = new Block(FSNamesystem.randBlockId.nextLong(), 0, 0); 
     while(isValidBlock(b)) {
       b.setBlockId(FSNamesystem.randBlockId.nextLong());
@@ -1975,35 +2030,35 @@ public class FSNamesystem implements FSC
    * all blocks, otherwise check only penultimate block.
    */
   boolean checkFileProgress(INodeFile v, boolean checkall) {
-    writeLock();
+    readLock();
     try {
-    if (checkall) {
-      //
-      // check all blocks of the file.
-      //
-      for (BlockInfo block: v.getBlocks()) {
-        if (!block.isComplete()) {
+      if (checkall) {
+        //
+        // check all blocks of the file.
+        //
+        for (BlockInfo block: v.getBlocks()) {
+          if (!block.isComplete()) {
+            LOG.info("BLOCK* NameSystem.checkFileProgress: "
+                + "block " + block + " has not reached minimal replication "
+                + blockManager.minReplication);
+            return false;
+          }
+        }
+      } else {
+        //
+        // check the penultimate block of this file
+        //
+        BlockInfo b = v.getPenultimateBlock();
+        if (b != null && !b.isComplete()) {
           LOG.info("BLOCK* NameSystem.checkFileProgress: "
-              + "block " + block + " has not reached minimal replication "
+              + "block " + b + " has not reached minimal replication "
               + blockManager.minReplication);
           return false;
         }
       }
-    } else {
-      //
-      // check the penultimate block of this file
-      //
-      BlockInfo b = v.getPenultimateBlock();
-      if (b != null && !b.isComplete()) {
-        LOG.info("BLOCK* NameSystem.checkFileProgress: "
-            + "block " + b + " has not reached minimal replication "
-            + blockManager.minReplication);
-        return false;
-      }
-    }
-    return true;
+      return true;
     } finally {
-      writeUnlock();
+      readUnlock();
     }
   }
 
@@ -2042,13 +2097,26 @@ public class FSNamesystem implements FSC
   @Deprecated
   boolean renameTo(String src, String dst) 
     throws IOException, UnresolvedLinkException {
-    boolean status = renameToInternal(src, dst);
+    boolean status = false;
+    HdfsFileStatus resultingStat = null;
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src +
+          " to " + dst);
+    }
+    writeLock();
+    try {
+      status = renameToInternal(src, dst);
+      if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
+        resultingStat = dir.getFileInfo(dst, false);
+      }
+    } finally {
+      writeUnlock();
+    }
     getEditLog().logSync();
     if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
-      final HdfsFileStatus stat = dir.getFileInfo(dst, false);
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     Server.getRemoteIp(),
-                    "rename", src, dst, stat);
+                    "rename", src, dst, resultingStat);
     }
     return status;
   }
@@ -2057,19 +2125,13 @@ public class FSNamesystem implements FSC
   @Deprecated
   private boolean renameToInternal(String src, String dst)
     throws IOException, UnresolvedLinkException {
-
-    writeLock();
-    try {
-    if(NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src +
-          " to " + dst);
-    }
-    if (isInSafeMode())
+    assert hasWriteLock();
+    if (isInSafeMode()) {
       throw new SafeModeException("Cannot rename " + src, safeMode);
+    }
     if (!DFSUtil.isValidName(dst)) {
       throw new IOException("Invalid name: " + dst);
     }
-
     if (isPermissionEnabled) {
       //We should not be doing this.  This is move() not renameTo().
       //but for now,
@@ -2081,40 +2143,44 @@ public class FSNamesystem implements FSC
 
     HdfsFileStatus dinfo = dir.getFileInfo(dst, false);
     if (dir.renameTo(src, dst)) {
-      changeLease(src, dst, dinfo);     // update lease with new filename
+      unprotectedChangeLease(src, dst, dinfo);     // update lease with new filename
       return true;
     }
     return false;
-    } finally {
-      writeUnlock();
-    }
   }
   
 
   /** Rename src to dst */
   void renameTo(String src, String dst, Options.Rename... options)
       throws IOException, UnresolvedLinkException {
-    renameToInternal(src, dst, options);
+    HdfsFileStatus resultingStat = null;
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: with options - "
+          + src + " to " + dst);
+    }
+    writeLock();
+    try {
+      renameToInternal(src, dst, options);
+      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+        resultingStat = dir.getFileInfo(dst, false); 
+      }
+    } finally {
+      writeUnlock();
+    }
     getEditLog().logSync();
     if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       StringBuilder cmd = new StringBuilder("rename options=");
       for (Rename option : options) {
         cmd.append(option.value()).append(" ");
       }
-      final HdfsFileStatus stat = dir.getFileInfo(dst, false);
       logAuditEvent(UserGroupInformation.getCurrentUser(), Server.getRemoteIp(),
-                    cmd.toString(), src, dst, stat);
+                    cmd.toString(), src, dst, resultingStat);
     }
   }
 
   private void renameToInternal(String src, String dst,
       Options.Rename... options) throws IOException {
-    writeLock();
-    try {
-    if (NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: with options - "
-          + src + " to " + dst);
-    }
+    assert hasWriteLock();
     if (isInSafeMode()) {
       throw new SafeModeException("Cannot rename " + src, safeMode);
     }
@@ -2128,10 +2194,7 @@ public class FSNamesystem implements FSC
 
     HdfsFileStatus dinfo = dir.getFileInfo(dst, false);
     dir.renameTo(src, dst, options);
-    changeLease(src, dst, dinfo); // update lease with new filename
-    } finally {
-      writeUnlock();
-    }
+    unprotectedChangeLease(src, dst, dinfo); // update lease with new filename
   }
   
   /**
@@ -2141,15 +2204,12 @@ public class FSNamesystem implements FSC
    * description of exceptions
    */
     public boolean delete(String src, boolean recursive)
-      throws AccessControlException, SafeModeException,
-      UnresolvedLinkException, IOException {
-      if ((!recursive) && (!dir.isDirEmpty(src))) {
-        throw new IOException(src + " is non empty");
-      }
+        throws AccessControlException, SafeModeException,
+               UnresolvedLinkException, IOException {
       if (NameNode.stateChangeLog.isDebugEnabled()) {
         NameNode.stateChangeLog.debug("DIR* NameSystem.delete: " + src);
       }
-      boolean status = deleteInternal(src, true);
+      boolean status = deleteInternal(src, recursive, true);
       if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
         logAuditEvent(UserGroupInformation.getCurrentUser(),
                       Server.getRemoteIp(),
@@ -2169,9 +2229,10 @@ public class FSNamesystem implements FSC
    * 
    * @see ClientProtocol#delete(String, boolean) for description of exceptions
    */
-  private boolean deleteInternal(String src, boolean enforcePermission)
-      throws AccessControlException, SafeModeException,
-      UnresolvedLinkException, IOException{
+  private boolean deleteInternal(String src, boolean recursive,
+      boolean enforcePermission)
+      throws AccessControlException, SafeModeException, UnresolvedLinkException,
+             IOException {
     boolean deleteNow = false;
     ArrayList<Block> collectedBlocks = new ArrayList<Block>();
 
@@ -2180,6 +2241,9 @@ public class FSNamesystem implements FSC
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot delete " + src, safeMode);
       }
+      if (!recursive && !dir.isDirEmpty(src)) {
+        throw new IOException(src + " is non empty");
+      }
       if (enforcePermission && isPermissionEnabled) {
         checkPermission(src, false, null, FsAction.WRITE, null, FsAction.ALL);
       }
@@ -2194,10 +2258,16 @@ public class FSNamesystem implements FSC
     } finally {
       writeUnlock();
     }
-    // Log directory deletion to editlog
+
     getEditLog().logSync();
-    if (!deleteNow) {
-      removeBlocks(collectedBlocks); // Incremental deletion of blocks
+
+    writeLock();
+    try {
+      if (!deleteNow) {
+        removeBlocks(collectedBlocks); // Incremental deletion of blocks
+      }
+    } finally {
+      writeUnlock();
     }
     collectedBlocks.clear();
     if (NameNode.stateChangeLog.isDebugEnabled()) {
@@ -2209,24 +2279,21 @@ public class FSNamesystem implements FSC
 
   /** From the given list, incrementally remove the blocks from blockManager */
   private void removeBlocks(List<Block> blocks) {
+    assert hasWriteLock();
     int start = 0;
     int end = 0;
     while (start < blocks.size()) {
       end = BLOCK_DELETION_INCREMENT + start;
       end = end > blocks.size() ? blocks.size() : end;
-      writeLock();
-      try {
-        for (int i=start; i<end; i++) {
-          blockManager.removeBlock(blocks.get(i));
-        }
-      } finally {
-        writeUnlock();
+      for (int i=start; i<end; i++) {
+        blockManager.removeBlock(blocks.get(i));
       }
       start = end;
     }
   }
   
   void removePathAndBlocks(String src, List<Block> blocks) {
+    assert hasWriteLock();
     leaseManager.removeLeaseWithPrefixPath(src);
     if (blocks == null) {
       return;
@@ -2249,13 +2316,18 @@ public class FSNamesystem implements FSC
    */
   HdfsFileStatus getFileInfo(String src, boolean resolveLink) 
     throws AccessControlException, UnresolvedLinkException {
-    if (!DFSUtil.isValidName(src)) {
-      throw new InvalidPathException("Invalid file name: " + src);
-    }
-    if (isPermissionEnabled) {
-      checkTraverse(src);
+    readLock();
+    try {
+      if (!DFSUtil.isValidName(src)) {
+        throw new InvalidPathException("Invalid file name: " + src);
+      }
+      if (isPermissionEnabled) {
+        checkTraverse(src);
+      }
+      return dir.getFileInfo(src, resolveLink);
+    } finally {
+      readUnlock();
     }
-    return dir.getFileInfo(src, resolveLink);
   }
 
   /**
@@ -2263,7 +2335,16 @@ public class FSNamesystem implements FSC
    */
   public boolean mkdirs(String src, PermissionStatus permissions,
       boolean createParent) throws IOException, UnresolvedLinkException {
-    boolean status = mkdirsInternal(src, permissions, createParent);
+    boolean status = false;
+    if(NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src);
+    }
+    writeLock();
+    try {
+      status = mkdirsInternal(src, permissions, createParent);
+    } finally {
+      writeUnlock();
+    }
     getEditLog().logSync();
     if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
       final HdfsFileStatus stat = dir.getFileInfo(src, false);
@@ -2280,10 +2361,9 @@ public class FSNamesystem implements FSC
   private boolean mkdirsInternal(String src,
       PermissionStatus permissions, boolean createParent) 
       throws IOException, UnresolvedLinkException {
-    writeLock();
-    try {
-    if(NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src);
+    assert hasWriteLock();
+    if (isInSafeMode()) {
+      throw new SafeModeException("Cannot create directory " + src, safeMode);
     }
     if (isPermissionEnabled) {
       checkTraverse(src);
@@ -2293,15 +2373,12 @@ public class FSNamesystem implements FSC
       // a new directory is not created.
       return true;
     }
-    if (isInSafeMode())
-      throw new SafeModeException("Cannot create directory " + src, safeMode);
     if (!DFSUtil.isValidName(src)) {
       throw new InvalidPathException(src);
     }
     if (isPermissionEnabled) {
       checkAncestorAccess(src, FsAction.WRITE);
     }
-
     if (!createParent) {
       verifyParentDir(src);
     }
@@ -2315,17 +2392,19 @@ public class FSNamesystem implements FSC
       throw new IOException("Failed to create directory: " + src);
     }
     return true;
-    } finally {
-      writeUnlock();
-    }
   }
 
   ContentSummary getContentSummary(String src) throws AccessControlException,
       FileNotFoundException, UnresolvedLinkException {
-    if (isPermissionEnabled) {
-      checkPermission(src, false, null, null, null, FsAction.READ_EXECUTE);
+    readLock();
+    try {
+      if (isPermissionEnabled) {
+        checkPermission(src, false, null, null, null, FsAction.READ_EXECUTE);
+      }
+      return dir.getContentSummary(src);
+    } finally {
+      readUnlock();
     }
-    return dir.getContentSummary(src);
   }
 
   /**
@@ -2335,12 +2414,18 @@ public class FSNamesystem implements FSC
    */
   void setQuota(String path, long nsQuota, long dsQuota) 
       throws IOException, UnresolvedLinkException {
-    if (isInSafeMode())
-      throw new SafeModeException("Cannot set quota on " + path, safeMode);
-    if (isPermissionEnabled) {
-      checkSuperuserPrivilege();
+    writeLock();
+    try {
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot set quota on " + path, safeMode);
+      }
+      if (isPermissionEnabled) {
+        checkSuperuserPrivilege();
+      }
+      dir.setQuota(path, nsQuota, dsQuota);
+    } finally {
+      writeUnlock();
     }
-    dir.setQuota(path, nsQuota, dsQuota);
     getEditLog().logSync();
   }
   
@@ -2351,7 +2436,6 @@ public class FSNamesystem implements FSC
    */
   void fsync(String src, String clientName) 
       throws IOException, UnresolvedLinkException {
-
     NameNode.stateChangeLog.info("BLOCK* NameSystem.fsync: file "
                                   + src + " for " + clientName);
     writeLock();
@@ -2364,6 +2448,7 @@ public class FSNamesystem implements FSC
     } finally {
       writeUnlock();
     }
+    getEditLog().logSync();
   }
 
   /**
@@ -2383,9 +2468,8 @@ public class FSNamesystem implements FSC
       String recoveryLeaseHolder) throws AlreadyBeingCreatedException, 
       IOException, UnresolvedLinkException {
     LOG.info("Recovering lease=" + lease + ", src=" + src);
-    
     assert !isInSafeMode();
-
+    assert hasWriteLock();
     INodeFile iFile = dir.getFileINode(src);
     if (iFile == null) {
       final String message = "DIR* NameSystem.internalReleaseLease: "
@@ -2507,6 +2591,7 @@ public class FSNamesystem implements FSC
 
   Lease reassignLease(Lease lease, String src, String newHolder,
       INodeFileUnderConstruction pendingFile) throws IOException {
+    assert hasWriteLock();
     if(newHolder == null)
       return lease;
     logReassignLease(lease.getHolder(), src, newHolder);
@@ -2515,6 +2600,7 @@ public class FSNamesystem implements FSC
   
   Lease reassignLeaseInternal(Lease lease, String src, String newHolder,
       INodeFileUnderConstruction pendingFile) throws IOException {
+    assert hasWriteLock();
     pendingFile.setClientName(newHolder);
     return leaseManager.reassignLease(lease, src, newHolder);
   }
@@ -2523,7 +2609,7 @@ public class FSNamesystem implements FSC
   private void finalizeINodeFileUnderConstruction(String src, 
       INodeFileUnderConstruction pendingFile) 
       throws IOException, UnresolvedLinkException {
-      
+    assert hasWriteLock();
     leaseManager.removeLease(pendingFile.getClientName(), src);
 
     // The file is no longer pending.
@@ -2544,92 +2630,96 @@ public class FSNamesystem implements FSC
     String src = "";
     writeLock();
     try {
-    LOG.info("commitBlockSynchronization(lastblock=" + lastblock
-          + ", newgenerationstamp=" + newgenerationstamp
-          + ", newlength=" + newlength
-          + ", newtargets=" + Arrays.asList(newtargets)
-          + ", closeFile=" + closeFile
-          + ", deleteBlock=" + deleteblock
-          + ")");
-    final BlockInfo storedBlock = blockManager.getStoredBlock(ExtendedBlock
+      if (isInSafeMode()) {
+        throw new SafeModeException(
+          "Cannot commitBlockSynchronization while in safe mode",
+          safeMode);
+      }
+      LOG.info("commitBlockSynchronization(lastblock=" + lastblock
+               + ", newgenerationstamp=" + newgenerationstamp
+               + ", newlength=" + newlength
+               + ", newtargets=" + Arrays.asList(newtargets)
+               + ", closeFile=" + closeFile
+               + ", deleteBlock=" + deleteblock
+               + ")");
+      final BlockInfo storedBlock = blockManager.getStoredBlock(ExtendedBlock
         .getLocalBlock(lastblock));
-    if (storedBlock == null) {
-      throw new IOException("Block (=" + lastblock + ") not found");
-    }
-    INodeFile iFile = storedBlock.getINode();
-    if (!iFile.isUnderConstruction() || storedBlock.isComplete()) {
-      throw new IOException("Unexpected block (=" + lastblock
-          + ") since the file (=" + iFile.getLocalName()
-          + ") is not under construction");
-    }
-
-    long recoveryId =
-      ((BlockInfoUnderConstruction)storedBlock).getBlockRecoveryId();
-    if(recoveryId != newgenerationstamp) {
-      throw new IOException("The recovery id " + newgenerationstamp
-          + " does not match current recovery id "
-          + recoveryId + " for block " + lastblock); 
-    }
-        
-    INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)iFile;
+      if (storedBlock == null) {
+        throw new IOException("Block (=" + lastblock + ") not found");
+      }
+      INodeFile iFile = storedBlock.getINode();
+      if (!iFile.isUnderConstruction() || storedBlock.isComplete()) {
+        throw new IOException("Unexpected block (=" + lastblock
+                              + ") since the file (=" + iFile.getLocalName()
+                              + ") is not under construction");
+      }
+
+      long recoveryId =
+        ((BlockInfoUnderConstruction)storedBlock).getBlockRecoveryId();
+      if(recoveryId != newgenerationstamp) {
+        throw new IOException("The recovery id " + newgenerationstamp
+                              + " does not match current recovery id "
+                              + recoveryId + " for block " + lastblock); 
+      }
+
+      INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)iFile;
 
-    if (deleteblock) {
-      pendingFile.removeLastBlock(ExtendedBlock.getLocalBlock(lastblock));
-      blockManager.removeBlockFromMap(storedBlock);
-    }
-    else {
-      // update last block
-      storedBlock.setGenerationStamp(newgenerationstamp);
-      storedBlock.setNumBytes(newlength);
-
-      // find the DatanodeDescriptor objects
-      // There should be no locations in the blockManager till now because the
-      // file is underConstruction
-      DatanodeDescriptor[] descriptors = null;
-      if (newtargets.length > 0) {
-        descriptors = new DatanodeDescriptor[newtargets.length];
-        for(int i = 0; i < newtargets.length; i++) {
-          descriptors[i] = getDatanode(newtargets[i]);
+      if (deleteblock) {
+        pendingFile.removeLastBlock(ExtendedBlock.getLocalBlock(lastblock));
+        blockManager.removeBlockFromMap(storedBlock);
+      }
+      else {
+        // update last block
+        storedBlock.setGenerationStamp(newgenerationstamp);
+        storedBlock.setNumBytes(newlength);
+
+        // find the DatanodeDescriptor objects
+        // There should be no locations in the blockManager until now,
+        // because the file is still under construction
+        DatanodeDescriptor[] descriptors = null;
+        if (newtargets.length > 0) {
+          descriptors = new DatanodeDescriptor[newtargets.length];
+          for(int i = 0; i < newtargets.length; i++) {
+            descriptors[i] = getDatanode(newtargets[i]);
+          }
         }
+        if (closeFile) {
+          // the file is getting closed. Insert block locations into blockManager.
+          // Otherwise fsck will report these blocks as MISSING, especially if the
+          // blocksReceived from Datanodes take a long time to arrive.
+          for (int i = 0; i < descriptors.length; i++) {
+            descriptors[i].addBlock(storedBlock);
+          }
+        }
+        // add pipeline locations into the INodeUnderConstruction
+        pendingFile.setLastBlock(storedBlock, descriptors);
       }
+
+      src = leaseManager.findPath(pendingFile);
       if (closeFile) {
-        // the file is getting closed. Insert block locations into blockManager.
-        // Otherwise fsck will report these blocks as MISSING, especially if the
-        // blocksReceived from Datanodes take a long time to arrive.
-        for (int i = 0; i < descriptors.length; i++) {
-          descriptors[i].addBlock(storedBlock);
-        }
-      }
-      // add pipeline locations into the INodeUnderConstruction
-      pendingFile.setLastBlock(storedBlock, descriptors);
-    }
-
-    // If this commit does not want to close the file, persist
-    // blocks only if append is supported and return
-    src = leaseManager.findPath(pendingFile);
-    if (!closeFile) {
-      if (supportAppends) {
+        // commit the last block and complete it if it has minimum replicas
+        blockManager.commitOrCompleteLastBlock(pendingFile, storedBlock);
+
+        //remove lease, close file
+        finalizeINodeFileUnderConstruction(src, pendingFile);
+      } else if (supportAppends) {
+        // If this commit does not want to close the file, persist
+        // blocks only if append is supported 
         dir.persistBlocks(src, pendingFile);
-        getEditLog().logSync();
       }
-      LOG.info("commitBlockSynchronization(" + lastblock + ") successful");
-      return;
-    }
-
-    // commit the last block and complete it if it has minimum replicas
-    blockManager.commitOrCompleteLastBlock(pendingFile, storedBlock);
-
-    //remove lease, close file
-    finalizeINodeFileUnderConstruction(src, pendingFile);
     } finally {
       writeUnlock();
     }
     getEditLog().logSync();
-    LOG.info("commitBlockSynchronization(newblock=" + lastblock
+    if (closeFile) {
+      LOG.info("commitBlockSynchronization(newblock=" + lastblock
           + ", file=" + src
           + ", newgenerationstamp=" + newgenerationstamp
           + ", newlength=" + newlength
           + ", newtargets=" + Arrays.asList(newtargets) + ") successful");
+    } else {
+      LOG.info("commitBlockSynchronization(" + lastblock + ") successful");
+    }
   }
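
commitBlockSynchronization() now follows the same shape as the other mutating operations in this patch: check safe mode first inside the write lock, perform all block and lease updates under the lock, then call logSync() and log success only after writeUnlock(), choosing the message based on whether the file was closed. A simplified sketch of that control flow (hypothetical names, no real block handling):

    import java.io.IOException;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class CommitShapeSketch {
      private final ReentrantReadWriteLock fsLock = new ReentrantReadWriteLock(true);
      private volatile boolean inSafeMode = false;   // stand-in for SafeModeInfo

      void commitLikeOp(String block, boolean closeFile) throws IOException {
        String src = "";
        fsLock.writeLock().lock();
        try {
          // fail fast while holding the lock, before touching any state
          if (inSafeMode) {
            throw new IOException("Cannot commit " + block + " while in safe mode");
          }
          // ... look up the block, update its length/genstamp, resolve src ...
          src = "/hypothetical/path/for/" + block;
          // ... either finalize the file (closeFile) or just persist its blocks ...
        } finally {
          fsLock.writeLock().unlock();
        }
        // the edit-log sync and the success log happen after the lock is
        // dropped; the message differs depending on whether the file closed
        System.out.println(closeFile
            ? "commit(newblock=" + block + ", file=" + src + ") successful"
            : "commit(" + block + ") successful");
      }
    }
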
 
 
@@ -2637,9 +2727,15 @@ public class FSNamesystem implements FSC
    * Renew the lease(s) held by the given client
    */
   void renewLease(String holder) throws IOException {
-    if (isInSafeMode())
-      throw new SafeModeException("Cannot renew lease for " + holder, safeMode);
-    leaseManager.renewLease(holder);
+    writeLock();
+    try {
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot renew lease for " + holder, safeMode);
+      }
+      leaseManager.renewLease(holder);
+    } finally {
+      writeUnlock();
+    }
   }
 
   /**
@@ -2657,20 +2753,26 @@ public class FSNamesystem implements FSC
   public DirectoryListing getListing(String src, byte[] startAfter,
       boolean needLocation) 
     throws AccessControlException, UnresolvedLinkException, IOException {
-    if (isPermissionEnabled) {
-      if (dir.isDir(src)) {
-        checkPathAccess(src, FsAction.READ_EXECUTE);
+    DirectoryListing dl;
+    readLock();
+    try {
+      if (isPermissionEnabled) {
+        if (dir.isDir(src)) {
+          checkPathAccess(src, FsAction.READ_EXECUTE);
+        } else {
+          checkTraverse(src);
+        }
       }
-      else {
-        checkTraverse(src);
+      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+        logAuditEvent(UserGroupInformation.getCurrentUser(),
+                      Server.getRemoteIp(),
+                      "listStatus", src, null, null);
       }
+      dl = dir.getListing(src, startAfter, needLocation);
+    } finally {
+      readUnlock();
     }
-    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
-      logAuditEvent(UserGroupInformation.getCurrentUser(),
-                    Server.getRemoteIp(),
-                    "listStatus", src, null, null);
-    }
-    return dir.getListing(src, startAfter, needLocation);
+    return dl;
   }
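
getListing() is now wrapped in the read lock: the permission check, the audit log entry, and the directory lookup all run under readLock(), the result is captured in a local variable, and it is returned only after readUnlock(). A minimal sketch of that capture-then-return pattern (illustrative names):

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class ReadLockedQuerySketch {
      private final ReentrantReadWriteLock fsLock = new ReentrantReadWriteLock(true);

      List<String> listingFor(String src) {
        List<String> result;
        fsLock.readLock().lock();
        try {
          // permission check, audit logging and the actual lookup all happen
          // under the read lock so they see a consistent namespace
          result = Arrays.asList(src + "/a", src + "/b");  // stand-in for dir.getListing()
        } finally {
          fsLock.readLock().unlock();
        }
        return result;   // returned after the lock is released
      }
    }
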
 
   /////////////////////////////////////////////////////////
@@ -2701,10 +2803,20 @@ public class FSNamesystem implements FSC
    * 
    * @see org.apache.hadoop.hdfs.server.datanode.DataNode
    */
-  public void registerDatanode(DatanodeRegistration nodeReg
-                                            ) throws IOException {
+  public void registerDatanode(DatanodeRegistration nodeReg)
+      throws IOException {
     writeLock();
     try {
+      registerDatanodeInternal(nodeReg);
+    } finally {
+      writeUnlock();
+    }
+  }
+
+  /** @see #registerDatanode(DatanodeRegistration) */
+  public void registerDatanodeInternal(DatanodeRegistration nodeReg)
+      throws IOException {
+    assert hasWriteLock();
     String dnAddress = Server.getRemoteAddress();
     if (dnAddress == null) {
       // Mostly called inside an RPC.
@@ -2821,17 +2933,12 @@ public class FSNamesystem implements FSC
       // because it is done when the descriptor is created
     }
 
-    if (safeMode != null) {
-      safeMode.checkMode();
-    }
-    return;
-    } finally {
-      writeUnlock();
-    }
+    checkSafeMode();
   }
     
   /* Resolve a node's network location */
   private void resolveNetworkLocation (DatanodeDescriptor node) {
+    assert hasWriteLock();
     List<String> names = new ArrayList<String>(1);
     if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
       // get the node's IP address
@@ -2908,7 +3015,23 @@ public class FSNamesystem implements FSC
   DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
       long capacity, long dfsUsed, long remaining, long blockPoolUsed,
       int xceiverCount, int xmitsInProgress, int failedVolumes) 
-      throws IOException {
+        throws IOException {
+    readLock();
+    try {
+      return handleHeartbeatInternal(nodeReg, capacity, dfsUsed, 
+          remaining, blockPoolUsed, xceiverCount, xmitsInProgress, 
+          failedVolumes);
+    } finally {
+      readUnlock();
+    }
+  }
+
+  /** @see #handleHeartbeat(DatanodeRegistration, long, long, long, long, int, int, int) */
+  DatanodeCommand[] handleHeartbeatInternal(DatanodeRegistration nodeReg,
+      long capacity, long dfsUsed, long remaining, long blockPoolUsed,
+      int xceiverCount, int xmitsInProgress, int failedVolumes) 
+        throws IOException {
+    assert hasReadLock();
     DatanodeCommand cmd = null;
     synchronized (heartbeats) {
       synchronized (datanodeMap) {
@@ -3149,10 +3272,14 @@ public class FSNamesystem implements FSC
     int workFound = 0;
     int blocksToProcess = 0;
     int nodesToProcess = 0;
-    // blocks should not be replicated or removed if safe mode is on
+    // Blocks should not be replicated or removed if in safe mode.
+    // It's OK to check safe mode here without holding the lock; in the
+    // worst case extra replications will be scheduled, and these will get
+    // fixed up later.
     if (isInSafeMode())
       return workFound;
-    synchronized(heartbeats) {
+
+    synchronized (heartbeats) {
       blocksToProcess = (int)(heartbeats.size() 
           * ReplicationMonitor.REPLICATION_WORK_MULTIPLIER_PER_ITERATION);
       nodesToProcess = (int)Math.ceil((double)heartbeats.size() 
@@ -3186,13 +3313,13 @@ public class FSNamesystem implements FSC
     throws IOException {
     writeLock();
     try {
-    DatanodeDescriptor nodeInfo = getDatanode(nodeID);
-    if (nodeInfo != null) {
-      removeDatanode(nodeInfo);
-    } else {
-      NameNode.stateChangeLog.warn("BLOCK* NameSystem.removeDatanode: "
-                                   + nodeID.getName() + " does not exist");
-    }
+      DatanodeDescriptor nodeInfo = getDatanode(nodeID);
+      if (nodeInfo != null) {
+        removeDatanode(nodeInfo);
+      } else {
+        NameNode.stateChangeLog.warn("BLOCK* NameSystem.removeDatanode: "
+                                     + nodeID.getName() + " does not exist");
+      }
     } finally {
       writeUnlock();
     }
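
removeDatanode() and the other datanode-handling code in this patch layer the per-structure monitors under the namesystem lock: the FSNamesystem lock is taken first, with synchronized (heartbeats) and synchronized (datanodeMap) blocks nested inside it, in that order (see handleHeartbeatInternal() above and heartbeatCheck() below). A sketch of that lock ordering with placeholder collections:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class LockOrderingSketch {
      private final ReentrantReadWriteLock fsLock = new ReentrantReadWriteLock(true);
      private final ArrayList<String> heartbeats = new ArrayList<String>();
      private final HashMap<String, String> datanodeMap = new HashMap<String, String>();

      void removeNode(String storageID) {
        fsLock.writeLock().lock();            // 1. coarse namesystem lock
        try {
          synchronized (heartbeats) {         // 2. heartbeat list monitor
            heartbeats.remove(storageID);
            synchronized (datanodeMap) {      // 3. datanode map monitor
              datanodeMap.remove(storageID);
            }
          }
        } finally {
          fsLock.writeLock().unlock();
        }
      }
    }
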
@@ -3203,6 +3330,7 @@ public class FSNamesystem implements FSC
    * @param nodeInfo datanode descriptor.
    */
   private void removeDatanode(DatanodeDescriptor nodeInfo) {
+    assert hasWriteLock();
     synchronized (heartbeats) {
       if (nodeInfo.isAlive) {
         updateStats(nodeInfo, false);
@@ -3218,12 +3346,11 @@ public class FSNamesystem implements FSC
     unprotectedRemoveDatanode(nodeInfo);
     clusterMap.remove(nodeInfo);
 
-    if (safeMode != null) {
-      safeMode.checkMode();
-    }
+    checkSafeMode();
   }
 
   void unprotectedRemoveDatanode(DatanodeDescriptor nodeDescr) {
+    assert hasWriteLock();
     nodeDescr.resetBlocks();
     blockManager.removeFromInvalidates(nodeDescr.getStorageID());
     if(NameNode.stateChangeLog.isDebugEnabled()) {
@@ -3234,12 +3361,14 @@ public class FSNamesystem implements FSC
   }
     
   void unprotectedAddDatanode(DatanodeDescriptor nodeDescr) {
-    /* To keep host2DataNodeMap consistent with datanodeMap,
-       remove  from host2DataNodeMap the datanodeDescriptor removed
-       from datanodeMap before adding nodeDescr to host2DataNodeMap.
-    */
-    host2DataNodeMap.remove(
+    assert hasWriteLock();
+    // To keep host2DataNodeMap consistent with datanodeMap, remove from
+    // host2DataNodeMap the DatanodeDescriptor removed from datanodeMap
+    // before adding nodeDescr to host2DataNodeMap.
+    synchronized (datanodeMap) {
+      host2DataNodeMap.remove(
                             datanodeMap.put(nodeDescr.getStorageID(), nodeDescr));
+    }
     host2DataNodeMap.add(nodeDescr);
       
     if(NameNode.stateChangeLog.isDebugEnabled()) {
@@ -3256,8 +3385,11 @@ public class FSNamesystem implements FSC
    * @throws IOException
    */
   void wipeDatanode(DatanodeID nodeID) throws IOException {
+    assert hasWriteLock();
     String key = nodeID.getStorageID();
-    host2DataNodeMap.remove(datanodeMap.remove(key));
+    synchronized (datanodeMap) {
+      host2DataNodeMap.remove(datanodeMap.remove(key));
+    }
     if(NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug(
           "BLOCK* NameSystem.wipeDatanode: "
@@ -3282,8 +3414,9 @@ public class FSNamesystem implements FSC
    * effect causes more datanodes to be declared dead.
    */
   void heartbeatCheck() {
+    // It's OK to check safe mode without taking the lock here; we re-check
+    // for safe mode after taking the write lock, before removing a datanode.
     if (isInSafeMode()) {
-      // not to check dead nodes if in safemode
       return;
     }
     boolean allAlive = false;
@@ -3308,6 +3441,9 @@ public class FSNamesystem implements FSC
       // acquire the fsnamesystem lock, and then remove the dead node.
       if (foundDead) {
         writeLock();
+        if (isInSafeMode()) {
+          // release the write lock before the early return, otherwise the
+          // lock would be leaked and the namesystem would deadlock
+          writeUnlock();
+          return;
+        }
         try {
           synchronized(heartbeats) {
             synchronized (datanodeMap) {
@@ -3343,21 +3479,21 @@ public class FSNamesystem implements FSC
     writeLock();
     startTime = now(); //after acquiring write lock
     try {
-    DatanodeDescriptor node = getDatanode(nodeID);
-    if (node == null || !node.isAlive) {
-      throw new IOException("ProcessReport from dead or unregistered node: "
-                            + nodeID.getName());
-    }
-    // To minimize startup time, we discard any second (or later) block reports
-    // that we receive while still in startup phase.
-    if (isInStartupSafeMode() && node.numBlocks() > 0) {
-      NameNode.stateChangeLog.info("BLOCK* NameSystem.processReport: "
-          + "discarded non-initial block report from " + nodeID.getName()
-          + " because namenode still in startup phase");
-      return;
-    }
-
-    blockManager.processReport(node, newReport);
+      DatanodeDescriptor node = getDatanode(nodeID);
+      if (node == null || !node.isAlive) {
+        throw new IOException("ProcessReport from dead or unregistered node: "
+                              + nodeID.getName());
+      }
+      // To minimize startup time, we discard any second (or later) block reports
+      // that we receive while still in startup phase.
+      if (isInStartupSafeMode() && node.numBlocks() > 0) {
+        NameNode.stateChangeLog.info("BLOCK* NameSystem.processReport: "
+            + "discarded non-initial block report from " + nodeID.getName()
+            + " because namenode still in startup phase");
+        return;
+      }
+  
+      blockManager.processReport(node, newReport);
     } finally {
       endTime = now();
       writeUnlock();
@@ -3389,6 +3525,7 @@ public class FSNamesystem implements FSC
                               DatanodeDescriptor addedNode,
                               DatanodeDescriptor delNodeHint,
                               BlockPlacementPolicy replicator) {
+    assert hasWriteLock();
     // first form a rack to datanodes map and
     INodeFile inode = blockManager.getINode(b);
     HashMap<String, ArrayList<DatanodeDescriptor>> rackMap =
@@ -3482,20 +3619,20 @@ public class FSNamesystem implements FSC
                                          ) throws IOException {
     writeLock();
     try {
-    DatanodeDescriptor node = getDatanode(nodeID);
-    if (node == null || !node.isAlive) {
-      NameNode.stateChangeLog.warn("BLOCK* NameSystem.blockReceived: " + block
-          + " is received from dead or unregistered node " + nodeID.getName());
-      throw new IOException(
-          "Got blockReceived message from unregistered or dead node " + block);
-    }
-        
-    if (NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("BLOCK* NameSystem.blockReceived: "
-                                    +block+" is received from " + nodeID.getName());
-    }
-
-    blockManager.addBlock(node, block, delHint);
+      DatanodeDescriptor node = getDatanode(nodeID);
+      if (node == null || !node.isAlive) {
+        NameNode.stateChangeLog.warn("BLOCK* NameSystem.blockReceived: " + block
+            + " is received from dead or unregistered node " + nodeID.getName());
+        throw new IOException(
+            "Got blockReceived message from unregistered or dead node " + block);
+      }
+          
+      if (NameNode.stateChangeLog.isDebugEnabled()) {
+        NameNode.stateChangeLog.debug("BLOCK* NameSystem.blockReceived: "
+                                      +block+" is received from " + nodeID.getName());
+      }
+  
+      blockManager.addBlock(node, block, delHint);
     } finally {
       writeUnlock();
     }
@@ -3612,75 +3749,74 @@ public class FSNamesystem implements FSC
   }
 
   private ArrayList<DatanodeDescriptor> getDatanodeListForReport(
-                                                      DatanodeReportType type) {
-    boolean listLiveNodes = type == DatanodeReportType.ALL ||
-                            type == DatanodeReportType.LIVE;
-    boolean listDeadNodes = type == DatanodeReportType.ALL ||
-                            type == DatanodeReportType.DEAD;
-
-    HashMap<String, String> mustList = new HashMap<String, String>();
-
+      DatanodeReportType type) {
     readLock();
-    try {
-    if (listDeadNodes) {
-      //first load all the nodes listed in include and exclude files.
-      for (Iterator<String> it = hostsReader.getHosts().iterator(); 
-           it.hasNext();) {
-        mustList.put(it.next(), "");
+    try {    
+      boolean listLiveNodes = type == DatanodeReportType.ALL ||
+                              type == DatanodeReportType.LIVE;
+      boolean listDeadNodes = type == DatanodeReportType.ALL ||
+                              type == DatanodeReportType.DEAD;
+  
+      HashMap<String, String> mustList = new HashMap<String, String>();
+  
+      if (listDeadNodes) {
+        //first load all the nodes listed in include and exclude files.
+        Iterator<String> it = hostsReader.getHosts().iterator();
+        while (it.hasNext()) {
+          mustList.put(it.next(), "");
+        }
+        it = hostsReader.getExcludedHosts().iterator(); 
+        while (it.hasNext()) {
+          mustList.put(it.next(), "");
+        }
       }
-      for (Iterator<String> it = hostsReader.getExcludedHosts().iterator(); 
-           it.hasNext();) {
-        mustList.put(it.next(), "");
+
+      ArrayList<DatanodeDescriptor> nodes = null;
+      
+      synchronized (datanodeMap) {
+        nodes = new ArrayList<DatanodeDescriptor>(datanodeMap.size() + 
+                                                  mustList.size());
+        Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
+        while (it.hasNext()) { 
+          DatanodeDescriptor dn = it.next();
+          boolean isDead = isDatanodeDead(dn);
+          if ( (isDead && listDeadNodes) || (!isDead && listLiveNodes) ) {
+            nodes.add(dn);
+          }
+          // Remove any form of this datanode from the include/exclude lists.
+          mustList.remove(dn.getName());
+          mustList.remove(dn.getHost());
+          mustList.remove(dn.getHostName());
+        }
       }
-    }
-   
-    ArrayList<DatanodeDescriptor> nodes = null;
-    
-    synchronized (datanodeMap) {
-      nodes = new ArrayList<DatanodeDescriptor>(datanodeMap.size() + 
-                                                mustList.size());
       
-      for(Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator(); 
-                                                               it.hasNext();) {
-        DatanodeDescriptor dn = it.next();
-        boolean isDead = isDatanodeDead(dn);
-        if ( (isDead && listDeadNodes) || (!isDead && listLiveNodes) ) {
+      if (listDeadNodes) {
+        Iterator<String> it = mustList.keySet().iterator();
+        while (it.hasNext()) {
+          DatanodeDescriptor dn = 
+              new DatanodeDescriptor(new DatanodeID(it.next()));
+          dn.setLastUpdate(0);
           nodes.add(dn);
         }
-        //Remove any form of the this datanode in include/exclude lists.
-        mustList.remove(dn.getName());
-        mustList.remove(dn.getHost());
-        mustList.remove(dn.getHostName());
       }
-    }
-    
-    if (listDeadNodes) {
-      for (Iterator<String> it = mustList.keySet().iterator(); it.hasNext();) {
-        DatanodeDescriptor dn = 
-            new DatanodeDescriptor(new DatanodeID(it.next()));
-        dn.setLastUpdate(0);
-        nodes.add(dn);
-      }
-    }
-    
-    return nodes;
+      return nodes;
     } finally {
       readUnlock();
     }
   }
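
getDatanodeListForReport() now does all of its work inside the read lock and walks datanodeMap while holding its monitor, building the result list there before adding placeholder entries for dead nodes that never registered. A simplified sketch of reading a shared map under its monitor inside the read lock (illustrative types; the copy-then-filter split is one possible variant, not the exact code above):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class SnapshotUnderMonitorSketch {
      private final ReentrantReadWriteLock fsLock = new ReentrantReadWriteLock(true);
      private final Map<String, String> datanodeMap = new HashMap<String, String>();

      List<String> report() {
        fsLock.readLock().lock();
        try {
          List<String> nodes;
          synchronized (datanodeMap) {
            // copy while holding the map's monitor ...
            nodes = new ArrayList<String>(datanodeMap.values());
          }
          // ... then filter / decorate the copy without holding the monitor
          List<String> result = new ArrayList<String>(nodes.size());
          for (String dn : nodes) {
            result.add("datanode: " + dn);
          }
          return result;
        } finally {
          fsLock.readLock().unlock();
        }
      }
    }
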
 
-  public DatanodeInfo[] datanodeReport( DatanodeReportType type
-      ) throws AccessControlException {
+  public DatanodeInfo[] datanodeReport( DatanodeReportType type)
+      throws AccessControlException {
     readLock();
     try {
-    checkSuperuserPrivilege();
-
-    ArrayList<DatanodeDescriptor> results = getDatanodeListForReport(type);
-    DatanodeInfo[] arr = new DatanodeInfo[results.size()];
-    for (int i=0; i<arr.length; i++) {
-      arr[i] = new DatanodeInfo(results.get(i));
-    }
-    return arr;
+      checkSuperuserPrivilege();
+  
+      ArrayList<DatanodeDescriptor> results = getDatanodeListForReport(type);
+      DatanodeInfo[] arr = new DatanodeInfo[results.size()];
+      for (int i=0; i<arr.length; i++) {
+        arr[i] = new DatanodeInfo(results.get(i));
+      }
+      return arr;
     } finally {
       readUnlock();
     }
@@ -3695,17 +3831,17 @@ public class FSNamesystem implements FSC
    * @throws IOException if 
    */
   void saveNamespace() throws AccessControlException, IOException {
-    writeLock();
+    readLock();
     try {
-    checkSuperuserPrivilege();
-    if(!isInSafeMode()) {
-      throw new IOException("Safe mode should be turned ON " +
-                            "in order to create namespace image.");
-    }
-    getFSImage().saveNamespace(true);
-    LOG.info("New namespace image has been created.");
+      checkSuperuserPrivilege();
+      if (!isInSafeMode()) {
+        throw new IOException("Safe mode should be turned ON " +
+                              "in order to create namespace image.");
+      }
+      getFSImage().saveNamespace(true);
+      LOG.info("New namespace image has been created.");
     } finally {
-      writeUnlock();
+      readUnlock();
     }
   }
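
Note that saveNamespace() moves from the write lock to the read lock: because it refuses to run unless safe mode is on, the namespace cannot change underneath it, so the read lock is sufficient and other readers are not blocked during the potentially long image save. A sketch of that precondition-plus-read-lock shape, with a boolean standing in for SafeModeInfo:

    import java.io.IOException;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class SaveWhileFrozenSketch {
      private final ReentrantReadWriteLock fsLock = new ReentrantReadWriteLock(true);
      private volatile boolean inSafeMode = true;   // stand-in for SafeModeInfo

      void saveImage() throws IOException {
        fsLock.readLock().lock();
        try {
          if (!inSafeMode) {
            throw new IOException("Safe mode should be turned ON to save the image");
          }
          // long-running, read-only walk of the namespace; the read lock keeps
          // writers out (they need the write lock) but does not block readers
          // ... write the image ...
        } finally {
          fsLock.readLock().unlock();
        }
      }
    }
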
   
@@ -3718,16 +3854,16 @@ public class FSNamesystem implements FSC
   boolean restoreFailedStorage(String arg) throws AccessControlException {
     writeLock();
     try {
-    checkSuperuserPrivilege();
-    
-    // if it is disabled - enable it and vice versa.
-    if(arg.equals("check"))
-      return getFSImage().getStorage().getRestoreFailedStorage();
-    
-    boolean val = arg.equals("true");  // false if not
-    getFSImage().getStorage().setRestoreFailedStorage(val);
-    
-    return val;
+      checkSuperuserPrivilege();
+      
+      // if it is disabled - enable it and vice versa.
+      if(arg.equals("check"))
+        return getFSImage().getStorage().getRestoreFailedStorage();
+      
+      boolean val = arg.equals("true");  // false if not
+      getFSImage().getStorage().setRestoreFailedStorage(val);
+      
+      return val;
     } finally {
       writeUnlock();
     }
@@ -3739,15 +3875,15 @@ public class FSNamesystem implements FSC
                                           ArrayList<DatanodeDescriptor> dead) {
     readLock();
     try {
-    ArrayList<DatanodeDescriptor> results = 
-                            getDatanodeListForReport(DatanodeReportType.ALL);    
-    for(Iterator<DatanodeDescriptor> it = results.iterator(); it.hasNext();) {
-      DatanodeDescriptor node = it.next();
-      if (isDatanodeDead(node))
-        dead.add(node);
-      else
-        live.add(node);
-    }
+      ArrayList<DatanodeDescriptor> results = 
+                              getDatanodeListForReport(DatanodeReportType.ALL);    
+      for(Iterator<DatanodeDescriptor> it = results.iterator(); it.hasNext();) {
+        DatanodeDescriptor node = it.next();
+        if (isDatanodeDead(node))
+          dead.add(node);
+        else
+          live.add(node);
+      }
     } finally {
       readUnlock();
     }
@@ -3759,13 +3895,13 @@ public class FSNamesystem implements FSC
   private void datanodeDump(PrintWriter out) {
     readLock();
     try {
-    synchronized (datanodeMap) {
-      out.println("Metasave: Number of datanodes: " + datanodeMap.size());
-      for(Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator(); it.hasNext();) {
-        DatanodeDescriptor node = it.next();
-        out.println(node.dumpDatanode());
+      synchronized (datanodeMap) {
+        out.println("Metasave: Number of datanodes: " + datanodeMap.size());
+        for(Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator(); it.hasNext();) {
+          DatanodeDescriptor node = it.next();
+          out.println(node.dumpDatanode());
+        }
       }
-    }
     } finally {
       readUnlock();
     }
@@ -3774,9 +3910,9 @@ public class FSNamesystem implements FSC
   /**
    * Start decommissioning the specified datanode. 
    */
-  private void startDecommission (DatanodeDescriptor node) 
+  private void startDecommission(DatanodeDescriptor node) 
     throws IOException {
-
+    assert hasWriteLock();
     if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
       LOG.info("Start Decommissioning node " + node.getName() + " with " + 
           node.numBlocks() +  " blocks.");
@@ -3795,8 +3931,9 @@ public class FSNamesystem implements FSC
   /**
    * Stop decommissioning the specified datanodes.
    */
-  public void stopDecommission (DatanodeDescriptor node) 
+  public void stopDecommission(DatanodeDescriptor node) 
     throws IOException {
+    assert hasWriteLock();
     if (node.isDecommissionInProgress() || node.isDecommissioned()) {
       LOG.info("Stop Decommissioning node " + node.getName());
       synchronized (heartbeats) {
@@ -3807,12 +3944,6 @@ public class FSNamesystem implements FSC
     }
   }
 
-  /** 
-   */
-  public DatanodeInfo getDataNodeInfo(String name) {
-    return datanodeMap.get(name);
-  }
-
   public Date getStartTime() {
     return new Date(systemStart); 
   }
@@ -3881,6 +4012,7 @@ public class FSNamesystem implements FSC
    * decommission completed. Return true if decommission is complete.
    */
   boolean checkDecommissionStateInternal(DatanodeDescriptor node) {
+    assert hasWriteLock();
     //
     // Check to see if all blocks in this decommissioned
     // node has reached their target replication factor.
@@ -3965,7 +4097,7 @@ public class FSNamesystem implements FSC
    * it will be disallowed from registering. 
    */
   private boolean verifyNodeRegistration(DatanodeID nodeReg, String ipAddr) {
-    assert (hasWriteLock());
+    assert hasWriteLock();
     return inHostsList(nodeReg, ipAddr);
   }
     
@@ -3974,6 +4106,7 @@ public class FSNamesystem implements FSC
    */
   private void checkDecommissioning(DatanodeDescriptor nodeReg, String ipAddr) 
     throws IOException {
+    assert hasWriteLock();
     // If the registered node is in exclude list, then decommission it
     if (inExcludedHostsList(nodeReg, ipAddr)) {
       startDecommission(nodeReg);
@@ -3988,6 +4121,7 @@ public class FSNamesystem implements FSC
    * @throws IOException
    */
   public DatanodeDescriptor getDatanode(DatanodeID nodeID) throws IOException {
+    assert hasReadOrWriteLock();
     UnregisteredNodeException e = null;
     DatanodeDescriptor node = datanodeMap.get(nodeID.getStorageID());
     if (node == null) 
@@ -4478,12 +4612,22 @@ public class FSNamesystem implements FSC
     }
     return isInSafeMode();
   }
+  
+  private void checkSafeMode() {
+    // safeMode is volatile, and may be set to null at any time
+    SafeModeInfo safeMode = this.safeMode;
+    if (safeMode != null) {
+      safeMode.checkMode();
+    }
+  }
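
checkSafeMode() and the safe-mode accessors below all copy the volatile safeMode field into a local before testing it for null. Without the copy, another thread could null the field between the check and the use and cause an NPE; with it, each method works against a single consistent snapshot. A minimal sketch of the idiom (names are illustrative):

    class VolatileCopySketch {
      /** Hypothetical stand-in for SafeModeInfo; may be nulled at any time. */
      static class SafeModeInfo {
        boolean isOn() { return true; }
        void checkMode() { /* leave or stay in safe mode */ }
      }

      private volatile SafeModeInfo safeMode = new SafeModeInfo();

      boolean isInSafeMode() {
        SafeModeInfo sm = this.safeMode;   // read the volatile field exactly once
        if (sm == null) {
          return false;                    // safe mode already left
        }
        return sm.isOn();                  // cannot NPE: sm is a stable local
      }

      void leaveSafeMode() {
        safeMode = null;                   // concurrent readers keep their local copy
      }
    }
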
 
   /**
    * Check whether the name node is in safe mode.
    * @return true if safe mode is ON, false otherwise
    */
-  synchronized boolean isInSafeMode() {
+  boolean isInSafeMode() {
+    // safeMode is volatile, and may be set to null at any time
+    SafeModeInfo safeMode = this.safeMode;
     if (safeMode == null)
       return false;
     return safeMode.isOn();
@@ -4492,18 +4636,23 @@ public class FSNamesystem implements FSC
   /**
    * Check whether the name node is in startup mode.
    */
-  synchronized boolean isInStartupSafeMode() {
+  boolean isInStartupSafeMode() {
+    // safeMode is volatile, and may be set to null at any time
+    SafeModeInfo safeMode = this.safeMode;
     if (safeMode == null)
       return false;
-    return safeMode.isOn() && !safeMode.isManual();
+    return !safeMode.isManual() && safeMode.isOn();
   }
 
   /**
    * Check whether replication queues are populated.
    */
-  synchronized boolean isPopulatingReplQueues() {
-    return (!isInSafeMode() ||
-            safeMode.isPopulatingReplQueues());
+  boolean isPopulatingReplQueues() {
+    // safeMode is volatile, and may be set to null at any time
+    SafeModeInfo safeMode = this.safeMode;
+    if (safeMode == null)
+      return true;
+    return safeMode.isPopulatingReplQueues();
   }
     
   /**
@@ -4511,6 +4660,8 @@ public class FSNamesystem implements FSC
    * @param replication current replication 
    */
   void incrementSafeBlockCount(int replication) {
+    // safeMode is volatile, and may be set to null at any time
+    SafeModeInfo safeMode = this.safeMode;
     if (safeMode == null)
       return;
     safeMode.incrementSafeBlockCount((short)replication);
@@ -4520,6 +4671,8 @@ public class FSNamesystem implements FSC
    * Decrement number of blocks that reached minimal replication.
    */
   void decrementSafeBlockCount(Block b) {
+    // safeMode is volatile, and may be set to null at any time
+    SafeModeInfo safeMode = this.safeMode;
     if (safeMode == null) // mostly true
       return;
     safeMode.decrementSafeBlockCount((short)blockManager.countNodes(b).liveReplicas());
@@ -4529,6 +4682,8 @@ public class FSNamesystem implements FSC
    * Set the total number of blocks in the system. 
    */
   void setBlockTotal() {
+    // safeMode is volatile, and may be set to null at any time
+    SafeModeInfo safeMode = this.safeMode;
     if (safeMode == null)
       return;
     safeMode.setBlockTotal((int)getCompleteBlocksTotal());
@@ -4550,29 +4705,34 @@ public class FSNamesystem implements FSC
   long getCompleteBlocksTotal() {
     // Calculate number of blocks under construction
     long numUCBlocks = 0;
-    for (Lease lease : leaseManager.getSortedLeases()) {
-      for (String path : lease.getPaths()) {
-        INode node; 
-        try {
-          node = dir.getFileINode(path);
-        } catch (UnresolvedLinkException e) {
-          throw new AssertionError("Lease files should reside on this FS");
-        }
-        assert node != null : "Found a lease for nonexisting file.";
-        assert node.isUnderConstruction() :
-          "Found a lease for file that is not under construction.";
-        INodeFileUnderConstruction cons = (INodeFileUnderConstruction) node;
-        BlockInfo[] blocks = cons.getBlocks();
-        if(blocks == null)
-          continue;
-        for(BlockInfo b : blocks) {
-          if(!b.isComplete())
-            numUCBlocks++;
+    readLock();
+    try {
+      for (Lease lease : leaseManager.getSortedLeases()) {
+        for (String path : lease.getPaths()) {
+          INode node;
+          try {
+            node = dir.getFileINode(path);
+          } catch (UnresolvedLinkException e) {
+            throw new AssertionError("Lease files should reside on this FS");
+          }
+          assert node != null : "Found a lease for nonexisting file.";
+          assert node.isUnderConstruction() :
+            "Found a lease for file that is not under construction.";

[... 598 lines stripped ...]