You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/06/21 22:09:57 UTC

svn commit: r1138160 [3/6] - in /hadoop/common/branches/HDFS-1073/hdfs: ./ bin/ src/c++/libhdfs/ src/c++/libhdfs/tests/ src/contrib/ src/contrib/fuse-dfs/ src/contrib/fuse-dfs/src/ src/contrib/hdfsproxy/ src/docs/src/documentation/content/xdocs/ src/ja...

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1138160&r1=1138159&r2=1138160&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Tue Jun 21 20:09:54 2011
@@ -51,7 +51,6 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.util.ByteArray;
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.security.UserGroupInformation;
 
 /*************************************************
  * FSDirectory stores the filesystem directory state.
@@ -72,29 +71,33 @@ class FSDirectory implements Closeable {
   private final int maxDirItems;
   private final int lsLimit;  // max list limit
 
-  // lock to protect BlockMap.
-  private ReentrantReadWriteLock bLock;
+  // lock to protect the directory and BlockMap
+  private ReentrantReadWriteLock dirLock;
   private Condition cond;
 
   // utility methods to acquire and release read lock and write lock
   void readLock() {
-    this.bLock.readLock().lock();
+    this.dirLock.readLock().lock();
   }
 
   void readUnlock() {
-    this.bLock.readLock().unlock();
+    this.dirLock.readLock().unlock();
   }
 
   void writeLock() {
-    this.bLock.writeLock().lock();
+    this.dirLock.writeLock().lock();
   }
 
   void writeUnlock() {
-    this.bLock.writeLock().unlock();
+    this.dirLock.writeLock().unlock();
   }
 
   boolean hasWriteLock() {
-    return this.bLock.isWriteLockedByCurrentThread();
+    return this.dirLock.isWriteLockedByCurrentThread();
+  }
+
+  boolean hasReadLock() {
+    return this.dirLock.getReadHoldCount() > 0;
   }
 
   /**
@@ -109,8 +112,8 @@ class FSDirectory implements Closeable {
   }
 
   FSDirectory(FSImage fsImage, FSNamesystem ns, Configuration conf) {
-    this.bLock = new ReentrantReadWriteLock(true); // fair
-    this.cond = bLock.writeLock().newCondition();
+    this.dirLock = new ReentrantReadWriteLock(true); // fair
+    this.cond = dirLock.writeLock().newCondition();
     fsImage.setFSNamesystem(ns);
     rootDir = new INodeDirectoryWithQuota(INodeDirectory.ROOT_NAME,
         ns.createFsOwnerPermissions(new FsPermission((short)0755)),
@@ -277,6 +280,7 @@ class FSDirectory implements Closeable {
       throws UnresolvedLinkException {
     INode newNode;
     long diskspace = UNKNOWN_DISK_SPACE;
+    assert hasWriteLock();
     if (blocks == null)
       newNode = new INodeDirectory(permissions, modificationTime);
     else {
@@ -465,8 +469,13 @@ class FSDirectory implements Closeable {
     }
     waitForReady();
     long now = now();
-    if (!unprotectedRenameTo(src, dst, now))
-      return false;
+    writeLock();
+    try {
+      if (!unprotectedRenameTo(src, dst, now))
+        return false;
+    } finally {
+      writeUnlock();
+    }
     fsImage.getEditLog().logRename(src, dst, now);
     return true;
   }
@@ -484,8 +493,13 @@ class FSDirectory implements Closeable {
     }
     waitForReady();
     long now = now();
-    if (unprotectedRenameTo(src, dst, now, options)) {
-      incrDeletedFileCount(1);
+    writeLock();
+    try {
+      if (unprotectedRenameTo(src, dst, now, options)) {
+        incrDeletedFileCount(1);
+      }
+    } finally {
+      writeUnlock();
     }
     fsImage.getEditLog().logRename(src, dst, now, options);
   }
@@ -504,108 +518,104 @@ class FSDirectory implements Closeable {
   boolean unprotectedRenameTo(String src, String dst, long timestamp)
     throws QuotaExceededException, UnresolvedLinkException, 
     FileAlreadyExistsException {
-    writeLock();
-    try {
-      INode[] srcInodes = rootDir.getExistingPathINodes(src, false);
-      INode srcInode = srcInodes[srcInodes.length-1];
-      
-      // check the validation of the source
-      if (srcInode == null) {
+    assert hasWriteLock();
+    INode[] srcInodes = rootDir.getExistingPathINodes(src, false);
+    INode srcInode = srcInodes[srcInodes.length-1];
+    
+    // check the validation of the source
+    if (srcInode == null) {
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+          + "failed to rename " + src + " to " + dst
+          + " because source does not exist");
+      return false;
+    } 
+    if (srcInodes.length == 1) {
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+          +"failed to rename "+src+" to "+dst+ " because source is the root");
+      return false;
+    }
+    if (isDir(dst)) {
+      dst += Path.SEPARATOR + new Path(src).getName();
+    }
+    
+    // check the validity of the destination
+    if (dst.equals(src)) {
+      return true;
+    }
+    if (srcInode.isLink() && 
+        dst.equals(((INodeSymlink)srcInode).getLinkValue())) {
+      throw new FileAlreadyExistsException(
+          "Cannot rename symlink "+src+" to its target "+dst);
+    }
+    
+    // dst cannot be directory or a file under src
+    if (dst.startsWith(src) && 
+        dst.charAt(src.length()) == Path.SEPARATOR_CHAR) {
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+          + "failed to rename " + src + " to " + dst
+          + " because destination starts with src");
+      return false;
+    }
+    
+    byte[][] dstComponents = INode.getPathComponents(dst);
+    INode[] dstInodes = new INode[dstComponents.length];
+    rootDir.getExistingPathINodes(dstComponents, dstInodes, false);
+    if (dstInodes[dstInodes.length-1] != null) {
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+                                   +"failed to rename "+src+" to "+dst+ 
+                                   " because destination exists");
+      return false;
+    }
+    if (dstInodes[dstInodes.length-2] == null) {
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+          +"failed to rename "+src+" to "+dst+ 
+          " because destination's parent does not exist");
+      return false;
+    }
+    
+    // Ensure dst has quota to accommodate rename
+    verifyQuotaForRename(srcInodes,dstInodes);
+    
+    INode dstChild = null;
+    INode srcChild = null;
+    String srcChildName = null;
+    try {
+      // remove src
+      srcChild = removeChild(srcInodes, srcInodes.length-1);
+      if (srcChild == null) {
         NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
             + "failed to rename " + src + " to " + dst
-            + " because source does not exist");
-        return false;
-      } 
-      if (srcInodes.length == 1) {
-        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-            +"failed to rename "+src+" to "+dst+ " because source is the root");
+            + " because the source can not be removed");
         return false;
       }
-      if (isDir(dst)) {
-        dst += Path.SEPARATOR + new Path(src).getName();
-      }
+      srcChildName = srcChild.getLocalName();
+      srcChild.setLocalName(dstComponents[dstInodes.length-1]);
       
-      // check the validity of the destination
-      if (dst.equals(src)) {
+      // add src to the destination
+      dstChild = addChildNoQuotaCheck(dstInodes, dstInodes.length - 1,
+          srcChild, UNKNOWN_DISK_SPACE, false);
+      if (dstChild != null) {
+        srcChild = null;
+        if (NameNode.stateChangeLog.isDebugEnabled()) {
+          NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedRenameTo: " 
+              + src + " is renamed to " + dst);
+        }
+        // update modification time of dst and the parent of src
+        srcInodes[srcInodes.length-2].setModificationTime(timestamp);
+        dstInodes[dstInodes.length-2].setModificationTime(timestamp);
         return true;
       }
-      if (srcInode.isLink() && 
-          dst.equals(((INodeSymlink)srcInode).getLinkValue())) {
-        throw new FileAlreadyExistsException(
-            "Cannot rename symlink "+src+" to its target "+dst);
-      }
-      
-      // dst cannot be directory or a file under src
-      if (dst.startsWith(src) && 
-          dst.charAt(src.length()) == Path.SEPARATOR_CHAR) {
-        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-            + "failed to rename " + src + " to " + dst
-            + " because destination starts with src");
-        return false;
-      }
-      
-      byte[][] dstComponents = INode.getPathComponents(dst);
-      INode[] dstInodes = new INode[dstComponents.length];
-      rootDir.getExistingPathINodes(dstComponents, dstInodes, false);
-      if (dstInodes[dstInodes.length-1] != null) {
-        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-                                     +"failed to rename "+src+" to "+dst+ 
-                                     " because destination exists");
-        return false;
-      }
-      if (dstInodes[dstInodes.length-2] == null) {
-        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-            +"failed to rename "+src+" to "+dst+ 
-            " because destination's parent does not exist");
-        return false;
-      }
-      
-      // Ensure dst has quota to accommodate rename
-      verifyQuotaForRename(srcInodes,dstInodes);
-      
-      INode dstChild = null;
-      INode srcChild = null;
-      String srcChildName = null;
-      try {
-        // remove src
-        srcChild = removeChild(srcInodes, srcInodes.length-1);
-        if (srcChild == null) {
-          NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-              + "failed to rename " + src + " to " + dst
-              + " because the source can not be removed");
-          return false;
-        }
-        srcChildName = srcChild.getLocalName();
-        srcChild.setLocalName(dstComponents[dstInodes.length-1]);
-        
-        // add src to the destination
-        dstChild = addChildNoQuotaCheck(dstInodes, dstInodes.length - 1,
-            srcChild, UNKNOWN_DISK_SPACE, false);
-        if (dstChild != null) {
-          srcChild = null;
-          if (NameNode.stateChangeLog.isDebugEnabled()) {
-            NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedRenameTo: " 
-                + src + " is renamed to " + dst);
-          }
-          // update modification time of dst and the parent of src
-          srcInodes[srcInodes.length-2].setModificationTime(timestamp);
-          dstInodes[dstInodes.length-2].setModificationTime(timestamp);
-          return true;
-        }
-      } finally {
-        if (dstChild == null && srcChild != null) {
-          // put it back
-          srcChild.setLocalName(srcChildName);
-          addChildNoQuotaCheck(srcInodes, srcInodes.length - 1, srcChild, 
-              UNKNOWN_DISK_SPACE, false);
-        }
-      }
-      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-          +"failed to rename "+src+" to "+dst);
-      return false;
     } finally {
-      writeUnlock();
+      if (dstChild == null && srcChild != null) {
+        // put it back
+        srcChild.setLocalName(srcChildName);
+        addChildNoQuotaCheck(srcInodes, srcInodes.length - 1, srcChild, 
+            UNKNOWN_DISK_SPACE, false);
+      }
     }
+    NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+        +"failed to rename "+src+" to "+dst);
+    return false;
   }
 
   /**
@@ -622,6 +632,7 @@ class FSDirectory implements Closeable {
       Options.Rename... options) throws FileAlreadyExistsException,
       FileNotFoundException, ParentNotDirectoryException,
       QuotaExceededException, UnresolvedLinkException, IOException {
+    assert hasWriteLock();
     boolean overwrite = false;
     if (null != options) {
       for (Rename option : options) {
@@ -631,157 +642,152 @@ class FSDirectory implements Closeable {
       }
     }
     String error = null;
-    writeLock();
-    try {
-      final INode[] srcInodes = rootDir.getExistingPathINodes(src, false);
-      final INode srcInode = srcInodes[srcInodes.length - 1];
-      // validate source
-      if (srcInode == null) {
-        error = "rename source " + src + " is not found.";
-        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-            + error);
-        throw new FileNotFoundException(error);
-      }
-      if (srcInodes.length == 1) {
-        error = "rename source cannot be the root";
-        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-            + error);
-        throw new IOException(error);
-      }
+    final INode[] srcInodes = rootDir.getExistingPathINodes(src, false);
+    final INode srcInode = srcInodes[srcInodes.length - 1];
+    // validate source
+    if (srcInode == null) {
+      error = "rename source " + src + " is not found.";
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+          + error);
+      throw new FileNotFoundException(error);
+    }
+    if (srcInodes.length == 1) {
+      error = "rename source cannot be the root";
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+          + error);
+      throw new IOException(error);
+    }
 
-      // validate the destination
-      if (dst.equals(src)) {
-        throw new FileAlreadyExistsException(
-            "The source "+src+" and destination "+dst+" are the same");
-      }
-      if (srcInode.isLink() && 
-          dst.equals(((INodeSymlink)srcInode).getLinkValue())) {
-        throw new FileAlreadyExistsException(
-            "Cannot rename symlink "+src+" to its target "+dst);
-      }
-      // dst cannot be a directory or a file under src
-      if (dst.startsWith(src) && 
-          dst.charAt(src.length()) == Path.SEPARATOR_CHAR) {
-        error = "Rename destination " + dst
-            + " is a directory or file under source " + src;
-        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-            + error);
-        throw new IOException(error);
-      }
-      final byte[][] dstComponents = INode.getPathComponents(dst);
-      final INode[] dstInodes = new INode[dstComponents.length];
-      rootDir.getExistingPathINodes(dstComponents, dstInodes, false);
-      INode dstInode = dstInodes[dstInodes.length - 1];
-      if (dstInodes.length == 1) {
-        error = "rename destination cannot be the root";
+    // validate the destination
+    if (dst.equals(src)) {
+      throw new FileAlreadyExistsException(
+          "The source "+src+" and destination "+dst+" are the same");
+    }
+    if (srcInode.isLink() && 
+        dst.equals(((INodeSymlink)srcInode).getLinkValue())) {
+      throw new FileAlreadyExistsException(
+          "Cannot rename symlink "+src+" to its target "+dst);
+    }
+    // dst cannot be a directory or a file under src
+    if (dst.startsWith(src) && 
+        dst.charAt(src.length()) == Path.SEPARATOR_CHAR) {
+      error = "Rename destination " + dst
+          + " is a directory or file under source " + src;
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+          + error);
+      throw new IOException(error);
+    }
+    final byte[][] dstComponents = INode.getPathComponents(dst);
+    final INode[] dstInodes = new INode[dstComponents.length];
+    rootDir.getExistingPathINodes(dstComponents, dstInodes, false);
+    INode dstInode = dstInodes[dstInodes.length - 1];
+    if (dstInodes.length == 1) {
+      error = "rename destination cannot be the root";
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+          + error);
+      throw new IOException(error);
+    }
+    if (dstInode != null) { // Destination exists
+      // It's OK to rename a file to a symlink and vice versa
+      if (dstInode.isDirectory() != srcInode.isDirectory()) {
+        error = "Source " + src + " and destination " + dst
+            + " must both be directories";
         NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
             + error);
         throw new IOException(error);
       }
-      if (dstInode != null) { // Destination exists
-        // It's OK to rename a file to a symlink and vice versa
-        if (dstInode.isDirectory() != srcInode.isDirectory()) {
-          error = "Source " + src + " and destination " + dst
-              + " must both be directories";
-          NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-              + error);
-          throw new IOException(error);
-        }
-        if (!overwrite) { // If destination exists, overwrite flag must be true
-          error = "rename destination " + dst + " already exists";
-          NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-              + error);
-          throw new FileAlreadyExistsException(error);
-        }
-        List<INode> children = dstInode.isDirectory() ? 
-            ((INodeDirectory) dstInode).getChildrenRaw() : null;
-        if (children != null && children.size() != 0) {
-          error = "rename cannot overwrite non empty destination directory "
-              + dst;
-          NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-              + error);
-          throw new IOException(error);
-        }
-      }
-      if (dstInodes[dstInodes.length - 2] == null) {
-        error = "rename destination parent " + dst + " not found.";
+      if (!overwrite) { // If destination exists, overwrite flag must be true
+        error = "rename destination " + dst + " already exists";
         NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
             + error);
-        throw new FileNotFoundException(error);
+        throw new FileAlreadyExistsException(error);
       }
-      if (!dstInodes[dstInodes.length - 2].isDirectory()) {
-        error = "rename destination parent " + dst + " is a file.";
-        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-            + error);
-        throw new ParentNotDirectoryException(error);
-      }
-
-      // Ensure dst has quota to accommodate rename
-      verifyQuotaForRename(srcInodes, dstInodes);
-      INode removedSrc = removeChild(srcInodes, srcInodes.length - 1);
-      if (removedSrc == null) {
-        error = "Failed to rename " + src + " to " + dst
-            + " because the source can not be removed";
+      List<INode> children = dstInode.isDirectory() ? 
+          ((INodeDirectory) dstInode).getChildrenRaw() : null;
+      if (children != null && children.size() != 0) {
+        error = "rename cannot overwrite non empty destination directory "
+            + dst;
         NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
             + error);
         throw new IOException(error);
       }
-      final String srcChildName = removedSrc.getLocalName();
-      String dstChildName = null;
-      INode removedDst = null;
-      try {
-        if (dstInode != null) { // dst exists remove it
-          removedDst = removeChild(dstInodes, dstInodes.length - 1);
-          dstChildName = removedDst.getLocalName();
-        }
+    }
+    if (dstInodes[dstInodes.length - 2] == null) {
+      error = "rename destination parent " + dst + " not found.";
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+          + error);
+      throw new FileNotFoundException(error);
+    }
+    if (!dstInodes[dstInodes.length - 2].isDirectory()) {
+      error = "rename destination parent " + dst + " is a file.";
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+          + error);
+      throw new ParentNotDirectoryException(error);
+    }
 
-        INode dstChild = null;
-        removedSrc.setLocalName(dstComponents[dstInodes.length - 1]);
-        // add src as dst to complete rename
-        dstChild = addChildNoQuotaCheck(dstInodes, dstInodes.length - 1,
-            removedSrc, UNKNOWN_DISK_SPACE, false);
-
-        int filesDeleted = 0;
-        if (dstChild != null) {
-          removedSrc = null;
-          if (NameNode.stateChangeLog.isDebugEnabled()) {
-            NameNode.stateChangeLog.debug(
-                "DIR* FSDirectory.unprotectedRenameTo: " + src
-                + " is renamed to " + dst);
-          }
-          srcInodes[srcInodes.length - 2].setModificationTime(timestamp);
-          dstInodes[dstInodes.length - 2].setModificationTime(timestamp);
+    // Ensure dst has quota to accommodate rename
+    verifyQuotaForRename(srcInodes, dstInodes);
+    INode removedSrc = removeChild(srcInodes, srcInodes.length - 1);
+    if (removedSrc == null) {
+      error = "Failed to rename " + src + " to " + dst
+          + " because the source can not be removed";
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+          + error);
+      throw new IOException(error);
+    }
+    final String srcChildName = removedSrc.getLocalName();
+    String dstChildName = null;
+    INode removedDst = null;
+    try {
+      if (dstInode != null) { // dst exists remove it
+        removedDst = removeChild(dstInodes, dstInodes.length - 1);
+        dstChildName = removedDst.getLocalName();
+      }
 
-          // Collect the blocks and remove the lease for previous dst
-          if (removedDst != null) {
-            INode rmdst = removedDst;
-            removedDst = null;
-            List<Block> collectedBlocks = new ArrayList<Block>();
-            filesDeleted = rmdst.collectSubtreeBlocksAndClear(collectedBlocks);
-            getFSNamesystem().removePathAndBlocks(src, collectedBlocks);
-          }
-          return filesDeleted >0;
-        }
-      } finally {
-        if (removedSrc != null) {
-          // Rename failed - restore src
-          removedSrc.setLocalName(srcChildName);
-          addChildNoQuotaCheck(srcInodes, srcInodes.length - 1, removedSrc, 
-              UNKNOWN_DISK_SPACE, false);
+      INode dstChild = null;
+      removedSrc.setLocalName(dstComponents[dstInodes.length - 1]);
+      // add src as dst to complete rename
+      dstChild = addChildNoQuotaCheck(dstInodes, dstInodes.length - 1,
+          removedSrc, UNKNOWN_DISK_SPACE, false);
+
+      int filesDeleted = 0;
+      if (dstChild != null) {
+        removedSrc = null;
+        if (NameNode.stateChangeLog.isDebugEnabled()) {
+          NameNode.stateChangeLog.debug(
+              "DIR* FSDirectory.unprotectedRenameTo: " + src
+              + " is renamed to " + dst);
         }
+        srcInodes[srcInodes.length - 2].setModificationTime(timestamp);
+        dstInodes[dstInodes.length - 2].setModificationTime(timestamp);
+
+        // Collect the blocks and remove the lease for previous dst
         if (removedDst != null) {
-          // Rename failed - restore dst
-          removedDst.setLocalName(dstChildName);
-          addChildNoQuotaCheck(dstInodes, dstInodes.length - 1, removedDst, 
-              UNKNOWN_DISK_SPACE, false);
-        }
-      }
-      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-          + "failed to rename " + src + " to " + dst);
-      throw new IOException("rename from " + src + " to " + dst + " failed.");
-    } finally {
-      writeUnlock();
-    }
+          INode rmdst = removedDst;
+          removedDst = null;
+          List<Block> collectedBlocks = new ArrayList<Block>();
+          filesDeleted = rmdst.collectSubtreeBlocksAndClear(collectedBlocks);
+          getFSNamesystem().removePathAndBlocks(src, collectedBlocks);
+        }
+        return filesDeleted >0;
+      }
+    } finally {
+      if (removedSrc != null) {
+        // Rename failed - restore src
+        removedSrc.setLocalName(srcChildName);
+        addChildNoQuotaCheck(srcInodes, srcInodes.length - 1, removedSrc, 
+            UNKNOWN_DISK_SPACE, false);
+      }
+      if (removedDst != null) {
+        // Rename failed - restore dst
+        removedDst.setLocalName(dstChildName);
+        addChildNoQuotaCheck(dstInodes, dstInodes.length - 1, removedDst, 
+            UNKNOWN_DISK_SPACE, false);
+      }
+    }
+    NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+        + "failed to rename " + src + " to " + dst);
+    throw new IOException("rename from " + src + " to " + dst + " failed.");
   }
 
   /**
@@ -796,10 +802,16 @@ class FSDirectory implements Closeable {
   Block[] setReplication(String src, short replication, int[] oldReplication)
       throws QuotaExceededException, UnresolvedLinkException {
     waitForReady();
-    Block[] fileBlocks = unprotectedSetReplication(src, replication, oldReplication);
-    if (fileBlocks != null)  // log replication change
-      fsImage.getEditLog().logSetReplication(src, replication);
-    return fileBlocks;
+    Block[] fileBlocks = null;
+    writeLock();
+    try {
+      fileBlocks = unprotectedSetReplication(src, replication, oldReplication);
+      if (fileBlocks != null)  // log replication change
+        fsImage.getEditLog().logSetReplication(src, replication);
+      return fileBlocks;
+    } finally {
+      writeUnlock();
+    }
   }
 
   Block[] unprotectedSetReplication(String src, 
@@ -807,37 +819,31 @@ class FSDirectory implements Closeable {
                                     int[] oldReplication
                                     ) throws QuotaExceededException, 
                                     UnresolvedLinkException {
+    assert hasWriteLock();
     if (oldReplication == null) {
       oldReplication = new int[1];
     }
     oldReplication[0] = -1;
-    Block[] fileBlocks = null;
 
-    writeLock();
-    try {
-      INode[] inodes = rootDir.getExistingPathINodes(src, true);
-      INode inode = inodes[inodes.length - 1];
-      if (inode == null) {
-        return null;
-      }
-      assert !inode.isLink();
-      if (inode.isDirectory()) {
-        return null;
-      }
-      INodeFile fileNode = (INodeFile)inode;
-      oldReplication[0] = fileNode.getReplication();
+    INode[] inodes = rootDir.getExistingPathINodes(src, true);
+    INode inode = inodes[inodes.length - 1];
+    if (inode == null) {
+      return null;
+    }
+    assert !inode.isLink();
+    if (inode.isDirectory()) {
+      return null;
+    }
+    INodeFile fileNode = (INodeFile)inode;
+    oldReplication[0] = fileNode.getReplication();
 
-      // check disk quota
-      long dsDelta = (replication - oldReplication[0]) *
-           (fileNode.diskspaceConsumed()/oldReplication[0]);
-      updateCount(inodes, inodes.length-1, 0, dsDelta, true);
+    // check disk quota
+    long dsDelta = (replication - oldReplication[0]) *
+         (fileNode.diskspaceConsumed()/oldReplication[0]);
+    updateCount(inodes, inodes.length-1, 0, dsDelta, true);
 
-      fileNode.setReplication(replication);
-      fileBlocks = fileNode.getBlocks();
-    } finally {
-      writeUnlock();
-    }
-    return fileBlocks;
+    fileNode.setReplication(replication);
+    return fileNode.getBlocks();
   }
 
   /**
@@ -878,64 +884,66 @@ class FSDirectory implements Closeable {
     }
   }
 
-  void setPermission(String src, FsPermission permission
-      ) throws FileNotFoundException, UnresolvedLinkException {
-    unprotectedSetPermission(src, permission);
-    fsImage.getEditLog().logSetPermissions(src, permission);
-  }
-
-  void unprotectedSetPermission(String src, FsPermission permissions) 
-    throws FileNotFoundException, UnresolvedLinkException {
+  void setPermission(String src, FsPermission permission)
+      throws FileNotFoundException, UnresolvedLinkException {
     writeLock();
     try {
-        INode inode = rootDir.getNode(src, true);
-        if (inode == null) {
-            throw new FileNotFoundException("File does not exist: " + src);
-        }
-        inode.setPermission(permissions);
+      unprotectedSetPermission(src, permission);
     } finally {
       writeUnlock();
     }
+    fsImage.getEditLog().logSetPermissions(src, permission);
   }
 
-  void setOwner(String src, String username, String groupname
-      ) throws FileNotFoundException, UnresolvedLinkException {
-    unprotectedSetOwner(src, username, groupname);
-    fsImage.getEditLog().logSetOwner(src, username, groupname);
+  void unprotectedSetPermission(String src, FsPermission permissions) 
+      throws FileNotFoundException, UnresolvedLinkException {
+    assert hasWriteLock();
+    INode inode = rootDir.getNode(src, true);
+    if (inode == null) {
+      throw new FileNotFoundException("File does not exist: " + src);
+    }
+    inode.setPermission(permissions);
   }
 
-  void unprotectedSetOwner(String src, String username, String groupname) 
-    throws FileNotFoundException, UnresolvedLinkException {
+  void setOwner(String src, String username, String groupname)
+      throws FileNotFoundException, UnresolvedLinkException {
     writeLock();
     try {
-      INode inode = rootDir.getNode(src, true);
-      if (inode == null) {
-          throw new FileNotFoundException("File does not exist: " + src);
-      }
-      if (username != null) {
-        inode.setUser(username);
-      }
-      if (groupname != null) {
-        inode.setGroup(groupname);
-      }
+      unprotectedSetOwner(src, username, groupname);
     } finally {
       writeUnlock();
     }
+    fsImage.getEditLog().logSetOwner(src, username, groupname);
+  }
+
+  void unprotectedSetOwner(String src, String username, String groupname) 
+      throws FileNotFoundException, UnresolvedLinkException {
+    assert hasWriteLock();
+    INode inode = rootDir.getNode(src, true);
+    if (inode == null) {
+      throw new FileNotFoundException("File does not exist: " + src);
+    }
+    if (username != null) {
+      inode.setUser(username);
+    }
+    if (groupname != null) {
+      inode.setGroup(groupname);
+    }
   }
 
   /**
    * Concat all the blocks from srcs to trg and delete the srcs files
    */
-  public void concatInternal(String target, String [] srcs) 
+  public void concat(String target, String [] srcs) 
       throws UnresolvedLinkException {
     writeLock();
     try {
       // actual move
       waitForReady();
-
-      unprotectedConcat(target, srcs);
+      long timestamp = now();
+      unprotectedConcat(target, srcs, timestamp);
       // do the commit
-      fsImage.getEditLog().logConcat(target, srcs, now());
+      fsImage.getEditLog().logConcat(target, srcs, timestamp);
     } finally {
       writeUnlock();
     }
@@ -950,8 +958,9 @@ class FSDirectory implements Closeable {
    * Must be public because also called from EditLogs
    * NOTE: - it does not update quota (not needed for concat)
    */
-  public void unprotectedConcat(String target, String [] srcs) 
+  public void unprotectedConcat(String target, String [] srcs, long timestamp) 
       throws UnresolvedLinkException {
+    assert hasWriteLock();
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* FSNamesystem.concat to "+target);
     }
@@ -981,9 +990,8 @@ class FSDirectory implements Closeable {
       count++;
     }
     
-    long now = now();
-    trgInode.setModificationTimeForce(now);
-    trgParent.setModificationTime(now);
+    trgInode.setModificationTimeForce(timestamp);
+    trgParent.setModificationTime(timestamp);
     // update quota on the parent directory ('count' files removed, 0 space)
     unprotectedUpdateCount(trgINodes, trgINodes.length-1, - count, 0);
   }
@@ -1002,7 +1010,13 @@ class FSDirectory implements Closeable {
     }
     waitForReady();
     long now = now();
-    int filesRemoved = unprotectedDelete(src, collectedBlocks, now);
+    int filesRemoved;
+    writeLock();
+    try {
+      filesRemoved = unprotectedDelete(src, collectedBlocks, now);
+    } finally {
+      writeUnlock();
+    }
     if (filesRemoved <= 0) {
       return false;
     }
@@ -1055,6 +1069,7 @@ class FSDirectory implements Closeable {
    */ 
   void unprotectedDelete(String src, long mtime) 
     throws UnresolvedLinkException {
+    assert hasWriteLock();
     List<Block> collectedBlocks = new ArrayList<Block>();
     int filesRemoved = unprotectedDelete(src, collectedBlocks, mtime);
     if (filesRemoved > 0) {
@@ -1072,43 +1087,39 @@ class FSDirectory implements Closeable {
    */ 
   int unprotectedDelete(String src, List<Block> collectedBlocks, 
       long mtime) throws UnresolvedLinkException {
+    assert hasWriteLock();
     src = normalizePath(src);
 
-    writeLock();
-    try {
-      INode[] inodes =  rootDir.getExistingPathINodes(src, false);
-      INode targetNode = inodes[inodes.length-1];
+    INode[] inodes =  rootDir.getExistingPathINodes(src, false);
+    INode targetNode = inodes[inodes.length-1];
 
-      if (targetNode == null) { // non-existent src
-        if(NameNode.stateChangeLog.isDebugEnabled()) {
-          NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
-              +"failed to remove "+src+" because it does not exist");
-        }
-        return 0;
-      }
-      if (inodes.length == 1) { // src is the root
-        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedDelete: " +
-            "failed to remove " + src +
-            " because the root is not allowed to be deleted");
-        return 0;
-      }
-      int pos = inodes.length - 1;
-      // Remove the node from the namespace
-      targetNode = removeChild(inodes, pos);
-      if (targetNode == null) {
-        return 0;
-      }
-      // set the parent's modification time
-      inodes[pos-1].setModificationTime(mtime);
-      int filesRemoved = targetNode.collectSubtreeBlocksAndClear(collectedBlocks);
-      if (NameNode.stateChangeLog.isDebugEnabled()) {
+    if (targetNode == null) { // non-existent src
+      if(NameNode.stateChangeLog.isDebugEnabled()) {
         NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
-            +src+" is removed");
+            +"failed to remove "+src+" because it does not exist");
       }
-      return filesRemoved;
-    } finally {
-      writeUnlock();
+      return 0;
+    }
+    if (inodes.length == 1) { // src is the root
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedDelete: " +
+          "failed to remove " + src +
+          " because the root is not allowed to be deleted");
+      return 0;
+    }
+    int pos = inodes.length - 1;
+    // Remove the node from the namespace
+    targetNode = removeChild(inodes, pos);
+    if (targetNode == null) {
+      return 0;
+    }
+    // set the parent's modification time
+    inodes[pos-1].setModificationTime(mtime);
+    int filesRemoved = targetNode.collectSubtreeBlocksAndClear(collectedBlocks);
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
+          +src+" is removed");
     }
+    return filesRemoved;
   }
 
   /**
@@ -1353,6 +1364,7 @@ class FSDirectory implements Closeable {
   private void updateCount(INode[] inodes, int numOfINodes, 
                            long nsDelta, long dsDelta, boolean checkQuota)
                            throws QuotaExceededException {
+    assert hasWriteLock();
     if (!ready) {
       //still initializing. do not check or update quotas.
       return;
@@ -1377,6 +1389,7 @@ class FSDirectory implements Closeable {
    */ 
   private void updateCountNoQuotaCheck(INode[] inodes, int numOfINodes, 
                            long nsDelta, long dsDelta) {
+    assert hasWriteLock();
     try {
       updateCount(inodes, numOfINodes, nsDelta, dsDelta, false);
     } catch (QuotaExceededException e) {
@@ -1394,6 +1407,7 @@ class FSDirectory implements Closeable {
    */
    void unprotectedUpdateCount(INode[] inodes, int numOfINodes, 
                                       long nsDelta, long dsDelta) {
+     assert hasWriteLock();
     for(int i=0; i < numOfINodes; i++) {
       if (inodes[i].isQuotaSet()) { // a directory with quota
         INodeDirectoryWithQuota node =(INodeDirectoryWithQuota)inodes[i]; 
@@ -1504,17 +1518,14 @@ class FSDirectory implements Closeable {
   INode unprotectedMkdir(String src, PermissionStatus permissions,
                           long timestamp) throws QuotaExceededException,
                           UnresolvedLinkException {
+    assert hasWriteLock();
     byte[][] components = INode.getPathComponents(src);
     INode[] inodes = new INode[components.length];
-    writeLock();
-    try {
-      rootDir.getExistingPathINodes(components, inodes, false);
-      unprotectedMkdir(inodes, inodes.length-1, components[inodes.length-1],
-          permissions, false, timestamp);
-      return inodes[inodes.length-1];
-    } finally {
-      writeUnlock();
-    }
+
+    rootDir.getExistingPathINodes(components, inodes, false);
+    unprotectedMkdir(inodes, inodes.length-1, components[inodes.length-1],
+        permissions, false, timestamp);
+    return inodes[inodes.length-1];
   }
 
   /** create a directory at index pos.
@@ -1524,6 +1535,7 @@ class FSDirectory implements Closeable {
   private void unprotectedMkdir(INode[] inodes, int pos,
       byte[] name, PermissionStatus permission, boolean inheritPermission,
       long timestamp) throws QuotaExceededException {
+    assert hasWriteLock();
     inodes[pos] = addChild(inodes, pos, 
         new INodeDirectory(name, permission, timestamp),
         -1, inheritPermission );
@@ -1854,7 +1866,8 @@ class FSDirectory implements Closeable {
    */
   INodeDirectory unprotectedSetQuota(String src, long nsQuota, long dsQuota)
     throws FileNotFoundException, QuotaExceededException, 
-    UnresolvedLinkException {
+      UnresolvedLinkException {
+    assert hasWriteLock();
     // sanity check
     if ((nsQuota < 0 && nsQuota != FSConstants.QUOTA_DONT_SET && 
          nsQuota < FSConstants.QUOTA_RESET) || 
@@ -1867,50 +1880,45 @@ class FSDirectory implements Closeable {
     
     String srcs = normalizePath(src);
 
-    writeLock();
-    try {
-      INode[] inodes = rootDir.getExistingPathINodes(src, true);
-      INode targetNode = inodes[inodes.length-1];
-      if (targetNode == null) {
-        throw new FileNotFoundException("Directory does not exist: " + srcs);
-      } else if (!targetNode.isDirectory()) {
-        throw new FileNotFoundException("Cannot set quota on a file: " + srcs);  
-      } else if (targetNode.isRoot() && nsQuota == FSConstants.QUOTA_RESET) {
-        throw new IllegalArgumentException("Cannot clear namespace quota on root.");
-      } else { // a directory inode
-        INodeDirectory dirNode = (INodeDirectory)targetNode;
-        long oldNsQuota = dirNode.getNsQuota();
-        long oldDsQuota = dirNode.getDsQuota();
-        if (nsQuota == FSConstants.QUOTA_DONT_SET) {
-          nsQuota = oldNsQuota;
-        }
-        if (dsQuota == FSConstants.QUOTA_DONT_SET) {
-          dsQuota = oldDsQuota;
-        }        
-
-        if (dirNode instanceof INodeDirectoryWithQuota) { 
-          // a directory with quota; so set the quota to the new value
-          ((INodeDirectoryWithQuota)dirNode).setQuota(nsQuota, dsQuota);
-          if (!dirNode.isQuotaSet()) {
-            // will not come here for root because root's nsQuota is always set
-            INodeDirectory newNode = new INodeDirectory(dirNode);
-            INodeDirectory parent = (INodeDirectory)inodes[inodes.length-2];
-            dirNode = newNode;
-            parent.replaceChild(newNode);
-          }
-        } else {
-          // a non-quota directory; so replace it with a directory with quota
-          INodeDirectoryWithQuota newNode = 
-            new INodeDirectoryWithQuota(nsQuota, dsQuota, dirNode);
-          // non-root directory node; parent != null
+    INode[] inodes = rootDir.getExistingPathINodes(src, true);
+    INode targetNode = inodes[inodes.length-1];
+    if (targetNode == null) {
+      throw new FileNotFoundException("Directory does not exist: " + srcs);
+    } else if (!targetNode.isDirectory()) {
+      throw new FileNotFoundException("Cannot set quota on a file: " + srcs);  
+    } else if (targetNode.isRoot() && nsQuota == FSConstants.QUOTA_RESET) {
+      throw new IllegalArgumentException("Cannot clear namespace quota on root.");
+    } else { // a directory inode
+      INodeDirectory dirNode = (INodeDirectory)targetNode;
+      long oldNsQuota = dirNode.getNsQuota();
+      long oldDsQuota = dirNode.getDsQuota();
+      if (nsQuota == FSConstants.QUOTA_DONT_SET) {
+        nsQuota = oldNsQuota;
+      }
+      if (dsQuota == FSConstants.QUOTA_DONT_SET) {
+        dsQuota = oldDsQuota;
+      }        
+
+      if (dirNode instanceof INodeDirectoryWithQuota) { 
+        // a directory with quota; so set the quota to the new value
+        ((INodeDirectoryWithQuota)dirNode).setQuota(nsQuota, dsQuota);
+        if (!dirNode.isQuotaSet()) {
+          // will not come here for root because root's nsQuota is always set
+          INodeDirectory newNode = new INodeDirectory(dirNode);
           INodeDirectory parent = (INodeDirectory)inodes[inodes.length-2];
           dirNode = newNode;
           parent.replaceChild(newNode);
         }
-        return (oldNsQuota != nsQuota || oldDsQuota != dsQuota) ? dirNode : null;
+      } else {
+        // a non-quota directory; so replace it with a directory with quota
+        INodeDirectoryWithQuota newNode = 
+          new INodeDirectoryWithQuota(nsQuota, dsQuota, dirNode);
+        // non-root directory node; parent != null
+        INodeDirectory parent = (INodeDirectory)inodes[inodes.length-2];
+        dirNode = newNode;
+        parent.replaceChild(newNode);
       }
-    } finally {
-      writeUnlock();
+      return (oldNsQuota != nsQuota || oldDsQuota != dsQuota) ? dirNode : null;
     }
   }
   
@@ -1944,27 +1952,31 @@ class FSDirectory implements Closeable {
   }
 
   /**
-   * Sets the access time on the file. Logs it in the transaction log
+   * Sets the access time on the file. Logs it in the transaction log.
    */
   void setTimes(String src, INodeFile inode, long mtime, long atime, boolean force) {
-    if (unprotectedSetTimes(src, inode, mtime, atime, force)) {
-      fsImage.getEditLog().logTimes(src, mtime, atime);
-    }
-  }
-
-  boolean unprotectedSetTimes(String src, long mtime, long atime, boolean force) 
-    throws UnresolvedLinkException {
+    boolean status = false;
     writeLock();
     try {
-      INodeFile inode = getFileINode(src);
-      return unprotectedSetTimes(src, inode, mtime, atime, force);
+      status = unprotectedSetTimes(src, inode, mtime, atime, force);
     } finally {
       writeUnlock();
     }
+    if (status) {
+      fsImage.getEditLog().logTimes(src, mtime, atime);
+    }
+  }
+
+  boolean unprotectedSetTimes(String src, long mtime, long atime, boolean force) 
+      throws UnresolvedLinkException {
+    assert hasWriteLock();
+    INodeFile inode = getFileINode(src);
+    return unprotectedSetTimes(src, inode, mtime, atime, force);
   }
 
   private boolean unprotectedSetTimes(String src, INodeFile inode, long mtime,
                                       long atime, boolean force) {
+    assert hasWriteLock();
     boolean status = false;
     if (mtime != -1) {
       inode.setModificationTimeForce(mtime);
@@ -2043,6 +2055,7 @@ class FSDirectory implements Closeable {
     */
     private HdfsLocatedFileStatus createLocatedFileStatus(
         byte[] path, INode node) throws IOException {
+      assert hasReadLock();
       long size = 0;     // length is zero for directories
       short replication = 0;
       long blocksize = 0;
@@ -2091,8 +2104,14 @@ class FSDirectory implements Closeable {
       }
     }
     final String userName = dirPerms.getUserName();
-    INodeSymlink newNode = unprotectedSymlink(path, target, modTime, modTime,
-      new PermissionStatus(userName, null, FsPermission.getDefault()));         
+    INodeSymlink newNode  = null;
+    writeLock();
+    try {
+      newNode = unprotectedSymlink(path, target, modTime, modTime,
+          new PermissionStatus(userName, null, FsPermission.getDefault()));
+    } finally {
+      writeUnlock();
+    }
     if (newNode == null) {
       NameNode.stateChangeLog.info("DIR* FSDirectory.addSymlink: "
                                    +"failed to add "+path
@@ -2114,14 +2133,10 @@ class FSDirectory implements Closeable {
   INodeSymlink unprotectedSymlink(String path, String target, long modTime, 
                                   long atime, PermissionStatus perm) 
       throws UnresolvedLinkException {
+    assert hasWriteLock();
     INodeSymlink newNode = new INodeSymlink(target, modTime, atime, perm);
     try {
-      writeLock();
-      try {
-        newNode = addNode(path, newNode, UNKNOWN_DISK_SPACE, false);
-      } finally {
-        writeUnlock();
-      }
+      newNode = addNode(path, newNode, UNKNOWN_DISK_SPACE, false);
     } catch (UnresolvedLinkException e) {
       /* All UnresolvedLinkExceptions should have been resolved by now, but we
        * should re-throw them in case that changes so they are not swallowed 

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1138160&r1=1138159&r2=1138160&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Tue Jun 21 20:09:54 2011
@@ -99,6 +99,9 @@ public class FSEditLogLoader {
         numOpRenewDelegationToken = 0, numOpCancelDelegationToken = 0, 
         numOpUpdateMasterKey = 0, numOpReassignLease = 0, numOpOther = 0;
 
+    fsNamesys.writeLock();
+    fsDir.writeLock();
+
     // Keep track of the file offsets of the last several opcodes.
     // This is handy when manually recovering corrupted edits files.
     PositionTrackingInputStream tracker = new PositionTrackingInputStream(in);
@@ -219,7 +222,8 @@ public class FSEditLogLoader {
             numOpConcatDelete++;
 
             ConcatDeleteOp concatDeleteOp = (ConcatDeleteOp)op;
-            fsDir.unprotectedConcat(concatDeleteOp.trg, concatDeleteOp.srcs);
+            fsDir.unprotectedConcat(concatDeleteOp.trg, concatDeleteOp.srcs,
+                concatDeleteOp.timestamp);
             break;
           }
           case OP_RENAME_OLD: {
@@ -228,7 +232,7 @@ public class FSEditLogLoader {
             HdfsFileStatus dinfo = fsDir.getFileInfo(renameOp.dst, false);
             fsDir.unprotectedRenameTo(renameOp.src, renameOp.dst,
                                       renameOp.timestamp);
-            fsNamesys.changeLease(renameOp.src, renameOp.dst, dinfo);
+            fsNamesys.unprotectedChangeLease(renameOp.src, renameOp.dst, dinfo);
             break;
           }
           case OP_DELETE: {
@@ -319,7 +323,7 @@ public class FSEditLogLoader {
             HdfsFileStatus dinfo = fsDir.getFileInfo(renameOp.dst, false);
             fsDir.unprotectedRenameTo(renameOp.src, renameOp.dst,
                                       renameOp.timestamp, renameOp.options);
-            fsNamesys.changeLease(renameOp.src, renameOp.dst, dinfo);
+            fsNamesys.unprotectedChangeLease(renameOp.src, renameOp.dst, dinfo);
             break;
           }
           case OP_GET_DELEGATION_TOKEN: {
@@ -409,6 +413,9 @@ public class FSEditLogLoader {
       String errorMessage = sb.toString();
       FSImage.LOG.error(errorMessage);
       throw new IOException(errorMessage, t);
+    } finally {
+      fsDir.writeUnlock();
+      fsNamesys.writeUnlock();
     }
     if (FSImage.LOG.isDebugEnabled()) {
       FSImage.LOG.debug("numOpAdd = " + numOpAdd + " numOpClose = " + numOpClose 

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java?rev=1138160&r1=1138159&r2=1138160&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java Tue Jun 21 20:09:54 2011
@@ -268,7 +268,6 @@ public abstract class FSEditLogOp {
     int length;
     String path;
     long timestamp;
-    long atime;
     PermissionStatus permissions;
 
     private MkdirOp() {
@@ -291,9 +290,7 @@ public abstract class FSEditLogOp {
       // However, currently this is not being updated/used because of
       // performance reasons.
       if (LayoutVersion.supports(Feature.FILE_ACCESS_TIME, logVersion)) {
-        this.atime = readLong(in);
-      } else {
-        this.atime = 0;
+        /*unused this.atime = */readLong(in);
       }
 
       if (logVersion <= -11) {

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1138160&r1=1138159&r2=1138160&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Tue Jun 21 20:09:54 2011
@@ -241,24 +241,9 @@ public class FSImage implements Closeabl
           + "Please restart NameNode with -upgrade option.");
     }
     
-    // Upgrade to federation requires -upgrade -clusterid <clusterID> option
-    if (startOpt == StartupOption.UPGRADE && 
-        !LayoutVersion.supports(Feature.FEDERATION, layoutVersion)) {
-      if (startOpt.getClusterId() == null) {
-        throw new IOException(
-            "\nFile system image contains an old layout version "
-                + layoutVersion + ".\nAn upgrade to version "
-                + FSConstants.LAYOUT_VERSION
-                + " is required.\nPlease restart NameNode with "
-                + "-upgrade -clusterid <clusterID> option.");
-      }
-      storage.setClusterID(startOpt.getClusterId());
-      
-      // Create new block pool Id
-      storage.setBlockPoolID(storage.newBlockPoolID());
-    }
-    
-    // check whether distributed upgrade is reguired and/or should be continued
+    storage.processStartupOptionsForUpgrade(startOpt, layoutVersion);
+
+    // check whether distributed upgrade is required and/or should be continued
     storage.verifyDistributedUpgradeProgress(startOpt);
 
     // 2. Format unformatted dirs.