You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ka...@apache.org on 2014/12/09 04:31:26 UTC

[32/41] hadoop git commit: HDFS-7476. Consolidate ACL-related operations to a single class. Contributed by Haohui Mai.

HDFS-7476. Consolidate ACL-related operations to a single class. Contributed by Haohui Mai.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9297f980
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9297f980
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9297f980

Branch: refs/heads/YARN-2139
Commit: 9297f980c2de8886ff970946a2513e6890cd5552
Parents: e227fb8
Author: cnauroth <cn...@apache.org>
Authored: Sat Dec 6 14:20:00 2014 -0800
Committer: cnauroth <cn...@apache.org>
Committed: Sat Dec 6 14:20:00 2014 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../hadoop/hdfs/server/namenode/AclStorage.java |  33 ---
 .../hadoop/hdfs/server/namenode/FSDirAclOp.java | 244 +++++++++++++++++++
 .../hdfs/server/namenode/FSDirectory.java       | 158 ++----------
 .../hdfs/server/namenode/FSEditLogLoader.java   |   2 +-
 .../hdfs/server/namenode/FSNamesystem.java      | 119 ++-------
 .../hdfs/server/namenode/TestAuditLogger.java   |  79 ++----
 7 files changed, 318 insertions(+), 320 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/9297f980/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 87b02c4..769be43 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -438,6 +438,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-7459. Consolidate cache-related implementation in FSNamesystem into
     a single class. (wheat9)
 
+    HDFS-7476. Consolidate ACL-related operations to a single class.
+    (wheat9 via cnauroth)
+
   OPTIMIZATIONS
 
     HDFS-7454. Reduce memory footprint for AclEntries in NameNode.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9297f980/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclStorage.java
index ac30597..a866046 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclStorage.java
@@ -241,39 +241,6 @@ final class AclStorage {
   }
 
   /**
-   * Completely removes the ACL from an inode.
-   *
-   * @param inode INode to update
-   * @param snapshotId int latest snapshot ID of inode
-   * @throws QuotaExceededException if quota limit is exceeded
-   */
-  public static void removeINodeAcl(INode inode, int snapshotId)
-      throws QuotaExceededException {
-    AclFeature f = inode.getAclFeature();
-    if (f == null) {
-      return;
-    }
-
-    FsPermission perm = inode.getFsPermission();
-    List<AclEntry> featureEntries = getEntriesFromAclFeature(f);
-    if (featureEntries.get(0).getScope() == AclEntryScope.ACCESS) {
-      // Restore group permissions from the feature's entry to permission
-      // bits, overwriting the mask, which is not part of a minimal ACL.
-      AclEntry groupEntryKey = new AclEntry.Builder()
-          .setScope(AclEntryScope.ACCESS).setType(AclEntryType.GROUP).build();
-      int groupEntryIndex = Collections.binarySearch(featureEntries,
-          groupEntryKey, AclTransformation.ACL_ENTRY_COMPARATOR);
-      assert groupEntryIndex >= 0;
-      FsAction groupPerm = featureEntries.get(groupEntryIndex).getPermission();
-      FsPermission newPerm = new FsPermission(perm.getUserAction(), groupPerm,
-          perm.getOtherAction(), perm.getStickyBit());
-      inode.setPermission(newPerm, snapshotId);
-    }
-
-    inode.removeAclFeature(snapshotId);
-  }
-
-  /**
    * Updates an inode with a new ACL.  This method takes a full logical ACL and
    * stores the entries to the inode's {@link FsPermission} and
    * {@link AclFeature}.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9297f980/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAclOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAclOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAclOp.java
new file mode 100644
index 0000000..ac899aa
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAclOp.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclEntryScope;
+import org.apache.hadoop.fs.permission.AclEntryType;
+import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.AclException;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+class FSDirAclOp {
+  static HdfsFileStatus modifyAclEntries(
+      FSDirectory fsd, final String srcArg, List<AclEntry> aclSpec)
+      throws IOException {
+    String src = srcArg;
+    checkAclsConfigFlag(fsd);
+    FSPermissionChecker pc = fsd.getPermissionChecker();
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    src = fsd.resolvePath(pc, src, pathComponents);
+    fsd.writeLock();
+    try {
+      INodesInPath iip = fsd.getINodesInPath4Write(
+          FSDirectory.normalizePath(src), true);
+      fsd.checkOwner(pc, iip);
+      INode inode = FSDirectory.resolveLastINode(src, iip);
+      int snapshotId = iip.getLatestSnapshotId();
+      List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode);
+      List<AclEntry> newAcl = AclTransformation.mergeAclEntries(
+          existingAcl, aclSpec);
+      AclStorage.updateINodeAcl(inode, newAcl, snapshotId);
+      fsd.getEditLog().logSetAcl(src, newAcl);
+    } finally {
+      fsd.writeUnlock();
+    }
+    return fsd.getAuditFileInfo(src, false);
+  }
+
+  static HdfsFileStatus removeAclEntries(
+      FSDirectory fsd, final String srcArg, List<AclEntry> aclSpec)
+      throws IOException {
+    String src = srcArg;
+    checkAclsConfigFlag(fsd);
+    FSPermissionChecker pc = fsd.getPermissionChecker();
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    src = fsd.resolvePath(pc, src, pathComponents);
+    fsd.writeLock();
+    try {
+      INodesInPath iip = fsd.getINodesInPath4Write(
+          FSDirectory.normalizePath(src), true);
+      fsd.checkOwner(pc, iip);
+      INode inode = FSDirectory.resolveLastINode(src, iip);
+      int snapshotId = iip.getLatestSnapshotId();
+      List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode);
+      List<AclEntry> newAcl = AclTransformation.filterAclEntriesByAclSpec(
+        existingAcl, aclSpec);
+      AclStorage.updateINodeAcl(inode, newAcl, snapshotId);
+      fsd.getEditLog().logSetAcl(src, newAcl);
+    } finally {
+      fsd.writeUnlock();
+    }
+    return fsd.getAuditFileInfo(src, false);
+  }
+
+  static HdfsFileStatus removeDefaultAcl(FSDirectory fsd, final String srcArg)
+      throws IOException {
+    String src = srcArg;
+    checkAclsConfigFlag(fsd);
+    FSPermissionChecker pc = fsd.getPermissionChecker();
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    src = fsd.resolvePath(pc, src, pathComponents);
+    fsd.writeLock();
+    try {
+      INodesInPath iip = fsd.getINodesInPath4Write(
+          FSDirectory.normalizePath(src), true);
+      fsd.checkOwner(pc, iip);
+      INode inode = FSDirectory.resolveLastINode(src, iip);
+      int snapshotId = iip.getLatestSnapshotId();
+      List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode);
+      List<AclEntry> newAcl = AclTransformation.filterDefaultAclEntries(
+        existingAcl);
+      AclStorage.updateINodeAcl(inode, newAcl, snapshotId);
+      fsd.getEditLog().logSetAcl(src, newAcl);
+    } finally {
+      fsd.writeUnlock();
+    }
+    return fsd.getAuditFileInfo(src, false);
+  }
+
+  static HdfsFileStatus removeAcl(FSDirectory fsd, final String srcArg)
+      throws IOException {
+    String src = srcArg;
+    checkAclsConfigFlag(fsd);
+    FSPermissionChecker pc = fsd.getPermissionChecker();
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    src = fsd.resolvePath(pc, src, pathComponents);
+    fsd.writeLock();
+    try {
+      INodesInPath iip = fsd.getINodesInPath4Write(src);
+      fsd.checkOwner(pc, iip);
+      unprotectedRemoveAcl(fsd, src);
+    } finally {
+      fsd.writeUnlock();
+    }
+    fsd.getEditLog().logSetAcl(src, AclFeature.EMPTY_ENTRY_LIST);
+    return fsd.getAuditFileInfo(src, false);
+  }
+
+  static HdfsFileStatus setAcl(
+      FSDirectory fsd, final String srcArg, List<AclEntry> aclSpec)
+      throws IOException {
+    String src = srcArg;
+    checkAclsConfigFlag(fsd);
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    FSPermissionChecker pc = fsd.getPermissionChecker();
+    src = fsd.resolvePath(pc, src, pathComponents);
+    fsd.writeLock();
+    try {
+      INodesInPath iip = fsd.getINodesInPath4Write(src);
+      fsd.checkOwner(pc, iip);
+      List<AclEntry> newAcl = unprotectedSetAcl(fsd, src, aclSpec);
+      fsd.getEditLog().logSetAcl(src, newAcl);
+    } finally {
+      fsd.writeUnlock();
+    }
+    return fsd.getAuditFileInfo(src, false);
+  }
+
+  static AclStatus getAclStatus(
+      FSDirectory fsd, String src) throws IOException {
+    checkAclsConfigFlag(fsd);
+    FSPermissionChecker pc = fsd.getPermissionChecker();
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    src = fsd.resolvePath(pc, src, pathComponents);
+    String srcs = FSDirectory.normalizePath(src);
+    fsd.readLock();
+    try {
+      // There is no real inode for the path ending in ".snapshot", so return a
+      // non-null, unpopulated AclStatus.  This is similar to getFileInfo.
+      if (srcs.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR) &&
+          fsd.getINode4DotSnapshot(srcs) != null) {
+        return new AclStatus.Builder().owner("").group("").build();
+      }
+      INodesInPath iip = fsd.getINodesInPath(srcs, true);
+      if (fsd.isPermissionEnabled()) {
+        fsd.checkTraverse(pc, iip);
+      }
+      INode inode = FSDirectory.resolveLastINode(srcs, iip);
+      int snapshotId = iip.getPathSnapshotId();
+      List<AclEntry> acl = AclStorage.readINodeAcl(inode, snapshotId);
+      return new AclStatus.Builder()
+          .owner(inode.getUserName()).group(inode.getGroupName())
+          .stickyBit(inode.getFsPermission(snapshotId).getStickyBit())
+          .addEntries(acl).build();
+    } finally {
+      fsd.readUnlock();
+    }
+  }
+
+  static List<AclEntry> unprotectedSetAcl(
+      FSDirectory fsd, String src, List<AclEntry> aclSpec)
+      throws IOException {
+    // ACL removal is logged to edits as OP_SET_ACL with an empty list.
+    if (aclSpec.isEmpty()) {
+      unprotectedRemoveAcl(fsd, src);
+      return AclFeature.EMPTY_ENTRY_LIST;
+    }
+
+    assert fsd.hasWriteLock();
+    INodesInPath iip = fsd.getINodesInPath4Write(FSDirectory.normalizePath
+        (src), true);
+    INode inode = FSDirectory.resolveLastINode(src, iip);
+    int snapshotId = iip.getLatestSnapshotId();
+    List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode);
+    List<AclEntry> newAcl = AclTransformation.replaceAclEntries(existingAcl,
+      aclSpec);
+    AclStorage.updateINodeAcl(inode, newAcl, snapshotId);
+    return newAcl;
+  }
+
+  private static void checkAclsConfigFlag(FSDirectory fsd) throws AclException {
+    if (!fsd.isAclsEnabled()) {
+      throw new AclException(String.format(
+          "The ACL operation has been rejected.  "
+              + "Support for ACLs has been disabled by setting %s to false.",
+          DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY));
+    }
+  }
+
+  private static void unprotectedRemoveAcl(FSDirectory fsd, String src)
+      throws IOException {
+    assert fsd.hasWriteLock();
+    INodesInPath iip = fsd.getINodesInPath4Write(
+        FSDirectory.normalizePath(src), true);
+    INode inode = FSDirectory.resolveLastINode(src, iip);
+    int snapshotId = iip.getLatestSnapshotId();
+    AclFeature f = inode.getAclFeature();
+    if (f == null) {
+      return;
+    }
+
+    FsPermission perm = inode.getFsPermission();
+    List<AclEntry> featureEntries = AclStorage.getEntriesFromAclFeature(f);
+    if (featureEntries.get(0).getScope() == AclEntryScope.ACCESS) {
+      // Restore group permissions from the feature's entry to permission
+      // bits, overwriting the mask, which is not part of a minimal ACL.
+      AclEntry groupEntryKey = new AclEntry.Builder()
+          .setScope(AclEntryScope.ACCESS).setType(AclEntryType.GROUP).build();
+      int groupEntryIndex = Collections.binarySearch(
+          featureEntries, groupEntryKey,
+          AclTransformation.ACL_ENTRY_COMPARATOR);
+      assert groupEntryIndex >= 0;
+      FsAction groupPerm = featureEntries.get(groupEntryIndex).getPermission();
+      FsPermission newPerm = new FsPermission(perm.getUserAction(), groupPerm,
+          perm.getOtherAction(), perm.getStickyBit());
+      inode.setPermission(newPerm, snapshotId);
+    }
+
+    inode.removeAclFeature(snapshotId);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9297f980/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index 444589e..82741ce 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -48,7 +48,6 @@ import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.XAttr;
 import org.apache.hadoop.fs.XAttrSetFlag;
 import org.apache.hadoop.fs.permission.AclEntry;
-import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
@@ -143,6 +142,12 @@ public class FSDirectory implements Closeable {
   private final ReentrantReadWriteLock dirLock;
 
   private final boolean isPermissionEnabled;
+  /**
+   * Support for ACLs is controlled by a configuration flag. If the
+   * configuration flag is false, then the NameNode will reject all
+   * ACL-related operations.
+   */
+  private final boolean aclsEnabled;
   private final String fsOwnerShortUserName;
   private final String supergroup;
   private final INodeId inodeId;
@@ -204,7 +209,10 @@ public class FSDirectory implements Closeable {
     this.supergroup = conf.get(
       DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY,
       DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
-
+    this.aclsEnabled = conf.getBoolean(
+        DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY,
+        DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_DEFAULT);
+    LOG.info("ACLs enabled? " + aclsEnabled);
     int configuredLimit = conf.getInt(
         DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT);
     this.lsLimit = configuredLimit>0 ?
@@ -263,6 +271,9 @@ public class FSDirectory implements Closeable {
   boolean isPermissionEnabled() {
     return isPermissionEnabled;
   }
+  boolean isAclsEnabled() {
+    return aclsEnabled;
+  }
 
   int getLsLimit() {
     return lsLimit;
@@ -1549,140 +1560,6 @@ public class FSDirectory implements Closeable {
     return addINode(path, symlink) ? symlink : null;
   }
 
-  List<AclEntry> modifyAclEntries(String src, List<AclEntry> aclSpec) throws IOException {
-    writeLock();
-    try {
-      return unprotectedModifyAclEntries(src, aclSpec);
-    } finally {
-      writeUnlock();
-    }
-  }
-
-  private List<AclEntry> unprotectedModifyAclEntries(String src,
-      List<AclEntry> aclSpec) throws IOException {
-    assert hasWriteLock();
-    INodesInPath iip = getINodesInPath4Write(normalizePath(src), true);
-    INode inode = resolveLastINode(src, iip);
-    int snapshotId = iip.getLatestSnapshotId();
-    List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode);
-    List<AclEntry> newAcl = AclTransformation.mergeAclEntries(existingAcl,
-      aclSpec);
-    AclStorage.updateINodeAcl(inode, newAcl, snapshotId);
-    return newAcl;
-  }
-
-  List<AclEntry> removeAclEntries(String src, List<AclEntry> aclSpec) throws IOException {
-    writeLock();
-    try {
-      return unprotectedRemoveAclEntries(src, aclSpec);
-    } finally {
-      writeUnlock();
-    }
-  }
-
-  private List<AclEntry> unprotectedRemoveAclEntries(String src,
-      List<AclEntry> aclSpec) throws IOException {
-    assert hasWriteLock();
-    INodesInPath iip = getINodesInPath4Write(normalizePath(src), true);
-    INode inode = resolveLastINode(src, iip);
-    int snapshotId = iip.getLatestSnapshotId();
-    List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode);
-    List<AclEntry> newAcl = AclTransformation.filterAclEntriesByAclSpec(
-      existingAcl, aclSpec);
-    AclStorage.updateINodeAcl(inode, newAcl, snapshotId);
-    return newAcl;
-  }
-
-  List<AclEntry> removeDefaultAcl(String src) throws IOException {
-    writeLock();
-    try {
-      return unprotectedRemoveDefaultAcl(src);
-    } finally {
-      writeUnlock();
-    }
-  }
-
-  private List<AclEntry> unprotectedRemoveDefaultAcl(String src)
-      throws IOException {
-    assert hasWriteLock();
-    INodesInPath iip = getINodesInPath4Write(normalizePath(src), true);
-    INode inode = resolveLastINode(src, iip);
-    int snapshotId = iip.getLatestSnapshotId();
-    List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode);
-    List<AclEntry> newAcl = AclTransformation.filterDefaultAclEntries(
-      existingAcl);
-    AclStorage.updateINodeAcl(inode, newAcl, snapshotId);
-    return newAcl;
-  }
-
-  void removeAcl(String src) throws IOException {
-    writeLock();
-    try {
-      unprotectedRemoveAcl(src);
-    } finally {
-      writeUnlock();
-    }
-  }
-
-  private void unprotectedRemoveAcl(String src) throws IOException {
-    assert hasWriteLock();
-    INodesInPath iip = getINodesInPath4Write(normalizePath(src), true);
-    INode inode = resolveLastINode(src, iip);
-    int snapshotId = iip.getLatestSnapshotId();
-    AclStorage.removeINodeAcl(inode, snapshotId);
-  }
-
-  List<AclEntry> setAcl(String src, List<AclEntry> aclSpec) throws IOException {
-    writeLock();
-    try {
-      return unprotectedSetAcl(src, aclSpec);
-    } finally {
-      writeUnlock();
-    }
-  }
-
-  List<AclEntry> unprotectedSetAcl(String src, List<AclEntry> aclSpec)
-      throws IOException {
-    // ACL removal is logged to edits as OP_SET_ACL with an empty list.
-    if (aclSpec.isEmpty()) {
-      unprotectedRemoveAcl(src);
-      return AclFeature.EMPTY_ENTRY_LIST;
-    }
-
-    assert hasWriteLock();
-    INodesInPath iip = getINodesInPath4Write(normalizePath(src), true);
-    INode inode = resolveLastINode(src, iip);
-    int snapshotId = iip.getLatestSnapshotId();
-    List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode);
-    List<AclEntry> newAcl = AclTransformation.replaceAclEntries(existingAcl,
-      aclSpec);
-    AclStorage.updateINodeAcl(inode, newAcl, snapshotId);
-    return newAcl;
-  }
-
-  AclStatus getAclStatus(String src) throws IOException {
-    String srcs = normalizePath(src);
-    readLock();
-    try {
-      // There is no real inode for the path ending in ".snapshot", so return a
-      // non-null, unpopulated AclStatus.  This is similar to getFileInfo.
-      if (srcs.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR) &&
-          getINode4DotSnapshot(srcs) != null) {
-        return new AclStatus.Builder().owner("").group("").build();
-      }
-      INodesInPath iip = getLastINodeInPath(srcs, true);
-      INode inode = resolveLastINode(src, iip);
-      int snapshotId = iip.getPathSnapshotId();
-      List<AclEntry> acl = AclStorage.readINodeAcl(inode, snapshotId);
-      return new AclStatus.Builder()
-          .owner(inode.getUserName()).group(inode.getGroupName())
-          .stickyBit(inode.getFsPermission(snapshotId).getStickyBit())
-          .addEntries(acl).build();
-    } finally {
-      readUnlock();
-    }
-  }
-
   /**
    * Removes a list of XAttrs from an inode at a path.
    *
@@ -2065,9 +1942,10 @@ public class FSDirectory implements Closeable {
     return null;
   }
 
-  private static INode resolveLastINode(String src, INodesInPath iip)
+  static INode resolveLastINode(String src, INodesInPath iip)
       throws FileNotFoundException {
-    INode inode = iip.getLastINode();
+    INode[] inodes = iip.getINodes();
+    INode inode = inodes[inodes.length - 1];
     if (inode == null)
       throw new FileNotFoundException("cannot find " + src);
     return inode;
@@ -2246,8 +2124,8 @@ public class FSDirectory implements Closeable {
   }
 
   /** @return the {@link INodesInPath} containing only the last inode. */
-  private INodesInPath getLastINodeInPath(String path, boolean resolveLink
-  ) throws UnresolvedLinkException {
+  INodesInPath getLastINodeInPath(
+      String path, boolean resolveLink) throws UnresolvedLinkException {
     return INodesInPath.resolve(rootDir, INode.getPathComponents(path), 1,
             resolveLink);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9297f980/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index f60f142..d63545b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -823,7 +823,7 @@ public class FSEditLogLoader {
     }
     case OP_SET_ACL: {
       SetAclOp setAclOp = (SetAclOp) op;
-      fsDir.unprotectedSetAcl(setAclOp.src, setAclOp.aclEntries);
+      FSDirAclOp.unprotectedSetAcl(fsDir, setAclOp.src, setAclOp.aclEntries);
       break;
     }
     case OP_SET_XATTR: {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9297f980/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 982798f..e9ce78c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -532,7 +532,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
   private final RetryCache retryCache;
 
-  private final boolean aclsEnabled;
   private final boolean xattrsEnabled;
   private final int xattrMaxSize;
 
@@ -851,10 +850,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         auditLoggers.get(0) instanceof DefaultAuditLogger;
       this.retryCache = ignoreRetryCache ? null : initRetryCache(conf);
 
-      this.aclsEnabled = conf.getBoolean(
-          DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY,
-          DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_DEFAULT);
-      LOG.info("ACLs enabled? " + aclsEnabled);
       this.xattrsEnabled = conf.getBoolean(
           DFSConfigKeys.DFS_NAMENODE_XATTRS_ENABLED_KEY,
           DFSConfigKeys.DFS_NAMENODE_XATTRS_ENABLED_DEFAULT);
@@ -7731,158 +7726,105 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     return results;
   }
 
-  void modifyAclEntries(final String srcArg, List<AclEntry> aclSpec)
+  void modifyAclEntries(final String src, List<AclEntry> aclSpec)
       throws IOException {
-    String src = srcArg;
-    checkAclsConfigFlag();
-    HdfsFileStatus resultingStat = null;
-    FSPermissionChecker pc = getPermissionChecker();
+    HdfsFileStatus auditStat = null;
     checkOperation(OperationCategory.WRITE);
-    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot modify ACL entries on " + src);
-      src = dir.resolvePath(pc, src, pathComponents);
-      final INodesInPath iip = dir.getINodesInPath4Write(src);
-      dir.checkOwner(pc, iip);
-      List<AclEntry> newAcl = dir.modifyAclEntries(src, aclSpec);
-      getEditLog().logSetAcl(src, newAcl);
-      resultingStat = getAuditFileInfo(src, false);
+      auditStat = FSDirAclOp.modifyAclEntries(dir, src, aclSpec);
     } catch (AccessControlException e) {
-      logAuditEvent(false, "modifyAclEntries", srcArg);
+      logAuditEvent(false, "modifyAclEntries", src);
       throw e;
     } finally {
       writeUnlock();
     }
     getEditLog().logSync();
-    logAuditEvent(true, "modifyAclEntries", srcArg, null, resultingStat);
+    logAuditEvent(true, "modifyAclEntries", src, null, auditStat);
   }
 
-  void removeAclEntries(final String srcArg, List<AclEntry> aclSpec)
+  void removeAclEntries(final String src, List<AclEntry> aclSpec)
       throws IOException {
-    String src = srcArg;
-    checkAclsConfigFlag();
-    HdfsFileStatus resultingStat = null;
-    FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
-    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    HdfsFileStatus auditStat = null;
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot remove ACL entries on " + src);
-      src = dir.resolvePath(pc, src, pathComponents);
-      final INodesInPath iip = dir.getINodesInPath4Write(src);
-      dir.checkOwner(pc, iip);
-      List<AclEntry> newAcl = dir.removeAclEntries(src, aclSpec);
-      getEditLog().logSetAcl(src, newAcl);
-      resultingStat = getAuditFileInfo(src, false);
+      auditStat = FSDirAclOp.removeAclEntries(dir, src, aclSpec);
     } catch (AccessControlException e) {
-      logAuditEvent(false, "removeAclEntries", srcArg);
+      logAuditEvent(false, "removeAclEntries", src);
       throw e;
     } finally {
       writeUnlock();
     }
     getEditLog().logSync();
-    logAuditEvent(true, "removeAclEntries", srcArg, null, resultingStat);
+    logAuditEvent(true, "removeAclEntries", src, null, auditStat);
   }
 
-  void removeDefaultAcl(final String srcArg) throws IOException {
-    String src = srcArg;
-    checkAclsConfigFlag();
-    HdfsFileStatus resultingStat = null;
-    FSPermissionChecker pc = getPermissionChecker();
+  void removeDefaultAcl(final String src) throws IOException {
+    HdfsFileStatus auditStat = null;
     checkOperation(OperationCategory.WRITE);
-    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot remove default ACL entries on " + src);
-      src = dir.resolvePath(pc, src, pathComponents);
-      final INodesInPath iip = dir.getINodesInPath4Write(src);
-      dir.checkOwner(pc, iip);
-      List<AclEntry> newAcl = dir.removeDefaultAcl(src);
-      getEditLog().logSetAcl(src, newAcl);
-      resultingStat = getAuditFileInfo(src, false);
+      auditStat = FSDirAclOp.removeDefaultAcl(dir, src);
     } catch (AccessControlException e) {
-      logAuditEvent(false, "removeDefaultAcl", srcArg);
+      logAuditEvent(false, "removeDefaultAcl", src);
       throw e;
     } finally {
       writeUnlock();
     }
     getEditLog().logSync();
-    logAuditEvent(true, "removeDefaultAcl", srcArg, null, resultingStat);
+    logAuditEvent(true, "removeDefaultAcl", src, null, auditStat);
   }
 
-  void removeAcl(final String srcArg) throws IOException {
-    String src = srcArg;
-    checkAclsConfigFlag();
-    HdfsFileStatus resultingStat = null;
-    FSPermissionChecker pc = getPermissionChecker();
+  void removeAcl(final String src) throws IOException {
+    HdfsFileStatus auditStat = null;
     checkOperation(OperationCategory.WRITE);
-    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot remove ACL on " + src);
-      src = dir.resolvePath(pc, src, pathComponents);
-      final INodesInPath iip = dir.getINodesInPath4Write(src);
-      dir.checkOwner(pc, iip);
-      dir.removeAcl(src);
-      getEditLog().logSetAcl(src, AclFeature.EMPTY_ENTRY_LIST);
-      resultingStat = getAuditFileInfo(src, false);
+      auditStat = FSDirAclOp.removeAcl(dir, src);
     } catch (AccessControlException e) {
-      logAuditEvent(false, "removeAcl", srcArg);
+      logAuditEvent(false, "removeAcl", src);
       throw e;
     } finally {
       writeUnlock();
     }
     getEditLog().logSync();
-    logAuditEvent(true, "removeAcl", srcArg, null, resultingStat);
+    logAuditEvent(true, "removeAcl", src, null, auditStat);
   }
 
-  void setAcl(final String srcArg, List<AclEntry> aclSpec) throws IOException {
-    String src = srcArg;
-    checkAclsConfigFlag();
-    HdfsFileStatus resultingStat = null;
-    FSPermissionChecker pc = getPermissionChecker();
+  void setAcl(final String src, List<AclEntry> aclSpec) throws IOException {
+    HdfsFileStatus auditStat = null;
     checkOperation(OperationCategory.WRITE);
-    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot set ACL on " + src);
-      src = dir.resolvePath(pc, src, pathComponents);
-      final INodesInPath iip = dir.getINodesInPath4Write(src);
-      dir.checkOwner(pc, iip);
-      List<AclEntry> newAcl = dir.setAcl(src, aclSpec);
-      getEditLog().logSetAcl(src, newAcl);
-      resultingStat = getAuditFileInfo(src, false);
+      auditStat = FSDirAclOp.setAcl(dir, src, aclSpec);
     } catch (AccessControlException e) {
-      logAuditEvent(false, "setAcl", srcArg);
+      logAuditEvent(false, "setAcl", src);
       throw e;
     } finally {
       writeUnlock();
     }
     getEditLog().logSync();
-    logAuditEvent(true, "setAcl", srcArg, null, resultingStat);
+    logAuditEvent(true, "setAcl", src, null, auditStat);
   }
 
   AclStatus getAclStatus(String src) throws IOException {
-    checkAclsConfigFlag();
-    FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.READ);
-    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     boolean success = false;
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-      src = dir.resolvePath(pc, src, pathComponents);
-      INodesInPath iip = dir.getINodesInPath(src, true);
-      if (isPermissionEnabled) {
-        dir.checkPermission(pc, iip, false, null, null, null, null);
-      }
-      final AclStatus ret = dir.getAclStatus(src);
+      final AclStatus ret = FSDirAclOp.getAclStatus(dir, src);
       success = true;
       return ret;
     } finally {
@@ -8369,15 +8311,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     }
   }
 
-  private void checkAclsConfigFlag() throws AclException {
-    if (!aclsEnabled) {
-      throw new AclException(String.format(
-          "The ACL operation has been rejected.  "
-              + "Support for ACLs has been disabled by setting %s to false.",
-          DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY));
-    }
-  }
-
   private void checkXAttrsConfigFlag() throws IOException {
     if (!xattrsEnabled) {
       throw new IOException(String.format(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9297f980/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogger.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogger.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogger.java
index c91cd75..0c119bf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogger.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogger.java
@@ -18,21 +18,6 @@
 
 package org.apache.hadoop.hdfs.server.namenode;
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOGGERS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.NNTOP_ENABLED_KEY;
-import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
-import static org.junit.Assert.*;
-import static org.mockito.Matchers.anyListOf;
-import static org.mockito.Matchers.anyString;
-
-import java.io.IOException;
-import java.net.HttpURLConnection;
-import java.net.InetAddress;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.List;
-
 import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -53,6 +38,22 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOGGERS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.NNTOP_ENABLED_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doThrow;
+
 /**
  * Tests for the {@link AuditLogger} custom audit logging interface.
  */
@@ -208,27 +209,10 @@ public class TestAuditLogger {
     try {
       cluster.waitClusterUp();
       final FSDirectory dir = cluster.getNamesystem().getFSDirectory();
-      // Set up mock FSDirectory to test FSN audit logging during failure
+
       final FSDirectory mockedDir = Mockito.spy(dir);
-      Mockito.doThrow(new AccessControlException("mock setAcl exception")).
-          when(mockedDir).
-          setAcl(anyString(), anyListOf(AclEntry.class));
-      Mockito.doThrow(new AccessControlException("mock getAclStatus exception")).
-          when(mockedDir).
-          getAclStatus(anyString());
-      Mockito.doThrow(new AccessControlException("mock removeAcl exception")).
-          when(mockedDir).
-          removeAcl(anyString());
-      Mockito.doThrow(new AccessControlException("mock removeDefaultAcl exception")).
-          when(mockedDir).
-          removeDefaultAcl(anyString());
-      Mockito.doThrow(new AccessControlException("mock removeAclEntries exception")).
-          when(mockedDir).
-          removeAclEntries(anyString(), anyListOf(AclEntry.class));
-      Mockito.doThrow(new AccessControlException("mock modifyAclEntries exception")).
-          when(mockedDir).
-          modifyAclEntries(anyString(), anyListOf(AclEntry.class));
-      // Replace the FSD with the mock FSD.
+      AccessControlException ex = new AccessControlException();
+      doThrow(ex).when(mockedDir).getPermissionChecker();
       cluster.getNamesystem().setFSDirectory(mockedDir);
       assertTrue(DummyAuditLogger.initialized);
       DummyAuditLogger.resetLogCount();
@@ -239,39 +223,28 @@ public class TestAuditLogger {
 
       try {
         fs.getAclStatus(p);
-      } catch (AccessControlException e) {
-        assertExceptionContains("mock getAclStatus exception", e);
-      }
+      } catch (AccessControlException ignored) {}
 
       try {
         fs.setAcl(p, acls);
-      } catch (AccessControlException e) {
-        assertExceptionContains("mock setAcl exception", e);
-      }
+      } catch (AccessControlException ignored) {}
 
       try {
         fs.removeAcl(p);
-      } catch (AccessControlException e) {
-        assertExceptionContains("mock removeAcl exception", e);
-      }
+      } catch (AccessControlException ignored) {}
 
       try {
         fs.removeDefaultAcl(p);
-      } catch (AccessControlException e) {
-        assertExceptionContains("mock removeDefaultAcl exception", e);
-      }
+      } catch (AccessControlException ignored) {}
 
       try {
         fs.removeAclEntries(p, acls);
-      } catch (AccessControlException e) {
-        assertExceptionContains("mock removeAclEntries exception", e);
-      }
+      } catch (AccessControlException ignored) {}
 
       try {
         fs.modifyAclEntries(p, acls);
-      } catch (AccessControlException e) {
-        assertExceptionContains("mock modifyAclEntries exception", e);
-      }
+      } catch (AccessControlException ignored) {}
+
       assertEquals(6, DummyAuditLogger.logCount);
       assertEquals(6, DummyAuditLogger.unsuccessfulCount);
     } finally {