You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2018/02/22 20:52:25 UTC

hadoop git commit: HDFS-13136. Avoid taking FSN lock while doing group member lookup for FSD permission check. Contributed by Xiaoyu Yao.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 3132709b4 -> 84a1321f6


HDFS-13136. Avoid taking FSN lock while doing group member lookup for FSD permission check. Contributed by Xiaoyu Yao.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/84a1321f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/84a1321f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/84a1321f

Branch: refs/heads/trunk
Commit: 84a1321f6aa0af6895564a7c47f8f264656f0294
Parents: 3132709
Author: Xiaoyu Yao <xy...@apache.org>
Authored: Thu Feb 15 00:02:05 2018 -0800
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Thu Feb 22 11:32:32 2018 -0800

----------------------------------------------------------------------
 .../server/namenode/EncryptionZoneManager.java  |   3 +-
 .../hadoop/hdfs/server/namenode/FSDirAclOp.java |  28 ++-
 .../hdfs/server/namenode/FSDirAttrOp.java       |  54 +++---
 .../hdfs/server/namenode/FSDirConcatOp.java     |   5 +-
 .../hdfs/server/namenode/FSDirDeleteOp.java     |   8 +-
 .../server/namenode/FSDirEncryptionZoneOp.java  |  12 +-
 .../hdfs/server/namenode/FSDirMkdirOp.java      |   3 +-
 .../hdfs/server/namenode/FSDirRenameOp.java     |  11 +-
 .../hdfs/server/namenode/FSDirSnapshotOp.java   |  38 ++--
 .../server/namenode/FSDirStatAndListingOp.java  |  35 ++--
 .../hdfs/server/namenode/FSDirXAttrOp.java      |  35 ++--
 .../hdfs/server/namenode/FSNamesystem.java      | 190 ++++++++++++-------
 .../hdfs/server/namenode/NameNodeAdapter.java   |   5 +-
 .../hdfs/server/namenode/TestAuditLogger.java   |   3 +-
 .../namenode/TestAuditLoggerWithCommands.java   |   4 +-
 15 files changed, 243 insertions(+), 191 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/84a1321f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java
index 3fcf797..176ae1d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java
@@ -154,9 +154,10 @@ public class EncryptionZoneManager {
   public void pauseForTestingAfterNthCheckpoint(final String zone,
       final int count) throws IOException {
     INodesInPath iip;
+    final FSPermissionChecker pc = dir.getPermissionChecker();
     dir.readLock();
     try {
-      iip = dir.resolvePath(dir.getPermissionChecker(), zone, DirOp.READ);
+      iip = dir.resolvePath(pc, zone, DirOp.READ);
     } finally {
       dir.readUnlock();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84a1321f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAclOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAclOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAclOp.java
index cc51430..7b3471d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAclOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAclOp.java
@@ -36,11 +36,10 @@ import java.util.List;
 
 class FSDirAclOp {
   static FileStatus modifyAclEntries(
-      FSDirectory fsd, final String srcArg, List<AclEntry> aclSpec)
-      throws IOException {
+      FSDirectory fsd, FSPermissionChecker pc, final String srcArg,
+      List<AclEntry> aclSpec) throws IOException {
     String src = srcArg;
     checkAclsConfigFlag(fsd);
-    FSPermissionChecker pc = fsd.getPermissionChecker();
     INodesInPath iip;
     fsd.writeLock();
     try {
@@ -61,11 +60,10 @@ class FSDirAclOp {
   }
 
   static FileStatus removeAclEntries(
-      FSDirectory fsd, final String srcArg, List<AclEntry> aclSpec)
-      throws IOException {
+      FSDirectory fsd, FSPermissionChecker pc, final String srcArg,
+      List<AclEntry> aclSpec) throws IOException {
     String src = srcArg;
     checkAclsConfigFlag(fsd);
-    FSPermissionChecker pc = fsd.getPermissionChecker();
     INodesInPath iip;
     fsd.writeLock();
     try {
@@ -85,11 +83,10 @@ class FSDirAclOp {
     return fsd.getAuditFileInfo(iip);
   }
 
-  static FileStatus removeDefaultAcl(FSDirectory fsd, final String srcArg)
-      throws IOException {
+  static FileStatus removeDefaultAcl(FSDirectory fsd, FSPermissionChecker pc,
+      final String srcArg) throws IOException {
     String src = srcArg;
     checkAclsConfigFlag(fsd);
-    FSPermissionChecker pc = fsd.getPermissionChecker();
     INodesInPath iip;
     fsd.writeLock();
     try {
@@ -109,11 +106,10 @@ class FSDirAclOp {
     return fsd.getAuditFileInfo(iip);
   }
 
-  static FileStatus removeAcl(FSDirectory fsd, final String srcArg)
-      throws IOException {
+  static FileStatus removeAcl(FSDirectory fsd, FSPermissionChecker pc,
+      final String srcArg) throws IOException {
     String src = srcArg;
     checkAclsConfigFlag(fsd);
-    FSPermissionChecker pc = fsd.getPermissionChecker();
     INodesInPath iip;
     fsd.writeLock();
     try {
@@ -129,11 +125,10 @@ class FSDirAclOp {
   }
 
   static FileStatus setAcl(
-      FSDirectory fsd, final String srcArg, List<AclEntry> aclSpec)
-      throws IOException {
+      FSDirectory fsd, FSPermissionChecker pc, final String srcArg,
+      List<AclEntry> aclSpec) throws IOException {
     String src = srcArg;
     checkAclsConfigFlag(fsd);
-    FSPermissionChecker pc = fsd.getPermissionChecker();
     INodesInPath iip;
     fsd.writeLock();
     try {
@@ -148,9 +143,8 @@ class FSDirAclOp {
   }
 
   static AclStatus getAclStatus(
-      FSDirectory fsd, String src) throws IOException {
+      FSDirectory fsd, FSPermissionChecker pc, String src) throws IOException {
     checkAclsConfigFlag(fsd);
-    FSPermissionChecker pc = fsd.getPermissionChecker();
     fsd.readLock();
     try {
       INodesInPath iip = fsd.resolvePath(pc, src, DirOp.READ);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84a1321f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
index 201605f..406fe80 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
@@ -51,12 +51,11 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KE
 
 public class FSDirAttrOp {
   static FileStatus setPermission(
-      FSDirectory fsd, final String src, FsPermission permission)
-      throws IOException {
+      FSDirectory fsd, FSPermissionChecker pc, final String src,
+      FsPermission permission) throws IOException {
     if (FSDirectory.isExactReservedName(src)) {
       throw new InvalidPathException(src);
     }
-    FSPermissionChecker pc = fsd.getPermissionChecker();
     INodesInPath iip;
     fsd.writeLock();
     try {
@@ -71,12 +70,11 @@ public class FSDirAttrOp {
   }
 
   static FileStatus setOwner(
-      FSDirectory fsd, String src, String username, String group)
-      throws IOException {
+      FSDirectory fsd, FSPermissionChecker pc, String src, String username,
+      String group) throws IOException {
     if (FSDirectory.isExactReservedName(src)) {
       throw new InvalidPathException(src);
     }
-    FSPermissionChecker pc = fsd.getPermissionChecker();
     INodesInPath iip;
     fsd.writeLock();
     try {
@@ -101,10 +99,8 @@ public class FSDirAttrOp {
   }
 
   static FileStatus setTimes(
-      FSDirectory fsd, String src, long mtime, long atime)
-      throws IOException {
-    FSPermissionChecker pc = fsd.getPermissionChecker();
-
+      FSDirectory fsd, FSPermissionChecker pc, String src, long mtime,
+      long atime) throws IOException {
     INodesInPath iip;
     fsd.writeLock();
     try {
@@ -129,11 +125,10 @@ public class FSDirAttrOp {
   }
 
   static boolean setReplication(
-      FSDirectory fsd, BlockManager bm, String src, final short replication)
-      throws IOException {
+      FSDirectory fsd, FSPermissionChecker pc, BlockManager bm, String src,
+      final short replication) throws IOException {
     bm.verifyReplication(src, replication, null);
     final boolean isFile;
-    FSPermissionChecker pc = fsd.getPermissionChecker();
     fsd.writeLock();
     try {
       final INodesInPath iip = fsd.resolvePath(pc, src, DirOp.WRITE);
@@ -153,33 +148,31 @@ public class FSDirAttrOp {
     return isFile;
   }
 
-  static FileStatus unsetStoragePolicy(FSDirectory fsd, BlockManager bm,
-      String src) throws IOException {
-    return setStoragePolicy(fsd, bm, src,
+  static FileStatus unsetStoragePolicy(FSDirectory fsd, FSPermissionChecker pc,
+      BlockManager bm, String src) throws IOException {
+    return setStoragePolicy(fsd, pc, bm, src,
         HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED, "unset");
   }
 
-  static FileStatus setStoragePolicy(FSDirectory fsd, BlockManager bm,
-      String src, final String policyName) throws IOException {
+  static FileStatus setStoragePolicy(FSDirectory fsd, FSPermissionChecker pc,
+      BlockManager bm, String src, final String policyName) throws IOException {
     // get the corresponding policy and make sure the policy name is valid
     BlockStoragePolicy policy = bm.getStoragePolicy(policyName);
     if (policy == null) {
       throw new HadoopIllegalArgumentException(
           "Cannot find a block policy with the name " + policyName);
     }
-
-    return setStoragePolicy(fsd, bm, src, policy.getId(), "set");
+    return setStoragePolicy(fsd, pc, bm, src, policy.getId(), "set");
   }
 
-  static FileStatus setStoragePolicy(FSDirectory fsd, BlockManager bm,
-      String src, final byte policyId, final String operation)
+  static FileStatus setStoragePolicy(FSDirectory fsd, FSPermissionChecker pc,
+      BlockManager bm, String src, final byte policyId, final String operation)
       throws IOException {
     if (!fsd.isStoragePolicyEnabled()) {
       throw new IOException(String.format(
           "Failed to %s storage policy since %s is set to false.", operation,
           DFS_STORAGE_POLICY_ENABLED_KEY));
     }
-    FSPermissionChecker pc = fsd.getPermissionChecker();
     INodesInPath iip;
     fsd.writeLock();
     try {
@@ -202,9 +195,8 @@ public class FSDirAttrOp {
     return bm.getStoragePolicies();
   }
 
-  static BlockStoragePolicy getStoragePolicy(FSDirectory fsd, BlockManager bm,
-      String path) throws IOException {
-    FSPermissionChecker pc = fsd.getPermissionChecker();
+  static BlockStoragePolicy getStoragePolicy(FSDirectory fsd,
+      FSPermissionChecker pc, BlockManager bm, String path) throws IOException {
     fsd.readLock();
     try {
       final INodesInPath iip = fsd.resolvePath(pc, path, DirOp.READ_LINK);
@@ -222,9 +214,8 @@ public class FSDirAttrOp {
     }
   }
 
-  static long getPreferredBlockSize(FSDirectory fsd, String src)
-      throws IOException {
-    FSPermissionChecker pc = fsd.getPermissionChecker();
+  static long getPreferredBlockSize(FSDirectory fsd, FSPermissionChecker pc,
+      String src) throws IOException {
     fsd.readLock();
     try {
       final INodesInPath iip = fsd.resolvePath(pc, src, DirOp.READ_LINK);
@@ -240,9 +231,8 @@ public class FSDirAttrOp {
    *
    * Note: This does not support ".inodes" relative path.
    */
-  static void setQuota(FSDirectory fsd, String src, long nsQuota, long ssQuota,
-      StorageType type) throws IOException {
-    FSPermissionChecker pc = fsd.getPermissionChecker();
+  static void setQuota(FSDirectory fsd, FSPermissionChecker pc, String src,
+      long nsQuota, long ssQuota, StorageType type) throws IOException {
     if (fsd.isPermissionEnabled()) {
       pc.checkSuperuserPrivilege();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84a1321f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
index 4cc5389..b423a95 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
@@ -48,14 +48,13 @@ import static org.apache.hadoop.util.Time.now;
  */
 class FSDirConcatOp {
 
-  static FileStatus concat(FSDirectory fsd, String target, String[] srcs,
-    boolean logRetryCache) throws IOException {
+  static FileStatus concat(FSDirectory fsd, FSPermissionChecker pc,
+      String target, String[] srcs, boolean logRetryCache) throws IOException {
     validatePath(target, srcs);
     assert srcs != null;
     if (FSDirectory.LOG.isDebugEnabled()) {
       FSDirectory.LOG.debug("concat {} to {}", Arrays.toString(srcs), target);
     }
-    FSPermissionChecker pc = fsd.getPermissionChecker();
     final INodesInPath targetIIP = fsd.resolvePath(pc, target, DirOp.WRITE);
     // write permission for the target
     if (fsd.isPermissionEnabled()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84a1321f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirDeleteOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirDeleteOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirDeleteOp.java
index a83a8b6..1fbb564 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirDeleteOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirDeleteOp.java
@@ -88,6 +88,7 @@ class FSDirDeleteOp {
    * For small directory or file the deletion is done in one shot.
    *
    * @param fsn namespace
+   * @param pc FS permission checker
    * @param src path name to be deleted
    * @param recursive boolean true to apply to all sub-directories recursively
    * @param logRetryCache whether to record RPC ids in editlog for retry cache
@@ -96,10 +97,9 @@ class FSDirDeleteOp {
    * @throws IOException
    */
   static BlocksMapUpdateInfo delete(
-      FSNamesystem fsn, String src, boolean recursive, boolean logRetryCache)
-      throws IOException {
+      FSNamesystem fsn, FSPermissionChecker pc, String src, boolean recursive,
+      boolean logRetryCache) throws IOException {
     FSDirectory fsd = fsn.getFSDirectory();
-    FSPermissionChecker pc = fsd.getPermissionChecker();
 
     if (FSDirectory.isExactReservedName(src)) {
       throw new InvalidPathException(src);
@@ -130,7 +130,7 @@ class FSDirDeleteOp {
    * <br>
    *
    * @param fsd the FSDirectory instance
-   * @param src a string representation of a path to an inode
+   * @param iip inodes of a path to be deleted
    * @param mtime the time the inode is removed
    */
   static void deleteForEditLog(FSDirectory fsd, INodesInPath iip, long mtime)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84a1321f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java
index 943e60d..9fbdaeb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java
@@ -693,11 +693,12 @@ final class FSDirEncryptionZoneOp {
    * a different ACL. HDFS should not try to operate on additional ACLs, but
    * rather use the generate ACL it already has.
    */
-  static String getCurrentKeyVersion(final FSDirectory dir, final String zone)
-      throws IOException {
+  static String getCurrentKeyVersion(final FSDirectory dir,
+      final FSPermissionChecker pc, final String zone) throws IOException {
     assert dir.getProvider() != null;
     assert !dir.hasReadLock();
-    final String keyName = FSDirEncryptionZoneOp.getKeyNameForZone(dir, zone);
+    final String keyName = FSDirEncryptionZoneOp.getKeyNameForZone(dir,
+        pc, zone);
     if (keyName == null) {
       throw new IOException(zone + " is not an encryption zone.");
     }
@@ -719,11 +720,10 @@ final class FSDirEncryptionZoneOp {
    * Resolve the zone to an inode, find the encryption zone info associated with
    * that inode, and return the key name. Does not contact the KMS.
    */
-  static String getKeyNameForZone(final FSDirectory dir, final String zone)
-      throws IOException {
+  static String getKeyNameForZone(final FSDirectory dir,
+      final FSPermissionChecker pc, final String zone) throws IOException {
     assert dir.getProvider() != null;
     final INodesInPath iip;
-    final FSPermissionChecker pc = dir.getPermissionChecker();
     dir.readLock();
     try {
       iip = dir.resolvePath(pc, zone, DirOp.READ);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84a1321f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
index 89fd8a3..45bb6b4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
@@ -39,13 +39,12 @@ import static org.apache.hadoop.util.Time.now;
 
 class FSDirMkdirOp {
 
-  static FileStatus mkdirs(FSNamesystem fsn, String src,
+  static FileStatus mkdirs(FSNamesystem fsn, FSPermissionChecker pc, String src,
       PermissionStatus permissions, boolean createParent) throws IOException {
     FSDirectory fsd = fsn.getFSDirectory();
     if(NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src);
     }
-    FSPermissionChecker pc = fsd.getPermissionChecker();
     fsd.writeLock();
     try {
       INodesInPath iip = fsd.resolvePath(pc, src, DirOp.CREATE);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84a1321f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
index bbbb724..efc8da2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
@@ -47,14 +47,12 @@ import static org.apache.hadoop.hdfs.protocol.FSLimitException.PathComponentTooL
 class FSDirRenameOp {
   @Deprecated
   static RenameResult renameToInt(
-      FSDirectory fsd, final String src, final String dst,
-      boolean logRetryCache)
-      throws IOException {
+      FSDirectory fsd, FSPermissionChecker pc, final String src,
+      final String dst, boolean logRetryCache) throws IOException {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src +
           " to " + dst);
     }
-    FSPermissionChecker pc = fsd.getPermissionChecker();
 
     // Rename does not operate on link targets
     // Do not resolveLink when checking permissions of src and dst
@@ -230,8 +228,8 @@ class FSDirRenameOp {
    * The new rename which has the POSIX semantic.
    */
   static RenameResult renameToInt(
-      FSDirectory fsd, final String srcArg, final String dstArg,
-      boolean logRetryCache, Options.Rename... options)
+      FSDirectory fsd, FSPermissionChecker pc, final String srcArg,
+      final String dstArg, boolean logRetryCache, Options.Rename... options)
       throws IOException {
     String src = srcArg;
     String dst = dstArg;
@@ -239,7 +237,6 @@ class FSDirRenameOp {
       NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: with options -" +
           " " + src + " to " + dst);
     }
-    final FSPermissionChecker pc = fsd.getPermissionChecker();
 
     BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
     // returns resolved path

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84a1321f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSnapshotOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSnapshotOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSnapshotOp.java
index 4dacbf2..4a72f54 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSnapshotOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSnapshotOp.java
@@ -80,14 +80,17 @@ class FSDirSnapshotOp {
 
   /**
    * Create a snapshot
+   * @param fsd FS directory
+   * @param pc FS permission checker
    * @param snapshotRoot The directory path where the snapshot is taken
    * @param snapshotName The name of the snapshot
+   * @param logRetryCache whether to record RPC ids in editlog for retry cache
+   *                      rebuilding.
    */
   static String createSnapshot(
-      FSDirectory fsd, SnapshotManager snapshotManager, String snapshotRoot,
-      String snapshotName, boolean logRetryCache)
+      FSDirectory fsd, FSPermissionChecker pc, SnapshotManager snapshotManager,
+      String snapshotRoot, String snapshotName, boolean logRetryCache)
       throws IOException {
-    FSPermissionChecker pc = fsd.getPermissionChecker();
     final INodesInPath iip = fsd.resolvePath(pc, snapshotRoot, DirOp.WRITE);
     if (fsd.isPermissionEnabled()) {
       fsd.checkOwner(pc, iip);
@@ -115,10 +118,9 @@ class FSDirSnapshotOp {
     return snapshotPath;
   }
 
-  static void renameSnapshot(FSDirectory fsd, SnapshotManager snapshotManager,
-      String path, String snapshotOldName, String snapshotNewName,
-      boolean logRetryCache) throws IOException {
-    FSPermissionChecker pc = fsd.getPermissionChecker();
+  static void renameSnapshot(FSDirectory fsd, FSPermissionChecker pc,
+      SnapshotManager snapshotManager, String path, String snapshotOldName,
+      String snapshotNewName, boolean logRetryCache) throws IOException {
     final INodesInPath iip = fsd.resolvePath(pc, path, DirOp.WRITE);
     if (fsd.isPermissionEnabled()) {
       fsd.checkOwner(pc, iip);
@@ -136,8 +138,8 @@ class FSDirSnapshotOp {
   }
 
   static SnapshottableDirectoryStatus[] getSnapshottableDirListing(
-      FSDirectory fsd, SnapshotManager snapshotManager) throws IOException {
-    FSPermissionChecker pc = fsd.getPermissionChecker();
+      FSDirectory fsd, FSPermissionChecker pc, SnapshotManager snapshotManager)
+      throws IOException {
     fsd.readLock();
     try {
       final String user = pc.isSuperUser()? null : pc.getUser();
@@ -148,10 +150,9 @@ class FSDirSnapshotOp {
   }
 
   static SnapshotDiffReport getSnapshotDiffReport(FSDirectory fsd,
-      SnapshotManager snapshotManager, String path,
+      FSPermissionChecker pc, SnapshotManager snapshotManager, String path,
       String fromSnapshot, String toSnapshot) throws IOException {
     SnapshotDiffReport diffs;
-    final FSPermissionChecker pc = fsd.getPermissionChecker();
     fsd.readLock();
     try {
       INodesInPath iip = fsd.resolvePath(pc, path, DirOp.READ);
@@ -167,11 +168,10 @@ class FSDirSnapshotOp {
   }
 
   static SnapshotDiffReportListing getSnapshotDiffReportListing(FSDirectory fsd,
-      SnapshotManager snapshotManager, String path, String fromSnapshot,
-      String toSnapshot, byte[] startPath, int index,
+      FSPermissionChecker pc, SnapshotManager snapshotManager, String path,
+      String fromSnapshot, String toSnapshot, byte[] startPath, int index,
       int snapshotDiffReportLimit) throws IOException {
     SnapshotDiffReportListing diffs;
-    final FSPermissionChecker pc = fsd.getPermissionChecker();
     fsd.readLock();
     try {
       INodesInPath iip = fsd.resolvePath(pc, path, DirOp.READ);
@@ -226,15 +226,19 @@ class FSDirSnapshotOp {
 
   /**
    * Delete a snapshot of a snapshottable directory
+   * @param fsd The FS directory
+   * @param pc The permission checker
+   * @param snapshotManager The snapshot manager
    * @param snapshotRoot The snapshottable directory
    * @param snapshotName The name of the to-be-deleted snapshot
+   * @param logRetryCache whether to record RPC ids in editlog for retry cache
+   *                      rebuilding.
    * @throws IOException
    */
   static INode.BlocksMapUpdateInfo deleteSnapshot(
-      FSDirectory fsd, SnapshotManager snapshotManager, String snapshotRoot,
-      String snapshotName, boolean logRetryCache)
+      FSDirectory fsd, FSPermissionChecker pc, SnapshotManager snapshotManager,
+      String snapshotRoot, String snapshotName, boolean logRetryCache)
       throws IOException {
-    FSPermissionChecker pc = fsd.getPermissionChecker();
     final INodesInPath iip = fsd.resolvePath(pc, snapshotRoot, DirOp.WRITE);
     if (fsd.isPermissionEnabled()) {
       fsd.checkOwner(pc, iip);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84a1321f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
index 8b77034..7e22ae1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
@@ -51,9 +51,9 @@ import java.util.EnumSet;
 import static org.apache.hadoop.util.Time.now;
 
 class FSDirStatAndListingOp {
-  static DirectoryListing getListingInt(FSDirectory fsd, final String srcArg,
-      byte[] startAfter, boolean needLocation) throws IOException {
-    final FSPermissionChecker pc = fsd.getPermissionChecker();
+  static DirectoryListing getListingInt(FSDirectory fsd, FSPermissionChecker pc,
+      final String srcArg, byte[] startAfter, boolean needLocation)
+      throws IOException {
     final INodesInPath iip = fsd.resolvePath(pc, srcArg, DirOp.READ);
 
     // Get file name when startAfter is an INodePath.  This is not the
@@ -85,7 +85,8 @@ class FSDirStatAndListingOp {
 
   /**
    * Get the file info for a specific file.
-   *
+   * @param fsd The FS directory
+   * @param pc The permission checker
    * @param srcArg The string representation of the path to the file
    * @param resolveLink whether to throw UnresolvedLinkException
    *        if src refers to a symlink
@@ -95,11 +96,10 @@ class FSDirStatAndListingOp {
    * @return object containing information regarding the file
    *         or null if file not found
    */
-  static HdfsFileStatus getFileInfo(FSDirectory fsd, String srcArg,
-      boolean resolveLink, boolean needLocation, boolean needBlockToken)
-      throws IOException {
+  static HdfsFileStatus getFileInfo(FSDirectory fsd, FSPermissionChecker pc,
+      String srcArg, boolean resolveLink, boolean needLocation,
+      boolean needBlockToken) throws IOException {
     DirOp dirOp = resolveLink ? DirOp.READ : DirOp.READ_LINK;
-    FSPermissionChecker pc = fsd.getPermissionChecker();
     final INodesInPath iip;
     if (pc.isSuperUser()) {
       // superuser can only get an ACE if an existing ancestor is a file.
@@ -119,19 +119,18 @@ class FSDirStatAndListingOp {
   /**
    * Returns true if the file is closed
    */
-  static boolean isFileClosed(FSDirectory fsd, String src) throws IOException {
-    FSPermissionChecker pc = fsd.getPermissionChecker();
+  static boolean isFileClosed(FSDirectory fsd, FSPermissionChecker pc,
+      String src) throws IOException {
     final INodesInPath iip = fsd.resolvePath(pc, src, DirOp.READ);
     return !INodeFile.valueOf(iip.getLastINode(), src).isUnderConstruction();
   }
 
   static ContentSummary getContentSummary(
-      FSDirectory fsd, String src) throws IOException {
-    FSPermissionChecker pc = fsd.getPermissionChecker();
+      FSDirectory fsd, FSPermissionChecker pc, String src) throws IOException {
     final INodesInPath iip = fsd.resolvePath(pc, src, DirOp.READ_LINK);
     // getContentSummaryInt() call will check access (if enabled) when
     // traversing all sub directories.
-    return getContentSummaryInt(fsd, iip);
+    return getContentSummaryInt(fsd, pc, iip);
   }
 
   /**
@@ -516,7 +515,7 @@ class FSDirStatAndListingOp {
   }
 
   private static ContentSummary getContentSummaryInt(FSDirectory fsd,
-      INodesInPath iip) throws IOException {
+      FSPermissionChecker pc, INodesInPath iip) throws IOException {
     fsd.readLock();
     try {
       INode targetNode = iip.getLastINode();
@@ -528,8 +527,7 @@ class FSDirStatAndListingOp {
         // processed. 0 means disabled. I.e. blocking for the entire duration.
         ContentSummaryComputationContext cscc =
             new ContentSummaryComputationContext(fsd, fsd.getFSNamesystem(),
-                fsd.getContentCountLimit(), fsd.getContentSleepMicroSec(),
-                fsd.getPermissionChecker());
+                fsd.getContentCountLimit(), fsd.getContentSleepMicroSec(), pc);
         ContentSummary cs = targetNode.computeAndConvertContentSummary(
             iip.getPathSnapshotId(), cscc);
         fsd.addYieldCount(cscc.getYieldCount());
@@ -541,8 +539,7 @@ class FSDirStatAndListingOp {
   }
 
   static QuotaUsage getQuotaUsage(
-      FSDirectory fsd, String src) throws IOException {
-    FSPermissionChecker pc = fsd.getPermissionChecker();
+      FSDirectory fsd, FSPermissionChecker pc, String src) throws IOException {
     final INodesInPath iip;
     fsd.readLock();
     try {
@@ -559,7 +556,7 @@ class FSDirStatAndListingOp {
       return usage;
     } else {
       //If quota isn't set, fall back to getContentSummary.
-      return getContentSummaryInt(fsd, iip);
+      return getContentSummaryInt(fsd, pc, iip);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84a1321f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
index be3092c..24a475f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
@@ -51,22 +51,27 @@ class FSDirXAttrOp {
 
   /**
    * Set xattr for a file or directory.
-   *
+   * @param fsd
+   *          - FS directory
+   * @param pc
+   *          - FS permission checker
    * @param src
    *          - path on which it sets the xattr
    * @param xAttr
    *          - xAttr details to set
    * @param flag
    *          - xAttrs flags
+   * @param logRetryCache
+   *          - whether to record RPC ids in editlog for retry cache
+   *          rebuilding.
    * @throws IOException
    */
   static FileStatus setXAttr(
-      FSDirectory fsd, String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag,
-      boolean logRetryCache)
+      FSDirectory fsd, FSPermissionChecker pc, String src, XAttr xAttr,
+      EnumSet<XAttrSetFlag> flag, boolean logRetryCache)
       throws IOException {
     checkXAttrsConfigFlag(fsd);
     checkXAttrSize(fsd, xAttr);
-    FSPermissionChecker pc = fsd.getPermissionChecker();
     XAttrPermissionFilter.checkPermissionForApi(
         pc, xAttr, FSDirectory.isReservedRawName(src));
     List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
@@ -85,12 +90,10 @@ class FSDirXAttrOp {
     return fsd.getAuditFileInfo(iip);
   }
 
-  static List<XAttr> getXAttrs(FSDirectory fsd, final String srcArg,
-                               List<XAttr> xAttrs)
-      throws IOException {
+  static List<XAttr> getXAttrs(FSDirectory fsd, FSPermissionChecker pc,
+      final String srcArg, List<XAttr> xAttrs) throws IOException {
     String src = srcArg;
     checkXAttrsConfigFlag(fsd);
-    FSPermissionChecker pc = fsd.getPermissionChecker();
     final boolean isRawPath = FSDirectory.isReservedRawName(src);
     boolean getAll = xAttrs == null || xAttrs.isEmpty();
     if (!getAll) {
@@ -131,9 +134,8 @@ class FSDirXAttrOp {
   }
 
   static List<XAttr> listXAttrs(
-      FSDirectory fsd, String src) throws IOException {
+      FSDirectory fsd, FSPermissionChecker pc, String src) throws IOException {
     FSDirXAttrOp.checkXAttrsConfigFlag(fsd);
-    final FSPermissionChecker pc = fsd.getPermissionChecker();
     final boolean isRawPath = FSDirectory.isReservedRawName(src);
     final INodesInPath iip = fsd.resolvePath(pc, src, DirOp.READ);
     if (fsd.isPermissionEnabled()) {
@@ -146,18 +148,23 @@ class FSDirXAttrOp {
 
   /**
    * Remove an xattr for a file or directory.
-   *
+   * @param fsd
+   *          - FS directory
+   * @param pc
+   *          - FS permission checker
    * @param src
    *          - path to remove the xattr from
    * @param xAttr
    *          - xAttr to remove
+   * @param logRetryCache
+   *          - whether to record RPC ids in editlog for retry cache
+   *          rebuilding.
    * @throws IOException
    */
   static FileStatus removeXAttr(
-      FSDirectory fsd, String src, XAttr xAttr, boolean logRetryCache)
-      throws IOException {
+      FSDirectory fsd, FSPermissionChecker pc, String src, XAttr xAttr,
+      boolean logRetryCache) throws IOException {
     FSDirXAttrOp.checkXAttrsConfigFlag(fsd);
-    FSPermissionChecker pc = fsd.getPermissionChecker();
     XAttrPermissionFilter.checkPermissionForApi(
         pc, xAttr, FSDirectory.isReservedRawName(src));
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84a1321f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index b0973a9..d36b122 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -1869,11 +1869,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final String operationName = "setPermission";
     FileStatus auditStat;
     checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot set permission for " + src);
-      auditStat = FSDirAttrOp.setPermission(dir, src, permission);
+      auditStat = FSDirAttrOp.setPermission(dir, pc, src, permission);
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
       throw e;
@@ -1893,11 +1894,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final String operationName = "setOwner";
     FileStatus auditStat;
     checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot set owner for " + src);
-      auditStat = FSDirAttrOp.setOwner(dir, src, username, group);
+      auditStat = FSDirAttrOp.setOwner(dir, pc, src, username, group);
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
       throw e;
@@ -1917,7 +1919,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final String operationName = "open";
     checkOperation(OperationCategory.READ);
     GetBlockLocationsResult res = null;
-    FSPermissionChecker pc = getPermissionChecker();
+    final FSPermissionChecker pc = getPermissionChecker();
     readLock();
     try {
       checkOperation(OperationCategory.READ);
@@ -2030,11 +2032,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final String operationName = "concat";
     FileStatus stat = null;
     boolean success = false;
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot concat " + target);
-      stat = FSDirConcatOp.concat(dir, target, srcs, logRetryCache);
+      stat = FSDirConcatOp.concat(dir, pc, target, srcs, logRetryCache);
       success = true;
     } catch (AccessControlException ace) {
       logAuditEvent(success, operationName, Arrays.toString(srcs),
@@ -2058,11 +2061,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final String operationName = "setTimes";
     FileStatus auditStat;
     checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot set times " + src);
-      auditStat = FSDirAttrOp.setTimes(dir, src, mtime, atime);
+      auditStat = FSDirAttrOp.setTimes(dir, pc, src, mtime, atime);
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
       throw e;
@@ -2096,8 +2100,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         throw new HadoopIllegalArgumentException(
             "Cannot truncate to a negative file size: " + newLength + ".");
       }
-      final FSPermissionChecker pc = getPermissionChecker();
       checkOperation(OperationCategory.WRITE);
+      final FSPermissionChecker pc = getPermissionChecker();
       writeLock();
       BlocksMapUpdateInfo toRemoveBlocks = new BlocksMapUpdateInfo();
       try {
@@ -2166,11 +2170,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final String operationName = "setReplication";
     boolean success = false;
     checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot set replication for " + src);
-      success = FSDirAttrOp.setReplication(dir, blockManager, src, replication);
+      success = FSDirAttrOp.setReplication(dir, pc, blockManager, src,
+          replication);
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
       throw e;
@@ -2194,11 +2200,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final String operationName = "setStoragePolicy";
     FileStatus auditStat;
     checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot set storage policy for " + src);
-      auditStat = FSDirAttrOp.setStoragePolicy(dir, blockManager, src,
+      auditStat = FSDirAttrOp.setStoragePolicy(dir, pc, blockManager, src,
                                                policyName);
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
@@ -2219,11 +2226,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final String operationName = "unsetStoragePolicy";
     FileStatus auditStat;
     checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot unset storage policy for " + src);
-      auditStat = FSDirAttrOp.unsetStoragePolicy(dir, blockManager, src);
+      auditStat = FSDirAttrOp.unsetStoragePolicy(dir, pc, blockManager, src);
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
       throw e;
@@ -2242,10 +2250,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    */
   BlockStoragePolicy getStoragePolicy(String src) throws IOException {
     checkOperation(OperationCategory.READ);
+    final FSPermissionChecker pc = getPermissionChecker();
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-      return FSDirAttrOp.getStoragePolicy(dir, blockManager, src);
+      return FSDirAttrOp.getStoragePolicy(dir, pc, blockManager, src);
     } finally {
       readUnlock("getStoragePolicy");
     }
@@ -2267,10 +2276,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
   long getPreferredBlockSize(String src) throws IOException {
     checkOperation(OperationCategory.READ);
+    final FSPermissionChecker pc = getPermissionChecker();
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-      return FSDirAttrOp.getPreferredBlockSize(dir, src);
+      return FSDirAttrOp.getPreferredBlockSize(dir, pc, src);
     } finally {
       readUnlock("getPreferredBlockSize");
     }
@@ -2374,13 +2384,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
           "ecPolicyName are exclusive parameters. Set both is not allowed!");
     }
 
-    FSPermissionChecker pc = getPermissionChecker();
     INodesInPath iip = null;
     boolean skipSync = true; // until we do something that might create edits
     HdfsFileStatus stat = null;
     BlocksMapUpdateInfo toRemoveBlocks = null;
 
     checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -2461,8 +2471,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   boolean recoverLease(String src, String holder, String clientMachine)
       throws IOException {
     boolean skipSync = false;
-    FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -2601,8 +2611,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     try {
       boolean skipSync = false;
       LastBlockWithStatus lbs = null;
-      final FSPermissionChecker pc = getPermissionChecker();
       checkOperation(OperationCategory.WRITE);
+      final FSPermissionChecker pc = getPermissionChecker();
       writeLock();
       try {
         checkOperation(OperationCategory.WRITE);
@@ -2657,8 +2667,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
     LocatedBlock[] onRetryBlock = new LocatedBlock[1];
     FSDirWriteFileOp.ValidateAddBlockResult r;
-    FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.READ);
+    final FSPermissionChecker pc = getPermissionChecker();
     readLock();
     try {
       checkOperation(OperationCategory.READ);
@@ -2708,7 +2718,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final List<DatanodeStorageInfo> chosen;
     final BlockType blockType;
     checkOperation(OperationCategory.READ);
-    FSPermissionChecker pc = getPermissionChecker();
+    final FSPermissionChecker pc = getPermissionChecker();
     readLock();
     try {
       checkOperation(OperationCategory.READ);
@@ -2756,7 +2766,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     NameNode.stateChangeLog.debug(
         "BLOCK* NameSystem.abandonBlock: {} of file {}", b, src);
     checkOperation(OperationCategory.WRITE);
-    FSPermissionChecker pc = getPermissionChecker();
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -2821,7 +2831,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     throws IOException {
     boolean success = false;
     checkOperation(OperationCategory.WRITE);
-    FSPermissionChecker pc = getPermissionChecker();
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -2899,11 +2909,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       throws IOException {
     final String operationName = "rename";
     FSDirRenameOp.RenameResult ret = null;
+    checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot rename " + src);
-      ret = FSDirRenameOp.renameToInt(dir, src, dst, logRetryCache);
+      ret = FSDirRenameOp.renameToInt(dir, pc, src, dst, logRetryCache);
     } catch (AccessControlException e)  {
       logAuditEvent(false, operationName, src, dst, null);
       throw e;
@@ -2923,11 +2935,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       throws IOException {
     final String operationName = "rename";
     FSDirRenameOp.RenameResult res = null;
+    checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot rename " + src);
-      res = FSDirRenameOp.renameToInt(dir, src, dst, logRetryCache, options);
+      res = FSDirRenameOp.renameToInt(dir, pc, src, dst, logRetryCache,
+          options);
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName + " (options=" +
           Arrays.toString(options) + ")", src, dst, null);
@@ -2958,13 +2973,15 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       throws IOException {
     final String operationName = "delete";
     BlocksMapUpdateInfo toRemovedBlocks = null;
+    checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     boolean ret = false;
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot delete " + src);
       toRemovedBlocks = FSDirDeleteOp.delete(
-          this, src, recursive, logRetryCache);
+          this, pc, src, recursive, logRetryCache);
       ret = toRemovedBlocks != null;
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
@@ -3063,11 +3080,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final String operationName = needBlockToken ? "open" : "getfileinfo";
     checkOperation(OperationCategory.READ);
     HdfsFileStatus stat = null;
+    final FSPermissionChecker pc = getPermissionChecker();
     readLock();
     try {
       checkOperation(OperationCategory.READ);
       stat = FSDirStatAndListingOp.getFileInfo(
-          dir, src, resolveLink, needLocation, needBlockToken);
+          dir, pc, src, resolveLink, needLocation, needBlockToken);
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
       throw e;
@@ -3084,10 +3102,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   boolean isFileClosed(final String src) throws IOException {
     final String operationName = "isFileClosed";
     checkOperation(OperationCategory.READ);
+    final FSPermissionChecker pc = getPermissionChecker();
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-      return FSDirStatAndListingOp.isFileClosed(dir, src);
+      return FSDirStatAndListingOp.isFileClosed(dir, pc, src);
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
       throw e;
@@ -3104,11 +3123,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final String operationName = "mkdirs";
     FileStatus auditStat = null;
     checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot create directory " + src);
-      auditStat = FSDirMkdirOp.mkdirs(this, src, permissions, createParent);
+      auditStat = FSDirMkdirOp.mkdirs(this, pc, src, permissions,
+          createParent);
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
       throw e;
@@ -3137,12 +3158,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   ContentSummary getContentSummary(final String src) throws IOException {
     checkOperation(OperationCategory.READ);
     final String operationName = "contentSummary";
-    readLock();
     boolean success = true;
     ContentSummary cs;
+    final FSPermissionChecker pc = getPermissionChecker();
+    readLock();
     try {
       checkOperation(OperationCategory.READ);
-      cs = FSDirStatAndListingOp.getContentSummary(dir, src);
+      cs = FSDirStatAndListingOp.getContentSummary(dir, pc, src);
     } catch (AccessControlException ace) {
       success = false;
       logAuditEvent(success, operationName, src);
@@ -3172,11 +3194,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     checkOperation(OperationCategory.READ);
     final String operationName = "quotaUsage";
     QuotaUsage quotaUsage;
+    final FSPermissionChecker pc = getPermissionChecker();
     readLock();
     boolean success = true;
     try {
       checkOperation(OperationCategory.READ);
-      quotaUsage = FSDirStatAndListingOp.getQuotaUsage(dir, src);
+      quotaUsage = FSDirStatAndListingOp.getQuotaUsage(dir, pc, src);
     } catch (AccessControlException ace) {
       success = false;
       logAuditEvent(success, operationName, src);
@@ -3202,12 +3225,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     }
     checkOperation(OperationCategory.WRITE);
     final String operationName = getQuotaCommand(nsQuota, ssQuota);
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     boolean success = false;
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot set quota on " + src);
-      FSDirAttrOp.setQuota(dir, src, nsQuota, ssQuota, type);
+      FSDirAttrOp.setQuota(dir, pc, src, nsQuota, ssQuota, type);
       success = true;
     } catch (AccessControlException ace) {
       logAuditEvent(success, operationName, src);
@@ -3234,8 +3258,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       throws IOException {
     NameNode.stateChangeLog.info("BLOCK* fsync: " + src + " for " + clientName);
     checkOperation(OperationCategory.WRITE);
-
-    FSPermissionChecker pc = getPermissionChecker();
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -3739,10 +3762,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     checkOperation(OperationCategory.READ);
     final String operationName = "listStatus";
     DirectoryListing dl = null;
+    final FSPermissionChecker pc = getPermissionChecker();
     readLock();
     try {
       checkOperation(NameNode.OperationCategory.READ);
-      dl = getListingInt(dir, src, startAfter, needLocation);
+      dl = getListingInt(dir, pc, src, startAfter, needLocation);
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
       throw e;
@@ -4678,6 +4702,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     }
   }
 
+  void checkSuperuserPrivilege(FSPermissionChecker pc)
+      throws AccessControlException {
+    if (isPermissionEnabled) {
+      pc.checkSuperuserPrivilege();
+    }
+  }
+
   /**
    * Check to see if we have exceeded the limit on the number
    * of inodes.
@@ -6365,14 +6396,16 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    */
   String createSnapshot(String snapshotRoot, String snapshotName,
                         boolean logRetryCache) throws IOException {
+    checkOperation(OperationCategory.WRITE);
     final String operationName = "createSnapshot";
     String snapshotPath = null;
     boolean success = false;
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot create snapshot for " + snapshotRoot);
-      snapshotPath = FSDirSnapshotOp.createSnapshot(dir,
+      snapshotPath = FSDirSnapshotOp.createSnapshot(dir, pc,
           snapshotManager, snapshotRoot, snapshotName, logRetryCache);
       success = true;
     } catch (AccessControlException ace) {
@@ -6399,15 +6432,17 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   void renameSnapshot(
       String path, String snapshotOldName, String snapshotNewName,
       boolean logRetryCache) throws IOException {
+    checkOperation(OperationCategory.WRITE);
     final String operationName = "renameSnapshot";
     boolean success = false;
     String oldSnapshotRoot = Snapshot.getSnapshotPath(path, snapshotOldName);
     String newSnapshotRoot = Snapshot.getSnapshotPath(path, snapshotNewName);
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot rename snapshot for " + path);
-      FSDirSnapshotOp.renameSnapshot(dir, snapshotManager, path,
+      FSDirSnapshotOp.renameSnapshot(dir, pc, snapshotManager, path,
           snapshotOldName, snapshotNewName, logRetryCache);
       success = true;
     } catch (AccessControlException ace) {
@@ -6435,10 +6470,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     SnapshottableDirectoryStatus[] status = null;
     checkOperation(OperationCategory.READ);
     boolean success = false;
+    final FSPermissionChecker pc = getPermissionChecker();
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-      status = FSDirSnapshotOp.getSnapshottableDirListing(dir, snapshotManager);
+      status = FSDirSnapshotOp.getSnapshottableDirListing(dir, pc,
+          snapshotManager);
       success = true;
     } catch (AccessControlException ace) {
       logAuditEvent(success, operationName, null, null, null);
@@ -6475,10 +6512,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         path : Snapshot.getSnapshotPath(path, fromSnapshot);
     String toSnapshotRoot = (toSnapshot == null || toSnapshot.isEmpty()) ?
         path : Snapshot.getSnapshotPath(path, toSnapshot);
+    final FSPermissionChecker pc = getPermissionChecker();
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-      diffs = FSDirSnapshotOp.getSnapshotDiffReport(dir, snapshotManager,
+      diffs = FSDirSnapshotOp.getSnapshotDiffReport(dir, pc, snapshotManager,
           path, fromSnapshot, toSnapshot);
       success = true;
     } catch (AccessControlException ace) {
@@ -6530,11 +6568,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     String toSnapshotRoot =
         (toSnapshot == null || toSnapshot.isEmpty()) ? path :
             Snapshot.getSnapshotPath(path, toSnapshot);
+    final FSPermissionChecker pc = getPermissionChecker();
     readLock();
     try {
       checkOperation(OperationCategory.READ);
       diffs = FSDirSnapshotOp
-          .getSnapshotDiffReportListing(dir, snapshotManager, path,
+          .getSnapshotDiffReportListing(dir, pc, snapshotManager, path,
               fromSnapshot, toSnapshot, startPath, index,
               snapshotDiffReportLimit);
       success = true;
@@ -6562,14 +6601,15 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final String operationName = "deleteSnapshot";
     boolean success = false;
     String rootPath = null;
-    writeLock();
     BlocksMapUpdateInfo blocksToBeDeleted = null;
+    final FSPermissionChecker pc = getPermissionChecker();
+    writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot delete snapshot for " + snapshotRoot);
       rootPath = Snapshot.getSnapshotPath(snapshotRoot, snapshotName);
-      blocksToBeDeleted = FSDirSnapshotOp.deleteSnapshot(dir, snapshotManager,
-          snapshotRoot, snapshotName, logRetryCache);
+      blocksToBeDeleted = FSDirSnapshotOp.deleteSnapshot(dir, pc,
+          snapshotManager, snapshotRoot, snapshotName, logRetryCache);
       success = true;
     } catch (AccessControlException ace) {
       logAuditEvent(success, operationName, rootPath, null, null);
@@ -7051,11 +7091,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final String operationName = "modifyAclEntries";
     FileStatus auditStat = null;
     checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot modify ACL entries on " + src);
-      auditStat = FSDirAclOp.modifyAclEntries(dir, src, aclSpec);
+      auditStat = FSDirAclOp.modifyAclEntries(dir, pc, src, aclSpec);
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
       throw e;
@@ -7071,11 +7112,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final String operationName = "removeAclEntries";
     checkOperation(OperationCategory.WRITE);
     FileStatus auditStat = null;
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot remove ACL entries on " + src);
-      auditStat = FSDirAclOp.removeAclEntries(dir, src, aclSpec);
+      auditStat = FSDirAclOp.removeAclEntries(dir, pc, src, aclSpec);
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
       throw e;
@@ -7090,11 +7132,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final String operationName = "removeDefaultAcl";
     FileStatus auditStat = null;
     checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot remove default ACL entries on " + src);
-      auditStat = FSDirAclOp.removeDefaultAcl(dir, src);
+      auditStat = FSDirAclOp.removeDefaultAcl(dir, pc, src);
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
       throw e;
@@ -7109,11 +7152,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final String operationName = "removeAcl";
     FileStatus auditStat = null;
     checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot remove ACL on " + src);
-      auditStat = FSDirAclOp.removeAcl(dir, src);
+      auditStat = FSDirAclOp.removeAcl(dir, pc, src);
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
       throw e;
@@ -7128,11 +7172,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final String operationName = "setAcl";
     FileStatus auditStat = null;
     checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot set ACL on " + src);
-      auditStat = FSDirAclOp.setAcl(dir, src, aclSpec);
+      auditStat = FSDirAclOp.setAcl(dir, pc, src, aclSpec);
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
       throw e;
@@ -7147,10 +7192,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final String operationName = "getAclStatus";
     checkOperation(OperationCategory.READ);
     final AclStatus ret;
+    final FSPermissionChecker pc = getPermissionChecker();
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-      ret = FSDirAclOp.getAclStatus(dir, src);
+      ret = FSDirAclOp.getAclStatus(dir, pc, src);
     } catch(AccessControlException ace) {
       logAuditEvent(false, operationName, src);
       throw ace;
@@ -7179,13 +7225,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     try {
       Metadata metadata = FSDirEncryptionZoneOp.ensureKeyIsInitialized(dir,
           keyName, src);
-      checkSuperuserPrivilege();
-      FSPermissionChecker pc = getPermissionChecker();
+      final FSPermissionChecker pc = getPermissionChecker();
+      checkSuperuserPrivilege(pc);
       checkOperation(OperationCategory.WRITE);
       final FileStatus resultingStat;
       writeLock();
       try {
-        checkSuperuserPrivilege();
+        checkSuperuserPrivilege(pc);
         checkOperation(OperationCategory.WRITE);
         checkNameNodeSafeMode("Cannot create encryption zone on " + src);
         resultingStat = FSDirEncryptionZoneOp.createEncryptionZone(dir, src,
@@ -7240,12 +7286,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       throws IOException {
     final String operationName = "listEncryptionZones";
     boolean success = false;
-    checkSuperuserPrivilege();
     checkOperation(OperationCategory.READ);
+    final FSPermissionChecker pc = getPermissionChecker();
+    checkSuperuserPrivilege(pc);
     readLock();
     try {
-      checkSuperuserPrivilege();
       checkOperation(OperationCategory.READ);
+      checkSuperuserPrivilege(pc);
       final BatchedListEntries<EncryptionZone> ret =
           FSDirEncryptionZoneOp.listEncryptionZones(dir, prevId);
       success = true;
@@ -7261,11 +7308,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     boolean success = false;
     try {
       Preconditions.checkNotNull(zone, "zone is null.");
-      checkSuperuserPrivilege();
       checkOperation(OperationCategory.WRITE);
+      final FSPermissionChecker pc = dir.getPermissionChecker();
+      checkSuperuserPrivilege(pc);
       checkNameNodeSafeMode("NameNode in safemode, cannot " + action
           + " re-encryption on zone " + zone);
-      reencryptEncryptionZoneInt(zone, action, logRetryCache);
+      reencryptEncryptionZoneInt(pc, zone, action, logRetryCache);
       success = true;
     } finally {
       logAuditEvent(success, action + "reencryption", zone, null, null);
@@ -7276,12 +7324,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       final long prevId) throws IOException {
     final String operationName = "listReencryptionStatus";
     boolean success = false;
-    checkSuperuserPrivilege();
     checkOperation(OperationCategory.READ);
+    final FSPermissionChecker pc = getPermissionChecker();
+    checkSuperuserPrivilege(pc);
     readLock();
     try {
-      checkSuperuserPrivilege();
       checkOperation(OperationCategory.READ);
+      checkSuperuserPrivilege(pc);
       final BatchedListEntries<ZoneReencryptionStatus> ret =
           FSDirEncryptionZoneOp.listReencryptionStatus(dir, prevId);
       success = true;
@@ -7292,9 +7341,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     }
   }
 
-  private void reencryptEncryptionZoneInt(final String zone,
-      final ReencryptAction action, final boolean logRetryCache)
-      throws IOException {
+  private void reencryptEncryptionZoneInt(final FSPermissionChecker pc,
+      final String zone, final ReencryptAction action,
+      final boolean logRetryCache) throws IOException {
     if (getProvider() == null) {
       throw new IOException("No key provider configured, re-encryption "
           + "operation is rejected");
@@ -7302,7 +7351,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     String keyVersionName = null;
     if (action == ReencryptAction.START) {
       // get zone's latest key version name out of the lock.
-      keyVersionName = FSDirEncryptionZoneOp.getCurrentKeyVersion(dir, zone);
+      keyVersionName =
+          FSDirEncryptionZoneOp.getCurrentKeyVersion(dir, pc, zone);
       if (keyVersionName == null) {
         throw new IOException("Failed to get key version name for " + zone);
       }
@@ -7311,11 +7361,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     }
     writeLock();
     try {
-      checkSuperuserPrivilege();
+      checkSuperuserPrivilege(pc);
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("NameNode in safemode, cannot " + action
           + " re-encryption on zone " + zone);
-      final FSPermissionChecker pc = dir.getPermissionChecker();
       List<XAttr> xattrs;
       dir.writeLock();
       try {
@@ -7550,7 +7599,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final String operationName = "getErasureCodingPolicy";
     boolean success = false;
     checkOperation(OperationCategory.READ);
-    FSPermissionChecker pc = getPermissionChecker();
+    final FSPermissionChecker pc = getPermissionChecker();
     readLock();
     try {
       checkOperation(OperationCategory.READ);
@@ -7609,11 +7658,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       throws IOException {
     final String operationName = "setXAttr";
     FileStatus auditStat = null;
+    checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot set XAttr on " + src);
-      auditStat = FSDirXAttrOp.setXAttr(dir, src, xAttr, flag, logRetryCache);
+      auditStat = FSDirXAttrOp.setXAttr(dir, pc, src, xAttr, flag,
+          logRetryCache);
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
       throw e;
@@ -7629,10 +7681,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final String operationName = "getXAttrs";
     checkOperation(OperationCategory.READ);
     List<XAttr> fsXattrs;
+    final FSPermissionChecker pc = getPermissionChecker();
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-      fsXattrs = FSDirXAttrOp.getXAttrs(dir, src, xAttrs);
+      fsXattrs = FSDirXAttrOp.getXAttrs(dir, pc, src, xAttrs);
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
       throw e;
@@ -7647,10 +7700,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final String operationName = "listXAttrs";
     checkOperation(OperationCategory.READ);
     List<XAttr> fsXattrs;
+    final FSPermissionChecker pc = getPermissionChecker();
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-      fsXattrs = FSDirXAttrOp.listXAttrs(dir, src);
+      fsXattrs = FSDirXAttrOp.listXAttrs(dir, pc, src);
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
       throw e;
@@ -7665,11 +7719,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       throws IOException {
     final String operationName = "removeXAttr";
     FileStatus auditStat = null;
+    checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot remove XAttr entry on " + src);
-      auditStat = FSDirXAttrOp.removeXAttr(dir, src, xAttr, logRetryCache);
+      auditStat = FSDirXAttrOp.removeXAttr(dir, pc, src, xAttr, logRetryCache);
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
       throw e;
@@ -7683,7 +7739,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   void checkAccess(String src, FsAction mode) throws IOException {
     final String operationName = "checkAccess";
     checkOperation(OperationCategory.READ);
-    FSPermissionChecker pc = getPermissionChecker();
+    final FSPermissionChecker pc = getPermissionChecker();
     readLock();
     try {
       checkOperation(OperationCategory.READ);
@@ -7934,6 +7990,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         .size();
   }
 
+  // This method logs operationName without super user privilege.
+  // It should be called without holding FSN lock.
   void checkSuperuserPrivilege(String operationName)
       throws IOException {
     try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84a1321f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
index 11d7959..a71538d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
@@ -75,10 +75,13 @@ public class NameNodeAdapter {
       boolean resolveLink, boolean needLocation, boolean needBlockToken)
       throws AccessControlException, UnresolvedLinkException, StandbyException,
       IOException {
+    final FSPermissionChecker pc =
+        namenode.getNamesystem().getPermissionChecker();
     namenode.getNamesystem().readLock();
     try {
       return FSDirStatAndListingOp.getFileInfo(namenode.getNamesystem()
-          .getFSDirectory(), src, resolveLink, needLocation, needBlockToken);
+          .getFSDirectory(), pc, src, resolveLink, needLocation,
+          needBlockToken);
     } finally {
       namenode.getNamesystem().readUnlock();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84a1321f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogger.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogger.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogger.java
index c422f32..5b4f1f4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogger.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogger.java
@@ -75,6 +75,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doThrow;
 
 /**
@@ -415,7 +416,7 @@ public class TestAuditLogger {
 
       final FSDirectory mockedDir = Mockito.spy(dir);
       AccessControlException ex = new AccessControlException();
-      doThrow(ex).when(mockedDir).getPermissionChecker();
+      doThrow(ex).when(mockedDir).checkTraverse(any(), any(), any());
       cluster.getNamesystem().setFSDirectory(mockedDir);
       assertTrue(DummyAuditLogger.initialized);
       DummyAuditLogger.resetLogCount();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84a1321f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLoggerWithCommands.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLoggerWithCommands.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLoggerWithCommands.java
index 4eda88f..41ee03f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLoggerWithCommands.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLoggerWithCommands.java
@@ -53,6 +53,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import org.mockito.Mockito;
+
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
@@ -618,7 +620,7 @@ public class TestAuditLoggerWithCommands {
     final FSDirectory dir = cluster.getNamesystem().getFSDirectory();
     final FSDirectory mockedDir = Mockito.spy(dir);
     AccessControlException ex = new AccessControlException();
-    doThrow(ex).when(mockedDir).getPermissionChecker();
+    doThrow(ex).when(mockedDir).checkTraverse(any(), any(), any());
     cluster.getNamesystem().setFSDirectory(mockedDir);
     String aceGetAclStatus =
         ".*allowed=false.*ugi=theDoctor.*cmd=getAclStatus.*";


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org