You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/05/12 00:55:03 UTC

[29/50] [abbrv] hadoop git commit: HDFS-6757. Simplify lease manager with INodeID. Contributed by Haohui Mai.

HDFS-6757. Simplify lease manager with INodeID. Contributed by Haohui Mai.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/00fe1ed3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/00fe1ed3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/00fe1ed3

Branch: refs/heads/HDFS-7240
Commit: 00fe1ed3a4b3ee35fe24be257ec36445d2f44d63
Parents: 6471d18
Author: Haohui Mai <wh...@apache.org>
Authored: Fri May 8 23:04:31 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Fri May 8 23:04:31 2015 -0700

----------------------------------------------------------------------
 .../hdfs/server/namenode/FSDirDeleteOp.java     |  22 +-
 .../hdfs/server/namenode/FSDirRenameOp.java     |  16 +-
 .../hdfs/server/namenode/FSEditLogLoader.java   |  13 +-
 .../hdfs/server/namenode/FSImageFormat.java     |  46 ++-
 .../server/namenode/FSImageFormatPBINode.java   |  23 +-
 .../hdfs/server/namenode/FSNamesystem.java      |  87 ++----
 .../hadoop/hdfs/server/namenode/INode.java      |  19 +-
 .../hdfs/server/namenode/INodeDirectory.java    |  34 ++-
 .../hadoop/hdfs/server/namenode/INodeFile.java  |  23 +-
 .../hadoop/hdfs/server/namenode/INodeMap.java   |  12 +-
 .../hdfs/server/namenode/INodeReference.java    |  55 ++--
 .../hdfs/server/namenode/INodeSymlink.java      |  14 +-
 .../hdfs/server/namenode/LeaseManager.java      | 300 ++++++-------------
 .../snapshot/AbstractINodeDiffList.java         |   2 +-
 .../snapshot/DirectorySnapshottableFeature.java |   2 +-
 .../snapshot/DirectoryWithSnapshotFeature.java  |  69 +++--
 .../snapshot/FileWithSnapshotFeature.java       |   2 +-
 .../namenode/snapshot/SnapshotManager.java      |   4 +-
 .../java/org/apache/hadoop/hdfs/TestLease.java  |   4 +-
 .../hdfs/server/namenode/NameNodeAdapter.java   |  21 +-
 .../namenode/TestDiskspaceQuotaUpdate.java      |  11 +-
 .../hdfs/server/namenode/TestFSImage.java       |   2 +-
 .../hdfs/server/namenode/TestFSNamesystem.java  |   2 +-
 .../server/namenode/TestGetBlockLocations.java  |   3 +-
 .../hdfs/server/namenode/TestLeaseManager.java  |  54 ++--
 .../hdfs/server/namenode/TestSaveNamespace.java |   3 +-
 ...tINodeFileUnderConstructionWithSnapshot.java |  29 ++
 27 files changed, 414 insertions(+), 458 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/00fe1ed3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirDeleteOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirDeleteOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirDeleteOp.java
index 2192c24..c31d75f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirDeleteOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirDeleteOp.java
@@ -41,7 +41,8 @@ class FSDirDeleteOp {
    */
   static long delete(
       FSDirectory fsd, INodesInPath iip, BlocksMapUpdateInfo collectedBlocks,
-      List<INode> removedINodes, long mtime) throws IOException {
+      List<INode> removedINodes, List<Long> removedUCFiles,
+      long mtime) throws IOException {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* FSDirectory.delete: " + iip.getPath());
     }
@@ -54,7 +55,7 @@ class FSDirDeleteOp {
         List<INodeDirectory> snapshottableDirs = new ArrayList<>();
         FSDirSnapshotOp.checkSnapshot(iip.getLastINode(), snapshottableDirs);
         filesRemoved = unprotectedDelete(fsd, iip, collectedBlocks,
-                                         removedINodes, mtime);
+                                         removedINodes, removedUCFiles, mtime);
         fsd.getFSNamesystem().removeSnapshottableDirs(snapshottableDirs);
       }
     } finally {
@@ -118,6 +119,7 @@ class FSDirDeleteOp {
     FSNamesystem fsn = fsd.getFSNamesystem();
     BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
     List<INode> removedINodes = new ChunkedArrayList<>();
+    List<Long> removedUCFiles = new ChunkedArrayList<>();
 
     final INodesInPath iip = fsd.getINodesInPath4Write(
         FSDirectory.normalizePath(src), false);
@@ -127,11 +129,11 @@ class FSDirDeleteOp {
     List<INodeDirectory> snapshottableDirs = new ArrayList<>();
     FSDirSnapshotOp.checkSnapshot(iip.getLastINode(), snapshottableDirs);
     long filesRemoved = unprotectedDelete(
-        fsd, iip, collectedBlocks, removedINodes, mtime);
+        fsd, iip, collectedBlocks, removedINodes, removedUCFiles, mtime);
     fsn.removeSnapshottableDirs(snapshottableDirs);
 
     if (filesRemoved >= 0) {
-      fsn.removeLeasesAndINodes(src, removedINodes, false);
+      fsn.removeLeasesAndINodes(removedUCFiles, removedINodes, false);
       fsn.removeBlocksAndUpdateSafemodeTotal(collectedBlocks);
     }
   }
@@ -163,18 +165,19 @@ class FSDirDeleteOp {
     FSDirectory fsd = fsn.getFSDirectory();
     BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
     List<INode> removedINodes = new ChunkedArrayList<>();
+    List<Long> removedUCFiles = new ChunkedArrayList<>();
 
     long mtime = now();
     // Unlink the target directory from directory tree
     long filesRemoved = delete(
-        fsd, iip, collectedBlocks, removedINodes, mtime);
+        fsd, iip, collectedBlocks, removedINodes, removedUCFiles, mtime);
     if (filesRemoved < 0) {
       return null;
     }
     fsd.getEditLog().logDelete(src, mtime, logRetryCache);
     incrDeletedFileCount(filesRemoved);
 
-    fsn.removeLeasesAndINodes(src, removedINodes, true);
+    fsn.removeLeasesAndINodes(removedUCFiles, removedINodes, true);
 
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* Namesystem.delete: "
@@ -212,12 +215,13 @@ class FSDirDeleteOp {
    * @param iip the inodes resolved from the path
    * @param collectedBlocks blocks collected from the deleted path
    * @param removedINodes inodes that should be removed from inodeMap
+   * @param removedUCFiles inodes whose leases need to be released
    * @param mtime the time the inode is removed
    * @return the number of inodes deleted; 0 if no inodes are deleted.
    */
   private static long unprotectedDelete(
       FSDirectory fsd, INodesInPath iip, BlocksMapUpdateInfo collectedBlocks,
-      List<INode> removedINodes, long mtime) {
+      List<INode> removedINodes, List<Long> removedUCFiles, long mtime) {
     assert fsd.hasWriteLock();
 
     // check if target node exists
@@ -248,11 +252,11 @@ class FSDirDeleteOp {
     // collect block and update quota
     if (!targetNode.isInLatestSnapshot(latestSnapshot)) {
       targetNode.destroyAndCollectBlocks(fsd.getBlockStoragePolicySuite(),
-        collectedBlocks, removedINodes);
+        collectedBlocks, removedINodes, removedUCFiles);
     } else {
       QuotaCounts counts = targetNode.cleanSubtree(
         fsd.getBlockStoragePolicySuite(), CURRENT_STATE_ID,
-          latestSnapshot, collectedBlocks, removedINodes);
+          latestSnapshot, collectedBlocks, removedINodes, removedUCFiles);
       removed = counts.getNameSpace();
       fsd.updateCountNoQuotaCheck(iip, iip.length() -1, counts.negation());
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00fe1ed3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
index 4a20a62..d5faa78 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
@@ -680,8 +680,6 @@ class FSDirRenameOp {
       srcParent.updateModificationTime(timestamp, srcIIP.getLatestSnapshotId());
       final INode dstParent = dstParentIIP.getLastINode();
       dstParent.updateModificationTime(timestamp, dstIIP.getLatestSnapshotId());
-      // update moved lease with new filename
-      fsd.getFSNamesystem().unprotectedChangeLease(src, dst);
     }
 
     void restoreSource() throws QuotaExceededException {
@@ -731,16 +729,20 @@ class FSDirRenameOp {
         throws QuotaExceededException {
       Preconditions.checkState(oldDstChild != null);
       List<INode> removedINodes = new ChunkedArrayList<>();
+      List<Long> removedUCFiles = new ChunkedArrayList<>();
       final boolean filesDeleted;
       if (!oldDstChild.isInLatestSnapshot(dstIIP.getLatestSnapshotId())) {
-        oldDstChild.destroyAndCollectBlocks(bsps, collectedBlocks, removedINodes);
+        oldDstChild.destroyAndCollectBlocks(bsps, collectedBlocks, removedINodes,
+                                            removedUCFiles);
         filesDeleted = true;
       } else {
-        filesDeleted = oldDstChild.cleanSubtree(bsps, Snapshot.CURRENT_STATE_ID,
-            dstIIP.getLatestSnapshotId(), collectedBlocks, removedINodes)
-            .getNameSpace() >= 0;
+        filesDeleted = oldDstChild.cleanSubtree(
+            bsps, Snapshot.CURRENT_STATE_ID,
+            dstIIP.getLatestSnapshotId(), collectedBlocks,
+            removedINodes, removedUCFiles).getNameSpace() >= 0;
       }
-      fsd.getFSNamesystem().removeLeasesAndINodes(src, removedINodes, false);
+      fsd.getFSNamesystem().removeLeasesAndINodes(
+          removedUCFiles, removedINodes, false);
       return filesDeleted;
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00fe1ed3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index eaa2e77..7964188 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -29,6 +29,7 @@ import java.util.EnumMap;
 import java.util.EnumSet;
 import java.util.List;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -374,7 +375,7 @@ public class FSEditLogLoader {
             addCloseOp.clientMachine,
             addCloseOp.storagePolicyId);
         iip = INodesInPath.replace(iip, iip.length() - 1, newFile);
-        fsNamesys.leaseManager.addLease(addCloseOp.clientName, path);
+        fsNamesys.leaseManager.addLease(addCloseOp.clientName, newFile.getId());
 
         // add the op into retry cache if necessary
         if (toAddRetryCache) {
@@ -446,9 +447,9 @@ public class FSEditLogLoader {
             "File is not under construction: " + path);
       }
       // One might expect that you could use removeLease(holder, path) here,
-      // but OP_CLOSE doesn't serialize the holder. So, remove by path.
+      // but OP_CLOSE doesn't serialize the holder. So, remove the inode.
       if (file.isUnderConstruction()) {
-        fsNamesys.leaseManager.removeLeaseWithPrefixPath(path);
+        fsNamesys.leaseManager.removeLeases(Lists.newArrayList(file.getId()));
         file.toCompleteFile(file.getModificationTime());
       }
       break;
@@ -701,8 +702,8 @@ public class FSEditLogLoader {
           renameReservedPathsOnUpgrade(reassignLeaseOp.path, logVersion);
       INodeFile pendingFile = fsDir.getINode(path).asFile();
       Preconditions.checkState(pendingFile.isUnderConstruction());
-      fsNamesys.reassignLeaseInternal(lease,
-          path, reassignLeaseOp.newHolder, pendingFile);
+      fsNamesys.reassignLeaseInternal(lease, reassignLeaseOp.newHolder,
+              pendingFile);
       break;
     }
     case OP_START_LOG_SEGMENT:
@@ -739,7 +740,7 @@ public class FSEditLogLoader {
       collectedBlocks.clear();
       fsNamesys.dir.removeFromInodeMap(removedINodes);
       removedINodes.clear();
-      
+
       if (toAddRetryCache) {
         fsNamesys.addCacheEntry(deleteSnapshotOp.rpcClientId,
             deleteSnapshotOp.rpcCallId);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00fe1ed3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
index d1d007f..ec2babd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
@@ -966,8 +966,7 @@ public class FSImageFormat {
         }
 
         if (!inSnapshot) {
-          namesystem.leaseManager.addLease(cons
-              .getFileUnderConstructionFeature().getClientName(), path);
+          namesystem.leaseManager.addLease(uc.getClientName(), oldnode.getId());
         }
       }
     }
@@ -1297,7 +1296,7 @@ public class FSImageFormat {
         // paths, so that when loading fsimage we do not put them into the lease
         // map. In the future, we can remove this hack when we can bump the
         // layout version.
-        sourceNamesystem.saveFilesUnderConstruction(out, snapshotUCMap);
+        saveFilesUnderConstruction(sourceNamesystem, out, snapshotUCMap);
 
         context.checkCancelled();
         sourceNamesystem.saveSecretManagerStateCompat(out, sdPath);
@@ -1448,5 +1447,46 @@ public class FSImageFormat {
         counter.increment();
       }
     }
+
+    /**
+     * Serializes leases.
+     */
+    void saveFilesUnderConstruction(FSNamesystem fsn, DataOutputStream out,
+                                    Map<Long, INodeFile> snapshotUCMap) throws IOException {
+      // This is run by an inferior thread of saveNamespace, which holds a read
+      // lock on our behalf. If we took the read lock here, we could block
+      // for fairness if a writer is waiting on the lock.
+      final LeaseManager leaseManager = fsn.getLeaseManager();
+      final FSDirectory dir = fsn.getFSDirectory();
+      synchronized (leaseManager) {
+        Collection<Long> filesWithUC = leaseManager.getINodeIdWithLeases();
+        for (Long id : filesWithUC) {
+          // TODO: for HDFS-5428, because of rename operations, some
+          // under-construction files that are
+          // in the current fs directory can also be captured in the
+          // snapshotUCMap. We should remove them from the snapshotUCMap.
+          snapshotUCMap.remove(id);
+        }
+        out.writeInt(filesWithUC.size() + snapshotUCMap.size()); // write the size
+
+        for (Long id : filesWithUC) {
+          INodeFile file = dir.getInode(id).asFile();
+          String path = file.getFullPathName();
+          FSImageSerialization.writeINodeUnderConstruction(
+                  out, file, path);
+        }
+
+        for (Map.Entry<Long, INodeFile> entry : snapshotUCMap.entrySet()) {
+          // for those snapshot INodeFileUC, we use "/.reserved/.inodes/<inodeid>"
+          // as their paths
+          StringBuilder b = new StringBuilder();
+          b.append(FSDirectory.DOT_RESERVED_PATH_PREFIX)
+                  .append(Path.SEPARATOR).append(FSDirectory.DOT_INODES_STRING)
+                  .append(Path.SEPARATOR).append(entry.getValue().getId());
+          FSImageSerialization.writeINodeUnderConstruction(
+                  out, entry.getValue(), b.toString());
+        }
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00fe1ed3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
index b758458..d966c69 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -279,7 +280,8 @@ public final class FSImageFormatPBINode {
         INodeFile file = dir.getInode(entry.getInodeId()).asFile();
         FileUnderConstructionFeature uc = file.getFileUnderConstructionFeature();
         Preconditions.checkState(uc != null); // file must be under-construction
-        fsn.leaseManager.addLease(uc.getClientName(), entry.getFullPath());
+        fsn.leaseManager.addLease(uc.getClientName(),
+                entry.getInodeId());
       }
     }
 
@@ -576,10 +578,21 @@ public final class FSImageFormatPBINode {
     }
 
     void serializeFilesUCSection(OutputStream out) throws IOException {
-      Map<String, INodeFile> ucMap = fsn.getFilesUnderConstruction();
-      for (Map.Entry<String, INodeFile> entry : ucMap.entrySet()) {
-        String path = entry.getKey();
-        INodeFile file = entry.getValue();
+      Collection<Long> filesWithUC = fsn.getLeaseManager()
+              .getINodeIdWithLeases();
+      for (Long id : filesWithUC) {
+        INode inode = fsn.getFSDirectory().getInode(id);
+        if (inode == null) {
+          LOG.warn("Fail to find inode " + id + " when saving the leases.");
+          continue;
+        }
+        INodeFile file = inode.asFile();
+        if (!file.isUnderConstruction()) {
+          LOG.warn("Fail to save the lease for inode id " + id
+                       + " as the file is not under construction");
+          continue;
+        }
+        String path = file.getFullPathName();
         FileUnderConstructionEntry.Builder b = FileUnderConstructionEntry
             .newBuilder().setInodeId(file.getId()).setFullPath(path);
         FileUnderConstructionEntry e = b.build();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00fe1ed3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 60495af..ef069d6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -2087,12 +2087,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
                                Block newBlock)
       throws IOException {
     INodeFile file = iip.getLastINode().asFile();
-    String src = iip.getPath();
     file.recordModification(iip.getLatestSnapshotId());
     file.toUnderConstruction(leaseHolder, clientMachine);
     assert file.isUnderConstruction() : "inode should be under construction.";
     leaseManager.addLease(
-        file.getFileUnderConstructionFeature().getClientName(), src);
+        file.getFileUnderConstructionFeature().getClientName(), file.getId());
     boolean shouldRecoverNow = (newBlock == null);
     BlockInfoContiguous oldBlock = file.getLastBlock();
     boolean shouldCopyOnTruncate = shouldCopyOnTruncate(file, oldBlock);
@@ -2568,13 +2567,15 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       } else {
         if (overwrite) {
           toRemoveBlocks = new BlocksMapUpdateInfo();
-          List<INode> toRemoveINodes = new ChunkedArrayList<INode>();
-          long ret = FSDirDeleteOp.delete(dir, iip, toRemoveBlocks,
-                                          toRemoveINodes, now());
+          List<INode> toRemoveINodes = new ChunkedArrayList<>();
+          List<Long> toRemoveUCFiles = new ChunkedArrayList<>();
+          long ret = FSDirDeleteOp.delete(
+              dir, iip, toRemoveBlocks, toRemoveINodes,
+              toRemoveUCFiles, now());
           if (ret >= 0) {
             iip = INodesInPath.replace(iip, iip.length() - 1, null);
             FSDirDeleteOp.incrDeletedFileCount(ret);
-            removeLeasesAndINodes(src, toRemoveINodes, true);
+            removeLeasesAndINodes(toRemoveUCFiles, toRemoveINodes, true);
           }
         } else {
           // If lease soft limit time is expired, recover the lease
@@ -2601,7 +2602,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         throw new IOException("Unable to add " + src +  " to namespace");
       }
       leaseManager.addLease(newNode.getFileUnderConstructionFeature()
-          .getClientName(), src);
+          .getClientName(), newNode.getId());
 
       // Set encryption attributes if necessary
       if (feInfo != null) {
@@ -2745,7 +2746,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     file.toUnderConstruction(leaseHolder, clientMachine);
 
     leaseManager.addLease(
-        file.getFileUnderConstructionFeature().getClientName(), src);
+        file.getFileUnderConstructionFeature().getClientName(), file.getId());
 
     LocatedBlock ret = null;
     if (!newBlock) {
@@ -2897,7 +2898,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       Lease lease = leaseManager.getLease(holder);
 
       if (!force && lease != null) {
-        Lease leaseFile = leaseManager.getLeaseByPath(src);
+        Lease leaseFile = leaseManager.getLease(file);
         if (leaseFile != null && leaseFile.equals(lease)) {
           // We found the lease for this file but the original
           // holder is trying to obtain it again.
@@ -3758,15 +3759,16 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   
   /**
    * Remove leases and inodes related to a given path
-   * @param src The given path
+   * @param removedUCFiles INodes whose leases need to be released
    * @param removedINodes Containing the list of inodes to be removed from
    *                      inodesMap
    * @param acquireINodeMapLock Whether to acquire the lock for inode removal
    */
-  void removeLeasesAndINodes(String src, List<INode> removedINodes,
+  void removeLeasesAndINodes(List<Long> removedUCFiles,
+      List<INode> removedINodes,
       final boolean acquireINodeMapLock) {
     assert hasWriteLock();
-    leaseManager.removeLeaseWithPrefixPath(src);
+    leaseManager.removeLeases(removedUCFiles);
     // remove inodes from inodesMap
     if (removedINodes != null) {
       if (acquireINodeMapLock) {
@@ -4156,14 +4158,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       return lease;
     // The following transaction is not synced. Make sure it's sync'ed later.
     logReassignLease(lease.getHolder(), src, newHolder);
-    return reassignLeaseInternal(lease, src, newHolder, pendingFile);
+    return reassignLeaseInternal(lease, newHolder, pendingFile);
   }
   
-  Lease reassignLeaseInternal(Lease lease, String src, String newHolder,
-      INodeFile pendingFile) {
+  Lease reassignLeaseInternal(Lease lease, String newHolder, INodeFile pendingFile) {
     assert hasWriteLock();
     pendingFile.getFileUnderConstructionFeature().setClientName(newHolder);
-    return leaseManager.reassignLease(lease, src, newHolder);
+    return leaseManager.reassignLease(lease, pendingFile, newHolder);
   }
 
   private void commitOrCompleteLastBlock(final INodeFile fileINode,
@@ -4191,7 +4192,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
     FileUnderConstructionFeature uc = pendingFile.getFileUnderConstructionFeature();
     Preconditions.checkArgument(uc != null);
-    leaseManager.removeLease(uc.getClientName(), src);
+    leaseManager.removeLease(uc.getClientName(), pendingFile);
     
     pendingFile.recordModification(latestSnapshot);
 
@@ -6401,58 +6402,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     persistBlocks(src, pendingFile, logRetryCache);
   }
 
-  // rename was successful. If any part of the renamed subtree had
-  // files that were being written to, update with new filename.
-  void unprotectedChangeLease(String src, String dst) {
-    assert hasWriteLock();
-    leaseManager.changeLease(src, dst);
-  }
-
-  /**
-   * Serializes leases.
-   */
-  void saveFilesUnderConstruction(DataOutputStream out,
-      Map<Long, INodeFile> snapshotUCMap) throws IOException {
-    // This is run by an inferior thread of saveNamespace, which holds a read
-    // lock on our behalf. If we took the read lock here, we could block
-    // for fairness if a writer is waiting on the lock.
-    synchronized (leaseManager) {
-      Map<String, INodeFile> nodes = leaseManager.getINodesUnderConstruction();
-      for (Map.Entry<String, INodeFile> entry : nodes.entrySet()) {
-        // TODO: for HDFS-5428, because of rename operations, some
-        // under-construction files that are
-        // in the current fs directory can also be captured in the
-        // snapshotUCMap. We should remove them from the snapshotUCMap.
-        snapshotUCMap.remove(entry.getValue().getId());
-      }
-
-      out.writeInt(nodes.size() + snapshotUCMap.size()); // write the size
-      for (Map.Entry<String, INodeFile> entry : nodes.entrySet()) {
-        FSImageSerialization.writeINodeUnderConstruction(
-            out, entry.getValue(), entry.getKey());
-      }
-      for (Map.Entry<Long, INodeFile> entry : snapshotUCMap.entrySet()) {
-        // for those snapshot INodeFileUC, we use "/.reserved/.inodes/<inodeid>"
-        // as their paths
-        StringBuilder b = new StringBuilder();
-        b.append(FSDirectory.DOT_RESERVED_PATH_PREFIX)
-            .append(Path.SEPARATOR).append(FSDirectory.DOT_INODES_STRING)
-            .append(Path.SEPARATOR).append(entry.getValue().getId());
-        FSImageSerialization.writeINodeUnderConstruction(
-            out, entry.getValue(), b.toString());
-      }
-    }
-  }
-
-  /**
-   * @return all the under-construction files in the lease map
-   */
-  Map<String, INodeFile> getFilesUnderConstruction() {
-    synchronized (leaseManager) {
-      return leaseManager.getINodesUnderConstruction();
-    }
-  }
-
   /**
    * Register a Backup name-node, verifying that it belongs
    * to the correct namespace, and adding it to the set of

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00fe1ed3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
index f8efd76..b65879f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
@@ -390,7 +390,7 @@ public abstract class INode implements INodeAttributes, Diff.Element<byte[]> {
    * @param bsps
    *          block storage policy suite to calculate intended storage type usage
    * @param snapshotId
-   *          The id of the snapshot to delete. 
+   *          The id of the snapshot to delete.
    *          {@link Snapshot#CURRENT_STATE_ID} means to delete the current
    *          file/directory.
    * @param priorSnapshotId
@@ -401,14 +401,16 @@ public abstract class INode implements INodeAttributes, Diff.Element<byte[]> {
    *          blocks collected from the descents for further block
    *          deletion/update will be added to the given map.
    * @param removedINodes
-   *          INodes collected from the descents for further cleaning up of 
+   *          INodes collected from the descents for further cleaning up of
    *          inodeMap
+   * @param removedUCFiles
+   *          INodes whose leases need to be released
    * @return quota usage delta when deleting a snapshot
    */
-  public abstract QuotaCounts cleanSubtree(final BlockStoragePolicySuite bsps,
-      final int snapshotId,
+  public abstract QuotaCounts cleanSubtree(
+      final BlockStoragePolicySuite bsps, final int snapshotId,
       int priorSnapshotId, BlocksMapUpdateInfo collectedBlocks,
-      List<INode> removedINodes);
+      List<INode> removedINodes, List<Long> removedUCFiles);
   
   /**
    * Destroy self and clear everything! If the INode is a file, this method
@@ -416,7 +418,6 @@ public abstract class INode implements INodeAttributes, Diff.Element<byte[]> {
    * directory, the method goes down the subtree and collects blocks from the
    * descents, and clears its parent/children references as well. The method
    * also clears the diff list if the INode contains snapshot diff list.
-   *
    * @param bsps
    *          block storage policy suite to calculate intended storage type usage
    *          This is needed because INodeReference#destroyAndCollectBlocks() needs
@@ -427,10 +428,12 @@ public abstract class INode implements INodeAttributes, Diff.Element<byte[]> {
    * @param removedINodes
    *          INodes collected from the descents for further cleaning up of
    *          inodeMap
+   * @param removedUCFiles
+   *          INodes whose leases need to be released
    */
   public abstract void destroyAndCollectBlocks(
-      BlockStoragePolicySuite bsps,
-      BlocksMapUpdateInfo collectedBlocks, List<INode> removedINodes);
+      BlockStoragePolicySuite bsps, BlocksMapUpdateInfo collectedBlocks,
+      List<INode> removedINodes, List<Long> removedUCFiles);
 
   /** Compute {@link ContentSummary}. Blocking call */
   public final ContentSummary computeContentSummary(BlockStoragePolicySuite bsps) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00fe1ed3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
index 098594d..fa63889 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
@@ -753,10 +753,11 @@ public class INodeDirectory extends INodeWithAdditionalFields
   }
 
   /** Call cleanSubtree(..) recursively down the subtree. */
-  public QuotaCounts cleanSubtreeRecursively(final BlockStoragePolicySuite bsps,
-      final int snapshot,
-      int prior, final BlocksMapUpdateInfo collectedBlocks,
-      final List<INode> removedINodes, final Map<INode, INode> excludedNodes) {
+  public QuotaCounts cleanSubtreeRecursively(
+      final BlockStoragePolicySuite bsps, final int snapshot, int prior,
+      final BlocksMapUpdateInfo collectedBlocks,
+      final List<INode> removedINodes, List<Long> removedUCFiles,
+      final Map<INode, INode> excludedNodes) {
     QuotaCounts counts = new QuotaCounts.Builder().build();
     // in case of deletion snapshot, since this call happens after we modify
     // the diff list, the snapshot to be deleted has been combined or renamed
@@ -771,7 +772,7 @@ public class INodeDirectory extends INodeWithAdditionalFields
         continue;
       } else {
         QuotaCounts childCounts = child.cleanSubtree(bsps, snapshot, prior,
-            collectedBlocks, removedINodes);
+            collectedBlocks, removedINodes, removedUCFiles);
         counts.add(childCounts);
       }
     }
@@ -779,15 +780,17 @@ public class INodeDirectory extends INodeWithAdditionalFields
   }
 
   @Override
-  public void destroyAndCollectBlocks(final BlockStoragePolicySuite bsps,
+  public void destroyAndCollectBlocks(
+      final BlockStoragePolicySuite bsps,
       final BlocksMapUpdateInfo collectedBlocks,
-      final List<INode> removedINodes) {
+      final List<INode> removedINodes, List<Long> removedUCFiles) {
     final DirectoryWithSnapshotFeature sf = getDirectoryWithSnapshotFeature();
     if (sf != null) {
-      sf.clear(bsps, this, collectedBlocks, removedINodes);
+      sf.clear(bsps, this, collectedBlocks, removedINodes, removedUCFiles);
     }
     for (INode child : getChildrenList(Snapshot.CURRENT_STATE_ID)) {
-      child.destroyAndCollectBlocks(bsps, collectedBlocks, removedINodes);
+      child.destroyAndCollectBlocks(bsps, collectedBlocks, removedINodes,
+                                    removedUCFiles);
     }
     if (getAclFeature() != null) {
       AclStorage.removeAclFeature(getAclFeature());
@@ -797,15 +800,15 @@ public class INodeDirectory extends INodeWithAdditionalFields
   }
   
   @Override
-  public QuotaCounts cleanSubtree(final BlockStoragePolicySuite bsps,
-      final int snapshotId, int priorSnapshotId,
+  public QuotaCounts cleanSubtree(
+      final BlockStoragePolicySuite bsps, final int snapshotId, int priorSnapshotId,
       final BlocksMapUpdateInfo collectedBlocks,
-      final List<INode> removedINodes) {
+      final List<INode> removedINodes, List<Long> removedUCFiles) {
     DirectoryWithSnapshotFeature sf = getDirectoryWithSnapshotFeature();
     // there is snapshot data
     if (sf != null) {
       return sf.cleanDirectory(bsps, this, snapshotId, priorSnapshotId,
-          collectedBlocks, removedINodes);
+          collectedBlocks, removedINodes, removedUCFiles);
     }
     // there is no snapshot data
     if (priorSnapshotId == Snapshot.NO_SNAPSHOT_ID
@@ -813,12 +816,13 @@ public class INodeDirectory extends INodeWithAdditionalFields
       // destroy the whole subtree and collect blocks that should be deleted
       QuotaCounts counts = new QuotaCounts.Builder().build();
       this.computeQuotaUsage(bsps, counts, true);
-      destroyAndCollectBlocks(bsps, collectedBlocks, removedINodes);
+      destroyAndCollectBlocks(bsps, collectedBlocks, removedINodes,
+                              removedUCFiles);
       return counts; 
     } else {
       // process recursively down the subtree
       QuotaCounts counts = cleanSubtreeRecursively(bsps, snapshotId, priorSnapshotId,
-          collectedBlocks, removedINodes, null);
+          collectedBlocks, removedINodes, removedUCFiles, null);
       if (isQuotaSet()) {
         getDirectoryWithQuotaFeature().addSpaceConsumed2Cache(counts.negation());
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00fe1ed3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
index 110bd71..1d9c0ad 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
@@ -494,28 +494,33 @@ public class INodeFile extends INodeWithAdditionalFields
   }
 
   @Override
-  public QuotaCounts cleanSubtree(BlockStoragePolicySuite bsps, final int snapshot,
-                                  int priorSnapshotId,
+  public QuotaCounts cleanSubtree(
+      BlockStoragePolicySuite bsps, final int snapshot, int priorSnapshotId,
       final BlocksMapUpdateInfo collectedBlocks,
-      final List<INode> removedINodes) {
+      final List<INode> removedINodes, List<Long> removedUCFiles) {
     FileWithSnapshotFeature sf = getFileWithSnapshotFeature();
     if (sf != null) {
       return sf.cleanFile(bsps, this, snapshot, priorSnapshotId, collectedBlocks,
           removedINodes);
     }
     QuotaCounts counts = new QuotaCounts.Builder().build();
+
     if (snapshot == CURRENT_STATE_ID) {
       if (priorSnapshotId == NO_SNAPSHOT_ID) {
         // this only happens when deleting the current file and the file is not
         // in any snapshot
         computeQuotaUsage(bsps, counts, false);
-        destroyAndCollectBlocks(bsps, collectedBlocks, removedINodes);
+        destroyAndCollectBlocks(bsps, collectedBlocks, removedINodes,
+                                removedUCFiles);
       } else {
+        FileUnderConstructionFeature uc = getFileUnderConstructionFeature();
         // when deleting the current file and the file is in snapshot, we should
         // clean the 0-sized block if the file is UC
-        FileUnderConstructionFeature uc = getFileUnderConstructionFeature();
         if (uc != null) {
           uc.cleanZeroSizeBlock(this, collectedBlocks);
+          if (removedUCFiles != null) {
+            removedUCFiles.add(getId());
+          }
         }
       }
     }
@@ -523,8 +528,9 @@ public class INodeFile extends INodeWithAdditionalFields
   }
 
   @Override
-  public void destroyAndCollectBlocks(BlockStoragePolicySuite bsps,
-      BlocksMapUpdateInfo collectedBlocks, final List<INode> removedINodes) {
+  public void destroyAndCollectBlocks(
+      BlockStoragePolicySuite bsps, BlocksMapUpdateInfo collectedBlocks,
+      final List<INode> removedINodes, List<Long> removedUCFiles) {
     if (blocks != null && collectedBlocks != null) {
       for (BlockInfoContiguous blk : blocks) {
         collectedBlocks.addDeleteBlock(blk);
@@ -542,6 +548,9 @@ public class INodeFile extends INodeWithAdditionalFields
       sf.getDiffs().destroyAndCollectSnapshotBlocks(collectedBlocks);
       sf.clearDiffs();
     }
+    if (isUnderConstruction() && removedUCFiles != null) {
+      removedUCFiles.add(getId());
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00fe1ed3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
index 7b1332b..5f16bd6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
@@ -97,8 +97,9 @@ public class INodeMap {
       }
       
       @Override
-      public void destroyAndCollectBlocks(BlockStoragePolicySuite bsps,
-          BlocksMapUpdateInfo collectedBlocks, List<INode> removedINodes) {
+      public void destroyAndCollectBlocks(
+          BlockStoragePolicySuite bsps, BlocksMapUpdateInfo collectedBlocks,
+          List<INode> removedINodes, List<Long> removedUCFiles) {
         // Nothing to do
       }
 
@@ -116,9 +117,10 @@ public class INodeMap {
       }
       
       @Override
-      public QuotaCounts cleanSubtree(BlockStoragePolicySuite bsps,
-          int snapshotId, int priorSnapshotId,
-          BlocksMapUpdateInfo collectedBlocks, List<INode> removedINodes) {
+      public QuotaCounts cleanSubtree(
+          BlockStoragePolicySuite bsps, int snapshotId, int priorSnapshotId,
+          BlocksMapUpdateInfo collectedBlocks, List<INode> removedINodes,
+          List<Long> removedUCFiles) {
           return null;
       }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00fe1ed3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java
index b33a93c..5008dc0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java
@@ -302,19 +302,20 @@ public abstract class INodeReference extends INode {
   }
 
   @Override // used by WithCount
-  public QuotaCounts cleanSubtree(BlockStoragePolicySuite bsps, int snapshot,
-      int prior, BlocksMapUpdateInfo collectedBlocks,
-      final List<INode> removedINodes) {
+  public QuotaCounts cleanSubtree(
+      BlockStoragePolicySuite bsps, int snapshot, int prior, BlocksMapUpdateInfo collectedBlocks,
+      final List<INode> removedINodes, List<Long> removedUCFiles) {
     return referred.cleanSubtree(bsps, snapshot, prior, collectedBlocks,
-        removedINodes);
+        removedINodes, removedUCFiles);
   }
 
   @Override // used by WithCount
   public void destroyAndCollectBlocks(
-      BlockStoragePolicySuite bsps,
-      BlocksMapUpdateInfo collectedBlocks, final List<INode> removedINodes) {
+      BlockStoragePolicySuite bsps, BlocksMapUpdateInfo collectedBlocks,
+      final List<INode> removedINodes, List<Long> removedUCFiles) {
     if (removeReference(this) <= 0) {
-      referred.destroyAndCollectBlocks(bsps, collectedBlocks, removedINodes);
+      referred.destroyAndCollectBlocks(bsps, collectedBlocks, removedINodes,
+                                       removedUCFiles);
     }
   }
 
@@ -542,9 +543,9 @@ public abstract class INodeReference extends INode {
     }
     
     @Override
-    public QuotaCounts cleanSubtree(BlockStoragePolicySuite bsps,
-        final int snapshot, int prior, final BlocksMapUpdateInfo collectedBlocks,
-        final List<INode> removedINodes) {
+    public QuotaCounts cleanSubtree(
+        BlockStoragePolicySuite bsps, final int snapshot, int prior, final BlocksMapUpdateInfo collectedBlocks,
+        final List<INode> removedINodes, List<Long> removedUCFiles) {
       // since WithName node resides in deleted list acting as a snapshot copy,
       // the parameter snapshot must be non-null
       Preconditions.checkArgument(snapshot != Snapshot.CURRENT_STATE_ID);
@@ -560,7 +561,7 @@ public abstract class INodeReference extends INode {
       }
 
       QuotaCounts counts = getReferredINode().cleanSubtree(bsps, snapshot, prior,
-          collectedBlocks, removedINodes);
+          collectedBlocks, removedINodes, removedUCFiles);
       INodeReference ref = getReferredINode().getParentReference();
       if (ref != null) {
         try {
@@ -581,13 +582,13 @@ public abstract class INodeReference extends INode {
     }
     
     @Override
-    public void destroyAndCollectBlocks(BlockStoragePolicySuite bsps,
-        BlocksMapUpdateInfo collectedBlocks,
-        final List<INode> removedINodes) {
+    public void destroyAndCollectBlocks(
+        BlockStoragePolicySuite bsps, BlocksMapUpdateInfo collectedBlocks,
+        final List<INode> removedINodes, List<Long> removedUCFiles) {
       int snapshot = getSelfSnapshot();
       if (removeReference(this) <= 0) {
         getReferredINode().destroyAndCollectBlocks(bsps, collectedBlocks,
-            removedINodes);
+            removedINodes, removedUCFiles);
       } else {
         int prior = getPriorSnapshot(this);
         INode referred = getReferredINode().asReference().getReferredINode();
@@ -607,7 +608,7 @@ public abstract class INodeReference extends INode {
           }
           try {
             QuotaCounts counts = referred.cleanSubtree(bsps, snapshot, prior,
-                collectedBlocks, removedINodes);
+                collectedBlocks, removedINodes, removedUCFiles);
             INodeReference ref = getReferredINode().getParentReference();
             if (ref != null) {
               ref.addSpaceConsumed(counts.negation(), true);
@@ -661,13 +662,16 @@ public abstract class INodeReference extends INode {
     }
     
     @Override
-    public QuotaCounts cleanSubtree(BlockStoragePolicySuite bsps, int snapshot, int prior,
-        BlocksMapUpdateInfo collectedBlocks, List<INode> removedINodes) {
+    public QuotaCounts cleanSubtree(
+        BlockStoragePolicySuite bsps, int snapshot, int prior,
+        BlocksMapUpdateInfo collectedBlocks, List<INode> removedINodes,
+        List<Long> removedUCFiles) {
       if (snapshot == Snapshot.CURRENT_STATE_ID
           && prior == Snapshot.NO_SNAPSHOT_ID) {
         QuotaCounts counts = new QuotaCounts.Builder().build();
         this.computeQuotaUsage(bsps, counts, true);
-        destroyAndCollectBlocks(bsps, collectedBlocks, removedINodes);
+        destroyAndCollectBlocks(bsps, collectedBlocks, removedINodes,
+                                removedUCFiles);
         return counts;
       } else {
         // if prior is NO_SNAPSHOT_ID, we need to check snapshot belonging to 
@@ -684,7 +688,7 @@ public abstract class INodeReference extends INode {
           return new QuotaCounts.Builder().build();
         }
         return getReferredINode().cleanSubtree(bsps, snapshot, prior,
-            collectedBlocks, removedINodes);
+            collectedBlocks, removedINodes, removedUCFiles);
       }
     }
     
@@ -699,11 +703,12 @@ public abstract class INodeReference extends INode {
      * WithName nodes.
      */
     @Override
-    public void destroyAndCollectBlocks(BlockStoragePolicySuite bsps,
-        BlocksMapUpdateInfo collectedBlocks, final List<INode> removedINodes) {
+    public void destroyAndCollectBlocks(
+        BlockStoragePolicySuite bsps, BlocksMapUpdateInfo collectedBlocks,
+        final List<INode> removedINodes, List<Long> removedUCFiles) {
       if (removeReference(this) <= 0) {
         getReferredINode().destroyAndCollectBlocks(bsps, collectedBlocks,
-            removedINodes);
+            removedINodes, removedUCFiles);
       } else {
         // we will clean everything, including files, directories, and 
         // snapshots, that were created after this prior snapshot
@@ -726,7 +731,7 @@ public abstract class INodeReference extends INode {
           // compute quota usage updates before calling this destroy
           // function, we use true for countDiffChange
           referred.cleanSubtree(bsps, snapshot, prior, collectedBlocks,
-              removedINodes);
+              removedINodes, removedUCFiles);
         } else if (referred.isDirectory()) {
           // similarly, if referred is a directory, it must be an
           // INodeDirectory with snapshot
@@ -734,7 +739,7 @@ public abstract class INodeReference extends INode {
           Preconditions.checkState(dir.isWithSnapshot());
           try {
             DirectoryWithSnapshotFeature.destroyDstSubtree(bsps, dir, snapshot,
-                prior, collectedBlocks, removedINodes);
+                prior, collectedBlocks, removedINodes, removedUCFiles);
           } catch (QuotaExceededException e) {
             LOG.error("should not exceed quota while snapshot deletion", e);
           }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00fe1ed3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java
index 21a9e4f..7ce893f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java
@@ -73,21 +73,23 @@ public class INodeSymlink extends INodeWithAdditionalFields {
   }
   
   @Override
-  public QuotaCounts cleanSubtree(BlockStoragePolicySuite bsps,
-      final int snapshotId, int priorSnapshotId,
+  public QuotaCounts cleanSubtree(
+      BlockStoragePolicySuite bsps, final int snapshotId, int priorSnapshotId,
       final BlocksMapUpdateInfo collectedBlocks,
-      final List<INode> removedINodes) {
+      final List<INode> removedINodes, List<Long> removedUCFiles) {
     if (snapshotId == Snapshot.CURRENT_STATE_ID
         && priorSnapshotId == Snapshot.NO_SNAPSHOT_ID) {
-      destroyAndCollectBlocks(bsps, collectedBlocks, removedINodes);
+      destroyAndCollectBlocks(bsps, collectedBlocks, removedINodes,
+                              removedUCFiles);
     }
     return new QuotaCounts.Builder().nameSpace(1).build();
   }
   
   @Override
-  public void destroyAndCollectBlocks(final BlockStoragePolicySuite bsps,
+  public void destroyAndCollectBlocks(
+      final BlockStoragePolicySuite bsps,
       final BlocksMapUpdateInfo collectedBlocks,
-      final List<INode> removedINodes) {
+      final List<INode> removedINodes, List<Long> removedUCFiles) {
     removedINodes.add(this);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00fe1ed3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
index c6a92be..ade2312 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
@@ -22,20 +22,17 @@ import static org.apache.hadoop.util.Time.monotonicNow;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.NavigableSet;
-import java.util.NoSuchElementException;
+import java.util.PriorityQueue;
 import java.util.SortedMap;
 import java.util.TreeMap;
-import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.util.Daemon;
@@ -78,15 +75,17 @@ public class LeaseManager {
   // Used for handling lock-leases
   // Mapping: leaseHolder -> Lease
   //
-  private final SortedMap<String, Lease> leases = new TreeMap<String, Lease>();
+  private final SortedMap<String, Lease> leases = new TreeMap<>();
   // Set of: Lease
-  private final NavigableSet<Lease> sortedLeases = new TreeSet<Lease>();
-
-  // 
-  // Map path names to leases. It is protected by the sortedLeases lock.
-  // The map stores pathnames in lexicographical order.
-  //
-  private final SortedMap<String, Lease> sortedLeasesByPath = new TreeMap<String, Lease>();
+  private final PriorityQueue<Lease> sortedLeases = new PriorityQueue<>(512,
+      new Comparator<Lease>() {
+        @Override
+        public int compare(Lease o1, Lease o2) {
+          return Long.signum(o1.getLastUpdate() - o2.getLastUpdate());
+        }
+  });
+  // INodeID -> Lease
+  private final HashMap<Long, Lease> leasesById = new HashMap<>();
 
   private Daemon lmthread;
   private volatile boolean shouldRunMonitor;
@@ -97,60 +96,44 @@ public class LeaseManager {
     return leases.get(holder);
   }
 
-  @VisibleForTesting
-  int getNumSortedLeases() {return sortedLeases.size();}
-
   /**
    * This method iterates through all the leases and counts the number of blocks
    * which are not COMPLETE. The FSNamesystem read lock MUST be held before
    * calling this method.
-   * @return
    */
   synchronized long getNumUnderConstructionBlocks() {
     assert this.fsnamesystem.hasReadLock() : "The FSNamesystem read lock wasn't"
       + "acquired before counting under construction blocks";
     long numUCBlocks = 0;
-    for (Lease lease : sortedLeases) {
-      for (String path : lease.getPaths()) {
-        final INodeFile cons;
-        try {
-          cons = this.fsnamesystem.getFSDirectory().getINode(path).asFile();
-          Preconditions.checkState(cons.isUnderConstruction());
-        } catch (UnresolvedLinkException e) {
-          throw new AssertionError("Lease files should reside on this FS");
-        }
-        BlockInfoContiguous[] blocks = cons.getBlocks();
-        if(blocks == null)
-          continue;
-        for(BlockInfoContiguous b : blocks) {
-          if(!b.isComplete())
-            numUCBlocks++;
-        }
+    for (Long id : getINodeIdWithLeases()) {
+      final INodeFile cons = fsnamesystem.getFSDirectory().getInode(id).asFile();
+      Preconditions.checkState(cons.isUnderConstruction());
+      BlockInfoContiguous[] blocks = cons.getBlocks();
+      if(blocks == null) {
+        continue;
+      }
+      for(BlockInfoContiguous b : blocks) {
+        if(!b.isComplete())
+          numUCBlocks++;
       }
     }
     LOG.info("Number of blocks under construction: " + numUCBlocks);
     return numUCBlocks;
   }
 
+  Collection<Long> getINodeIdWithLeases() {return leasesById.keySet();}
+
   /** @return the lease containing src */
-  public Lease getLeaseByPath(String src) {return sortedLeasesByPath.get(src);}
+  public synchronized Lease getLease(INodeFile src) {return leasesById.get(src.getId());}
 
   /** @return the number of leases currently in the system */
+  @VisibleForTesting
   public synchronized int countLease() {return sortedLeases.size();}
 
-  /** @return the number of paths contained in all leases */
-  synchronized int countPath() {
-    int count = 0;
-    for(Lease lease : sortedLeases) {
-      count += lease.getPaths().size();
-    }
-    return count;
-  }
-  
   /**
    * Adds (or re-adds) the lease for the specified file.
    */
-  synchronized Lease addLease(String holder, String src) {
+  synchronized Lease addLease(String holder, long inodeId) {
     Lease lease = getLease(holder);
     if (lease == null) {
       lease = new Lease(holder);
@@ -159,23 +142,24 @@ public class LeaseManager {
     } else {
       renewLease(lease);
     }
-    sortedLeasesByPath.put(src, lease);
-    lease.paths.add(src);
+    leasesById.put(inodeId, lease);
+    lease.files.add(inodeId);
     return lease;
   }
 
   /**
    * Remove the specified lease and src.
    */
-  synchronized void removeLease(Lease lease, String src) {
-    sortedLeasesByPath.remove(src);
-    if (!lease.removePath(src)) {
+  private synchronized void removeLease(Lease lease, long inodeId) {
+    leasesById.remove(inodeId);
+    if (!lease.removeFile(inodeId)) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug(src + " not found in lease.paths (=" + lease.paths + ")");
+        LOG.debug("inode " + inodeId + " not found in lease.files (=" + lease
+                + ")");
       }
     }
 
-    if (!lease.hasPath()) {
+    if (!lease.hasFiles()) {
       leases.remove(lease.holder);
       if (!sortedLeases.remove(lease)) {
         LOG.error(lease + " not found in sortedLeases");
@@ -186,31 +170,32 @@ public class LeaseManager {
   /**
    * Remove the lease for the specified holder and src
    */
-  synchronized void removeLease(String holder, String src) {
+  synchronized void removeLease(String holder, INodeFile src) {
     Lease lease = getLease(holder);
     if (lease != null) {
-      removeLease(lease, src);
+      removeLease(lease, src.getId());
     } else {
       LOG.warn("Removing non-existent lease! holder=" + holder +
-          " src=" + src);
+          " src=" + src.getFullPathName());
     }
   }
 
   synchronized void removeAllLeases() {
     sortedLeases.clear();
-    sortedLeasesByPath.clear();
+    leasesById.clear();
     leases.clear();
   }
 
   /**
    * Reassign lease for file src to the new holder.
    */
-  synchronized Lease reassignLease(Lease lease, String src, String newHolder) {
+  synchronized Lease reassignLease(Lease lease, INodeFile src,
+                                   String newHolder) {
     assert newHolder != null : "new lease holder is null";
     if (lease != null) {
-      removeLease(lease, src);
+      removeLease(lease, src.getId());
     }
-    return addLease(newHolder, src);
+    return addLease(newHolder, src.getId());
   }
 
   /**
@@ -243,10 +228,10 @@ public class LeaseManager {
    * checks in.  If the client dies and allows its lease to
    * expire, all the corresponding locks can be released.
    *************************************************************/
-  class Lease implements Comparable<Lease> {
+  class Lease {
     private final String holder;
     private long lastUpdate;
-    private final Collection<String> paths = new TreeSet<String>();
+    private final HashSet<Long> files = new HashSet<>();
   
     /** Only LeaseManager object can create a lease */
     private Lease(String holder) {
@@ -269,127 +254,43 @@ public class LeaseManager {
     }
 
     /** Does this lease contain any path? */
-    boolean hasPath() {return !paths.isEmpty();}
+    boolean hasFiles() {return !files.isEmpty();}
 
-    boolean removePath(String src) {
-      return paths.remove(src);
+    boolean removeFile(long inodeId) {
+      return files.remove(inodeId);
     }
 
     @Override
     public String toString() {
       return "[Lease.  Holder: " + holder
-          + ", pendingcreates: " + paths.size() + "]";
+          + ", pending creates: " + files.size() + "]";
     }
-  
-    @Override
-    public int compareTo(Lease o) {
-      Lease l1 = this;
-      Lease l2 = o;
-      long lu1 = l1.lastUpdate;
-      long lu2 = l2.lastUpdate;
-      if (lu1 < lu2) {
-        return -1;
-      } else if (lu1 > lu2) {
-        return 1;
-      } else {
-        return l1.holder.compareTo(l2.holder);
-      }
-    }
-  
-    @Override
-    public boolean equals(Object o) {
-      if (!(o instanceof Lease)) {
-        return false;
-      }
-      Lease obj = (Lease) o;
-      if (lastUpdate == obj.lastUpdate &&
-          holder.equals(obj.holder)) {
-        return true;
-      }
-      return false;
-    }
-  
+
     @Override
     public int hashCode() {
       return holder.hashCode();
     }
     
-    Collection<String> getPaths() {
-      return paths;
-    }
+    private Collection<Long> getFiles() { return files; }
 
     String getHolder() {
       return holder;
     }
 
-    void replacePath(String oldpath, String newpath) {
-      paths.remove(oldpath);
-      paths.add(newpath);
-    }
-    
     @VisibleForTesting
     long getLastUpdate() {
       return lastUpdate;
     }
   }
 
-  synchronized void changeLease(String src, String dst) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(getClass().getSimpleName() + ".changelease: " +
-               " src=" + src + ", dest=" + dst);
-    }
-
-    final int len = src.length();
-    for(Map.Entry<String, Lease> entry
-        : findLeaseWithPrefixPath(src, sortedLeasesByPath).entrySet()) {
-      final String oldpath = entry.getKey();
-      final Lease lease = entry.getValue();
-      // replace stem of src with new destination
-      final String newpath = dst + oldpath.substring(len);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("changeLease: replacing " + oldpath + " with " + newpath);
-      }
-      lease.replacePath(oldpath, newpath);
-      sortedLeasesByPath.remove(oldpath);
-      sortedLeasesByPath.put(newpath, lease);
-    }
-  }
-
-  synchronized void removeLeaseWithPrefixPath(String prefix) {
-    for(Map.Entry<String, Lease> entry
-        : findLeaseWithPrefixPath(prefix, sortedLeasesByPath).entrySet()) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(LeaseManager.class.getSimpleName()
-            + ".removeLeaseWithPrefixPath: entry=" + entry);
-      }
-      removeLease(entry.getValue(), entry.getKey());    
-    }
-  }
-
-  static private Map<String, Lease> findLeaseWithPrefixPath(
-      String prefix, SortedMap<String, Lease> path2lease) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(LeaseManager.class.getSimpleName() + ".findLease: prefix=" + prefix);
-    }
-
-    final Map<String, Lease> entries = new HashMap<String, Lease>();
-    int srclen = prefix.length();
-    
-    // prefix may ended with '/'
-    if (prefix.charAt(srclen - 1) == Path.SEPARATOR_CHAR) {
-      srclen -= 1;
-    }
-
-    for(Map.Entry<String, Lease> entry : path2lease.tailMap(prefix).entrySet()) {
-      final String p = entry.getKey();
-      if (!p.startsWith(prefix)) {
-        return entries;
-      }
-      if (p.length() == srclen || p.charAt(srclen) == Path.SEPARATOR_CHAR) {
-        entries.put(entry.getKey(), entry.getValue());
+  @VisibleForTesting
+  synchronized void removeLeases(Collection<Long> inodes) {
+    for (long inode : inodes) {
+      Lease lease = leasesById.get(inode);
+      if (lease != null) {
+        removeLease(lease, inode);
       }
     }
-    return entries;
   }
 
   public void setLeasePeriod(long softLimit, long hardLimit) {
@@ -428,30 +329,13 @@ public class LeaseManager {
           if (LOG.isDebugEnabled()) {
             LOG.debug(name + " is interrupted", ie);
           }
+        } catch(Throwable e) {
+          LOG.warn("Unexpected throwable: ", e);
         }
       }
     }
   }
 
-  /**
-   * Get the list of inodes corresponding to valid leases.
-   * @return list of inodes
-   */
-  Map<String, INodeFile> getINodesUnderConstruction() {
-    Map<String, INodeFile> inodes = new TreeMap<String, INodeFile>();
-    for (String p : sortedLeasesByPath.keySet()) {
-      // verify that path exists in namespace
-      try {
-        INodeFile node = INodeFile.valueOf(fsnamesystem.dir.getINode(p), p);
-        Preconditions.checkState(node.isUnderConstruction());
-        inodes.put(p, node);
-      } catch (IOException ioe) {
-        LOG.error(ioe);
-      }
-    }
-    return inodes;
-  }
-  
   /** Check the leases beginning from the oldest.
    *  @return true is sync is needed.
    */
@@ -459,34 +343,35 @@ public class LeaseManager {
   synchronized boolean checkLeases() {
     boolean needSync = false;
     assert fsnamesystem.hasWriteLock();
-    Lease leaseToCheck = null;
-    try {
-      leaseToCheck = sortedLeases.first();
-    } catch(NoSuchElementException e) {}
-
-    while(leaseToCheck != null) {
-      if (!leaseToCheck.expiredHardLimit()) {
-        break;
-      }
 
+    while(!sortedLeases.isEmpty() && sortedLeases.peek().expiredHardLimit()) {
+      Lease leaseToCheck = sortedLeases.poll();
       LOG.info(leaseToCheck + " has expired hard limit");
 
-      final List<String> removing = new ArrayList<String>();
-      // need to create a copy of the oldest lease paths, because 
-      // internalReleaseLease() removes paths corresponding to empty files,
+      final List<Long> removing = new ArrayList<>();
+      // need to create a copy of the oldest lease files, because
+      // internalReleaseLease() removes files corresponding to empty files,
       // i.e. it needs to modify the collection being iterated over
       // causing ConcurrentModificationException
-      String[] leasePaths = new String[leaseToCheck.getPaths().size()];
-      leaseToCheck.getPaths().toArray(leasePaths);
-      for(String p : leasePaths) {
+      Collection<Long> files = leaseToCheck.getFiles();
+      Long[] leaseINodeIds = files.toArray(new Long[files.size()]);
+      FSDirectory fsd = fsnamesystem.getFSDirectory();
+      String p = null;
+      for(Long id : leaseINodeIds) {
         try {
-          INodesInPath iip = fsnamesystem.getFSDirectory().getINodesInPath(p,
-              true);
-          boolean completed = fsnamesystem.internalReleaseLease(leaseToCheck, p,
-              iip, HdfsServerConstants.NAMENODE_LEASE_HOLDER);
+          INodesInPath iip = INodesInPath.fromINode(fsd.getInode(id));
+          p = iip.getPath();
+          // Sanity check to make sure the path is correct
+          if (!p.startsWith("/")) {
+            throw new IOException("Invalid path in the lease " + p);
+          }
+          boolean completed = fsnamesystem.internalReleaseLease(
+              leaseToCheck, p, iip,
+              HdfsServerConstants.NAMENODE_LEASE_HOLDER);
           if (LOG.isDebugEnabled()) {
             if (completed) {
-              LOG.debug("Lease recovery for " + p + " is complete. File closed.");
+              LOG.debug("Lease recovery for inode " + id + " is complete. " +
+                            "File closed.");
             } else {
               LOG.debug("Started block recovery " + p + " lease " + leaseToCheck);
             }
@@ -498,22 +383,15 @@ public class LeaseManager {
         } catch (IOException e) {
           LOG.error("Cannot release the path " + p + " in the lease "
               + leaseToCheck, e);
-          removing.add(p);
+          removing.add(id);
         }
       }
 
-      for(String p : removing) {
-        removeLease(leaseToCheck, p);
+      for(Long id : removing) {
+        removeLease(leaseToCheck, id);
       }
-      leaseToCheck = sortedLeases.higher(leaseToCheck);
     }
 
-    try {
-      if(leaseToCheck != sortedLeases.first()) {
-        LOG.warn("Unable to release hard-limit expired lease: "
-          + sortedLeases.first());
-      }
-    } catch(NoSuchElementException e) {}
     return needSync;
   }
 
@@ -522,7 +400,7 @@ public class LeaseManager {
     return getClass().getSimpleName() + "= {"
         + "\n leases=" + leases
         + "\n sortedLeases=" + sortedLeases
-        + "\n sortedLeasesByPath=" + sortedLeasesByPath
+        + "\n leasesById=" + leasesById
         + "\n}";
   }
 
@@ -552,9 +430,15 @@ public class LeaseManager {
    * its leases immediately. This is for use by unit tests.
    */
   @VisibleForTesting
-  void triggerMonitorCheckNow() {
+  public void triggerMonitorCheckNow() {
     Preconditions.checkState(lmthread != null,
         "Lease monitor is not running");
     lmthread.interrupt();
   }
+
+  @VisibleForTesting
+  public void runLeaseChecks() {
+    checkLeases();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00fe1ed3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java
index 5bd4ed5..fb13e09 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java
@@ -65,7 +65,7 @@ abstract class AbstractINodeDiffList<N extends INode,
    * @param snapshot The id of the snapshot to be deleted
    * @param prior The id of the snapshot taken before the to-be-deleted snapshot
    * @param collectedBlocks Used to collect information for blocksMap update
-   * @return delta in namespace. 
+   * @return delta in namespace.
    */
   public final QuotaCounts deleteSnapshotDiff(BlockStoragePolicySuite bsps,
       final int snapshot,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00fe1ed3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectorySnapshottableFeature.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectorySnapshottableFeature.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectorySnapshottableFeature.java
index fa1bf94..dc58856 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectorySnapshottableFeature.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectorySnapshottableFeature.java
@@ -216,7 +216,7 @@ public class DirectorySnapshottableFeature extends DirectoryWithSnapshotFeature
       int prior = Snapshot.findLatestSnapshot(snapshotRoot, snapshot.getId());
       try {
         QuotaCounts counts = snapshotRoot.cleanSubtree(bsps, snapshot.getId(),
-            prior, collectedBlocks, removedINodes);
+            prior, collectedBlocks, removedINodes, null);
         INodeDirectory parent = snapshotRoot.getParent();
         if (parent != null) {
           // there will not be any WithName node corresponding to the deleted

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00fe1ed3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java
index 95f9d8a..bd2dc2d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java
@@ -97,15 +97,15 @@ public class DirectoryWithSnapshotFeature implements INode.Feature {
 
     /** clear the created list */
     private QuotaCounts destroyCreatedList(
-        final BlockStoragePolicySuite bsps,
-        final INodeDirectory currentINode,
+        final BlockStoragePolicySuite bsps, final INodeDirectory currentINode,
         final BlocksMapUpdateInfo collectedBlocks,
-        final List<INode> removedINodes) {
+        final List<INode> removedINodes, List<Long> removedUCFiles) {
       QuotaCounts counts = new QuotaCounts.Builder().build();
       final List<INode> createdList = getList(ListType.CREATED);
       for (INode c : createdList) {
         c.computeQuotaUsage(bsps, counts, true);
-        c.destroyAndCollectBlocks(bsps, collectedBlocks, removedINodes);
+        c.destroyAndCollectBlocks(bsps, collectedBlocks, removedINodes,
+                                  removedUCFiles);
         // c should be contained in the children list, remove it
         currentINode.removeChild(c);
       }
@@ -117,12 +117,13 @@ public class DirectoryWithSnapshotFeature implements INode.Feature {
     private QuotaCounts destroyDeletedList(
         final BlockStoragePolicySuite bsps,
         final BlocksMapUpdateInfo collectedBlocks,
-        final List<INode> removedINodes) {
+        final List<INode> removedINodes, List<Long> removedUCFiles) {
       QuotaCounts counts = new QuotaCounts.Builder().build();
       final List<INode> deletedList = getList(ListType.DELETED);
       for (INode d : deletedList) {
         d.computeQuotaUsage(bsps, counts, false);
-        d.destroyAndCollectBlocks(bsps, collectedBlocks, removedINodes);
+        d.destroyAndCollectBlocks(bsps, collectedBlocks, removedINodes,
+                                  removedUCFiles);
       }
       deletedList.clear();
       return counts;
@@ -210,8 +211,8 @@ public class DirectoryWithSnapshotFeature implements INode.Feature {
 
     @Override
     QuotaCounts combinePosteriorAndCollectBlocks(
-        final BlockStoragePolicySuite bsps,
-        final INodeDirectory currentDir, final DirectoryDiff posterior,
+        final BlockStoragePolicySuite bsps, final INodeDirectory currentDir,
+        final DirectoryDiff posterior,
         final BlocksMapUpdateInfo collectedBlocks,
         final List<INode> removedINodes) {
       final QuotaCounts counts = new QuotaCounts.Builder().build();
@@ -221,7 +222,8 @@ public class DirectoryWithSnapshotFeature implements INode.Feature {
         public void process(INode inode) {
           if (inode != null) {
             inode.computeQuotaUsage(bsps, counts, false);
-            inode.destroyAndCollectBlocks(bsps, collectedBlocks, removedINodes);
+            inode.destroyAndCollectBlocks(bsps, collectedBlocks, removedINodes,
+                                          null);
           }
         }
       });
@@ -324,7 +326,8 @@ public class DirectoryWithSnapshotFeature implements INode.Feature {
         BlocksMapUpdateInfo collectedBlocks, final List<INode> removedINodes) {
       // this diff has been deleted
       QuotaCounts counts = new QuotaCounts.Builder().build();
-      counts.add(diff.destroyDeletedList(bsps, collectedBlocks, removedINodes));
+      counts.add(diff.destroyDeletedList(bsps, collectedBlocks, removedINodes,
+                                         null));
       INodeDirectoryAttributes snapshotINode = getSnapshotINode();
       if (snapshotINode != null && snapshotINode.getAclFeature() != null) {
         AclStorage.removeAclFeature(snapshotINode.getAclFeature());
@@ -411,21 +414,23 @@ public class DirectoryWithSnapshotFeature implements INode.Feature {
   public static void destroyDstSubtree(
       final BlockStoragePolicySuite bsps, INode inode, final int snapshot,
       final int prior, final BlocksMapUpdateInfo collectedBlocks,
-      final List<INode> removedINodes) throws QuotaExceededException {
+      final List<INode> removedINodes, List<Long> removedUCFiles) throws QuotaExceededException {
     Preconditions.checkArgument(prior != Snapshot.NO_SNAPSHOT_ID);
     if (inode.isReference()) {
       if (inode instanceof INodeReference.WithName
           && snapshot != Snapshot.CURRENT_STATE_ID) {
         // this inode has been renamed before the deletion of the DstReference
         // subtree
-        inode.cleanSubtree(bsps, snapshot, prior, collectedBlocks, removedINodes);
+        inode.cleanSubtree(bsps, snapshot, prior, collectedBlocks, removedINodes,
+                           removedUCFiles);
       } else { 
         // for DstReference node, continue this process to its subtree
         destroyDstSubtree(bsps, inode.asReference().getReferredINode(), snapshot,
-            prior, collectedBlocks, removedINodes);
+            prior, collectedBlocks, removedINodes, removedUCFiles);
       }
     } else if (inode.isFile()) {
-      inode.cleanSubtree(bsps, snapshot, prior, collectedBlocks, removedINodes);
+      inode.cleanSubtree(bsps, snapshot, prior, collectedBlocks, removedINodes,
+                         removedUCFiles);
     } else if (inode.isDirectory()) {
       Map<INode, INode> excludedNodes = null;
       INodeDirectory dir = inode.asDirectory();
@@ -445,7 +450,7 @@ public class DirectoryWithSnapshotFeature implements INode.Feature {
         priorDiff = diffList.getDiffById(prior);
         if (priorDiff != null && priorDiff.getSnapshotId() == prior) {
           priorDiff.diff.destroyCreatedList(bsps, dir, collectedBlocks,
-              removedINodes);
+              removedINodes, removedUCFiles);
         }
       }
       for (INode child : inode.asDirectory().getChildrenList(prior)) {
@@ -453,7 +458,7 @@ public class DirectoryWithSnapshotFeature implements INode.Feature {
           continue;
         }
         destroyDstSubtree(bsps, child, snapshot, prior, collectedBlocks,
-            removedINodes);
+            removedINodes, removedUCFiles);
       }
     }
   }
@@ -466,13 +471,13 @@ public class DirectoryWithSnapshotFeature implements INode.Feature {
    * @param post The post snapshot.
    * @param prior The id of the prior snapshot.
    * @param collectedBlocks Used to collect blocks for later deletion.
+   * @param removedUCFiles
    * @return Quota usage update.
    */
   private static QuotaCounts cleanDeletedINode(
-      final BlockStoragePolicySuite bsps, INode inode,
-      final int post, final int prior,
+      final BlockStoragePolicySuite bsps, INode inode, final int post, final int prior,
       final BlocksMapUpdateInfo collectedBlocks,
-      final List<INode> removedINodes) {
+      final List<INode> removedINodes, List<Long> removedUCFiles) {
     QuotaCounts counts = new QuotaCounts.Builder().build();
     Deque<INode> queue = new ArrayDeque<INode>();
     queue.addLast(inode);
@@ -481,7 +486,8 @@ public class DirectoryWithSnapshotFeature implements INode.Feature {
       if (topNode instanceof INodeReference.WithName) {
         INodeReference.WithName wn = (INodeReference.WithName) topNode;
         if (wn.getLastSnapshotId() >= post) {
-          wn.cleanSubtree(bsps, post, prior, collectedBlocks, removedINodes);
+          wn.cleanSubtree(bsps, post, prior, collectedBlocks, removedINodes,
+                          removedUCFiles);
         }
         // For DstReference node, since the node is not in the created list of
         // prior, we should treat it as regular file/dir
@@ -500,7 +506,7 @@ public class DirectoryWithSnapshotFeature implements INode.Feature {
           if (priorDiff != null && priorDiff.getSnapshotId() == prior) {
             priorChildrenDiff = priorDiff.getChildrenDiff();
             counts.add(priorChildrenDiff.destroyCreatedList(bsps, dir,
-                collectedBlocks, removedINodes));
+                collectedBlocks, removedINodes, removedUCFiles));
           }
         }
         
@@ -631,7 +637,8 @@ public class DirectoryWithSnapshotFeature implements INode.Feature {
   }
 
   public void clear(BlockStoragePolicySuite bsps, INodeDirectory currentINode,
-      final BlocksMapUpdateInfo collectedBlocks, final List<INode> removedINodes) {
+      final BlocksMapUpdateInfo collectedBlocks, final List<INode>
+      removedINodes, final List<Long> removedUCFiles) {
     // destroy its diff list
     for (DirectoryDiff diff : diffs) {
       diff.destroyDiffAndCollectBlocks(bsps, currentINode, collectedBlocks,
@@ -721,10 +728,10 @@ public class DirectoryWithSnapshotFeature implements INode.Feature {
     }
   }
 
-  public QuotaCounts cleanDirectory(final BlockStoragePolicySuite bsps, final INodeDirectory currentINode,
-      final int snapshot, int prior,
-      final BlocksMapUpdateInfo collectedBlocks,
-      final List<INode> removedINodes) {
+  public QuotaCounts cleanDirectory(
+      final BlockStoragePolicySuite bsps, final INodeDirectory currentINode,
+      final int snapshot, int prior, final BlocksMapUpdateInfo collectedBlocks,
+      final List<INode> removedINodes, List<Long> removedUCFiles) {
     QuotaCounts counts = new QuotaCounts.Builder().build();
     Map<INode, INode> priorCreated = null;
     Map<INode, INode> priorDeleted = null;
@@ -734,10 +741,10 @@ public class DirectoryWithSnapshotFeature implements INode.Feature {
       DirectoryDiff lastDiff = diffs.getLast();
       if (lastDiff != null) {
         counts.add(lastDiff.diff.destroyCreatedList(bsps, currentINode,
-            collectedBlocks, removedINodes));
+            collectedBlocks, removedINodes, removedUCFiles));
       }
       counts.add(currentINode.cleanSubtreeRecursively(bsps, snapshot, prior,
-          collectedBlocks, removedINodes, priorDeleted));
+          collectedBlocks, removedINodes, removedUCFiles, priorDeleted));
     } else {
       // update prior
       prior = getDiffs().updatePrior(snapshot, prior);
@@ -756,7 +763,7 @@ public class DirectoryWithSnapshotFeature implements INode.Feature {
       counts.add(getDiffs().deleteSnapshotDiff(bsps, snapshot, prior,
           currentINode, collectedBlocks, removedINodes));
       counts.add(currentINode.cleanSubtreeRecursively(bsps, snapshot, prior,
-          collectedBlocks, removedINodes, priorDeleted));
+          collectedBlocks, removedINodes, removedUCFiles, priorDeleted));
 
       // check priorDiff again since it may be created during the diff deletion
       if (prior != Snapshot.NO_SNAPSHOT_ID) {
@@ -773,7 +780,7 @@ public class DirectoryWithSnapshotFeature implements INode.Feature {
                 ListType.CREATED)) {
               if (priorCreated.containsKey(cNode)) {
                 counts.add(cNode.cleanSubtree(bsps, snapshot, Snapshot.NO_SNAPSHOT_ID,
-                    collectedBlocks, removedINodes));
+                    collectedBlocks, removedINodes, removedUCFiles));
               }
             }
           }
@@ -790,7 +797,7 @@ public class DirectoryWithSnapshotFeature implements INode.Feature {
               ListType.DELETED)) {
             if (priorDeleted == null || !priorDeleted.containsKey(dNode)) {
               counts.add(cleanDeletedINode(bsps, dNode, snapshot, prior,
-                  collectedBlocks, removedINodes));
+                  collectedBlocks, removedINodes, removedUCFiles));
             }
           }
         }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00fe1ed3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileWithSnapshotFeature.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileWithSnapshotFeature.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileWithSnapshotFeature.java
index c4406a0..b42b745 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileWithSnapshotFeature.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileWithSnapshotFeature.java
@@ -213,7 +213,7 @@ public class FileWithSnapshotFeature implements INode.Feature {
       final BlocksMapUpdateInfo info, final List<INode> removedINodes) {
     // check if everything is deleted.
     if (isCurrentFileDeleted() && getDiffs().asList().isEmpty()) {
-      file.destroyAndCollectBlocks(bsps, info, removedINodes);
+      file.destroyAndCollectBlocks(bsps, info, removedINodes, null);
       return;
     }
     // find max file size.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00fe1ed3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java
index 802d64a..27d2986 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java
@@ -228,7 +228,7 @@ public class SnapshotManager implements SnapshotStatsMXBean {
   /**
    * Delete a snapshot for a snapshottable directory
    * @param snapshotName Name of the snapshot to be deleted
-   * @param collectedBlocks Used to collect information to update blocksMap 
+   * @param collectedBlocks Used to collect information to update blocksMap
    * @throws IOException
    */
   public void deleteSnapshot(final INodesInPath iip, final String snapshotName,
@@ -266,7 +266,7 @@ public class SnapshotManager implements SnapshotStatsMXBean {
   public int getNumSnapshots() {
     return numSnapshots.get();
   }
-  
+
   void setNumSnapshots(int num) {
     numSnapshots.set(num);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00fe1ed3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java
index 90dc0a7..985f43e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java
@@ -59,8 +59,8 @@ import org.mockito.Mockito;
 
 public class TestLease {
   static boolean hasLease(MiniDFSCluster cluster, Path src) {
-    return NameNodeAdapter.getLeaseManager(cluster.getNamesystem()
-        ).getLeaseByPath(src.toString()) != null;
+    return NameNodeAdapter.getLeaseForPath(cluster.getNameNode(),
+            src.toString()) != null;
   }
 
   static int leaseCount(MiniDFSCluster cluster) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00fe1ed3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
index 2540834..4ca5eda 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
@@ -135,8 +135,19 @@ public class NameNodeAdapter {
     namesystem.leaseManager.triggerMonitorCheckNow();
   }
 
+  public static Lease getLeaseForPath(NameNode nn, String path) {
+    final FSNamesystem fsn = nn.getNamesystem();
+    INode inode;
+    try {
+      inode = fsn.getFSDirectory().getINode(path, false);
+    } catch (UnresolvedLinkException e) {
+      throw new RuntimeException("Lease manager should not support symlinks");
+    }
+    return inode == null ? null : fsn.leaseManager.getLease((INodeFile) inode);
+  }
+
   public static String getLeaseHolderForPath(NameNode namenode, String path) {
-    Lease l = namenode.getNamesystem().leaseManager.getLeaseByPath(path);
+    Lease l = getLeaseForPath(namenode, path);
     return l == null? null: l.getHolder();
   }
 
@@ -145,12 +156,8 @@ public class NameNodeAdapter {
    *   or -1 in the case that the lease doesn't exist.
    */
   public static long getLeaseRenewalTime(NameNode nn, String path) {
-    LeaseManager lm = nn.getNamesystem().leaseManager;
-    Lease l = lm.getLeaseByPath(path);
-    if (l == null) {
-      return -1;
-    }
-    return l.getLastUpdate();
+    Lease l = getLeaseForPath(nn, path);
+    return l == null ? -1 : l.getLastUpdate();
   }
 
   /**