Posted to commits@sentry.apache.org by ha...@apache.org on 2017/01/27 23:20:50 UTC

[3/7] sentry git commit: SENTRY-1404

SENTRY-1404

Change-Id: Iaee8cacb457bcffcaf081871b0fa59147f824f6a
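
For context: this patch moves the Sentry HDFS plugin off the
dfs.namenode.authorization.provider.class hook (the AuthorizationProvider API
deleted below) and onto the INodeAttributeProvider extension point that
upstream Hadoop ships as of the 2.7 line. A minimal sketch of how a NameNode
would be pointed at the new provider; the configuration key is the upstream
Hadoop one, and the surrounding setup is assumed rather than taken from this
patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class ProviderWiringSketch {
      public static void main(String[] args) {
        Configuration conf = new HdfsConfiguration();
        // New upstream hook (Hadoop 2.7+), replacing the
        // dfs.namenode.authorization.provider.class key used by the deleted code:
        conf.set("dfs.namenode.inode.attributes.provider.class",
            "org.apache.sentry.hdfs.SentryINodeAttributesProvider");
        // SentryINodeAttributesProvider.start() refuses to run without ACLs:
        conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
        System.out.println(conf.get("dfs.namenode.inode.attributes.provider.class"));
      }
    }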


Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/81facc62
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/81facc62
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/81facc62

Branch: refs/heads/sentry-ha-redesign-1
Commit: 81facc62e64aea82b5d80cb34d29b519b1b4bc58
Parents: ee2d3f7
Author: hahao <ha...@cloudera.com>
Authored: Thu Jan 26 17:37:01 2017 -0800
Committer: hahao <ha...@cloudera.com>
Committed: Thu Jan 26 17:37:01 2017 -0800

----------------------------------------------------------------------
 pom.xml                                         |   2 +-
 sentry-binding/sentry-binding-solr/pom.xml      |   9 +
 .../server/namenode/AuthorizationProvider.java  | 411 -----------------
 .../hdfs/SentryAuthorizationConstants.java      |  14 +-
 .../hdfs/SentryAuthorizationProvider.java       | 437 -------------------
 .../hdfs/SentryINodeAttributesProvider.java     | 386 ++++++++++++++++
 .../hdfs/MockSentryAuthorizationProvider.java   |  26 --
 .../hdfs/MockSentryINodeAttributesProvider.java |  26 ++
 .../hdfs/TestSentryAuthorizationProvider.java   | 220 ----------
 .../hdfs/TestSentryINodeAttributesProvider.java | 218 +++++++++
 .../service/persistent/TransactionManager.java  |   2 +-
 .../tests/e2e/hdfs/TestHDFSIntegration.java     |   8 +-
 .../tests/e2e/hdfs/TestHDFSIntegration.java     |  40 +-
 13 files changed, 676 insertions(+), 1123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sentry/blob/81facc62/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index b9282e7..359897c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -68,7 +68,7 @@ limitations under the License.
     <easymock.version>3.0</easymock.version>
     <fest.reflect.version>1.4.1</fest.reflect.version>
     <guava.version>11.0.2</guava.version>
-    <hadoop.version>2.6.0</hadoop.version>
+    <hadoop.version>2.7.2</hadoop.version>
     <hamcrest.version>1.3</hamcrest.version>
     <hive-v2.version>2.0.0</hive-v2.version>
     <hive.version>1.1.0</hive.version>
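
Note on the version bump above: INodeAttributeProvider and its nested
AccessControlEnforcer interface (which the new SentryINodeAttributesProvider
below builds on) only exist from Hadoop 2.7 onward, so the plugin can no
longer build against 2.6.0. A quick classpath sanity check, illustrative only
and not part of the patch:

    public class HadoopApiCheck {
      public static void main(String[] args) throws ClassNotFoundException {
        // Both classes load against Hadoop 2.7.x jars; neither exists in 2.6.0:
        Class.forName("org.apache.hadoop.hdfs.server.namenode.INodeAttributeProvider");
        Class.forName("org.apache.hadoop.hdfs.server.namenode."
            + "INodeAttributeProvider$AccessControlEnforcer");
      }
    }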

http://git-wip-us.apache.org/repos/asf/sentry/blob/81facc62/sentry-binding/sentry-binding-solr/pom.xml
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-solr/pom.xml b/sentry-binding/sentry-binding-solr/pom.xml
index a63a600..2bbb2ce 100644
--- a/sentry-binding/sentry-binding-solr/pom.xml
+++ b/sentry-binding/sentry-binding-solr/pom.xml
@@ -62,6 +62,15 @@ limitations under the License.
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-auth</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-minicluster</artifactId>
       <scope>test</scope>
     </dependency>

http://git-wip-us.apache.org/repos/asf/sentry/blob/81facc62/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/hadoop/hdfs/server/namenode/AuthorizationProvider.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/hadoop/hdfs/server/namenode/AuthorizationProvider.java b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/hadoop/hdfs/server/namenode/AuthorizationProvider.java
deleted file mode 100644
index 383d64d..0000000
--- a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/hadoop/hdfs/server/namenode/AuthorizationProvider.java
+++ /dev/null
@@ -1,411 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.namenode;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.UnresolvedLinkException;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
-import org.apache.hadoop.security.AccessControlException;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Implementations of this interface are called from within an 
- * <code>inode</code> to set or return authorization related information.
- * <p/>
- * The HDFS default implementation, {@link DefaultAuthorizationProvider} uses
- * the <code>inode</code> itself to retrieve and store information.
- * <p/>
- * A custom implementation may use a different authorization store and enforce
- * the permission check using alternate logic.
- * <p/>
- * It is expected that an implementation of the provider will not call external 
- * systems or perform expensive computations in any of the methods defined by 
- * the provider interface, as they are typically invoked within remote client 
- * filesystem operations.
- * <p/>
- * If calls to external systems are required, they should be done 
- * asynchronously from the provider methods.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Unstable
-public abstract class AuthorizationProvider {
-
-  private static final ThreadLocal<Boolean> CLIENT_OP_TL =
-      new ThreadLocal<Boolean>() {
-        @Override
-        protected Boolean initialValue() {
-          return Boolean.FALSE;
-        }
-      };
-
-  static void beginClientOp() {
-    CLIENT_OP_TL.set(Boolean.TRUE);
-  }
-
-  static void endClientOp() {
-    CLIENT_OP_TL.set(Boolean.FALSE);
-  }
-
-  private static AuthorizationProvider provider;
-
-  /**
-   * Return the authorization provider singleton for the NameNode.
-   * 
-   * @return the authorization provider
-   */
-  public static AuthorizationProvider get() {
-    return provider;  
-  }
-
-  /**
-   * Set the authorization provider singleton for the NameNode. The 
-   * provider must be started (before being set) and stopped by the setter.
-   * 
-   * @param authzProvider the authorization provider
-   */
-  static void set(AuthorizationProvider authzProvider) {
-    provider = authzProvider;
-  }
-
-  /**
-   * Constant that indicates current state (as opposed to a particular snapshot 
-   * ID) when retrieving authorization information from the provider.
-   */
-  public static final int CURRENT_STATE_ID = Snapshot.CURRENT_STATE_ID;
-
-  /**
-   * This interface exposes INode read-only information relevant for 
-   * authorization decisions.
-   * 
-   * @see AuthorizationProvider
-   */
-  @InterfaceAudience.Public
-  @InterfaceStability.Unstable
-  public interface INodeAuthorizationInfo {
-
-    /**
-     * Return the inode unique ID. This value never changes.
-     * 
-     * @return the inode unique ID.
-     */
-    long getId();
-
-    /**
-     * Return the inode path element name. This value may change.
-     * @return the inode path element name.
-     */
-    String getLocalName();
-
-    /**
-     * Return the parent inode. This value may change.
-     * 
-     * @return the parent inode.
-     */
-    INodeAuthorizationInfo getParent();
-
-    /**
-     * Return the inode full path. This value may change.
-     *
-     * @return the inode full path
-     */
-    String getFullPathName();
-
-    /**
-     * Return if the inode is a directory or not.
-     *
-     * @return <code>TRUE</code> if the inode is a directory, 
-     * <code>FALSE</code> otherwise.
-     */
-    boolean isDirectory();
-
-    /**
-     * Return the inode user for the specified snapshot.
-     * 
-     * @param snapshotId a snapshot ID or {@link #CURRENT_STATE_ID} for latest 
-     * value.
-     * @return the inode user for the specified snapshot.
-     */
-    String getUserName(int snapshotId);
-
-    /**
-     * Return the inode group for the specified snapshot.
-     *
-     * @param snapshotId a snapshot ID or {@link #CURRENT_STATE_ID} for latest
-     * value.
-     * @return the inode group for the specified snapshot.
-     */
-    String getGroupName(int snapshotId);
-
-    /**
-     * Return the inode permission for the specified snapshot.
-     *
-     * @param snapshotId a snapshot ID or {@link #CURRENT_STATE_ID} for latest
-     * value.
-     * @return the inode permission for the specified snapshot.
-     */
-    FsPermission getFsPermission(int snapshotId);
-
-    /**
-     * Return the inode ACL feature for the specified snapshot.
-     *
-     * @param snapshotId a snapshot ID or {@link #CURRENT_STATE_ID} for latest
-     * value.
-     * @return the inode ACL feature for the specified snapshot.
-     */
-    AclFeature getAclFeature(int snapshotId);
-
-  }
-
-  /**
-   * Indicates if the current provider method invocation is part of a client 
-   * operation or an internal NameNode call (i.e. an FS image or edit log 
-   * operation).
-   * 
-   * @return <code>TRUE</code> if the provider method invocation is being 
-   * done as part of a client operation, <code>FALSE</code> otherwise.
-   */
-  protected final boolean isClientOp() {
-    return Boolean.TRUE.equals(CLIENT_OP_TL.get());
-  }
-
-  /**
-   * Initialize the provider. This method is called at NameNode startup 
-   * time.
-   */
-  public void start() {    
-  }
-
-  /**
-   * Shutdown the provider. This method is called at NameNode shutdown time.
-   */
-  public void stop() {    
-  }
-
-  /**
-   * Set all currently snapshot-able directories and their corresponding last 
-   * snapshot ID. This method is called at NameNode startup.
-   * <p/>
-   * A provider implementation that keeps authorization information on a per-
-   * snapshot basis can use this call to initialize/re-sync its information with
-   * the NameNode snapshot-able directories information.
-   * 
-   * @param snapshotableDirs a map with all the currently snapshot-able 
-   * directories and their corresponding last snapshot ID
-   */
-  public void setSnaphottableDirs(Map<INodeAuthorizationInfo, Integer> 
-      snapshotableDirs) {
-  }
-
-  /**
-   * Add a directory as snapshot-able.
-   * <p/>
-   * A provider implementation that keeps authorization information on a per-
-   * snapshot basis can use this call to prepare itself for snapshots on the
-   * specified directory.
-   * 
-   * @param dir snapshot-able directory to add
-   */
-  public void addSnapshottable(INodeAuthorizationInfo dir) {
-  }
-
-  /**
-   * Remove a directory as snapshot-able.
-   * <p/>
-   * A provider implementation that keeps authorization information on a per-
-   * snapshot basis can use this call to clean up any snapshot on the
-   * specified directory.
-   *
-   * @param dir snapshot-able directory to remove
-   */
-  public void removeSnapshottable(INodeAuthorizationInfo dir) {
-  }
-
-  /**
-   * Create a snapshot for snapshot-able directory.
-   * <p/>
-   * A provider implementation that keeps authorization information on a per-
-   * snapshot basis can use this call to perform any snapshot related 
-   * bookkeeping on the specified directory because of the snapshot creation.
-   *
-   * @param dir directory to make a snapshot of
-   * @param snapshotId the snapshot ID to create
-   */
-  public void createSnapshot(INodeAuthorizationInfo dir, int snapshotId)
-      throws IOException {    
-  }
-  
-  /**
-   * Remove a snapshot for snapshot-able directory.
-   * <p/>
-   * A provider implementation that keeps authorization information on a per-
-   * snapshot basis can use this call to perform any snapshot related
-   * bookkeeping on the specified directory because of the snapshot removal.
-   *
-   * @param dir directory to remove a snapshot from
-   * @param snapshotId the snapshot ID to remove
-   */
-  public void removeSnapshot(INodeAuthorizationInfo dir, int snapshotId)
-      throws IOException {
-  }
-  
-  /**
-   * Set the user for an inode.
-   * <p/>
-   * This method is always called within a Filesystem LOCK.
-   * 
-   * @param node inode
-   * @param user user name
-   */
-  public abstract void setUser(INodeAuthorizationInfo node, String user);
-
-  /**
-   * Get the user of an inode.
-   * <p/>
-   * This method is always called within a Filesystem LOCK.
-   * 
-   * @param node inode
-   * @param snapshotId snapshot ID of the inode to get the user from
-   * @return the user of the inode
-   */
-  public abstract String getUser(INodeAuthorizationInfo node, int snapshotId);
-
-  /**
-   * Set the group of an inode.
-   * <p/>
-   * This method is always called within a Filesystem LOCK.
-   * 
-   * @param node inode
-   * @param group group name
-   */
-  public abstract void setGroup(INodeAuthorizationInfo node, String group);
-
-  /**
-   * Get the group of an inode.
-   * <p/>
-   * This method is always called within a Filesystem LOCK.
-   *
-   * @param node inode
-   * @param snapshotId snapshot ID of the inode to get the group from
-   * @return the group of the inode
-   */
-  public abstract String getGroup(INodeAuthorizationInfo node, int snapshotId);
-
-  /**
-   * Set the permission of an inode.
-   * <p/>
-   * This method is always called within a Filesystem LOCK.
-   * 
-   * @param node inode
-   * @param permission the permission to set
-   */
-  public abstract void setPermission(INodeAuthorizationInfo node, 
-      FsPermission permission);
-
-  /**
-   * Get the permission of an inode.
-   * <p/>
-   * This method is always called within a Filesystem LOCK.
-   * 
-   * @param node inode
-   * @param snapshotId snapshot ID of the inode to get the permission from
-   * @return the permission of the inode
-   */
-  public abstract FsPermission getFsPermission(INodeAuthorizationInfo node, 
-      int snapshotId);
-
-  /**
-   * Get the ACLs of an inode.
-   * <p/>
-   * This method is always called within a Filesystem LOCK.
-   * 
-   * @param node inode
-   * @param snapshotId snapshot ID of the inode to get the ACLs from
-   * @return the ACLs of the inode
-   */
-  public abstract AclFeature getAclFeature(INodeAuthorizationInfo node, 
-      int snapshotId);
-
-  /**
-   * Remove the ACLs of an inode.
-   * <p/>
-   * This method is always called within a Filesystem LOCK.
-   * 
-   * @param node inode
-   */
-  public abstract void removeAclFeature(INodeAuthorizationInfo node);
-
-  /**
-   * Add ACLs to an inode.
-   * <p/>
-   * This method is always called within a Filesystem LOCK.
-   * 
-   * @param node inode
-   * @param f the ACLs of the inode
-   */
-  public abstract void addAclFeature(INodeAuthorizationInfo node, AclFeature f);
-
-  /**
-   * Check whether the current user has permission to access the path.
-   * Traverse is always checked.
-   * <p/>
-   * This method is always called within a Filesystem LOCK.
-   * <p/>
-   * Parent path means the parent directory for the path.
-   * Ancestor path means the last (the closest) existing ancestor directory
-   * of the path.
-   * <p/>
-   * Note that if the parent path exists,
-   * then the parent path and the ancestor path are the same.
-   * <p/>
-   * For example, suppose the path is "/foo/bar/baz".
-   * Whether baz is a file or a directory,
-   * the parent path is "/foo/bar".
-   * If bar exists, then the ancestor path is also "/foo/bar".
-   * If bar does not exist and foo exists,
-   * then the ancestor path is "/foo".
-   * Further, if both foo and bar do not exist,
-   * then the ancestor path is "/".
-   *
-   * @param user user to check permissions against
-   * @param groups groups of the user to check permissions against
-   * @param inodes inodes of the path to check permissions
-   * @param snapshotId snapshot ID to check permissions
-   * @param doCheckOwner Require user to be the owner of the path?
-   * @param ancestorAccess The access required by the ancestor of the path.
-   * @param parentAccess The access required by the parent of the path.
-   * @param access The access required by the path.
-   * @param subAccess If path is a directory,
-   * it is the access required of the path and all the sub-directories.
-   * If path is not a directory, there is no effect.
-   * @param ignoreEmptyDir Ignore permission checking for empty directory?
-   * @throws AccessControlException
-   * @throws UnresolvedLinkException
-   */
-  public abstract void checkPermission(String user, Set<String> groups,
-      INodeAuthorizationInfo[] inodes, int snapshotId,
-      boolean doCheckOwner, FsAction ancestorAccess, FsAction parentAccess,
-      FsAction access, FsAction subAccess, boolean ignoreEmptyDir)
-      throws AccessControlException, UnresolvedLinkException;
-
-}
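
Aside: the checkPermission javadoc above distinguishes the parent path (the
immediate parent directory) from the ancestor path (the last existing
directory on the path). A tiny illustrative sketch of that rule, with an
exists() predicate standing in for the NameNode's real inode lookup:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;
    import java.util.function.Predicate;

    public class AncestorDemo {
      static String ancestor(String path, Predicate<String> exists) {
        String p = path.substring(0, path.lastIndexOf('/'));   // start at the parent
        while (!p.isEmpty() && !exists.test(p)) {
          p = p.substring(0, p.lastIndexOf('/'));               // climb until a dir exists
        }
        return p.isEmpty() ? "/" : p;
      }

      public static void main(String[] args) {
        Set<String> fs = new HashSet<>(Arrays.asList("/foo"));       // only /foo exists
        System.out.println(ancestor("/foo/bar/baz", fs::contains));  // "/foo"
        System.out.println(ancestor("/foo/bar/baz", p -> false));    // "/"
      }
    }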

http://git-wip-us.apache.org/repos/asf/sentry/blob/81facc62/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryAuthorizationConstants.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryAuthorizationConstants.java b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryAuthorizationConstants.java
index 8836801..e1714b6 100644
--- a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryAuthorizationConstants.java
+++ b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryAuthorizationConstants.java
@@ -29,19 +29,19 @@ public final class SentryAuthorizationConstants {
   public static final String HDFS_GROUP_KEY = CONFIG_PREFIX + "hdfs-group";
   public static final String HDFS_GROUP_DEFAULT = "hive";
 
-  public static final String HDFS_PERMISSION_KEY = CONFIG_PREFIX + 
+  public static final String HDFS_PERMISSION_KEY = CONFIG_PREFIX +
       "hdfs-permission";
-  public static final long HDFS_PERMISSION_DEFAULT = 771;
+  public static final long HDFS_PERMISSION_DEFAULT = 0771;
 
-  public static final String HDFS_PATH_PREFIXES_KEY = CONFIG_PREFIX + 
+  public static final String HDFS_PATH_PREFIXES_KEY = CONFIG_PREFIX +
       "hdfs-path-prefixes";
   public static final String[] HDFS_PATH_PREFIXES_DEFAULT = new String[0];
 
-  public static final String CACHE_REFRESH_INTERVAL_KEY = CONFIG_PREFIX + 
+  public static final String CACHE_REFRESH_INTERVAL_KEY = CONFIG_PREFIX +
       "cache-refresh-interval.ms";
   public static final int CACHE_REFRESH_INTERVAL_DEFAULT = 500;
 
-  public static final String CACHE_STALE_THRESHOLD_KEY = CONFIG_PREFIX + 
+  public static final String CACHE_STALE_THRESHOLD_KEY = CONFIG_PREFIX +
       "cache-stale-threshold.ms";
   public static final int CACHE_STALE_THRESHOLD_DEFAULT = 60 * 1000;
 
@@ -49,10 +49,10 @@ public final class SentryAuthorizationConstants {
       "cache-refresh-retry-wait.ms";
   public static final int CACHE_REFRESH_RETRY_WAIT_DEFAULT = 30 * 1000;
 
-  public static final String INCLUDE_HDFS_AUTHZ_AS_ACL_KEY = CONFIG_PREFIX + 
+  public static final String INCLUDE_HDFS_AUTHZ_AS_ACL_KEY = CONFIG_PREFIX +
       "include-hdfs-authz-as-acl";
   public static final boolean INCLUDE_HDFS_AUTHZ_AS_ACL_DEFAULT = false;
-  
+
   private SentryAuthorizationConstants() {
     // Make constructor private to avoid instantiation
   }
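
The 771 -> 0771 change above is a genuine bug fix rather than a style tweak:
the default was meant to be the octal mode rwxrwx--x, but the decimal literal
771 is 01403 in octal, which (cast to a short and fed to FsPermission) sets
the sticky bit and garbles the rwx bits. A small demonstration against
hadoop-common:

    import org.apache.hadoop.fs.permission.FsPermission;

    public class OctalLiteralDemo {
      public static void main(String[] args) {
        // Decimal 771 == octal 01403: sticky bit + r----wx, not the intended mode.
        System.out.println(FsPermission.createImmutable((short) 771));
        // Octal 0771 == decimal 505: the intended rwxrwx--x.
        System.out.println(FsPermission.createImmutable((short) 0771));
      }
    }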

http://git-wip-us.apache.org/repos/asf/sentry/blob/81facc62/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryAuthorizationProvider.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryAuthorizationProvider.java b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryAuthorizationProvider.java
deleted file mode 100644
index f639f5f..0000000
--- a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryAuthorizationProvider.java
+++ /dev/null
@@ -1,437 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sentry.hdfs;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.UnresolvedLinkException;
-import org.apache.hadoop.fs.permission.AclEntry;
-import org.apache.hadoop.fs.permission.AclEntryScope;
-import org.apache.hadoop.fs.permission.AclEntryType;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.server.namenode.AclFeature;
-import org.apache.hadoop.hdfs.server.namenode.AuthorizationProvider;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
-import org.apache.hadoop.security.AccessControlException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
-
-public class SentryAuthorizationProvider
-    extends AuthorizationProvider implements Configurable {
-
-  static class SentryAclFeature extends AclFeature {
-    public SentryAclFeature(ImmutableList<AclEntry> entries) {
-      super(entries);
-    }
-  }
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(SentryAuthorizationProvider.class);
-  private static final String WARN_VISIBILITY = 
-      " The result won't be visible when the path is managed by Sentry";
-
-  private boolean started;
-  private Configuration conf;
-  private AuthorizationProvider defaultAuthzProvider;
-  private String user;
-  private String group;
-  private FsPermission permission;
-  private boolean originalAuthzAsAcl;
-  private SentryAuthorizationInfo authzInfo;
-
-  public SentryAuthorizationProvider() {
-    this(null);
-  }
-
-  @VisibleForTesting
-  SentryAuthorizationProvider(SentryAuthorizationInfo authzInfo) {
-    this.authzInfo = authzInfo;
-  }
-
-  @Override
-  public void setConf(Configuration conf) {
-    this.conf = conf;
-  }
-
-  @Override
-  public Configuration getConf() {
-    return conf;
-  }
-
-  @Override
-  public synchronized void start() {
-    if (started) {
-      throw new IllegalStateException("Provider already started");
-    }
-    started = true;
-    try {
-      if (!conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, false)) {
-        throw new RuntimeException("HDFS ACLs must be enabled");
-      }
-
-      defaultAuthzProvider = AuthorizationProvider.get();
-      defaultAuthzProvider.start();
-      // Configuration is read from hdfs-sentry.xml and NN configuration, in
-      // that order of precedence.
-      Configuration newConf = new Configuration(this.conf);
-      newConf.addResource(SentryAuthorizationConstants.CONFIG_FILE);
-      user = newConf.get(SentryAuthorizationConstants.HDFS_USER_KEY,
-          SentryAuthorizationConstants.HDFS_USER_DEFAULT);
-      group = newConf.get(SentryAuthorizationConstants.HDFS_GROUP_KEY,
-          SentryAuthorizationConstants.HDFS_GROUP_DEFAULT);
-      permission = FsPermission.createImmutable(
-          (short) newConf.getLong(SentryAuthorizationConstants.HDFS_PERMISSION_KEY,
-              SentryAuthorizationConstants.HDFS_PERMISSION_DEFAULT)
-      );
-      originalAuthzAsAcl = newConf.getBoolean(
-          SentryAuthorizationConstants.INCLUDE_HDFS_AUTHZ_AS_ACL_KEY,
-          SentryAuthorizationConstants.INCLUDE_HDFS_AUTHZ_AS_ACL_DEFAULT);
-
-      LOG.info("Starting");
-      LOG.info("Config: hdfs-user[{}] hdfs-group[{}] hdfs-permission[{}] " +
-          "include-hdfs-authz-as-acl[{}]", new Object[]
-          {user, group, permission, originalAuthzAsAcl});
-
-      if (authzInfo == null) {
-        authzInfo = new SentryAuthorizationInfo(newConf);
-      }
-      authzInfo.start();
-    } catch (Exception ex) {
-      throw new RuntimeException(ex);
-    }
-  }
-
-  @Override
-  public synchronized void stop() {
-    LOG.debug("Stopping");
-    authzInfo.stop();
-    defaultAuthzProvider.stop();
-    defaultAuthzProvider = null;
-  }
-
-  @Override
-  public void setSnaphottableDirs(Map<INodeAuthorizationInfo, Integer>
-      snapshotableDirs) {
-    defaultAuthzProvider.setSnaphottableDirs(snapshotableDirs);
-  }
-
-  @Override
-  public void addSnapshottable(INodeAuthorizationInfo dir) {
-    defaultAuthzProvider.addSnapshottable(dir);
-  }
-
-  @Override
-  public void removeSnapshottable(INodeAuthorizationInfo dir) {
-    defaultAuthzProvider.removeSnapshottable(dir);
-  }
-
-  @Override
-  public void createSnapshot(INodeAuthorizationInfo dir, int snapshotId)
-      throws IOException{
-    defaultAuthzProvider.createSnapshot(dir, snapshotId);
-  }
-
-  @Override
-  public void removeSnapshot(INodeAuthorizationInfo dir, int snapshotId)
-      throws IOException {
-    defaultAuthzProvider.removeSnapshot(dir, snapshotId);
-  }
-
-  @Override
-  public void checkPermission(String user, Set<String> groups,
-      INodeAuthorizationInfo[] inodes, int snapshotId,
-      boolean doCheckOwner, FsAction ancestorAccess, FsAction parentAccess,
-      FsAction access, FsAction subAccess, boolean ignoreEmptyDir)
-      throws AccessControlException, UnresolvedLinkException {
-    defaultAuthzProvider.checkPermission(user, groups, inodes, snapshotId,
-        doCheckOwner, ancestorAccess, parentAccess, access, subAccess,
-        ignoreEmptyDir);
-  }
-
-  private static final String[] EMPTY_STRING_ARRAY = new String[0];
-
-  private String[] getPathElements(INodeAuthorizationInfo node) {
-    return getPathElements(node, 0);
-  }
-
-  private String[] getPathElements(INodeAuthorizationInfo node, int idx) {
-    String[] paths;
-    INodeAuthorizationInfo parent = node.getParent();
-    if (parent == null) {
-      paths = idx > 0 ? new String[idx] : EMPTY_STRING_ARRAY;
-    } else {
-      paths = getPathElements(parent, idx + 1);
-      paths[paths.length - 1 - idx] = node.getLocalName();
-    }
-    return paths;
-  }
-
-  private boolean isSentryManaged(final String[] pathElements) {
-    return authzInfo.isSentryManaged(pathElements);
-  }
-
-  private boolean isSentryManaged(INodeAuthorizationInfo node) {
-    String[] pathElements = getPathElements(node);
-    return isSentryManaged(pathElements);
-  }
-
-  @Override
-  public void setUser(INodeAuthorizationInfo node, String user) {
-    // always fall through to defaultAuthZProvider, 
-    // issue warning when the path is sentry managed
-    if (isSentryManaged(node)) {
-      LOG.warn("### setUser {} (sentry managed path) to {}, update HDFS." +
-          WARN_VISIBILITY,
-          node.getFullPathName(), user);
-    }
-    defaultAuthzProvider.setUser(node, user);
-  }
-
-  @Override
-  public String getUser(INodeAuthorizationInfo node, int snapshotId) {
-    return isSentryManaged(node)?
-        this.user : defaultAuthzProvider.getUser(node, snapshotId);
-  }
-
-  @Override
-  public void setGroup(INodeAuthorizationInfo node, String group) {
-    // always fall through to defaultAuthZProvider, 
-    // issue warning when the path is sentry managed
-    if (isSentryManaged(node)) {
-      LOG.warn("### setGroup {} (sentry managed path) to {}, update HDFS." +
-          WARN_VISIBILITY,
-          node.getFullPathName(), group);
-    }
-    defaultAuthzProvider.setGroup(node, group);
-  }
-
-  @Override
-  public String getGroup(INodeAuthorizationInfo node, int snapshotId) {
-    return isSentryManaged(node)?
-        this.group : defaultAuthzProvider.getGroup(node, snapshotId);
-  }
-
-  @Override
-  public void setPermission(INodeAuthorizationInfo node, FsPermission permission) {
-    // always fall through to defaultAuthZProvider, 
-    // issue warning when the path is sentry managed
-    if (isSentryManaged(node)) {
-      LOG.warn("### setPermission {} (sentry managed path) to {}, update HDFS." +
-          WARN_VISIBILITY,
-          node.getFullPathName(), permission.toString());
-    }
-    defaultAuthzProvider.setPermission(node, permission);
-  }
-
-  @Override
-  public FsPermission getFsPermission(
-      INodeAuthorizationInfo node, int snapshotId) {
-    FsPermission returnPerm;
-    String[] pathElements = getPathElements(node);
-    if (!isSentryManaged(pathElements)) {
-      returnPerm = defaultAuthzProvider.getFsPermission(node, snapshotId);
-    } else {
-      returnPerm = this.permission;
-      // Handle case when prefix directory is itself associated with an
-      // authorizable object (default db directory in hive)
-      // An execute permission needs to be set on the prefix directory
-      // in this case; otherwise, subdirectories (which map to other dbs) will
-      // not be traversable.
-      for (String [] prefixPath : authzInfo.getPathPrefixes()) {
-        if (Arrays.equals(prefixPath, pathElements)) {
-          returnPerm = FsPermission.createImmutable((short)(returnPerm.toShort() | 0x01));
-          break;
-        }
-      }
-    }
-    return returnPerm;
-  }
-
-  private List<AclEntry> createAclEntries(String user, String group,
-      FsPermission permission) {
-    List<AclEntry> list = new ArrayList<AclEntry>();
-    AclEntry.Builder builder = new AclEntry.Builder();
-    FsPermission fsPerm = new FsPermission(permission);
-    builder.setName(user);
-    builder.setType(AclEntryType.USER);
-    builder.setScope(AclEntryScope.ACCESS);
-    builder.setPermission(fsPerm.getUserAction());
-    list.add(builder.build());
-    builder.setName(group);
-    builder.setType(AclEntryType.GROUP);
-    builder.setScope(AclEntryScope.ACCESS);
-    builder.setPermission(fsPerm.getGroupAction());
-    list.add(builder.build());
-    builder.setName(null);
-    return list;
-  }
-  /*
-  Returns hadoop acls if
-  - Not managed
-  - Not stale and not an auth obj
-  Returns hive:hive
-  - If stale
-  Returns sentry acls
-  - Otherwise, if not stale and auth obj
-   */
-  @Override
-  public AclFeature getAclFeature(INodeAuthorizationInfo node, int snapshotId) {
-    AclFeature f = null;
-    String[] pathElements = getPathElements(node);
-    String p = Arrays.toString(pathElements);
-    boolean isPrefixed = false;
-    boolean isStale = false;
-    boolean hasAuthzObj = false;
-    Map<String, AclEntry> aclMap = null;
-    if (!authzInfo.isUnderPrefix(pathElements)) {
-      isPrefixed = false;
-      f = defaultAuthzProvider.getAclFeature(node, snapshotId);
-    } else if (!authzInfo.doesBelongToAuthzObject(pathElements)) {
-      isPrefixed = true;
-      f = defaultAuthzProvider.getAclFeature(node, snapshotId);
-    } else {
-      isPrefixed = true;
-      hasAuthzObj = true;
-      aclMap = new HashMap<String, AclEntry>();
-      if (originalAuthzAsAcl) {
-        String newUser = defaultAuthzProvider.getUser(node, snapshotId);
-        String newGroup = getDefaultProviderGroup(node, snapshotId);
-        FsPermission perm = defaultAuthzProvider.getFsPermission(node, snapshotId);
-        addToACLMap(aclMap, createAclEntries(newUser, newGroup, perm));
-      } else {
-        addToACLMap(aclMap,
-            createAclEntries(this.user, this.group, this.permission));
-      }
-      if (!authzInfo.isStale()) {
-        isStale = false;
-        addToACLMap(aclMap, authzInfo.getAclEntries(pathElements));
-        f = new SentryAclFeature(ImmutableList.copyOf(aclMap.values()));
-      } else {
-        isStale = true;
-        f = new SentryAclFeature(ImmutableList.copyOf(aclMap.values()));
-      }
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("### getAclEntry \n[" + p + "] : ["
-          + "isPrefixed=" + isPrefixed
-          + ", isStale=" + isStale
-          + ", hasAuthzObj=" + hasAuthzObj
-          + ", origAuthzAsAcl=" + originalAuthzAsAcl + "]\n"
-          + "[" + (aclMap == null ? "null" : aclMap) + "]\n"
-          + "[" + (f == null ? "null" : f.getEntries()) + "]\n");
-    }
-    return f;
-  }
-
-  private void addToACLMap(Map<String, AclEntry> map,
-      Collection<AclEntry> entries) {
-    for (AclEntry ent : entries) {
-      String key = (ent.getName() == null ? "" : ent.getName())
-          + ent.getScope() + ent.getType();
-      AclEntry aclEntry = map.get(key);
-      if (aclEntry == null) {
-        map.put(key, ent);
-      } else {
-        map.put(key,
-            new AclEntry.Builder().
-            setName(ent.getName()).
-            setScope(ent.getScope()).
-            setType(ent.getType()).
-            setPermission(ent.getPermission().or(aclEntry.getPermission())).
-            build());
-      }
-    }
-  }
-
-  private String getDefaultProviderGroup(INodeAuthorizationInfo node,
-      int snapshotId) {
-    String newGroup = defaultAuthzProvider.getGroup(node, snapshotId);
-    INodeAuthorizationInfo pNode = node.getParent();
-    while (newGroup == null && pNode != null) {
-      newGroup = defaultAuthzProvider.getGroup(pNode, snapshotId);
-      pNode = pNode.getParent();
-    }
-    return newGroup;
-  }
-
-  /*
-   * Check if the given node has an ACL, and remove the ACL if so. Issue a
-   * warning message when the node doesn't have an ACL and warn is true.
-   * TODO: We need this to maintain backward compatibility (not throw errors in
-   * some cases). We may remove this in the next Sentry major release.
-   */
-  private void checkAndRemoveHdfsAcl(INodeAuthorizationInfo node,
-      boolean warn) {
-    AclFeature f = defaultAuthzProvider.getAclFeature(node,
-        Snapshot.CURRENT_STATE_ID);
-    if (f != null) {
-      defaultAuthzProvider.removeAclFeature(node);
-    } else {
-      if (warn) {
-        LOG.warn("### removeAclFeature is requested on {}, but it does not " +
-            "have any acl.", node);
-      }
-    }
-  }
-
-  @Override
-  public void removeAclFeature(INodeAuthorizationInfo node) {
-    // always fall through to defaultAuthZProvider, 
-    // issue warning when the path is sentry managed
-    if (isSentryManaged(node)) {
-      LOG.warn("### removeAclFeature {} (sentry managed path), update HDFS." +
-          WARN_VISIBILITY,
-          node.getFullPathName());
-      // For Sentry-managed paths, client code may try to remove a 
-      // non-existing ACL, ignore the request with a warning if the ACL
-      // doesn't exist
-      checkAndRemoveHdfsAcl(node, true);
-    } else {
-      defaultAuthzProvider.removeAclFeature(node);
-    }
-  }
-
-  @Override
-  public void addAclFeature(INodeAuthorizationInfo node, AclFeature f) {
-    // always fall through to defaultAuthZProvider, 
-    // issue warning when the path is sentry managed
-    if (isSentryManaged(node)) {
-      LOG.warn("### addAclFeature {} (sentry managed path) {}, update HDFS." +
-          WARN_VISIBILITY,
-          node.getFullPathName(), f.toString());
-      // For Sentry-managed path, remove ACL silently before adding new ACL
-      checkAndRemoveHdfsAcl(node, false);
-    }
-    defaultAuthzProvider.addAclFeature(node, f);
-  }
-
-}
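
One behavior worth noting before the replacement file: addToACLMap (carried
over essentially unchanged into SentryINodeAttributesProvider below) merges
duplicate entries for the same (name, scope, type) key by OR-ing their
FsActions. A small illustration using only hadoop-common types; the entry
values are made up for the example:

    import org.apache.hadoop.fs.permission.AclEntry;
    import org.apache.hadoop.fs.permission.AclEntryScope;
    import org.apache.hadoop.fs.permission.AclEntryType;
    import org.apache.hadoop.fs.permission.FsAction;

    public class AclMergeDemo {
      public static void main(String[] args) {
        FsAction first = FsAction.READ;       // r--
        FsAction second = FsAction.WRITE;     // -w-
        AclEntry merged = new AclEntry.Builder()
            .setName("hive")
            .setType(AclEntryType.USER)
            .setScope(AclEntryScope.ACCESS)
            .setPermission(first.or(second))  // union: rw-
            .build();
        System.out.println(merged);           // user:hive:rw-
      }
    }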

http://git-wip-us.apache.org/repos/asf/sentry/blob/81facc62/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryINodeAttributesProvider.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryINodeAttributesProvider.java b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryINodeAttributesProvider.java
new file mode 100644
index 0000000..809c816
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryINodeAttributesProvider.java
@@ -0,0 +1,386 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.hdfs;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.*;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
+
+import org.apache.hadoop.hdfs.server.namenode.AclEntryStatusFormat;
+import org.apache.hadoop.hdfs.server.namenode.AclFeature;
+import org.apache.hadoop.hdfs.server.namenode.INodeAttributeProvider;
+import org.apache.hadoop.hdfs.server.namenode.INodeAttributes;
+import org.apache.hadoop.hdfs.server.namenode.INode;
+import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
+import org.apache.hadoop.hdfs.server.namenode.XAttrFeature;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+public class SentryINodeAttributesProvider extends INodeAttributeProvider
+        implements Configurable {
+
+  private static final Logger LOG =
+          LoggerFactory.getLogger(SentryINodeAttributesProvider.class);
+
+  static class SentryAclFeature extends AclFeature {
+    public SentryAclFeature(ImmutableList<AclEntry> entries) {
+      super(AclEntryStatusFormat.toInt(entries));
+    }
+  }
+
+  class SentryPermissionEnforcer implements AccessControlEnforcer {
+    private final AccessControlEnforcer ace;
+
+    SentryPermissionEnforcer(INodeAttributeProvider.AccessControlEnforcer ace) {
+      this.ace = ace;
+    }
+
+    @Override
+    public void checkPermission(String fsOwner, String supergroup,
+                                UserGroupInformation callerUgi,
+                                INodeAttributes[] inodeAttrs,
+                                INode[] inodes, byte[][] pathByNameArr,
+                                int snapshotId, String path,
+                                int ancestorIndex, boolean doCheckOwner,
+                                FsAction ancestorAccess,
+                                FsAction parentAccess, FsAction access,
+                                FsAction subAccess,
+                                boolean ignoreEmptyDir) throws
+            AccessControlException {
+      String[] pathElems = getPathElems(pathByNameArr);
+      if (pathElems != null && (pathElems.length > 1) && ("".equals(pathElems[0]))) {
+        pathElems = Arrays.copyOfRange(pathElems, 1, pathElems.length);
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Enforcing Permission: " + Lists
+              .newArrayList(fsOwner, supergroup, callerUgi.getShortUserName(),
+                      Arrays.toString(callerUgi.getGroupNames()),
+                      Arrays.toString(pathElems), ancestorAccess,
+                      parentAccess, access, subAccess, ignoreEmptyDir));
+      }
+      ace.checkPermission(fsOwner, supergroup, callerUgi,
+              inodeAttrs, inodes,
+              pathByNameArr, snapshotId, path, ancestorIndex,
+              doCheckOwner,
+              ancestorAccess, parentAccess, access, subAccess,
+              ignoreEmptyDir);
+    }
+
+    private String[] getPathElems(byte[][] pathByName) {
+      String[] retVal = new String[pathByName.length];
+      for (int i = 0; i < pathByName.length; i++) {
+        retVal[i] = (pathByName[i] != null) ? DFSUtil.bytes2String
+                (pathByName[i]) : "";
+      }
+      return retVal;
+    }
+  }
+
+  public class SentryINodeAttributes implements INodeAttributes {
+
+    private final INodeAttributes defaultAttributes;
+    private final String[] pathElements;
+
+    public SentryINodeAttributes(INodeAttributes defaultAttributes, String[]
+            pathElements) {
+      this.defaultAttributes = defaultAttributes;
+      this.pathElements = pathElements;
+    }
+
+    @Override
+    public boolean isDirectory() {
+      return defaultAttributes.isDirectory();
+    }
+
+    @Override
+    public byte[] getLocalNameBytes() {
+      return defaultAttributes.getLocalNameBytes();
+    }
+
+    @Override
+    public String getUserName() {
+      return isSentryManaged(pathElements)?
+          SentryINodeAttributesProvider.this.user : defaultAttributes.getUserName();
+    }
+
+    @Override
+    public String getGroupName() {
+      return isSentryManaged(pathElements)?
+          SentryINodeAttributesProvider.this.group : defaultAttributes.getGroupName();
+    }
+
+    @Override
+    public FsPermission getFsPermission() {
+      FsPermission permission;
+
+      if (!isSentryManaged(pathElements)) {
+        permission = defaultAttributes.getFsPermission();
+      } else {
+        FsPermission returnPerm = SentryINodeAttributesProvider.this.permission;
+        // Handle case when prefix directory is itself associated with an
+        // authorizable object (default db directory in hive)
+        // An execute permission needs to be set on the prefix directory
+        // in this case; otherwise, subdirectories (which map to other dbs) will
+        // not be traversable.
+        for (String [] prefixPath : authzInfo.getPathPrefixes()) {
+          if (Arrays.equals(prefixPath, pathElements)) {
+            returnPerm = FsPermission.createImmutable((short)(returnPerm.toShort() | 0x01));
+            break;
+          }
+        }
+        permission = returnPerm;
+      }
+      return permission;
+    }
+
+    @Override
+    public short getFsPermissionShort() {
+      return getFsPermission().toShort();
+    }
+
+    @Override
+    public long getPermissionLong() {
+      PermissionStatus permissionStatus = new PermissionStatus(getUserName(),
+              getGroupName(), getFsPermission());
+      // No other way to get the long permission currently
+      return new INodeDirectory(0L, null, permissionStatus, 0L)
+              .getPermissionLong();
+    }
+
+    /**
+     * Returns hadoop acls if
+     *  - Not managed
+     *  - Not stale and not an auth obj
+     * Returns hive:hive
+     *  - If stale
+     * Returns sentry acls
+     *  - Otherwise, if not stale and auth obj
+     **/
+    @Override
+    public AclFeature getAclFeature() {
+      AclFeature aclFeature;
+      String p = Arrays.toString(pathElements);
+      boolean isPrefixed = false;
+      boolean isStale = false;
+      boolean hasAuthzObj = false;
+      Map<String, AclEntry> aclMap = null;
+
+      // If path is not under prefix, return hadoop acls.
+      if (!authzInfo.isUnderPrefix(pathElements)) {
+        isPrefixed = false;
+        aclFeature = defaultAttributes.getAclFeature();
+      } else if (!authzInfo.doesBelongToAuthzObject(pathElements)) {
+        // If path is not managed, return hadoop acls.
+        isPrefixed = true;
+        aclFeature = defaultAttributes.getAclFeature();
+      } else {
+        // If path is managed, add the original hadoop permission if originalAuthzAsAcl is true.
+        isPrefixed = true;
+        hasAuthzObj = true;
+        aclMap = new HashMap<String, AclEntry>();
+        if (originalAuthzAsAcl) {
+          String user = defaultAttributes.getUserName();
+          String group = defaultAttributes.getGroupName();
+          FsPermission perm = defaultAttributes.getFsPermission();
+          addToACLMap(aclMap, createAclEntries(user, group, perm));
+        } else {
+          // else add hive:hive
+          addToACLMap(aclMap, createAclEntries(user, group, permission));
+        }
+        if (!authzInfo.isStale()) {
+          // if not stale return sentry acls.
+          isStale = false;
+          addToACLMap(aclMap, authzInfo.getAclEntries(pathElements));
+          aclFeature = new SentryAclFeature(ImmutableList.copyOf(aclMap.values()));
+        } else {
+          // if stale return hive:hive
+          isStale = true;
+          aclFeature = new SentryAclFeature(ImmutableList.copyOf(aclMap.values()));
+        }
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("### getAclEntry \n[" + (p == null ? "null" : p) + "] : ["
+            + "isPrefixed=" + isPrefixed
+            + ", isStale=" + isStale
+            + ", hasAuthzObj=" + hasAuthzObj
+            + ", origAuthzAsAcl=" + originalAuthzAsAcl + "]\n"
+            + "[" + (aclMap == null ? "null" : aclMap) + "]\n");
+      }
+      return aclFeature;
+    }
+
+    @Override
+    public XAttrFeature getXAttrFeature() {
+      return defaultAttributes.getXAttrFeature();
+    }
+
+    @Override
+    public long getModificationTime() {
+      return defaultAttributes.getModificationTime();
+    }
+
+    @Override
+    public long getAccessTime() {
+      return defaultAttributes.getAccessTime();
+    }
+  }
+
+  private boolean started;
+  private SentryAuthorizationInfo authzInfo;
+  private String user;
+  private String group;
+  private FsPermission permission;
+  private boolean originalAuthzAsAcl;
+  private Configuration conf;
+
+  public SentryINodeAttributesProvider() {
+  }
+
+  private boolean isSentryManaged(final String[] pathElements) {
+    return authzInfo.isSentryManaged(pathElements);
+  }
+
+  @VisibleForTesting
+  SentryINodeAttributesProvider(SentryAuthorizationInfo authzInfo) {
+    this.authzInfo = authzInfo;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+
+  @Override
+  public void start() {
+    if (started) {
+      throw new IllegalStateException("Provider already started");
+    }
+    started = true;
+    try {
+      if (!conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY,
+              false)) {
+        throw new RuntimeException("HDFS ACLs must be enabled");
+      }
+      Configuration conf = new Configuration(this.conf);
+      conf.addResource(SentryAuthorizationConstants.CONFIG_FILE);
+      user = conf.get(SentryAuthorizationConstants.HDFS_USER_KEY,
+              SentryAuthorizationConstants.HDFS_USER_DEFAULT);
+      group = conf.get(SentryAuthorizationConstants.HDFS_GROUP_KEY,
+              SentryAuthorizationConstants.HDFS_GROUP_DEFAULT);
+      permission = FsPermission.createImmutable(
+              (short) conf.getLong(SentryAuthorizationConstants
+                              .HDFS_PERMISSION_KEY,
+                      SentryAuthorizationConstants.HDFS_PERMISSION_DEFAULT)
+      );
+      originalAuthzAsAcl = conf.getBoolean(
+              SentryAuthorizationConstants.INCLUDE_HDFS_AUTHZ_AS_ACL_KEY,
+              SentryAuthorizationConstants.INCLUDE_HDFS_AUTHZ_AS_ACL_DEFAULT);
+
+      LOG.info("Starting");
+      LOG.info("Config: hdfs-user[{}] hdfs-group[{}] hdfs-permission[{}] " +
+              "include-hdfs-authz-as-acl[{}]", new Object[]
+              {user, group, permission, originalAuthzAsAcl});
+
+      if (authzInfo == null) {
+        authzInfo = new SentryAuthorizationInfo(conf);
+      }
+      authzInfo.start();
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  @Override
+  public void stop() {
+    LOG.debug("Stopping");
+    authzInfo.stop();
+  }
+
+  @Override
+  public INodeAttributes getAttributes(String[] pathElements,
+                                       INodeAttributes inode) {
+    Preconditions.checkNotNull(pathElements);
+    pathElements = "".equals(pathElements[0]) && pathElements.length > 1 ?
+            Arrays.copyOfRange(pathElements, 1, pathElements.length) :
+            pathElements;
+    return isSentryManaged(pathElements) ? new SentryINodeAttributes
+            (inode, pathElements) : inode;
+  }
+
+  @Override
+  public AccessControlEnforcer getExternalAccessControlEnforcer
+          (AccessControlEnforcer defaultEnforcer) {
+    return new SentryPermissionEnforcer(defaultEnforcer);
+  }
+
+  private static void addToACLMap(Map<String, AclEntry> map,
+                                  Collection<AclEntry> entries) {
+    for (AclEntry ent : entries) {
+      String key = (ent.getName() == null ? "" : ent.getName())
+              + ent.getScope() + ent.getType();
+      AclEntry aclEntry = map.get(key);
+      if (aclEntry == null) {
+        map.put(key, ent);
+      } else {
+        map.put(key,
+                new AclEntry.Builder().
+                        setName(ent.getName()).
+                        setScope(ent.getScope()).
+                        setType(ent.getType()).
+                        setPermission(ent.getPermission().or(aclEntry
+                                .getPermission())).
+                        build());
+      }
+    }
+  }
+
+  private static List<AclEntry> createAclEntries(String user, String group,
+                                                 FsPermission permission) {
+    List<AclEntry> list = new ArrayList<AclEntry>();
+    AclEntry.Builder builder = new AclEntry.Builder();
+    FsPermission fsPerm = new FsPermission(permission);
+    builder.setName(user);
+    builder.setType(AclEntryType.USER);
+    builder.setScope(AclEntryScope.ACCESS);
+    builder.setPermission(fsPerm.getUserAction());
+    list.add(builder.build());
+    builder.setName(group);
+    builder.setType(AclEntryType.GROUP);
+    builder.setScope(AclEntryScope.ACCESS);
+    builder.setPermission(fsPerm.getGroupAction());
+    list.add(builder.build());
+    builder.setName(null);
+    return list;
+  }
+}
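
For readers tracing the new code path: getAttributes() above receives the
path pre-split into components, with a leading empty element for the root,
and strips that element before matching against Sentry's managed prefixes.
A standalone sketch of just that normalization (the sample path is
illustrative):

    import java.util.Arrays;

    public class PathElementsDemo {
      public static void main(String[] args) {
        // The NameNode passes "/user/hive/warehouse" as ["", "user", "hive", "warehouse"].
        String[] pathElements = {"", "user", "hive", "warehouse"};
        if (pathElements.length > 1 && "".equals(pathElements[0])) {
          pathElements = Arrays.copyOfRange(pathElements, 1, pathElements.length);
        }
        // Now ["user", "hive", "warehouse"], the form isSentryManaged() expects.
        System.out.println(Arrays.toString(pathElements));
      }
    }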

http://git-wip-us.apache.org/repos/asf/sentry/blob/81facc62/sentry-hdfs/sentry-hdfs-namenode-plugin/src/test/java/org/apache/sentry/hdfs/MockSentryAuthorizationProvider.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/test/java/org/apache/sentry/hdfs/MockSentryAuthorizationProvider.java b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/test/java/org/apache/sentry/hdfs/MockSentryAuthorizationProvider.java
deleted file mode 100644
index 2085b52..0000000
--- a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/test/java/org/apache/sentry/hdfs/MockSentryAuthorizationProvider.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sentry.hdfs;
-
-public class MockSentryAuthorizationProvider extends
-    SentryAuthorizationProvider {
-
-  public MockSentryAuthorizationProvider() {
-    super(new SentryAuthorizationInfoX());
-  }
-}

http://git-wip-us.apache.org/repos/asf/sentry/blob/81facc62/sentry-hdfs/sentry-hdfs-namenode-plugin/src/test/java/org/apache/sentry/hdfs/MockSentryINodeAttributesProvider.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/test/java/org/apache/sentry/hdfs/MockSentryINodeAttributesProvider.java b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/test/java/org/apache/sentry/hdfs/MockSentryINodeAttributesProvider.java
new file mode 100644
index 0000000..1e74b2d
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/test/java/org/apache/sentry/hdfs/MockSentryINodeAttributesProvider.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.hdfs;
+
+public class MockSentryINodeAttributesProvider extends
+    SentryINodeAttributesProvider {
+
+  public MockSentryINodeAttributesProvider() {
+    super(new SentryAuthorizationInfoX());
+  }
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/81facc62/sentry-hdfs/sentry-hdfs-namenode-plugin/src/test/java/org/apache/sentry/hdfs/TestSentryAuthorizationProvider.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/test/java/org/apache/sentry/hdfs/TestSentryAuthorizationProvider.java b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/test/java/org/apache/sentry/hdfs/TestSentryAuthorizationProvider.java
deleted file mode 100644
index 5da0dc2..0000000
--- a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/test/java/org/apache/sentry/hdfs/TestSentryAuthorizationProvider.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sentry.hdfs;
-
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.LinkedHashSet;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.AclEntry;
-import org.apache.hadoop.fs.permission.AclEntryScope;
-import org.apache.hadoop.fs.permission.AclEntryType;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-
-public class TestSentryAuthorizationProvider {
-  private static final String DFS_NAMENODE_AUTHORIZATION_PROVIDER_KEY =
-      "dfs.namenode.authorization.provider.class";
-
-  private MiniDFSCluster miniDFS;
-  private UserGroupInformation admin;
-  
-  @Before
-  public void setUp() throws Exception {
-    admin = UserGroupInformation.createUserForTesting(
-        System.getProperty("user.name"), new String[] { "supergroup" });
-    admin.doAs(new PrivilegedExceptionAction<Void>() {
-      @Override
-      public Void run() throws Exception {
-        System.setProperty(MiniDFSCluster.PROP_TEST_BUILD_DATA, "target/test/data");
-        Configuration conf = new HdfsConfiguration();
-        conf.setBoolean("sentry.authorization-provider.include-hdfs-authz-as-acl", true);
-        conf.set(DFS_NAMENODE_AUTHORIZATION_PROVIDER_KEY,
-            MockSentryAuthorizationProvider.class.getName());
-        conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
-        EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
-        miniDFS = new MiniDFSCluster.Builder(conf).build();
-        return null;
-      }
-    });
-  }
-
-  @After
-  public void cleanUp() throws IOException {
-    if (miniDFS != null) {
-      miniDFS.shutdown();
-    }
-  }
-
-  @Test
-  public void testProvider() throws Exception {
-    admin.doAs(new PrivilegedExceptionAction<Void>() {
-      @Override
-      public Void run() throws Exception {
-        String sysUser = UserGroupInformation.getCurrentUser().getShortUserName();
-        FileSystem fs = FileSystem.get(miniDFS.getConfiguration(0));
-
-        List<AclEntry> baseAclList = new ArrayList<AclEntry>();
-        AclEntry.Builder builder = new AclEntry.Builder();
-        baseAclList.add(builder.setType(AclEntryType.USER)
-            .setScope(AclEntryScope.ACCESS).build());
-        baseAclList.add(builder.setType(AclEntryType.GROUP)
-            .setScope(AclEntryScope.ACCESS).build());
-        baseAclList.add(builder.setType(AclEntryType.OTHER)
-            .setScope(AclEntryScope.ACCESS).build());
-        Path path1 = new Path("/user/authz/obj/xxx");
-        fs.mkdirs(path1);
-        fs.setAcl(path1, baseAclList);
-
-        fs.mkdirs(new Path("/user/authz/xxx"));
-        fs.mkdirs(new Path("/user/xxx"));
-
-        // root
-        Path path = new Path("/");
-        Assert.assertEquals(sysUser, fs.getFileStatus(path).getOwner());
-        Assert.assertEquals("supergroup", fs.getFileStatus(path).getGroup());
-        Assert.assertEquals(new FsPermission((short) 0755), fs.getFileStatus(path).getPermission());
-        Assert.assertTrue(fs.getAclStatus(path).getEntries().isEmpty());
-
-        // dir before prefixes
-        path = new Path("/user");
-        Assert.assertEquals(sysUser, fs.getFileStatus(path).getOwner());
-        Assert.assertEquals("supergroup", fs.getFileStatus(path).getGroup());
-        Assert.assertEquals(new FsPermission((short) 0755), fs.getFileStatus(path).getPermission());
-        Assert.assertTrue(fs.getAclStatus(path).getEntries().isEmpty());
-
-        // prefix dir
-        path = new Path("/user/authz");
-        Assert.assertEquals(sysUser, fs.getFileStatus(path).getOwner());
-        Assert.assertEquals("supergroup", fs.getFileStatus(path).getGroup());
-        Assert.assertEquals(new FsPermission((short) 0755), fs.getFileStatus(path).getPermission());
-        Assert.assertTrue(fs.getAclStatus(path).getEntries().isEmpty());
-
-        // dir inside of prefix, no obj
-        path = new Path("/user/authz/xxx");
-        FileStatus status = fs.getFileStatus(path);
-        Assert.assertEquals(sysUser, status.getOwner());
-        Assert.assertEquals("supergroup", status.getGroup());
-        Assert.assertEquals(new FsPermission((short) 0755), status.getPermission());
-        Assert.assertTrue(fs.getAclStatus(path).getEntries().isEmpty());
-
-        // dir inside of prefix, obj
-        path = new Path("/user/authz/obj");
-        Assert.assertEquals("hive", fs.getFileStatus(path).getOwner());
-        Assert.assertEquals("hive", fs.getFileStatus(path).getGroup());
-        Assert.assertEquals(new FsPermission((short) 0771), fs.getFileStatus(path).getPermission());
-        Assert.assertFalse(fs.getAclStatus(path).getEntries().isEmpty());
-
-        List<AclEntry> acls = new ArrayList<AclEntry>();
-        acls.add(new AclEntry.Builder().setName(sysUser).setType(AclEntryType.USER).setScope(AclEntryScope.ACCESS).setPermission(FsAction.ALL).build());
-        acls.add(new AclEntry.Builder().setName("supergroup").setType(AclEntryType.GROUP).setScope(AclEntryScope.ACCESS).setPermission(FsAction.READ_EXECUTE).build());
-        acls.add(new AclEntry.Builder().setName("user-authz").setType(AclEntryType.USER).setScope(AclEntryScope.ACCESS).setPermission(FsAction.ALL).build());
-        Assert.assertEquals(new LinkedHashSet<AclEntry>(acls), new LinkedHashSet<AclEntry>(fs.getAclStatus(path).getEntries()));
-
-        // dir inside of prefix, inside of obj
-        path = new Path("/user/authz/obj/xxx");
-        Assert.assertEquals("hive", fs.getFileStatus(path).getOwner());
-        Assert.assertEquals("hive", fs.getFileStatus(path).getGroup());
-        Assert.assertEquals(new FsPermission((short) 0771), fs.getFileStatus(path).getPermission());
-        Assert.assertFalse(fs.getAclStatus(path).getEntries().isEmpty());
-        
-        Path path2 = new Path("/user/authz/obj/path2");
-        fs.mkdirs(path2);
-        fs.setAcl(path2, baseAclList);
-
-        // dir outside of prefix
-        path = new Path("/user/xxx");
-        Assert.assertEquals(sysUser, fs.getFileStatus(path).getOwner());
-        Assert.assertEquals("supergroup", fs.getFileStatus(path).getGroup());
-        Assert.assertEquals(new FsPermission((short) 0755), fs.getFileStatus(path).getPermission());
-        Assert.assertTrue(fs.getAclStatus(path).getEntries().isEmpty());
-
-        //stale and dir inside of prefix, obj
-        System.setProperty("test.stale", "true");
-        path = new Path("/user/authz/xxx");
-        status = fs.getFileStatus(path);
-        Assert.assertEquals(sysUser, status.getOwner());
-        Assert.assertEquals("supergroup", status.getGroup());
-        Assert.assertEquals(new FsPermission((short) 0755), status.getPermission());
-        Assert.assertTrue(fs.getAclStatus(path).getEntries().isEmpty());
-
-        // setPermission sets the permission for dir outside of prefix.
-        // setUser/setGroup sets the user/group for dir outside of prefix.
-        Path pathOutside = new Path("/user/xxx");
-
-        fs.setPermission(pathOutside, new FsPermission((short) 0000));
-        Assert.assertEquals(new FsPermission((short) 0000), fs.getFileStatus(pathOutside).getPermission());
-        fs.setOwner(pathOutside, sysUser, "supergroup");
-        Assert.assertEquals(sysUser, fs.getFileStatus(pathOutside).getOwner());
-        Assert.assertEquals("supergroup", fs.getFileStatus(pathOutside).getGroup());
-
-        // removeAcl removes the ACL entries for dir outside of prefix.
-        List<AclEntry> aclsOutside = new ArrayList<AclEntry>(baseAclList);
-        List<AclEntry> acl = new ArrayList<AclEntry>();
-        acl.add(new AclEntry.Builder().setName("supergroup").setType(AclEntryType.GROUP).setScope(AclEntryScope.ACCESS).
-                setPermission(FsAction.READ_EXECUTE).build());
-        aclsOutside.addAll(acl);
-        fs.setAcl(pathOutside, aclsOutside);
-        fs.removeAclEntries(pathOutside, acl);
-        Assert.assertFalse(fs.getAclStatus(pathOutside).getEntries().containsAll(acl));
-
-        // setPermission sets the permission for dir inside of prefix but not a hive obj.
-        // setUser/setGroup sets the user/group for dir inside of prefix but not a hive obj.
-        Path pathInside = new Path("/user/authz/xxx");
-
-        fs.setPermission(pathInside, new FsPermission((short) 0000));
-        Assert.assertEquals(new FsPermission((short) 0000), fs.getFileStatus(pathInside).getPermission());
-        fs.setOwner(pathInside, sysUser, "supergroup");
-        Assert.assertEquals(sysUser, fs.getFileStatus(pathInside).getOwner());
-        Assert.assertEquals("supergroup", fs.getFileStatus(pathInside).getGroup());
-
-        // removeAcl is a no op for dir inside of prefix.
-        Assert.assertTrue(fs.getAclStatus(pathInside).getEntries().isEmpty());
-        fs.removeAclEntries(pathInside, acl);
-        Assert.assertTrue(fs.getAclStatus(pathInside).getEntries().isEmpty());
-
-        // setPermission/setUser/setGroup is a no op for dir inside of prefix, and is a hive obj.
-        Path pathInsideAndHive = new Path("/user/authz/obj");
-
-        fs.setPermission(pathInsideAndHive, new FsPermission((short) 0000));
-        Assert.assertEquals(new FsPermission((short) 0771), fs.getFileStatus(pathInsideAndHive).getPermission());
-        fs.setOwner(pathInsideAndHive, sysUser, "supergroup");
-        Assert.assertEquals("hive", fs.getFileStatus(pathInsideAndHive).getOwner());
-        Assert.assertEquals("hive", fs.getFileStatus(pathInsideAndHive).getGroup());
-
-        return null;
-      }
-    });
-  }
-}

http://git-wip-us.apache.org/repos/asf/sentry/blob/81facc62/sentry-hdfs/sentry-hdfs-namenode-plugin/src/test/java/org/apache/sentry/hdfs/TestSentryINodeAttributesProvider.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/test/java/org/apache/sentry/hdfs/TestSentryINodeAttributesProvider.java b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/test/java/org/apache/sentry/hdfs/TestSentryINodeAttributesProvider.java
new file mode 100644
index 0000000..f9862d4
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/test/java/org/apache/sentry/hdfs/TestSentryINodeAttributesProvider.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.hdfs;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclEntryScope;
+import org.apache.hadoop.fs.permission.AclEntryType;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class TestSentryINodeAttributesProvider {
+
+  private MiniDFSCluster miniDFS;
+  private UserGroupInformation admin;
+
+  @Before
+  public void setUp() throws Exception {
+    admin = UserGroupInformation.createUserForTesting(
+        System.getProperty("user.name"), new String[] { "supergroup" });
+    admin.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        System.setProperty(MiniDFSCluster.PROP_TEST_BUILD_DATA, "target/test/data");
+        Configuration conf = new HdfsConfiguration();
+        conf.setBoolean("sentry.authorization-provider.include-hdfs-authz-as-acl", true);
+        conf.set(DFSConfigKeys.DFS_NAMENODE_INODE_ATTRIBUTES_PROVIDER_KEY,
+            MockSentryINodeAttributesProvider.class.getName());
+        conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
+        EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
+        miniDFS = new MiniDFSCluster.Builder(conf).build();
+        return null;
+      }
+    });
+  }
+
+  @After
+  public void cleanUp() throws IOException {
+    if (miniDFS != null) {
+      miniDFS.shutdown();
+    }
+  }
+
+  @Test
+  public void testProvider() throws Exception {
+    admin.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        String sysUser = UserGroupInformation.getCurrentUser().getShortUserName();
+        FileSystem fs = FileSystem.get(miniDFS.getConfiguration(0));
+
+        List<AclEntry> baseAclList = new ArrayList<AclEntry>();
+        AclEntry.Builder builder = new AclEntry.Builder();
+        baseAclList.add(builder.setType(AclEntryType.USER)
+            .setScope(AclEntryScope.ACCESS).build());
+        baseAclList.add(builder.setType(AclEntryType.GROUP)
+            .setScope(AclEntryScope.ACCESS).build());
+        baseAclList.add(builder.setType(AclEntryType.OTHER)
+            .setScope(AclEntryScope.ACCESS).build());
+        Path path1 = new Path("/user/authz/obj/xxx");
+        fs.mkdirs(path1);
+        fs.setAcl(path1, baseAclList);
+
+        fs.mkdirs(new Path("/user/authz/xxx"));
+        fs.mkdirs(new Path("/user/xxx"));
+
+        // root
+        Path path = new Path("/");
+        Assert.assertEquals(sysUser, fs.getFileStatus(path).getOwner());
+        Assert.assertEquals("supergroup", fs.getFileStatus(path).getGroup());
+        Assert.assertEquals(new FsPermission((short) 0755), fs.getFileStatus(path).getPermission());
+        Assert.assertTrue(fs.getAclStatus(path).getEntries().isEmpty());
+
+        // dir before prefixes
+        path = new Path("/user");
+        Assert.assertEquals(sysUser, fs.getFileStatus(path).getOwner());
+        Assert.assertEquals("supergroup", fs.getFileStatus(path).getGroup());
+        Assert.assertEquals(new FsPermission((short) 0755), fs.getFileStatus(path).getPermission());
+        Assert.assertTrue(fs.getAclStatus(path).getEntries().isEmpty());
+
+        // prefix dir
+        path = new Path("/user/authz");
+        Assert.assertEquals(sysUser, fs.getFileStatus(path).getOwner());
+        Assert.assertEquals("supergroup", fs.getFileStatus(path).getGroup());
+        Assert.assertEquals(new FsPermission((short) 0755), fs.getFileStatus(path).getPermission());
+        Assert.assertTrue(fs.getAclStatus(path).getEntries().isEmpty());
+
+        // dir inside of prefix, no obj
+        path = new Path("/user/authz/xxx");
+        FileStatus status = fs.getFileStatus(path);
+        Assert.assertEquals(sysUser, status.getOwner());
+        Assert.assertEquals("supergroup", status.getGroup());
+        Assert.assertEquals(new FsPermission((short) 0755), status.getPermission());
+        Assert.assertTrue(fs.getAclStatus(path).getEntries().isEmpty());
+
+        // dir inside of prefix, obj
+        path = new Path("/user/authz/obj");
+        Assert.assertEquals("hive", fs.getFileStatus(path).getOwner());
+        Assert.assertEquals("hive", fs.getFileStatus(path).getGroup());
+        Assert.assertEquals(new FsPermission((short) 0771), fs.getFileStatus(path).getPermission());
+        Assert.assertFalse(fs.getAclStatus(path).getEntries().isEmpty());
+
+        List<AclEntry> acls = new ArrayList<AclEntry>();
+        acls.add(new AclEntry.Builder().setName(sysUser).setType(AclEntryType.USER).setScope(AclEntryScope.ACCESS).setPermission(FsAction.ALL).build());
+        acls.add(new AclEntry.Builder().setName("supergroup").setType(AclEntryType.GROUP).setScope(AclEntryScope.ACCESS).setPermission(FsAction.READ_EXECUTE).build());
+        acls.add(new AclEntry.Builder().setName("user-authz").setType(AclEntryType.USER).setScope(AclEntryScope.ACCESS).setPermission(FsAction.ALL).build());
+        Assert.assertEquals(new LinkedHashSet<AclEntry>(acls), new LinkedHashSet<AclEntry>(fs.getAclStatus(path).getEntries()));
+
+        // dir inside of prefix, inside of obj
+        path = new Path("/user/authz/obj/xxx");
+        Assert.assertEquals("hive", fs.getFileStatus(path).getOwner());
+        Assert.assertEquals("hive", fs.getFileStatus(path).getGroup());
+        Assert.assertEquals(new FsPermission((short) 0771), fs.getFileStatus(path).getPermission());
+        Assert.assertFalse(fs.getAclStatus(path).getEntries().isEmpty());
+
+        Path path2 = new Path("/user/authz/obj/path2");
+        fs.mkdirs(path2);
+        fs.setAcl(path2, baseAclList);
+
+        // dir outside of prefix
+        path = new Path("/user/xxx");
+        Assert.assertEquals(sysUser, fs.getFileStatus(path).getOwner());
+        Assert.assertEquals("supergroup", fs.getFileStatus(path).getGroup());
+        Assert.assertEquals(new FsPermission((short) 0755), fs.getFileStatus(path).getPermission());
+        Assert.assertTrue(fs.getAclStatus(path).getEntries().isEmpty());
+
+        // stale and dir inside of prefix, obj
+        System.setProperty("test.stale", "true");
+        path = new Path("/user/authz/xxx");
+        status = fs.getFileStatus(path);
+        Assert.assertEquals(sysUser, status.getOwner());
+        Assert.assertEquals("supergroup", status.getGroup());
+        Assert.assertEquals(new FsPermission((short) 0755), status.getPermission());
+        Assert.assertTrue(fs.getAclStatus(path).getEntries().isEmpty());
+
+        // setPermission sets the permission for dir outside of prefix.
+        // setUser/setGroup sets the user/group for dir outside of prefix.
+        Path pathOutside = new Path("/user/xxx");
+
+        fs.setPermission(pathOutside, new FsPermission((short) 0000));
+        Assert.assertEquals(new FsPermission((short) 0000), fs.getFileStatus(pathOutside).getPermission());
+        fs.setOwner(pathOutside, sysUser, "supergroup");
+        Assert.assertEquals(sysUser, fs.getFileStatus(pathOutside).getOwner());
+        Assert.assertEquals("supergroup", fs.getFileStatus(pathOutside).getGroup());
+
+        // removeAcl removes the ACL entries for dir outside of prefix.
+        List<AclEntry> aclsOutside = new ArrayList<AclEntry>(baseAclList);
+        List<AclEntry> acl = new ArrayList<AclEntry>();
+        acl.add(new AclEntry.Builder().setName("supergroup").setType(AclEntryType.GROUP).setScope(AclEntryScope.ACCESS).
+                setPermission(FsAction.READ_EXECUTE).build());
+        aclsOutside.addAll(acl);
+        fs.setAcl(pathOutside, aclsOutside);
+        fs.removeAclEntries(pathOutside, acl);
+        Assert.assertFalse(fs.getAclStatus(pathOutside).getEntries().containsAll(acl));
+
+        // setPermission sets the permission for dir inside of prefix but not a hive obj.
+        // setUser/setGroup sets the user/group for dir inside of prefix but not a hive obj.
+        Path pathInside = new Path("/user/authz/xxx");
+
+        fs.setPermission(pathInside, new FsPermission((short) 0000));
+        Assert.assertEquals(new FsPermission((short) 0000), fs.getFileStatus(pathInside).getPermission());
+        fs.setOwner(pathInside, sysUser, "supergroup");
+        Assert.assertEquals(sysUser, fs.getFileStatus(pathInside).getOwner());
+        Assert.assertEquals("supergroup", fs.getFileStatus(pathInside).getGroup());
+
+        // removeAcl is a no op for dir inside of prefix.
+        Assert.assertTrue(fs.getAclStatus(pathInside).getEntries().isEmpty());
+        fs.removeAclEntries(pathInside, acl);
+        Assert.assertTrue(fs.getAclStatus(pathInside).getEntries().isEmpty());
+
+        // setPermission/setUser/setGroup is a no op for dir inside of prefix, and is a hive obj.
+        Path pathInsideAndHive = new Path("/user/authz/obj");
+
+        fs.setPermission(pathInsideAndHive, new FsPermission((short) 0000));
+        Assert.assertEquals(new FsPermission((short) 0771), fs.getFileStatus(pathInsideAndHive).getPermission());
+        fs.setOwner(pathInsideAndHive, sysUser, "supergroup");
+        Assert.assertEquals("hive", fs.getFileStatus(pathInsideAndHive).getOwner());
+        Assert.assertEquals("hive", fs.getFileStatus(pathInsideAndHive).getGroup());
+
+        return null;
+      }
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/81facc62/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionManager.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionManager.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionManager.java
index 40fd58b..fb7c40a 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionManager.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionManager.java
@@ -101,7 +101,7 @@ public class TransactionManager {
    * Execute some code as a single transaction, the code in tb.execute()
    * should not start new transaction or manipulate transactions with the
    * PersistenceManager.
-   * 
+   *
    * @param tb transaction block with code to be executed
    * @return Object with the result of tb.execute()
    */

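For reference, the javadoc touched above states TransactionManager's core contract: the supplied block runs inside a single transaction that the manager opens, commits, or rolls back, so the block must neither start a new transaction nor manipulate the current one through the PersistenceManager. A minimal caller sketch; the method name (executeTransaction), the TransactionBlock shape, and the MSentryRole query are all inferred from the javadoc for illustration, not taken from this patch:

    import javax.jdo.PersistenceManager;

    // "tm" is an existing TransactionManager instance.
    Object roles = tm.executeTransaction(new TransactionBlock() {
      @Override
      public Object execute(PersistenceManager pm) throws Exception {
        // Work only through the supplied PersistenceManager; per the
        // javadoc, the block must not begin, commit, or roll back
        // transactions itself.
        return pm.newQuery(MSentryRole.class).execute();
      }
    });
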
http://git-wip-us.apache.org/repos/asf/sentry/blob/81facc62/sentry-tests/sentry-tests-hive-v2/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java
----------------------------------------------------------------------
diff --git a/sentry-tests/sentry-tests-hive-v2/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java b/sentry-tests/sentry-tests-hive-v2/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java
index 4d0c4b5..5873dd5 100644
--- a/sentry-tests/sentry-tests-hive-v2/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java
+++ b/sentry-tests/sentry-tests-hive-v2/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java
@@ -71,7 +71,7 @@ import org.apache.sentry.binding.hive.v2.SentryHiveAuthorizationTaskFactoryImplV
 import org.apache.sentry.binding.hive.v2.metastore.MetastoreAuthzBindingV2;
 import org.apache.sentry.binding.hive.v2.metastore.SentryMetastorePostEventListenerV2;
 import org.apache.sentry.hdfs.PathsUpdate;
-import org.apache.sentry.hdfs.SentryAuthorizationProvider;
+import org.apache.sentry.hdfs.SentryINodeAttributesProvider;
 import org.apache.sentry.provider.db.SentryAlreadyExistsException;
 import org.apache.sentry.provider.db.SimpleDBProviderBackend;
 import org.apache.sentry.provider.file.LocalGroupResourceAuthorizationProvider;
@@ -137,8 +137,6 @@ public class TestHDFSIntegration {
   private static final int NUM_RETRIES = 10;
   private static final int RETRY_WAIT = 1000;
   private static final String EXTERNAL_SENTRY_SERVICE = "sentry.e2etest.external.sentry";
-  private static final String DFS_NAMENODE_AUTHORIZATION_PROVIDER_KEY =
-      "dfs.namenode.authorization.provider.class";
 
   private static MiniDFSCluster miniDFS;
   private static InternalHiveServer hiveServer2;
@@ -354,8 +352,8 @@ public class TestHDFSIntegration {
       public Void run() throws Exception {
         System.setProperty(MiniDFSCluster.PROP_TEST_BUILD_DATA, "target/test/data");
         hadoopConf = new HdfsConfiguration();
-        hadoopConf.set(DFS_NAMENODE_AUTHORIZATION_PROVIDER_KEY,
-            SentryAuthorizationProvider.class.getName());
+        hadoopConf.set(DFSConfigKeys.DFS_NAMENODE_INODE_ATTRIBUTES_PROVIDER_KEY,
+            SentryINodeAttributesProvider.class.getName());
         hadoopConf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
         hadoopConf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
         File dfsDir = assertCreateDir(new File(baseDir, "dfs"));
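
The hunk above, together with the matching hunk in the non-v2 TestHDFSIntegration below, retires the non-upstream dfs.namenode.authorization.provider.class hook in favor of the INodeAttributeProvider hook available in upstream Hadoop 2.7. A minimal wiring sketch, mirroring the setup used in the new TestSentryINodeAttributesProvider above:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.MiniDFSCluster;
    import org.apache.sentry.hdfs.SentryINodeAttributesProvider;

    Configuration conf = new HdfsConfiguration();
    // Upstream 2.7 key; resolves to "dfs.namenode.inode.attributes.provider.class".
    conf.set(DFSConfigKeys.DFS_NAMENODE_INODE_ATTRIBUTES_PROVIDER_KEY,
        SentryINodeAttributesProvider.class.getName());
    // The provider surfaces Sentry grants as HDFS ACLs, so ACLs stay enabled.
    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();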

http://git-wip-us.apache.org/repos/asf/sentry/blob/81facc62/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java
----------------------------------------------------------------------
diff --git a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java
index f2d74bf..4f4d3e6 100644
--- a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java
+++ b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java
@@ -67,7 +67,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.sentry.binding.hive.SentryHiveAuthorizationTaskFactoryImpl;
 import org.apache.sentry.binding.hive.conf.HiveAuthzConf;
 import org.apache.sentry.hdfs.PathsUpdate;
-import org.apache.sentry.hdfs.SentryAuthorizationProvider;
+import org.apache.sentry.hdfs.SentryINodeAttributesProvider;
 import org.apache.sentry.core.common.exception.SentryAlreadyExistsException;
 import org.apache.sentry.provider.db.SimpleDBProviderBackend;
 import org.apache.sentry.provider.file.LocalGroupResourceAuthorizationProvider;
@@ -143,8 +143,6 @@ public class TestHDFSIntegration {
   private static final int NUM_RETRIES = 10;
   private static final int RETRY_WAIT = 1000;
   private static final String EXTERNAL_SENTRY_SERVICE = "sentry.e2etest.external.sentry";
-  private static final String DFS_NAMENODE_AUTHORIZATION_PROVIDER_KEY =
-      "dfs.namenode.authorization.provider.class";
 
   private static MiniDFSCluster miniDFS;
   private static InternalHiveServer hiveServer2;
@@ -362,8 +360,8 @@ public class TestHDFSIntegration {
       public Void run() throws Exception {
         System.setProperty(MiniDFSCluster.PROP_TEST_BUILD_DATA, "target/test/data");
         hadoopConf = new HdfsConfiguration();
-        hadoopConf.set(DFS_NAMENODE_AUTHORIZATION_PROVIDER_KEY,
-            SentryAuthorizationProvider.class.getName());
+        hadoopConf.set(DFSConfigKeys.DFS_NAMENODE_INODE_ATTRIBUTES_PROVIDER_KEY,
+            SentryINodeAttributesProvider.class.getName());
         hadoopConf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
         hadoopConf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
         File dfsDir = assertCreateDir(new File(baseDir, "dfs"));
@@ -552,7 +550,8 @@ public class TestHDFSIntegration {
     stmt.execute("create role admin_role");
     stmt.execute("grant role admin_role to group hive");
     stmt.execute("grant all on server server1 to role admin_role");
-    stmt.execute("create table p1 (s string) partitioned by (month int, day int)");
+    stmt.execute("create table p1 (s string) partitioned by (month int, day " +
+            "int)");
     stmt.execute("alter table p1 add partition (month=1, day=1)");
     stmt.execute("alter table p1 add partition (month=1, day=2)");
     stmt.execute("alter table p1 add partition (month=2, day=1)");
@@ -589,22 +588,23 @@ public class TestHDFSIntegration {
 
     // Verify default db is STILL inaccessible after grants but tables are fine
     verifyOnPath("/user/hive/warehouse", null, "hbase", false);
-    verifyOnAllSubDirs("/user/hive/warehouse/p1", FsAction.READ_EXECUTE, "hbase", true);
+    verifyOnAllSubDirs("/user/hive/warehouse/p1", FsAction.READ_EXECUTE,
+            "hbase", true);
 
     adminUgi.doAs(new PrivilegedExceptionAction<Void>() {
       @Override
       public Void run() throws Exception {
         // Simulate hdfs dfs -setfacl -m <aclentry> <path>
         AclStatus existing =
-            miniDFS.getFileSystem()
-            .getAclStatus(new Path("/user/hive/warehouse/p1"));
+                miniDFS.getFileSystem()
+                        .getAclStatus(new Path("/user/hive/warehouse/p1"));
         ArrayList<AclEntry> newEntries =
-            new ArrayList<AclEntry>(existing.getEntries());
+                new ArrayList<AclEntry>(existing.getEntries());
         newEntries.add(AclEntry.parseAclEntry("user::---", true));
         newEntries.add(AclEntry.parseAclEntry("group:bla:rwx", true));
         newEntries.add(AclEntry.parseAclEntry("other::---", true));
         miniDFS.getFileSystem().setAcl(new Path("/user/hive/warehouse/p1"),
-            newEntries);
+                newEntries);
         return null;
       }
     });
@@ -617,7 +617,8 @@ public class TestHDFSIntegration {
     verifyOnPath("/user/hive/warehouse", FsAction.READ_EXECUTE, "hbase", true);
 
     // Verify default db grants are propagated to the tables
-    verifyOnAllSubDirs("/user/hive/warehouse/p1", FsAction.READ_EXECUTE, "hbase", true);
+    verifyOnAllSubDirs("/user/hive/warehouse/p1", FsAction.READ_EXECUTE,
+            "hbase", true);
 
     // Verify default db revokes work
     stmt.execute("revoke select on database default from role p1_admin");
@@ -1569,8 +1570,16 @@ public class TestHDFSIntegration {
   public void testAuthzObjOnMultipleTables() throws Throwable {
     String dbName = "db1";
 
-    tmpHDFSDir = new Path("/tmp/external/p1");
-    miniDFS.getFileSystem().mkdirs(tmpHDFSDir);
+    tmpHDFSDir = new Path("/tmp/external");
+    if (!miniDFS.getFileSystem().exists(tmpHDFSDir)) {
+      miniDFS.getFileSystem().mkdirs(tmpHDFSDir);
+    }
+
+    miniDFS.getFileSystem().setPermission(tmpHDFSDir, FsPermission.valueOf("drwxrwxrwx"));
+    Path partitionDir = new Path("/tmp/external/p1");
+    if (!miniDFS.getFileSystem().exists(partitionDir)) {
+      miniDFS.getFileSystem().mkdirs(partitionDir);
+    }
 
     dbNames = new String[]{dbName};
     roles = new String[]{"admin_role", "tab1_role", "tab2_role"};
@@ -1787,7 +1796,8 @@ public class TestHDFSIntegration {
       @Override
       public Void run() throws Exception {
         try {
-          miniDFS.getFileSystem().open(new Path("/user/hive/warehouse/p1/month=1/day=1/f1.txt"));
+          Path p = new Path("/user/hive/warehouse/p1/month=1/day=1/f1.txt");
+          miniDFS.getFileSystem().open(p);
           Assert.fail("Should not be allowed !!");
         } catch (Exception e) {
           Assert.assertEquals("Wrong Error : " + e.getMessage(), true, e.getMessage().contains("Permission denied: user=hbase"));