You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wh...@apache.org on 2014/11/26 06:04:01 UTC

hadoop git commit: HDFS-7440. Consolidate snapshot related operations in a single class. Contributed by Haohui Mai.

Repository: hadoop
Updated Branches:
  refs/heads/branch-2 3d4536af8 -> 3e011f1ec


HDFS-7440. Consolidate snapshot related operations in a single class. Contributed by Haohui Mai.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3e011f1e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3e011f1e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3e011f1e

Branch: refs/heads/branch-2
Commit: 3e011f1ec225d8fca8d24a097b43f978e1db6303
Parents: 3d4536a
Author: Haohui Mai <wh...@apache.org>
Authored: Tue Nov 25 21:03:41 2014 -0800
Committer: Haohui Mai <wh...@apache.org>
Committed: Tue Nov 25 21:03:57 2014 -0800

----------------------------------------------------------------------
 .../hdfs/server/namenode/FSDirSnapshotOp.java   | 222 +++++++++++++++++++
 .../hdfs/server/namenode/FSDirectory.java       |  48 +---
 .../hdfs/server/namenode/FSNamesystem.java      | 172 ++++----------
 3 files changed, 276 insertions(+), 166 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3e011f1e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSnapshotOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSnapshotOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSnapshotOp.java
new file mode 100644
index 0000000..aa751a7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSnapshotOp.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.fs.InvalidPathException;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
+import org.apache.hadoop.hdfs.protocol.SnapshotException;
+import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotManager;
+import org.apache.hadoop.hdfs.util.ChunkedArrayList;
+
+import java.io.IOException;
+import java.util.List;
+
+class FSDirSnapshotOp {
+  /** Allow snapshot on a directory, then record it in the edit log. */
+  static void allowSnapshot(FSDirectory fsd, SnapshotManager snapshotManager,
+                            String path) throws IOException {
+    fsd.writeLock();
+    try {
+      snapshotManager.setSnapshottable(path, true);
+    } finally {
+      fsd.writeUnlock();
+    }
+    fsd.getEditLog().logAllowSnapshot(path);
+  }
+
+  static void disallowSnapshot(
+      FSDirectory fsd, SnapshotManager snapshotManager,
+      String path) throws IOException {
+    fsd.writeLock();
+    try {
+      snapshotManager.resetSnapshottable(path);
+    } finally {
+      fsd.writeUnlock();
+    }
+    fsd.getEditLog().logDisallowSnapshot(path);
+  }
+
+  /**
+   * Create a snapshot and log it; returns the path from SnapshotManager.
+   * @param snapshotRoot The directory path where the snapshot is taken
+   * @param snapshotName Snapshot name; a default is generated if null/empty
+   */
+  static String createSnapshot(
+      FSDirectory fsd, SnapshotManager snapshotManager, String snapshotRoot,
+      String snapshotName, boolean logRetryCache)
+      throws IOException {
+    final FSPermissionChecker pc = fsd.getPermissionChecker();
+
+    String snapshotPath = null;
+    if (fsd.isPermissionEnabled()) {
+      fsd.checkOwner(pc, snapshotRoot);
+    }
+
+    if (snapshotName == null || snapshotName.isEmpty()) {
+      snapshotName = Snapshot.generateDefaultSnapshotName();
+    }
+
+    if(snapshotName != null){
+      if (!DFSUtil.isValidNameForComponent(snapshotName)) {
+        throw new InvalidPathException("Invalid snapshot name: " +
+            snapshotName);
+      }
+    }
+    fsd.verifySnapshotName(snapshotName, snapshotRoot);
+    fsd.writeLock();
+    try {
+      snapshotPath = snapshotManager.createSnapshot(snapshotRoot, snapshotName);
+    } finally {
+      fsd.writeUnlock();
+    }
+    fsd.getEditLog().logCreateSnapshot(snapshotRoot, snapshotName,
+        logRetryCache);
+
+    return snapshotPath;
+  }
+
+  static void renameSnapshot(FSDirectory fsd, SnapshotManager snapshotManager,
+      String path, String snapshotOldName, String snapshotNewName,
+      boolean logRetryCache) throws IOException {
+
+    if (fsd.isPermissionEnabled()) {
+      FSPermissionChecker pc = fsd.getPermissionChecker();
+        fsd.checkOwner(pc, path);
+    }
+    fsd.verifySnapshotName(snapshotNewName, path);
+    fsd.writeLock();
+    try {
+      snapshotManager.renameSnapshot(path, snapshotOldName, snapshotNewName);
+    } finally {
+      fsd.writeUnlock();
+    }
+    fsd.getEditLog().logRenameSnapshot(path, snapshotOldName,
+        snapshotNewName, logRetryCache);
+  }
+
+  static SnapshottableDirectoryStatus[] getSnapshottableDirListing(
+      FSDirectory fsd, SnapshotManager snapshotManager) throws IOException {
+    FSPermissionChecker pc = fsd.getPermissionChecker();
+    fsd.readLock();
+    try {
+      final String user = pc.isSuperUser()? null : pc.getUser();
+      return snapshotManager.getSnapshottableDirListing(user);
+    } finally {
+      fsd.readUnlock();
+    }
+  }
+
+  static SnapshotDiffReport getSnapshotDiffReport(
+      FSDirectory fsd, SnapshotManager snapshotManager, String path,
+      String fromSnapshot, String toSnapshot) throws IOException {
+    SnapshotDiffReport diffs;
+    final FSPermissionChecker pc = fsd.getPermissionChecker();
+    fsd.readLock();
+    try {
+      if (fsd.isPermissionEnabled()) {
+        checkSubtreeReadPermission(fsd, pc, path, fromSnapshot);
+        checkSubtreeReadPermission(fsd, pc, path, toSnapshot);
+      }
+      diffs = snapshotManager.diff(path, fromSnapshot, toSnapshot);
+    } finally {
+      fsd.readUnlock();
+    }
+    return diffs;
+  }
+
+  /**
+   * Delete a snapshot of a snapshottable directory; returns collected blocks.
+   * @param snapshotRoot The snapshottable directory
+   * @param snapshotName The name of the to-be-deleted snapshot
+   * @throws IOException if any of the underlying calls fails
+   */
+  static INode.BlocksMapUpdateInfo deleteSnapshot(
+      FSDirectory fsd, SnapshotManager snapshotManager, String snapshotRoot,
+      String snapshotName, boolean logRetryCache)
+      throws IOException {
+    final FSPermissionChecker pc = fsd.getPermissionChecker();
+
+    INode.BlocksMapUpdateInfo collectedBlocks = new INode.BlocksMapUpdateInfo();
+    if (fsd.isPermissionEnabled()) {
+      fsd.checkOwner(pc, snapshotRoot);
+    }
+
+    ChunkedArrayList<INode> removedINodes = new ChunkedArrayList<INode>();
+    fsd.writeLock();
+    try {
+      snapshotManager.deleteSnapshot(snapshotRoot, snapshotName,
+          collectedBlocks, removedINodes);
+      fsd.removeFromInodeMap(removedINodes);
+    } finally {
+      fsd.writeUnlock();
+    }
+    removedINodes.clear();
+    fsd.getEditLog().logDeleteSnapshot(snapshotRoot, snapshotName,
+        logRetryCache);
+
+    return collectedBlocks;
+  }
+
+  private static void checkSubtreeReadPermission(
+      FSDirectory fsd, final FSPermissionChecker pc, String snapshottablePath,
+      String snapshot) throws IOException {
+    final String fromPath = snapshot == null ?
+        snapshottablePath : Snapshot.getSnapshotPath(snapshottablePath,
+        snapshot);
+    fsd.checkPermission(pc, fromPath, false, null, null, FsAction.READ,
+        FsAction.READ);
+  }
+
+  /**
+   * Recursively check whether the given INode (or one of its descendants) is
+   * snapshottable and already has snapshots; throw SnapshotException if so.
+   *
+   * @param target The given INode
+   * @param snapshottableDirs Output list (may be null) collecting directories
+   *                          that are snapshottable but have no snapshots yet
+   */
+  static void checkSnapshot(
+      INode target, List<INodeDirectory> snapshottableDirs)
+      throws SnapshotException {
+    if (target.isDirectory()) {
+      INodeDirectory targetDir = target.asDirectory();
+      DirectorySnapshottableFeature sf = targetDir
+          .getDirectorySnapshottableFeature();
+      if (sf != null) {
+        if (sf.getNumSnapshots() > 0) {
+          String fullPath = targetDir.getFullPathName();
+          throw new SnapshotException("The directory " + fullPath
+              + " cannot be deleted since " + fullPath
+              + " is snapshottable and already has snapshots");
+        } else {
+          if (snapshottableDirs != null) {
+            snapshottableDirs.add(targetDir);
+          }
+        }
+      }
+      for (INode child : targetDir.getChildrenList(Snapshot.CURRENT_STATE_ID)) {
+        checkSnapshot(child, snapshottableDirs);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3e011f1e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index 46b0460..2caaabd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -686,7 +686,7 @@ public class FSDirectory implements Closeable {
     List<INodeDirectory> snapshottableDirs = new ArrayList<INodeDirectory>();
     if (dstInode != null) { // Destination exists
       validateRenameOverwrite(src, dst, overwrite, srcInode, dstInode);
-      checkSnapshot(dstInode, snapshottableDirs);
+      FSDirSnapshotOp.checkSnapshot(dstInode, snapshottableDirs);
     }
 
     INode dstParent = dstIIP.getINode(-2);
@@ -863,7 +863,7 @@ public class FSDirectory implements Closeable {
     }
     // srcInode and its subtree cannot contain snapshottable directories with
     // snapshots
-    checkSnapshot(srcInode, null);
+    FSDirSnapshotOp.checkSnapshot(srcInode, null);
   }
 
   /**
@@ -1208,7 +1208,7 @@ public class FSDirectory implements Closeable {
         filesRemoved = -1;
       } else {
         List<INodeDirectory> snapshottableDirs = new ArrayList<INodeDirectory>();
-        checkSnapshot(inodesInPath.getLastINode(), snapshottableDirs);
+        FSDirSnapshotOp.checkSnapshot(inodesInPath.getLastINode(), snapshottableDirs);
         filesRemoved = unprotectedDelete(inodesInPath, collectedBlocks,
             removedINodes, mtime);
         namesystem.removeSnapshottableDirs(snapshottableDirs);
@@ -1278,7 +1278,7 @@ public class FSDirectory implements Closeable {
     long filesRemoved = -1;
     if (deleteAllowed(inodesInPath, src)) {
       List<INodeDirectory> snapshottableDirs = new ArrayList<INodeDirectory>();
-      checkSnapshot(inodesInPath.getLastINode(), snapshottableDirs);
+      FSDirSnapshotOp.checkSnapshot(inodesInPath.getLastINode(), snapshottableDirs);
       filesRemoved = unprotectedDelete(inodesInPath, collectedBlocks,
           removedINodes, mtime);
       namesystem.removeSnapshottableDirs(snapshottableDirs); 
@@ -1342,38 +1342,6 @@ public class FSDirectory implements Closeable {
     }
     return removed;
   }
-  
-  /**
-   * Check if the given INode (or one of its descendants) is snapshottable and
-   * already has snapshots.
-   * 
-   * @param target The given INode
-   * @param snapshottableDirs The list of directories that are snapshottable 
-   *                          but do not have snapshots yet
-   */
-  private static void checkSnapshot(INode target,
-      List<INodeDirectory> snapshottableDirs) throws SnapshotException {
-    if (target.isDirectory()) {
-      INodeDirectory targetDir = target.asDirectory();
-      DirectorySnapshottableFeature sf = targetDir
-          .getDirectorySnapshottableFeature();
-      if (sf != null) {
-        if (sf.getNumSnapshots() > 0) {
-          String fullPath = targetDir.getFullPathName();
-          throw new SnapshotException("The directory " + fullPath
-              + " cannot be deleted since " + fullPath
-              + " is snapshottable and already has snapshots");
-        } else {
-          if (snapshottableDirs != null) {
-            snapshottableDirs.add(targetDir);
-          }
-        }
-      } 
-      for (INode child : targetDir.getChildrenList(Snapshot.CURRENT_STATE_ID)) {
-        checkSnapshot(child, snapshottableDirs);
-      }
-    }
-  }
 
   private byte getStoragePolicyID(byte inodePolicy, byte parentPolicy) {
     return inodePolicy != BlockStoragePolicySuite.ID_UNSPECIFIED ? inodePolicy :
@@ -3323,10 +3291,10 @@ public class FSDirectory implements Closeable {
    * details of the parameters, see
    * {@link FSPermissionChecker#checkPermission}.
    */
-  private void checkPermission(
-    FSPermissionChecker pc, String path, boolean doCheckOwner,
-    FsAction ancestorAccess, FsAction parentAccess, FsAction access,
-    FsAction subAccess)
+  void checkPermission(
+      FSPermissionChecker pc, String path, boolean doCheckOwner,
+      FsAction ancestorAccess, FsAction parentAccess, FsAction access,
+      FsAction subAccess)
     throws AccessControlException, UnresolvedLinkException {
     checkPermission(pc, path, doCheckOwner, ancestorAccess,
         parentAccess, access, subAccess, false, true);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3e011f1e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 328eff3..f4eeb18 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -7594,55 +7594,39 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   }
   
   /** Allow snapshot on a directory. */
-  void allowSnapshot(String path) throws SafeModeException, IOException {
+  void allowSnapshot(String path) throws IOException {
     checkOperation(OperationCategory.WRITE);
+    boolean success = false;
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot allow snapshot for " + path);
       checkSuperuserPrivilege();
-
-      dir.writeLock();
-      try {
-        snapshotManager.setSnapshottable(path, true);
-      } finally {
-        dir.writeUnlock();
-      }
-      getEditLog().logAllowSnapshot(path);
+      FSDirSnapshotOp.allowSnapshot(dir, snapshotManager, path);
+      success = true;
     } finally {
       writeUnlock();
     }
     getEditLog().logSync();
-
-    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
-      logAuditEvent(true, "allowSnapshot", path, null, null);
-    }
+    logAuditEvent(success, "allowSnapshot", path, null, null);
   }
   
   /** Disallow snapshot on a directory. */
-  void disallowSnapshot(String path) throws SafeModeException, IOException {
+  void disallowSnapshot(String path) throws IOException {
     checkOperation(OperationCategory.WRITE);
+    boolean success = false;
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot disallow snapshot for " + path);
       checkSuperuserPrivilege();
-
-      dir.writeLock();
-      try {
-        snapshotManager.resetSnapshottable(path);
-      } finally {
-        dir.writeUnlock();
-      }
-      getEditLog().logDisallowSnapshot(path);
+      FSDirSnapshotOp.disallowSnapshot(dir, snapshotManager, path);
+      success = true;
     } finally {
       writeUnlock();
     }
     getEditLog().logSync();
-    
-    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
-      logAuditEvent(true, "disallowSnapshot", path, null, null);
-    }
+    logAuditEvent(success, "disallowSnapshot", path, null, null);
   }
   
   /**
@@ -7651,45 +7635,20 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    * @param snapshotName The name of the snapshot
    */
   String createSnapshot(String snapshotRoot, String snapshotName,
-                        boolean logRetryCache)
-      throws SafeModeException, IOException {
-    checkOperation(OperationCategory.WRITE);
-    final FSPermissionChecker pc = getPermissionChecker();
-
+                        boolean logRetryCache) throws IOException {
     String snapshotPath = null;
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot create snapshot for " + snapshotRoot);
-      if (isPermissionEnabled) {
-        checkOwner(pc, snapshotRoot);
-      }
-
-      if (snapshotName == null || snapshotName.isEmpty()) {
-        snapshotName = Snapshot.generateDefaultSnapshotName();
-      }
-      if(snapshotName != null){
-        if (!DFSUtil.isValidNameForComponent(snapshotName)) {
-            throw new InvalidPathException("Invalid snapshot name: "
-                + snapshotName);
-        }
-      }
-      dir.verifySnapshotName(snapshotName, snapshotRoot);
-      dir.writeLock();
-      try {
-        snapshotPath = snapshotManager.createSnapshot(snapshotRoot, snapshotName);
-      } finally {
-        dir.writeUnlock();
-      }
-      getEditLog().logCreateSnapshot(snapshotRoot, snapshotName, logRetryCache);
+      snapshotPath = FSDirSnapshotOp.createSnapshot(dir,
+          snapshotManager, snapshotRoot, snapshotName, logRetryCache);
     } finally {
       writeUnlock();
     }
     getEditLog().logSync();
-    
-    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
-      logAuditEvent(true, "createSnapshot", snapshotRoot, snapshotPath, null);
-    }
+    logAuditEvent(snapshotPath != null, "createSnapshot", snapshotRoot,
+        snapshotPath, null);
     return snapshotPath;
   }
   
@@ -7705,32 +7664,22 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       String path, String snapshotOldName, String snapshotNewName,
       boolean logRetryCache) throws IOException {
     checkOperation(OperationCategory.WRITE);
-    final FSPermissionChecker pc = getPermissionChecker();
-
+    boolean success = false;
     writeLock();
-
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot rename snapshot for " + path);
-      if (isPermissionEnabled) {
-        checkOwner(pc, path);
-      }
-      dir.verifySnapshotName(snapshotNewName, path);
-      
-      snapshotManager.renameSnapshot(path, snapshotOldName, snapshotNewName);
-      getEditLog().logRenameSnapshot(path, snapshotOldName, snapshotNewName,
-          logRetryCache);
+      FSDirSnapshotOp.renameSnapshot(dir, snapshotManager, path,
+          snapshotOldName, snapshotNewName, logRetryCache);
+      success = true;
     } finally {
       writeUnlock();
-
     }
     getEditLog().logSync();
-    
-    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
-      String oldSnapshotRoot = Snapshot.getSnapshotPath(path, snapshotOldName);
-      String newSnapshotRoot = Snapshot.getSnapshotPath(path, snapshotNewName);
-      logAuditEvent(true, "renameSnapshot", oldSnapshotRoot, newSnapshotRoot, null);
-    }
+    String oldSnapshotRoot = Snapshot.getSnapshotPath(path, snapshotOldName);
+    String newSnapshotRoot = Snapshot.getSnapshotPath(path, snapshotNewName);
+    logAuditEvent(success, "renameSnapshot", oldSnapshotRoot,
+        newSnapshotRoot, null);
   }
   
   /**
@@ -7744,18 +7693,16 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       throws IOException {
     SnapshottableDirectoryStatus[] status = null;
     checkOperation(OperationCategory.READ);
-    final FSPermissionChecker checker = getPermissionChecker();
+    boolean success = false;
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-      final String user = checker.isSuperUser()? null : checker.getUser();
-      status = snapshotManager.getSnapshottableDirListing(user);
+      status = FSDirSnapshotOp.getSnapshottableDirListing(dir, snapshotManager);
+      success = true;
     } finally {
       readUnlock();
     }
-    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
-      logAuditEvent(true, "listSnapshottableDirectory", null, null, null);
-    }
+    logAuditEvent(success, "listSnapshottableDirectory", null, null, null);
     return status;
   }
   
@@ -7776,35 +7723,21 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    */
   SnapshotDiffReport getSnapshotDiffReport(String path,
       String fromSnapshot, String toSnapshot) throws IOException {
-    SnapshotDiffReport diffs;
+    SnapshotDiffReport diffs = null;
     checkOperation(OperationCategory.READ);
-    final FSPermissionChecker pc = getPermissionChecker();
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-      if (isPermissionEnabled) {
-        checkSubtreeReadPermission(pc, path, fromSnapshot);
-        checkSubtreeReadPermission(pc, path, toSnapshot);
-      }
-      diffs = snapshotManager.diff(path, fromSnapshot, toSnapshot);
+      diffs = FSDirSnapshotOp.getSnapshotDiffReport(dir, snapshotManager,
+          path, fromSnapshot, toSnapshot);
     } finally {
       readUnlock();
     }
 
-    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
-      logAuditEvent(true, "computeSnapshotDiff", null, null, null);
-    }
+    logAuditEvent(diffs != null, "computeSnapshotDiff", null, null, null);
     return diffs;
   }
   
-  private void checkSubtreeReadPermission(final FSPermissionChecker pc,
-      final String snapshottablePath, final String snapshot)
-          throws AccessControlException, UnresolvedLinkException {
-    final String fromPath = snapshot == null?
-        snapshottablePath: Snapshot.getSnapshotPath(snapshottablePath, snapshot);
-    checkPermission(pc, fromPath, false, null, null, FsAction.READ, FsAction.READ);
-  }
-  
   /**
    * Delete a snapshot of a snapshottable directory
    * @param snapshotRoot The snapshottable directory
@@ -7812,45 +7745,32 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    * @throws SafeModeException
    * @throws IOException
    */
-  void deleteSnapshot(String snapshotRoot, String snapshotName,
-                      boolean logRetryCache)
-      throws SafeModeException, IOException {
+  void deleteSnapshot(
+      String snapshotRoot, String snapshotName, boolean logRetryCache)
+      throws IOException {
     checkOperation(OperationCategory.WRITE);
-    final FSPermissionChecker pc = getPermissionChecker();
-
-    BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
+    boolean success = false;
     writeLock();
+    BlocksMapUpdateInfo blocksToBeDeleted = null;
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot delete snapshot for " + snapshotRoot);
-      if (isPermissionEnabled) {
-        checkOwner(pc, snapshotRoot);
-      }
-
-      List<INode> removedINodes = new ChunkedArrayList<INode>();
-      dir.writeLock();
-      try {
-        snapshotManager.deleteSnapshot(snapshotRoot, snapshotName,
-            collectedBlocks, removedINodes);
-        dir.removeFromInodeMap(removedINodes);
-      } finally {
-        dir.writeUnlock();
-      }
-      removedINodes.clear();
-      getEditLog().logDeleteSnapshot(snapshotRoot, snapshotName, logRetryCache);
+      blocksToBeDeleted = FSDirSnapshotOp.deleteSnapshot(dir, snapshotManager,
+          snapshotRoot, snapshotName, logRetryCache);
+      success = true;
     } finally {
       writeUnlock();
-
     }
     getEditLog().logSync();
 
-    removeBlocks(collectedBlocks);
-    collectedBlocks.clear();
-
-    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
-      String rootPath = Snapshot.getSnapshotPath(snapshotRoot, snapshotName);
-      logAuditEvent(true, "deleteSnapshot", rootPath, null, null);
+    // Breaking the pattern as removing blocks has to happen outside of the
+    // global lock
+    if (blocksToBeDeleted != null) {
+      removeBlocks(blocksToBeDeleted);
     }
+
+    String rootPath = Snapshot.getSnapshotPath(snapshotRoot, snapshotName);
+    logAuditEvent(success, "deleteSnapshot", rootPath, null, null);
   }
 
   /**