Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/03/25 00:09:48 UTC

hadoop git commit: HDFS-6826. Plugin interface to enable delegation of HDFS authorization assertions. Contributed by Arun Suresh.

Repository: hadoop
Updated Branches:
  refs/heads/trunk e38ef70fb -> 53a28afe2


HDFS-6826. Plugin interface to enable delegation of HDFS authorization assertions. Contributed by Arun Suresh.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/53a28afe
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/53a28afe
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/53a28afe

Branch: refs/heads/trunk
Commit: 53a28afe293e5bf185c8d4f2c7aea212e66015c2
Parents: e38ef70
Author: Jitendra Pandey <Ji...@Jitendra-Pandeys-MacBook-Pro-4.local>
Authored: Tue Mar 24 15:43:03 2015 -0700
Committer: Jitendra Pandey <Ji...@Jitendra-Pandeys-MacBook-Pro-4.local>
Committed: Tue Mar 24 16:02:40 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   1 +
 .../DefaultINodeAttributesProvider.java         |  45 ++++
 .../server/namenode/FSDirStatAndListingOp.java  |  51 +++--
 .../hdfs/server/namenode/FSDirectory.java       |  41 +++-
 .../hdfs/server/namenode/FSEditLogLoader.java   |   6 +-
 .../hdfs/server/namenode/FSNamesystem.java      |  19 ++
 .../server/namenode/FSPermissionChecker.java    | 222 +++++++++++-------
 .../server/namenode/INodeAttributeProvider.java | 135 +++++++++++
 .../hdfs/server/namenode/INodeAttributes.java   |   3 +
 .../namenode/INodeDirectoryAttributes.java      |   4 +
 .../server/namenode/INodeFileAttributes.java    |   5 +
 .../hdfs/server/namenode/INodesInPath.java      |   6 +
 .../namenode/TestFSPermissionChecker.java       |   4 +-
 .../namenode/TestINodeAttributeProvider.java    | 229 +++++++++++++++++++
 15 files changed, 659 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
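
In short: this patch adds a pluggable INodeAttributeProvider to the NameNode. A configured provider can substitute the user, group, permission and ACL attributes of any inode, and can optionally replace the permission-enforcement logic itself through an AccessControlEnforcer. A deployment points dfs.namenode.inode.attributes.provider.class (the key added to DFSConfigKeys below) at the provider class in hdfs-site.xml; programmatically, as the test at the end of this diff does (the provider class name here is a hypothetical placeholder):

    // Sketch only: the config key is from this patch, the class name is made up.
    Configuration conf = new HdfsConfiguration();
    conf.set(DFSConfigKeys.DFS_NAMENODE_INODE_ATTRIBUTES_PROVIDER_KEY,
        "com.example.MyAttributeProvider"); // must extend INodeAttributeProvider
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();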


http://git-wip-us.apache.org/repos/asf/hadoop/blob/53a28afe/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 5ade5fb..4bed2ab 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -390,6 +390,9 @@ Release 2.7.0 - UNRELEASED
 
     HDFS-7838. Expose truncate API for libhdfs. (yliu)
 
+    HDFS-6826. Plugin interface to enable delegation of HDFS authorization 
+    assertions. (Arun Suresh via jitendra)
+
   IMPROVEMENTS
 
     HDFS-7752. Improve description for

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53a28afe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 9ecf242..b5bbe5f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -477,6 +477,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_DATANODE_IPC_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_DATANODE_IPC_DEFAULT_PORT;
   public static final String  DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_KEY = "dfs.datanode.min.supported.namenode.version";
   public static final String  DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_DEFAULT = "3.0.0-SNAPSHOT";
+  public static final String  DFS_NAMENODE_INODE_ATTRIBUTES_PROVIDER_KEY = "dfs.namenode.inode.attributes.provider.class";
 
   public static final String  DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY = "dfs.block.access.token.enable";
   public static final boolean DFS_BLOCK_ACCESS_TOKEN_ENABLE_DEFAULT = false;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53a28afe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DefaultINodeAttributesProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DefaultINodeAttributesProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DefaultINodeAttributesProvider.java
new file mode 100644
index 0000000..45aa1b5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DefaultINodeAttributesProvider.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+/**
+ * A default implementation of {@link INodeAttributeProvider} that returns
+ * the inode's own attributes unchanged.
+ */
+public class DefaultINodeAttributesProvider extends INodeAttributeProvider {
+
+  public static INodeAttributeProvider DEFAULT_PROVIDER =
+      new DefaultINodeAttributesProvider();
+
+  @Override
+  public void start() {
+    // NO-OP
+  }
+
+  @Override
+  public void stop() {
+    // NO-OP
+  }
+
+  @Override
+  public INodeAttributes getAttributes(String[] pathElements,
+      INodeAttributes inode) {
+    return inode;
+  }
+
+}
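
This default provider is a pure pass-through: it returns the inode's own attributes unchanged, so behavior is identical to stock HDFS when no provider is configured. A custom provider only has to implement start(), stop() and the array-based getAttributes(). A minimal hypothetical skeleton (see MyAuthorizationProvider in TestINodeAttributeProvider at the end of this diff for a full attribute-wrapping example):

    package org.apache.hadoop.hdfs.server.namenode;

    // Hypothetical skeleton of a custom provider.
    public class ExampleAttributeProvider extends INodeAttributeProvider {
      @Override
      public void start() { /* e.g. connect to an external policy store */ }

      @Override
      public void stop() { /* release whatever start() acquired */ }

      @Override
      public INodeAttributes getAttributes(String[] pathElements,
          INodeAttributes inode) {
        // Returning the inode unchanged reproduces the default behavior;
        // a real provider returns a wrapping INodeAttributes for the paths
        // it governs, as the test at the end of this diff demonstrates.
        return inode;
      }
    }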

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53a28afe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
index cb3da19..43c2de3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
@@ -181,7 +181,7 @@ class FSDirStatAndListingOp {
 
       if (!targetNode.isDirectory()) {
         return new DirectoryListing(
-            new HdfsFileStatus[]{createFileStatus(fsd,
+            new HdfsFileStatus[]{createFileStatus(fsd, src,
                 HdfsFileStatus.EMPTY_NAME, targetNode, needLocation,
                 parentStoragePolicy, snapshot, isRawPath, iip)}, 0);
       }
@@ -200,7 +200,7 @@ class FSDirStatAndListingOp {
         byte curPolicy = isSuperUser && !cur.isSymlink()?
             cur.getLocalStoragePolicyID():
             BlockStoragePolicySuite.ID_UNSPECIFIED;
-        listing[i] = createFileStatus(fsd, cur.getLocalNameBytes(), cur,
+        listing[i] = createFileStatus(fsd, src, cur.getLocalNameBytes(), cur,
             needLocation, getStoragePolicyID(curPolicy,
                 parentStoragePolicy), snapshot, isRawPath, iip);
         listingCnt++;
@@ -253,7 +253,7 @@ class FSDirStatAndListingOp {
     final HdfsFileStatus listing[] = new HdfsFileStatus[numOfListing];
     for (int i = 0; i < numOfListing; i++) {
       Snapshot.Root sRoot = snapshots.get(i + skipSize).getRoot();
-      listing[i] = createFileStatus(fsd, sRoot.getLocalNameBytes(), sRoot,
+      listing[i] = createFileStatus(fsd, src, sRoot.getLocalNameBytes(), sRoot,
           BlockStoragePolicySuite.ID_UNSPECIFIED, Snapshot.CURRENT_STATE_ID,
           false, INodesInPath.fromINode(sRoot));
     }
@@ -270,7 +270,7 @@ class FSDirStatAndListingOp {
    *         or null if file not found
    */
   static HdfsFileStatus getFileInfo(
-      FSDirectory fsd, INodesInPath src, boolean isRawPath,
+      FSDirectory fsd, String path, INodesInPath src, boolean isRawPath,
       boolean includeStoragePolicy)
       throws IOException {
     fsd.readLock();
@@ -279,7 +279,7 @@ class FSDirStatAndListingOp {
       byte policyId = includeStoragePolicy && i != null && !i.isSymlink() ?
           i.getStoragePolicyID() : BlockStoragePolicySuite.ID_UNSPECIFIED;
       return i == null ? null : createFileStatus(
-          fsd, HdfsFileStatus.EMPTY_NAME, i, policyId,
+          fsd, path, HdfsFileStatus.EMPTY_NAME, i, policyId,
           src.getPathSnapshotId(), isRawPath, src);
     } finally {
       fsd.readUnlock();
@@ -303,7 +303,7 @@ class FSDirStatAndListingOp {
     fsd.readLock();
     try {
       final INodesInPath iip = fsd.getINodesInPath(srcs, resolveLink);
-      return getFileInfo(fsd, iip, isRawPath, includeStoragePolicy);
+      return getFileInfo(fsd, src, iip, isRawPath, includeStoragePolicy);
     } finally {
       fsd.readUnlock();
     }
@@ -340,14 +340,15 @@ class FSDirStatAndListingOp {
    * @throws java.io.IOException if any error occurs
    */
   static HdfsFileStatus createFileStatus(
-      FSDirectory fsd, byte[] path, INode node, boolean needLocation,
-      byte storagePolicy, int snapshot, boolean isRawPath, INodesInPath iip)
+      FSDirectory fsd, String fullPath, byte[] path, INode node,
+      boolean needLocation, byte storagePolicy, int snapshot, boolean isRawPath,
+      INodesInPath iip)
       throws IOException {
     if (needLocation) {
-      return createLocatedFileStatus(fsd, path, node, storagePolicy,
+      return createLocatedFileStatus(fsd, fullPath, path, node, storagePolicy,
           snapshot, isRawPath, iip);
     } else {
-      return createFileStatus(fsd, path, node, storagePolicy, snapshot,
+      return createFileStatus(fsd, fullPath, path, node, storagePolicy, snapshot,
           isRawPath, iip);
     }
   }
@@ -356,8 +357,9 @@ class FSDirStatAndListingOp {
    * Create FileStatus by file INode
    */
   static HdfsFileStatus createFileStatus(
-      FSDirectory fsd, byte[] path, INode node, byte storagePolicy,
-      int snapshot, boolean isRawPath, INodesInPath iip) throws IOException {
+      FSDirectory fsd, String fullPath, byte[] path, INode node,
+      byte storagePolicy, int snapshot, boolean isRawPath,
+      INodesInPath iip) throws IOException {
      long size = 0;     // length is zero for directories
      short replication = 0;
      long blocksize = 0;
@@ -380,6 +382,8 @@ class FSDirStatAndListingOp {
      int childrenNum = node.isDirectory() ?
          node.asDirectory().getChildrenNum(snapshot) : 0;
 
+     INodeAttributes nodeAttrs =
+         fsd.getAttributes(fullPath, path, node, snapshot);
      return new HdfsFileStatus(
         size,
         node.isDirectory(),
@@ -387,9 +391,9 @@ class FSDirStatAndListingOp {
         blocksize,
         node.getModificationTime(snapshot),
         node.getAccessTime(snapshot),
-        getPermissionForFileStatus(node, snapshot, isEncrypted),
-        node.getUserName(snapshot),
-        node.getGroupName(snapshot),
+        getPermissionForFileStatus(nodeAttrs, isEncrypted),
+        nodeAttrs.getUserName(),
+        nodeAttrs.getGroupName(),
         node.isSymlink() ? node.asSymlink().getSymlink() : null,
         path,
         node.getId(),
@@ -402,8 +406,9 @@ class FSDirStatAndListingOp {
    * Create FileStatus with location info by file INode
    */
   private static HdfsLocatedFileStatus createLocatedFileStatus(
-      FSDirectory fsd, byte[] path, INode node, byte storagePolicy,
-      int snapshot, boolean isRawPath, INodesInPath iip) throws IOException {
+      FSDirectory fsd, String fullPath, byte[] path, INode node,
+      byte storagePolicy, int snapshot, boolean isRawPath,
+      INodesInPath iip) throws IOException {
     assert fsd.hasReadLock();
     long size = 0; // length is zero for directories
     short replication = 0;
@@ -437,12 +442,14 @@ class FSDirStatAndListingOp {
     int childrenNum = node.isDirectory() ?
         node.asDirectory().getChildrenNum(snapshot) : 0;
 
+    INodeAttributes nodeAttrs =
+        fsd.getAttributes(fullPath, path, node, snapshot);
     HdfsLocatedFileStatus status =
         new HdfsLocatedFileStatus(size, node.isDirectory(), replication,
           blocksize, node.getModificationTime(snapshot),
           node.getAccessTime(snapshot),
-          getPermissionForFileStatus(node, snapshot, isEncrypted),
-          node.getUserName(snapshot), node.getGroupName(snapshot),
+          getPermissionForFileStatus(nodeAttrs, isEncrypted),
+          nodeAttrs.getUserName(), nodeAttrs.getGroupName(),
           node.isSymlink() ? node.asSymlink().getSymlink() : null, path,
           node.getId(), loc, childrenNum, feInfo, storagePolicy);
     // Set caching information for the located blocks.
@@ -467,9 +474,9 @@ class FSDirStatAndListingOp {
    * and encrypted bit on if it represents an encrypted file/dir.
    */
   private static FsPermission getPermissionForFileStatus(
-      INode node, int snapshot, boolean isEncrypted) {
-    FsPermission perm = node.getFsPermission(snapshot);
-    boolean hasAcl = node.getAclFeature(snapshot) != null;
+      INodeAttributes node, boolean isEncrypted) {
+    FsPermission perm = node.getFsPermission();
+    boolean hasAcl = node.getAclFeature() != null;
     if (hasAcl || isEncrypted) {
       perm = new FsPermissionExtension(perm, hasAcl, isEncrypted);
     }
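
The net effect of threading the full path through createFileStatus above: the owner, group and permission placed into HdfsFileStatus now come from fsd.getAttributes(...), i.e. from the possibly provider-substituted INodeAttributes, rather than from the raw inode. With a provider that maps /user/authz to foo:bar and 0770, as the test at the end of this diff does, a client sees:

    // Illustrative client view (matches testCustomProvider below).
    FileStatus status = fs.getFileStatus(new Path("/user/authz"));
    status.getOwner();      // "foo"  (substituted by the provider)
    status.getGroup();      // "bar"
    status.getPermission(); // 0770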

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53a28afe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index 2f73627..7eea343 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -167,6 +167,12 @@ public class FSDirectory implements Closeable {
 
   private final FSEditLog editLog;
 
+  private INodeAttributeProvider attributeProvider;
+
+  public void setINodeAttributeProvider(INodeAttributeProvider provider) {
+    attributeProvider = provider;
+  }
+
   // utility methods to acquire and release read lock and write lock
   void readLock() {
     this.dirLock.readLock().lock();
@@ -1623,13 +1629,23 @@ public class FSDirectory implements Closeable {
   FSPermissionChecker getPermissionChecker()
     throws AccessControlException {
     try {
-      return new FSPermissionChecker(fsOwnerShortUserName, supergroup,
+      return getPermissionChecker(fsOwnerShortUserName, supergroup,
           NameNode.getRemoteUser());
-    } catch (IOException ioe) {
-      throw new AccessControlException(ioe);
+    } catch (IOException e) {
+      throw new AccessControlException(e);
     }
   }
 
+  @VisibleForTesting
+  FSPermissionChecker getPermissionChecker(String fsOwner, String superGroup,
+      UserGroupInformation ugi) throws AccessControlException {
+    return new FSPermissionChecker(
+        fsOwner, superGroup, ugi,
+        attributeProvider == null ?
+            DefaultINodeAttributesProvider.DEFAULT_PROVIDER
+            : attributeProvider);
+  }
+
   void checkOwner(FSPermissionChecker pc, INodesInPath iip)
       throws AccessControlException {
     checkPermission(pc, iip, true, null, null, null, null);
@@ -1690,7 +1706,8 @@ public class FSDirectory implements Closeable {
   HdfsFileStatus getAuditFileInfo(INodesInPath iip)
       throws IOException {
     return (namesystem.isAuditEnabled() && namesystem.isExternalInvocation())
-        ? FSDirStatAndListingOp.getFileInfo(this, iip, false, false) : null;
+        ? FSDirStatAndListingOp.getFileInfo(this, iip.getPath(), iip, false,
+            false) : null;
   }
 
   /**
@@ -1736,4 +1753,20 @@ public class FSDirectory implements Closeable {
   void resetLastInodeIdWithoutChecking(long newValue) {
     inodeId.setCurrentValue(newValue);
   }
+
+  INodeAttributes getAttributes(String fullPath, byte[] path,
+      INode node, int snapshot) {
+    INodeAttributes nodeAttrs = node;
+    if (attributeProvider != null) {
+      nodeAttrs = node.getSnapshotINode(snapshot);
+      fullPath = fullPath + (fullPath.endsWith(Path.SEPARATOR) ? ""
+                                                               : Path.SEPARATOR)
+          + DFSUtil.bytes2String(path);
+      nodeAttrs = attributeProvider.getAttributes(fullPath, nodeAttrs);
+    } else {
+      nodeAttrs = node.getSnapshotINode(snapshot);
+    }
+    return nodeAttrs;
+  }
+
 }
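
Two details worth noting here. First, every FSPermissionChecker is now constructed with a provider, falling back to DefaultINodeAttributesProvider.DEFAULT_PROVIDER when none is configured. Second, getAttributes joins the parent path and the inode's local name with exactly one separator before consulting the provider; an illustrative fragment mirroring the logic above:

    // "/user" + "authz" -> "/user/authz"
    // "/"     + "authz" -> "/authz"   (no doubled separator after the root)
    String full = fullPath.endsWith(Path.SEPARATOR)
        ? fullPath + DFSUtil.bytes2String(path)
        : fullPath + Path.SEPARATOR + DFSUtil.bytes2String(path);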

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53a28afe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index ad661ca..f50dc4d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -378,7 +378,7 @@ public class FSEditLogLoader {
         // add the op into retry cache if necessary
         if (toAddRetryCache) {
           HdfsFileStatus stat = FSDirStatAndListingOp.createFileStatus(
-              fsNamesys.dir, HdfsFileStatus.EMPTY_NAME, newFile,
+              fsNamesys.dir, path, HdfsFileStatus.EMPTY_NAME, newFile,
               BlockStoragePolicySuite.ID_UNSPECIFIED, Snapshot.CURRENT_STATE_ID,
               false, iip);
           fsNamesys.addCacheEntryWithPayload(addCloseOp.rpcClientId,
@@ -397,7 +397,7 @@ public class FSEditLogLoader {
           // add the op into retry cache if necessary
           if (toAddRetryCache) {
             HdfsFileStatus stat = FSDirStatAndListingOp.createFileStatus(
-                fsNamesys.dir,
+                fsNamesys.dir, path,
                 HdfsFileStatus.EMPTY_NAME, newFile,
                 BlockStoragePolicySuite.ID_UNSPECIFIED,
                 Snapshot.CURRENT_STATE_ID, false, iip);
@@ -471,7 +471,7 @@ public class FSEditLogLoader {
         // add the op into retry cache if necessary
         if (toAddRetryCache) {
           HdfsFileStatus stat = FSDirStatAndListingOp.createFileStatus(
-              fsNamesys.dir, HdfsFileStatus.EMPTY_NAME, file,
+              fsNamesys.dir, path, HdfsFileStatus.EMPTY_NAME, file,
               BlockStoragePolicySuite.ID_UNSPECIFIED,
               Snapshot.CURRENT_STATE_ID, false, iip);
           fsNamesys.addCacheEntryWithPayload(appendOp.rpcClientId,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53a28afe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 34b5e95..9235425 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -62,6 +62,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CAC
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_INODE_ATTRIBUTES_PROVIDER_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
@@ -277,6 +278,7 @@ import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.util.ChunkedArrayList;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
 import org.apache.log4j.Appender;
@@ -536,6 +538,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   private final TopConf topConf;
   private TopMetrics topMetrics;
 
+  private INodeAttributeProvider inodeAttributeProvider;
+
   /**
    * Notify that loading of this FSDirectory is complete, and
    * it is imageLoaded for use
@@ -841,6 +845,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       this.isDefaultAuditLogger = auditLoggers.size() == 1 &&
         auditLoggers.get(0) instanceof DefaultAuditLogger;
       this.retryCache = ignoreRetryCache ? null : initRetryCache(conf);
+      Class<? extends INodeAttributeProvider> klass = conf.getClass(
+          DFS_NAMENODE_INODE_ATTRIBUTES_PROVIDER_KEY,
+          null, INodeAttributeProvider.class);
+      if (klass != null) {
+        inodeAttributeProvider = ReflectionUtils.newInstance(klass, conf);
+        LOG.info("Using INode attribute provider: " + klass.getName());
+      }
     } catch(IOException e) {
       LOG.error(getClass().getSimpleName() + " initialization failed.", e);
       close();
@@ -1067,6 +1078,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     
     registerMXBean();
     DefaultMetricsSystem.instance().register(this);
+    if (inodeAttributeProvider != null) {
+      inodeAttributeProvider.start();
+      dir.setINodeAttributeProvider(inodeAttributeProvider);
+    }
     snapshotManager.registerMXBean();
   }
   
@@ -1075,6 +1090,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    */
   void stopCommonServices() {
     writeLock();
+    if (inodeAttributeProvider != null) {
+      dir.setINodeAttributeProvider(null);
+      inodeAttributeProvider.stop();
+    }
     try {
       if (blockManager != null) blockManager.close();
     } finally {
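
Provider lifecycle, as wired above: FSNamesystem reads the provider class from the configuration at construction time and instantiates it with ReflectionUtils.newInstance; startCommonServices() calls start() and hands the provider to FSDirectory; stopCommonServices() detaches it and calls stop(). A provider holding external resources would use those hooks; a hypothetical sketch (the executor and policy-refresh names are illustrative, not part of this patch):

    package org.apache.hadoop.hdfs.server.namenode;

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    // Hypothetical provider that refreshes policies from an external store.
    public class PolicyBackedProvider extends INodeAttributeProvider {
      private ScheduledExecutorService refresher;

      @Override
      public void start() {  // invoked from startCommonServices()
        refresher = Executors.newSingleThreadScheduledExecutor();
        refresher.scheduleAtFixedRate(new Runnable() {
          @Override
          public void run() { reloadPolicies(); }
        }, 0, 60, TimeUnit.SECONDS);
      }

      @Override
      public void stop() {   // invoked from stopCommonServices()
        refresher.shutdownNow();
      }

      private void reloadPolicies() { /* fetch policies, update a local cache */ }

      @Override
      public INodeAttributes getAttributes(String[] pathElements,
          INodeAttributes inode) {
        return inode;  // attribute substitution elided in this sketch
      }
    }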

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53a28afe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java
index 0508484..e6570f5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.namenode;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 import java.util.Stack;
 
@@ -30,6 +29,8 @@ import org.apache.hadoop.fs.permission.AclEntryScope;
 import org.apache.hadoop.fs.permission.AclEntryType;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.server.namenode.INodeAttributeProvider.AccessControlEnforcer;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -41,25 +42,25 @@ import org.apache.hadoop.security.UserGroupInformation;
  * 
  * Some of the helper methods are guarded by {@link FSNamesystem#readLock()}.
  */
-class FSPermissionChecker {
+class FSPermissionChecker implements AccessControlEnforcer {
   static final Log LOG = LogFactory.getLog(UserGroupInformation.class);
 
   /** @return a string for throwing {@link AccessControlException} */
-  private String toAccessControlString(INode inode, int snapshotId,
+  private String toAccessControlString(INodeAttributes inodeAttrib, String path,
       FsAction access, FsPermission mode) {
-    return toAccessControlString(inode, snapshotId, access, mode, false);
+    return toAccessControlString(inodeAttrib, path, access, mode, false);
   }
 
   /** @return a string for throwing {@link AccessControlException} */
-  private String toAccessControlString(INode inode, int snapshotId, FsAction access,
-      FsPermission mode, boolean deniedFromAcl) {
+  private String toAccessControlString(INodeAttributes inodeAttrib,
+      String path, FsAction access, FsPermission mode, boolean deniedFromAcl) {
     StringBuilder sb = new StringBuilder("Permission denied: ")
-      .append("user=").append(user).append(", ")
+      .append("user=").append(getUser()).append(", ")
       .append("access=").append(access).append(", ")
-      .append("inode=\"").append(inode.getFullPathName()).append("\":")
-      .append(inode.getUserName(snapshotId)).append(':')
-      .append(inode.getGroupName(snapshotId)).append(':')
-      .append(inode.isDirectory() ? 'd' : '-')
+      .append("inode=\"").append(path).append("\":")
+      .append(inodeAttrib.getUserName()).append(':')
+      .append(inodeAttrib.getGroupName()).append(':')
+      .append(inodeAttrib.isDirectory() ? 'd' : '-')
       .append(mode);
     if (deniedFromAcl) {
       sb.append("+");
@@ -67,42 +68,59 @@ class FSPermissionChecker {
     return sb.toString();
   }
 
+  private final String fsOwner;
+  private final String supergroup;
+  private final UserGroupInformation callerUgi;
+
   private final String user;
-  /** A set with group namess. Not synchronized since it is unmodifiable */
   private final Set<String> groups;
   private final boolean isSuper;
+  private final INodeAttributeProvider attributeProvider;
+
 
   FSPermissionChecker(String fsOwner, String supergroup,
-      UserGroupInformation callerUgi) {
-    HashSet<String> s = new HashSet<String>(Arrays.asList(callerUgi.getGroupNames()));
+      UserGroupInformation callerUgi,
+      INodeAttributeProvider attributeProvider) {
+    this.fsOwner = fsOwner;
+    this.supergroup = supergroup;
+    this.callerUgi = callerUgi;
+    HashSet<String> s =
+        new HashSet<String>(Arrays.asList(callerUgi.getGroupNames()));
     groups = Collections.unmodifiableSet(s);
     user = callerUgi.getShortUserName();
     isSuper = user.equals(fsOwner) || groups.contains(supergroup);
+    this.attributeProvider = attributeProvider;
   }
 
-  /**
-   * Check if the callers group contains the required values.
-   * @param group group to check
-   */
-  public boolean containsGroup(String group) {return groups.contains(group);}
+  public boolean containsGroup(String group) {
+    return groups.contains(group);
+  }
 
   public String getUser() {
     return user;
   }
-  
+
+  public Set<String> getGroups() {
+    return groups;
+  }
+
   public boolean isSuperUser() {
     return isSuper;
   }
-  
+
+  public INodeAttributeProvider getAttributesProvider() {
+    return attributeProvider;
+  }
+
   /**
   * Verify if the caller has the required permission. This will result in
    * an exception if the caller is not allowed to access the resource.
    */
   public void checkSuperuserPrivilege()
       throws AccessControlException {
-    if (!isSuper) {
+    if (!isSuperUser()) {
       throw new AccessControlException("Access denied for user " 
-          + user + ". Superuser privilege is required");
+          + getUser() + ". Superuser privilege is required");
     }
   }
   
@@ -154,64 +172,98 @@ class FSPermissionChecker {
     // check if (parentAccess != null) && file exists, then check sb
     // If resolveLink, the check is performed on the link target.
     final int snapshotId = inodesInPath.getPathSnapshotId();
-    final int length = inodesInPath.length();
-    final INode last = length > 0 ? inodesInPath.getLastINode() : null;
-    final INode parent = length > 1 ? inodesInPath.getINode(-2) : null;
+    final INode[] inodes = inodesInPath.getINodesArray();
+    final INodeAttributes[] inodeAttrs = new INodeAttributes[inodes.length];
+    final byte[][] pathByNameArr = new byte[inodes.length][];
+    for (int i = 0; i < inodes.length && inodes[i] != null; i++) {
+      if (inodes[i] != null) {
+        pathByNameArr[i] = inodes[i].getLocalNameBytes();
+        inodeAttrs[i] = getINodeAttrs(pathByNameArr, i, inodes[i], snapshotId);
+      }
+    }
+
+    String path = inodesInPath.getPath();
+    int ancestorIndex = inodes.length - 2;
+
+    AccessControlEnforcer enforcer =
+        getAttributesProvider().getExternalAccessControlEnforcer(this);
+    enforcer.checkPermission(fsOwner, supergroup, callerUgi, inodeAttrs, inodes,
+        pathByNameArr, snapshotId, path, ancestorIndex, doCheckOwner,
+        ancestorAccess, parentAccess, access, subAccess, ignoreEmptyDir);
+  }
 
-    checkTraverse(inodesInPath, snapshotId);
+  @Override
+  public void checkPermission(String fsOwner, String supergroup,
+      UserGroupInformation callerUgi, INodeAttributes[] inodeAttrs,
+      INode[] inodes, byte[][] pathByNameArr, int snapshotId, String path,
+      int ancestorIndex, boolean doCheckOwner, FsAction ancestorAccess,
+      FsAction parentAccess, FsAction access, FsAction subAccess,
+      boolean ignoreEmptyDir)
+      throws AccessControlException {
+    for(; ancestorIndex >= 0 && inodes[ancestorIndex] == null;
+        ancestorIndex--);
+    checkTraverse(inodeAttrs, path, ancestorIndex);
 
+    final INodeAttributes last = inodeAttrs[inodeAttrs.length - 1];
     if (parentAccess != null && parentAccess.implies(FsAction.WRITE)
-        && length > 1 && last != null) {
-      checkStickyBit(parent, last, snapshotId);
+        && inodeAttrs.length > 1 && last != null) {
+      checkStickyBit(inodeAttrs[inodeAttrs.length - 2], last);
     }
-    if (ancestorAccess != null && length > 1) {
-      List<INode> inodes = inodesInPath.getReadOnlyINodes();
-      INode ancestor = null;
-      for (int i = inodes.size() - 2; i >= 0 && (ancestor = inodes.get(i)) ==
-          null; i--);
-      check(ancestor, snapshotId, ancestorAccess);
+    if (ancestorAccess != null && inodeAttrs.length > 1) {
+      check(inodeAttrs, path, ancestorIndex, ancestorAccess);
     }
-    if (parentAccess != null && length > 1 && parent != null) {
-      check(parent, snapshotId, parentAccess);
+    if (parentAccess != null && inodeAttrs.length > 1) {
+      check(inodeAttrs, path, inodeAttrs.length - 2, parentAccess);
     }
     if (access != null) {
-      check(last, snapshotId, access);
+      check(last, path, access);
     }
     if (subAccess != null) {
-      checkSubAccess(last, snapshotId, subAccess, ignoreEmptyDir);
+      INode rawLast = inodes[inodeAttrs.length - 1];
+      checkSubAccess(pathByNameArr, inodeAttrs.length - 1, rawLast,
+          snapshotId, subAccess, ignoreEmptyDir);
     }
     if (doCheckOwner) {
-      checkOwner(last, snapshotId);
+      checkOwner(last);
     }
   }
 
+  private INodeAttributes getINodeAttrs(byte[][] pathByNameArr, int pathIdx,
+      INode inode, int snapshotId) {
+    INodeAttributes inodeAttrs = inode.getSnapshotINode(snapshotId);
+    if (getAttributesProvider() != null) {
+      String[] elements = new String[pathIdx + 1];
+      for (int i = 0; i < elements.length; i++) {
+        elements[i] = DFSUtil.bytes2String(pathByNameArr[i]);
+      }
+      inodeAttrs = getAttributesProvider().getAttributes(elements, inodeAttrs);
+    }
+    return inodeAttrs;
+  }
+
   /** Guarded by {@link FSNamesystem#readLock()} */
-  private void checkOwner(INode inode, int snapshotId
+  private void checkOwner(INodeAttributes inode
       ) throws AccessControlException {
-    if (inode != null && user.equals(inode.getUserName(snapshotId))) {
+    if (getUser().equals(inode.getUserName())) {
       return;
     }
     throw new AccessControlException(
             "Permission denied. user="
-            + user + " is not the owner of inode=" + inode);
+            + getUser() + " is not the owner of inode=" + inode);
   }
 
   /** Guarded by {@link FSNamesystem#readLock()} */
-  private void checkTraverse(INodesInPath iip, int snapshotId)
-      throws AccessControlException {
-    List<INode> inodes = iip.getReadOnlyINodes();
-    for (int i = 0; i < inodes.size() - 1; i++) {
-      INode inode = inodes.get(i);
-      if (inode == null) {
-        break;
-      }
-      check(inode, snapshotId, FsAction.EXECUTE);
+  private void checkTraverse(INodeAttributes[] inodes, String path, int last
+      ) throws AccessControlException {
+    for(int j = 0; j <= last; j++) {
+      check(inodes[j], path, FsAction.EXECUTE);
     }
   }
 
   /** Guarded by {@link FSNamesystem#readLock()} */
-  private void checkSubAccess(INode inode, int snapshotId, FsAction access,
-      boolean ignoreEmptyDir) throws AccessControlException {
+  private void checkSubAccess(byte[][] pathByNameArr, int pathIdx, INode inode,
+      int snapshotId, FsAction access, boolean ignoreEmptyDir)
+      throws AccessControlException {
     if (inode == null || !inode.isDirectory()) {
       return;
     }
@@ -221,7 +273,9 @@ class FSPermissionChecker {
       INodeDirectory d = directories.pop();
       ReadOnlyList<INode> cList = d.getChildrenList(snapshotId);
       if (!(cList.isEmpty() && ignoreEmptyDir)) {
-        check(d, snapshotId, access);
+        // TODO: have to figure this out with the INodeAttributeProvider
+        check(getINodeAttrs(pathByNameArr, pathIdx, d, snapshotId),
+            inode.getFullPathName(), access);
       }
 
       for(INode child : cList) {
@@ -233,37 +287,37 @@ class FSPermissionChecker {
   }
 
   /** Guarded by {@link FSNamesystem#readLock()} */
-  private void check(INode inode, int snapshotId, FsAction access)
-      throws AccessControlException {
+  private void check(INodeAttributes[] inodes, String path, int i, FsAction access
+      ) throws AccessControlException {
+    check(i >= 0 ? inodes[i] : null, path, access);
+  }
+
+  private void check(INodeAttributes inode, String path, FsAction access
+      ) throws AccessControlException {
     if (inode == null) {
       return;
     }
-    FsPermission mode = inode.getFsPermission(snapshotId);
-    AclFeature aclFeature = inode.getAclFeature(snapshotId);
+    final FsPermission mode = inode.getFsPermission();
+    final AclFeature aclFeature = inode.getAclFeature();
     if (aclFeature != null) {
       // It's possible that the inode has a default ACL but no access ACL.
       int firstEntry = aclFeature.getEntryAt(0);
       if (AclEntryStatusFormat.getScope(firstEntry) == AclEntryScope.ACCESS) {
-        checkAccessAcl(inode, snapshotId, access, mode, aclFeature);
+        checkAccessAcl(inode, path, access, mode, aclFeature);
         return;
       }
     }
-    checkFsPermission(inode, snapshotId, access, mode);
-  }
-
-  private void checkFsPermission(INode inode, int snapshotId, FsAction access,
-      FsPermission mode) throws AccessControlException {
-    if (user.equals(inode.getUserName(snapshotId))) { //user class
+    if (getUser().equals(inode.getUserName())) { //user class
       if (mode.getUserAction().implies(access)) { return; }
     }
-    else if (groups.contains(inode.getGroupName(snapshotId))) { //group class
+    else if (getGroups().contains(inode.getGroupName())) { //group class
       if (mode.getGroupAction().implies(access)) { return; }
     }
     else { //other class
       if (mode.getOtherAction().implies(access)) { return; }
     }
     throw new AccessControlException(
-      toAccessControlString(inode, snapshotId, access, mode));
+        toAccessControlString(inode, path, access, mode));
   }
 
   /**
@@ -282,20 +336,20 @@ class FSPermissionChecker {
    * - The other entry must not have a name.
    * - Default entries may be present, but they are ignored during enforcement.
    *
-   * @param inode INode accessed inode
+   * @param inode INodeAttributes accessed inode
   * @param path String path of the accessed inode
    * @param access FsAction requested permission
    * @param mode FsPermission mode from inode
    * @param aclFeature AclFeature of inode
    * @throws AccessControlException if the ACL denies permission
    */
-  private void checkAccessAcl(INode inode, int snapshotId, FsAction access,
-      FsPermission mode, AclFeature aclFeature)
+  private void checkAccessAcl(INodeAttributes inode, String path,
+      FsAction access, FsPermission mode, AclFeature aclFeature)
       throws AccessControlException {
     boolean foundMatch = false;
 
     // Use owner entry from permission bits if user is owner.
-    if (user.equals(inode.getUserName(snapshotId))) {
+    if (getUser().equals(inode.getUserName())) {
       if (mode.getUserAction().implies(access)) {
         return;
       }
@@ -314,7 +368,7 @@ class FSPermissionChecker {
         if (type == AclEntryType.USER) {
           // Use named user entry with mask from permission bits applied if user
           // matches name.
-          if (user.equals(name)) {
+          if (getUser().equals(name)) {
             FsAction masked = AclEntryStatusFormat.getPermission(entry).and(
                 mode.getGroupAction());
             if (masked.implies(access)) {
@@ -328,8 +382,8 @@ class FSPermissionChecker {
           // applied if user is a member and entry grants access.  If user is a
           // member of multiple groups that have entries that grant access, then
           // it doesn't matter which is chosen, so exit early after first match.
-          String group = name == null ? inode.getGroupName(snapshotId) : name;
-          if (groups.contains(group)) {
+          String group = name == null ? inode.getGroupName() : name;
+          if (getGroups().contains(group)) {
             FsAction masked = AclEntryStatusFormat.getPermission(entry).and(
                 mode.getGroupAction());
             if (masked.implies(access)) {
@@ -347,28 +401,28 @@ class FSPermissionChecker {
     }
 
     throw new AccessControlException(
-      toAccessControlString(inode, snapshotId, access, mode, true));
+        toAccessControlString(inode, path, access, mode, true));
   }
 
   /** Guarded by {@link FSNamesystem#readLock()} */
-  private void checkStickyBit(INode parent, INode inode, int snapshotId
+  private void checkStickyBit(INodeAttributes parent, INodeAttributes inode
       ) throws AccessControlException {
-    if(!parent.getFsPermission(snapshotId).getStickyBit()) {
+    if (!parent.getFsPermission().getStickyBit()) {
       return;
     }
 
     // If this user is the directory owner, return
-    if(parent.getUserName(snapshotId).equals(user)) {
+    if (parent.getUserName().equals(getUser())) {
       return;
     }
 
     // if this user is the file owner, return
-    if(inode.getUserName(snapshotId).equals(user)) {
+    if (inode.getUserName().equals(getUser())) {
       return;
     }
 
     throw new AccessControlException("Permission denied by sticky bit setting:" +
-      " user=" + user + ", inode=" + inode);
+      " user=" + getUser() + ", inode=" + inode);
   }
 
   /**
@@ -384,11 +438,11 @@ class FSPermissionChecker {
     if (isSuperUser()) {
       return;
     }
-    if (user.equals(pool.getOwnerName())
+    if (getUser().equals(pool.getOwnerName())
         && mode.getUserAction().implies(access)) {
       return;
     }
-    if (groups.contains(pool.getGroupName())
+    if (getGroups().contains(pool.getGroupName())
         && mode.getGroupAction().implies(access)) {
       return;
     }
@@ -396,7 +450,7 @@ class FSPermissionChecker {
       return;
     }
     throw new AccessControlException("Permission denied while accessing pool "
-        + pool.getPoolName() + ": user " + user + " does not have "
+        + pool.getPoolName() + ": user " + getUser() + " does not have "
         + access.toString() + " permissions.");
   }
 }
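
This refactoring is the heart of the patch: FSPermissionChecker itself now implements AccessControlEnforcer. checkPermission(INodesInPath, ...) first materializes an INodeAttributes for every path component (substituted by the provider when one is configured), then dispatches through getExternalAccessControlEnforcer(this), so a provider may take over enforcement entirely or keep the default. A hypothetical partial override (the /reserved prefix rule is illustrative only):

    package org.apache.hadoop.hdfs.server.namenode;

    import org.apache.hadoop.fs.permission.FsAction;
    import org.apache.hadoop.hdfs.server.namenode.INodeAttributeProvider.AccessControlEnforcer;
    import org.apache.hadoop.security.AccessControlException;
    import org.apache.hadoop.security.UserGroupInformation;

    // Hypothetical enforcer: custom rule under /reserved, default elsewhere.
    public class PrefixEnforcer implements AccessControlEnforcer {
      private final AccessControlEnforcer defaultEnforcer;

      public PrefixEnforcer(AccessControlEnforcer defaultEnforcer) {
        this.defaultEnforcer = defaultEnforcer;  // the FSPermissionChecker
      }

      @Override
      public void checkPermission(String fsOwner, String supergroup,
          UserGroupInformation callerUgi, INodeAttributes[] inodeAttrs,
          INode[] inodes, byte[][] pathByNameArr, int snapshotId, String path,
          int ancestorIndex, boolean doCheckOwner, FsAction ancestorAccess,
          FsAction parentAccess, FsAction access, FsAction subAccess,
          boolean ignoreEmptyDir) throws AccessControlException {
        if (!path.startsWith("/reserved")) {
          // Outside our scope: fall back to stock HDFS permission checking.
          defaultEnforcer.checkPermission(fsOwner, supergroup, callerUgi,
              inodeAttrs, inodes, pathByNameArr, snapshotId, path,
              ancestorIndex, doCheckOwner, ancestorAccess, parentAccess,
              access, subAccess, ignoreEmptyDir);
          return;
        }
        // Illustrative custom rule: only the NameNode user may touch /reserved.
        if (!callerUgi.getShortUserName().equals(fsOwner)) {
          throw new AccessControlException("Permission denied: user="
              + callerUgi.getShortUserName() + ", path=" + path);
        }
      }
    }

A provider would activate it by returning new PrefixEnforcer(defaultEnforcer) from getExternalAccessControlEnforcer(defaultEnforcer).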

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53a28afe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeAttributeProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeAttributeProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeAttributeProvider.java
new file mode 100644
index 0000000..b12e147
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeAttributeProvider.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public abstract class INodeAttributeProvider {
+
+  /**
+   * The AccessControlEnforcer allows implementations to override the
+   * default file system permission checking logic enforced on a file
+   * system object.
+   */
+  public interface AccessControlEnforcer {
+
+    /**
+     * Checks permission on a file system object. Must throw an exception
+     * if the file system object is not accessible by the calling UGI.
+     * @param fsOwner Filesystem owner (The Namenode user)
+     * @param supergroup the superuser group
+     * @param callerUgi UserGroupInformation of the caller
+     * @param inodeAttrs Array of INode attributes for each path element in the
+     *                   path
+     * @param inodes Array of INodes for each path element in the path
+     * @param pathByNameArr Array of byte arrays of the LocalName
+     * @param snapshotId the snapshotId of the requested path
+     * @param path Path String
+     * @param ancestorIndex Index of ancestor
+     * @param doCheckOwner perform ownership check
+     * @param ancestorAccess The access required by the ancestor of the path.
+     * @param parentAccess The access required by the parent of the path.
+     * @param access The access required by the path.
+     * @param subAccess If path is a directory, it is the access required of
+     *                  the path and all the sub-directories. If path is not a
+     *                  directory, there should ideally be no effect.
+     * @param ignoreEmptyDir Ignore permission checking for empty directory?
+     * @throws AccessControlException
+     */
+    public abstract void checkPermission(String fsOwner, String supergroup,
+        UserGroupInformation callerUgi, INodeAttributes[] inodeAttrs,
+        INode[] inodes, byte[][] pathByNameArr, int snapshotId, String path,
+        int ancestorIndex, boolean doCheckOwner, FsAction ancestorAccess,
+        FsAction parentAccess, FsAction access, FsAction subAccess,
+        boolean ignoreEmptyDir)
+            throws AccessControlException;
+
+  }
+  /**
+   * Initialize the provider. This method is called at NameNode startup
+   * time.
+   */
+  public abstract void start();
+
+  /**
+   * Shutdown the provider. This method is called at NameNode shutdown time.
+   */
+  public abstract void stop();
+
+  @VisibleForTesting
+  String[] getPathElements(String path) {
+    path = path.trim();
+    if (path.charAt(0) != Path.SEPARATOR_CHAR) {
+      throw new IllegalArgumentException("It must be an absolute path: " +
+          path);
+    }
+    int numOfElements = StringUtils.countMatches(path, Path.SEPARATOR);
+    if (path.length() > 1 && path.endsWith(Path.SEPARATOR)) {
+      numOfElements--;
+    }
+    String[] pathElements = new String[numOfElements];
+    int elementIdx = 0;
+    int idx = 0;
+    int found = path.indexOf(Path.SEPARATOR_CHAR, idx);
+    while (found > -1) {
+      if (found > idx) {
+        pathElements[elementIdx++] = path.substring(idx, found);
+      }
+      idx = found + 1;
+      found = path.indexOf(Path.SEPARATOR_CHAR, idx);
+    }
+    if (idx < path.length()) {
+      pathElements[elementIdx] = path.substring(idx);
+    }
+    return pathElements;
+  }
+
+  public INodeAttributes getAttributes(String fullPath, INodeAttributes inode) {
+    return getAttributes(getPathElements(fullPath), inode);
+  }
+
+  public abstract INodeAttributes getAttributes(String[] pathElements,
+      INodeAttributes inode);
+
+  /**
+   * Can be overridden by implementations to provide a custom Access Control
+   * Enforcer that can provide an alternate implementation of the
+   * default permission checking logic.
+   * @param defaultEnforcer The Default AccessControlEnforcer
+   * @return The AccessControlEnforcer to use
+   */
+  public AccessControlEnforcer getExternalAccessControlEnforcer(
+      AccessControlEnforcer defaultEnforcer) {
+    return defaultEnforcer;
+  }
+}
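
Note the division of labor in this class: getAttributes(String, INodeAttributes) splits the full path with getPathElements and delegates to the abstract array variant, which is the only method providers must implement besides start()/stop(). getPathElements requires an absolute path and ignores a trailing separator; for instance (derived from the code above):

    // getPathElements("/user/authz")  -> {"user", "authz"}
    // getPathElements("/user/authz/") -> {"user", "authz"}  (trailing "/" ignored)
    // getPathElements("user/authz")   -> IllegalArgumentException (not absolute)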

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53a28afe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeAttributes.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeAttributes.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeAttributes.java
index 0f76b68..7b780c2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeAttributes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeAttributes.java
@@ -28,6 +28,9 @@ import org.apache.hadoop.hdfs.server.namenode.XAttrFeature;
  */
 @InterfaceAudience.Private
 public interface INodeAttributes {
+
+  public boolean isDirectory();
+
   /**
    * @return null if the local name is null;
    *         otherwise, return the local name byte array.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53a28afe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryAttributes.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryAttributes.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryAttributes.java
index 956deae..240aa15 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryAttributes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryAttributes.java
@@ -52,6 +52,10 @@ public interface INodeDirectoryAttributes extends INodeAttributes {
           storageSpace(-1).typeSpaces(-1).build();
     }
 
+    public boolean isDirectory() {
+      return true;
+    }
+
     @Override
     public boolean metadataEquals(INodeDirectoryAttributes other) {
       return other != null

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53a28afe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileAttributes.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileAttributes.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileAttributes.java
index 0f85bab..204c8ac 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileAttributes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileAttributes.java
@@ -60,6 +60,11 @@ public interface INodeFileAttributes extends INodeAttributes {
     }
 
     @Override
+    public boolean isDirectory() {
+      return false;
+    }
+
+    @Override
     public short getFileReplication() {
       return HeaderFormat.getReplication(header);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53a28afe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodesInPath.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodesInPath.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodesInPath.java
index 389b62b..f1892c5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodesInPath.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodesInPath.java
@@ -376,6 +376,12 @@ public class INodesInPath {
     return Collections.unmodifiableList(Arrays.asList(inodes));
   }
 
+  public INode[] getINodesArray() {
+    INode[] retArr = new INode[inodes.length];
+    System.arraycopy(inodes, 0, retArr, 0, inodes.length);
+    return retArr;
+  }
+
   /**
    * @param length number of ancestral INodes in the returned INodesInPath
    *               instance

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53a28afe/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSPermissionChecker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSPermissionChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSPermissionChecker.java
index 883029a..0154a03 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSPermissionChecker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSPermissionChecker.java
@@ -403,7 +403,7 @@ public class TestFSPermissionChecker {
   private void assertPermissionGranted(UserGroupInformation user, String path,
       FsAction access) throws IOException {
     INodesInPath iip = dir.getINodesInPath(path, true);
-    new FSPermissionChecker(SUPERUSER, SUPERGROUP, user).checkPermission(iip,
+    dir.getPermissionChecker(SUPERUSER, SUPERGROUP, user).checkPermission(iip,
       false, null, null, access, null, false);
   }
 
@@ -411,7 +411,7 @@ public class TestFSPermissionChecker {
       FsAction access) throws IOException {
     try {
       INodesInPath iip = dir.getINodesInPath(path, true);
-      new FSPermissionChecker(SUPERUSER, SUPERGROUP, user).checkPermission(iip,
+      dir.getPermissionChecker(SUPERUSER, SUPERGROUP, user).checkPermission(iip,
         false, null, null, access, null, false);
       fail("expected AccessControlException for user + " + user + ", path = " +
         path + ", access = " + access);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53a28afe/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeAttributeProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeAttributeProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeAttributeProvider.java
new file mode 100644
index 0000000..111c67c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeAttributeProvider.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclEntryType;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.INodeAttributeProvider.AccessControlEnforcer;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class TestINodeAttributeProvider {
+  private MiniDFSCluster miniDFS;
+  private static final Set<String> CALLED = new HashSet<String>();
+
+  public static class MyAuthorizationProvider extends INodeAttributeProvider {
+
+    public static class MyAccessControlEnforcer implements AccessControlEnforcer {
+
+      @Override
+      public void checkPermission(String fsOwner, String supergroup,
+          UserGroupInformation ugi, INodeAttributes[] inodeAttrs,
+          INode[] inodes, byte[][] pathByNameArr, int snapshotId, String path,
+          int ancestorIndex, boolean doCheckOwner, FsAction ancestorAccess,
+          FsAction parentAccess, FsAction access, FsAction subAccess,
+          boolean ignoreEmptyDir) throws AccessControlException {
+        CALLED.add("checkPermission|" + ancestorAccess + "|" + parentAccess + "|" + access);
+      }
+    }
+
+    @Override
+    public void start() {
+      CALLED.add("start");
+    }
+
+    @Override
+    public void stop() {
+      CALLED.add("stop");
+    }
+
+    @Override
+    public INodeAttributes getAttributes(String[] pathElements,
+        final INodeAttributes inode) {
+      CALLED.add("getAttributes");
+      final boolean useDefault = useDefault(pathElements);
+      return new INodeAttributes() {
+        @Override
+        public boolean isDirectory() {
+          return inode.isDirectory();
+        }
+
+        @Override
+        public byte[] getLocalNameBytes() {
+          return inode.getLocalNameBytes();
+        }
+
+        @Override
+        public String getUserName() {
+          return (useDefault) ? inode.getUserName() : "foo";
+        }
+
+        @Override
+        public String getGroupName() {
+          return (useDefault) ? inode.getGroupName() : "bar";
+        }
+
+        @Override
+        public FsPermission getFsPermission() {
+          return (useDefault) ? inode.getFsPermission()
+                              : new FsPermission(getFsPermissionShort());
+        }
+
+        @Override
+        public short getFsPermissionShort() {
+          return (useDefault) ? inode.getFsPermissionShort()
+                              : (short) getPermissionLong();
+        }
+
+        @Override
+        public long getPermissionLong() {
+          return (useDefault) ? inode.getPermissionLong() : 0770;
+        }
+
+        @Override
+        public AclFeature getAclFeature() {
+          AclFeature f;
+          if (useDefault) {
+            f = inode.getAclFeature();
+          } else {
+            AclEntry acl = new AclEntry.Builder().setType(AclEntryType.GROUP).
+                setPermission(FsAction.ALL).setName("xxx").build();
+            f = new AclFeature(AclEntryStatusFormat.toInt(
+                Lists.newArrayList(acl)));
+          }
+          return f;
+        }
+
+        @Override
+        public XAttrFeature getXAttrFeature() {
+          return (useDefault) ? inode.getXAttrFeature() : null;
+        }
+
+        @Override
+        public long getModificationTime() {
+          return (useDefault) ? inode.getModificationTime() : 0;
+        }
+
+        @Override
+        public long getAccessTime() {
+          return (useDefault) ? inode.getAccessTime() : 0;
+        }
+      };
+
+    }
+
+    @Override
+    public AccessControlEnforcer getExternalAccessControlEnforcer(
+        AccessControlEnforcer defaultEnforcer) {
+      return new MyAccessControlEnforcer();
+    }
+
+    private boolean useDefault(String[] pathElements) {
+      return (pathElements.length < 2) ||
+          !(pathElements[0].equals("user") && pathElements[1].equals("authz"));
+    }
+
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    CALLED.clear();
+    Configuration conf = new HdfsConfiguration();
+    conf.set(DFSConfigKeys.DFS_NAMENODE_INODE_ATTRIBUTES_PROVIDER_KEY,
+        MyAuthorizationProvider.class.getName());
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
+    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
+    miniDFS = new MiniDFSCluster.Builder(conf).build();
+  }
+
+  @After
+  public void cleanUp() throws IOException {
+    CALLED.clear();
+    if (miniDFS != null) {
+      miniDFS.shutdown();
+    }
+    Assert.assertTrue(CALLED.contains("stop"));
+  }
+
+  @Test
+  public void testDelegationToProvider() throws Exception {
+    Assert.assertTrue(CALLED.contains("start"));
+    FileSystem fs = FileSystem.get(miniDFS.getConfiguration(0));
+    fs.mkdirs(new Path("/tmp"));
+    fs.setPermission(new Path("/tmp"), new FsPermission((short) 0777));
+    UserGroupInformation ugi = UserGroupInformation.createUserForTesting("u1",
+        new String[]{"g1"});
+    ugi.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        FileSystem fs = FileSystem.get(miniDFS.getConfiguration(0));
+        CALLED.clear();
+        fs.mkdirs(new Path("/tmp/foo"));
+        Assert.assertTrue(CALLED.contains("getAttributes"));
+        Assert.assertTrue(CALLED.contains("checkPermission|null|null|null"));
+        Assert.assertTrue(CALLED.contains("checkPermission|WRITE|null|null"));
+        CALLED.clear();
+        fs.listStatus(new Path("/tmp/foo"));
+        Assert.assertTrue(CALLED.contains("getAttributes"));
+        Assert.assertTrue(
+            CALLED.contains("checkPermission|null|null|READ_EXECUTE"));
+        CALLED.clear();
+        fs.getAclStatus(new Path("/tmp/foo"));
+        Assert.assertTrue(CALLED.contains("getAttributes"));
+        Assert.assertTrue(CALLED.contains("checkPermission|null|null|null"));
+        return null;
+      }
+    });
+  }
+
+  @Test
+  public void testCustomProvider() throws Exception {
+    FileSystem fs = FileSystem.get(miniDFS.getConfiguration(0));
+    fs.mkdirs(new Path("/user/xxx"));
+    FileStatus status = fs.getFileStatus(new Path("/user/xxx"));
+    Assert.assertEquals(System.getProperty("user.name"), status.getOwner());
+    Assert.assertEquals("supergroup", status.getGroup());
+    Assert.assertEquals(new FsPermission((short)0755), status.getPermission());
+    fs.mkdirs(new Path("/user/authz"));
+    status = fs.getFileStatus(new Path("/user/authz"));
+    Assert.assertEquals("foo", status.getOwner());
+    Assert.assertEquals("bar", status.getGroup());
+    Assert.assertEquals(new FsPermission((short) 0770), status.getPermission());
+  }
+
+}