You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by wa...@apache.org on 2014/07/07 22:44:09 UTC

svn commit: r1608603 [3/6] - in /hadoop/common/branches/fs-encryption/hadoop-hdfs-project: hadoop-hdfs-httpfs/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/ hadoop-...

Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1608603&r1=1608602&r2=1608603&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Mon Jul  7 20:43:56 2014
@@ -77,7 +77,6 @@ import org.apache.hadoop.hdfs.protocolPB
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
@@ -130,7 +129,6 @@ public class FSDirectory implements Clos
       XAttrHelper.buildXAttr(CRYPTO_XATTR_ENCRYPTION_ZONE, null);
 
   INodeDirectory rootDir;
-  FSImage fsImage;  
   private final FSNamesystem namesystem;
   private volatile boolean skipQuotaCheck = false; //skip while consuming edits
   private final int maxComponentLength;
@@ -212,11 +210,10 @@ public class FSDirectory implements Clos
    */
   private final NameCache<ByteArray> nameCache;
 
-  FSDirectory(FSImage fsImage, FSNamesystem ns, Configuration conf) {
+  FSDirectory(FSNamesystem ns, Configuration conf) {
     this.dirLock = new ReentrantReadWriteLock(true); // fair
     rootDir = createRoot(ns);
     inodeMap = INodeMap.newInstance(rootDir);
-    this.fsImage = fsImage;
     int configuredLimit = conf.getInt(
         DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT);
     this.lsLimit = configuredLimit>0 ?
@@ -275,9 +272,7 @@ public class FSDirectory implements Clos
    * Shutdown the filestore
    */
   @Override
-  public void close() throws IOException {
-    fsImage.close();
-  }
+  public void close() throws IOException {}
 
   void markNameCacheInitialized() {
     writeLock();
@@ -306,8 +301,8 @@ public class FSDirectory implements Clos
    * @throws SnapshotAccessControlException 
    */
   INodeFile addFile(String path, PermissionStatus permissions,
-      short replication, long preferredBlockSize, String clientName,
-      String clientMachine, DatanodeDescriptor clientNode)
+                    short replication, long preferredBlockSize,
+                    String clientName, String clientMachine)
     throws FileAlreadyExistsException, QuotaExceededException,
       UnresolvedLinkException, SnapshotAccessControlException, AclException {
 
@@ -315,7 +310,7 @@ public class FSDirectory implements Clos
     INodeFile newNode = new INodeFile(namesystem.allocateNewInodeId(), null,
         permissions, modTime, modTime, BlockInfo.EMPTY_ARRAY, replication,
         preferredBlockSize);
-    newNode.toUnderConstruction(clientName, clientMachine, clientNode);
+    newNode.toUnderConstruction(clientName, clientMachine);
 
     boolean added = false;
     writeLock();
@@ -353,7 +348,7 @@ public class FSDirectory implements Clos
       newNode = new INodeFile(id, null, permissions, modificationTime,
           modificationTime, BlockInfo.EMPTY_ARRAY, replication,
           preferredBlockSize);
-      newNode.toUnderConstruction(clientName, clientMachine, null);
+      newNode.toUnderConstruction(clientName, clientMachine);
 
     } else {
       newNode = new INodeFile(id, null, permissions, modificationTime, atime,
@@ -517,55 +512,30 @@ public class FSDirectory implements Clos
     assert hasWriteLock();
     INodesInPath srcIIP = getINodesInPath4Write(src, false);
     final INode srcInode = srcIIP.getLastINode();
-    
-    // check the validation of the source
-    if (srcInode == null) {
-      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-          + "failed to rename " + src + " to " + dst
-          + " because source does not exist");
-      return false;
-    } 
-    if (srcIIP.getINodes().length == 1) {
-      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-          +"failed to rename "+src+" to "+dst+ " because source is the root");
+    try {
+      validateRenameSource(src, srcIIP);
+    } catch (SnapshotException e) {
+      throw e;
+    } catch (IOException ignored) {
       return false;
     }
-    
-    // srcInode and its subtree cannot contain snapshottable directories with
-    // snapshots
-    List<INodeDirectorySnapshottable> snapshottableDirs = 
-        new ArrayList<INodeDirectorySnapshottable>();
-    checkSnapshot(srcInode, snapshottableDirs);
-    
+
     if (isDir(dst)) {
       dst += Path.SEPARATOR + new Path(src).getName();
     }
-    
-    // check the validity of the destination
+
+    // validate the destination
     if (dst.equals(src)) {
       return true;
     }
-    if (srcInode.isSymlink() && 
-        dst.equals(srcInode.asSymlink().getSymlinkString())) {
-      throw new FileAlreadyExistsException(
-          "Cannot rename symlink "+src+" to its target "+dst);
-    }
-    
-    // dst cannot be directory or a file under src
-    if (dst.startsWith(src) && 
-        dst.charAt(src.length()) == Path.SEPARATOR_CHAR) {
-      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-          + "failed to rename " + src + " to " + dst
-          + " because destination starts with src");
+
+    try {
+      validateRenameDestination(src, dst, srcInode);
+    } catch (IOException ignored) {
       return false;
     }
-    
-    byte[][] dstComponents = INode.getPathComponents(dst);
-    INodesInPath dstIIP = getExistingPathINodes(dstComponents);
-    if (dstIIP.isSnapshot()) {
-      throw new SnapshotAccessControlException(
-          "Modification on RO snapshot is disallowed");
-    }
+
+    INodesInPath dstIIP = getINodesInPath4Write(dst, false);
     if (dstIIP.getLastINode() != null) {
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
                                    +"failed to rename "+src+" to "+dst+ 
@@ -584,42 +554,10 @@ public class FSDirectory implements Clos
     // Ensure dst has quota to accommodate rename
     verifyFsLimitsForRename(srcIIP, dstIIP);
     verifyQuotaForRename(srcIIP.getINodes(), dstIIP.getINodes());
-    
+
+    RenameOperation tx = new RenameOperation(src, dst, srcIIP, dstIIP);
+
     boolean added = false;
-    INode srcChild = srcIIP.getLastINode();
-    final byte[] srcChildName = srcChild.getLocalNameBytes();
-    final boolean isSrcInSnapshot = srcChild.isInLatestSnapshot(
-        srcIIP.getLatestSnapshotId());
-    final boolean srcChildIsReference = srcChild.isReference();
-    
-    // Record the snapshot on srcChild. After the rename, before any new 
-    // snapshot is taken on the dst tree, changes will be recorded in the latest
-    // snapshot of the src tree.
-    if (isSrcInSnapshot) {
-      srcChild = srcChild.recordModification(srcIIP.getLatestSnapshotId());
-      srcIIP.setLastINode(srcChild);
-    }
-    
-    // check srcChild for reference
-    final INodeReference.WithCount withCount;
-    Quota.Counts oldSrcCounts = Quota.Counts.newInstance();
-    int srcRefDstSnapshot = srcChildIsReference ? srcChild.asReference()
-        .getDstSnapshotId() : Snapshot.CURRENT_STATE_ID;
-    if (isSrcInSnapshot) {
-      final INodeReference.WithName withName = 
-          srcIIP.getINode(-2).asDirectory().replaceChild4ReferenceWithName(
-              srcChild, srcIIP.getLatestSnapshotId()); 
-      withCount = (INodeReference.WithCount) withName.getReferredINode();
-      srcChild = withName;
-      srcIIP.setLastINode(srcChild);
-      // get the counts before rename
-      withCount.getReferredINode().computeQuotaUsage(oldSrcCounts, true);
-    } else if (srcChildIsReference) {
-      // srcChild is reference but srcChild is not in latest snapshot
-      withCount = (WithCount) srcChild.asReference().getReferredINode();
-    } else {
-      withCount = null;
-    }
 
     try {
       // remove src
@@ -630,84 +568,22 @@ public class FSDirectory implements Clos
             + " because the source can not be removed");
         return false;
       }
-      
-      if (dstParent.getParent() == null) {
-        // src and dst file/dir are in the same directory, and the dstParent has
-        // been replaced when we removed the src. Refresh the dstIIP and
-        // dstParent.
-        dstIIP = getExistingPathINodes(dstComponents);
-        dstParent = dstIIP.getINode(-2);
-      }
-      
-      // add src to the destination
-      
-      srcChild = srcIIP.getLastINode();
-      final byte[] dstChildName = dstIIP.getLastLocalName();
-      final INode toDst;
-      if (withCount == null) {
-        srcChild.setLocalName(dstChildName);
-        toDst = srcChild;
-      } else {
-        withCount.getReferredINode().setLocalName(dstChildName);
-        int dstSnapshotId = dstIIP.getLatestSnapshotId();
-        toDst = new INodeReference.DstReference(
-            dstParent.asDirectory(), withCount, dstSnapshotId);
-      }
-      
-      added = addLastINodeNoQuotaCheck(dstIIP, toDst);
+
+      added = tx.addSourceToDestination();
       if (added) {
         if (NameNode.stateChangeLog.isDebugEnabled()) {
           NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedRenameTo: " 
               + src + " is renamed to " + dst);
         }
-        // update modification time of dst and the parent of src
-        final INode srcParent = srcIIP.getINode(-2);
-        srcParent.updateModificationTime(timestamp, srcIIP.getLatestSnapshotId());
-        dstParent = dstIIP.getINode(-2); // refresh dstParent
-        dstParent.updateModificationTime(timestamp, dstIIP.getLatestSnapshotId());
-        // update moved leases with new filename
-        getFSNamesystem().unprotectedChangeLease(src, dst);     
-
-        // update the quota usage in src tree
-        if (isSrcInSnapshot) {
-          // get the counts after rename
-          Quota.Counts newSrcCounts = srcChild.computeQuotaUsage(
-              Quota.Counts.newInstance(), false);
-          newSrcCounts.subtract(oldSrcCounts);
-          srcParent.addSpaceConsumed(newSrcCounts.get(Quota.NAMESPACE),
-              newSrcCounts.get(Quota.DISKSPACE), false);
-        }
+
+        tx.updateMtimeAndLease(timestamp);
+        tx.updateQuotasInSourceTree();
         
         return true;
       }
     } finally {
       if (!added) {
-        final INodeDirectory srcParent = srcIIP.getINode(-2).asDirectory();
-        final INode oldSrcChild = srcChild;
-        // put it back
-        if (withCount == null) {
-          srcChild.setLocalName(srcChildName);
-        } else if (!srcChildIsReference) { // src must be in snapshot
-          // the withCount node will no longer be used thus no need to update
-          // its reference number here
-          srcChild = withCount.getReferredINode();
-          srcChild.setLocalName(srcChildName);
-        } else {
-          withCount.removeReference(oldSrcChild.asReference());
-          srcChild = new INodeReference.DstReference(
-              srcParent, withCount, srcRefDstSnapshot);
-          withCount.getReferredINode().setLocalName(srcChildName);
-        }
-        
-        if (isSrcInSnapshot) {
-          // srcParent must have snapshot feature since isSrcInSnapshot is true
-          // and src node has been removed from srcParent 
-          srcParent.undoRename4ScrParent(oldSrcChild.asReference(), srcChild);
-        } else {
-          // original srcChild is not in latest snapshot, we only need to add
-          // the srcChild back
-          addLastINodeNoQuotaCheck(srcIIP, srcChild);
-        }
+        tx.restoreSource();
       }
     }
     NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
@@ -730,53 +606,21 @@ public class FSDirectory implements Clos
       FileNotFoundException, ParentNotDirectoryException,
       QuotaExceededException, UnresolvedLinkException, IOException {
     assert hasWriteLock();
-    boolean overwrite = false;
-    if (null != options) {
-      for (Rename option : options) {
-        if (option == Rename.OVERWRITE) {
-          overwrite = true;
-        }
-      }
-    }
+    boolean overwrite = options != null && Arrays.asList(options).contains
+            (Rename.OVERWRITE);
+
     final String error;
     final INodesInPath srcIIP = getINodesInPath4Write(src, false);
     final INode srcInode = srcIIP.getLastINode();
-    // validate source
-    if (srcInode == null) {
-      error = "rename source " + src + " is not found.";
-      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-          + error);
-      throw new FileNotFoundException(error);
-    }
-    if (srcIIP.getINodes().length == 1) {
-      error = "rename source cannot be the root";
-      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-          + error);
-      throw new IOException(error);
-    }
-    // srcInode and its subtree cannot contain snapshottable directories with
-    // snapshots
-    checkSnapshot(srcInode, null);
-    
+    validateRenameSource(src, srcIIP);
+
     // validate the destination
     if (dst.equals(src)) {
       throw new FileAlreadyExistsException(
           "The source "+src+" and destination "+dst+" are the same");
     }
-    if (srcInode.isSymlink() && 
-        dst.equals(srcInode.asSymlink().getSymlinkString())) {
-      throw new FileAlreadyExistsException(
-          "Cannot rename symlink "+src+" to its target "+dst);
-    }
-    // dst cannot be a directory or a file under src
-    if (dst.startsWith(src) && 
-        dst.charAt(src.length()) == Path.SEPARATOR_CHAR) {
-      error = "Rename destination " + dst
-          + " is a directory or file under source " + src;
-      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-          + error);
-      throw new IOException(error);
-    }
+    validateRenameDestination(src, dst, srcInode);
+
     INodesInPath dstIIP = getINodesInPath4Write(dst, false);
     if (dstIIP.getINodes().length == 1) {
       error = "rename destination cannot be the root";
@@ -790,30 +634,7 @@ public class FSDirectory implements Clos
     List<INodeDirectorySnapshottable> snapshottableDirs = 
         new ArrayList<INodeDirectorySnapshottable>();
     if (dstInode != null) { // Destination exists
-      // It's OK to rename a file to a symlink and vice versa
-      if (dstInode.isDirectory() != srcInode.isDirectory()) {
-        error = "Source " + src + " and destination " + dst
-            + " must both be directories";
-        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-            + error);
-        throw new IOException(error);
-      }
-      if (!overwrite) { // If destination exists, overwrite flag must be true
-        error = "rename destination " + dst + " already exists";
-        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-            + error);
-        throw new FileAlreadyExistsException(error);
-      }
-      if (dstInode.isDirectory()) {
-        final ReadOnlyList<INode> children = dstInode.asDirectory()
-            .getChildrenList(Snapshot.CURRENT_STATE_ID);
-        if (!children.isEmpty()) {
-          error = "rename destination directory is not empty: " + dst;
-          NameNode.stateChangeLog.warn(
-              "DIR* FSDirectory.unprotectedRenameTo: " + error);
-          throw new IOException(error);
-        }
-      }
+      validateRenameOverwrite(src, dst, overwrite, srcInode, dstInode);
       checkSnapshot(dstInode, snapshottableDirs);
     }
 
@@ -835,40 +656,8 @@ public class FSDirectory implements Clos
     verifyFsLimitsForRename(srcIIP, dstIIP);
     verifyQuotaForRename(srcIIP.getINodes(), dstIIP.getINodes());
 
-    INode srcChild = srcIIP.getLastINode();
-    final byte[] srcChildName = srcChild.getLocalNameBytes();
-    final boolean isSrcInSnapshot = srcChild.isInLatestSnapshot(
-        srcIIP.getLatestSnapshotId());
-    final boolean srcChildIsReference = srcChild.isReference();
-    
-    // Record the snapshot on srcChild. After the rename, before any new 
-    // snapshot is taken on the dst tree, changes will be recorded in the latest
-    // snapshot of the src tree.
-    if (isSrcInSnapshot) {
-      srcChild = srcChild.recordModification(srcIIP.getLatestSnapshotId());
-      srcIIP.setLastINode(srcChild);
-    }
-    
-    // check srcChild for reference
-    final INodeReference.WithCount withCount;
-    int srcRefDstSnapshot = srcChildIsReference ? srcChild.asReference()
-        .getDstSnapshotId() : Snapshot.CURRENT_STATE_ID;
-    Quota.Counts oldSrcCounts = Quota.Counts.newInstance();    
-    if (isSrcInSnapshot) {
-      final INodeReference.WithName withName = srcIIP.getINode(-2).asDirectory()
-          .replaceChild4ReferenceWithName(srcChild, srcIIP.getLatestSnapshotId()); 
-      withCount = (INodeReference.WithCount) withName.getReferredINode();
-      srcChild = withName;
-      srcIIP.setLastINode(srcChild);
-      // get the counts before rename
-      withCount.getReferredINode().computeQuotaUsage(oldSrcCounts, true);
-    } else if (srcChildIsReference) {
-      // srcChild is reference but srcChild is not in latest snapshot
-      withCount = (WithCount) srcChild.asReference().getReferredINode();
-    } else {
-      withCount = null;
-    }
-    
+    RenameOperation tx = new RenameOperation(src, dst, srcIIP, dstIIP);
+
     boolean undoRemoveSrc = true;
     final long removedSrc = removeLastINode(srcIIP);
     if (removedSrc == -1) {
@@ -879,13 +668,6 @@ public class FSDirectory implements Clos
       throw new IOException(error);
     }
     
-    if (dstParent.getParent() == null) {
-      // src and dst file/dir are in the same directory, and the dstParent has
-      // been replaced when we removed the src. Refresh the dstIIP and
-      // dstParent.
-      dstIIP = getINodesInPath4Write(dst, false);
-    }
-    
     boolean undoRemoveDst = false;
     INode removedDst = null;
     long removedNum = 0;
@@ -896,23 +678,9 @@ public class FSDirectory implements Clos
           undoRemoveDst = true;
         }
       }
-      
-      srcChild = srcIIP.getLastINode();
-
-      final byte[] dstChildName = dstIIP.getLastLocalName();
-      final INode toDst;
-      if (withCount == null) {
-        srcChild.setLocalName(dstChildName);
-        toDst = srcChild;
-      } else {
-        withCount.getReferredINode().setLocalName(dstChildName);
-        int dstSnapshotId = dstIIP.getLatestSnapshotId();
-        toDst = new INodeReference.DstReference(
-            dstIIP.getINode(-2).asDirectory(), withCount, dstSnapshotId);
-      }
 
       // add src as dst to complete rename
-      if (addLastINodeNoQuotaCheck(dstIIP, toDst)) {
+      if (tx.addSourceToDestination()) {
         undoRemoveSrc = false;
         if (NameNode.stateChangeLog.isDebugEnabled()) {
           NameNode.stateChangeLog.debug(
@@ -920,12 +688,7 @@ public class FSDirectory implements Clos
               + " is renamed to " + dst);
         }
 
-        final INode srcParent = srcIIP.getINode(-2);
-        srcParent.updateModificationTime(timestamp, srcIIP.getLatestSnapshotId());
-        dstParent = dstIIP.getINode(-2);
-        dstParent.updateModificationTime(timestamp, dstIIP.getLatestSnapshotId());
-        // update moved lease with new filename
-        getFSNamesystem().unprotectedChangeLease(src, dst);
+        tx.updateMtimeAndLease(timestamp);
 
         // Collect the blocks and remove the lease for previous dst
         long filesDeleted = -1;
@@ -947,47 +710,15 @@ public class FSDirectory implements Clos
           // deleted. Need to update the SnapshotManager.
           namesystem.removeSnapshottableDirs(snapshottableDirs);
         }
-        
-        // update the quota usage in src tree
-        if (isSrcInSnapshot) {
-          // get the counts after rename
-          Quota.Counts newSrcCounts = srcChild.computeQuotaUsage(
-              Quota.Counts.newInstance(), false);
-          newSrcCounts.subtract(oldSrcCounts);
-          srcParent.addSpaceConsumed(newSrcCounts.get(Quota.NAMESPACE),
-              newSrcCounts.get(Quota.DISKSPACE), false);
-        }
-        
+
+        tx.updateQuotasInSourceTree();
         return filesDeleted >= 0;
       }
     } finally {
       if (undoRemoveSrc) {
-        // Rename failed - restore src
-        final INodeDirectory srcParent = srcIIP.getINode(-2).asDirectory();
-        final INode oldSrcChild = srcChild;
-        // put it back
-        if (withCount == null) {
-          srcChild.setLocalName(srcChildName);
-        } else if (!srcChildIsReference) { // src must be in snapshot
-          // the withCount node will no longer be used thus no need to update
-          // its reference number here
-          srcChild = withCount.getReferredINode();
-          srcChild.setLocalName(srcChildName);
-        } else {
-          withCount.removeReference(oldSrcChild.asReference());
-          srcChild = new INodeReference.DstReference(
-              srcParent, withCount, srcRefDstSnapshot);
-          withCount.getReferredINode().setLocalName(srcChildName);
-        }
-        
-        if (srcParent.isWithSnapshot()) {
-          srcParent.undoRename4ScrParent(oldSrcChild.asReference(), srcChild);
-        } else {
-          // srcParent is not an INodeDirectoryWithSnapshot, we only need to add
-          // the srcChild back
-          addLastINodeNoQuotaCheck(srcIIP, srcChild);
-        }
+        tx.restoreSource();
       }
+
       if (undoRemoveDst) {
         // Rename failed - restore dst
         if (dstParent.isDirectory() && dstParent.asDirectory().isWithSnapshot()) {
@@ -1008,7 +739,200 @@ public class FSDirectory implements Clos
         + "failed to rename " + src + " to " + dst);
     throw new IOException("rename from " + src + " to " + dst + " failed.");
   }
-  
+
+  private static void validateRenameOverwrite(String src, String dst,
+                                              boolean overwrite,
+                                              INode srcInode, INode dstInode)
+          throws IOException {
+    String error;// It's OK to rename a file to a symlink and vice versa
+    if (dstInode.isDirectory() != srcInode.isDirectory()) {
+      error = "Source " + src + " and destination " + dst
+          + " must both be directories";
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+          + error);
+      throw new IOException(error);
+    }
+    if (!overwrite) { // If destination exists, overwrite flag must be true
+      error = "rename destination " + dst + " already exists";
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+          + error);
+      throw new FileAlreadyExistsException(error);
+    }
+    if (dstInode.isDirectory()) {
+      final ReadOnlyList<INode> children = dstInode.asDirectory()
+          .getChildrenList(Snapshot.CURRENT_STATE_ID);
+      if (!children.isEmpty()) {
+        error = "rename destination directory is not empty: " + dst;
+        NameNode.stateChangeLog.warn(
+            "DIR* FSDirectory.unprotectedRenameTo: " + error);
+        throw new IOException(error);
+      }
+    }
+  }
+
+  private static void validateRenameDestination(String src, String dst, INode srcInode)
+          throws IOException {
+    String error;
+    if (srcInode.isSymlink() &&
+        dst.equals(srcInode.asSymlink().getSymlinkString())) {
+      throw new FileAlreadyExistsException(
+          "Cannot rename symlink "+src+" to its target "+dst);
+    }
+    // dst cannot be a directory or a file under src
+    if (dst.startsWith(src) &&
+        dst.charAt(src.length()) == Path.SEPARATOR_CHAR) {
+      error = "Rename destination " + dst
+          + " is a directory or file under source " + src;
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+          + error);
+      throw new IOException(error);
+    }
+  }
+
+  private static void validateRenameSource(String src, INodesInPath srcIIP)
+          throws IOException {
+    String error;
+    final INode srcInode = srcIIP.getLastINode();
+    // validate source
+    if (srcInode == null) {
+      error = "rename source " + src + " is not found.";
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+          + error);
+      throw new FileNotFoundException(error);
+    }
+    if (srcIIP.getINodes().length == 1) {
+      error = "rename source cannot be the root";
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+          + error);
+      throw new IOException(error);
+    }
+    // srcInode and its subtree cannot contain snapshottable directories with
+    // snapshots
+    checkSnapshot(srcInode, null);
+  }
+
+
+
+  private class RenameOperation {
+    private final INodesInPath srcIIP;
+    private final INodesInPath dstIIP;
+    private final String src;
+    private final String dst;
+
+    private INode srcChild;
+    private final INodeReference.WithCount withCount;
+    private final int srcRefDstSnapshot;
+    private final INodeDirectory srcParent;
+    private final byte[] srcChildName;
+    private final boolean isSrcInSnapshot;
+    private final boolean srcChildIsReference;
+    private final Quota.Counts oldSrcCounts;
+
+    private RenameOperation(String src, String dst, INodesInPath srcIIP, INodesInPath dstIIP)
+            throws QuotaExceededException {
+      this.srcIIP = srcIIP;
+      this.dstIIP = dstIIP;
+      this.src = src;
+      this.dst = dst;
+      srcChild = srcIIP.getLastINode();
+      srcChildName = srcChild.getLocalNameBytes();
+      isSrcInSnapshot = srcChild.isInLatestSnapshot(
+              srcIIP.getLatestSnapshotId());
+      srcChildIsReference = srcChild.isReference();
+      srcParent = srcIIP.getINode(-2).asDirectory();
+
+      // Record the snapshot on srcChild. After the rename, before any new
+      // snapshot is taken on the dst tree, changes will be recorded in the latest
+      // snapshot of the src tree.
+      if (isSrcInSnapshot) {
+        srcChild = srcChild.recordModification(srcIIP.getLatestSnapshotId());
+      }
+
+      // check srcChild for reference
+      srcRefDstSnapshot = srcChildIsReference ? srcChild.asReference()
+              .getDstSnapshotId() : Snapshot.CURRENT_STATE_ID;
+      oldSrcCounts = Quota.Counts.newInstance();
+      if (isSrcInSnapshot) {
+        final INodeReference.WithName withName = srcIIP.getINode(-2).asDirectory()
+                .replaceChild4ReferenceWithName(srcChild, srcIIP.getLatestSnapshotId());
+        withCount = (INodeReference.WithCount) withName.getReferredINode();
+        srcChild = withName;
+        srcIIP.setLastINode(srcChild);
+        // get the counts before rename
+        withCount.getReferredINode().computeQuotaUsage(oldSrcCounts, true);
+      } else if (srcChildIsReference) {
+        // srcChild is reference but srcChild is not in latest snapshot
+        withCount = (WithCount) srcChild.asReference().getReferredINode();
+      } else {
+        withCount = null;
+      }
+    }
+
+    boolean addSourceToDestination() {
+      final INode dstParent = dstIIP.getINode(-2);
+      srcChild = srcIIP.getLastINode();
+      final byte[] dstChildName = dstIIP.getLastLocalName();
+      final INode toDst;
+      if (withCount == null) {
+        srcChild.setLocalName(dstChildName);
+        toDst = srcChild;
+      } else {
+        withCount.getReferredINode().setLocalName(dstChildName);
+        int dstSnapshotId = dstIIP.getLatestSnapshotId();
+        toDst = new INodeReference.DstReference(
+                dstParent.asDirectory(), withCount, dstSnapshotId);
+      }
+      return addLastINodeNoQuotaCheck(dstIIP, toDst);
+    }
+
+    void updateMtimeAndLease(long timestamp) throws QuotaExceededException {
+      srcParent.updateModificationTime(timestamp, srcIIP.getLatestSnapshotId());
+      final INode dstParent = dstIIP.getINode(-2);
+      dstParent.updateModificationTime(timestamp, dstIIP.getLatestSnapshotId());
+      // update moved lease with new filename
+      getFSNamesystem().unprotectedChangeLease(src, dst);
+    }
+
+    void restoreSource() throws QuotaExceededException {
+      // Rename failed - restore src
+      final INode oldSrcChild = srcChild;
+      // put it back
+      if (withCount == null) {
+        srcChild.setLocalName(srcChildName);
+      } else if (!srcChildIsReference) { // src must be in snapshot
+        // the withCount node will no longer be used thus no need to update
+        // its reference number here
+        srcChild = withCount.getReferredINode();
+        srcChild.setLocalName(srcChildName);
+      } else {
+        withCount.removeReference(oldSrcChild.asReference());
+        srcChild = new INodeReference.DstReference(
+                srcParent, withCount, srcRefDstSnapshot);
+        withCount.getReferredINode().setLocalName(srcChildName);
+      }
+
+      if (isSrcInSnapshot) {
+        srcParent.undoRename4ScrParent(oldSrcChild.asReference(), srcChild);
+      } else {
+        // original srcChild is not in latest snapshot, we only need to add
+        // the srcChild back
+        addLastINodeNoQuotaCheck(srcIIP, srcChild);
+      }
+    }
+
+    void updateQuotasInSourceTree() throws QuotaExceededException {
+      // update the quota usage in src tree
+      if (isSrcInSnapshot) {
+        // get the counts after rename
+        Quota.Counts newSrcCounts = srcChild.computeQuotaUsage(
+                Quota.Counts.newInstance(), false);
+        newSrcCounts.subtract(oldSrcCounts);
+        srcParent.addSpaceConsumed(newSrcCounts.get(Quota.NAMESPACE),
+                newSrcCounts.get(Quota.DISKSPACE), false);
+      }
+    }
+  }
+
   boolean isInAnEZ(INodesInPath iip)
     throws UnresolvedLinkException, SnapshotAccessControlException {
     readLock();
@@ -1441,14 +1365,14 @@ public class FSDirectory implements Clos
    *                          but do not have snapshots yet
    */
   private static void checkSnapshot(INode target,
-      List<INodeDirectorySnapshottable> snapshottableDirs) throws IOException {
+      List<INodeDirectorySnapshottable> snapshottableDirs) throws SnapshotException {
     if (target.isDirectory()) {
       INodeDirectory targetDir = target.asDirectory();
       if (targetDir.isSnapshottable()) {
         INodeDirectorySnapshottable ssTargetDir = 
             (INodeDirectorySnapshottable) targetDir;
         if (ssTargetDir.getNumSnapshots() > 0) {
-          throw new IOException("The directory " + ssTargetDir.getFullPathName()
+          throw new SnapshotException("The directory " + ssTargetDir.getFullPathName()
               + " cannot be deleted since " + ssTargetDir.getFullPathName()
               + " is snapshottable and already has snapshots");
         } else {
@@ -2164,10 +2088,6 @@ public class FSDirectory implements Clos
     if (!parent.removeChild(last, latestSnapshot)) {
       return -1;
     }
-    INodeDirectory newParent = last.getParent();
-    if (parent != newParent) {
-      iip.setINode(-2, newParent);
-    }
     
     if (!last.isInLatestSnapshot(latestSnapshot)) {
       final Quota.Counts counts = last.computeQuotaUsage();
@@ -3039,7 +2959,7 @@ public class FSDirectory implements Clos
    * Note that this method cannot handle scenarios where the inode is in a
    * snapshot.
    */
-  static byte[][] getPathComponents(INode inode) {
+  public static byte[][] getPathComponents(INode inode) {
     List<byte[]> components = new ArrayList<byte[]>();
     components.add(0, inode.getLocalNameBytes());
     while(inode.getParent() != null) {
@@ -3050,53 +2970,6 @@ public class FSDirectory implements Clos
   }
 
   /**
-   * The same functionality with {@link #getPathComponents(INode)}, but can
-   * handle snapshots.
-   */
-  public static byte[][] getPathComponentsWithSnapshot(INode inode) {
-    List<byte[]> components = new ArrayList<byte[]>();
-    boolean inSnapshot = false;
-    int snapshotId = Snapshot.CURRENT_STATE_ID;
-    do {
-      if (inode instanceof INodeReference.WithCount) {
-        // identify the corresponding WithName or DstReference node
-        inode = ((WithCount) inode).getParentRef(snapshotId);
-      } else { // normal INode and WithName/DstReference
-        if (inode instanceof INodeDirectory
-            && inode.asDirectory().isSnapshottable() && inSnapshot
-            && snapshotId != Snapshot.CURRENT_STATE_ID) {
-          INodeDirectorySnapshottable sdir = (INodeDirectorySnapshottable) inode
-              .asDirectory();
-          Snapshot snapshot = sdir.getSnapshotById(snapshotId);
-          if (snapshot != null) {
-            components.add(0, snapshot.getRoot().getLocalNameBytes());
-            components.add(0, HdfsConstants.DOT_SNAPSHOT_DIR_BYTES);
-            // the snapshot has been found, thus no need to check snapshottable
-            // directory afterwards
-            inSnapshot = false;
-          }
-        }
-        INode parent = inode.getParentReference() != null ? inode
-            .getParentReference() : inode.getParent();
-        if (parent != null && parent instanceof INodeDirectory) {
-          int sid = parent.asDirectory().searchChild(inode);
-          Preconditions.checkState(sid != Snapshot.NO_SNAPSHOT_ID);
-          if (sid != Snapshot.CURRENT_STATE_ID
-              && snapshotId == Snapshot.CURRENT_STATE_ID) {
-            snapshotId = sid;
-            inSnapshot = true;
-          }
-          components.add(0, inode.getLocalNameBytes());
-        } else if (parent == null) { // root
-          components.add(0, inode.getLocalNameBytes());
-        }
-        inode = parent;
-      }
-    } while (inode != null);
-    return components.toArray(new byte[components.size()][]);
-  }
-
-  /**
    * @return path components for reserved path, else null.
    */
   static byte[][] getPathComponentsForReservedPath(String src) {

Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1608603&r1=1608602&r2=1608603&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Mon Jul  7 20:43:56 2014
@@ -376,8 +376,7 @@ public class FSEditLogLoader {
                 "for append");
           }
           LocatedBlock lb = fsNamesys.prepareFileForWrite(path,
-              oldFile, addCloseOp.clientName, addCloseOp.clientMachine, null,
-              false, iip.getLatestSnapshotId(), false);
+              oldFile, addCloseOp.clientName, addCloseOp.clientMachine, false, iip.getLatestSnapshotId(), false);
           newFile = INodeFile.valueOf(fsDir.getINode(path),
               path, true);
           
@@ -740,7 +739,13 @@ public class FSEditLogLoader {
     }
     case OP_ROLLING_UPGRADE_FINALIZE: {
       final long finalizeTime = ((RollingUpgradeOp) op).getTime();
-      fsNamesys.finalizeRollingUpgradeInternal(finalizeTime);
+      if (fsNamesys.isRollingUpgrade()) {
+        // Only do it when NN is actually doing rolling upgrade.
+        // We can get FINALIZE without corresponding START, if NN is restarted
+        // before this op is consumed and a new checkpoint is created.
+        fsNamesys.finalizeRollingUpgradeInternal(finalizeTime);
+      }
+      fsNamesys.getFSImage().updateStorageVersion();
       fsNamesys.getFSImage().renameCheckpoint(NameNodeFile.IMAGE_ROLLBACK,
           NameNodeFile.IMAGE);
       break;

Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java?rev=1608603&r1=1608602&r2=1608603&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java Mon Jul  7 20:43:56 2014
@@ -40,6 +40,7 @@ import static org.apache.hadoop.hdfs.ser
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REASSIGN_LEASE;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_CACHE_DIRECTIVE;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_CACHE_POOL;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_XATTR;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME_OLD;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME_SNAPSHOT;
@@ -55,7 +56,6 @@ import static org.apache.hadoop.hdfs.ser
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_QUOTA;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_REPLICATION;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_XATTR;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_XATTR;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_START_LOG_SEGMENT;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SYMLINK;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_TIMES;
@@ -81,6 +81,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.Options.Rename;
+import org.apache.hadoop.fs.XAttr;
 import org.apache.hadoop.fs.XAttrCodec;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclEntryScope;
@@ -88,7 +89,6 @@ import org.apache.hadoop.fs.permission.A
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
-import org.apache.hadoop.fs.XAttr;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DeprecatedUTF8;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -116,7 +116,7 @@ import org.apache.hadoop.io.WritableFact
 import org.apache.hadoop.ipc.ClientId;
 import org.apache.hadoop.ipc.RpcConstants;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
-import org.apache.hadoop.util.PureJavaCrc32;
+import org.apache.hadoop.util.DataChecksum;
 import org.xml.sax.ContentHandler;
 import org.xml.sax.SAXException;
 import org.xml.sax.helpers.AttributesImpl;
@@ -3784,7 +3784,7 @@ public abstract class FSEditLogOp {
 
     public Writer(DataOutputBuffer out) {
       this.buf = out;
-      this.checksum = new PureJavaCrc32();
+      this.checksum = DataChecksum.newCrc32();
     }
 
     /**
@@ -3835,7 +3835,7 @@ public abstract class FSEditLogOp {
       this.logVersion = logVersion;
       if (NameNodeLayoutVersion.supports(
           LayoutVersion.Feature.EDITS_CHESKUM, logVersion)) {
-        this.checksum = new PureJavaCrc32();
+        this.checksum = DataChecksum.newCrc32();
       } else {
         this.checksum = null;
       }

Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1608603&r1=1608602&r2=1608603&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Mon Jul  7 20:43:56 2014
@@ -214,6 +214,13 @@ public class FSImage implements Closeabl
 
 
     int layoutVersion = storage.getLayoutVersion();
+    if (startOpt == StartupOption.METADATAVERSION) {
+      System.out.println("HDFS Image Version: " + layoutVersion);
+      System.out.println("Software format version: " +
+        HdfsConstants.NAMENODE_LAYOUT_VERSION);
+      return false;
+    }
+
     if (layoutVersion < Storage.LAST_PRE_UPGRADE_LAYOUT_VERSION) {
       NNStorage.checkVersionUpgradable(storage.getLayoutVersion());
     }
@@ -289,6 +296,12 @@ public class FSImage implements Closeabl
                       storage.dirIterator(); it.hasNext();) {
       StorageDirectory sd = it.next();
       StorageState curState;
+      if (startOpt == StartupOption.METADATAVERSION) {
+        /* All we need is the layout version. */
+        storage.readProperties(sd);
+        return true;
+      }
+
       try {
         curState = sd.analyzeStorage(startOpt, storage);
         // sd is locked but not opened
@@ -495,7 +508,6 @@ public class FSImage implements Closeabl
     FSImage realImage = target.getFSImage();
     FSImage ckptImage = new FSImage(conf, 
                                     checkpointDirs, checkpointEditsDirs);
-    target.dir.fsImage = ckptImage;
     // load from the checkpoint dirs
     try {
       ckptImage.recoverTransitionRead(StartupOption.REGULAR, target, null);
@@ -507,7 +519,6 @@ public class FSImage implements Closeabl
     realImage.getEditLog().setNextTxId(ckptImage.getEditLog().getLastWrittenTxId()+1);
     realImage.initEditLog(StartupOption.IMPORT);
 
-    target.dir.fsImage = realImage;
     realImage.getStorage().setBlockPoolID(ckptImage.getBlockPoolID());
 
     // and save it but keep the same checkpointTime
@@ -1011,6 +1022,13 @@ public class FSImage implements Closeabl
   }
   
   /**
+   * Update version of all storage directories.
+   */
+  public synchronized void updateStorageVersion() throws IOException {
+    storage.writeAll();
+  }
+
+  /**
    * @see #saveNamespace(FSNamesystem, Canceler)
    */
   public synchronized void saveNamespace(FSNamesystem source)

Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java?rev=1608603&r1=1608602&r2=1608603&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java Mon Jul  7 20:43:56 2014
@@ -781,7 +781,7 @@ public class FSImageFormat {
       final INodeFile file = new INodeFile(inodeId, localName, permissions,
           modificationTime, atime, blocks, replication, blockSize);
       if (underConstruction) {
-        file.toUnderConstruction(clientName, clientMachine, null);
+        file.toUnderConstruction(clientName, clientMachine);
       }
         return fileDiffs == null ? file : new INodeFile(file, fileDiffs);
       } else if (numBlocks == -1) {
@@ -933,8 +933,7 @@ public class FSImageFormat {
         }
 
         FileUnderConstructionFeature uc = cons.getFileUnderConstructionFeature();
-        oldnode.toUnderConstruction(uc.getClientName(), uc.getClientMachine(),
-            uc.getClientNode());
+        oldnode.toUnderConstruction(uc.getClientName(), uc.getClientMachine());
         if (oldnode.numBlocks() > 0) {
           BlockInfo ucBlock = cons.getLastBlock();
           // we do not replace the inode, just replace the last block of oldnode

Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java?rev=1608603&r1=1608602&r2=1608603&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java Mon Jul  7 20:43:56 2014
@@ -53,7 +53,6 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.XAttrCompactProto;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.XAttrFeatureProto;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
-import org.apache.hadoop.hdfs.server.namenode.XAttrFeature;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
 
 import com.google.common.base.Preconditions;
@@ -299,8 +298,7 @@ public final class FSImageFormatPBINode 
       // under-construction information
       if (f.hasFileUC()) {
         INodeSection.FileUnderConstructionFeature uc = f.getFileUC();
-        file.toUnderConstruction(uc.getClientName(), uc.getClientMachine(),
-            null);
+        file.toUnderConstruction(uc.getClientName(), uc.getClientMachine());
         if (blocks.length > 0) {
           BlockInfo lastBlk = file.getLastBlock();
           // replace the last block of file

Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java?rev=1608603&r1=1608602&r2=1608603&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java Mon Jul  7 20:43:56 2014
@@ -149,7 +149,7 @@ public class FSImageSerialization {
 
     INodeFile file = new INodeFile(inodeId, name, perm, modificationTime,
         modificationTime, blocks, blockReplication, preferredBlockSize);
-    file.toUnderConstruction(clientName, clientMachine, null);
+    file.toUnderConstruction(clientName, clientMachine);
     return file;
   }
 

Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1608603&r1=1608602&r2=1608603&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Mon Jul  7 20:43:56 2014
@@ -85,7 +85,18 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
 import static org.apache.hadoop.util.Time.now;
 
-import java.io.*;
+import java.io.BufferedWriter;
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.net.URI;
@@ -179,7 +190,6 @@ import org.apache.hadoop.hdfs.protocol.R
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
 import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
-import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
 import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
@@ -217,7 +227,6 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable.SnapshotDiffInfo;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotManager;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
@@ -531,6 +540,9 @@ public class FSNamesystem implements Nam
 
   private volatile boolean imageLoaded = false;
   private final Condition cond;
+
+  private final FSImage fsImage;
+
   /**
    * Notify that loading of this FSDirectory is complete, and
    * it is imageLoaded for use
@@ -755,6 +767,7 @@ public class FSNamesystem implements Nam
     LOG.info("fsLock is fair:" + fair);
     fsLock = new FSNamesystemLock(fair);
     cond = fsLock.writeLock().newCondition();
+    this.fsImage = fsImage;
     try {
       resourceRecheckInterval = conf.getLong(
           DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY,
@@ -844,7 +857,7 @@ public class FSNamesystem implements Nam
           DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_DEFAULT);
 
       this.dtSecretManager = createDelegationTokenSecretManager(conf);
-      this.dir = new FSDirectory(fsImage, this, conf);
+      this.dir = new FSDirectory(this, conf);
       this.snapshotManager = new SnapshotManager(dir);
       this.cacheManager = new CacheManager(this, conf, blockManager);
       this.safeMode = new SafeModeInfo(conf);
@@ -1108,7 +1121,7 @@ public class FSNamesystem implements Nam
     LOG.info("Starting services required for active state");
     writeLock();
     try {
-      FSEditLog editLog = dir.fsImage.getEditLog();
+      FSEditLog editLog = getFSImage().getEditLog();
       
       if (!editLog.isOpenForWrite()) {
         // During startup, we're already open for write during initialization.
@@ -1137,12 +1150,12 @@ public class FSNamesystem implements Nam
               metaSaveAsString());
         }
         
-        long nextTxId = dir.fsImage.getLastAppliedTxId() + 1;
+        long nextTxId = getFSImage().getLastAppliedTxId() + 1;
         LOG.info("Will take over writing edit logs at txnid " + 
             nextTxId);
         editLog.setNextTxId(nextTxId);
 
-        dir.fsImage.editLog.openForWrite();
+        getFSImage().editLog.openForWrite();
       }
 
       // Enable quota checks.
@@ -1217,13 +1230,13 @@ public class FSNamesystem implements Nam
         ((NameNodeEditLogRoller)nnEditLogRoller.getRunnable()).stop();
         nnEditLogRoller.interrupt();
       }
-      if (dir != null && dir.fsImage != null) {
-        if (dir.fsImage.editLog != null) {
-          dir.fsImage.editLog.close();
+      if (dir != null && getFSImage() != null) {
+        if (getFSImage().editLog != null) {
+          getFSImage().editLog.close();
         }
         // Update the fsimage with the last txid that we wrote
         // so that the tailer starts from the right spot.
-        dir.fsImage.updateLastAppliedTxIdFromWritten();
+        getFSImage().updateLastAppliedTxIdFromWritten();
       }
       if (cacheManager != null) {
         cacheManager.stopMonitorThread();
@@ -1246,9 +1259,9 @@ public class FSNamesystem implements Nam
    */
   void startStandbyServices(final Configuration conf) throws IOException {
     LOG.info("Starting services required for standby state");
-    if (!dir.fsImage.editLog.isOpenForRead()) {
+    if (!getFSImage().editLog.isOpenForRead()) {
       // During startup, we're already open for read.
-      dir.fsImage.editLog.initSharedJournalsForRead();
+      getFSImage().editLog.initSharedJournalsForRead();
     }
     
     blockManager.setPostponeBlocksFromFuture(true);
@@ -1295,8 +1308,8 @@ public class FSNamesystem implements Nam
     if (editLogTailer != null) {
       editLogTailer.stop();
     }
-    if (dir != null && dir.fsImage != null && dir.fsImage.editLog != null) {
-      dir.fsImage.editLog.close();
+    if (dir != null && getFSImage() != null && getFSImage().editLog != null) {
+      getFSImage().editLog.close();
     }
   }
   
@@ -1539,9 +1552,9 @@ public class FSNamesystem implements Nam
    * Version of @see #getNamespaceInfo() that is not protected by a lock.
    */
   NamespaceInfo unprotectedGetNamespaceInfo() {
-    return new NamespaceInfo(dir.fsImage.getStorage().getNamespaceID(),
+    return new NamespaceInfo(getFSImage().getStorage().getNamespaceID(),
         getClusterId(), getBlockPoolId(),
-        dir.fsImage.getStorage().getCTime());
+        getFSImage().getStorage().getCTime());
   }
 
   /**
@@ -1559,12 +1572,10 @@ public class FSNamesystem implements Nam
       try {
         stopActiveServices();
         stopStandbyServices();
-        if (dir != null) {
-          dir.close();
-        }
       } catch (IOException ie) {
-        LOG.error("Error closing FSDirectory", ie);
+      } finally {
         IOUtils.cleanup(LOG, dir);
+        IOUtils.cleanup(LOG, fsImage);
       }
     }
   }
@@ -2520,9 +2531,6 @@ public class FSNamesystem implements Nam
       }
 
       checkFsObjectLimit();
-      final DatanodeDescriptor clientNode = 
-          blockManager.getDatanodeManager().getDatanodeByHost(clientMachine);
-
       INodeFile newNode = null;
 
       // Always do an implicit mkdirs for parent directory tree.
@@ -2530,7 +2538,7 @@ public class FSNamesystem implements Nam
       if (parent != null && mkdirsRecursively(parent.toString(),
               permissions, true, now())) {
         newNode = dir.addFile(src, permissions, replication, blockSize,
-                holder, clientMachine, clientNode);
+                holder, clientMachine);
       }
 
       if (newNode == null) {
@@ -2611,10 +2619,8 @@ public class FSNamesystem implements Nam
         throw new IOException("append: lastBlock=" + lastBlock +
             " of src=" + src + " is not sufficiently replicated yet.");
       }
-      final DatanodeDescriptor clientNode = 
-          blockManager.getDatanodeManager().getDatanodeByHost(clientMachine);
-      return prepareFileForWrite(src, myFile, holder, clientMachine, clientNode,
-          true, iip.getLatestSnapshotId(), logRetryCache);
+      return prepareFileForWrite(src, myFile, holder, clientMachine, true,
+              iip.getLatestSnapshotId(), logRetryCache);
     } catch (IOException ie) {
       NameNode.stateChangeLog.warn("DIR* NameSystem.append: " +ie.getMessage());
       throw ie;
@@ -2629,7 +2635,6 @@ public class FSNamesystem implements Nam
    * @param file existing file object
    * @param leaseHolder identifier of the lease holder on this file
    * @param clientMachine identifier of the client machine
-   * @param clientNode if the client is collocated with a DN, that DN's descriptor
    * @param writeToEditLog whether to persist this change to the edit log
    * @param logRetryCache whether to record RPC ids in editlog for retry cache
    *                      rebuilding
@@ -2638,12 +2643,12 @@ public class FSNamesystem implements Nam
    * @throws IOException
    */
   LocatedBlock prepareFileForWrite(String src, INodeFile file,
-      String leaseHolder, String clientMachine, DatanodeDescriptor clientNode,
-      boolean writeToEditLog, int latestSnapshot, boolean logRetryCache)
+                                   String leaseHolder, String clientMachine,
+                                   boolean writeToEditLog,
+                                   int latestSnapshot, boolean logRetryCache)
       throws IOException {
     file = file.recordModification(latestSnapshot);
-    final INodeFile cons = file.toUnderConstruction(leaseHolder, clientMachine,
-        clientNode);
+    final INodeFile cons = file.toUnderConstruction(leaseHolder, clientMachine);
 
     leaseManager.addLease(cons.getFileUnderConstructionFeature()
         .getClientName(), src);
@@ -2652,7 +2657,7 @@ public class FSNamesystem implements Nam
     if (ret != null) {
       // update the quota: use the preferred block size for UC block
       final long diff = file.getPreferredBlockSize() - ret.getBlockSize();
-      dir.updateSpaceConsumed(src, 0, diff);
+      dir.updateSpaceConsumed(src, 0, diff * file.getBlockReplication());
     }
 
     if (writeToEditLog) {
@@ -2724,10 +2729,10 @@ public class FSNamesystem implements Nam
       // We found the lease for this file. And surprisingly the original
       // holder is trying to recreate this file. This should never occur.
       //
+
       if (!force && lease != null) {
         Lease leaseFile = leaseManager.getLeaseByPath(src);
-        if ((leaseFile != null && leaseFile.equals(lease)) ||
-            lease.getHolder().equals(holder)) { 
+        if (leaseFile != null && leaseFile.equals(lease)) {
           throw new AlreadyBeingCreatedException(
             "failed to create file " + src + " for " + holder +
             " for client " + clientMachine +
@@ -2915,7 +2920,8 @@ public class FSNamesystem implements Nam
             + maxBlocksPerFile);
       }
       blockSize = pendingFile.getPreferredBlockSize();
-      clientNode = pendingFile.getFileUnderConstructionFeature().getClientNode();
+      clientNode = blockManager.getDatanodeManager().getDatanodeByHost(
+              pendingFile.getFileUnderConstructionFeature().getClientMachine());
       replication = pendingFile.getFileReplication();
     } finally {
       readUnlock();
@@ -3121,7 +3127,9 @@ public class FSNamesystem implements Nam
         if (inode != null) src = inode.getFullPathName();
       }
       final INodeFile file = checkLease(src, clientName, inode, fileId);
-      clientnode = file.getFileUnderConstructionFeature().getClientNode();
+      String clientMachine = file.getFileUnderConstructionFeature()
+              .getClientMachine();
+      clientnode = blockManager.getDatanodeManager().getDatanodeByHost(clientMachine);
       preferredblocksize = file.getPreferredBlockSize();
 
       //find datanode storages
@@ -3881,7 +3889,7 @@ public class FSNamesystem implements Nam
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       status = mkdirsInternal(pc, src, permissions, createParent);
       if (status) {
-        resultingStat = dir.getFileInfo(src, false);
+        resultingStat = getAuditFileInfo(src, false);
       }
     } finally {
       writeUnlock();
@@ -4644,7 +4652,7 @@ public class FSNamesystem implements Nam
    * @return registration ID
    */
   String getRegistrationID() {
-    return Storage.getRegistrationID(dir.fsImage.getStorage());
+    return Storage.getRegistrationID(getFSImage().getStorage());
   }
 
   /**
@@ -4860,7 +4868,7 @@ public class FSNamesystem implements Nam
   }
 
   public FSImage getFSImage() {
-    return dir.fsImage;
+    return fsImage;
   }
 
   public FSEditLog getEditLog() {
@@ -7185,7 +7193,7 @@ public class FSNamesystem implements Nam
 
   @Override  // NameNodeMXBean
   public String getClusterId() {
-    return dir.fsImage.getStorage().getClusterID();
+    return getFSImage().getStorage().getClusterID();
   }
   
   @Override  // NameNodeMXBean
@@ -7626,7 +7634,7 @@ public class FSNamesystem implements Nam
    */
   SnapshotDiffReport getSnapshotDiffReport(String path,
       String fromSnapshot, String toSnapshot) throws IOException {
-    SnapshotDiffInfo diffs = null;
+    SnapshotDiffReport diffs;
     checkOperation(OperationCategory.READ);
     final FSPermissionChecker pc = getPermissionChecker();
     readLock();
@@ -7640,13 +7648,11 @@ public class FSNamesystem implements Nam
     } finally {
       readUnlock();
     }
-    
+
     if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       logAuditEvent(true, "computeSnapshotDiff", null, null, null);
     }
-    return diffs != null ? diffs.generateReport() : new SnapshotDiffReport(
-        path, fromSnapshot, toSnapshot,
-        Collections.<DiffReportEntry> emptyList());
+    return diffs;
   }
   
   private void checkSubtreeReadPermission(final FSPermissionChecker pc,
@@ -7865,6 +7871,7 @@ public class FSNamesystem implements Nam
         // roll the edit log to make sure the standby NameNode can tail
         getFSImage().rollEditLog();
       }
+      getFSImage().updateStorageVersion();
       getFSImage().renameCheckpoint(NameNodeFile.IMAGE_ROLLBACK,
           NameNodeFile.IMAGE);
     } finally {
@@ -8449,10 +8456,7 @@ public class FSNamesystem implements Nam
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot set XAttr on " + src);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
-      if (isPermissionEnabled) {
-        checkOwner(pc, src);
-        checkPathAccess(pc, src, FsAction.WRITE);
-      }
+      checkXAttrChangeAccess(src, xAttr, pc);
       List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
       xAttrs.add(xAttr);
       dir.setXAttrs(src, xAttrs, flag);
@@ -8572,10 +8576,7 @@ public class FSNamesystem implements Nam
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot remove XAttr entry on " + src);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
-      if (isPermissionEnabled) {
-        checkOwner(pc, src);
-        checkPathAccess(pc, src, FsAction.WRITE);
-      }
+      checkXAttrChangeAccess(src, xAttr, pc);
 
       List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
       xAttrs.add(xAttr);
@@ -8594,6 +8595,41 @@ public class FSNamesystem implements Nam
     logAuditEvent(true, "removeXAttr", src, null, resultingStat);
   }
 
+  /**
+   * Check whether the caller may set or remove the given extended attribute
+   * on <code>src</code>. The check applies only when permission checking is
+   * enabled and the attribute is in the USER namespace.
+   * <p>
+   * For a directory with the sticky bit set, the caller must be the super
+   * user or pass the owner check; in every other case the caller needs WRITE
+   * access to the path.
+   *
+   * @param src path of the inode whose attribute is being changed
+   * @param xAttr the extended attribute being set or removed
+   * @param pc permission checker for the calling user
+   * @throws UnresolvedLinkException declared by this method; presumably
+   *           raised when src traverses a symlink -- TODO confirm
+   * @throws AccessControlException presumably if a permission check fails
+   */
+  private void checkXAttrChangeAccess(String src, XAttr xAttr,
+      FSPermissionChecker pc) throws UnresolvedLinkException,
+      AccessControlException {
+    if (isPermissionEnabled && xAttr.getNameSpace() == XAttr.NameSpace.USER) {
+      final INode inode = dir.getINode(src);
+      // getINode may return null (presumably when src does not exist); such
+      // a path falls through to the path-access check instead of an NPE.
+      if (inode != null
+          && inode.isDirectory()
+          && inode.getFsPermission().getStickyBit()) {
+        if (!pc.isSuperUser()) {
+          checkOwner(pc, src);
+        }
+      } else {
+        checkPathAccess(pc, src, FsAction.WRITE);
+      }
+    }
+  }
+
   /**
    * Default AuditLogger implementation; used when no access logger is
    * defined in the config file. It can also be explicitly listed in the

Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileUnderConstructionFeature.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileUnderConstructionFeature.java?rev=1608603&r1=1608602&r2=1608603&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileUnderConstructionFeature.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileUnderConstructionFeature.java Mon Jul  7 20:43:56 2014
@@ -32,15 +32,10 @@ import org.apache.hadoop.hdfs.server.nam
 public class FileUnderConstructionFeature implements INode.Feature {
   private String clientName; // lease holder
   private final String clientMachine;
-  // if client is a cluster node too.
-  private final DatanodeDescriptor clientNode;
 
-  public FileUnderConstructionFeature(final String clientName,
-      final String clientMachine,
-      final DatanodeDescriptor clientNode) {
+  public FileUnderConstructionFeature(final String clientName, final String clientMachine) {
     this.clientName = clientName;
     this.clientMachine = clientMachine;
-    this.clientNode = clientNode;
   }
 
   public String getClientName() {
@@ -55,10 +50,6 @@ public class FileUnderConstructionFeatur
     return clientMachine;
   }
 
-  public DatanodeDescriptor getClientNode() {
-    return clientNode;
-  }
-
   /**
    * Update the length for the last block
    *

Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java?rev=1608603&r1=1608602&r2=1608603&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java Mon Jul  7 20:43:56 2014
@@ -26,11 +26,9 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.fs.PathIsNotDirectoryException;
-import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
-import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithCount;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature.DirectoryDiffList;
@@ -365,7 +363,7 @@ public class INodeDirectory extends INod
    * children list nor in any snapshot; otherwise the snapshot id of the
    * corresponding snapshot diff list.
    */
-  int searchChild(INode inode) {
+  public int searchChild(INode inode) {
     INode child = getChild(inode.getLocalNameBytes(), Snapshot.CURRENT_STATE_ID);
     if (child != inode) {
       // inode is not in parent's children list, thus inode must be in
@@ -764,7 +762,9 @@ public class INodeDirectory extends INod
   public boolean metadataEquals(INodeDirectoryAttributes other) {
     return other != null
         && getQuotaCounts().equals(other.getQuotaCounts())
-        && getPermissionLong() == other.getPermissionLong();
+        && getPermissionLong() == other.getPermissionLong()
+        && getAclFeature() == other.getAclFeature()
+        && getXAttrFeature() == other.getXAttrFeature();
   }
   
   /*

Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryAttributes.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryAttributes.java?rev=1608603&r1=1608602&r2=1608603&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryAttributes.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryAttributes.java Mon Jul  7 20:43:56 2014
@@ -53,8 +53,10 @@ public interface INodeDirectoryAttribute
     @Override
     public boolean metadataEquals(INodeDirectoryAttributes other) {
       return other != null
-          && this.getQuotaCounts().equals(other.getQuotaCounts())
-          && getPermissionLong() == other.getPermissionLong();
+          && getQuotaCounts().equals(other.getQuotaCounts())
+          && getPermissionLong() == other.getPermissionLong()
+          && getAclFeature() == other.getAclFeature()
+          && getXAttrFeature() == other.getXAttrFeature();
     }
   }
 

Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java?rev=1608603&r1=1608602&r2=1608603&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java Mon Jul  7 20:43:56 2014
@@ -33,7 +33,6 @@ import org.apache.hadoop.hdfs.protocol.Q
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.FileDiff;
@@ -144,6 +143,15 @@ public class INodeFile extends INodeWith
     return this;
   }
 
+  @Override
+  public boolean metadataEquals(INodeFileAttributes other) {
+    return other != null
+        && getHeaderLong()== other.getHeaderLong()
+        && getPermissionLong() == other.getPermissionLong()
+        && getAclFeature() == other.getAclFeature()
+        && getXAttrFeature() == other.getXAttrFeature();
+  }
+
   /* Start of Under-Construction Feature */
 
   /**
@@ -161,12 +169,11 @@ public class INodeFile extends INodeWith
   }
 
   /** Convert this file to an {@link INodeFileUnderConstruction}. */
-  INodeFile toUnderConstruction(String clientName, String clientMachine,
-      DatanodeDescriptor clientNode) {
+  INodeFile toUnderConstruction(String clientName, String clientMachine) {
     Preconditions.checkState(!isUnderConstruction(),
         "file is already under construction");
     FileUnderConstructionFeature uc = new FileUnderConstructionFeature(
-        clientName, clientMachine, clientNode);
+        clientName, clientMachine);
     addFeature(uc);
     return this;
   }

Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileAttributes.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileAttributes.java?rev=1608603&r1=1608602&r2=1608603&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileAttributes.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileAttributes.java Mon Jul  7 20:43:56 2014
@@ -36,6 +36,8 @@ public interface INodeFileAttributes ext
   /** @return the header as a long. */
   public long getHeaderLong();
 
+  public boolean metadataEquals(INodeFileAttributes other);
+
   /** A copy of the inode file attributes */
   public static class SnapshotCopy extends INodeAttributes.SnapshotCopy
       implements INodeFileAttributes {
@@ -70,5 +72,14 @@ public interface INodeFileAttributes ext
     public long getHeaderLong() {
       return header;
     }
+
+    @Override
+    public boolean metadataEquals(INodeFileAttributes other) {
+      return other != null
+          && getHeaderLong()== other.getHeaderLong()
+          && getPermissionLong() == other.getPermissionLong()
+          && getAclFeature() == other.getAclFeature()
+          && getXAttrFeature() == other.getXAttrFeature();
+    }
   }
 }

Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java?rev=1608603&r1=1608602&r2=1608603&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java Mon Jul  7 20:43:56 2014
@@ -435,21 +435,39 @@ public abstract class INodeReference ext
       }
     }
 
+    /**
+     * Find the reference node for the given snapshot by binary search over
+     * <code>withNameList</code> (assumed to be ordered by ascending
+     * <code>lastSnapshotId</code> -- TODO confirm).
+     *
+     * @param snapshotId the id of the snapshot to look up
+     * @return the WithName/DstReference node contained in the given snapshot:
+     *         the WithName with the smallest <code>lastSnapshotId</code> that
+     *         is not below <code>snapshotId</code>, otherwise the parent
+     *         reference (also when <code>withNameList</code> is empty).
+     */
     public INodeReference getParentRef(int snapshotId) {
-      // when the given snapshotId is CURRENT_STATE_ID, it is possible that we
-      // do not know where the corresponding inode belongs, thus we simply
-      // return the last reference node
-      if (snapshotId == Snapshot.CURRENT_STATE_ID) {
-        return this.getParentReference() != null ? this.getParentReference()
-            : this.getLastWithName();
-      }
-      // otherwise we search the withNameList
-      for (int i = 0; i < withNameList.size(); i++) {
-        if (snapshotId <= withNameList.get(i).lastSnapshotId) {
-          return withNameList.get(i);
+      int start = 0;
+      int end = withNameList.size() - 1;
+      while (start < end) {
+        int mid = start + (end - start) / 2;
+        int sid = withNameList.get(mid).lastSnapshotId;
+        if (sid == snapshotId) {
+          return withNameList.get(mid);
+        } else if (sid < snapshotId) {
+          start = mid + 1;
+        } else {
+          end = mid;
         }
       }
-      return this.getParentReference();
+      // start may be out of range when withNameList is empty; fall back to the
+      // parent reference instead of throwing IndexOutOfBoundsException.
+      if (start < withNameList.size()
+          && withNameList.get(start).lastSnapshotId >= snapshotId) {
+        return withNameList.get(start);
+      } else {
+        return this.getParentReference();
+      }
     }
   }
   

Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1608603&r1=1608602&r2=1608603&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Mon Jul  7 20:43:56 2014
@@ -201,25 +201,28 @@ public class NameNode implements NameNod
   };
   
   private static final String USAGE = "Usage: java NameNode ["
-      + StartupOption.BACKUP.getName() + "] | ["
-      + StartupOption.CHECKPOINT.getName() + "] | ["
+      + StartupOption.BACKUP.getName() + "] | \n\t["
+      + StartupOption.CHECKPOINT.getName() + "] | \n\t["
       + StartupOption.FORMAT.getName() + " ["
       + StartupOption.CLUSTERID.getName() + " cid ] ["
       + StartupOption.FORCE.getName() + "] ["
-      + StartupOption.NONINTERACTIVE.getName() + "] ] | ["
+      + StartupOption.NONINTERACTIVE.getName() + "] ] | \n\t["
       + StartupOption.UPGRADE.getName() + 
         " [" + StartupOption.CLUSTERID.getName() + " cid]" +
-        " [" + StartupOption.RENAMERESERVED.getName() + "<k-v pairs>] ] | ["
-      + StartupOption.ROLLBACK.getName() + "] | ["
+        " [" + StartupOption.RENAMERESERVED.getName() + "<k-v pairs>] ] | \n\t["
+      + StartupOption.ROLLBACK.getName() + "] | \n\t["
       + StartupOption.ROLLINGUPGRADE.getName() + " <"
       + RollingUpgradeStartupOption.DOWNGRADE.name().toLowerCase() + "|"
-      + RollingUpgradeStartupOption.ROLLBACK.name().toLowerCase() + "> ] | ["
-      + StartupOption.FINALIZE.getName() + "] | ["
-      + StartupOption.IMPORT.getName() + "] | ["
-      + StartupOption.INITIALIZESHAREDEDITS.getName() + "] | ["
-      + StartupOption.BOOTSTRAPSTANDBY.getName() + "] | ["
-      + StartupOption.RECOVER.getName() + " [ " + StartupOption.FORCE.getName()
-      + " ] ]";
+      + RollingUpgradeStartupOption.ROLLBACK.name().toLowerCase() + "> ] | \n\t["
+      + StartupOption.FINALIZE.getName() + "] | \n\t["
+      + StartupOption.IMPORT.getName() + "] | \n\t["
+      + StartupOption.INITIALIZESHAREDEDITS.getName() + "] | \n\t["
+      + StartupOption.BOOTSTRAPSTANDBY.getName() + "] | \n\t["
+      + StartupOption.RECOVER.getName() + " [ "
+      + StartupOption.FORCE.getName() + "] ] | \n\t["
+      + StartupOption.METADATAVERSION.getName()
+      + " ]";
+
   
   public long getProtocolVersion(String protocol, 
                                  long clientVersion) throws IOException {
@@ -598,7 +601,8 @@ public class NameNode implements NameNod
     
     pauseMonitor = new JvmPauseMonitor(conf);
     pauseMonitor.start();
-
+    metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
+    
     startCommonServices(conf);
   }
   
@@ -830,7 +834,7 @@ public class NameNode implements NameNod
   /** get FSImage */
   @VisibleForTesting
   public FSImage getFSImage() {
-    return namesystem.dir.fsImage;
+    return namesystem.getFSImage();
   }
 
   /**
@@ -1140,7 +1144,7 @@ public class NameNode implements NameNod
         return true;
       }
     }
-    nsys.dir.fsImage.doRollback(nsys);
+    nsys.getFSImage().doRollback(nsys);
     return false;
   }
 
@@ -1265,6 +1269,8 @@ public class NameNode implements NameNod
               "can't understand option \"" + args[i] + "\"");
           }
         }
+      } else if (StartupOption.METADATAVERSION.getName().equalsIgnoreCase(cmd)) {
+        startOpt = StartupOption.METADATAVERSION;
       } else {
         return null;
       }
@@ -1317,6 +1323,21 @@ public class NameNode implements NameNod
     }
   }
 
+  /**
+   * Verify that configured directories exist, then print the metadata versions
+   * of the software and the image.
+   *
+   * @param conf configuration to use
+   * @throws IOException
+   */
+  private static boolean printMetadataVersion(Configuration conf)
+    throws IOException {
+    final FSImage fsImage = new FSImage(conf);
+    final FSNamesystem fs = new FSNamesystem(conf, fsImage, false);
+    return fsImage.recoverTransitionRead(
+      StartupOption.METADATAVERSION, fs, null);
+  }
+
   public static NameNode createNameNode(String argv[], Configuration conf)
       throws IOException {
     LOG.info("createNameNode " + Arrays.asList(argv));
@@ -1381,6 +1402,11 @@ public class NameNode implements NameNod
         NameNode.doRecovery(startOpt, conf);
         return null;
       }
+      case METADATAVERSION: {
+        printMetadataVersion(conf);
+        terminate(0);
+        return null; // avoid javac warning
+      }
       default: {
         DefaultMetricsSystem.initialize("NameNode");
         return new NameNode(conf);

Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1608603&r1=1608602&r2=1608603&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Mon Jul  7 20:43:56 2014
@@ -1049,6 +1049,11 @@ class NameNodeRpcServer implements Namen
     boolean noStaleStorages = false;
     for(StorageBlockReport r : reports) {
       final BlockListAsLongs blocks = new BlockListAsLongs(r.getBlocks());
+      //
+      // BlockManager.processReport accumulates information of prior calls
+      // for the same node and storage, so the value returned by the last
+      // call of this loop is the final updated value for noStaleStorages.
+      //
       noStaleStorages = bm.processReport(nodeReg, r.getStorage(), poolId, blocks);
       metrics.incrStorageBlockReportOps();
     }

Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrPermissionFilter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrPermissionFilter.java?rev=1608603&r1=1608602&r2=1608603&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrPermissionFilter.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrPermissionFilter.java Mon Jul  7 20:43:56 2014
@@ -34,7 +34,8 @@ import com.google.common.collect.Lists;
  * USER - extended user attributes: these can be assigned to files and
  * directories to store arbitrary additional information. The access
  * permissions for user attributes are defined by the file permission
- * bits.
+ * bits. For sticky directories, only the owner and the super user can
+ * write attributes.
  * <br>
  * TRUSTED - trusted extended attributes: these are visible/accessible
  * only to/by the super user.

Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java?rev=1608603&r1=1608602&r2=1608603&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java Mon Jul  7 20:43:56 2014
@@ -98,7 +98,11 @@ public class NameNodeMetrics {
   @Metric("GetImageServlet putImage")
   MutableRate putImage;
 
-  NameNodeMetrics(String processName, String sessionId, int[] intervals) {
+  JvmMetrics jvmMetrics = null;
+  
+  NameNodeMetrics(String processName, String sessionId, int[] intervals,
+      final JvmMetrics jvmMetrics) {
+    this.jvmMetrics = jvmMetrics;
     registry.tag(ProcessName, processName).tag(SessionId, sessionId);
     
     final int len = intervals.length;
@@ -124,14 +128,19 @@ public class NameNodeMetrics {
     String sessionId = conf.get(DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY);
     String processName = r.toString();
     MetricsSystem ms = DefaultMetricsSystem.instance();
-    JvmMetrics.create(processName, sessionId, ms);
+    JvmMetrics jm = JvmMetrics.create(processName, sessionId, ms);
     
     // Percentile measurement is off by default, by watching no intervals
     int[] intervals = 
         conf.getInts(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY);
-    return ms.register(new NameNodeMetrics(processName, sessionId, intervals));
+    return ms.register(new NameNodeMetrics(processName, sessionId,
+        intervals, jm));
   }
 
+  public JvmMetrics getJvmMetrics() {
+    return jvmMetrics;
+  }
+  
   public void shutdown() {
     DefaultMetricsSystem.shutdown();
   }

Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java?rev=1608603&r1=1608602&r2=1608603&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java Mon Jul  7 20:43:56 2014
@@ -227,32 +227,34 @@ abstract class AbstractINodeDiffList<N e
     D diff = getDiffById(snapshotId);
     return diff == null ? Snapshot.CURRENT_STATE_ID : diff.getSnapshotId();
   }
-  
-  /**
-   * Check if changes have happened between two snapshots.
-   * @param earlier The snapshot taken earlier
-   * @param later The snapshot taken later
-   * @return Whether or not modifications (including diretory/file metadata
-   *         change, file creation/deletion under the directory) have happened
-   *         between snapshots.
-   */
-  final boolean changedBetweenSnapshots(Snapshot earlier, Snapshot later) {
+
+  final int[] changedBetweenSnapshots(Snapshot from, Snapshot to) {
+    Snapshot earlier = from;
+    Snapshot later = to;
+    if (Snapshot.ID_COMPARATOR.compare(from, to) > 0) {
+      earlier = to;
+      later = from;
+    }
+
     final int size = diffs.size();
     int earlierDiffIndex = Collections.binarySearch(diffs, earlier.getId());
+    int laterDiffIndex = later == null ? size : Collections
+        .binarySearch(diffs, later.getId());
     if (-earlierDiffIndex - 1 == size) {
       // if the earlierSnapshot is after the latest SnapshotDiff stored in
       // diffs, no modification happened after the earlierSnapshot
-      return false;
+      return null;
     }
-    if (later != null) {
-      int laterDiffIndex = Collections.binarySearch(diffs, later.getId());
-      if (laterDiffIndex == -1 || laterDiffIndex == 0) {
-        // if the laterSnapshot is the earliest SnapshotDiff stored in diffs, or
-        // before it, no modification happened before the laterSnapshot
-        return false;
-      }
+    if (laterDiffIndex == -1 || laterDiffIndex == 0) {
+      // if the laterSnapshot is the earliest SnapshotDiff stored in diffs, or
+      // before it, no modification happened before the laterSnapshot
+      return null;
     }
-    return true;
+    earlierDiffIndex = earlierDiffIndex < 0 ? (-earlierDiffIndex - 1)
+        : earlierDiffIndex;
+    laterDiffIndex = laterDiffIndex < 0 ? (-laterDiffIndex - 1)
+        : laterDiffIndex;
+    return new int[]{earlierDiffIndex, laterDiffIndex};
   }
 
   /**