Posted to commits@sentry.apache.org by ka...@apache.org on 2017/07/31 11:37:38 UTC

[4/6] sentry git commit: SENTRY-1839: Fork files from sentry-binding-hive-common package to sentry-binding-hive and sentry-binding-hive-v2 packages. (kalyan kumar kalvagadda reviewed by Colm O hEigeartaigh)

http://git-wip-us.apache.org/repos/asf/sentry/blob/b19cb01b/sentry-binding/sentry-binding-hive-v2/src/main/java/org/apache/sentry/binding/hive/v2/HiveAuthzBindingHookBaseV2.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-hive-v2/src/main/java/org/apache/sentry/binding/hive/v2/HiveAuthzBindingHookBaseV2.java b/sentry-binding/sentry-binding-hive-v2/src/main/java/org/apache/sentry/binding/hive/v2/HiveAuthzBindingHookBaseV2.java
new file mode 100644
index 0000000..5a21dd3
--- /dev/null
+++ b/sentry-binding/sentry-binding-hive-v2/src/main/java/org/apache/sentry/binding/hive/v2/HiveAuthzBindingHookBaseV2.java
@@ -0,0 +1,880 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.binding.hive;
+
+import static org.apache.hadoop.hive.metastore.MetaStoreUtils.DEFAULT_DATABASE_NAME;
+
+import java.io.Serializable;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URL;
+import java.security.CodeSource;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.hooks.Entity;
+import org.apache.hadoop.hive.ql.hooks.Entity.Type;
+import org.apache.hadoop.hive.ql.hooks.Hook;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.metadata.AuthorizationException;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.parse.AbstractSemanticAnalyzerHook;
+import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.HiveParser;
+import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.HiveOperation;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.sentry.binding.hive.authz.HiveAuthzBinding;
+import org.apache.sentry.binding.hive.authz.HiveAuthzPrivileges;
+import org.apache.sentry.binding.hive.authz.HiveAuthzPrivileges.HiveOperationScope;
+import org.apache.sentry.binding.hive.authz.HiveAuthzPrivileges.HiveOperationType;
+import org.apache.sentry.binding.hive.conf.HiveAuthzConf;
+import org.apache.sentry.core.common.Subject;
+import org.apache.sentry.core.common.utils.PathUtils;
+import org.apache.sentry.core.model.db.AccessURI;
+import org.apache.sentry.core.model.db.Column;
+import org.apache.sentry.core.model.db.DBModelAction;
+import org.apache.sentry.core.model.db.DBModelAuthorizable;
+import org.apache.sentry.core.model.db.DBModelAuthorizable.AuthorizableType;
+import org.apache.sentry.core.model.db.Database;
+import org.apache.sentry.core.model.db.Table;
+import org.apache.sentry.provider.cache.PrivilegeCache;
+import org.apache.sentry.provider.cache.SimplePrivilegeCache;
+import org.apache.sentry.provider.common.AuthorizationProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public abstract class HiveAuthzBindingHookBaseV2 extends AbstractSemanticAnalyzerHook {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(HiveAuthzBindingHookBaseV2.class);
+  protected final HiveAuthzBinding hiveAuthzBinding;
+  protected final HiveAuthzConf authzConf;
+  protected Database currDB = Database.ALL;
+  protected Table currTab;
+  protected List<AccessURI> udfURIs;
+  protected AccessURI serdeURI;
+  protected AccessURI partitionURI;
+  protected Table currOutTab = null;
+  protected Database currOutDB = null;
+  protected final List<String> serdeWhiteList;
+  protected boolean serdeURIPrivilegesEnabled;
+
+  protected final static HiveAuthzPrivileges columnMetaDataPrivilege =
+      new HiveAuthzPrivileges.AuthzPrivilegeBuilder()
+          .addInputObjectPriviledge(AuthorizableType.Column,
+              EnumSet.of(DBModelAction.SELECT, DBModelAction.INSERT))
+          .setOperationScope(HiveOperationScope.COLUMN).setOperationType(HiveOperationType.INFO)
+          .build();
+
+  // True if this is a basic DESCRIBE <table> operation. False for other DESCRIBE variants
+  // like DESCRIBE [FORMATTED|EXTENDED]. Required because Hive treats these statements as the
+  // same HiveOperationType, but we want to enforce different privileges on each statement.
+  // Basic DESCRIBE <table> is allowed with only column-level privileges, while the variants
+  // require table-level privileges.
+  protected boolean isDescTableBasic = false;
+
+  public HiveAuthzBindingHookBaseV2() throws Exception {
+    SessionState session = SessionState.get();
+    if(session == null) {
+      throw new IllegalStateException("Session has not been started");
+    }
+    // HACK: set a random classname to force the Auth V2 in Hive
+    SessionState.get().setAuthorizer(null);
+
+    HiveConf hiveConf = session.getConf();
+    if(hiveConf == null) {
+      throw new IllegalStateException("Session HiveConf is null");
+    }
+    authzConf = loadAuthzConf(hiveConf);
+    udfURIs = Lists.newArrayList();
+    hiveAuthzBinding = new HiveAuthzBinding(hiveConf, authzConf);
+    String serdeWhiteLists =
+        authzConf.get(HiveAuthzConf.HIVE_SENTRY_SERDE_WHITELIST,
+            HiveAuthzConf.HIVE_SENTRY_SERDE_WHITELIST_DEFAULT);
+    serdeWhiteList = Arrays.asList(serdeWhiteLists.split(","));
+    serdeURIPrivilegesEnabled =
+        authzConf.getBoolean(HiveAuthzConf.HIVE_SENTRY_SERDE_URI_PRIVILIEGES_ENABLED,
+            HiveAuthzConf.HIVE_SENTRY_SERDE_URI_PRIVILIEGES_ENABLED_DEFAULT);
+
+    FunctionRegistry.setupPermissionsForBuiltinUDFs("", HiveAuthzConf.HIVE_UDF_BLACK_LIST);
+  }
+
+  public static HiveAuthzConf loadAuthzConf(HiveConf hiveConf) {
+    boolean deprecatedConfigFile = false;
+    HiveAuthzConf newAuthzConf = null;
+    String hiveAuthzConf = hiveConf.get(HiveAuthzConf.HIVE_SENTRY_CONF_URL);
+    if(hiveAuthzConf == null || (hiveAuthzConf = hiveAuthzConf.trim()).isEmpty()) {
+      hiveAuthzConf = hiveConf.get(HiveAuthzConf.HIVE_ACCESS_CONF_URL);
+      deprecatedConfigFile = true;
+    }
+
+    if(hiveAuthzConf == null || (hiveAuthzConf = hiveAuthzConf.trim()).isEmpty()) {
+      throw new IllegalArgumentException("Configuration key " + HiveAuthzConf.HIVE_SENTRY_CONF_URL
+          + " value '" + hiveAuthzConf + "' is invalid.");
+    }
+    try {
+      newAuthzConf = new HiveAuthzConf(new URL(hiveAuthzConf));
+    } catch (MalformedURLException e) {
+      if (deprecatedConfigFile) {
+        throw new IllegalArgumentException("Configuration key " + HiveAuthzConf.HIVE_ACCESS_CONF_URL
+            + " specifies a malformed URL '" + hiveAuthzConf + "'", e);
+      } else {
+        throw new IllegalArgumentException("Configuration key " + HiveAuthzConf.HIVE_SENTRY_CONF_URL
+            + " specifies a malformed URL '" + hiveAuthzConf + "'", e);
+      }
+    }
+    return newAuthzConf;
+  }
+
+  @Override
+  public abstract ASTNode preAnalyze(HiveSemanticAnalyzerHookContext context, ASTNode ast)
+      throws SemanticException;
+
+  /**
+   * Post analyze hook that invokes hive auth bindings
+   */
+  @Override
+  public abstract void postAnalyze(HiveSemanticAnalyzerHookContext context,
+      List<Task<? extends Serializable>> rootTasks) throws SemanticException;
+
+  protected void executeOnFailureHooks(HiveSemanticAnalyzerHookContext context,
+      HiveOperation hiveOp, AuthorizationException e) {
+    SentryOnFailureHookContext hookCtx = new SentryOnFailureHookContextImpl(
+        context.getCommand(), context.getInputs(), context.getOutputs(),
+        hiveOp, currDB, currTab, udfURIs, null, context.getUserName(),
+        context.getIpAddress(), e, context.getConf());
+    String csHooks = authzConf.get(
+        HiveAuthzConf.AuthzConfVars.AUTHZ_ONFAILURE_HOOKS.getVar(), "").trim();
+
+    try {
+      for (Hook aofh : getHooks(csHooks)) {
+        ((SentryOnFailureHook)aofh).run(hookCtx);
+      }
+    } catch (Exception ex) {
+      LOG.error("Error executing hook:", ex);
+    }
+  }
+
+  /**
+   * The command 'create function ... using jar <jar resources>' can create a function
+   * with the supplied jar resources in the command, which is translated into ASTNode being
+   * [functionName functionClass resourceList] and resourceList being [resourceType resourcePath].
+   * This function collects all the jar paths for the supplied jar resources.
+   *
+   * @param ast the AST node for the command
+   * @return    the jar path list if any or an empty list
+   */
+  protected List<String> getFunctionJars(ASTNode ast) {
+    ASTNode resourcesNode = (ASTNode) ast.getFirstChildWithType(HiveParser.TOK_RESOURCE_LIST);
+
+    List<String> resources = new ArrayList<String>();
+    if (resourcesNode != null) {
+      for (int idx = 0; idx < resourcesNode.getChildCount(); ++idx) {
+        ASTNode resNode = (ASTNode) resourcesNode.getChild(idx);
+        ASTNode resTypeNode = (ASTNode) resNode.getChild(0);
+        ASTNode resUriNode = (ASTNode) resNode.getChild(1);
+        if (resTypeNode.getType() == HiveParser.TOK_JAR) {
+          resources.add(PlanUtils.stripQuotes(resUriNode.getText()));
+        }
+      }
+    }
+
+    return resources;
+  }
+
+  @VisibleForTesting
+  protected static AccessURI extractPartition(ASTNode ast) throws SemanticException {
+    for (int i = 0; i < ast.getChildCount(); i++) {
+      ASTNode child = (ASTNode)ast.getChild(i);
+      if (child.getToken().getType() == HiveParser.TOK_PARTITIONLOCATION &&
+          child.getChildCount() == 1) {
+        return parseURI(BaseSemanticAnalyzer.
+          unescapeSQLString(child.getChild(0).getText()));
+      }
+    }
+    return null;
+  }
+
+  @VisibleForTesting
+  protected static AccessURI parseURI(String uri) throws SemanticException {
+    return parseURI(uri, false);
+  }
+
+  @VisibleForTesting
+  protected static AccessURI parseURI(String uri, boolean isLocal)
+      throws SemanticException {
+    try {
+      HiveConf conf = SessionState.get().getConf();
+      String warehouseDir = conf.getVar(ConfVars.METASTOREWAREHOUSE);
+      Path warehousePath = new Path(warehouseDir);
+
+      // If warehousePath is an absolute path whose scheme and authority are both null,
+      // qualify it with the default file system scheme and authority.
+      if (warehousePath.isAbsoluteAndSchemeAuthorityNull()) {
+        URI defaultUri = FileSystem.getDefaultUri(conf);
+        warehousePath = warehousePath.makeQualified(defaultUri, warehousePath);
+        warehouseDir = warehousePath.toUri().toString();
+      }
+      return new AccessURI(PathUtils.parseURI(warehouseDir, uri, isLocal));
+    } catch (Exception e) {
+      throw new SemanticException("Error parsing URI " + uri + ": " +
+        e.getMessage(), e);
+    }
+  }
+
+  // Find the current database for session
+  protected Database getCanonicalDb() {
+    return new Database(SessionState.get().getCurrentDatabase());
+  }
+
+  protected void extractDbTableNameFromTOKTABLE(ASTNode astNode) throws SemanticException{
+    String[] fqTableName = BaseSemanticAnalyzer.getQualifiedTableName(astNode);
+    Preconditions.checkArgument(fqTableName.length == 2, "BaseSemanticAnalyzer.getQualifiedTableName should return " +
+            "an array with dbName and tableName");
+    currOutDB = new Database(fqTableName[0]);
+    currOutTab = new Table(fqTableName[1]);
+  }
+
+  /*TODO: Deprecate */
+  protected Database extractDatabase(ASTNode ast) throws SemanticException {
+    String tableName = BaseSemanticAnalyzer.getUnescapedName(ast);
+    if (tableName.contains(".")) {
+      return new Database(tableName.split("\\.")[0]);
+    } else {
+      return getCanonicalDb();
+    }
+  }
+  /*TODO: Deprecate */
+  protected Table extractTable(ASTNode ast) throws SemanticException {
+    String tableName = BaseSemanticAnalyzer.getUnescapedName(ast);
+    if (tableName.contains(".")) {
+      return new Table(tableName.split("\\.")[1]);
+    } else {
+      return new Table(tableName);
+    }
+  }
+
+  public static void runFailureHook(SentryOnFailureHookContext hookContext,
+      String csHooks) {
+    try {
+      for (Hook aofh : getHooks(csHooks)) {
+        ((SentryOnFailureHook) aofh).run(hookContext);
+      }
+    } catch (Exception ex) {
+      LOG.error("Error executing hook:", ex);
+    }
+  }
+  /**
+   * Convert the input/output entities into authorizables. Generate
+   * authorizables for cases like Database and metadata operations where the
+   * compiler doesn't capture entities. Invoke the Hive binding to validate
+   * permissions.
+   *
+   * @param context
+   * @param stmtAuthObject
+   * @param stmtOperation
+   * @throws AuthorizationException
+   */
+  protected void authorizeWithHiveBindings(HiveSemanticAnalyzerHookContext context,
+      HiveAuthzPrivileges stmtAuthObject, HiveOperation stmtOperation) throws AuthorizationException {
+    Set<ReadEntity> inputs = context.getInputs();
+    Set<WriteEntity> outputs = context.getOutputs();
+    List<List<DBModelAuthorizable>> inputHierarchy = new ArrayList<List<DBModelAuthorizable>>();
+    List<List<DBModelAuthorizable>> outputHierarchy = new ArrayList<List<DBModelAuthorizable>>();
+
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("stmtAuthObject.getOperationScope() = " + stmtAuthObject.getOperationScope());
+      LOG.debug("context.getInputs() = " + context.getInputs());
+      LOG.debug("context.getOutputs() = " + context.getOutputs());
+    }
+
+    // Workaround to allow DESCRIBE <table> to be executed with only column-level privileges,
+    // while still authorizing DESCRIBE [EXTENDED|FORMATTED] at the table level.
+    // This is done by treating DESCRIBE <table> the same as SHOW COLUMNS, which only requires
+    // column-level privileges.
+    if (isDescTableBasic) {
+      stmtAuthObject = columnMetaDataPrivilege;
+    }
+
+    switch (stmtAuthObject.getOperationScope()) {
+
+    case SERVER :
+      // validate server level privileges if applicable, e.g. create UDF, register jar, etc.
+      List<DBModelAuthorizable> serverHierarchy = new ArrayList<DBModelAuthorizable>();
+      serverHierarchy.add(hiveAuthzBinding.getAuthServer());
+      inputHierarchy.add(serverHierarchy);
+      break;
+    case DATABASE:
+      // workaround for database scope statements (create/alter/drop db)
+      List<DBModelAuthorizable> dbHierarchy = new ArrayList<DBModelAuthorizable>();
+      dbHierarchy.add(hiveAuthzBinding.getAuthServer());
+      dbHierarchy.add(currDB);
+      inputHierarchy.add(dbHierarchy);
+
+      if (currOutDB != null) {
+        List<DBModelAuthorizable> outputDbHierarchy = new ArrayList<DBModelAuthorizable>();
+        outputDbHierarchy.add(hiveAuthzBinding.getAuthServer());
+        outputDbHierarchy.add(currOutDB);
+        outputHierarchy.add(outputDbHierarchy);
+      } else {
+        outputHierarchy.add(dbHierarchy);
+      }
+
+      getInputHierarchyFromInputs(inputHierarchy, inputs);
+
+      if (serdeURI != null) {
+        List<DBModelAuthorizable> serdeUriHierarchy = new ArrayList<DBModelAuthorizable>();
+        serdeUriHierarchy.add(hiveAuthzBinding.getAuthServer());
+        serdeUriHierarchy.add(serdeURI);
+        outputHierarchy.add(serdeUriHierarchy);
+      }
+      break;
+    case TABLE:
+      // workaround for add partitions
+      if(partitionURI != null) {
+        inputHierarchy.add(ImmutableList.of(hiveAuthzBinding.getAuthServer(), partitionURI));
+      }
+
+      getInputHierarchyFromInputs(inputHierarchy, inputs);
+      for (WriteEntity writeEntity: outputs) {
+        if (filterWriteEntity(writeEntity)) {
+          continue;
+        }
+        List<DBModelAuthorizable> entityHierarchy = new ArrayList<DBModelAuthorizable>();
+        entityHierarchy.add(hiveAuthzBinding.getAuthServer());
+        entityHierarchy.addAll(getAuthzHierarchyFromEntity(writeEntity));
+        outputHierarchy.add(entityHierarchy);
+      }
+      // workaround for metadata queries.
+      // Capture the table name in pre-analyze and include that in the input entity list
+      if (currTab != null) {
+        List<DBModelAuthorizable> externalAuthorizableHierarchy = new ArrayList<DBModelAuthorizable>();
+        externalAuthorizableHierarchy.add(hiveAuthzBinding.getAuthServer());
+        externalAuthorizableHierarchy.add(currDB);
+        externalAuthorizableHierarchy.add(currTab);
+        inputHierarchy.add(externalAuthorizableHierarchy);
+      }
+
+      // workaround for DDL statements
+      // Capture the table name in pre-analyze and include that in the output entity list
+      if (currOutTab != null) {
+        List<DBModelAuthorizable> externalAuthorizableHierarchy = new ArrayList<DBModelAuthorizable>();
+        externalAuthorizableHierarchy.add(hiveAuthzBinding.getAuthServer());
+        externalAuthorizableHierarchy.add(currOutDB);
+        externalAuthorizableHierarchy.add(currOutTab);
+        outputHierarchy.add(externalAuthorizableHierarchy);
+      }
+
+      if (serdeURI != null) {
+        List<DBModelAuthorizable> serdeUriHierarchy = new ArrayList<DBModelAuthorizable>();
+        serdeUriHierarchy.add(hiveAuthzBinding.getAuthServer());
+        serdeUriHierarchy.add(serdeURI);
+        outputHierarchy.add(serdeUriHierarchy);
+      }
+
+      break;
+    case FUNCTION:
+      /* The 'FUNCTION' privilege scope currently used for
+       *  - CREATE TEMP FUNCTION
+       *  - DROP TEMP FUNCTION.
+       */
+      if (!udfURIs.isEmpty()) {
+        List<DBModelAuthorizable> udfUriHierarchy = new ArrayList<DBModelAuthorizable>();
+        udfUriHierarchy.add(hiveAuthzBinding.getAuthServer());
+        udfUriHierarchy.addAll(udfURIs);
+        inputHierarchy.add(udfUriHierarchy);
+        for (WriteEntity writeEntity : outputs) {
+          List<DBModelAuthorizable> entityHierarchy = new ArrayList<DBModelAuthorizable>();
+          entityHierarchy.add(hiveAuthzBinding.getAuthServer());
+          entityHierarchy.addAll(getAuthzHierarchyFromEntity(writeEntity));
+          outputHierarchy.add(entityHierarchy);
+        }
+      }
+      break;
+    case CONNECT:
+      /* The 'CONNECT' is an implicit privilege scope currently used for
+       *  - USE <db>
+       *  It's allowed when the user has any privilege on the current database. For application
+       *  backward compatibility, we allow (optional) implicit connect permission on 'default' db.
+       */
+      List<DBModelAuthorizable> connectHierarchy = new ArrayList<DBModelAuthorizable>();
+      connectHierarchy.add(hiveAuthzBinding.getAuthServer());
+      // by default allow connect access to default db
+      Table currTbl = Table.ALL;
+      Column currCol = Column.ALL;
+      if (DEFAULT_DATABASE_NAME.equalsIgnoreCase(currDB.getName()) &&
+          "false".equalsIgnoreCase(authzConf.
+              get(HiveAuthzConf.AuthzConfVars.AUTHZ_RESTRICT_DEFAULT_DB.getVar(), "false"))) {
+        currDB = Database.ALL;
+        currTbl = Table.SOME;
+      }
+
+      connectHierarchy.add(currDB);
+      connectHierarchy.add(currTbl);
+      connectHierarchy.add(currCol);
+
+      inputHierarchy.add(connectHierarchy);
+      outputHierarchy.add(connectHierarchy);
+      break;
+    case COLUMN:
+      for (ReadEntity readEntity: inputs) {
+        if (readEntity.getAccessedColumns() != null && !readEntity.getAccessedColumns().isEmpty()) {
+          addColumnHierarchy(inputHierarchy, readEntity);
+        } else {
+          List<DBModelAuthorizable> entityHierarchy = new ArrayList<DBModelAuthorizable>();
+          entityHierarchy.add(hiveAuthzBinding.getAuthServer());
+          entityHierarchy.addAll(getAuthzHierarchyFromEntity(readEntity));
+          entityHierarchy.add(Column.ALL);
+          inputHierarchy.add(entityHierarchy);
+        }
+      }
+      break;
+    default:
+      throw new AuthorizationException("Unknown operation scope type " +
+          stmtAuthObject.getOperationScope().toString());
+    }
+
+    HiveAuthzBinding binding = null;
+    try {
+      binding = getHiveBindingWithPrivilegeCache(hiveAuthzBinding, context.getUserName());
+    } catch (SemanticException e) {
+      // Will use the original hiveAuthzBinding
+      binding = hiveAuthzBinding;
+    }
+    // validate permission
+    binding.authorize(stmtOperation, stmtAuthObject, getCurrentSubject(context), inputHierarchy,
+        outputHierarchy);
+  }
+
+  // Build the hierarchy of authorizable object for the given entity type.
+  private List<DBModelAuthorizable> getAuthzHierarchyFromEntity(Entity entity) {
+    List<DBModelAuthorizable> objectHierarchy = new ArrayList<DBModelAuthorizable>();
+    switch (entity.getType()) {
+    case TABLE:
+      objectHierarchy.add(new Database(entity.getTable().getDbName()));
+      objectHierarchy.add(new Table(entity.getTable().getTableName()));
+      break;
+    case PARTITION:
+    case DUMMYPARTITION:
+      objectHierarchy.add(new Database(entity.getPartition().getTable().getDbName()));
+      objectHierarchy.add(new Table(entity.getPartition().getTable().getTableName()));
+      break;
+    case DFS_DIR:
+    case LOCAL_DIR:
+      try {
+        objectHierarchy.add(parseURI(entity.toString(),
+            entity.getType().equals(Entity.Type.LOCAL_DIR)));
+      } catch (Exception e) {
+        throw new AuthorizationException("Failed to get File URI", e);
+      }
+      break;
+    case DATABASE:
+    case FUNCTION:
+      // TODO use database entities from compiler instead of capturing from AST
+      break;
+    default:
+      throw new UnsupportedOperationException("Unsupported entity type " +
+          entity.getType().name());
+    }
+    return objectHierarchy;
+  }
+
+  /**
+   * Add column level hierarchy to inputHierarchy
+   *
+   * @param inputHierarchy
+   * @param entity
+   */
+  protected void addColumnHierarchy(List<List<DBModelAuthorizable>> inputHierarchy,
+      ReadEntity entity) {
+    List<DBModelAuthorizable> entityHierarchy = new ArrayList<DBModelAuthorizable>();
+    entityHierarchy.add(hiveAuthzBinding.getAuthServer());
+    entityHierarchy.addAll(getAuthzHierarchyFromEntity(entity));
+
+    switch (entity.getType()) {
+    case TABLE:
+    case PARTITION:
+      List<String> cols = entity.getAccessedColumns();
+      for (String col : cols) {
+        List<DBModelAuthorizable> colHierarchy = new ArrayList<DBModelAuthorizable>(entityHierarchy);
+        colHierarchy.add(new Column(col));
+        inputHierarchy.add(colHierarchy);
+      }
+      break;
+    default:
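+      // Not a table or partition, so there are no columns to expand;
+      // fall back to the entity-level hierarchy.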
+      inputHierarchy.add(entityHierarchy);
+    }
+  }
+
+  /**
+   * Get Authorizable from inputs and put into inputHierarchy
+   *
+   * @param inputHierarchy
+   * @param inputs
+   */
+  protected void getInputHierarchyFromInputs(List<List<DBModelAuthorizable>> inputHierarchy,
+      Set<ReadEntity> inputs) {
+    for (ReadEntity readEntity: inputs) {
+      // skip the tables/view that are part of expanded view definition
+      // skip the Hive generated dummy entities created for queries like 'select <expr>'
+      if (isChildTabForView(readEntity) || isDummyEntity(readEntity)) {
+        continue;
+      }
+      if (readEntity.getAccessedColumns() != null && !readEntity.getAccessedColumns().isEmpty()) {
+        addColumnHierarchy(inputHierarchy, readEntity);
+      } else {
+        List<DBModelAuthorizable> entityHierarchy = new ArrayList<DBModelAuthorizable>();
+        entityHierarchy.add(hiveAuthzBinding.getAuthServer());
+        entityHierarchy.addAll(getAuthzHierarchyFromEntity(readEntity));
+        inputHierarchy.add(entityHierarchy);
+      }
+    }
+  }
+
+  // Check if this write entity needs to be skipped
+  private boolean filterWriteEntity(WriteEntity writeEntity)
+      throws AuthorizationException {
+    // skip URI validation for session scratch file URIs
+    if (writeEntity.isTempURI()) {
+      return true;
+    }
+    try {
+      if (writeEntity.getTyp().equals(Type.DFS_DIR)
+          || writeEntity.getTyp().equals(Type.LOCAL_DIR)) {
+        HiveConf conf = SessionState.get().getConf();
+        String warehouseDir = conf.getVar(ConfVars.METASTOREWAREHOUSE);
+        URI scratchURI = new URI(PathUtils.parseDFSURI(warehouseDir,
+          conf.getVar(HiveConf.ConfVars.SCRATCHDIR)));
+        URI requestURI = new URI(PathUtils.parseDFSURI(warehouseDir,
+          writeEntity.getLocation().getPath()));
+        LOG.debug("scratchURI = " + scratchURI + ", requestURI = " + requestURI);
+        if (PathUtils.impliesURI(scratchURI, requestURI)) {
+          return true;
+        }
+        URI localScratchURI = new URI(PathUtils.parseLocalURI(conf.getVar(HiveConf.ConfVars.LOCALSCRATCHDIR)));
+        URI localRequestURI = new URI(PathUtils.parseLocalURI(writeEntity.getLocation().getPath()));
+        LOG.debug("localScratchURI = " + localScratchURI + ", localRequestURI = " + localRequestURI);
+        if (PathUtils.impliesURI(localScratchURI, localRequestURI)) {
+          return true;
+        }
+      }
+    } catch (Exception e) {
+      throw new AuthorizationException("Failed to extract uri details", e);
+    }
+    return false;
+  }
+
+  public static List<String> filterShowTables(
+      HiveAuthzBinding hiveAuthzBinding, List<String> queryResult,
+      HiveOperation operation, String userName, String dbName)
+          throws SemanticException {
+    List<String> filteredResult = new ArrayList<String>();
+    Subject subject = new Subject(userName);
+    HiveAuthzPrivileges tableMetaDataPrivilege = new HiveAuthzPrivileges.AuthzPrivilegeBuilder().
+        addInputObjectPriviledge(AuthorizableType.Column, EnumSet.of(DBModelAction.SELECT, DBModelAction.INSERT)).
+        setOperationScope(HiveOperationScope.TABLE).
+        setOperationType(HiveOperationType.INFO).
+        build();
+
+    HiveAuthzBinding hiveBindingWithPrivilegeCache = getHiveBindingWithPrivilegeCache(hiveAuthzBinding, userName);
+
+    for (String tableName : queryResult) {
+      // if user has privileges on table, add to filtered list, else discard
+      Table table = new Table(tableName);
+      Database database = new Database(dbName);
+
+      List<List<DBModelAuthorizable>> inputHierarchy = new ArrayList<List<DBModelAuthorizable>>();
+      List<List<DBModelAuthorizable>> outputHierarchy = new ArrayList<List<DBModelAuthorizable>>();
+      List<DBModelAuthorizable> externalAuthorizableHierarchy = new ArrayList<DBModelAuthorizable>();
+      externalAuthorizableHierarchy.add(hiveAuthzBinding.getAuthServer());
+      externalAuthorizableHierarchy.add(database);
+      externalAuthorizableHierarchy.add(table);
+      externalAuthorizableHierarchy.add(Column.ALL);
+      inputHierarchy.add(externalAuthorizableHierarchy);
+
+      try {
+        // do the authorization by new HiveAuthzBinding with PrivilegeCache
+        hiveBindingWithPrivilegeCache.authorize(operation, tableMetaDataPrivilege, subject,
+            inputHierarchy, outputHierarchy);
+        filteredResult.add(table.getName());
+      } catch (AuthorizationException e) {
+        // squash the exception; the user doesn't have privileges, so the
+        // table is not added to the filtered list.
+      }
+    }
+    return filteredResult;
+  }
+
+  public static List<FieldSchema> filterShowColumns(
+      HiveAuthzBinding hiveAuthzBinding, List<FieldSchema> cols,
+      HiveOperation operation, String userName, String tableName, String dbName)
+          throws SemanticException {
+    List<FieldSchema> filteredResult = new ArrayList<FieldSchema>();
+    Subject subject = new Subject(userName);
+    HiveAuthzBinding hiveBindingWithPrivilegeCache = getHiveBindingWithPrivilegeCache(hiveAuthzBinding, userName);
+
+    Database database = new Database(dbName);
+    Table table = new Table(tableName);
+    for (FieldSchema col : cols) {
+      // if user has privileges on column, add to filtered list, else discard
+      List<List<DBModelAuthorizable>> inputHierarchy = new ArrayList<List<DBModelAuthorizable>>();
+      List<List<DBModelAuthorizable>> outputHierarchy = new ArrayList<List<DBModelAuthorizable>>();
+      List<DBModelAuthorizable> externalAuthorizableHierarchy = new ArrayList<DBModelAuthorizable>();
+      externalAuthorizableHierarchy.add(hiveAuthzBinding.getAuthServer());
+      externalAuthorizableHierarchy.add(database);
+      externalAuthorizableHierarchy.add(table);
+      externalAuthorizableHierarchy.add(new Column(col.getName()));
+      inputHierarchy.add(externalAuthorizableHierarchy);
+
+      try {
+        // do the authorization by new HiveAuthzBinding with PrivilegeCache
+        hiveBindingWithPrivilegeCache.authorize(operation, columnMetaDataPrivilege, subject,
+            inputHierarchy, outputHierarchy);
+        filteredResult.add(col);
+      } catch (AuthorizationException e) {
+        // squash the exception; the user doesn't have privileges, so the
+        // column is not added to the filtered list.
+      }
+    }
+    return filteredResult;
+  }
+
+  public static List<String> filterShowDatabases(
+      HiveAuthzBinding hiveAuthzBinding, List<String> queryResult,
+      HiveOperation operation, String userName) throws SemanticException {
+    List<String> filteredResult = new ArrayList<String>();
+    Subject subject = new Subject(userName);
+    HiveAuthzBinding hiveBindingWithPrivilegeCache = getHiveBindingWithPrivilegeCache(hiveAuthzBinding, userName);
+
+    HiveAuthzPrivileges anyPrivilege = new HiveAuthzPrivileges.AuthzPrivilegeBuilder().
+        addInputObjectPriviledge(AuthorizableType.Column, EnumSet.of(DBModelAction.SELECT, DBModelAction.INSERT)).
+        addInputObjectPriviledge(AuthorizableType.URI, EnumSet.of(DBModelAction.SELECT)).
+        setOperationScope(HiveOperationScope.CONNECT).
+        setOperationType(HiveOperationType.QUERY).
+        build();
+
+    for (String dbName : queryResult) {
+      // if user has privileges on database, add to filtered list, else discard
+
+      // if the default database is not restricted, always include it and move on
+      if (DEFAULT_DATABASE_NAME.equalsIgnoreCase(dbName) && "false".equalsIgnoreCase(
+        hiveAuthzBinding.getAuthzConf().get(
+              HiveAuthzConf.AuthzConfVars.AUTHZ_RESTRICT_DEFAULT_DB.getVar(),
+              "false"))) {
+        filteredResult.add(DEFAULT_DATABASE_NAME);
+        continue;
+      }
+
+      Database database = new Database(dbName);
+
+      List<List<DBModelAuthorizable>> inputHierarchy = new ArrayList<List<DBModelAuthorizable>>();
+      List<List<DBModelAuthorizable>> outputHierarchy = new ArrayList<List<DBModelAuthorizable>>();
+      List<DBModelAuthorizable> externalAuthorizableHierarchy = new ArrayList<DBModelAuthorizable>();
+      externalAuthorizableHierarchy.add(hiveAuthzBinding.getAuthServer());
+      externalAuthorizableHierarchy.add(database);
+      externalAuthorizableHierarchy.add(Table.ALL);
+      externalAuthorizableHierarchy.add(Column.ALL);
+      inputHierarchy.add(externalAuthorizableHierarchy);
+
+      try {
+        // do the authorization by new HiveAuthzBinding with PrivilegeCache
+        hiveBindingWithPrivilegeCache.authorize(operation, anyPrivilege, subject,
+            inputHierarchy, outputHierarchy);
+        filteredResult.add(database.getName());
+      } catch (AuthorizationException e) {
+        // squash the exception; the user doesn't have privileges, so the
+        // database is not added to the filtered list.
+      }
+    }
+
+    return filteredResult;
+  }
+
+  /**
+   * Check if the given read entity is a table that has parents of type Table.
+   * The Hive compiler performs a query rewrite by replacing a view with its definition. In the
+   * process, it captures both the original view and the tables/views that it selects from.
+   * The access authorization is only interested in the top level views and not the underlying tables.
+   * @param readEntity
+   * @return
+   */
+  private boolean isChildTabForView(ReadEntity readEntity) {
+    // If this is a table added for view, then we need to skip that
+    if (!readEntity.getType().equals(Type.TABLE) && !readEntity.getType().equals(Type.PARTITION)) {
+      return false;
+    }
+    if (readEntity.getParents() != null && readEntity.getParents().size() > 0) {
+      for (ReadEntity parentEntity : readEntity.getParents()) {
+        if (!parentEntity.getType().equals(Type.TABLE)) {
+          return false;
+        }
+      }
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Returns the hooks specified in a comma separated list of class names. The hooks are
+   * returned in a list in the order they were specified.
+   *
+   * @param csHooks A comma separated list of the hook class names.
+   * @return        A list of hook instances, in the order they are listed in csHooks
+   * @throws Exception
+   */
+  private static <T extends Hook> List<T> getHooks(String csHooks) throws Exception {
+
+    List<T> hooks = new ArrayList<T>();
+    if (csHooks.isEmpty()) {
+      return hooks;
+    }
+    for (String hookClass : Splitter.on(",").omitEmptyStrings().trimResults().split(csHooks)) {
+      try {
+        @SuppressWarnings("unchecked")
+        T hook =
+            (T) Class.forName(hookClass, true, JavaUtils.getClassLoader()).newInstance();
+        hooks.add(hook);
+      } catch (ClassNotFoundException e) {
+        LOG.error(hookClass + " class not found: " + e.getMessage());
+        throw e;
+      }
+    }
+
+    return hooks;
+  }
+
+  // Check if the given entity is identified as dummy by Hive compilers.
+  private boolean isDummyEntity(Entity entity) {
+    return entity.isDummy();
+  }
+
+  // create hiveBinding with PrivilegeCache
+  private static HiveAuthzBinding getHiveBindingWithPrivilegeCache(HiveAuthzBinding hiveAuthzBinding,
+      String userName) throws SemanticException {
+    // get the original HiveAuthzBinding, and get the user's privileges by AuthorizationProvider
+    AuthorizationProvider authProvider = hiveAuthzBinding.getCurrentAuthProvider();
+    Set<String> userPrivileges =
+        authProvider.getPolicyEngine().getPrivileges(
+            authProvider.getGroupMapping().getGroups(userName), Sets.newHashSet(userName),
+            hiveAuthzBinding.getActiveRoleSet(), hiveAuthzBinding.getAuthServer());
+
+    // create PrivilegeCache using user's privileges
+    PrivilegeCache privilegeCache = new SimplePrivilegeCache(userPrivileges);
+    try {
+      // create new instance of HiveAuthzBinding whose backend provider should be SimpleCacheProviderBackend
+      return new HiveAuthzBinding(HiveAuthzBinding.HiveHook.HiveServer2, hiveAuthzBinding.getHiveConf(),
+              hiveAuthzBinding.getAuthzConf(), privilegeCache);
+    } catch (Exception e) {
+      LOG.error("Can not create HiveAuthzBinding with privilege cache.");
+      throw new SemanticException(e);
+    }
+  }
+
+  private static boolean hasPrefixMatch(List<String> prefixList, final String str) {
+    for (String prefix : prefixList) {
+      if (str.startsWith(prefix)) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  /**
+   * Set the Serde URI privileges. If the URI privileges are not enabled, serdeURI will be
+   * left null and the URI authorization checks will be skipped.
+   */
+  protected void setSerdeURI(String serdeClassName) throws SemanticException {
+    if (!serdeURIPrivilegesEnabled) {
+      return;
+    }
+
+    // Serde jars on the whitelist can be used by any user. Whitelist checking is
+    // done by comparing the Java package name. The assumption is that the cluster
+    // admin will ensure there is no Java namespace collision.
+    // e.g. org.apache.hadoop.hive.serde2 is used by Hive, and the cluster admin should
+    // ensure no custom Serde class is introduced under the same namespace.
+    if (!hasPrefixMatch(serdeWhiteList, serdeClassName)) {
+      try {
+        CodeSource serdeSrc =
+            Class.forName(serdeClassName, true, Utilities.getSessionSpecifiedClassLoader())
+                .getProtectionDomain().getCodeSource();
+        if (serdeSrc == null) {
+          throw new SemanticException("Could not resolve the jar for Serde class " + serdeClassName);
+        }
+
+        String serdeJar = serdeSrc.getLocation().getPath();
+        if (serdeJar == null || serdeJar.isEmpty()) {
+          throw new SemanticException("Could not find the jar for Serde class " + serdeClassName
+              + "to validate privileges");
+        }
+
+        serdeURI = parseURI(serdeSrc.getLocation().toString(), true);
+      } catch (ClassNotFoundException e) {
+        throw new SemanticException("Error retrieving Serde class:" + e.getMessage(), e);
+      }
+    }
+  }
+
+  protected HiveOperation getCurrentHiveStmtOp() {
+    SessionState sessState = SessionState.get();
+    if (sessState == null) {
+      // TODO: Warn
+      return null;
+    }
+    return sessState.getHiveOperation();
+  }
+
+  protected Subject getCurrentSubject(HiveSemanticAnalyzerHookContext context) {
+    // Extract the username from the hook context
+    return new Subject(context.getUserName());
+  }
+
+}
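
A minimal sketch of what the TABLE-scope branch of authorizeWithHiveBindings builds before calling HiveAuthzBinding.authorize: one Server -> Database -> Table chain per authorizable. The server, database, and table names below are illustrative, and a real call needs a configured binding, subject, and privilege object:

import java.util.ArrayList;
import java.util.List;

import org.apache.sentry.core.model.db.DBModelAuthorizable;
import org.apache.sentry.core.model.db.Database;
import org.apache.sentry.core.model.db.Server;
import org.apache.sentry.core.model.db.Table;

public class HierarchySketch {
  public static void main(String[] args) {
    // Server -> Database -> Table, mirroring the externalAuthorizableHierarchy
    // assembled for metadata queries in the TABLE scope branch.
    List<List<DBModelAuthorizable>> inputHierarchy = new ArrayList<List<DBModelAuthorizable>>();
    List<DBModelAuthorizable> tableHierarchy = new ArrayList<DBModelAuthorizable>();
    tableHierarchy.add(new Server("server1"));   // stands in for hiveAuthzBinding.getAuthServer()
    tableHierarchy.add(new Database("sales"));   // currDB (illustrative)
    tableHierarchy.add(new Table("orders"));     // currTab (illustrative)
    inputHierarchy.add(tableHierarchy);

    // binding.authorize(stmtOperation, stmtAuthObject, subject,
    //     inputHierarchy, outputHierarchy) then throws AuthorizationException
    // if the subject lacks the required privileges at any level.
    System.out.println(inputHierarchy);
  }
}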

http://git-wip-us.apache.org/repos/asf/sentry/blob/b19cb01b/sentry-binding/sentry-binding-hive-v2/src/main/java/org/apache/sentry/binding/hive/v2/HiveAuthzBindingHookV2.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-hive-v2/src/main/java/org/apache/sentry/binding/hive/v2/HiveAuthzBindingHookV2.java b/sentry-binding/sentry-binding-hive-v2/src/main/java/org/apache/sentry/binding/hive/v2/HiveAuthzBindingHookV2.java
index 018ebf3..fac6ba3 100644
--- a/sentry-binding/sentry-binding-hive-v2/src/main/java/org/apache/sentry/binding/hive/v2/HiveAuthzBindingHookV2.java
+++ b/sentry-binding/sentry-binding-hive-v2/src/main/java/org/apache/sentry/binding/hive/v2/HiveAuthzBindingHookV2.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
 import org.apache.hadoop.hive.ql.plan.HiveOperation;
 import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.sentry.binding.hive.HiveAuthzBindingHookBase;
+import org.apache.sentry.binding.hive.HiveAuthzBindingHookBaseV2;
 import org.apache.sentry.binding.hive.authz.HiveAuthzPrivileges;
 import org.apache.sentry.binding.hive.conf.HiveAuthzConf;
 import org.apache.sentry.core.common.Subject;
@@ -42,7 +42,7 @@ import org.apache.sentry.core.model.db.Database;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class HiveAuthzBindingHookV2 extends HiveAuthzBindingHookBase {
+public class HiveAuthzBindingHookV2 extends HiveAuthzBindingHookBaseV2 {
   private static final Logger LOG = LoggerFactory
       .getLogger(HiveAuthzBindingHookV2.class);
 

http://git-wip-us.apache.org/repos/asf/sentry/blob/b19cb01b/sentry-binding/sentry-binding-hive-v2/src/main/java/org/apache/sentry/binding/hive/v2/HiveAuthzBindingSessionHookV2.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-hive-v2/src/main/java/org/apache/sentry/binding/hive/v2/HiveAuthzBindingSessionHookV2.java b/sentry-binding/sentry-binding-hive-v2/src/main/java/org/apache/sentry/binding/hive/v2/HiveAuthzBindingSessionHookV2.java
index b95bf60..9106911 100644
--- a/sentry-binding/sentry-binding-hive-v2/src/main/java/org/apache/sentry/binding/hive/v2/HiveAuthzBindingSessionHookV2.java
+++ b/sentry-binding/sentry-binding-hive-v2/src/main/java/org/apache/sentry/binding/hive/v2/HiveAuthzBindingSessionHookV2.java
@@ -21,7 +21,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hive.service.cli.HiveSQLException;
 import org.apache.hive.service.cli.session.HiveSessionHookContext;
-import org.apache.sentry.binding.hive.HiveAuthzBindingHookBase;
+import org.apache.sentry.binding.hive.HiveAuthzBindingHookBaseV2;
 import org.apache.sentry.binding.hive.conf.HiveAuthzConf;
 
 import com.google.common.base.Joiner;
@@ -72,7 +72,7 @@ public class HiveAuthzBindingSessionHookV2 implements
     sessionConf.setBoolVar(ConfVars.HIVE_CAPTURE_TRANSFORM_ENTITY, true);
 
     // set security command list
-    HiveAuthzConf authzConf = HiveAuthzBindingHookBase.loadAuthzConf(sessionConf);
+    HiveAuthzConf authzConf = HiveAuthzBindingHookBaseV2.loadAuthzConf(sessionConf);
     String commandWhitelist =
         authzConf.get(HiveAuthzConf.HIVE_SENTRY_SECURITY_COMMAND_WHITELIST,
             HiveAuthzConf.HIVE_SENTRY_SECURITY_COMMAND_WHITELIST_DEFAULT);
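
For reference, a small sketch of the loadAuthzConf contract relied on above: it reads the sentry-site.xml location from HIVE_SENTRY_CONF_URL (falling back to the deprecated HIVE_ACCESS_CONF_URL) and throws on a missing or malformed URL. The file path here is illustrative:

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.sentry.binding.hive.HiveAuthzBindingHookBaseV2;
import org.apache.sentry.binding.hive.conf.HiveAuthzConf;

public class LoadAuthzConfSketch {
  public static void main(String[] args) {
    HiveConf hiveConf = new HiveConf();
    // Illustrative location of sentry-site.xml.
    hiveConf.set(HiveAuthzConf.HIVE_SENTRY_CONF_URL,
        "file:///etc/sentry/conf/sentry-site.xml");
    HiveAuthzConf authzConf = HiveAuthzBindingHookBaseV2.loadAuthzConf(hiveConf);
    System.out.println(authzConf.get(
        HiveAuthzConf.HIVE_SENTRY_SECURITY_COMMAND_WHITELIST,
        HiveAuthzConf.HIVE_SENTRY_SECURITY_COMMAND_WHITELIST_DEFAULT));
  }
}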

http://git-wip-us.apache.org/repos/asf/sentry/blob/b19cb01b/sentry-binding/sentry-binding-hive-v2/src/main/java/org/apache/sentry/binding/hive/v2/SentryAuthorizerFactory.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-hive-v2/src/main/java/org/apache/sentry/binding/hive/v2/SentryAuthorizerFactory.java b/sentry-binding/sentry-binding-hive-v2/src/main/java/org/apache/sentry/binding/hive/v2/SentryAuthorizerFactory.java
index 485ac43..8b56c49 100644
--- a/sentry-binding/sentry-binding-hive-v2/src/main/java/org/apache/sentry/binding/hive/v2/SentryAuthorizerFactory.java
+++ b/sentry-binding/sentry-binding-hive-v2/src/main/java/org/apache/sentry/binding/hive/v2/SentryAuthorizerFactory.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzPluginEx
 import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzSessionContext;
 import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzSessionContext.CLIENT_TYPE;
 import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveMetastoreClientFactory;
-import org.apache.sentry.binding.hive.HiveAuthzBindingHookBase;
+import org.apache.sentry.binding.hive.HiveAuthzBindingHookBaseV2;
 import org.apache.sentry.binding.hive.conf.HiveAuthzConf;
 import org.apache.sentry.binding.hive.v2.authorizer.DefaultSentryAccessController;
 import org.apache.sentry.binding.hive.v2.authorizer.DefaultSentryValidator;
@@ -49,7 +49,7 @@ public class SentryAuthorizerFactory implements HiveAuthorizerFactory {
           throws HiveAuthzPluginException {
     HiveAuthzSessionContext sessionContext;
     try {
-      this.authzConf = HiveAuthzBindingHookBase.loadAuthzConf(conf);
+      this.authzConf = HiveAuthzBindingHookBaseV2.loadAuthzConf(conf);
       sessionContext = applyTestSettings(ctx, conf);
       assertHiveCliAuthDisabled(conf, sessionContext);
     } catch (Exception e) {
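
SentryAuthorizerFactory is the class that hive.security.authorization.manager should point at for the V2 binding. A sketch of the equivalent programmatic wiring, using standard HiveConf variables (assuming a V2-capable Hive on the classpath):

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

public class WireV2Factory {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    conf.setBoolVar(ConfVars.HIVE_AUTHORIZATION_ENABLED, true);
    conf.setVar(ConfVars.HIVE_AUTHORIZATION_MANAGER,
        "org.apache.sentry.binding.hive.v2.SentryAuthorizerFactory");
    System.out.println(conf.getVar(ConfVars.HIVE_AUTHORIZATION_MANAGER));
  }
}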

http://git-wip-us.apache.org/repos/asf/sentry/blob/b19cb01b/sentry-binding/sentry-binding-hive-v2/src/main/java/org/apache/sentry/binding/hive/v2/metastore/AuthorizingObjectStoreBaseV2.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-hive-v2/src/main/java/org/apache/sentry/binding/hive/v2/metastore/AuthorizingObjectStoreBaseV2.java b/sentry-binding/sentry-binding-hive-v2/src/main/java/org/apache/sentry/binding/hive/v2/metastore/AuthorizingObjectStoreBaseV2.java
new file mode 100644
index 0000000..e8e1d23
--- /dev/null
+++ b/sentry-binding/sentry-binding-hive-v2/src/main/java/org/apache/sentry/binding/hive/v2/metastore/AuthorizingObjectStoreBaseV2.java
@@ -0,0 +1,412 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.binding.metastore;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.List;
+import java.util.Set;
+
+import javax.security.auth.login.LoginException;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.ObjectStore;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Index;
+import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.HiveOperation;
+import org.apache.hadoop.hive.shims.Utils;
+import org.apache.sentry.binding.hive.HiveAuthzBindingHookBaseV2;
+import org.apache.sentry.binding.hive.authz.HiveAuthzBinding;
+import org.apache.sentry.binding.hive.conf.HiveAuthzConf;
+import org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/***
+ * This class is a wrapper of ObjectStore, which is the interface between the
+ * application logic and the database store. It performs the authorization, or
+ * filters the result, when processing a metastore request.
+ * e.g.:
+ * Callers will only receive back the objects they have privileges to
+ * access.
+ * If there is a request for an object list (like getAllTables()), the result
+ * will be filtered to exclude objects the requestor doesn't have privileges
+ * to access.
+ */
+public class AuthorizingObjectStoreBaseV2 extends ObjectStore {
+  private static ImmutableSet<String> serviceUsers;
+  private static HiveConf hiveConf;
+  private static HiveAuthzConf authzConf;
+  private static HiveAuthzBinding hiveAuthzBinding;
+  private static String NO_ACCESS_MESSAGE_TABLE = "Table does not exist or insufficient privileges to access: ";
+  private static String NO_ACCESS_MESSAGE_DATABASE = "Database does not exist or insufficient privileges to access: ";
+
+  @Override
+  public List<String> getDatabases(String pattern) throws MetaException {
+    return filterDatabases(super.getDatabases(pattern));
+  }
+
+  @Override
+  public List<String> getAllDatabases() throws MetaException {
+    return filterDatabases(super.getAllDatabases());
+  }
+
+  @Override
+  public Database getDatabase(String name) throws NoSuchObjectException {
+    Database db = super.getDatabase(name);
+    try {
+      if (filterDatabases(Lists.newArrayList(name)).isEmpty()) {
+        throw new NoSuchObjectException(getNoAccessMessageForDB(name));
+      }
+    } catch (MetaException e) {
+      throw new NoSuchObjectException("Failed to authorized access to " + name
+          + " : " + e.getMessage());
+    }
+    return db;
+  }
+
+  @Override
+  public Table getTable(String dbName, String tableName) throws MetaException {
+    Table table = super.getTable(dbName, tableName);
+    if (table == null
+        || filterTables(dbName, Lists.newArrayList(tableName)).isEmpty()) {
+      return null;
+    }
+    return table;
+  }
+
+  @Override
+  public Partition getPartition(String dbName, String tableName,
+      List<String> part_vals) throws MetaException, NoSuchObjectException {
+    if (filterTables(dbName, Lists.newArrayList(tableName)).isEmpty()) {
+      throw new NoSuchObjectException(getNoAccessMessageForTable(dbName, tableName));
+    }
+    return super.getPartition(dbName, tableName, part_vals);
+  }
+
+  @Override
+  public List<Partition> getPartitions(String dbName, String tableName,
+      int maxParts) throws MetaException, NoSuchObjectException {
+    if (filterTables(dbName, Lists.newArrayList(tableName)).isEmpty()) {
+      throw new MetaException(getNoAccessMessageForTable(dbName, tableName));
+    }
+    return super.getPartitions(dbName, tableName, maxParts);
+  }
+
+  @Override
+  public List<String> getTables(String dbName, String pattern)
+      throws MetaException {
+    return filterTables(dbName, super.getTables(dbName, pattern));
+  }
+ 
+  @Override
+  public List<Table> getTableObjectsByName(String dbname, List<String> tableNames)
+      throws MetaException, UnknownDBException {
+    return super.getTableObjectsByName(dbname, filterTables(dbname, tableNames));
+  }
+
+  @Override
+  public List<String> getAllTables(String dbName) throws MetaException {
+    return filterTables(dbName, super.getAllTables(dbName));
+  }
+
+  @Override
+  public List<String> listTableNamesByFilter(String dbName, String filter,
+      short maxTables) throws MetaException {
+    return filterTables(dbName,
+        super.listTableNamesByFilter(dbName, filter, maxTables));
+  }
+
+  @Override
+  public List<String> listPartitionNames(String dbName, String tableName,
+      short max_parts) throws MetaException {
+    if (filterTables(dbName, Lists.newArrayList(tableName)).isEmpty()) {
+      throw new MetaException(getNoAccessMessageForTable(dbName, tableName));
+    }
+    return super.listPartitionNames(dbName, tableName, max_parts);
+  }
+
+  @Override
+  public List<String> listPartitionNamesByFilter(String dbName,
+      String tableName, String filter, short max_parts) throws MetaException {
+    if (filterTables(dbName, Lists.newArrayList(tableName)).isEmpty()) {
+      throw new MetaException(getNoAccessMessageForTable(dbName, tableName));
+    }
+    return super.listPartitionNamesByFilter(dbName, tableName, filter,
+        max_parts);
+  }
+
+  @Override
+  public Index getIndex(String dbName, String origTableName, String indexName)
+      throws MetaException {
+    if (filterTables(dbName, Lists.newArrayList(origTableName)).isEmpty()) {
+      throw new MetaException(getNoAccessMessageForTable(dbName, origTableName));
+    }
+    return super.getIndex(dbName, origTableName, indexName);
+  }
+
+  @Override
+  public List<Index> getIndexes(String dbName, String origTableName, int max)
+      throws MetaException {
+    if (filterTables(dbName, Lists.newArrayList(origTableName)).isEmpty()) {
+      throw new MetaException(getNoAccessMessageForTable(dbName, origTableName));
+    }
+    return super.getIndexes(dbName, origTableName, max);
+  }
+
+  @Override
+  public List<String> listIndexNames(String dbName, String origTableName,
+      short max) throws MetaException {
+    if (filterTables(dbName, Lists.newArrayList(origTableName)).isEmpty()) {
+      throw new MetaException(getNoAccessMessageForTable(dbName, origTableName));
+    }
+    return super.listIndexNames(dbName, origTableName, max);
+  }
+
+  @Override
+  public List<Partition> getPartitionsByFilter(String dbName,
+      String tblName, String filter, short maxParts) throws MetaException,
+      NoSuchObjectException {
+    if (filterTables(dbName, Lists.newArrayList(tblName)).isEmpty()) {
+      throw new MetaException(getNoAccessMessageForTable(dbName, tblName));
+    }
+    return super.getPartitionsByFilter(dbName, tblName, filter, maxParts);
+  }
+
+  @Override
+  public List<Partition> getPartitionsByNames(String dbName, String tblName,
+      List<String> partNames) throws MetaException, NoSuchObjectException {
+    if (filterTables(dbName, Lists.newArrayList(tblName)).isEmpty()) {
+      throw new MetaException(getNoAccessMessageForTable(dbName, tblName));
+    }
+    return super.getPartitionsByNames(dbName, tblName, partNames);
+  }
+
+  @Override
+  public Partition getPartitionWithAuth(String dbName, String tblName,
+      List<String> partVals, String user_name, List<String> group_names)
+      throws MetaException, NoSuchObjectException, InvalidObjectException {
+    if (filterTables(dbName, Lists.newArrayList(tblName)).isEmpty()) {
+      throw new MetaException(getNoAccessMessageForTable(dbName, tblName));
+    }
+    return super.getPartitionWithAuth(dbName, tblName, partVals, user_name,
+        group_names);
+  }
+
+  @Override
+  public List<Partition> getPartitionsWithAuth(String dbName, String tblName,
+      short maxParts, String userName, List<String> groupNames)
+      throws MetaException, InvalidObjectException {
+    if (filterTables(dbName, Lists.newArrayList(tblName)).isEmpty()) {
+      throw new MetaException(getNoAccessMessageForTable(dbName, tblName));
+    }
+    return super.getPartitionsWithAuth(dbName, tblName, maxParts, userName,
+        groupNames);
+  }
+
+  @Override
+  public List<String> listPartitionNamesPs(String dbName, String tblName,
+      List<String> part_vals, short max_parts) throws MetaException,
+      NoSuchObjectException {
+    if (filterTables(dbName, Lists.newArrayList(tblName)).isEmpty()) {
+      throw new MetaException(getNoAccessMessageForTable(dbName, tblName));
+    }
+    return super.listPartitionNamesPs(dbName, tblName, part_vals, max_parts);
+  }
+
+  @Override
+  public List<Partition> listPartitionsPsWithAuth(String dbName,
+      String tblName, List<String> part_vals, short max_parts, String userName,
+      List<String> groupNames) throws MetaException, InvalidObjectException,
+      NoSuchObjectException {
+    if (filterTables(dbName, Lists.newArrayList(tblName)).isEmpty()) {
+      throw new MetaException(getNoAccessMessageForTable(dbName, tblName));
+    }
+    return super.listPartitionsPsWithAuth(dbName, tblName, part_vals,
+        max_parts, userName, groupNames);
+  }
+
+  @Override
+  public ColumnStatistics getTableColumnStatistics(String dbName,
+      String tableName, List<String> colNames) throws MetaException,
+      NoSuchObjectException {
+    if (filterTables(dbName, Lists.newArrayList(tableName)).isEmpty()) {
+      throw new MetaException(getNoAccessMessageForTable(dbName, tableName));
+    }
+    return super.getTableColumnStatistics(dbName, tableName, colNames);
+  }
+
+  @Override
+  public List<ColumnStatistics> getPartitionColumnStatistics(
+      String dbName, String tblName, List<String> partNames,
+      List<String> colNames) throws MetaException, NoSuchObjectException {
+    if (filterTables(dbName, Lists.newArrayList(tblName)).isEmpty()) {
+      throw new MetaException(getNoAccessMessageForTable(dbName, tblName));
+    }
+    return super.getPartitionColumnStatistics(dbName, tblName, partNames,
+        colNames);
+  }
+
+  /**
+   * Invoke Hive database filtering to remove the entries the user has no
+   * privileges to access.
+   * @param dbList the database names to filter
+   * @return the subset of dbList the user is allowed to see
+   * @throws MetaException if the filtering fails
+   */
+  private List<String> filterDatabases(List<String> dbList)
+      throws MetaException {
+    if (needsAuthorization(getUserName())) {
+      try {
+        return HiveAuthzBindingHookBaseV2.filterShowDatabases(getHiveAuthzBinding(),
+            dbList, HiveOperation.SHOWDATABASES, getUserName());
+      } catch (SemanticException e) {
+        throw new MetaException("Error getting DB list " + e.getMessage());
+      }
+    } else {
+      return dbList;
+    }
+  }
+
+  /**
+   * Invoke Hive table filtering to remove the entries the user has no
+   * privileges to access.
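+   * For example, the partition and statistics getters above pass a
+   * singleton list to check access to a single table.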
+   * @param dbName the database containing the tables
+   * @param tabList the table names to filter
+   * @return the subset of tabList the user is allowed to see
+   * @throws MetaException if the filtering fails
+   */
+  protected List<String> filterTables(String dbName, List<String> tabList)
+      throws MetaException {
+    if (needsAuthorization(getUserName())) {
+      try {
+        return HiveAuthzBindingHookBaseV2.filterShowTables(getHiveAuthzBinding(),
+            tabList, HiveOperation.SHOWTABLES, getUserName(), dbName);
+      } catch (SemanticException e) {
+        throw new MetaException("Error getting Table list " + e.getMessage());
+      }
+    } else {
+      return tabList;
+    }
+  }
+
+  /**
+   * Lazily load the Hive authorization binding.
+   *
+   * @return the HiveAuthzBinding instance
+   * @throws MetaException if the binding cannot be created
+   */
+  private HiveAuthzBinding getHiveAuthzBinding() throws MetaException {
+    if (hiveAuthzBinding == null) {
+      try {
+        hiveAuthzBinding = new HiveAuthzBinding(HiveAuthzBinding.HiveHook.HiveMetaStore,
+            getHiveConf(), getAuthzConf());
+      } catch (Exception e) {
+        throw new MetaException("Failed to load Hive binding " + e.getMessage());
+      }
+    }
+    return hiveAuthzBinding;
+  }
+
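+  /**
+   * Lazily load the set of service users that bypass authorization checks,
+   * as configured via AuthzConfVars.AUTHZ_METASTORE_SERVICE_USERS.
+   */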
+  private ImmutableSet<String> getServiceUsers() throws MetaException {
+    if (serviceUsers == null) {
+      serviceUsers = ImmutableSet.copyOf(toTrimed(Sets.newHashSet(getAuthzConf().getStrings(
+          AuthzConfVars.AUTHZ_METASTORE_SERVICE_USERS.getVar(), new String[] { "" }))));
+    }
+    return serviceUsers;
+  }
+
+  private HiveConf getHiveConf() {
+    if (hiveConf == null) {
+      hiveConf = new HiveConf(getConf(), this.getClass());
+    }
+    return hiveConf;
+  }
+
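+  /**
+   * Lazily load the Sentry authorization configuration. The value of
+   * HiveAuthzConf.HIVE_SENTRY_CONF_URL must be a valid URL, typically a
+   * file: URL pointing at sentry-site.xml (an illustrative value:
+   * file:///etc/sentry/conf/sentry-site.xml).
+   */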
+  private HiveAuthzConf getAuthzConf() throws MetaException {
+    if (authzConf == null) {
+      String hiveAuthzConf = getConf().get(HiveAuthzConf.HIVE_SENTRY_CONF_URL);
+      if (hiveAuthzConf == null
+          || (hiveAuthzConf = hiveAuthzConf.trim()).isEmpty()) {
+        throw new MetaException("Configuration key "
+            + HiveAuthzConf.HIVE_SENTRY_CONF_URL + " value '" + hiveAuthzConf
+            + "' is invalid.");
+      }
+      try {
+        authzConf = new HiveAuthzConf(new URL(hiveAuthzConf));
+      } catch (MalformedURLException e) {
+        throw new MetaException("Configuration key "
+            + HiveAuthzConf.HIVE_SENTRY_CONF_URL
+            + " specifies a malformed URL '" + hiveAuthzConf + "' "
+            + e.getMessage());
+      }
+    }
+    return authzConf;
+  }
+
+  /**
+   * Extract the user name from the underlying auth subsystem.
+   * @return the short user name of the current UGI
+   * @throws MetaException if the user cannot be determined
+   */
+  private String getUserName() throws MetaException {
+    try {
+      return Utils.getUGI().getShortUserName();
+    } catch (LoginException | IOException e) {
+      throw new MetaException("Failed to get username: " + e.getMessage());
+    }
+  }
+
+  /**
+   * Check if the given user needs to be authorized, i.e. is not one of the
+   * configured service users.
+   * @param userName the user to check
+   * @return true if the user is subject to authorization
+   */
+  private boolean needsAuthorization(String userName) throws MetaException {
+    return !getServiceUsers().contains(userName.trim());
+  }
+
+  private static Set<String> toTrimed(Set<String> s) {
+    Set<String> result = Sets.newHashSet();
+    for (String v : s) {
+      result.add(v.trim());
+    }
+    return result;
+  }
+
+  protected String getNoAccessMessageForTable(String dbName, String tableName) {
+    return NO_ACCESS_MESSAGE_TABLE + "<" + dbName + ">.<" + tableName + ">";
+  }
+
+  private String getNoAccessMessageForDB(String dbName) {
+    return NO_ACCESS_MESSAGE_DATABASE + "<" + dbName + ">";
+  }
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/b19cb01b/sentry-binding/sentry-binding-hive-v2/src/main/java/org/apache/sentry/binding/hive/v2/metastore/AuthorizingObjectStoreV2.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-hive-v2/src/main/java/org/apache/sentry/binding/hive/v2/metastore/AuthorizingObjectStoreV2.java b/sentry-binding/sentry-binding-hive-v2/src/main/java/org/apache/sentry/binding/hive/v2/metastore/AuthorizingObjectStoreV2.java
index 913bd00..45edf43 100644
--- a/sentry-binding/sentry-binding-hive-v2/src/main/java/org/apache/sentry/binding/hive/v2/metastore/AuthorizingObjectStoreV2.java
+++ b/sentry-binding/sentry-binding-hive-v2/src/main/java/org/apache/sentry/binding/hive/v2/metastore/AuthorizingObjectStoreV2.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.HiveOperation;
 import org.apache.hadoop.hive.shims.Utils;
-import org.apache.sentry.binding.hive.HiveAuthzBindingHookBase;
+import org.apache.sentry.binding.hive.HiveAuthzBindingHookBaseV2;
 import org.apache.sentry.binding.hive.authz.HiveAuthzBinding;
 import org.apache.sentry.binding.hive.conf.HiveAuthzConf;
 import org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars;
@@ -285,7 +285,7 @@ public class AuthorizingObjectStoreV2 extends ObjectStore {
       throws MetaException {
     if (needsAuthorization(getUserName())) {
       try {
-        return HiveAuthzBindingHookBase.filterShowDatabases(getHiveAuthzBinding(),
+        return HiveAuthzBindingHookBaseV2.filterShowDatabases(getHiveAuthzBinding(),
             dbList, HiveOperation.SHOWDATABASES, getUserName());
       } catch (SemanticException e) {
         throw new MetaException("Error getting DB list " + e.getMessage());
@@ -306,7 +306,7 @@ public class AuthorizingObjectStoreV2 extends ObjectStore {
       throws MetaException {
     if (needsAuthorization(getUserName())) {
       try {
-        return HiveAuthzBindingHookBase.filterShowTables(getHiveAuthzBinding(),
+        return HiveAuthzBindingHookBaseV2.filterShowTables(getHiveAuthzBinding(),
             tabList, HiveOperation.SHOWTABLES, getUserName(), dbName);
       } catch (SemanticException e) {
         throw new MetaException("Error getting Table list " + e.getMessage());

http://git-wip-us.apache.org/repos/asf/sentry/blob/b19cb01b/sentry-binding/sentry-binding-hive-v2/src/main/java/org/apache/sentry/binding/hive/v2/metastore/MetastoreAuthzBindingBaseV2.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-hive-v2/src/main/java/org/apache/sentry/binding/hive/v2/metastore/MetastoreAuthzBindingBaseV2.java b/sentry-binding/sentry-binding-hive-v2/src/main/java/org/apache/sentry/binding/hive/v2/metastore/MetastoreAuthzBindingBaseV2.java
new file mode 100644
index 0000000..67413fa
--- /dev/null
+++ b/sentry-binding/sentry-binding-hive-v2/src/main/java/org/apache/sentry/binding/hive/v2/metastore/MetastoreAuthzBindingBaseV2.java
@@ -0,0 +1,453 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.binding.metastore;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import javax.security.auth.login.LoginException;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStorePreEventListener;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.events.PreAddPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.PreAlterPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.PreAlterTableEvent;
+import org.apache.hadoop.hive.metastore.events.PreCreateTableEvent;
+import org.apache.hadoop.hive.metastore.events.PreDropDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.PreDropPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.PreDropTableEvent;
+import org.apache.hadoop.hive.metastore.events.PreEventContext;
+import org.apache.hadoop.hive.ql.plan.HiveOperation;
+import org.apache.hadoop.hive.shims.Utils;
+import org.apache.sentry.binding.hive.authz.HiveAuthzBinding;
+import org.apache.sentry.binding.hive.conf.HiveAuthzConf;
+import org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars;
+import org.apache.sentry.core.common.utils.PathUtils;
+import org.apache.sentry.core.model.db.AccessURI;
+import org.apache.sentry.core.model.db.DBModelAuthorizable;
+import org.apache.sentry.core.model.db.Database;
+import org.apache.sentry.core.model.db.Server;
+import org.apache.sentry.core.model.db.Table;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+/**
+ * Sentry binding for the Hive Metastore. The binding is integrated into the
+ * Metastore via pre-event listeners, which are fired prior to executing the
+ * metadata action. At this point only metadata writes are authorized, since
+ * the listeners are not fired for read events. Each action builds an input
+ * and output hierarchy from the objects used in the given operation. This is
+ * then passed down to the Hive binding, which handles the authorization,
+ * ensuring that the same privilege model and policies are followed.
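+ *
+ * A deployment typically registers a concrete subclass (such as the
+ * MetastoreAuthzBindingV2 in this module) as a Metastore pre-event listener
+ * in hive-site.xml, e.g. (illustrative snippet):
+ * <pre>
+ *   &lt;property&gt;
+ *     &lt;name&gt;hive.metastore.pre.event.listeners&lt;/name&gt;
+ *     &lt;value&gt;org.apache.sentry.binding.hive.v2.metastore.MetastoreAuthzBindingV2&lt;/value&gt;
+ *   &lt;/property&gt;
+ * </pre>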
+ */
+public abstract class MetastoreAuthzBindingBaseV2 extends MetaStorePreEventListener {
+
+  /**
+   * Build the set of object hierarchies, i.e. fully qualified db model objects.
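+   * For example, addTableToOutput(server, "db1", "t1") accumulates the
+   * hierarchies [server], [server, db1] and [server, db1, t1].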
+   */
+  protected static class HierarcyBuilder {
+    private List<List<DBModelAuthorizable>> authHierarchy;
+
+    public HierarcyBuilder() {
+      authHierarchy = new ArrayList<List<DBModelAuthorizable>>();
+    }
+
+    public HierarcyBuilder addServerToOutput(Server server) {
+      List<DBModelAuthorizable> serverHierarchy = new ArrayList<DBModelAuthorizable>();
+      serverHierarchy.add(server);
+      authHierarchy.add(serverHierarchy);
+      return this;
+    }
+
+    public HierarcyBuilder addDbToOutput(Server server, String dbName) {
+      List<DBModelAuthorizable> dbHierarchy = new ArrayList<DBModelAuthorizable>();
+      addServerToOutput(server);
+      dbHierarchy.add(server);
+      dbHierarchy.add(new Database(dbName));
+      authHierarchy.add(dbHierarchy);
+      return this;
+    }
+
+    public HierarcyBuilder addUriToOutput(Server server, String uriPath,
+        String warehouseDirPath) throws MetaException {
+      List<DBModelAuthorizable> uriHierarchy = new ArrayList<DBModelAuthorizable>();
+      addServerToOutput(server);
+      uriHierarchy.add(server);
+      try {
+        uriHierarchy.add(new AccessURI(PathUtils.parseDFSURI(warehouseDirPath,
+            uriPath)));
+      } catch (URISyntaxException e) {
+        throw new MetaException("Error paring the URI " + e.getMessage());
+      }
+      authHierarchy.add(uriHierarchy);
+      return this;
+    }
+
+    public HierarcyBuilder addTableToOutput(Server server, String dbName,
+        String tableName) {
+      List<DBModelAuthorizable> tableHierarchy = new ArrayList<DBModelAuthorizable>();
+      addDbToOutput(server, dbName);
+      tableHierarchy.add(server);
+      tableHierarchy.add(new Database(dbName));
+      tableHierarchy.add(new Table(tableName));
+      authHierarchy.add(tableHierarchy);
+      return this;
+    }
+
+    public List<List<DBModelAuthorizable>> build() {
+      return authHierarchy;
+    }
+  }
+
+  private HiveAuthzConf authzConf;
+  private final Server authServer;
+  private final HiveConf hiveConf;
+  private final ImmutableSet<String> serviceUsers;
+  private HiveAuthzBinding hiveAuthzBinding;
+  private final String warehouseDir;
+  protected static boolean sentryCacheOutOfSync = false;
+
+  public MetastoreAuthzBindingBaseV2(Configuration config) throws Exception {
+    super(config);
+    String hiveAuthzConf = config.get(HiveAuthzConf.HIVE_SENTRY_CONF_URL);
+    if (hiveAuthzConf == null
+        || (hiveAuthzConf = hiveAuthzConf.trim()).isEmpty()) {
+      throw new IllegalArgumentException("Configuration key "
+          + HiveAuthzConf.HIVE_SENTRY_CONF_URL + " value '" + hiveAuthzConf
+          + "' is invalid.");
+    }
+    try {
+      authzConf = new HiveAuthzConf(new URL(hiveAuthzConf));
+    } catch (MalformedURLException e) {
+      throw new IllegalArgumentException("Configuration key "
+          + HiveAuthzConf.HIVE_SENTRY_CONF_URL + " specifies a malformed URL '"
+          + hiveAuthzConf + "'", e);
+    }
+    hiveConf = new HiveConf(config, this.getClass());
+    this.authServer = new Server(authzConf.get(AuthzConfVars.AUTHZ_SERVER_NAME
+        .getVar()));
+    serviceUsers = ImmutableSet.copyOf(toTrimmedLower(Sets.newHashSet(authzConf
+        .getStrings(AuthzConfVars.AUTHZ_METASTORE_SERVICE_USERS.getVar(),
+            new String[] { "" }))));
+    warehouseDir = hiveConf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE);
+  }
+
+  /**
+   * Main listener callback, which is the entry point for Sentry authorization.
+   */
+  @Override
+  public void onEvent(PreEventContext context) throws MetaException,
+      NoSuchObjectException, InvalidOperationException {
+
+    if (!needsAuthorization(getUserName())) {
+      return;
+    }
+    switch (context.getEventType()) {
+    case CREATE_TABLE:
+      authorizeCreateTable((PreCreateTableEvent) context);
+      break;
+    case DROP_TABLE:
+      authorizeDropTable((PreDropTableEvent) context);
+      break;
+    case ALTER_TABLE:
+      authorizeAlterTable((PreAlterTableEvent) context);
+      break;
+    case ADD_PARTITION:
+      authorizeAddPartition((PreAddPartitionEvent) context);
+      break;
+    case DROP_PARTITION:
+      authorizeDropPartition((PreDropPartitionEvent) context);
+      break;
+    case ALTER_PARTITION:
+      authorizeAlterPartition((PreAlterPartitionEvent) context);
+      break;
+    case CREATE_DATABASE:
+      authorizeCreateDatabase();
+      break;
+    case DROP_DATABASE:
+      authorizeDropDatabase((PreDropDatabaseEvent) context);
+      break;
+    case LOAD_PARTITION_DONE:
+      // noop for now
+      break;
+    default:
+      break;
+    }
+  }
+
+  private void authorizeCreateDatabase()
+      throws InvalidOperationException, MetaException {
+    authorizeMetastoreAccess(HiveOperation.CREATEDATABASE,
+        new HierarcyBuilder().addServerToOutput(getAuthServer()).build(),
+        new HierarcyBuilder().addServerToOutput(getAuthServer()).build());
+  }
+
+  private void authorizeDropDatabase(PreDropDatabaseEvent context)
+      throws InvalidOperationException, MetaException {
+    authorizeMetastoreAccess(HiveOperation.DROPDATABASE,
+        new HierarcyBuilder().addDbToOutput(getAuthServer(),
+            context.getDatabase().getName()).build(),
+        new HierarcyBuilder().addDbToOutput(getAuthServer(),
+            context.getDatabase().getName()).build());
+  }
+
+  private void authorizeCreateTable(PreCreateTableEvent context)
+      throws InvalidOperationException, MetaException {
+    HierarcyBuilder inputBuilder = new HierarcyBuilder();
+    inputBuilder.addDbToOutput(getAuthServer(), context.getTable().getDbName());
+    HierarcyBuilder outputBuilder = new HierarcyBuilder();
+    outputBuilder.addDbToOutput(getAuthServer(), context.getTable().getDbName());
+
+    if (!StringUtils.isEmpty(context.getTable().getSd().getLocation())) {
+      String uriPath;
+      try {
+        uriPath = PathUtils.parseDFSURI(warehouseDir,
+            getSdLocation(context.getTable().getSd()));
+      } catch(URISyntaxException e) {
+        throw new MetaException(e.getMessage());
+      }
+      inputBuilder.addUriToOutput(getAuthServer(), uriPath, warehouseDir);
+    }
+    authorizeMetastoreAccess(HiveOperation.CREATETABLE, inputBuilder.build(),
+        outputBuilder.build());
+  }
+
+  private void authorizeDropTable(PreDropTableEvent context)
+      throws InvalidOperationException, MetaException {
+    authorizeMetastoreAccess(
+        HiveOperation.DROPTABLE,
+        new HierarcyBuilder().addTableToOutput(getAuthServer(),
+            context.getTable().getDbName(), context.getTable().getTableName())
+            .build(),
+        new HierarcyBuilder().addTableToOutput(getAuthServer(),
+            context.getTable().getDbName(), context.getTable().getTableName())
+            .build());
+  }
+
+  private void authorizeAlterTable(PreAlterTableEvent context)
+      throws InvalidOperationException, MetaException {
+    /*
+     * There are multiple alter table options and it's tricky to figure out
+     * which is attempted here. Currently all alter table operations need
+     * full table-level privilege, except for setting the location, which
+     * also needs a privilege on the URI. Hence we initially set the
+     * operation to ALTERTABLE_ADDCOLS; if the client has changed the
+     * location, we switch to ALTERTABLE_LOCATION.
+     */
+    HiveOperation operation = HiveOperation.ALTERTABLE_ADDCOLS;
+    HierarcyBuilder inputBuilder = new HierarcyBuilder();
+    inputBuilder.addTableToOutput(getAuthServer(), context.getOldTable()
+        .getDbName(), context.getOldTable().getTableName());
+    HierarcyBuilder outputBuilder = new HierarcyBuilder();
+    outputBuilder.addTableToOutput(getAuthServer(), context.getOldTable()
+        .getDbName(), context.getOldTable().getTableName());
+
+    // if the operation requires location change, then add URI privilege check
+    String oldLocationUri = null;
+    String newLocationUri = null;
+    try {
+      if (!StringUtils.isEmpty(context.getOldTable().getSd().getLocation())) {
+        oldLocationUri = PathUtils.parseDFSURI(warehouseDir,
+            getSdLocation(context.getOldTable().getSd()));
+      }
+      if (!StringUtils.isEmpty(context.getNewTable().getSd().getLocation())) {
+        newLocationUri = PathUtils.parseDFSURI(warehouseDir,
+            getSdLocation(context.getNewTable().getSd()));
+      }
+    } catch (URISyntaxException e) {
+      throw new MetaException(e.getMessage());
+    }
+    if (!StringUtils.equals(oldLocationUri, newLocationUri)) {
+      outputBuilder.addUriToOutput(getAuthServer(), newLocationUri,
+          warehouseDir);
+      operation = HiveOperation.ALTERTABLE_LOCATION;
+    }
+    authorizeMetastoreAccess(
+        operation,
+        inputBuilder.build(), outputBuilder.build());
+  }
+
+  private void authorizeAddPartition(PreAddPartitionEvent context)
+      throws InvalidOperationException, MetaException, NoSuchObjectException {
+    for (Partition mapiPart : context.getPartitions()) {
+      HierarcyBuilder inputBuilder = new HierarcyBuilder();
+      inputBuilder.addTableToOutput(getAuthServer(), mapiPart.getDbName(),
+          mapiPart.getTableName());
+      HierarcyBuilder outputBuilder = new HierarcyBuilder();
+      outputBuilder.addTableToOutput(getAuthServer(), mapiPart.getDbName(),
+          mapiPart.getTableName());
+      // Check if we need to validate URI permissions when the storage location
+      // is non-default, i.e. something not under the parent table.
+      String partitionLocation = null;
+      if (mapiPart.isSetSd()) {
+        partitionLocation = mapiPart.getSd().getLocation();
+      }
+      if (!StringUtils.isEmpty(partitionLocation)) {
+        String tableLocation = context.getHandler()
+            .get_table(mapiPart.getDbName(), mapiPart.getTableName())
+            .getSd().getLocation();
+        String uriPath;
+        try {
+          uriPath = PathUtils.parseDFSURI(warehouseDir, partitionLocation);
+        } catch (URISyntaxException e) {
+          throw new MetaException(e.getMessage());
+        }
+        if (!partitionLocation.equals(tableLocation) &&
+            !partitionLocation.startsWith(tableLocation + File.separator)) {
+          outputBuilder.addUriToOutput(getAuthServer(), uriPath, warehouseDir);
+        }
+      }
+      authorizeMetastoreAccess(HiveOperation.ALTERTABLE_ADDPARTS,
+          inputBuilder.build(), outputBuilder.build());
+    }
+  }
+
+  protected void authorizeDropPartition(PreDropPartitionEvent context)
+      throws InvalidOperationException, MetaException {
+    authorizeMetastoreAccess(
+        HiveOperation.ALTERTABLE_DROPPARTS,
+        new HierarcyBuilder().addTableToOutput(getAuthServer(),
+            context.getPartition().getDbName(),
+            context.getPartition().getTableName()).build(),
+        new HierarcyBuilder().addTableToOutput(getAuthServer(),
+            context.getPartition().getDbName(),
+            context.getPartition().getTableName()).build());
+  }
+
+  private void authorizeAlterPartition(PreAlterPartitionEvent context)
+      throws InvalidOperationException, MetaException, NoSuchObjectException {
+    /*
+     * There are multiple alter partition options and it's tricky to figure out
+     * which is attempted here. Currently all alter partition operations need
+     * full table-level privilege, except for setting the location, which also
+     * needs a privilege on the URI. We don't try to distinguish the operation
+     * type; all alter partitions are treated as set-location.
+     */
+    HierarcyBuilder inputBuilder = new HierarcyBuilder().addTableToOutput(
+        getAuthServer(), context.getDbName(), context.getTableName());
+    HierarcyBuilder outputBuilder = new HierarcyBuilder().addTableToOutput(
+        getAuthServer(), context.getDbName(), context.getTableName());
+
+    Partition partition = context.getNewPartition();
+    String partitionLocation = getSdLocation(partition.getSd());
+    if (!StringUtils.isEmpty(partitionLocation)) {
+      String tableLocation = context.getHandler().get_table(
+          partition.getDbName(), partition.getTableName()).getSd().getLocation();
+
+      String uriPath;
+      try {
+        uriPath = PathUtils.parseDFSURI(warehouseDir, partitionLocation);
+      } catch (URISyntaxException e) {
+        throw new MetaException(e.getMessage());
+      }
+      if (!partitionLocation.startsWith(tableLocation + File.separator)) {
+        outputBuilder.addUriToOutput(getAuthServer(), uriPath, warehouseDir);
+      }
+    }
+    authorizeMetastoreAccess(
+        HiveOperation.ALTERPARTITION_LOCATION,
+        inputBuilder.build(), outputBuilder.build());
+  }
+
+  protected InvalidOperationException invalidOperationException(Exception e) {
+    InvalidOperationException ex = new InvalidOperationException(e.getMessage());
+    ex.initCause(e.getCause());
+    return ex;
+  }
+
+  /**
+   * Assemble the required and requested privileges and validate them using
+   * the Hive binding's authorization provider.
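+   *
+   * A concrete subclass is expected to map the operation to its privilege
+   * requirements and delegate to the binding, roughly along these lines
+   * (a sketch only; the exact privilege lookup may differ):
+   * <pre>
+   *   getHiveAuthzBinding().authorize(hiveOp,
+   *       HiveAuthzPrivilegesMapV2.getHiveAuthzPrivileges(hiveOp),
+   *       new Subject(getUserName()), inputHierarchy, outputHierarchy);
+   * </pre>
+   *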
+   * @param hiveOp
+   * @param inputHierarchy
+   * @param outputHierarchy
+   * @throws InvalidOperationException
+   */
+  protected abstract void authorizeMetastoreAccess(HiveOperation hiveOp,
+      List<List<DBModelAuthorizable>> inputHierarchy,
+      List<List<DBModelAuthorizable>> outputHierarchy)
+      throws InvalidOperationException;
+
+  public Server getAuthServer() {
+    return authServer;
+  }
+
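+  // Note: service users are stored trimmed and lower-cased, so the incoming
+  // userName is assumed to already be a lower-case short name.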
+  private boolean needsAuthorization(String userName) {
+    return !serviceUsers.contains(userName);
+  }
+
+  private static Set<String> toTrimmedLower(Set<String> s) {
+    Set<String> result = Sets.newHashSet();
+    for (String v : s) {
+      result.add(v.trim().toLowerCase());
+    }
+    return result;
+  }
+
+  protected HiveAuthzBinding getHiveAuthzBinding() throws Exception {
+    if (hiveAuthzBinding == null) {
+      hiveAuthzBinding = new HiveAuthzBinding(HiveAuthzBinding.HiveHook.HiveMetaStore, hiveConf, authzConf);
+    }
+    return hiveAuthzBinding;
+  }
+
+  protected String getUserName() throws MetaException {
+    try {
+      return Utils.getUGI().getShortUserName();
+    } catch (LoginException | IOException e) {
+      throw new MetaException("Failed to get username: " + e.getMessage());
+    }
+  }
+
+  private String getSdLocation(StorageDescriptor sd) {
+    if (sd == null) {
+      return "";
+    } else {
+      return sd.getLocation();
+    }
+  }
+
+  public static boolean isSentryCacheOutOfSync() {
+    return sentryCacheOutOfSync;
+  }
+
+  public static void setSentryCacheOutOfSync(boolean sentryCacheOutOfSync) {
+    MetastoreAuthzBindingBaseV2.sentryCacheOutOfSync = sentryCacheOutOfSync;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/b19cb01b/sentry-binding/sentry-binding-hive-v2/src/main/java/org/apache/sentry/binding/hive/v2/metastore/MetastoreAuthzBindingV2.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-hive-v2/src/main/java/org/apache/sentry/binding/hive/v2/metastore/MetastoreAuthzBindingV2.java b/sentry-binding/sentry-binding-hive-v2/src/main/java/org/apache/sentry/binding/hive/v2/metastore/MetastoreAuthzBindingV2.java
index cfef1a7..107fe1f 100644
--- a/sentry-binding/sentry-binding-hive-v2/src/main/java/org/apache/sentry/binding/hive/v2/metastore/MetastoreAuthzBindingV2.java
+++ b/sentry-binding/sentry-binding-hive-v2/src/main/java/org/apache/sentry/binding/hive/v2/metastore/MetastoreAuthzBindingV2.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.hive.ql.metadata.AuthorizationException;
 import org.apache.hadoop.hive.ql.plan.HiveOperation;
 import org.apache.sentry.binding.hive.authz.HiveAuthzBinding;
 import org.apache.sentry.binding.hive.v2.HiveAuthzPrivilegesMapV2;
-import org.apache.sentry.binding.metastore.MetastoreAuthzBindingBase;
+import org.apache.sentry.binding.metastore.MetastoreAuthzBindingBaseV2;
 import org.apache.sentry.core.common.Subject;
 import org.apache.sentry.core.common.exception.SentryUserException;
 import org.apache.sentry.core.model.db.DBModelAuthorizable;
@@ -44,7 +44,7 @@ import org.apache.sentry.core.model.db.DBModelAuthorizable;
  * passed down to the hive binding which handles the authorization. This ensures
  * that we follow the same privilege model and policies.
  */
-public class MetastoreAuthzBindingV2 extends MetastoreAuthzBindingBase {
+public class MetastoreAuthzBindingV2 extends MetastoreAuthzBindingBaseV2 {
 
   public MetastoreAuthzBindingV2(Configuration config) throws Exception {
     super(config);