Posted to common-commits@hadoop.apache.org by wh...@apache.org on 2015/01/17 21:56:09 UTC

hadoop git commit: HDFS-7573. Consolidate the implementation of delete() into a single class. Contributed by Haohui Mai.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 2908fe4ec -> 24315e7d3


HDFS-7573. Consolidate the implementation of delete() into a single class. Contributed by Haohui Mai.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/24315e7d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/24315e7d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/24315e7d

Branch: refs/heads/trunk
Commit: 24315e7d374a1ddd4329b64350cf96fc9ab6f59c
Parents: 2908fe4
Author: Haohui Mai <wh...@apache.org>
Authored: Sat Dec 27 07:21:52 2014 +0800
Committer: Haohui Mai <wh...@apache.org>
Committed: Sat Jan 17 12:56:04 2015 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../hdfs/server/namenode/FSDirDeleteOp.java     | 246 +++++++++++++++++++
 .../hdfs/server/namenode/FSDirRenameOp.java     |   5 +-
 .../hdfs/server/namenode/FSDirectory.java       | 136 ----------
 .../hdfs/server/namenode/FSEditLogLoader.java   |   6 +-
 .../hdfs/server/namenode/FSNamesystem.java      | 132 +++-------
 6 files changed, 286 insertions(+), 242 deletions(-)
----------------------------------------------------------------------
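
Background on the pattern this commit applies: each RPC's implementation moves out of FSNamesystem/FSDirectory into a static, package-private helper class (here FSDirDeleteOp), so the namesystem keeps only lock management, audit logging, and deferred cleanup. A minimal, self-contained sketch of that shape follows; the names are illustrative only, not the actual Hadoop types.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Illustrative sketch only: none of these names are real Hadoop classes.
    final class MiniNamesystem {
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
      final List<String> namespace = new ArrayList<>();

      boolean hasWriteLock() {
        return lock.isWriteLockedByCurrentThread();
      }

      // The namesystem method shrinks to locking plus delegation; all of
      // the delete logic lives in the helper class below.
      boolean delete(String src) {
        List<String> toRemove;
        lock.writeLock().lock();
        try {
          toRemove = MiniDeleteOp.delete(this, src);
        } finally {
          lock.writeLock().unlock();
        }
        // Deferred work (block removal in HDFS) happens outside the lock.
        return toRemove != null;
      }
    }

    // Analogue of FSDirDeleteOp: a static helper holding the operation's logic.
    final class MiniDeleteOp {
      private MiniDeleteOp() {}

      static List<String> delete(MiniNamesystem fsn, String src) {
        assert fsn.hasWriteLock();
        if (!fsn.namespace.remove(src)) {
          return null;                      // nothing to delete
        }
        List<String> collected = new ArrayList<>();
        collected.add("blk-under-" + src);  // stand-in for block collection
        return collected;
      }
    }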


http://git-wip-us.apache.org/repos/asf/hadoop/blob/24315e7d/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 6af1a52..71e68c2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -522,6 +522,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-7189. Add trace spans for DFSClient metadata operations. (Colin P.
     McCabe via yliu)
 
+    HDFS-7573. Consolidate the implementation of delete() into a single class.
+    (wheat9)
+
   OPTIMIZATIONS
 
     HDFS-7454. Reduce memory footprint for AclEntries in NameNode.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/24315e7d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirDeleteOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirDeleteOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirDeleteOp.java
new file mode 100644
index 0000000..c93d1f6
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirDeleteOp.java
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
+import org.apache.hadoop.util.ChunkedArrayList;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.CURRENT_STATE_ID;
+import static org.apache.hadoop.util.Time.now;
+
+class FSDirDeleteOp {
+  /**
+   * Delete the target directory and collect the blocks under it.
+   *
+   * @param iip the INodesInPath instance containing all the INodes for the path
+   * @param collectedBlocks Blocks under the deleted directory
+   * @param removedINodes INodes that should be removed from inodeMap
+   * @return the number of files that have been removed
+   */
+  static long delete(
+      FSDirectory fsd, INodesInPath iip, BlocksMapUpdateInfo collectedBlocks,
+      List<INode> removedINodes, long mtime) throws IOException {
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* FSDirectory.delete: " + iip.getPath());
+    }
+    final long filesRemoved;
+    fsd.writeLock();
+    try {
+      if (!deleteAllowed(iip, iip.getPath())) {
+        filesRemoved = -1;
+      } else {
+        List<INodeDirectory> snapshottableDirs = new ArrayList<>();
+        FSDirSnapshotOp.checkSnapshot(iip.getLastINode(), snapshottableDirs);
+        filesRemoved = unprotectedDelete(fsd, iip, collectedBlocks,
+                                         removedINodes, mtime);
+        fsd.getFSNamesystem().removeSnapshottableDirs(snapshottableDirs);
+      }
+    } finally {
+      fsd.writeUnlock();
+    }
+    return filesRemoved;
+  }
+
+  /**
+   * Remove a file/directory from the namespace.
+   * <p>
+   * For large directories, deletion is incremental: the blocks under
+   * the directory are collected and deleted a small number at a time while
+   * holding the {@link FSNamesystem} lock.
+   * <p>
+   * For a small directory or file, the deletion is done in one shot.
+   *
+   */
+  static BlocksMapUpdateInfo delete(
+      FSNamesystem fsn, String src, boolean recursive, boolean logRetryCache)
+      throws IOException {
+    FSDirectory fsd = fsn.getFSDirectory();
+    FSPermissionChecker pc = fsd.getPermissionChecker();
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+
+    src = fsd.resolvePath(pc, src, pathComponents);
+    final INodesInPath iip = fsd.getINodesInPath4Write(src, false);
+    if (!recursive && fsd.isNonEmptyDirectory(iip)) {
+      throw new PathIsNotEmptyDirectoryException(src + " is non empty");
+    }
+    if (fsd.isPermissionEnabled()) {
+      fsd.checkPermission(pc, iip, false, null, FsAction.WRITE, null,
+                          FsAction.ALL, true);
+    }
+
+    return deleteInternal(fsn, src, iip, logRetryCache);
+  }
+
+  /**
+   * Delete a path from the namespace and update the quota usage
+   * count at each ancestor directory.
+   * <br>
+   * Note: This is to be used by
+   * {@link org.apache.hadoop.hdfs.server.namenode.FSEditLog} only.
+   * <br>
+   * @param src a string representation of a path to an inode
+   * @param mtime the time the inode is removed
+   */
+  static void deleteForEditLog(FSDirectory fsd, String src, long mtime)
+      throws IOException {
+    assert fsd.hasWriteLock();
+    FSNamesystem fsn = fsd.getFSNamesystem();
+    BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
+    List<INode> removedINodes = new ChunkedArrayList<>();
+
+    final INodesInPath iip = fsd.getINodesInPath4Write(
+        FSDirectory.normalizePath(src), false);
+    if (!deleteAllowed(iip, src)) {
+      return;
+    }
+    List<INodeDirectory> snapshottableDirs = new ArrayList<>();
+    FSDirSnapshotOp.checkSnapshot(iip.getLastINode(), snapshottableDirs);
+    long filesRemoved = unprotectedDelete(
+        fsd, iip, collectedBlocks, removedINodes, mtime);
+    fsn.removeSnapshottableDirs(snapshottableDirs);
+
+    if (filesRemoved >= 0) {
+      fsn.removeLeasesAndINodes(src, removedINodes, false);
+      fsn.removeBlocksAndUpdateSafemodeTotal(collectedBlocks);
+    }
+  }
+
+  /**
+   * Remove a file/directory from the namespace.
+   * <p>
+   * For large directories, deletion is incremental: the blocks under
+   * the directory are collected and deleted a small number at a time while
+   * holding the {@link org.apache.hadoop.hdfs.server.namenode.FSNamesystem} lock.
+   * <p>
+   * For a small directory or file, the deletion is done in one shot.
+   */
+  static BlocksMapUpdateInfo deleteInternal(
+      FSNamesystem fsn, String src, INodesInPath iip, boolean logRetryCache)
+      throws IOException {
+    assert fsn.hasWriteLock();
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* NameSystem.delete: " + src);
+    }
+
+    FSDirectory fsd = fsn.getFSDirectory();
+    BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
+    List<INode> removedINodes = new ChunkedArrayList<>();
+
+    long mtime = now();
+    // Unlink the target directory from directory tree
+    long filesRemoved = delete(
+        fsd, iip, collectedBlocks, removedINodes, mtime);
+    if (filesRemoved < 0) {
+      return null;
+    }
+    fsd.getEditLog().logDelete(src, mtime, logRetryCache);
+    incrDeletedFileCount(filesRemoved);
+
+    fsn.removeLeasesAndINodes(src, removedINodes, true);
+    fsd.getEditLog().logSync();
+
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* Namesystem.delete: "
+                                        + src + " is removed");
+    }
+    return collectedBlocks;
+  }
+
+  static void incrDeletedFileCount(long count) {
+    NameNode.getNameNodeMetrics().incrFilesDeleted(count);
+  }
+
+  private static boolean deleteAllowed(final INodesInPath iip,
+      final String src) {
+    if (iip.length() < 1 || iip.getLastINode() == null) {
+      if (NameNode.stateChangeLog.isDebugEnabled()) {
+        NameNode.stateChangeLog.debug(
+            "DIR* FSDirectory.unprotectedDelete: failed to remove "
+                + src + " because it does not exist");
+      }
+      return false;
+    } else if (iip.length() == 1) { // src is the root
+      NameNode.stateChangeLog.warn(
+          "DIR* FSDirectory.unprotectedDelete: failed to remove " + src +
+              " because the root is not allowed to be deleted");
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Delete a path from the namespace and update the quota usage
+   * count at each ancestor directory.
+   * @param iip the inodes resolved from the path
+   * @param collectedBlocks blocks collected from the deleted path
+   * @param removedINodes inodes that should be removed from inodeMap
+   * @param mtime the time the inode is removed
+   * @return the number of inodes deleted; 0 if no inodes are deleted.
+   */
+  private static long unprotectedDelete(
+      FSDirectory fsd, INodesInPath iip, BlocksMapUpdateInfo collectedBlocks,
+      List<INode> removedINodes, long mtime) throws QuotaExceededException {
+    assert fsd.hasWriteLock();
+
+    // check if target node exists
+    INode targetNode = iip.getLastINode();
+    if (targetNode == null) {
+      return -1;
+    }
+
+    // record modification
+    final int latestSnapshot = iip.getLatestSnapshotId();
+    targetNode.recordModification(latestSnapshot);
+
+    // Remove the node from the namespace
+    long removed = fsd.removeLastINode(iip);
+    if (removed == -1) {
+      return -1;
+    }
+
+    // set the parent's modification time
+    final INodeDirectory parent = targetNode.getParent();
+    parent.updateModificationTime(mtime, latestSnapshot);
+    if (removed == 0) {
+      return 0;
+    }
+
+    // collect block
+    if (!targetNode.isInLatestSnapshot(latestSnapshot)) {
+      targetNode.destroyAndCollectBlocks(collectedBlocks, removedINodes);
+    } else {
+      Quota.Counts counts = targetNode.cleanSubtree(CURRENT_STATE_ID,
+          latestSnapshot, collectedBlocks, removedINodes, true);
+      parent.addSpaceConsumed(-counts.get(Quota.NAMESPACE),
+          -counts.get(Quota.DISKSPACE), true);
+      removed = counts.get(Quota.NAMESPACE);
+    }
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
+          + iip.getPath() + " is removed");
+    }
+    return removed;
+  }
+}
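
The "incremental" wording in the javadoc above refers to how the collected blocks are drained after the namespace unlink: a batch at a time, so the namesystem lock is not pinned for the whole of a large directory's blocks. A hedged sketch of that batching idea follows; the batch size and lock handling are illustrative assumptions, not the values HDFS uses.

    import java.util.List;
    import java.util.concurrent.locks.ReentrantLock;

    // Sketch of incremental block removal: process the collected blocks in
    // small batches, taking and releasing the lock per batch so other
    // operations can interleave. BATCH and the lock type are assumptions.
    final class IncrementalRemovalSketch {
      private static final int BATCH = 1000;

      static void removeBlocks(List<String> collectedBlocks,
                               ReentrantLock nsLock) {
        for (int start = 0; start < collectedBlocks.size(); start += BATCH) {
          int end = Math.min(start + BATCH, collectedBlocks.size());
          nsLock.lock();
          try {
            for (String block : collectedBlocks.subList(start, end)) {
              // stand-in for blockManager.removeBlock(block)
              System.out.println("removed " + block);
            }
          } finally {
            nsLock.unlock();
          }
        }
      }
    }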

http://git-wip-us.apache.org/repos/asf/hadoop/blob/24315e7d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
index 2023b1e..b994104 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
@@ -281,7 +281,7 @@ class FSDirRenameOp {
     try {
       if (unprotectedRenameTo(fsd, src, dst, srcIIP, dstIIP, mtime,
           collectedBlocks, options)) {
-        fsd.getFSNamesystem().incrDeletedFileCount(1);
+        FSDirDeleteOp.incrDeletedFileCount(1);
       }
     } finally {
       fsd.writeUnlock();
@@ -731,8 +731,7 @@ class FSDirRenameOp {
             dstIIP.getLatestSnapshotId(), collectedBlocks, removedINodes,
             true).get(Quota.NAMESPACE) >= 0;
       }
-      fsd.getFSNamesystem().removePathAndBlocks(src, null, removedINodes,
-          false);
+      fsd.getFSNamesystem().removeLeasesAndINodes(src, removedINodes, false);
       return filesDeleted;
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/24315e7d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index 9c33e06..61ef19f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -58,9 +58,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.util.ByteArray;
-import org.apache.hadoop.util.ChunkedArrayList;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
@@ -538,54 +536,6 @@ public class FSDirectory implements Closeable {
   }
 
   /**
-   * Delete the target directory and collect the blocks under it
-   *
-   * @param iip the INodesInPath instance containing all the INodes for the path
-   * @param collectedBlocks Blocks under the deleted directory
-   * @param removedINodes INodes that should be removed from {@link #inodeMap}
-   * @return the number of files that have been removed
-   */
-  long delete(INodesInPath iip, BlocksMapUpdateInfo collectedBlocks,
-              List<INode> removedINodes, long mtime) throws IOException {
-    if (NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* FSDirectory.delete: " + iip.getPath());
-    }
-    final long filesRemoved;
-    writeLock();
-    try {
-      if (!deleteAllowed(iip, iip.getPath()) ) {
-        filesRemoved = -1;
-      } else {
-        List<INodeDirectory> snapshottableDirs = new ArrayList<INodeDirectory>();
-        FSDirSnapshotOp.checkSnapshot(iip.getLastINode(), snapshottableDirs);
-        filesRemoved = unprotectedDelete(iip, collectedBlocks,
-            removedINodes, mtime);
-        namesystem.removeSnapshottableDirs(snapshottableDirs);
-      }
-    } finally {
-      writeUnlock();
-    }
-    return filesRemoved;
-  }
-  
-  private static boolean deleteAllowed(final INodesInPath iip,
-      final String src) {
-    if (iip.length() < 1 || iip.getLastINode() == null) {
-      if(NameNode.stateChangeLog.isDebugEnabled()) {
-        NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
-            + "failed to remove " + src + " because it does not exist");
-      }
-      return false;
-    } else if (iip.length() == 1) { // src is the root
-      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedDelete: "
-          + "failed to remove " + src
-          + " because the root is not allowed to be deleted");
-      return false;
-    }
-    return true;
-  }
-  
-  /**
    * @return true if the path is a non-empty directory; otherwise, return false.
    */
   boolean isNonEmptyDirectory(INodesInPath inodesInPath) {
@@ -604,92 +554,6 @@ public class FSDirectory implements Closeable {
   }
 
   /**
-   * Delete a path from the name space
-   * Update the count at each ancestor directory with quota
-   * <br>
-   * Note: This is to be used by {@link FSEditLog} only.
-   * <br>
-   * @param src a string representation of a path to an inode
-   * @param mtime the time the inode is removed
-   * @throws SnapshotAccessControlException if path is in RO snapshot
-   */
-  void unprotectedDelete(String src, long mtime) throws UnresolvedLinkException,
-      QuotaExceededException, SnapshotAccessControlException, IOException {
-    assert hasWriteLock();
-    BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
-    List<INode> removedINodes = new ChunkedArrayList<INode>();
-
-    final INodesInPath inodesInPath = getINodesInPath4Write(
-        normalizePath(src), false);
-    long filesRemoved = -1;
-    if (deleteAllowed(inodesInPath, src)) {
-      List<INodeDirectory> snapshottableDirs = new ArrayList<INodeDirectory>();
-      FSDirSnapshotOp.checkSnapshot(inodesInPath.getLastINode(), snapshottableDirs);
-      filesRemoved = unprotectedDelete(inodesInPath, collectedBlocks,
-          removedINodes, mtime);
-      namesystem.removeSnapshottableDirs(snapshottableDirs); 
-    }
-
-    if (filesRemoved >= 0) {
-      getFSNamesystem().removePathAndBlocks(src, collectedBlocks, 
-          removedINodes, false);
-    }
-  }
-  
-  /**
-   * Delete a path from the name space
-   * Update the count at each ancestor directory with quota
-   * @param iip the inodes resolved from the path
-   * @param collectedBlocks blocks collected from the deleted path
-   * @param removedINodes inodes that should be removed from {@link #inodeMap}
-   * @param mtime the time the inode is removed
-   * @return the number of inodes deleted; 0 if no inodes are deleted.
-   */ 
-  long unprotectedDelete(INodesInPath iip, BlocksMapUpdateInfo collectedBlocks,
-      List<INode> removedINodes, long mtime) throws QuotaExceededException {
-    assert hasWriteLock();
-
-    // check if target node exists
-    INode targetNode = iip.getLastINode();
-    if (targetNode == null) {
-      return -1;
-    }
-
-    // record modification
-    final int latestSnapshot = iip.getLatestSnapshotId();
-    targetNode.recordModification(latestSnapshot);
-
-    // Remove the node from the namespace
-    long removed = removeLastINode(iip);
-    if (removed == -1) {
-      return -1;
-    }
-
-    // set the parent's modification time
-    final INodeDirectory parent = targetNode.getParent();
-    parent.updateModificationTime(mtime, latestSnapshot);
-    if (removed == 0) {
-      return 0;
-    }
-    
-    // collect block
-    if (!targetNode.isInLatestSnapshot(latestSnapshot)) {
-      targetNode.destroyAndCollectBlocks(collectedBlocks, removedINodes);
-    } else {
-      Quota.Counts counts = targetNode.cleanSubtree(CURRENT_STATE_ID,
-          latestSnapshot, collectedBlocks, removedINodes, true);
-      parent.addSpaceConsumed(-counts.get(Quota.NAMESPACE),
-          -counts.get(Quota.DISKSPACE), true);
-      removed = counts.get(Quota.NAMESPACE);
-    }
-    if (NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
-          + iip.getPath() + " is removed");
-    }
-    return removed;
-  }
-
-  /** 
    * Check whether the filepath could be created
    * @throws SnapshotAccessControlException if path is in RO snapshot
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/24315e7d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index 1f4d1a6..322e18c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -347,7 +347,7 @@ public class FSEditLogLoader {
       INodeFile oldFile = INodeFile.valueOf(iip.getLastINode(), path, true);
       if (oldFile != null && addCloseOp.overwrite) {
         // This is OP_ADD with overwrite
-        fsDir.unprotectedDelete(path, addCloseOp.mtime);
+        FSDirDeleteOp.deleteForEditLog(fsDir, path, addCloseOp.mtime);
         iip = INodesInPath.replace(iip, iip.length() - 1, null);
         oldFile = null;
       }
@@ -520,8 +520,8 @@ public class FSEditLogLoader {
     }
     case OP_DELETE: {
       DeleteOp deleteOp = (DeleteOp)op;
-      fsDir.unprotectedDelete(
-          renameReservedPathsOnUpgrade(deleteOp.path, logVersion),
+      FSDirDeleteOp.deleteForEditLog(
+          fsDir, renameReservedPathsOnUpgrade(deleteOp.path, logVersion),
           deleteOp.timestamp);
       
       if (toAddRetryCache) {
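
Note why the replay path above calls deleteForEditLog() rather than the RPC-facing delete(): during edit-log replay there is no client, so there is no permission check to run and no new edit-log entry to write; the loader just reproduces the recorded effect. A small sketch of that distinction follows, with types and names that are illustrative assumptions, not Hadoop's.

    import java.util.HashMap;
    import java.util.Map;

    // Sketch: replaying a recorded delete reproduces its effect directly,
    // with no permission check and no re-logging, since the original RPC
    // already did both. All names here are illustrative.
    final class ReplaySketch {
      record DeleteRecord(String path, long mtime) {}

      private final Map<String, Long> namespace = new HashMap<>();

      // RPC path: authorize, mutate, then log the operation.
      void deleteRpc(String path, long mtime) {
        checkPermission(path);
        namespace.remove(path);
        logEdit(new DeleteRecord(path, mtime));
      }

      // Replay path: mutate only.
      void deleteForEditLog(DeleteRecord op) {
        namespace.remove(op.path());
      }

      private void checkPermission(String path) { /* elided */ }
      private void logEdit(DeleteRecord op) { /* elided */ }
    }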

http://git-wip-us.apache.org/repos/asf/hadoop/blob/24315e7d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 9d2fe6b..df10f59 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -150,7 +150,6 @@ import org.apache.hadoop.fs.InvalidPathException;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.XAttr;
 import org.apache.hadoop.fs.XAttrSetFlag;
@@ -2474,11 +2473,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         if (overwrite) {
           toRemoveBlocks = new BlocksMapUpdateInfo();
           List<INode> toRemoveINodes = new ChunkedArrayList<INode>();
-          long ret = dir.delete(iip, toRemoveBlocks, toRemoveINodes, now());
+          long ret = FSDirDeleteOp.delete(dir, iip, toRemoveBlocks,
+                                          toRemoveINodes, now());
           if (ret >= 0) {
             iip = INodesInPath.replace(iip, iip.length() - 1, null);
-            incrDeletedFileCount(ret);
-            removePathAndBlocks(src, null, toRemoveINodes, true);
+            FSDirDeleteOp.incrDeletedFileCount(ret);
+            removeLeasesAndINodes(src, toRemoveINodes, true);
           }
         } else {
           // If lease soft limit time is expired, recover the lease
@@ -3531,99 +3531,35 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    * description of exceptions
    */
   boolean delete(String src, boolean recursive, boolean logRetryCache)
-      throws AccessControlException, SafeModeException,
-      UnresolvedLinkException, IOException {
-
+      throws IOException {
+    waitForLoadingFSImage();
+    checkOperation(OperationCategory.WRITE);
+    BlocksMapUpdateInfo toRemovedBlocks = null;
+    writeLock();
     boolean ret = false;
     try {
-      ret = deleteInt(src, recursive, logRetryCache);
+      checkOperation(OperationCategory.WRITE);
+      checkNameNodeSafeMode("Cannot delete " + src);
+      toRemovedBlocks = FSDirDeleteOp.delete(
+          this, src, recursive, logRetryCache);
+      ret = toRemovedBlocks != null;
     } catch (AccessControlException e) {
       logAuditEvent(false, "delete", src);
       throw e;
+    } finally {
+      writeUnlock();
     }
-    return ret;
-  }
-      
-  private boolean deleteInt(String src, boolean recursive, boolean logRetryCache)
-      throws AccessControlException, SafeModeException,
-      UnresolvedLinkException, IOException {
-    if (NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* NameSystem.delete: " + src);
-    }
-    boolean status = deleteInternal(src, recursive, true, logRetryCache);
-    if (status) {
-      logAuditEvent(true, "delete", src);
+    if (toRemovedBlocks != null) {
+      removeBlocks(toRemovedBlocks); // Incremental deletion of blocks
     }
-    return status;
+    logAuditEvent(true, "delete", src);
+    return ret;
   }
-    
+
   FSPermissionChecker getPermissionChecker()
       throws AccessControlException {
     return dir.getPermissionChecker();
   }
-  
-  /**
-   * Remove a file/directory from the namespace.
-   * <p>
-   * For large directories, deletion is incremental. The blocks under
-   * the directory are collected and deleted a small number at a time holding
-   * the {@link FSNamesystem} lock.
-   * <p>
-   * For small directory or file the deletion is done in one shot.
-   * 
-   * @see ClientProtocol#delete(String, boolean) for description of exceptions
-   */
-  private boolean deleteInternal(String src, boolean recursive,
-      boolean enforcePermission, boolean logRetryCache)
-      throws AccessControlException, SafeModeException, UnresolvedLinkException,
-             IOException {
-    BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
-    List<INode> removedINodes = new ChunkedArrayList<INode>();
-    FSPermissionChecker pc = getPermissionChecker();
-    checkOperation(OperationCategory.WRITE);
-    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
-    boolean ret = false;
-
-    waitForLoadingFSImage();
-    writeLock();
-    try {
-      checkOperation(OperationCategory.WRITE);
-      checkNameNodeSafeMode("Cannot delete " + src);
-      src = dir.resolvePath(pc, src, pathComponents);
-      final INodesInPath iip = dir.getINodesInPath4Write(src, false);
-      if (!recursive && dir.isNonEmptyDirectory(iip)) {
-        throw new PathIsNotEmptyDirectoryException(src + " is non empty");
-      }
-      if (enforcePermission && isPermissionEnabled) {
-        dir.checkPermission(pc, iip, false, null, FsAction.WRITE, null,
-            FsAction.ALL, true);
-      }
-
-      long mtime = now();
-      // Unlink the target directory from directory tree
-      long filesRemoved = dir.delete(iip, collectedBlocks, removedINodes,
-              mtime);
-      if (filesRemoved < 0) {
-        return false;
-      }
-      getEditLog().logDelete(src, mtime, logRetryCache);
-      incrDeletedFileCount(filesRemoved);
-      // Blocks/INodes will be handled later
-      removePathAndBlocks(src, null, removedINodes, true);
-      ret = true;
-    } finally {
-      writeUnlock();
-    }
-    getEditLog().logSync(); 
-    removeBlocks(collectedBlocks); // Incremental deletion of blocks
-    collectedBlocks.clear();
-
-    if (NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* Namesystem.delete: "
-        + src +" is removed");
-    }
-    return ret;
-  }
 
   /**
    * From the given list, incrementally remove the blocks from blockManager
@@ -3650,15 +3586,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   }
   
   /**
-   * Remove leases, inodes and blocks related to a given path
+   * Remove leases and inodes related to a given path
    * @param src The given path
-   * @param blocks Containing the list of blocks to be deleted from blocksMap
-   * @param removedINodes Containing the list of inodes to be removed from 
+   * @param removedINodes Containing the list of inodes to be removed from
    *                      inodesMap
    * @param acquireINodeMapLock Whether to acquire the lock for inode removal
    */
-  void removePathAndBlocks(String src, BlocksMapUpdateInfo blocks,
-      List<INode> removedINodes, final boolean acquireINodeMapLock) {
+  void removeLeasesAndINodes(String src, List<INode> removedINodes,
+      final boolean acquireINodeMapLock) {
     assert hasWriteLock();
     leaseManager.removeLeaseWithPrefixPath(src);
     // remove inodes from inodesMap
@@ -3675,11 +3610,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       }
       removedINodes.clear();
     }
-    if (blocks == null) {
-      return;
-    }
-    
-    removeBlocksAndUpdateSafemodeTotal(blocks);
   }
 
   /**
@@ -4500,10 +4430,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     }
   }
 
-  void incrDeletedFileCount(long count) {
-    NameNode.getNameNodeMetrics().incrFilesDeleted(count);
-  }
-
   /**
    * Close file.
    * @param path
@@ -4638,7 +4564,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
         for (BlockCollection bc : filesToDelete) {
           LOG.warn("Removing lazyPersist file " + bc.getName() + " with no replicas.");
-          deleteInternal(bc.getName(), false, false, false);
+          BlocksMapUpdateInfo toRemoveBlocks =
+              FSDirDeleteOp.deleteInternal(
+                  FSNamesystem.this, bc.getName(),
+                  INodesInPath.fromINode((INodeFile) bc), false);
+          if (toRemoveBlocks != null) {
+            removeBlocks(toRemoveBlocks); // Incremental deletion of blocks
+          }
         }
       } finally {
         writeUnlock();