Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/06/21 22:09:57 UTC

svn commit: r1138160 [4/6] - in /hadoop/common/branches/HDFS-1073/hdfs: ./ bin/ src/c++/libhdfs/ src/c++/libhdfs/tests/ src/contrib/ src/contrib/fuse-dfs/ src/contrib/fuse-dfs/src/ src/contrib/hdfsproxy/ src/docs/src/documentation/content/xdocs/ src/ja...

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1138160&r1=1138159&r2=1138160&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Tue Jun 21 20:09:54 2011
@@ -17,45 +17,100 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import org.apache.commons.logging.*;
+import static org.apache.hadoop.hdfs.server.common.Util.now;
+
+import java.io.BufferedWriter;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.lang.management.ManagementFactory;
+import java.net.InetAddress;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FsServerDefaults;
+import org.apache.hadoop.fs.InvalidPathException;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Options.Rename;
+import org.apache.hadoop.fs.ParentNotDirectoryException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
+import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
+import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.hdfs.server.common.Util;
-import static org.apache.hadoop.hdfs.server.common.Util.now;
-import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
-import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
-import org.apache.hadoop.security.token.delegation.DelegationKey;
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
-import org.apache.hadoop.util.*;
-import org.apache.hadoop.net.CachedDNSToSwitchMapping;
-import org.apache.hadoop.net.DNSToSwitchMapping;
-import org.apache.hadoop.net.NetworkTopology;
-import org.apache.hadoop.net.Node;
-import org.apache.hadoop.net.NodeBase;
-import org.apache.hadoop.net.ScriptBasedMapping;
 import org.apache.hadoop.hdfs.server.namenode.DatanodeDescriptor.BlockTargetPair;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
 import org.apache.hadoop.hdfs.server.namenode.UnderReplicatedBlocks.BlockIterator;
+import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -66,51 +121,33 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
-import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
-import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.fs.FsServerDefaults;
-import org.apache.hadoop.fs.InvalidPathException;
-import org.apache.hadoop.fs.ParentNotDirectoryException;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.UnresolvedLinkException;
-import org.apache.hadoop.fs.Options;
-import org.apache.hadoop.fs.Options.Rename;
-import org.apache.hadoop.fs.permission.*;
-import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.metrics2.annotation.Metric;
 import org.apache.hadoop.metrics2.annotation.Metrics;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.lib.MutableCounterInt;
 import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.net.CachedDNSToSwitchMapping;
+import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.net.NodeBase;
+import org.apache.hadoop.net.ScriptBasedMapping;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.HostsFileReader;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.VersionInfo;
 import org.mortbay.util.ajax.JSON;
 
-import java.io.BufferedWriter;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.DataOutputStream;
-import java.io.PrintWriter;
-import java.lang.management.ManagementFactory;
-import java.net.InetAddress;
-import java.net.URI;
-import java.util.*;
-import java.util.concurrent.TimeUnit;
-import java.util.Map.Entry;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import javax.management.NotCompliantMBeanException;
-import javax.management.ObjectName;
-import javax.management.StandardMBean;
-
 /***************************************************
  * FSNamesystem does the actual bookkeeping work for the
  * DataNode.
@@ -271,8 +308,8 @@ public class FSNamesystem implements FSC
   private FsServerDefaults serverDefaults;
   // allow appending to hdfs files
   private boolean supportAppends = true;
-  private DataTransferProtocol.ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure = 
-      DataTransferProtocol.ReplaceDatanodeOnFailure.DEFAULT;
+  private ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure = 
+      ReplaceDatanodeOnFailure.DEFAULT;
 
   private volatile SafeModeInfo safeMode;  // safe mode information
   private Host2NodesMap host2DataNodeMap = new Host2NodesMap();
@@ -455,6 +492,14 @@ public class FSNamesystem implements FSC
     return this.fsLock.isWriteLockedByCurrentThread();
   }
 
+  boolean hasReadLock() {
+    return this.fsLock.getReadHoldCount() > 0;
+  }
+
+  boolean hasReadOrWriteLock() {
+    return hasReadLock() || hasWriteLock();
+  }
+
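
These helpers expose the lock state so internal methods can assert their locking preconditions (a convention used throughout the rest of this patch). A minimal standalone sketch of how they behave, using only java.util.concurrent; the class name LockStateDemo is illustrative, not part of the patch. Run with java -ea to enable the assertions:

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class LockStateDemo {
      private final ReentrantReadWriteLock fsLock = new ReentrantReadWriteLock();

      // getReadHoldCount() counts the current thread's read-lock re-entries,
      // so a positive count means we are inside a read-locked section.
      boolean hasReadLock()        { return fsLock.getReadHoldCount() > 0; }
      boolean hasWriteLock()       { return fsLock.isWriteLockedByCurrentThread(); }
      boolean hasReadOrWriteLock() { return hasReadLock() || hasWriteLock(); }

      public static void main(String[] args) {
        LockStateDemo d = new LockStateDemo();
        d.fsLock.readLock().lock();
        try {
          assert d.hasReadOrWriteLock();  // holds: this thread took the read lock
        } finally {
          d.fsLock.readLock().unlock();
        }
        assert !d.hasReadOrWriteLock();   // released again
      }
    }
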
   /**
    * dirs is a list of directories where the filesystem directory state 
    * is stored
@@ -553,7 +598,7 @@ public class FSNamesystem implements FSC
         + " min(s), blockTokenLifetime=" + blockTokenLifetime / (60 * 1000)
         + " min(s)");
 
-    this.dtpReplaceDatanodeOnFailure = DataTransferProtocol.ReplaceDatanodeOnFailure.get(conf);
+    this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);
   }
 
   /**
@@ -565,11 +610,14 @@ public class FSNamesystem implements FSC
   }
   
   NamespaceInfo getNamespaceInfo() {
-    return new NamespaceInfo(dir.fsImage.getStorage().getNamespaceID(),
-                             getClusterId(),
-                             getBlockPoolId(),
-                             dir.fsImage.getStorage().getCTime(),
-                             getDistributedUpgradeVersion());
+    readLock();
+    try {
+      return new NamespaceInfo(dir.fsImage.getStorage().getNamespaceID(),
+          getClusterId(), getBlockPoolId(),
+          dir.fsImage.getStorage().getCTime(), getDistributedUpgradeVersion());
+    } finally {
+      readUnlock();
+    }
   }
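
getNamespaceInfo() now builds its reply under the read lock, so the namespace ID, cluster ID, block pool ID and cTime all come from one consistent view rather than from interleaved unlocked reads. A hedged sketch of the same idiom with invented field names:

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class SnapshotDemo {
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
      private String clusterId = "c1";   // illustrative stand-ins
      private long ctime = 0;

      // Both fields are read under a single read lock, so a concurrent writer
      // cannot leave the returned pair half-updated.
      String[] snapshot() {
        lock.readLock().lock();
        try {
          return new String[] { clusterId, Long.toString(ctime) };
        } finally {
          lock.readLock().unlock();
        }
      }
    }
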
 
   /**
@@ -618,32 +666,32 @@ public class FSNamesystem implements FSC
   void metaSave(String filename) throws IOException {
     writeLock();
     try {
-    checkSuperuserPrivilege();
-    File file = new File(System.getProperty("hadoop.log.dir"), filename);
-    PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter(file,
-        true)));
-
-    long totalInodes = this.dir.totalInodes();
-    long totalBlocks = this.getBlocksTotal();
-
-    ArrayList<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
-    ArrayList<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
-    this.DFSNodesStatus(live, dead);
-    
-    String str = totalInodes + " files and directories, " + totalBlocks
-        + " blocks = " + (totalInodes + totalBlocks) + " total";
-    out.println(str);
-    out.println("Live Datanodes: "+live.size());
-    out.println("Dead Datanodes: "+dead.size());
-    blockManager.metaSave(out);
-
-    //
-    // Dump all datanodes
-    //
-    datanodeDump(out);
-
-    out.flush();
-    out.close();
+      checkSuperuserPrivilege();
+      File file = new File(System.getProperty("hadoop.log.dir"), filename);
+      PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter(file,
+          true)));
+  
+      long totalInodes = this.dir.totalInodes();
+      long totalBlocks = this.getBlocksTotal();
+  
+      ArrayList<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
+      ArrayList<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
+      this.DFSNodesStatus(live, dead);
+      
+      String str = totalInodes + " files and directories, " + totalBlocks
+          + " blocks = " + (totalInodes + totalBlocks) + " total";
+      out.println(str);
+      out.println("Live Datanodes: "+live.size());
+      out.println("Dead Datanodes: "+dead.size());
+      blockManager.metaSave(out);
+  
+      //
+      // Dump all datanodes
+      //
+      datanodeDump(out);
+  
+      out.flush();
+      out.close();
     } finally {
       writeUnlock();
     }
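
This hunk only re-indents metaSave() so its body sits inside the try block; the PrintWriter is still flushed and closed by hand, and would leak if an intermediate call threw. On Java 7+ (Hadoop at this point targets Java 6) try-with-resources would close it on every path; a hedged sketch, with the dump reduced to one line:

    import java.io.BufferedWriter;
    import java.io.FileWriter;
    import java.io.IOException;
    import java.io.PrintWriter;

    public class MetaSaveSketch {
      static void metaSave(String file) throws IOException {
        try (PrintWriter out = new PrintWriter(
            new BufferedWriter(new FileWriter(file, true)))) {
          out.println("0 files and directories, 0 blocks = 0 total");
        } // flush and close happen automatically, even on exceptions
      }

      public static void main(String[] args) throws IOException {
        metaSave("metasave-demo.txt");
      }
    }
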
@@ -681,46 +729,46 @@ public class FSNamesystem implements FSC
       throws IOException {
     readLock();
     try {
-    checkSuperuserPrivilege();
-
-    DatanodeDescriptor node = getDatanode(datanode);
-    if (node == null) {
-      NameNode.stateChangeLog.warn("BLOCK* NameSystem.getBlocks: "
-          + "Asking for blocks from an unrecorded node " + datanode.getName());
-      throw new IllegalArgumentException(
-          "Unexpected exception.  Got getBlocks message for datanode " +
-          datanode.getName() + ", but there is no info for it");
-    }
-
-    int numBlocks = node.numBlocks();
-    if(numBlocks == 0) {
-      return new BlocksWithLocations(new BlockWithLocations[0]);
-    }
-    Iterator<BlockInfo> iter = node.getBlockIterator();
-    int startBlock = r.nextInt(numBlocks); // starting from a random block
-    // skip blocks
-    for(int i=0; i<startBlock; i++) {
-      iter.next();
-    }
-    List<BlockWithLocations> results = new ArrayList<BlockWithLocations>();
-    long totalSize = 0;
-    BlockInfo curBlock;
-    while(totalSize<size && iter.hasNext()) {
-      curBlock = iter.next();
-      if(!curBlock.isComplete())  continue;
-      totalSize += addBlock(curBlock, results);
-    }
-    if(totalSize<size) {
-      iter = node.getBlockIterator(); // start from the beginning
-      for(int i=0; i<startBlock&&totalSize<size; i++) {
+      checkSuperuserPrivilege();
+  
+      DatanodeDescriptor node = getDatanode(datanode);
+      if (node == null) {
+        NameNode.stateChangeLog.warn("BLOCK* NameSystem.getBlocks: "
+            + "Asking for blocks from an unrecorded node " + datanode.getName());
+        throw new IllegalArgumentException(
+            "Unexpected exception.  Got getBlocks message for datanode " +
+            datanode.getName() + ", but there is no info for it");
+      }
+  
+      int numBlocks = node.numBlocks();
+      if(numBlocks == 0) {
+        return new BlocksWithLocations(new BlockWithLocations[0]);
+      }
+      Iterator<BlockInfo> iter = node.getBlockIterator();
+      int startBlock = r.nextInt(numBlocks); // starting from a random block
+      // skip blocks
+      for(int i=0; i<startBlock; i++) {
+        iter.next();
+      }
+      List<BlockWithLocations> results = new ArrayList<BlockWithLocations>();
+      long totalSize = 0;
+      BlockInfo curBlock;
+      while(totalSize<size && iter.hasNext()) {
         curBlock = iter.next();
         if(!curBlock.isComplete())  continue;
         totalSize += addBlock(curBlock, results);
       }
-    }
-
-    return new BlocksWithLocations(
-        results.toArray(new BlockWithLocations[results.size()]));
+      if(totalSize<size) {
+        iter = node.getBlockIterator(); // start from the beginning
+        for(int i=0; i<startBlock&&totalSize<size; i++) {
+          curBlock = iter.next();
+          if(!curBlock.isComplete())  continue;
+          totalSize += addBlock(curBlock, results);
+        }
+      }
+  
+      return new BlocksWithLocations(
+          results.toArray(new BlockWithLocations[results.size()]));
     } finally {
       readUnlock();
     }
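
getBlocks() samples from a random starting block and wraps to the front of the iterator until the requested size is reached. A simplified, runnable sketch of that traversal over a plain list (it counts items instead of accumulating byte sizes; all names are illustrative):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Random;

    public class RandomStartScan {
      // Take up to 'budget' items starting at a random index, wrapping to the
      // beginning -- the same shape as the getBlocks() loop above.
      static List<Integer> sample(List<Integer> items, int budget, Random r) {
        List<Integer> out = new ArrayList<Integer>();
        int start = r.nextInt(items.size());
        Iterator<Integer> it = items.listIterator(start);
        while (out.size() < budget && it.hasNext()) {
          out.add(it.next());
        }
        it = items.iterator();              // wrap around to the beginning
        for (int i = 0; i < start && out.size() < budget; i++) {
          out.add(it.next());
        }
        return out;
      }

      public static void main(String[] args) {
        System.out.println(sample(Arrays.asList(10, 20, 30, 40, 50), 3,
            new Random(1)));
      }
    }
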
@@ -741,6 +789,7 @@ public class FSNamesystem implements FSC
    * return the length of the added block; 0 if the block is not added
    */
   private long addBlock(Block block, List<BlockWithLocations> results) {
+    assert hasReadOrWriteLock();
     ArrayList<String> machineSet = blockManager.getValidLocations(block);
     if(machineSet.size() == 0) {
       return 0;
@@ -763,21 +812,25 @@ public class FSNamesystem implements FSC
   public void setPermission(String src, FsPermission permission)
       throws AccessControlException, FileNotFoundException, SafeModeException,
       UnresolvedLinkException, IOException {
+    HdfsFileStatus resultingStat = null;
     writeLock();
     try {
-    if (isInSafeMode())
-      throw new SafeModeException("Cannot set permission for " + src, safeMode);
-    checkOwner(src);
-    dir.setPermission(src, permission);
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot set permission for " + src, safeMode);
+      }
+      checkOwner(src);
+      dir.setPermission(src, permission);
+      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+        resultingStat = dir.getFileInfo(src, false);
+      }
+    } finally {
+      writeUnlock();
+    }
     getEditLog().logSync();
     if (auditLog.isInfoEnabled() && isExternalInvocation()) {
-      final HdfsFileStatus stat = dir.getFileInfo(src, false);
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     Server.getRemoteIp(),
-                    "setPermission", src, null, stat);
-    }
-    } finally {
-      writeUnlock();
+                    "setPermission", src, null, resultingStat);
     }
   }
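
This hunk shows the refactoring the rest of the patch repeats for every mutating operation: capture the resulting file status while still holding the write lock, then release the lock before the edit-log sync and the audit-log write. A minimal sketch of the shape (class and field names are invented):

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class AuditOutsideLock {
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
      private String perm = "rw-r--r--";

      void setPermission(String newPerm) {
        String resultingStat;
        lock.writeLock().lock();
        try {
          perm = newPerm;
          resultingStat = perm;   // captured under the same lock as the change
        } finally {
          lock.writeLock().unlock();
        }
        // Slow I/O (logSync, audit logging) now runs after unlock, so it no
        // longer stalls every other namesystem operation.
        System.out.println("audit: setPermission -> " + resultingStat);
      }

      public static void main(String[] args) {
        new AuditOutsideLock().setPermission("rwxr-x---");
      }
    }
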
 
@@ -788,30 +841,34 @@ public class FSNamesystem implements FSC
   public void setOwner(String src, String username, String group)
       throws AccessControlException, FileNotFoundException, SafeModeException,
       UnresolvedLinkException, IOException {
+    HdfsFileStatus resultingStat = null;
     writeLock();
     try {
-    if (isInSafeMode())
+      if (isInSafeMode()) {
         throw new SafeModeException("Cannot set owner for " + src, safeMode);
-    FSPermissionChecker pc = checkOwner(src);
-    if (!pc.isSuper) {
-      if (username != null && !pc.user.equals(username)) {
-        throw new AccessControlException("Non-super user cannot change owner.");
       }
-      if (group != null && !pc.containsGroup(group)) {
-        throw new AccessControlException("User does not belong to " + group
+      FSPermissionChecker pc = checkOwner(src);
+      if (!pc.isSuper) {
+        if (username != null && !pc.user.equals(username)) {
+          throw new AccessControlException("Non-super user cannot change owner.");
+        }
+        if (group != null && !pc.containsGroup(group)) {
+          throw new AccessControlException("User does not belong to " + group
             + " .");
+        }
+      }
+      dir.setOwner(src, username, group);
+      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+        resultingStat = dir.getFileInfo(src, false);
       }
+    } finally {
+      writeUnlock();
     }
-    dir.setOwner(src, username, group);
     getEditLog().logSync();
     if (auditLog.isInfoEnabled() && isExternalInvocation()) {
-      final HdfsFileStatus stat = dir.getFileInfo(src, false);
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     Server.getRemoteIp(),
-                    "setOwner", src, null, stat);
-    }
-    } finally {
-      writeUnlock();
+                    "setOwner", src, null, resultingStat);
     }
   }
 
@@ -840,7 +897,7 @@ public class FSNamesystem implements FSC
   /**
    * Get block locations within the specified range.
    * @see ClientProtocol#getBlockLocations(String, long, long)
-   * @throws FileNotFoundException
+   * @throws FileNotFoundException, UnresolvedLinkException, IOException
    */
   LocatedBlocks getBlockLocations(String src, long offset, long length,
       boolean doAccessTime, boolean needBlockToken) throws FileNotFoundException,
@@ -857,7 +914,7 @@ public class FSNamesystem implements FSC
       throw new HadoopIllegalArgumentException(
           "Negative length is not supported. File: " + src);
     }
-    final LocatedBlocks ret = getBlockLocationsInternal(src,
+    final LocatedBlocks ret = getBlockLocationsUpdateTimes(src,
         offset, length, doAccessTime, needBlockToken);  
     if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       logAuditEvent(UserGroupInformation.getCurrentUser(),
@@ -867,7 +924,11 @@ public class FSNamesystem implements FSC
     return ret;
   }
 
-  private LocatedBlocks getBlockLocationsInternal(String src,
+  /*
+   * Get block locations within the specified range, updating the
+   * access times if necessary. 
+   */
+  private LocatedBlocks getBlockLocationsUpdateTimes(String src,
                                                        long offset, 
                                                        long length,
                                                        boolean doAccessTime, 
@@ -918,8 +979,7 @@ public class FSNamesystem implements FSC
   LocatedBlocks getBlockLocationsInternal(INodeFile inode,
       long offset, long length, boolean needBlockToken)
   throws IOException {
-    readLock();
-    try {
+    assert hasReadOrWriteLock();
     final BlockInfo[] blocks = inode.getBlocks();
     if (LOG.isDebugEnabled()) {
       LOG.debug("blocks = " + java.util.Arrays.asList(blocks));
@@ -951,9 +1011,6 @@ public class FSNamesystem implements FSC
       return new LocatedBlocks(n, inode.isUnderConstruction(), locatedblocks,
           lastBlock, last.isComplete());
     }
-    } finally {
-      readUnlock();
-    }
   }
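
getBlockLocationsInternal no longer acquires the read lock itself; it asserts that the caller already holds one. That is the locking convention this patch introduces: public entry points take the lock exactly once, and the *Internal helpers only state the precondition. A sketch with hypothetical names:

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class LockConvention {
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

      // Public entry point: the only place the lock is acquired.
      public int get() {
        lock.readLock().lock();
        try {
          return getInternal();
        } finally {
          lock.readLock().unlock();
        }
      }

      // Internal helper: documents (and, with -ea, enforces) its precondition
      // instead of re-acquiring the lock.
      private int getInternal() {
        assert lock.getReadHoldCount() > 0
            || lock.isWriteLockedByCurrentThread();
        return 42;
      }

      public static void main(String[] args) {
        System.out.println(new LockConvention().get());
      }
    }
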
 
   /** Create a LocatedBlock. */
@@ -985,146 +1042,152 @@ public class FSNamesystem implements FSC
    * @throws IOException
    */
   public void concat(String target, String [] srcs) 
-    throws IOException, UnresolvedLinkException {
+      throws IOException, UnresolvedLinkException {
     if(FSNamesystem.LOG.isDebugEnabled()) {
       FSNamesystem.LOG.debug("concat " + Arrays.toString(srcs) +
           " to " + target);
     }
-    // check safe mode
-    if (isInSafeMode()) {
-      throw new SafeModeException("concat: cannot concat " + target, safeMode);
-    }
     
     // verify args
     if(target.isEmpty()) {
-      throw new IllegalArgumentException("concat: trg file name is empty");
+      throw new IllegalArgumentException("Target file name is empty");
     }
     if(srcs == null || srcs.length == 0) {
-      throw new IllegalArgumentException("concat: srcs list is empty or null");
+      throw new IllegalArgumentException("No sources given");
     }
     
-    // currently we require all the files to be in the same dir
+    // We require all files be in the same directory
     String trgParent = 
       target.substring(0, target.lastIndexOf(Path.SEPARATOR_CHAR));
-    for(String s : srcs) {
+    for (String s : srcs) {
       String srcParent = s.substring(0, s.lastIndexOf(Path.SEPARATOR_CHAR));
-      if(! srcParent.equals(trgParent)) {
-        throw new IllegalArgumentException
-           ("concat:  srcs and target shoould be in same dir");
+      if (!srcParent.equals(trgParent)) {
+        throw new IllegalArgumentException(
+           "Sources and target are not in the same directory");
       }
     }
-    
+
+    HdfsFileStatus resultingStat = null;
     writeLock();
     try {
-      // write permission for the target
-      if (isPermissionEnabled) {
-        checkPathAccess(target, FsAction.WRITE);
-
-        // and srcs
-        for(String aSrc: srcs) {
-          checkPathAccess(aSrc, FsAction.READ); // read the file
-          checkParentAccess(aSrc, FsAction.WRITE); // for delete 
-        }
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot concat " + target, safeMode);
       }
+      concatInternal(target, srcs);
+      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+        resultingStat = dir.getFileInfo(target, false);
+      }
+    } finally {
+      writeUnlock();
+    }
+    getEditLog().logSync();
+    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      logAuditEvent(UserGroupInformation.getLoginUser(),
+                    Server.getRemoteIp(),
+                    "concat", Arrays.toString(srcs), target, resultingStat);
+    }
+  }
 
+  /** See {@link #concat(String, String[])} */
+  public void concatInternal(String target, String [] srcs) 
+      throws IOException, UnresolvedLinkException {
+    assert hasWriteLock();
 
-      // to make sure no two files are the same
-      Set<INode> si = new HashSet<INode>();
-
-      // we put the following prerequisite for the operation
-      // replication and blocks sizes should be the same for ALL the blocks
-      // check the target
-      INode inode = dir.getFileINode(target);
+    // write permission for the target
+    if (isPermissionEnabled) {
+      checkPathAccess(target, FsAction.WRITE);
 
-      if(inode == null) {
-        throw new IllegalArgumentException("concat: trg file doesn't exist");
-      }
-      if(inode.isUnderConstruction()) {
-        throw new IllegalArgumentException("concat: trg file is uner construction");
+      // and srcs
+      for(String aSrc: srcs) {
+        checkPathAccess(aSrc, FsAction.READ); // read the file
+        checkParentAccess(aSrc, FsAction.WRITE); // for delete 
       }
+    }
 
-      INodeFile trgInode = (INodeFile) inode;
+    // to make sure no two files are the same
+    Set<INode> si = new HashSet<INode>();
 
-      // per design trg shouldn't be empty and all the blocks same size
-      if(trgInode.blocks.length == 0) {
-        throw new IllegalArgumentException("concat: "+ target + " file is empty");
-      }
+    // we put the following prerequisite for the operation
+    // replication and blocks sizes should be the same for ALL the blocks
+    // check the target
+    INode inode = dir.getFileINode(target);
 
-      long blockSize = trgInode.getPreferredBlockSize();
+    if(inode == null) {
+      throw new IllegalArgumentException("concat: trg file doesn't exist");
+    }
+    if(inode.isUnderConstruction()) {
+      throw new IllegalArgumentException("concat: trg file is under construction");
+    }
 
-      // check the end block to be full
-      if(blockSize != trgInode.blocks[trgInode.blocks.length-1].getNumBytes()) {
-        throw new IllegalArgumentException(target + " blocks size should be the same");
-      }
+    INodeFile trgInode = (INodeFile) inode;
 
-      si.add(trgInode);
-      short repl = trgInode.getReplication();
+    // per design trg shouldn't be empty and all the blocks same size
+    if(trgInode.blocks.length == 0) {
+      throw new IllegalArgumentException("concat: "+ target + " file is empty");
+    }
 
-      // now check the srcs
-      boolean endSrc = false; // final src file doesn't have to have full end block
-      for(int i=0; i<srcs.length; i++) {
-        String src = srcs[i];
-        if(i==srcs.length-1)
-          endSrc=true;
+    long blockSize = trgInode.getPreferredBlockSize();
 
-        INodeFile srcInode = dir.getFileINode(src);
+    // check the end block to be full
+    if(blockSize != trgInode.blocks[trgInode.blocks.length-1].getNumBytes()) {
+      throw new IllegalArgumentException(target + " blocks size should be the same");
+    }
 
-        if(src.isEmpty() 
-            || srcInode == null
-            || srcInode.isUnderConstruction()
-            || srcInode.blocks.length == 0) {
-          throw new IllegalArgumentException("concat: file " + src + 
-          " is invalid or empty or underConstruction");
-        }
+    si.add(trgInode);
+    short repl = trgInode.getReplication();
 
-        // check replication and blocks size
-        if(repl != srcInode.getReplication()) {
-          throw new IllegalArgumentException(src + " and " + target + " " +
-              "should have same replication: "
-              + repl + " vs. " + srcInode.getReplication());
-        }
+    // now check the srcs
+    boolean endSrc = false; // final src file doesn't have to have full end block
+    for(int i=0; i<srcs.length; i++) {
+      String src = srcs[i];
+      if(i==srcs.length-1)
+        endSrc=true;
 
-        //boolean endBlock=false;
-        // verify that all the blocks are of the same length as target
-        // should be enough to check the end blocks
-        int idx = srcInode.blocks.length-1;
-        if(endSrc)
-          idx = srcInode.blocks.length-2; // end block of endSrc is OK not to be full
-        if(idx >= 0 && srcInode.blocks[idx].getNumBytes() != blockSize) {
-          throw new IllegalArgumentException("concat: blocks sizes of " + 
-              src + " and " + target + " should all be the same");
-        }
+      INodeFile srcInode = dir.getFileINode(src);
 
-        si.add(srcInode);
+      if(src.isEmpty() 
+          || srcInode == null
+          || srcInode.isUnderConstruction()
+          || srcInode.blocks.length == 0) {
+        throw new IllegalArgumentException("concat: file " + src + 
+        " is invalid or empty or underConstruction");
       }
 
-      // make sure no two files are the same
-      if(si.size() < srcs.length+1) { // trg + srcs
-        // it means at least two files are the same
-        throw new IllegalArgumentException("at least two files are the same");
+      // check replication and blocks size
+      if(repl != srcInode.getReplication()) {
+        throw new IllegalArgumentException(src + " and " + target + " " +
+            "should have same replication: "
+            + repl + " vs. " + srcInode.getReplication());
       }
 
-      if(NameNode.stateChangeLog.isDebugEnabled()) {
-        NameNode.stateChangeLog.debug("DIR* NameSystem.concat: " + 
-            Arrays.toString(srcs) + " to " + target);
+      //boolean endBlock=false;
+      // verify that all the blocks are of the same length as target
+      // should be enough to check the end blocks
+      int idx = srcInode.blocks.length-1;
+      if(endSrc)
+        idx = srcInode.blocks.length-2; // end block of endSrc is OK not to be full
+      if(idx >= 0 && srcInode.blocks[idx].getNumBytes() != blockSize) {
+        throw new IllegalArgumentException("concat: blocks sizes of " + 
+            src + " and " + target + " should all be the same");
       }
 
-      dir.concatInternal(target,srcs);
-    } finally {
-      writeUnlock();
+      si.add(srcInode);
     }
-    getEditLog().logSync();
-   
-    
-    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
-      final HdfsFileStatus stat = dir.getFileInfo(target, false);
-      logAuditEvent(UserGroupInformation.getLoginUser(),
-                    Server.getRemoteIp(),
-                    "concat", Arrays.toString(srcs), target, stat);
+
+    // make sure no two files are the same
+    if(si.size() < srcs.length+1) { // trg + srcs
+      // it means at least two files are the same
+      throw new IllegalArgumentException("at least two files are the same");
     }
-   
-  }
 
+    if(NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* NameSystem.concat: " + 
+          Arrays.toString(srcs) + " to " + target);
+    }
+
+    dir.concat(target,srcs);
+  }
+  
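
concat requires every source file to share the target's parent directory, and the check is a plain string comparison of everything before the last path separator. A standalone sketch (paths hard-coded for illustration; '/' stands in for Path.SEPARATOR_CHAR):

    public class SameParentCheck {
      static String parent(String path) {
        return path.substring(0, path.lastIndexOf('/'));
      }

      public static void main(String[] args) {
        String target = "/a/b/t";
        for (String s : new String[] { "/a/b/s1", "/a/c/s2" }) {
          if (!parent(s).equals(parent(target))) {
            // thrown for /a/c/s2, mirroring the validation above
            throw new IllegalArgumentException(
                "Sources and target are not in the same directory: " + s);
          }
        }
      }
    }
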
   /**
    * stores the modification and access time for this inode. 
    * The access time is precise upto an hour. The transaction, if needed, is
@@ -1138,23 +1201,22 @@ public class FSNamesystem implements FSC
     }
     writeLock();
     try {
-    //
-    // The caller needs to have write access to set access & modification times.
-    if (isPermissionEnabled) {
-      checkPathAccess(src, FsAction.WRITE);
-    }
-    INodeFile inode = dir.getFileINode(src);
-    if (inode != null) {
-      dir.setTimes(src, inode, mtime, atime, true);
-      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
-        final HdfsFileStatus stat = dir.getFileInfo(src, false);
-        logAuditEvent(UserGroupInformation.getCurrentUser(),
-                      Server.getRemoteIp(),
-                      "setTimes", src, null, stat);
+      // Write access is required to set access and modification times
+      if (isPermissionEnabled) {
+        checkPathAccess(src, FsAction.WRITE);
+      }
+      INodeFile inode = dir.getFileINode(src);
+      if (inode != null) {
+        dir.setTimes(src, inode, mtime, atime, true);
+        if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+          final HdfsFileStatus stat = dir.getFileInfo(src, false);
+          logAuditEvent(UserGroupInformation.getCurrentUser(),
+                        Server.getRemoteIp(),
+                        "setTimes", src, null, stat);
+        }
+      } else {
+        throw new FileNotFoundException("File " + src + " does not exist.");
       }
-    } else {
-      throw new FileNotFoundException("File " + src + " does not exist.");
-    }
     } finally {
       writeUnlock();
     }
@@ -1166,21 +1228,24 @@ public class FSNamesystem implements FSC
   public void createSymlink(String target, String link,
       PermissionStatus dirPerms, boolean createParent) 
       throws IOException, UnresolvedLinkException {
+    HdfsFileStatus resultingStat = null;
     writeLock();
     try {
-    if (!createParent) {
-      verifyParentDir(link);
-    }
-    createSymlinkInternal(target, link, dirPerms, createParent);
+      if (!createParent) {
+        verifyParentDir(link);
+      }
+      createSymlinkInternal(target, link, dirPerms, createParent);
+      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+        resultingStat = dir.getFileInfo(link, false);
+      }
     } finally {
       writeUnlock();
     }
     getEditLog().logSync();
     if (auditLog.isInfoEnabled() && isExternalInvocation()) {
-      final HdfsFileStatus stat = dir.getFileInfo(link, false);
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     Server.getRemoteIp(),
-                    "createSymlink", link, target, stat);
+                    "createSymlink", link, target, resultingStat);
     }
   }
 
@@ -1190,13 +1255,11 @@ public class FSNamesystem implements FSC
   private void createSymlinkInternal(String target, String link,
       PermissionStatus dirPerms, boolean createParent)
       throws IOException, UnresolvedLinkException {
-    writeLock();
-    try {
+    assert hasWriteLock();
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.createSymlink: target=" + 
         target + " link=" + link);
     }
-
     if (isInSafeMode()) {
       throw new SafeModeException("Cannot create symlink " + link, safeMode);
     }
@@ -1215,9 +1278,6 @@ public class FSNamesystem implements FSC
 
     // add symbolic link to namespace
     dir.addSymlink(link, target, dirPerms, createParent);
-    } finally {
-      writeUnlock();
-    }
   }
 
   /**
@@ -1235,7 +1295,16 @@ public class FSNamesystem implements FSC
    */
   public boolean setReplication(String src, short replication) 
     throws IOException, UnresolvedLinkException {
-    boolean status = setReplicationInternal(src, replication);
+    boolean status = false;
+    writeLock();
+    try {
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot set replication for " + src, safeMode);
+      }
+      status = setReplicationInternal(src, replication);
+    } finally {
+      writeUnlock();
+    }
     getEditLog().logSync();
     if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
       logAuditEvent(UserGroupInformation.getCurrentUser(),
@@ -1248,10 +1317,7 @@ public class FSNamesystem implements FSC
   private boolean setReplicationInternal(String src,
       short replication) throws AccessControlException, QuotaExceededException,
       SafeModeException, UnresolvedLinkException, IOException {
-    writeLock();
-    try {
-    if (isInSafeMode())
-      throw new SafeModeException("Cannot set replication for " + src, safeMode);
+    assert hasWriteLock();
     blockManager.verifyReplication(src, replication, null);
     if (isPermissionEnabled) {
       checkPathAccess(src, FsAction.WRITE);
@@ -1281,17 +1347,19 @@ public class FSNamesystem implements FSC
           + ". New replication is " + replication);
     }
     return true;
-    } finally {
-      writeUnlock();
-    }
   }
     
   long getPreferredBlockSize(String filename) 
-    throws IOException, UnresolvedLinkException {
-    if (isPermissionEnabled) {
-      checkTraverse(filename);
+      throws IOException, UnresolvedLinkException {
+    readLock();
+    try {
+      if (isPermissionEnabled) {
+        checkTraverse(filename);
+      }
+      return dir.getPreferredBlockSize(filename);
+    } finally {
+      readUnlock();
     }
-    return dir.getPreferredBlockSize(filename);
   }
 
   /*
@@ -1299,6 +1367,7 @@ public class FSNamesystem implements FSC
    */
   private void verifyParentDir(String src) throws FileNotFoundException,
       ParentNotDirectoryException, UnresolvedLinkException {
+    assert hasReadOrWriteLock();
     Path parent = new Path(src).getParent();
     if (parent != null) {
       INode[] pathINodes = dir.getExistingPathINodes(parent.toString());
@@ -1324,8 +1393,13 @@ public class FSNamesystem implements FSC
       short replication, long blockSize) throws AccessControlException,
       SafeModeException, FileAlreadyExistsException, UnresolvedLinkException,
       FileNotFoundException, ParentNotDirectoryException, IOException {
-    startFileInternal(src, permissions, holder, clientMachine, flag,
-        createParent, replication, blockSize);
+    writeLock();
+    try {
+      startFileInternal(src, permissions, holder, clientMachine, flag,
+          createParent, replication, blockSize);
+    } finally {
+      writeUnlock();
+    }
     getEditLog().logSync();
     if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       final HdfsFileStatus stat = dir.getFileInfo(src, false);
@@ -1357,6 +1431,7 @@ public class FSNamesystem implements FSC
       long blockSize) throws SafeModeException, FileAlreadyExistsException,
       AccessControlException, UnresolvedLinkException, FileNotFoundException,
       ParentNotDirectoryException, IOException {
+    assert hasWriteLock();
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: src=" + src
           + ", holder=" + holder
@@ -1365,10 +1440,9 @@ public class FSNamesystem implements FSC
           + ", replication=" + replication
           + ", createFlag=" + flag.toString());
     }
-    writeLock();
-    try {
-    if (isInSafeMode())
+    if (isInSafeMode()) {
       throw new SafeModeException("Cannot create file" + src, safeMode);
+    }
     if (!DFSUtil.isValidName(src)) {
       throw new InvalidPathException(src);
     }
@@ -1476,9 +1550,6 @@ public class FSNamesystem implements FSC
                                    +ie.getMessage());
       throw ie;
     }
-    } finally {
-      writeUnlock();
-    }
     return null;
   }
 
@@ -1493,35 +1564,41 @@ public class FSNamesystem implements FSC
    * @return true if the file is already closed
    * @throws IOException
    */
-  synchronized boolean recoverLease(String src, String holder, String clientMachine)
-  throws IOException {
-    if (isInSafeMode()) {
-      throw new SafeModeException(
-          "Cannot recover the lease of " + src, safeMode);
-    }
-    if (!DFSUtil.isValidName(src)) {
-      throw new IOException("Invalid file name: " + src);
-    }
-
-    INode inode = dir.getFileINode(src);
-    if (inode == null) {
-      throw new FileNotFoundException("File not found " + src);
-    }
-
-    if (!inode.isUnderConstruction()) {
-      return true;
-    }
-    if (isPermissionEnabled) {
-      checkPathAccess(src, FsAction.WRITE);
+  boolean recoverLease(String src, String holder, String clientMachine)
+      throws IOException {
+    writeLock();
+    try {
+      if (isInSafeMode()) {
+        throw new SafeModeException(
+            "Cannot recover the lease of " + src, safeMode);
+      }
+      if (!DFSUtil.isValidName(src)) {
+        throw new IOException("Invalid file name: " + src);
+      }
+  
+      INode inode = dir.getFileINode(src);
+      if (inode == null) {
+        throw new FileNotFoundException("File not found " + src);
+      }
+  
+      if (!inode.isUnderConstruction()) {
+        return true;
+      }
+      if (isPermissionEnabled) {
+        checkPathAccess(src, FsAction.WRITE);
+      }
+  
+      recoverLeaseInternal(inode, src, holder, clientMachine, true);
+    } finally {
+      writeUnlock();
     }
-
-    recoverLeaseInternal(inode, src, holder, clientMachine, true);
     return false;
   }
 
   private void recoverLeaseInternal(INode fileInode, 
       String src, String holder, String clientMachine, boolean force)
-  throws IOException {
+      throws IOException {
+    assert hasWriteLock();
     if (fileInode != null && fileInode.isUnderConstruction()) {
       INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) fileInode;
       //
@@ -1607,12 +1684,16 @@ public class FSNamesystem implements FSC
       throw new UnsupportedOperationException("Append to hdfs not supported." +
                             " Please refer to dfs.support.append configuration parameter.");
     }
-    LocatedBlock lb = 
-      startFileInternal(src, null, holder, clientMachine, 
+    LocatedBlock lb = null;
+    writeLock();
+    try {
+      lb = startFileInternal(src, null, holder, clientMachine, 
                         EnumSet.of(CreateFlag.APPEND), 
                         false, (short)blockManager.maxReplication, (long)0);
+    } finally {
+      writeUnlock();
+    }
     getEditLog().logSync();
-
     if (lb != null) {
       if (NameNode.stateChangeLog.isDebugEnabled()) {
         NameNode.stateChangeLog.debug("DIR* NameSystem.appendFile: file "
@@ -1621,7 +1702,6 @@ public class FSNamesystem implements FSC
             +" block size " + lb.getBlock().getNumBytes());
       }
     }
-
     if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     Server.getRemoteIp(),
@@ -1713,6 +1793,9 @@ public class FSNamesystem implements FSC
     // Allocate a new block and record it in the INode. 
     writeLock();
     try {
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot add block to " + src, safeMode);
+      }
       INode[] pathINodes = dir.getExistingPathINodes(src);
       int inodesLen = pathINodes.length;
       checkLease(src, clientName, pathINodes[inodesLen-1]);
@@ -1732,7 +1815,7 @@ public class FSNamesystem implements FSC
     } finally {
       writeUnlock();
     }
-        
+
     // Create next block
     LocatedBlock b = new LocatedBlock(getExtendedBlock(newBlock), targets, fileLength);
     if (isBlockTokenEnabled) {
@@ -1752,6 +1835,7 @@ public class FSNamesystem implements FSC
 
     final DatanodeDescriptor clientnode;
     final long preferredblocksize;
+    final List<DatanodeDescriptor> chosen;
     readLock();
     try {
       //check safe mode
@@ -1764,17 +1848,17 @@ public class FSNamesystem implements FSC
       final INodeFileUnderConstruction file = checkLease(src, clientName);
       clientnode = file.getClientNode();
       preferredblocksize = file.getPreferredBlockSize();
-    } finally {
-      readUnlock();
-    }
 
-    //find datanode descriptors
-    final List<DatanodeDescriptor> chosen = new ArrayList<DatanodeDescriptor>();
-    for(DatanodeInfo d : existings) {
-      final DatanodeDescriptor descriptor = getDatanode(d);
-      if (descriptor != null) {
-        chosen.add(descriptor);
+      //find datanode descriptors
+      chosen = new ArrayList<DatanodeDescriptor>();
+      for(DatanodeInfo d : existings) {
+        final DatanodeDescriptor descriptor = getDatanode(d);
+        if (descriptor != null) {
+          chosen.add(descriptor);
+        }
       }
+    } finally {
+      readUnlock();
     }
 
     // choose new datanodes.
@@ -1793,25 +1877,28 @@ public class FSNamesystem implements FSC
    * The client would like to let go of the given block
    */
   public boolean abandonBlock(ExtendedBlock b, String src, String holder)
-      throws LeaseExpiredException, FileNotFoundException,
-      UnresolvedLinkException, IOException {
-    writeLock();
-    try {
-    //
-    // Remove the block from the pending creates list
-    //
-    if(NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
-                                    +b+"of file "+src);
-    }
-    INodeFileUnderConstruction file = checkLease(src, holder);
-    dir.removeBlock(src, file, ExtendedBlock.getLocalBlock(b));
-    if(NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
-                                    + b
-                                    + " is removed from pendingCreates");
-    }
-    return true;
+      throws LeaseExpiredException, FileNotFoundException,
+      UnresolvedLinkException, IOException {
+    writeLock();
+    try {
+      //
+      // Remove the block from the pending creates list
+      //
+      if(NameNode.stateChangeLog.isDebugEnabled()) {
+        NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
+                                      +b+"of file "+src);
+      }
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot abandon block " + b +
+                                    " for fle" + src, safeMode);
+      }
+      INodeFileUnderConstruction file = checkLease(src, holder);
+      dir.removeBlock(src, file, ExtendedBlock.getLocalBlock(b));
+      if(NameNode.stateChangeLog.isDebugEnabled()) {
+        NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
+                                      + b + " is removed from pendingCreates");
+      }
+      return true;
     } finally {
       writeUnlock();
     }
@@ -1819,7 +1906,8 @@ public class FSNamesystem implements FSC
   
   // make sure that we still have the lease on this file.
   private INodeFileUnderConstruction checkLease(String src, String holder) 
-    throws LeaseExpiredException, UnresolvedLinkException {
+      throws LeaseExpiredException, UnresolvedLinkException {
+    assert hasReadOrWriteLock();
     INodeFile file = dir.getFileINode(src);
     checkLease(src, holder, file);
     return (INodeFileUnderConstruction)file;
@@ -1827,7 +1915,7 @@ public class FSNamesystem implements FSC
 
   private void checkLease(String src, String holder, INode file)
       throws LeaseExpiredException {
-
+    assert hasReadOrWriteLock();
     if (file == null || file.isDirectory()) {
       Lease lease = leaseManager.getLease(holder);
       throw new LeaseExpiredException("No lease on " + src +
@@ -1860,23 +1948,29 @@ public class FSNamesystem implements FSC
   public boolean completeFile(String src, String holder, ExtendedBlock last) 
     throws SafeModeException, UnresolvedLinkException, IOException {
     checkBlock(last);
-    boolean success = completeFileInternal(src, holder, 
+    boolean success = false;
+    writeLock();
+    try {
+      success = completeFileInternal(src, holder, 
         ExtendedBlock.getLocalBlock(last));
+    } finally {
+      writeUnlock();
+    }
     getEditLog().logSync();
-    return success ;
+    return success;
   }
 
   private boolean completeFileInternal(String src, 
       String holder, Block last) throws SafeModeException,
       UnresolvedLinkException, IOException {
-    writeLock();
-    try {
-    if(NameNode.stateChangeLog.isDebugEnabled()) {
+    assert hasWriteLock();
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " +
           src + " for " + holder);
     }
-    if (isInSafeMode())
+    if (isInSafeMode()) {
       throw new SafeModeException("Cannot complete file " + src, safeMode);
+    }
 
     INodeFileUnderConstruction pendingFile = checkLease(src, holder);
     // commit the last block and complete it if it has minimum replicas
@@ -1891,9 +1985,6 @@ public class FSNamesystem implements FSC
     NameNode.stateChangeLog.info("DIR* NameSystem.completeFile: file " + src
                                   + " is closed by " + holder);
     return true;
-    } finally {
-      writeUnlock();
-    }
   }
 
   /** 
@@ -1922,6 +2013,7 @@ public class FSNamesystem implements FSC
    */
   private Block allocateBlock(String src, INode[] inodes,
       DatanodeDescriptor targets[]) throws QuotaExceededException {
+    assert hasWriteLock();
     Block b = new Block(FSNamesystem.randBlockId.nextLong(), 0, 0); 
     while(isValidBlock(b)) {
       b.setBlockId(FSNamesystem.randBlockId.nextLong());
@@ -1939,35 +2031,35 @@ public class FSNamesystem implements FSC
    * all blocks, otherwise check only penultimate block.
    */
   boolean checkFileProgress(INodeFile v, boolean checkall) {
-    writeLock();
+    readLock();
     try {
-    if (checkall) {
-      //
-      // check all blocks of the file.
-      //
-      for (BlockInfo block: v.getBlocks()) {
-        if (!block.isComplete()) {
+      if (checkall) {
+        //
+        // check all blocks of the file.
+        //
+        for (BlockInfo block: v.getBlocks()) {
+          if (!block.isComplete()) {
+            LOG.info("BLOCK* NameSystem.checkFileProgress: "
+                + "block " + block + " has not reached minimal replication "
+                + blockManager.minReplication);
+            return false;
+          }
+        }
+      } else {
+        //
+        // check the penultimate block of this file
+        //
+        BlockInfo b = v.getPenultimateBlock();
+        if (b != null && !b.isComplete()) {
           LOG.info("BLOCK* NameSystem.checkFileProgress: "
-              + "block " + block + " has not reached minimal replication "
+              + "block " + b + " has not reached minimal replication "
               + blockManager.minReplication);
           return false;
         }
       }
-    } else {
-      //
-      // check the penultimate block of this file
-      //
-      BlockInfo b = v.getPenultimateBlock();
-      if (b != null && !b.isComplete()) {
-        LOG.info("BLOCK* NameSystem.checkFileProgress: "
-            + "block " + b + " has not reached minimal replication "
-            + blockManager.minReplication);
-        return false;
-      }
-    }
-    return true;
+      return true;
     } finally {
-      writeUnlock();
+      readUnlock();
     }
   }
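
checkFileProgress only inspects block state, so the patch downgrades it from the write lock to the read lock and lets it run concurrently with other readers. A simplified model of its two modes, with a boolean array standing in for BlockInfo completeness:

    public class ProgressCheck {
      // checkAll: every block must be complete; otherwise only the
      // penultimate block is examined, as in checkFileProgress above.
      static boolean checkProgress(boolean[] complete, boolean checkAll) {
        if (checkAll) {
          for (boolean done : complete) {
            if (!done) return false;
          }
        } else if (complete.length >= 2 && !complete[complete.length - 2]) {
          return false;
        }
        return true;
      }

      public static void main(String[] args) {
        boolean[] blocks = { true, false, true };
        System.out.println(checkProgress(blocks, true));   // false
        System.out.println(checkProgress(blocks, false));  // false
      }
    }
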
 
@@ -2006,13 +2098,26 @@ public class FSNamesystem implements FSC
   @Deprecated
   boolean renameTo(String src, String dst) 
     throws IOException, UnresolvedLinkException {
-    boolean status = renameToInternal(src, dst);
+    boolean status = false;
+    HdfsFileStatus resultingStat = null;
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src +
+          " to " + dst);
+    }
+    writeLock();
+    try {
+      status = renameToInternal(src, dst);
+      if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
+        resultingStat = dir.getFileInfo(dst, false);
+      }
+    } finally {
+      writeUnlock();
+    }
     getEditLog().logSync();
     if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
-      final HdfsFileStatus stat = dir.getFileInfo(dst, false);
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     Server.getRemoteIp(),
-                    "rename", src, dst, stat);
+                    "rename", src, dst, resultingStat);
     }
     return status;
   }
@@ -2021,19 +2126,13 @@ public class FSNamesystem implements FSC
   @Deprecated
   private boolean renameToInternal(String src, String dst)
     throws IOException, UnresolvedLinkException {
-
-    writeLock();
-    try {
-    if(NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src +
-          " to " + dst);
-    }
-    if (isInSafeMode())
+    assert hasWriteLock();
+    if (isInSafeMode()) {
       throw new SafeModeException("Cannot rename " + src, safeMode);
+    }
     if (!DFSUtil.isValidName(dst)) {
       throw new IOException("Invalid name: " + dst);
     }
-
     if (isPermissionEnabled) {
       //We should not be doing this.  This is move() not renameTo().
       //but for now,
@@ -2045,40 +2144,44 @@ public class FSNamesystem implements FSC
 
     HdfsFileStatus dinfo = dir.getFileInfo(dst, false);
     if (dir.renameTo(src, dst)) {
-      changeLease(src, dst, dinfo);     // update lease with new filename
+      unprotectedChangeLease(src, dst, dinfo);     // update lease with new filename
       return true;
     }
     return false;
-    } finally {
-      writeUnlock();
-    }
   }
   
 
   /** Rename src to dst */
   void renameTo(String src, String dst, Options.Rename... options)
       throws IOException, UnresolvedLinkException {
-    renameToInternal(src, dst, options);
+    HdfsFileStatus resultingStat = null;
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: with options - "
+          + src + " to " + dst);
+    }
+    writeLock();
+    try {
+      renameToInternal(src, dst, options);
+      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+        resultingStat = dir.getFileInfo(dst, false); 
+      }
+    } finally {
+      writeUnlock();
+    }
     getEditLog().logSync();
     if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       StringBuilder cmd = new StringBuilder("rename options=");
       for (Rename option : options) {
         cmd.append(option.value()).append(" ");
       }
-      final HdfsFileStatus stat = dir.getFileInfo(dst, false);
       logAuditEvent(UserGroupInformation.getCurrentUser(), Server.getRemoteIp(),
-                    cmd.toString(), src, dst, stat);
+                    cmd.toString(), src, dst, resultingStat);
     }
   }
 
   private void renameToInternal(String src, String dst,
       Options.Rename... options) throws IOException {
-    writeLock();
-    try {
-    if (NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: with options - "
-          + src + " to " + dst);
-    }
+    assert hasWriteLock();
     if (isInSafeMode()) {
       throw new SafeModeException("Cannot rename " + src, safeMode);
     }
@@ -2092,10 +2195,7 @@ public class FSNamesystem implements FSC
 
     HdfsFileStatus dinfo = dir.getFileInfo(dst, false);
     dir.renameTo(src, dst, options);
-    changeLease(src, dst, dinfo); // update lease with new filename
-    } finally {
-      writeUnlock();
-    }
+    unprotectedChangeLease(src, dst, dinfo); // update lease with new filename
   }
   
   /**
@@ -2105,15 +2205,12 @@ public class FSNamesystem implements FSC
    * description of exceptions
    */
     public boolean delete(String src, boolean recursive)
-      throws AccessControlException, SafeModeException,
-      UnresolvedLinkException, IOException {
-      if ((!recursive) && (!dir.isDirEmpty(src))) {
-        throw new IOException(src + " is non empty");
-      }
+        throws AccessControlException, SafeModeException,
+               UnresolvedLinkException, IOException {
       if (NameNode.stateChangeLog.isDebugEnabled()) {
         NameNode.stateChangeLog.debug("DIR* NameSystem.delete: " + src);
       }
-      boolean status = deleteInternal(src, true);
+      boolean status = deleteInternal(src, recursive, true);
       if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
         logAuditEvent(UserGroupInformation.getCurrentUser(),
                       Server.getRemoteIp(),
@@ -2133,9 +2230,10 @@ public class FSNamesystem implements FSC
    * 
    * @see ClientProtocol#delete(String, boolean) for description of exceptions
    */
-  private boolean deleteInternal(String src, boolean enforcePermission)
-      throws AccessControlException, SafeModeException,
-      UnresolvedLinkException, IOException{
+  private boolean deleteInternal(String src, boolean recursive,
+      boolean enforcePermission)
+      throws AccessControlException, SafeModeException, UnresolvedLinkException,
+             IOException {
     boolean deleteNow = false;
     ArrayList<Block> collectedBlocks = new ArrayList<Block>();
 
@@ -2144,6 +2242,9 @@ public class FSNamesystem implements FSC
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot delete " + src, safeMode);
       }
+      if (!recursive && !dir.isDirEmpty(src)) {
+        throw new IOException(src + " is not empty");
+      }
       if (enforcePermission && isPermissionEnabled) {
         checkPermission(src, false, null, FsAction.WRITE, null, FsAction.ALL);
       }
@@ -2158,10 +2259,16 @@ public class FSNamesystem implements FSC
     } finally {
       writeUnlock();
     }
-    // Log directory deletion to editlog
+
     getEditLog().logSync();
-    if (!deleteNow) {
-      removeBlocks(collectedBlocks); // Incremental deletion of blocks
+
+    writeLock();
+    try {
+      if (!deleteNow) {
+        removeBlocks(collectedBlocks); // Incremental deletion of blocks
+      }
+    } finally {
+      writeUnlock();
     }
     collectedBlocks.clear();
     if (NameNode.stateChangeLog.isDebugEnabled()) {
@@ -2173,24 +2280,21 @@ public class FSNamesystem implements FSC
 
   /** From the given list, incrementally remove the blocks from blockManager */
   private void removeBlocks(List<Block> blocks) {
+    assert hasWriteLock();
     int start = 0;
     int end = 0;
     while (start < blocks.size()) {
       end = BLOCK_DELETION_INCREMENT + start;
       end = end > blocks.size() ? blocks.size() : end;
-      writeLock();
-      try {
-        for (int i=start; i<end; i++) {
-          blockManager.removeBlock(blocks.get(i));
-        }
-      } finally {
-        writeUnlock();
+      for (int i=start; i<end; i++) {
+        blockManager.removeBlock(blocks.get(i));
       }
       start = end;
     }
   }
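
Note the semantic shift in removeBlocks() above: the old code re-acquired
the write lock once per BLOCK_DELETION_INCREMENT-sized chunk, letting
readers interleave between chunks, while the new code asserts that the
caller already holds the lock, so the chunking now only bounds the work
done per loop iteration. A standalone sketch of the chunked traversal,
with a hypothetical increment value and a placeholder standing in for
blockManager.removeBlock:

    static final int BLOCK_DELETION_INCREMENT = 1000; // hypothetical value

    void removeInChunks(java.util.List<Long> blockIds) {
      // the caller is assumed to hold the namesystem write lock
      int start = 0;
      while (start < blockIds.size()) {
        int end = Math.min(start + BLOCK_DELETION_INCREMENT, blockIds.size());
        for (int i = start; i < end; i++) {
          removeOne(blockIds.get(i));   // stands in for blockManager.removeBlock
        }
        start = end;
      }
    }

    void removeOne(long blockId) { /* placeholder */ }
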
   
   void removePathAndBlocks(String src, List<Block> blocks) {
+    assert hasWriteLock();
     leaseManager.removeLeaseWithPrefixPath(src);
     if (blocks == null) {
       return;
@@ -2213,13 +2317,18 @@ public class FSNamesystem implements FSC
    */
   HdfsFileStatus getFileInfo(String src, boolean resolveLink) 
     throws AccessControlException, UnresolvedLinkException {
-    if (!DFSUtil.isValidName(src)) {
-      throw new InvalidPathException("Invalid file name: " + src);
-    }
-    if (isPermissionEnabled) {
-      checkTraverse(src);
+    readLock();
+    try {
+      if (!DFSUtil.isValidName(src)) {
+        throw new InvalidPathException("Invalid file name: " + src);
+      }
+      if (isPermissionEnabled) {
+        checkTraverse(src);
+      }
+      return dir.getFileInfo(src, resolveLink);
+    } finally {
+      readUnlock();
     }
-    return dir.getFileInfo(src, resolveLink);
   }
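
Wrapping getFileInfo() in the read lock makes the name validation, the
permission check, and the directory lookup all see one consistent
namespace state; without it, a concurrent rename or delete could slip in
between checkTraverse() and dir.getFileInfo(). getContentSummary() and
getListing() below get the same treatment. The shape of the fix, as a
fragment using the names from the patch:

    readLock();
    try {
      checkTraverse(src);                        // permission check ...
      return dir.getFileInfo(src, resolveLink);  // ... and lookup, atomically
    } finally {
      readUnlock();
    }
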
 
   /**
@@ -2227,7 +2336,16 @@ public class FSNamesystem implements FSC
    */
   public boolean mkdirs(String src, PermissionStatus permissions,
       boolean createParent) throws IOException, UnresolvedLinkException {
-    boolean status = mkdirsInternal(src, permissions, createParent);
+    boolean status = false;
+    if(NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src);
+    }
+    writeLock();
+    try {
+      status = mkdirsInternal(src, permissions, createParent);
+    } finally {
+      writeUnlock();
+    }
     getEditLog().logSync();
     if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
       final HdfsFileStatus stat = dir.getFileInfo(src, false);
@@ -2244,10 +2362,9 @@ public class FSNamesystem implements FSC
   private boolean mkdirsInternal(String src,
       PermissionStatus permissions, boolean createParent) 
       throws IOException, UnresolvedLinkException {
-    writeLock();
-    try {
-    if(NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src);
+    assert hasWriteLock();
+    if (isInSafeMode()) {
+      throw new SafeModeException("Cannot create directory " + src, safeMode);
     }
     if (isPermissionEnabled) {
       checkTraverse(src);
@@ -2257,15 +2374,12 @@ public class FSNamesystem implements FSC
       // a new directory is not created.
       return true;
     }
-    if (isInSafeMode())
-      throw new SafeModeException("Cannot create directory " + src, safeMode);
     if (!DFSUtil.isValidName(src)) {
       throw new InvalidPathException(src);
     }
     if (isPermissionEnabled) {
       checkAncestorAccess(src, FsAction.WRITE);
     }
-
     if (!createParent) {
       verifyParentDir(src);
     }
@@ -2279,17 +2393,19 @@ public class FSNamesystem implements FSC
       throw new IOException("Failed to create directory: " + src);
     }
     return true;
-    } finally {
-      writeUnlock();
-    }
   }
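
The reordering above moves the safe-mode test to the top of
mkdirsInternal(), so a read-only namenode rejects the mutation before
doing any permission work, while the idempotent early return (mkdirs of
an existing directory succeeds) is preserved. The resulting check order,
sketched as a commented skeleton with hypothetical names:

    private boolean mkdirsSketch(String src) throws IOException {
      assert hasWriteLock();
      // 1. refuse all mutations while in safe mode (moved first by this patch)
      // 2. traverse permission check on the path
      // 3. idempotent early return if src is already a directory
      // 4. name validation, ancestor WRITE access, optional parent check
      // 5. quota verification, then the actual directory creation
      return true;
    }
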
 
   ContentSummary getContentSummary(String src) throws AccessControlException,
       FileNotFoundException, UnresolvedLinkException {
-    if (isPermissionEnabled) {
-      checkPermission(src, false, null, null, null, FsAction.READ_EXECUTE);
+    readLock();
+    try {
+      if (isPermissionEnabled) {
+        checkPermission(src, false, null, null, null, FsAction.READ_EXECUTE);
+      }
+      return dir.getContentSummary(src);
+    } finally {
+      readUnlock();
     }
-    return dir.getContentSummary(src);
   }
 
   /**
@@ -2299,12 +2415,18 @@ public class FSNamesystem implements FSC
    */
   void setQuota(String path, long nsQuota, long dsQuota) 
       throws IOException, UnresolvedLinkException {
-    if (isInSafeMode())
-      throw new SafeModeException("Cannot set quota on " + path, safeMode);
-    if (isPermissionEnabled) {
-      checkSuperuserPrivilege();
+    writeLock();
+    try {
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot set quota on " + path, safeMode);
+      }
+      if (isPermissionEnabled) {
+        checkSuperuserPrivilege();
+      }
+      dir.setQuota(path, nsQuota, dsQuota);
+    } finally {
+      writeUnlock();
     }
-    dir.setQuota(path, nsQuota, dsQuota);
     getEditLog().logSync();
   }
   
@@ -2315,7 +2437,6 @@ public class FSNamesystem implements FSC
    */
   void fsync(String src, String clientName) 
       throws IOException, UnresolvedLinkException {
-
     NameNode.stateChangeLog.info("BLOCK* NameSystem.fsync: file "
                                   + src + " for " + clientName);
     writeLock();
@@ -2328,6 +2449,7 @@ public class FSNamesystem implements FSC
     } finally {
       writeUnlock();
     }
+    getEditLog().logSync();
   }
 
   /**
@@ -2347,9 +2469,8 @@ public class FSNamesystem implements FSC
       String recoveryLeaseHolder) throws AlreadyBeingCreatedException, 
       IOException, UnresolvedLinkException {
     LOG.info("Recovering lease=" + lease + ", src=" + src);
-    
     assert !isInSafeMode();
-
+    assert hasWriteLock();
     INodeFile iFile = dir.getFileINode(src);
     if (iFile == null) {
       final String message = "DIR* NameSystem.internalReleaseLease: "
@@ -2471,6 +2592,7 @@ public class FSNamesystem implements FSC
 
   Lease reassignLease(Lease lease, String src, String newHolder,
       INodeFileUnderConstruction pendingFile) throws IOException {
+    assert hasWriteLock();
     if(newHolder == null)
       return lease;
     logReassignLease(lease.getHolder(), src, newHolder);
@@ -2479,6 +2601,7 @@ public class FSNamesystem implements FSC
   
   Lease reassignLeaseInternal(Lease lease, String src, String newHolder,
       INodeFileUnderConstruction pendingFile) throws IOException {
+    assert hasWriteLock();
     pendingFile.setClientName(newHolder);
     return leaseManager.reassignLease(lease, src, newHolder);
   }
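
One caveat about the assert hasWriteLock() / hasReadLock() preconditions
added throughout this patch: Java asserts are compiled in but skipped
unless the JVM runs with assertions enabled, so they act as executable
documentation and a test-time guard rather than a production check. For
example (the -ea flag is standard JVM behavior; wiring it in through
HADOOP_NAMENODE_OPTS would be a deployment detail, not part of this
patch):

    // Runs only under "java -ea ..."; otherwise the check is a no-op.
    void exampleGuard() {
      assert hasWriteLock() : "caller must hold the FSNamesystem write lock";
    }
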
@@ -2487,7 +2610,7 @@ public class FSNamesystem implements FSC
   private void finalizeINodeFileUnderConstruction(String src, 
       INodeFileUnderConstruction pendingFile) 
       throws IOException, UnresolvedLinkException {
-      
+    assert hasWriteLock();
     leaseManager.removeLease(pendingFile.getClientName(), src);
 
     // The file is no longer pending.
@@ -2508,92 +2631,96 @@ public class FSNamesystem implements FSC
     String src = "";
     writeLock();
     try {
-    LOG.info("commitBlockSynchronization(lastblock=" + lastblock
-          + ", newgenerationstamp=" + newgenerationstamp
-          + ", newlength=" + newlength
-          + ", newtargets=" + Arrays.asList(newtargets)
-          + ", closeFile=" + closeFile
-          + ", deleteBlock=" + deleteblock
-          + ")");
-    final BlockInfo storedBlock = blockManager.getStoredBlock(ExtendedBlock
+      if (isInSafeMode()) {
+        throw new SafeModeException(
+          "Cannot commitBlockSynchronization while in safe mode",
+          safeMode);
+      }
+      LOG.info("commitBlockSynchronization(lastblock=" + lastblock
+               + ", newgenerationstamp=" + newgenerationstamp
+               + ", newlength=" + newlength
+               + ", newtargets=" + Arrays.asList(newtargets)
+               + ", closeFile=" + closeFile
+               + ", deleteBlock=" + deleteblock
+               + ")");
+      final BlockInfo storedBlock = blockManager.getStoredBlock(ExtendedBlock
         .getLocalBlock(lastblock));
-    if (storedBlock == null) {
-      throw new IOException("Block (=" + lastblock + ") not found");
-    }
-    INodeFile iFile = storedBlock.getINode();
-    if (!iFile.isUnderConstruction() || storedBlock.isComplete()) {
-      throw new IOException("Unexpected block (=" + lastblock
-          + ") since the file (=" + iFile.getLocalName()
-          + ") is not under construction");
-    }
-
-    long recoveryId =
-      ((BlockInfoUnderConstruction)storedBlock).getBlockRecoveryId();
-    if(recoveryId != newgenerationstamp) {
-      throw new IOException("The recovery id " + newgenerationstamp
-          + " does not match current recovery id "
-          + recoveryId + " for block " + lastblock); 
-    }
-        
-    INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)iFile;
+      if (storedBlock == null) {
+        throw new IOException("Block (=" + lastblock + ") not found");
+      }
+      INodeFile iFile = storedBlock.getINode();
+      if (!iFile.isUnderConstruction() || storedBlock.isComplete()) {
+        throw new IOException("Unexpected block (=" + lastblock
+                              + ") since the file (=" + iFile.getLocalName()
+                              + ") is not under construction");
+      }
+
+      long recoveryId =
+        ((BlockInfoUnderConstruction)storedBlock).getBlockRecoveryId();
+      if(recoveryId != newgenerationstamp) {
+        throw new IOException("The recovery id " + newgenerationstamp
+                              + " does not match current recovery id "
+                              + recoveryId + " for block " + lastblock); 
+      }
+
+      INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)iFile;
 
-    if (deleteblock) {
-      pendingFile.removeLastBlock(ExtendedBlock.getLocalBlock(lastblock));
-      blockManager.removeBlockFromMap(storedBlock);
-    }
-    else {
-      // update last block
-      storedBlock.setGenerationStamp(newgenerationstamp);
-      storedBlock.setNumBytes(newlength);
-
-      // find the DatanodeDescriptor objects
-      // There should be no locations in the blockManager till now because the
-      // file is underConstruction
-      DatanodeDescriptor[] descriptors = null;
-      if (newtargets.length > 0) {
-        descriptors = new DatanodeDescriptor[newtargets.length];
-        for(int i = 0; i < newtargets.length; i++) {
-          descriptors[i] = getDatanode(newtargets[i]);
+      if (deleteblock) {
+        pendingFile.removeLastBlock(ExtendedBlock.getLocalBlock(lastblock));
+        blockManager.removeBlockFromMap(storedBlock);
+      }
+      else {
+        // update last block
+        storedBlock.setGenerationStamp(newgenerationstamp);
+        storedBlock.setNumBytes(newlength);
+
+        // find the DatanodeDescriptor objects
+        // There should be no locations in the blockManager till now because the
+        // file is underConstruction
+        DatanodeDescriptor[] descriptors = null;
+        if (newtargets.length > 0) {
+          descriptors = new DatanodeDescriptor[newtargets.length];
+          for(int i = 0; i < newtargets.length; i++) {
+            descriptors[i] = getDatanode(newtargets[i]);
+          }
+        }
+        // descriptors may be null if newtargets was empty
+        if (closeFile && descriptors != null) {
+          // the file is getting closed. Insert block locations into blockManager.
+          // Otherwise fsck will report these blocks as MISSING, especially if the
+          // blocksReceived from Datanodes take a long time to arrive.
+          for (int i = 0; i < descriptors.length; i++) {
+            descriptors[i].addBlock(storedBlock);
+          }
         }
+        // add pipeline locations into the INodeUnderConstruction
+        pendingFile.setLastBlock(storedBlock, descriptors);
       }
+
+      src = leaseManager.findPath(pendingFile);
       if (closeFile) {
-        // the file is getting closed. Insert block locations into blockManager.
-        // Otherwise fsck will report these blocks as MISSING, especially if the
-        // blocksReceived from Datanodes take a long time to arrive.
-        for (int i = 0; i < descriptors.length; i++) {
-          descriptors[i].addBlock(storedBlock);
-        }
-      }
-      // add pipeline locations into the INodeUnderConstruction
-      pendingFile.setLastBlock(storedBlock, descriptors);
-    }
-
-    // If this commit does not want to close the file, persist
-    // blocks only if append is supported and return
-    src = leaseManager.findPath(pendingFile);
-    if (!closeFile) {
-      if (supportAppends) {
+        // commit the last block and complete it if it has minimum replicas
+        blockManager.commitOrCompleteLastBlock(pendingFile, storedBlock);
+
+        //remove lease, close file
+        finalizeINodeFileUnderConstruction(src, pendingFile);
+      } else if (supportAppends) {
+        // If this commit does not want to close the file, persist
+        // blocks only if append is supported 
         dir.persistBlocks(src, pendingFile);
-        getEditLog().logSync();
       }
-      LOG.info("commitBlockSynchronization(" + lastblock + ") successful");
-      return;
-    }
-
-    // commit the last block and complete it if it has minimum replicas
-    blockManager.commitOrCompleteLastBlock(pendingFile, storedBlock);
-
-    //remove lease, close file
-    finalizeINodeFileUnderConstruction(src, pendingFile);
     } finally {
       writeUnlock();
     }
     getEditLog().logSync();
-    LOG.info("commitBlockSynchronization(newblock=" + lastblock
+    if (closeFile) {
+      LOG.info("commitBlockSynchronization(newblock=" + lastblock
           + ", file=" + src
           + ", newgenerationstamp=" + newgenerationstamp
           + ", newlength=" + newlength
           + ", newtargets=" + Arrays.asList(newtargets) + ") successful");
+    } else {
+      LOG.info("commitBlockSynchronization(" + lastblock + ") successful");
+    }
   }
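
The rewrite above also straightens the control flow of
commitBlockSynchronization(): instead of an early return that synced the
edit log while still holding the write lock, both the close and no-close
paths now fall through to a single unlock followed by a single
logSync(). In outline (a sketch of the tail, not the full method):

    writeLock();
    try {
      // ... validate the recovery id, update or delete the last block ...
      if (closeFile) {
        // commit the last block, then remove the lease and close the file
      } else if (supportAppends) {
        // persist the updated block list so it survives a restart
      }
    } finally {
      writeUnlock();
    }
    getEditLog().logSync();   // exactly one sync, outside the lock
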
 
 
@@ -2601,9 +2728,15 @@ public class FSNamesystem implements FSC
    * Renew the lease(s) held by the given client
    */
   void renewLease(String holder) throws IOException {
-    if (isInSafeMode())
-      throw new SafeModeException("Cannot renew lease for " + holder, safeMode);
-    leaseManager.renewLease(holder);
+    writeLock();
+    try {
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot renew lease for " + holder, safeMode);
+      }
+      leaseManager.renewLease(holder);
+    } finally {
+      writeUnlock();
+    }
   }
 
   /**
@@ -2621,20 +2754,26 @@ public class FSNamesystem implements FSC
   public DirectoryListing getListing(String src, byte[] startAfter,
       boolean needLocation) 
     throws AccessControlException, UnresolvedLinkException, IOException {
-    if (isPermissionEnabled) {
-      if (dir.isDir(src)) {
-        checkPathAccess(src, FsAction.READ_EXECUTE);
+    DirectoryListing dl;
+    readLock();
+    try {
+      if (isPermissionEnabled) {
+        if (dir.isDir(src)) {
+          checkPathAccess(src, FsAction.READ_EXECUTE);
+        } else {
+          checkTraverse(src);
+        }
       }
-      else {
-        checkTraverse(src);
+      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+        logAuditEvent(UserGroupInformation.getCurrentUser(),
+                      Server.getRemoteIp(),
+                      "listStatus", src, null, null);
       }
+      dl = dir.getListing(src, startAfter, needLocation);
+    } finally {
+      readUnlock();
     }
-    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
-      logAuditEvent(UserGroupInformation.getCurrentUser(),
-                    Server.getRemoteIp(),
-                    "listStatus", src, null, null);
-    }
-    return dir.getListing(src, startAfter, needLocation);
+    return dl;
   }
 
   /////////////////////////////////////////////////////////
@@ -2665,10 +2804,20 @@ public class FSNamesystem implements FSC
    * 
    * @see org.apache.hadoop.hdfs.server.datanode.DataNode
    */
-  public void registerDatanode(DatanodeRegistration nodeReg
-                                            ) throws IOException {
+  public void registerDatanode(DatanodeRegistration nodeReg)
+      throws IOException {
     writeLock();
     try {
+      registerDatanodeInternal(nodeReg);
+    } finally {
+      writeUnlock();
+    }
+  }
+
+  /** @see #registerDatanode(DatanodeRegistration) */
+  void registerDatanodeInternal(DatanodeRegistration nodeReg)
+      throws IOException {
+    assert hasWriteLock();
     String dnAddress = Server.getRemoteAddress();
     if (dnAddress == null) {
       // Mostly called inside an RPC.
@@ -2785,17 +2934,12 @@ public class FSNamesystem implements FSC
       // because its is done when the descriptor is created
     }
 
-    if (safeMode != null) {
-      safeMode.checkMode();
-    }
-    return;
-    } finally {
-      writeUnlock();
-    }
+    checkSafeMode();
   }
     
   /* Resolve a node's network location */
   private void resolveNetworkLocation (DatanodeDescriptor node) {
+    assert hasWriteLock();
     List<String> names = new ArrayList<String>(1);
     if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
       // get the node's IP address
@@ -2872,7 +3016,23 @@ public class FSNamesystem implements FSC
   DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
       long capacity, long dfsUsed, long remaining, long blockPoolUsed,
       int xceiverCount, int xmitsInProgress, int failedVolumes) 
-      throws IOException {
+        throws IOException {
+    readLock();
+    try {
+      return handleHeartbeatInternal(nodeReg, capacity, dfsUsed, 
+          remaining, blockPoolUsed, xceiverCount, xmitsInProgress, 
+          failedVolumes);
+    } finally {
+      readUnlock();
+    }
+  }
+
+  /** @see #handleHeartbeat(DatanodeRegistration, long, long, long, long, int, int, int) */
+  DatanodeCommand[] handleHeartbeatInternal(DatanodeRegistration nodeReg,
+      long capacity, long dfsUsed, long remaining, long blockPoolUsed,
+      int xceiverCount, int xmitsInProgress, int failedVolumes) 
+        throws IOException {
+    assert hasReadLock();
     DatanodeCommand cmd = null;
     synchronized (heartbeats) {
       synchronized (datanodeMap) {
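
handleHeartbeat() follows the same split as registerDatanode() above,
but with the read lock, and its Internal variant asserts hasReadLock().
A plausible implementation of that helper on top of
ReentrantReadWriteLock (an assumption of this sketch; the real helper
may differ, though both calls used here are standard
java.util.concurrent API):

    boolean hasReadLock() {
      // getReadHoldCount() reports this thread's reentrant read holds; a
      // thread holding the write lock implicitly has read access too.
      return fsLock.getReadHoldCount() > 0
          || fsLock.isWriteLockedByCurrentThread();
    }
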
@@ -3113,10 +3273,14 @@ public class FSNamesystem implements FSC
     int workFound = 0;
     int blocksToProcess = 0;
     int nodesToProcess = 0;
-    // blocks should not be replicated or removed if safe mode is on
+    // Blocks should not be replicated or removed if in safe mode.
+    // It's OK to check safe mode here without holding the lock; in the
+    // worst case extra replications will be scheduled, and these will
+    // get fixed up later.
     if (isInSafeMode())
       return workFound;
-    synchronized(heartbeats) {
+
+    synchronized (heartbeats) {
       blocksToProcess = (int)(heartbeats.size() 
           * ReplicationMonitor.REPLICATION_WORK_MULTIPLIER_PER_ITERATION);
       nodesToProcess = (int)Math.ceil((double)heartbeats.size() 
@@ -3150,13 +3314,13 @@ public class FSNamesystem implements FSC
     throws IOException {
     writeLock();
     try {
-    DatanodeDescriptor nodeInfo = getDatanode(nodeID);
-    if (nodeInfo != null) {
-      removeDatanode(nodeInfo);
-    } else {
-      NameNode.stateChangeLog.warn("BLOCK* NameSystem.removeDatanode: "
-                                   + nodeID.getName() + " does not exist");
-    }
+      DatanodeDescriptor nodeInfo = getDatanode(nodeID);
+      if (nodeInfo != null) {
+        removeDatanode(nodeInfo);
+      } else {
+        NameNode.stateChangeLog.warn("BLOCK* NameSystem.removeDatanode: "
+                                     + nodeID.getName() + " does not exist");
+      }
     } finally {
       writeUnlock();
     }
@@ -3167,6 +3331,7 @@ public class FSNamesystem implements FSC
    * @param nodeInfo datanode descriptor.
    */
   private void removeDatanode(DatanodeDescriptor nodeInfo) {
+    assert hasWriteLock();
     synchronized (heartbeats) {
       if (nodeInfo.isAlive) {
         updateStats(nodeInfo, false);
@@ -3180,14 +3345,13 @@ public class FSNamesystem implements FSC
       blockManager.removeStoredBlock(it.next(), nodeInfo);
     }
     unprotectedRemoveDatanode(nodeInfo);
-    clusterMap.remove(nodeInfo);
-
-    if (safeMode != null) {
-      safeMode.checkMode();
-    }
+    clusterMap.remove(nodeInfo);
+
+    checkSafeMode();
   }
 
   void unprotectedRemoveDatanode(DatanodeDescriptor nodeDescr) {
+    assert hasWriteLock();
     nodeDescr.resetBlocks();
     blockManager.removeFromInvalidates(nodeDescr.getStorageID());
     if(NameNode.stateChangeLog.isDebugEnabled()) {
@@ -3198,12 +3362,14 @@ public class FSNamesystem implements FSC
   }
     
   void unprotectedAddDatanode(DatanodeDescriptor nodeDescr) {
-    /* To keep host2DataNodeMap consistent with datanodeMap,
-       remove  from host2DataNodeMap the datanodeDescriptor removed
-       from datanodeMap before adding nodeDescr to host2DataNodeMap.
-    */
-    host2DataNodeMap.remove(
+    assert hasWriteLock();
+    // To keep host2DataNodeMap consistent with datanodeMap,
+    // remove from host2DataNodeMap the datanodeDescriptor removed
+    // from datanodeMap before adding nodeDescr to host2DataNodeMap.
+    synchronized (datanodeMap) {
+      host2DataNodeMap.remove(
                             datanodeMap.put(nodeDescr.getStorageID(), nodeDescr));
+    }
     host2DataNodeMap.add(nodeDescr);
       
     if(NameNode.stateChangeLog.isDebugEnabled()) {
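
The synchronized (datanodeMap) block added above makes the displacement
from datanodeMap and the matching removal from host2DataNodeMap atomic
with respect to every other reader or writer that synchronizes on
datanodeMap; the subsequent host2DataNodeMap.add() stays outside the
block, relying on that map being internally thread-safe (an assumption
of this sketch). The invariant, in miniature:

    synchronized (datanodeMap) {
      // whatever descriptor this storage ID displaces from datanodeMap
      // must also leave host2DataNodeMap, in the same critical section
      host2DataNodeMap.remove(
          datanodeMap.put(nodeDescr.getStorageID(), nodeDescr));
    }
    host2DataNodeMap.add(nodeDescr);  // assumed internally synchronized
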
@@ -3220,8 +3386,11 @@ public class FSNamesystem implements FSC
    * @throws IOException
    */
   void wipeDatanode(DatanodeID nodeID) throws IOException {
+    assert hasWriteLock();
     String key = nodeID.getStorageID();
-    host2DataNodeMap.remove(datanodeMap.remove(key));
+    synchronized (datanodeMap) {
+      host2DataNodeMap.remove(datanodeMap.remove(key));
+    }
     if(NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug(
           "BLOCK* NameSystem.wipeDatanode: "
@@ -3246,8 +3415,9 @@ public class FSNamesystem implements FSC
    * effect causes more datanodes to be declared dead.
    */
   void heartbeatCheck() {
+    // It's OK to check safe mode without taking the lock here; we re-check
+    // for safe mode after taking the lock, before removing a datanode.
     if (isInSafeMode()) {
-      // not to check dead nodes if in safemode
       return;
     }
     boolean allAlive = false;
@@ -3272,6 +3442,9 @@ public class FSNamesystem implements FSC
       // acquire the fsnamesystem lock, and then remove the dead node.
       if (foundDead) {
         writeLock();
+        if (isInSafeMode()) {
+          writeUnlock();
+          return;
+        }
         try {
           synchronized(heartbeats) {
             synchronized (datanodeMap) {
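
heartbeatCheck() now uses a check / re-check idiom: the lock-free test
at the top keeps the common case cheap (its result may be stale, as the
new comment notes), and the second test after writeLock() is the one
that actually decides, releasing the lock before the early return. The
idiom in isolation:

    if (isInSafeMode()) {
      return;                    // cheap pre-check; may be stale
    }
    writeLock();
    if (isInSafeMode()) {        // authoritative re-check under the lock
      writeUnlock();
      return;
    }
    try {
      // ... remove the dead datanode ...
    } finally {
      writeUnlock();
    }
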
@@ -3307,21 +3480,21 @@ public class FSNamesystem implements FSC
     writeLock();
     startTime = now(); //after acquiring write lock
     try {
-    DatanodeDescriptor node = getDatanode(nodeID);
-    if (node == null || !node.isAlive) {
-      throw new IOException("ProcessReport from dead or unregistered node: "
-                            + nodeID.getName());
-    }
-    // To minimize startup time, we discard any second (or later) block reports
-    // that we receive while still in startup phase.
-    if (isInStartupSafeMode() && node.numBlocks() > 0) {
-      NameNode.stateChangeLog.info("BLOCK* NameSystem.processReport: "
-          + "discarded non-initial block report from " + nodeID.getName()
-          + " because namenode still in startup phase");
-      return;
-    }
-
-    blockManager.processReport(node, newReport);
+      DatanodeDescriptor node = getDatanode(nodeID);
+      if (node == null || !node.isAlive) {
+        throw new IOException("ProcessReport from dead or unregistered node: "
+                              + nodeID.getName());
+      }
+      // To minimize startup time, we discard any second (or later) block report
+      // that we receive while still in startup phase.
+      if (isInStartupSafeMode() && node.numBlocks() > 0) {
+        NameNode.stateChangeLog.info("BLOCK* NameSystem.processReport: "
+            + "discarded non-initial block report from " + nodeID.getName()
+            + " because namenode still in startup phase");
+        return;
+      }
+
+      blockManager.processReport(node, newReport);
     } finally {
       endTime = now();
       writeUnlock();
@@ -3353,6 +3526,7 @@ public class FSNamesystem implements FSC
                               DatanodeDescriptor addedNode,
                               DatanodeDescriptor delNodeHint,
                               BlockPlacementPolicy replicator) {
+    assert hasWriteLock();
     // first form a rack to datanodes map and
     INodeFile inode = blockManager.getINode(b);
     HashMap<String, ArrayList<DatanodeDescriptor>> rackMap =
@@ -3446,20 +3620,20 @@ public class FSNamesystem implements FSC
                                          ) throws IOException {
     writeLock();
     try {
-    DatanodeDescriptor node = getDatanode(nodeID);
-    if (node == null || !node.isAlive) {
-      NameNode.stateChangeLog.warn("BLOCK* NameSystem.blockReceived: " + block
-          + " is received from dead or unregistered node " + nodeID.getName());
-      throw new IOException(
-          "Got blockReceived message from unregistered or dead node " + block);
-    }
-        
-    if (NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("BLOCK* NameSystem.blockReceived: "
-                                    +block+" is received from " + nodeID.getName());
-    }
-
-    blockManager.addBlock(node, block, delHint);
+      DatanodeDescriptor node = getDatanode(nodeID);
+      if (node == null || !node.isAlive) {
+        NameNode.stateChangeLog.warn("BLOCK* NameSystem.blockReceived: " + block
+            + " is received from dead or unregistered node " + nodeID.getName());
+        throw new IOException(
+            "Got blockReceived message from unregistered or dead node " + block);
+      }
+
+      if (NameNode.stateChangeLog.isDebugEnabled()) {
+        NameNode.stateChangeLog.debug("BLOCK* NameSystem.blockReceived: "
+                                      +block+" is received from " + nodeID.getName());
+      }
+
+      blockManager.addBlock(node, block, delHint);
     } finally {
       writeUnlock();
     }
@@ -3576,75 +3750,74 @@ public class FSNamesystem implements FSC
   }
 
   private ArrayList<DatanodeDescriptor> getDatanodeListForReport(
-                                                      DatanodeReportType type) {
-    boolean listLiveNodes = type == DatanodeReportType.ALL ||
-                            type == DatanodeReportType.LIVE;
-    boolean listDeadNodes = type == DatanodeReportType.ALL ||
-                            type == DatanodeReportType.DEAD;
-
-    HashMap<String, String> mustList = new HashMap<String, String>();
-
+      DatanodeReportType type) {
     readLock();
-    try {
-    if (listDeadNodes) {
-      //first load all the nodes listed in include and exclude files.
-      for (Iterator<String> it = hostsReader.getHosts().iterator(); 
-           it.hasNext();) {
-        mustList.put(it.next(), "");
+    try {
+      boolean listLiveNodes = type == DatanodeReportType.ALL ||
+                              type == DatanodeReportType.LIVE;
+      boolean listDeadNodes = type == DatanodeReportType.ALL ||
+                              type == DatanodeReportType.DEAD;
+  
+      HashMap<String, String> mustList = new HashMap<String, String>();
+  
+      if (listDeadNodes) {
+        //first load all the nodes listed in include and exclude files.
+        Iterator<String> it = hostsReader.getHosts().iterator();
+        while (it.hasNext()) {
+          mustList.put(it.next(), "");
+        }
+        it = hostsReader.getExcludedHosts().iterator(); 
+        while (it.hasNext()) {
+          mustList.put(it.next(), "");
+        }
       }
-      for (Iterator<String> it = hostsReader.getExcludedHosts().iterator(); 
-           it.hasNext();) {
-        mustList.put(it.next(), "");
+
+      ArrayList<DatanodeDescriptor> nodes = null;
+      
+      synchronized (datanodeMap) {
+        nodes = new ArrayList<DatanodeDescriptor>(datanodeMap.size() + 
+                                                  mustList.size());
+        Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
+        while (it.hasNext()) { 
+          DatanodeDescriptor dn = it.next();
+          boolean isDead = isDatanodeDead(dn);
+          if ( (isDead && listDeadNodes) || (!isDead && listLiveNodes) ) {
+            nodes.add(dn);
+          }
+          // Remove any form of this datanode from the include/exclude lists.
+          mustList.remove(dn.getName());
+          mustList.remove(dn.getHost());
+          mustList.remove(dn.getHostName());
+        }
       }
-    }
-   
-    ArrayList<DatanodeDescriptor> nodes = null;
-    
-    synchronized (datanodeMap) {
-      nodes = new ArrayList<DatanodeDescriptor>(datanodeMap.size() + 
-                                                mustList.size());
       
-      for(Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator(); 
-                                                               it.hasNext();) {
-        DatanodeDescriptor dn = it.next();
-        boolean isDead = isDatanodeDead(dn);
-        if ( (isDead && listDeadNodes) || (!isDead && listLiveNodes) ) {
+      if (listDeadNodes) {
+        Iterator<String> it = mustList.keySet().iterator();
+        while (it.hasNext()) {
+          DatanodeDescriptor dn = 
+              new DatanodeDescriptor(new DatanodeID(it.next()));
+          dn.setLastUpdate(0);
           nodes.add(dn);
         }
-        //Remove any form of the this datanode in include/exclude lists.
-        mustList.remove(dn.getName());
-        mustList.remove(dn.getHost());
-        mustList.remove(dn.getHostName());
       }
-    }
-    
-    if (listDeadNodes) {
-      for (Iterator<String> it = mustList.keySet().iterator(); it.hasNext();) {
-        DatanodeDescriptor dn = 
-            new DatanodeDescriptor(new DatanodeID(it.next()));
-        dn.setLastUpdate(0);
-        nodes.add(dn);
-      }
-    }
-    
-    return nodes;
+      return nodes;
     } finally {
       readUnlock();
     }
   }
 
-  public DatanodeInfo[] datanodeReport( DatanodeReportType type
-      ) throws AccessControlException {
+  public DatanodeInfo[] datanodeReport( DatanodeReportType type)
+      throws AccessControlException {
     readLock();
     try {
-    checkSuperuserPrivilege();
-
-    ArrayList<DatanodeDescriptor> results = getDatanodeListForReport(type);
-    DatanodeInfo[] arr = new DatanodeInfo[results.size()];
-    for (int i=0; i<arr.length; i++) {
-      arr[i] = new DatanodeInfo(results.get(i));
-    }
-    return arr;
+      checkSuperuserPrivilege();
+  
+      ArrayList<DatanodeDescriptor> results = getDatanodeListForReport(type);
+      DatanodeInfo[] arr = new DatanodeInfo[results.size()];
+      for (int i=0; i<arr.length; i++) {
+        arr[i] = new DatanodeInfo(results.get(i));
+      }
+      return arr;
     } finally {
       readUnlock();
     }
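
datanodeReport() returns copies rather than the live descriptors: each
result is rebuilt with the DatanodeInfo copy constructor used above, so
callers can keep consuming the report after readUnlock() without racing
concurrent heartbeat updates. The copying step in isolation:

    DatanodeInfo[] toReport(java.util.List<DatanodeDescriptor> nodes) {
      DatanodeInfo[] arr = new DatanodeInfo[nodes.size()];
      for (int i = 0; i < arr.length; i++) {
        arr[i] = new DatanodeInfo(nodes.get(i));  // snapshot copy
      }
      return arr;
    }
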

[... 863 lines stripped ...]