Posted to commits@drill.apache.org by ve...@apache.org on 2015/06/25 18:51:58 UTC

[2/2] drill git commit: DRILL-3203: Add support for impersonation in Hive storage plugin

DRILL-3203: Add support for impersonation in Hive storage plugin


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/c1b847ac
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/c1b847ac
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/c1b847ac

Branch: refs/heads/master
Commit: c1b847acdc8cb90a1498b236b3bb5c81ca75c044
Parents: 58c3c4c
Author: vkorukanti <ve...@gmail.com>
Authored: Sat Jun 20 19:29:23 2015 -0700
Committer: vkorukanti <ve...@gmail.com>
Committed: Thu Jun 25 07:53:36 2015 -0700

----------------------------------------------------------------------
 contrib/storage-hive/core/pom.xml               |  12 +
 .../store/hive/DrillHiveMetaStoreClient.java    | 375 +++++++++++++++++++
 .../store/hive/HiveAuthorizationHelper.java     | 153 ++++++++
 .../apache/drill/exec/store/hive/HiveScan.java  |  31 +-
 .../exec/store/hive/schema/DrillHiveTable.java  |   4 +-
 .../store/hive/schema/DrillHiveViewTable.java   |   5 +-
 .../store/hive/schema/HiveSchemaFactory.java    | 207 +++-------
 .../drill/exec/hive/HiveTestUtilities.java      |  50 +++
 .../hive/BaseTestHiveImpersonation.java         | 140 +++++++
 .../hive/TestSqlStdBasedAuthorization.java      | 296 +++++++++++++++
 .../hive/TestStorageBasedHiveAuthorization.java | 372 ++++++++++++++++++
 .../exec/store/hive/HiveTestDataGenerator.java  |  51 +--
 .../core/src/test/resources/student.txt         |  10 +
 .../core/src/test/resources/voter.txt           |  10 +
 .../org/apache/drill/exec/ops/QueryContext.java |  38 +-
 .../apache/drill/exec/store/AbstractSchema.java |   7 +-
 .../drill/exec/util/ImpersonationUtil.java      |  16 +-
 .../java/org/apache/drill/BaseTestQuery.java    |  31 ++
 .../impersonation/BaseTestImpersonation.java    |  66 +++-
 .../TestImpersonationDisabledWithMiniDFS.java   |  37 +-
 .../TestImpersonationMetadata.java              |  47 +--
 .../impersonation/TestImpersonationQueries.java |  66 +---
 22 files changed, 1686 insertions(+), 338 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/c1b847ac/contrib/storage-hive/core/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/pom.xml b/contrib/storage-hive/core/pom.xml
index 546fd7b..f0330f1 100644
--- a/contrib/storage-hive/core/pom.xml
+++ b/contrib/storage-hive/core/pom.xml
@@ -87,6 +87,18 @@
       <artifactId>hadoop-yarn-api</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/drill/blob/c1b847ac/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/DrillHiveMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/DrillHiveMetaStoreClient.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/DrillHiveMetaStoreClient.java
new file mode 100644
index 0000000..ef70b2e
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/DrillHiveMetaStoreClient.java
@@ -0,0 +1,375 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.Lists;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.UnknownTableException;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Overrides HiveMetaStoreClient to provide additional capabilities such as caching, reconnecting with user
+ * credentials and higher-level APIs that return the metadata in the form Drill needs directly.
+ */
+public abstract class DrillHiveMetaStoreClient extends HiveMetaStoreClient {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillHiveMetaStoreClient.class);
+
+  protected final Map<String, String> hiveConfigOverride;
+
+  /**
+   * Create a DrillHiveMetaStoreClient for cases where:
+   *   1. Drill impersonation is enabled and
+   *   2. either storage (in remote HiveMetaStore server) or SQL standard based authorization (in Hive storage plugin)
+   *      is enabled
+   * @param hiveConf Conf including authorization configuration
+   * @param hiveConfigOverride Hive config overrides specified in the storage plugin config
+   * @param userName User who is trying to access the Hive metadata
+   * @param ignoreAuthzErrors When browsing info schema, we want to ignore permission denied errors. If a permission
+   *                          denied error occurs while accessing metadata for an object, it will not be shown in the
+   *                          info schema.
+   * @return client that checks authorization (when enabled) before serving metadata requests
+   * @throws MetaException
+   */
+  public static DrillHiveMetaStoreClient createClientWithAuthz(final HiveConf hiveConf,
+      final Map<String, String> hiveConfigOverride, final String userName, final boolean ignoreAuthzErrors)
+      throws MetaException {
+    try {
+      final UserGroupInformation ugiForRpc; // UGI credentials to use for RPC communication with Hive MetaStore server
+      if (!hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS)) {
+        // If the user impersonation is disabled in Hive storage plugin (not Drill impersonation), use the process
+        // user UGI credentials.
+        ugiForRpc = ImpersonationUtil.getProcessUserUGI();
+      } else {
+        ugiForRpc = ImpersonationUtil.createProxyUgi(userName);
+      }
+      return ugiForRpc.doAs(new PrivilegedExceptionAction<DrillHiveMetaStoreClient>() {
+        @Override
+        public DrillHiveMetaStoreClient run() throws Exception {
+          return new HiveClientWithAuthz(hiveConf, hiveConfigOverride, ugiForRpc, userName, ignoreAuthzErrors);
+        }
+      });
+    } catch (final Exception e) {
+      throw new DrillRuntimeException("Failure setting up HiveMetaStore client.", e);
+    }
+  }
+
+  /**
+   * Create a DrillHiveMetaStoreClient that can be shared across multiple users. This is created when impersonation is
+   * disabled.
+   * @param hiveConf Hive configuration used to connect to the metastore
+   * @param hiveConfigOverride Hive config overrides specified in the storage plugin config
+   * @return shared client that caches metadata and whose {@link #close()} is a no-op
+   * @throws MetaException
+   */
+  public static DrillHiveMetaStoreClient createNonCloseableClientWithCaching(final HiveConf hiveConf,
+      final Map<String, String> hiveConfigOverride) throws MetaException {
+    return new NonCloseableHiveClientWithCaching(hiveConf, hiveConfigOverride);
+  }
+
+  private DrillHiveMetaStoreClient(final HiveConf hiveConf, final Map<String, String> hiveConfigOverride)
+      throws MetaException {
+    super(hiveConf);
+    this.hiveConfigOverride = hiveConfigOverride;
+  }
+
+
+  /**
+   * Higher level API that returns the databases in Hive.
+   * @return list of database names in the Hive metastore
+   * @throws TException
+   */
+  public abstract List<String> getDatabases() throws TException;
+
+  /**
+   * Higher level API that returns the tables in given database.
+   * @param dbName database to list the tables from
+   * @return list of table names in the given database
+   * @throws TException
+   */
+  public abstract List<String> getTableNames(final String dbName) throws TException;
+
+  /**
+   * Higher level API that returns the {@link HiveReadEntry} for given database and table.
+   * @param dbName database containing the table
+   * @param tableName name of the table
+   * @return {@link HiveReadEntry} describing the table and its partitions
+   * @throws TException
+   */
+  public abstract HiveReadEntry getHiveReadEntry(final String dbName, final String tableName) throws TException;
+
+  /** Helper method which gets the list of databases. Retries once if the first call to fetch the metadata fails. */
+  protected static List<String> getDatabasesHelper(final IMetaStoreClient mClient) throws TException {
+    try {
+      return mClient.getAllDatabases();
+    } catch (TException e) {
+      logger.warn("Failure while attempting to get hive databases", e);
+      mClient.reconnect();
+      return mClient.getAllDatabases();
+    }
+  }
+
+  /** Helper method which gets tables in a database. Retries once if the first call to fetch the metadata fails */
+  protected static List<String> getTableNamesHelper(final IMetaStoreClient mClient, final String dbName)
+      throws TException {
+    try {
+      return mClient.getAllTables(dbName);
+    } catch (TException e) {
+      logger.warn("Failure while attempting to get hive tables", e);
+      mClient.reconnect();
+      return mClient.getAllTables(dbName);
+    }
+  }
+
+  /** Helper method which gets table metadata. Retries once if the first call to fetch the metadata fails */
+  protected static HiveReadEntry getHiveReadEntryHelper(final IMetaStoreClient mClient, final String dbName,
+      final String tableName, final Map<String, String> hiveConfigOverride) throws TException {
+    Table t = null;
+    try {
+      t = mClient.getTable(dbName, tableName);
+    } catch (TException e) {
+      mClient.reconnect();
+      t = mClient.getTable(dbName, tableName);
+    }
+
+    if (t == null) {
+      throw new UnknownTableException(String.format("Unable to find table '%s'.", tableName));
+    }
+
+    List<Partition> partitions;
+    try {
+      partitions = mClient.listPartitions(dbName, tableName, (short) -1);
+    } catch (TException e) {
+      mClient.reconnect();
+      partitions = mClient.listPartitions(dbName, tableName, (short) -1);
+    }
+
+    List<HiveTable.HivePartition> hivePartitions = Lists.newArrayList();
+    for (Partition part : partitions) {
+      hivePartitions.add(new HiveTable.HivePartition(part));
+    }
+
+    if (hivePartitions.size() == 0) {
+      hivePartitions = null;
+    }
+
+    return new HiveReadEntry(new HiveTable(t), hivePartitions, hiveConfigOverride);
+  }
+
+  /**
+   * HiveMetaStoreClient to create and maintain (reconnection cases) connection to Hive metastore with given user
+   * credentials and check authorization privileges if set.
+   */
+  private static class HiveClientWithAuthz extends DrillHiveMetaStoreClient {
+    private final UserGroupInformation ugiForRpc;
+    private final boolean ignoreAuthzErrors;
+    private HiveAuthorizationHelper authorizer;
+
+    private HiveClientWithAuthz(final HiveConf hiveConf, final Map<String, String> hiveConfigOverride,
+        final UserGroupInformation ugiForRpc, final String userName, final boolean ignoreAuthzErrors)
+        throws TException {
+      super(hiveConf, hiveConfigOverride);
+      this.ugiForRpc = ugiForRpc;
+      this.ignoreAuthzErrors = ignoreAuthzErrors;
+      this.authorizer = new HiveAuthorizationHelper(this, hiveConf, userName);
+    }
+
+    @Override
+    public void reconnect() throws MetaException {
+      try {
+        ugiForRpc.doAs(new PrivilegedExceptionAction<Void>() {
+          @Override
+          public Void run() throws Exception {
+            reconnectSuper();
+            return null;
+          }
+        });
+      } catch (final InterruptedException | IOException e) {
+        throw new DrillRuntimeException("Failed to reconnect to HiveMetaStore: " + e.getMessage(), e);
+      }
+    }
+
+    private void reconnectSuper() throws MetaException {
+      super.reconnect();
+    }
+
+    public List<String> getDatabases() throws TException {
+      try {
+        authorizer.authorizeShowDatabases();
+      } catch (final HiveAccessControlException e) {
+        if (ignoreAuthzErrors) {
+          return Collections.emptyList();
+        }
+        throw UserException.permissionError(e).build(logger);
+      }
+      return getDatabasesHelper(this);
+    }
+
+    public List<String> getTableNames(final String dbName) throws TException {
+      try {
+        authorizer.authorizeShowTables(dbName);
+      } catch (final HiveAccessControlException e) {
+        if (ignoreAuthzErrors) {
+          return Collections.emptyList();
+        }
+        throw UserException.permissionError(e).build(logger);
+      }
+      return getTableNamesHelper(this, dbName);
+    }
+
+    public HiveReadEntry getHiveReadEntry(final String dbName, final String tableName) throws TException {
+      try {
+        authorizer.authorizeReadTable(dbName, tableName);
+      } catch (final HiveAccessControlException e) {
+        if (!ignoreAuthzErrors) {
+          throw UserException.permissionError(e).build(logger);
+        }
+      }
+      return getHiveReadEntryHelper(this, dbName, tableName, hiveConfigOverride);
+    }
+  }
+
+  /**
+   * HiveMetaStoreClient that provides a shared MetaStoreClient implementation with caching.
+   */
+  private static class NonCloseableHiveClientWithCaching extends DrillHiveMetaStoreClient {
+    private final LoadingCache<String, List<String>> databases;
+    private final LoadingCache<String, List<String>> tableNameLoader;
+    private final LoadingCache<String, LoadingCache<String, HiveReadEntry>> tableLoaders;
+
+    private NonCloseableHiveClientWithCaching(final HiveConf hiveConf,
+        final Map<String, String> hiveConfigOverride) throws MetaException {
+      super(hiveConf, hiveConfigOverride);
+
+      databases = CacheBuilder //
+          .newBuilder() //
+          .expireAfterAccess(1, TimeUnit.MINUTES) //
+          .build(new DatabaseLoader());
+
+      tableNameLoader = CacheBuilder //
+          .newBuilder() //
+          .expireAfterAccess(1, TimeUnit.MINUTES) //
+          .build(new TableNameLoader());
+
+      tableLoaders = CacheBuilder //
+          .newBuilder() //
+          .expireAfterAccess(4, TimeUnit.HOURS) //
+          .maximumSize(20) //
+          .build(new TableLoaderLoader());
+    }
+
+    @Override
+    public List<String> getDatabases() throws TException {
+      try {
+        return databases.get("databases");
+      } catch (final ExecutionException e) {
+        throw new TException(e);
+      }
+    }
+
+    @Override
+    public List<String> getTableNames(final String dbName) throws TException {
+      try {
+        return tableNameLoader.get(dbName);
+      } catch (final ExecutionException e) {
+        throw new TException(e);
+      }
+    }
+
+    @Override
+    public HiveReadEntry getHiveReadEntry(final String dbName, final String tableName) throws TException {
+      try {
+        return tableLoaders.get(dbName).get(tableName);
+      } catch (final ExecutionException e) {
+        throw new TException(e);
+      }
+    }
+
+    @Override
+    public void close() {
+      // No-op.
+    }
+
+    private class DatabaseLoader extends CacheLoader<String, List<String>> {
+      @Override
+      public List<String> load(String key) throws Exception {
+        if (!"databases".equals(key)) {
+          throw new UnsupportedOperationException();
+        }
+        synchronized (NonCloseableHiveClientWithCaching.this) {
+          return getDatabasesHelper(NonCloseableHiveClientWithCaching.this);
+        }
+      }
+    }
+
+    private class TableNameLoader extends CacheLoader<String, List<String>> {
+      @Override
+      public List<String> load(String dbName) throws Exception {
+        synchronized (NonCloseableHiveClientWithCaching.this) {
+          return getTableNamesHelper(NonCloseableHiveClientWithCaching.this, dbName);
+        }
+      }
+    }
+
+    private class TableLoaderLoader extends CacheLoader<String, LoadingCache<String, HiveReadEntry>> {
+      @Override
+      public LoadingCache<String, HiveReadEntry> load(String key) throws Exception {
+        return CacheBuilder
+            .newBuilder()
+            .expireAfterAccess(1, TimeUnit.MINUTES)
+            .build(new TableLoader(key));
+      }
+    }
+
+    private class TableLoader extends CacheLoader<String, HiveReadEntry> {
+      private final String dbName;
+
+      public TableLoader(final String dbName) {
+        this.dbName = dbName;
+      }
+
+      @Override
+      public HiveReadEntry load(String key) throws Exception {
+        synchronized (NonCloseableHiveClientWithCaching.this) {
+          return getHiveReadEntryHelper(NonCloseableHiveClientWithCaching.this, dbName, key, hiveConfigOverride);
+        }
+      }
+    }
+  }
+}

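For context: a minimal sketch (not part of this commit) of how the two factory methods are intended to be selected, mirroring the registerSchemas() logic in HiveSchemaFactory further below. The class and method names here are illustrative only.

    import java.util.Map;

    import org.apache.drill.exec.store.hive.DrillHiveMetaStoreClient;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.api.MetaException;

    public class ClientSelectionSketch {
      public static DrillHiveMetaStoreClient clientFor(final HiveConf hiveConf,
          final Map<String, String> hiveConfigOverride, final boolean impersonationEnabled,
          final String userName, final boolean ignoreAuthzErrors) throws MetaException {
        if (impersonationEnabled) {
          // Per-user client; authorization (if configured) is checked against userName.
          return DrillHiveMetaStoreClient.createClientWithAuthz(
              hiveConf, hiveConfigOverride, userName, ignoreAuthzErrors);
        }
        // Shared, cached client; its close() is a no-op so consumers can't tear it down.
        return DrillHiveMetaStoreClient.createNonCloseableClientWithCaching(
            hiveConf, hiveConfigOverride);
      }
    }
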
http://git-wip-us.apache.org/repos/asf/drill/blob/c1b847ac/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveAuthorizationHelper.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveAuthorizationHelper.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveAuthorizationHelper.java
new file mode 100644
index 0000000..643b121
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveAuthorizationHelper.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+import org.apache.hadoop.hive.ql.security.HiveAuthenticationProvider;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAccessControlException;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthorizer;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthorizerFactory;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzContext;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzPluginException;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzSessionContext;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzSessionContext.CLIENT_TYPE;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveMetastoreClientFactory;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivilegeObjectType;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Helper class for initializing and checking privileges according to authorization configuration set in Hive storage
+ * plugin config.
+ */
+public class HiveAuthorizationHelper {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveAuthorizationHelper.class);
+
+  final boolean authzEnabled;
+  final HiveAuthorizer authorizerV2;
+
+  public HiveAuthorizationHelper(final IMetaStoreClient mClient, final HiveConf hiveConf, final String user) {
+    authzEnabled = hiveConf.getBoolVar(ConfVars.HIVE_AUTHORIZATION_ENABLED);
+    if (!authzEnabled) {
+      authorizerV2 = null;
+      return;
+    }
+
+    try {
+      final HiveConf hiveConfCopy = new HiveConf(hiveConf);
+      hiveConfCopy.set("user.name", user);
+
+      final HiveAuthenticationProvider authenticator = HiveUtils.getAuthenticator(hiveConfCopy,
+          HiveConf.ConfVars.HIVE_AUTHENTICATOR_MANAGER);
+      SessionState ss = new SessionState(hiveConfCopy, user);
+      SessionState.start(ss);
+
+      authenticator.setSessionState(ss);
+
+      HiveAuthorizerFactory authorizerFactory =
+          HiveUtils.getAuthorizerFactory(hiveConfCopy, HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER);
+
+      HiveAuthzSessionContext.Builder authzContextBuilder = new HiveAuthzSessionContext.Builder();
+      authzContextBuilder.setClientType(CLIENT_TYPE.HIVESERVER2); // Drill is emulating HS2 here
+
+      authorizerV2 = authorizerFactory.createHiveAuthorizer(
+          new HiveMetastoreClientFactory() {
+            @Override
+            public IMetaStoreClient getHiveMetastoreClient() throws HiveAuthzPluginException {
+              return mClient;
+            }
+          },
+          hiveConf, authenticator, authzContextBuilder.build());
+
+      authorizerV2.applyAuthorizationConfigPolicy(hiveConfCopy);
+    } catch (final HiveException e) {
+      throw new DrillRuntimeException("Failed to initialize Hive authorization components: " + e.getMessage(), e);
+    }
+
+    logger.trace("Hive authorization enabled");
+  }
+
+  /**
+   * Check authorization for "SHOW DATABASES" command. A {@link HiveAccessControlException} is thrown
+   * for illegal access.
+   */
+  public void authorizeShowDatabases() throws HiveAccessControlException {
+    if (!authzEnabled) {
+      return;
+    }
+
+    authorize(HiveOperationType.SHOWDATABASES, Collections.EMPTY_LIST, Collections.EMPTY_LIST, "SHOW DATABASES");
+  }
+
+  /**
+   * Check authorization for "SHOW TABLES" command in given Hive db. A {@link HiveAccessControlException} is thrown
+   * for illegal access.
+   * @param dbName database for which "SHOW TABLES" access is checked
+   */
+  public void authorizeShowTables(final String dbName) throws HiveAccessControlException {
+    if (!authzEnabled) {
+      return;
+    }
+
+    final HivePrivilegeObject toRead = new HivePrivilegeObject(HivePrivilegeObjectType.DATABASE, dbName, null);
+
+    authorize(HiveOperationType.SHOWTABLES, ImmutableList.of(toRead), Collections.EMPTY_LIST, "SHOW TABLES");
+  }
+
+  /**
+   * Check authorization for "READ TABLE" for given db.table. A {@link HiveAccessControlException} is thrown
+   * for illegal access.
+   * @param dbName database containing the table
+   * @param tableName table for which read access is checked
+   */
+  public void authorizeReadTable(final String dbName, final String tableName) throws HiveAccessControlException {
+    if (!authzEnabled) {
+      return;
+    }
+
+    HivePrivilegeObject toRead = new HivePrivilegeObject(HivePrivilegeObjectType.TABLE_OR_VIEW, dbName, tableName);
+    authorize(HiveOperationType.QUERY, ImmutableList.of(toRead), Collections.EMPTY_LIST, "READ TABLE");
+  }
+
+  /* Helper method to check privileges */
+  private void authorize(final HiveOperationType hiveOpType, final List<HivePrivilegeObject> toRead,
+      final List<HivePrivilegeObject> toWrite, final String cmd) throws HiveAccessControlException {
+    try {
+      HiveAuthzContext.Builder authzContextBuilder = new HiveAuthzContext.Builder();
+      authzContextBuilder.setUserIpAddress("Not available");
+      authzContextBuilder.setCommandString(cmd);
+
+      authorizerV2.checkPrivileges(hiveOpType, toRead, toWrite, authzContextBuilder.build());
+    } catch (final HiveAccessControlException e) {
+      throw e;
+    } catch (final Exception e) {
+      throw new DrillRuntimeException("Failed to use the Hive authorization components: " + e.getMessage(), e);
+    }
+  }
+}

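For context: a sketch (not part of this commit) of the authorize-then-fetch pattern that HiveClientWithAuthz above builds around this helper; when authorization errors are ignored (info-schema browsing), the object simply disappears from the result instead of failing the query. The class and method names are illustrative.

    import java.util.Collections;
    import java.util.List;

    import org.apache.drill.exec.store.hive.HiveAuthorizationHelper;
    import org.apache.hadoop.hive.metastore.IMetaStoreClient;
    import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAccessControlException;

    public class AuthzCallPatternSketch {
      public static List<String> listTablesChecked(final HiveAuthorizationHelper authorizer,
          final IMetaStoreClient mClient, final String dbName,
          final boolean ignoreAuthzErrors) throws Exception {
        try {
          authorizer.authorizeShowTables(dbName); // no-op when authorization is disabled
        } catch (final HiveAccessControlException e) {
          if (ignoreAuthzErrors) {
            return Collections.emptyList(); // hide the database instead of erroring out
          }
          throw e;
        }
        return mClient.getAllTables(dbName);
      }
    }
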
http://git-wip-us.apache.org/repos/asf/drill/blob/c1b847ac/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
index 8a2e498..9ada569 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store.hive;
 
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -40,6 +41,7 @@ import org.apache.drill.exec.proto.CoordinationProtos;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.hive.HiveTable.HivePartition;
+import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
@@ -61,6 +63,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.io.ByteArrayDataOutput;
 import com.google.common.io.ByteStreams;
+import org.apache.hadoop.security.UserGroupInformation;
 
 @JsonTypeName("hive-scan")
 public class HiveScan extends AbstractGroupScan {
@@ -104,7 +107,7 @@ public class HiveScan extends AbstractGroupScan {
     this.storagePluginName = storagePluginName;
     this.storagePlugin = (HiveStoragePlugin) pluginRegistry.getPlugin(storagePluginName);
     this.columns = columns;
-    getSplits();
+    getSplitsWithUGI();
     endpoints = storagePlugin.getContext().getBits();
   }
 
@@ -113,7 +116,7 @@ public class HiveScan extends AbstractGroupScan {
     this.hiveReadEntry = hiveReadEntry;
     this.columns = columns;
     this.storagePlugin = storagePlugin;
-    getSplits();
+    getSplitsWithUGI();
     endpoints = storagePlugin.getContext().getBits();
     this.storagePluginName = storagePlugin.getName();
   }
@@ -135,6 +138,22 @@ public class HiveScan extends AbstractGroupScan {
     return columns;
   }
 
+  private void getSplitsWithUGI() throws ExecutionSetupException {
+    final UserGroupInformation ugi = ImpersonationUtil.createProxyUgi(getUserName());
+    try {
+      ugi.doAs(new PrivilegedExceptionAction<Void>() {
+        public Void run() throws Exception {
+          getSplits();
+          return null;
+        }
+      });
+    } catch (final InterruptedException | IOException e) {
+      final String errMsg = String.format("Failed to create input splits: %s", e.getMessage());
+      logger.error(errMsg, e);
+      throw new DrillRuntimeException(errMsg, e);
+    }
+  }
+
   private void getSplits() throws ExecutionSetupException {
     try {
       final List<Partition> partitions = hiveReadEntry.getPartitions();
@@ -169,12 +188,10 @@ public class HiveScan extends AbstractGroupScan {
     final Path path = new Path(sd.getLocation());
     final FileSystem fs = path.getFileSystem(job);
 
-    // Use new JobConf that has FS configuration
-    final JobConf jobWithFsConf = new JobConf(fs.getConf());
     if (fs.exists(path)) {
-      FileInputFormat.addInputPath(jobWithFsConf, path);
-      format = jobWithFsConf.getInputFormat();
-      for (final InputSplit split : format.getSplits(jobWithFsConf, 1)) {
+      FileInputFormat.addInputPath(job, path);
+      format = job.getInputFormat();
+      for (final InputSplit split : format.getSplits(job, 1)) {
         inputSplits.add(split);
         partitionMap.put(split, partition);
       }

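The getSplitsWithUGI() change above uses the standard Hadoop proxy-user pattern. A minimal standalone sketch, assuming the process user is configured as a Hadoop proxy user (the user name and the work inside run() are illustrative):

    import java.security.PrivilegedExceptionAction;

    import org.apache.hadoop.security.UserGroupInformation;

    public class ProxyUserSketch {
      public static void main(String[] args) throws Exception {
        // Proxy UGI for the query user, layered on the Drillbit process (login) user.
        final UserGroupInformation ugi = UserGroupInformation.createProxyUser(
            "queryUser", UserGroupInformation.getLoginUser());
        ugi.doAs(new PrivilegedExceptionAction<Void>() {
          @Override
          public Void run() throws Exception {
            // FileSystem and InputFormat calls made here carry "queryUser" credentials.
            return null;
          }
        });
      }
    }
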
http://git-wip-us.apache.org/repos/asf/drill/blob/c1b847ac/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/DrillHiveTable.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/DrillHiveTable.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/DrillHiveTable.java
index d0ea143..b459ee4 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/DrillHiveTable.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/DrillHiveTable.java
@@ -45,8 +45,8 @@ public class DrillHiveTable extends DrillTable{
 
   protected final Table hiveTable;
 
-  public DrillHiveTable(String storageEngineName, HiveStoragePlugin plugin, HiveReadEntry readEntry) {
-    super(storageEngineName, plugin, readEntry);
+  public DrillHiveTable(String storageEngineName, HiveStoragePlugin plugin, String userName, HiveReadEntry readEntry) {
+    super(storageEngineName, plugin, userName, readEntry);
     this.hiveTable = new Table(readEntry.getTable());
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/c1b847ac/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/DrillHiveViewTable.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/DrillHiveViewTable.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/DrillHiveViewTable.java
index 1e02301..1a08433 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/DrillHiveViewTable.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/DrillHiveViewTable.java
@@ -25,8 +25,9 @@ import org.apache.drill.exec.store.hive.HiveStoragePlugin;
 
 public class DrillHiveViewTable extends DrillHiveTable implements DrillViewInfoProvider {
 
-  public DrillHiveViewTable(String storageEngineName, HiveStoragePlugin plugin, HiveReadEntry readEntry) {
-    super(storageEngineName, plugin, readEntry);
+  public DrillHiveViewTable(String storageEngineName, HiveStoragePlugin plugin, String userName,
+      HiveReadEntry readEntry) {
+    super(storageEngineName, plugin, userName, readEntry);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/c1b847ac/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
index 83f250b..c8f2490 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
@@ -18,187 +18,105 @@
 package org.apache.drill.exec.store.hive.schema;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 
-import org.apache.calcite.schema.Schema;
 import org.apache.calcite.schema.SchemaPlus;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.planner.logical.DrillTable;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.SchemaFactory;
+import org.apache.drill.exec.store.hive.DrillHiveMetaStoreClient;
 import org.apache.drill.exec.store.hive.HiveReadEntry;
 import org.apache.drill.exec.store.hive.HiveStoragePlugin;
 import org.apache.drill.exec.store.hive.HiveStoragePluginConfig;
-import org.apache.drill.exec.store.hive.HiveTable;
+import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.UnknownTableException;
 import org.apache.thrift.TException;
 
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
 public class HiveSchemaFactory implements SchemaFactory {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveSchemaFactory.class);
 
-  private static final String DATABASES = "databases";
-
-  private final HiveMetaStoreClient mClient;
-  private LoadingCache<String, List<String>> databases;
-  private LoadingCache<String, List<String>> tableNameLoader;
-  private LoadingCache<String, LoadingCache<String, HiveReadEntry>> tableLoaders;
-  private HiveStoragePlugin plugin;
-  private final String schemaName;
+  private final DrillHiveMetaStoreClient globalMetastoreClient;
+  private final HiveStoragePlugin plugin;
   private final Map<String, String> hiveConfigOverride;
+  private final String schemaName;
+  private final HiveConf hiveConf;
+  private final boolean isDrillImpersonationEnabled;
+  private final boolean isHS2DoAsSet;
 
   public HiveSchemaFactory(HiveStoragePlugin plugin, String name, Map<String, String> hiveConfigOverride) throws ExecutionSetupException {
     this.schemaName = name;
     this.plugin = plugin;
 
     this.hiveConfigOverride = hiveConfigOverride;
-    HiveConf hiveConf = new HiveConf();
+    hiveConf = new HiveConf();
     if (hiveConfigOverride != null) {
       for (Map.Entry<String, String> entry : hiveConfigOverride.entrySet()) {
-        hiveConf.set(entry.getKey(), entry.getValue());
+        final String property = entry.getKey();
+        final String value = entry.getValue();
+        hiveConf.set(property, value);
+        logger.trace("HiveConfig Override {}={}", property, value);
       }
     }
 
-    try {
-      this.mClient = new HiveMetaStoreClient(hiveConf);
-    } catch (MetaException e) {
-      throw new ExecutionSetupException("Failure setting up Hive metastore client.", e);
-    }
-
-    databases = CacheBuilder //
-        .newBuilder() //
-        .expireAfterAccess(1, TimeUnit.MINUTES) //
-        .build(new DatabaseLoader());
-
-    tableNameLoader = CacheBuilder //
-        .newBuilder() //
-        .expireAfterAccess(1, TimeUnit.MINUTES) //
-        .build(new TableNameLoader());
-
-    tableLoaders = CacheBuilder //
-        .newBuilder() //
-        .expireAfterAccess(4, TimeUnit.HOURS) //
-        .maximumSize(20) //
-        .build(new TableLoaderLoader());
-  }
-
-  private class TableNameLoader extends CacheLoader<String, List<String>> {
+    isHS2DoAsSet = hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS);
+    isDrillImpersonationEnabled = plugin.getContext().getConfig().getBoolean(ExecConstants.IMPERSONATION_ENABLED);
 
-    @Override
-    public List<String> load(String dbName) throws Exception {
+    if (!isDrillImpersonationEnabled) {
       try {
-        return mClient.getAllTables(dbName);
-      } catch (TException e) {
-        logger.warn("Failure while attempting to get hive tables", e);
-        mClient.reconnect();
-        return mClient.getAllTables(dbName);
+        globalMetastoreClient = DrillHiveMetaStoreClient.createNonCloseableClientWithCaching(hiveConf, hiveConfigOverride);
+      } catch (MetaException e) {
+        throw new ExecutionSetupException("Failure setting up Hive metastore client.", e);
       }
+    } else {
+      globalMetastoreClient = null;
     }
-
   }
 
-  private class DatabaseLoader extends CacheLoader<String, List<String>> {
-
-    @Override
-    public List<String> load(String key) throws Exception {
-      if (!DATABASES.equals(key)) {
-        throw new UnsupportedOperationException();
-      }
-      try {
-        return mClient.getAllDatabases();
-      } catch (TException e) {
-        logger.warn("Failure while attempting to get hive tables", e);
-        mClient.reconnect();
-        return mClient.getAllDatabases();
-      }
-    }
+  /**
+   * Should Drill impersonate the user connected to Drill when reading data from the Hive warehouse location?
+   * @return True when both Drill impersonation and Hive impersonation are enabled.
+   */
+  private boolean needToImpersonateReadingData() {
+    return isDrillImpersonationEnabled && isHS2DoAsSet;
   }
 
-  private class TableLoaderLoader extends CacheLoader<String, LoadingCache<String, HiveReadEntry>> {
-
-    @Override
-    public LoadingCache<String, HiveReadEntry> load(String key) throws Exception {
-      return CacheBuilder.newBuilder().expireAfterAccess(1, TimeUnit.MINUTES).build(new TableLoader(key));
-    }
-
-  }
-
-  private class TableLoader extends CacheLoader<String, HiveReadEntry> {
-
-    private final String dbName;
-
-    public TableLoader(String dbName) {
-      super();
-      this.dbName = dbName;
-    }
-
-    @Override
-    public HiveReadEntry load(String key) throws Exception {
-      Table t = null;
-      try {
-        t = mClient.getTable(dbName, key);
-      } catch (TException e) {
-        mClient.reconnect();
-        t = mClient.getTable(dbName, key);
-      }
-
-      if (t == null) {
-        throw new UnknownTableException(String.format("Unable to find table '%s'.", key));
-      }
-
-      List<Partition> partitions = null;
+  @Override
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
+    DrillHiveMetaStoreClient mClientForSchemaTree = globalMetastoreClient;
+    if (isDrillImpersonationEnabled) {
       try {
-        partitions = mClient.listPartitions(dbName, key, Short.MAX_VALUE);
-      } catch (TException e) {
-        mClient.reconnect();
-        partitions = mClient.listPartitions(dbName, key, Short.MAX_VALUE);
-      }
-
-      List<HiveTable.HivePartition> hivePartitions = Lists.newArrayList();
-      for (Partition part : partitions) {
-        hivePartitions.add(new HiveTable.HivePartition(part));
+        mClientForSchemaTree = DrillHiveMetaStoreClient.createClientWithAuthz(hiveConf, hiveConfigOverride,
+            schemaConfig.getUserName(), schemaConfig.getIgnoreAuthErrors());
+      } catch (final TException e) {
+        throw new IOException("Failure setting up Hive metastore client.", e);
       }
-
-      if (hivePartitions.size() == 0) {
-        hivePartitions = null;
-      }
-      return new HiveReadEntry(new HiveTable(t), hivePartitions, hiveConfigOverride);
-
     }
-
-  }
-
-  @Override
-  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
-    HiveSchema schema = new HiveSchema(schemaName);
+    HiveSchema schema = new HiveSchema(schemaConfig, mClientForSchemaTree, schemaName);
     SchemaPlus hPlus = parent.add(schemaName, schema);
     schema.setHolder(hPlus);
   }
 
   class HiveSchema extends AbstractSchema {
 
+    private final SchemaConfig schemaConfig;
+    private final DrillHiveMetaStoreClient mClient;
     private HiveDatabaseSchema defaultSchema;
 
-    public HiveSchema(String name) {
+    public HiveSchema(final SchemaConfig schemaConfig, final DrillHiveMetaStoreClient mClient, final String name) {
       super(ImmutableList.<String>of(), name);
+      this.schemaConfig = schemaConfig;
+      this.mClient = mClient;
       getSubSchema("default");
     }
 
@@ -206,25 +124,24 @@ public class HiveSchemaFactory implements SchemaFactory {
     public AbstractSchema getSubSchema(String name) {
       List<String> tables;
       try {
-        List<String> dbs = databases.get(DATABASES);
+        List<String> dbs = mClient.getDatabases();
         if (!dbs.contains(name)) {
-          logger.debug(String.format("Database '%s' doesn't exists in Hive storage '%s'", name, schemaName));
+          logger.debug("Database '{}' doesn't exists in Hive storage '{}'", name, schemaName);
           return null;
         }
-        tables = tableNameLoader.get(name);
+        tables = mClient.getTableNames(name);
         HiveDatabaseSchema schema = new HiveDatabaseSchema(tables, this, name);
         if (name.equals("default")) {
           this.defaultSchema = schema;
         }
         return schema;
-      } catch (ExecutionException e) {
+      } catch (final TException e) {
         logger.warn("Failure while attempting to access HiveDatabase '{}'.", name, e.getCause());
         return null;
       }
 
     }
 
-
     void setHolder(SchemaPlus plusOfThis) {
       for (String s : getSubSchemaNames()) {
         plusOfThis.add(s, getSubSchema(s));
@@ -239,9 +156,9 @@ public class HiveSchemaFactory implements SchemaFactory {
     @Override
     public Set<String> getSubSchemaNames() {
       try {
-        List<String> dbs = databases.get(DATABASES);
+        List<String> dbs = mClient.getDatabases();
         return Sets.newHashSet(dbs);
-      } catch (ExecutionException e) {
+      } catch (final TException e) {
         logger.warn("Failure while getting Hive database list.", e);
       }
       return super.getSubSchemaNames();
@@ -263,25 +180,19 @@ public class HiveSchemaFactory implements SchemaFactory {
       return defaultSchema.getTableNames();
     }
 
-    List<String> getTableNames(String dbName) {
-      try{
-        return tableNameLoader.get(dbName);
-      } catch (ExecutionException e) {
-        logger.warn("Failure while loading table names for database '{}'.", dbName, e.getCause());
-        return Collections.emptyList();
-      }
-    }
-
     DrillTable getDrillTable(String dbName, String t) {
       HiveReadEntry entry = getSelectionBaseOnName(dbName, t);
       if (entry == null) {
         return null;
       }
 
+      final String userToImpersonate = needToImpersonateReadingData() ? schemaConfig.getUserName() :
+          ImpersonationUtil.getProcessUserName();
+
       if (entry.getJdbcTableType() == TableType.VIEW) {
-        return new DrillHiveViewTable(schemaName, plugin, entry);
+        return new DrillHiveViewTable(schemaName, plugin, userToImpersonate, entry);
       } else {
-        return new DrillHiveTable(schemaName, plugin, entry);
+        return new DrillHiveTable(schemaName, plugin, userToImpersonate, entry);
       }
     }
 
@@ -290,8 +201,8 @@ public class HiveSchemaFactory implements SchemaFactory {
         dbName = "default";
       }
       try{
-        return tableLoaders.get(dbName).get(t);
-      }catch(ExecutionException e) {
+        return mClient.getHiveReadEntry(dbName, t);
+      }catch(final TException e) {
         logger.warn("Exception occurred while trying to read table. {}.{}", dbName, t, e.getCause());
         return null;
       }
@@ -307,6 +218,12 @@ public class HiveSchemaFactory implements SchemaFactory {
       return HiveStoragePluginConfig.NAME;
     }
 
+    @Override
+    public void close() throws Exception {
+      if (mClient != null) {
+        mClient.close();
+      }
+    }
   }
 
 }

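Summarizing needToImpersonateReadingData() and getDrillTable() above, the user that table data is read as works out to:

    Drill impersonation | hive.server2.enable.doAs | table data read as
    --------------------+--------------------------+------------------------
    disabled            | (any)                    | Drillbit process user
    enabled             | false                    | Drillbit process user
    enabled             | true                     | user connected to Drill
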
http://git-wip-us.apache.org/repos/asf/drill/blob/c1b847ac/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestUtilities.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestUtilities.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestUtilities.java
new file mode 100644
index 0000000..49738aa
--- /dev/null
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestUtilities.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.hive;
+
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+
+public class HiveTestUtilities {
+
+  /**
+   * Execute the given <i>query</i> on the given <i>hiveDriver</i> instance. If a {@link CommandNeedRetryException}
+   * is thrown, the query is retried up to 3 times before failure is reported.
+   * @param hiveDriver Hive {@link Driver} instance to run the query
+   * @param query query string to execute
+   */
+  public static void executeQuery(Driver hiveDriver, String query) {
+    CommandProcessorResponse response = null;
+    boolean failed = false;
+    int retryCount = 3;
+
+    while (response == null && !failed) { // retry on CommandNeedRetryException, up to 3 attempts
+      try {
+        response = hiveDriver.run(query);
+      } catch (CommandNeedRetryException ex) {
+        failed = (--retryCount == 0);
+      }
+    }
+
+    if (failed || response.getResponseCode() != 0) {
+      throw new RuntimeException(String.format("Failed to execute command '%s', errorMsg = '%s'",
+          query, (response != null ? response.getErrorMessage() : "")));
+    }
+  }
+}

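A hedged usage sketch for the helper above; the tests that follow drive a Hive Driver built from the test HiveConf in this manner. The query string here is illustrative, and starting a SessionState first is assumed to be required by the Driver.

    import org.apache.drill.exec.hive.HiveTestUtilities;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.Driver;
    import org.apache.hadoop.hive.ql.session.SessionState;

    public class ExecuteQuerySketch {
      public static void main(String[] args) {
        final HiveConf hiveConf = new HiveConf();
        SessionState.start(new SessionState(hiveConf)); // Driver requires an active session
        final Driver hiveDriver = new Driver(hiveConf);
        HiveTestUtilities.executeQuery(hiveDriver, "CREATE DATABASE IF NOT EXISTS db_general");
      }
    }
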
http://git-wip-us.apache.org/repos/asf/drill/blob/c1b847ac/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/impersonation/hive/BaseTestHiveImpersonation.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/impersonation/hive/BaseTestHiveImpersonation.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/impersonation/hive/BaseTestHiveImpersonation.java
new file mode 100644
index 0000000..8004155
--- /dev/null
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/impersonation/hive/BaseTestHiveImpersonation.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.impersonation.hive;
+
+import org.apache.drill.TestBuilder;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.dotdrill.DotDrillType;
+import org.apache.drill.exec.impersonation.BaseTestImpersonation;
+import org.apache.drill.exec.store.hive.HiveStoragePluginConfig;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.shims.ShimLoader;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
+
+public class BaseTestHiveImpersonation extends BaseTestImpersonation {
+  protected static final String hivePluginName = "hive";
+
+  protected static HiveConf hiveConf;
+  protected static String whDir;
+
+  protected static String studentData;
+  protected static String voterData;
+
+  protected static final String studentDef = "CREATE TABLE %s.%s" +
+      "(rownum int, name string, age int, gpa float, studentnum bigint) " +
+      "ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE";
+  protected static final String voterDef = "CREATE TABLE %s.%s" +
+      "(voter_id int,name varchar(30), age tinyint, registration string, " +
+      "contributions double,voterzone smallint,create_time timestamp) " +
+      "ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE";
+
+  protected static void prepHiveConfAndData() throws Exception {
+    hiveConf = new HiveConf();
+
+    // Configure metastore persistence db location on local filesystem
+    final String dbUrl = String.format("jdbc:derby:;databaseName=%s;create=true",  getTempDir("metastore_db"));
+    hiveConf.set(ConfVars.METASTORECONNECTURLKEY.varname, dbUrl);
+
+    hiveConf.set(ConfVars.SCRATCHDIR.varname, getTempDir("scratch_dir"));
+    hiveConf.set(ConfVars.LOCALSCRATCHDIR.varname, getTempDir("local_scratch_dir"));
+
+    // Set MiniDFS conf in HiveConf
+    hiveConf.set(FS_DEFAULT_NAME_KEY, dfsConf.get(FS_DEFAULT_NAME_KEY));
+
+    whDir = hiveConf.get(ConfVars.METASTOREWAREHOUSE.varname);
+    FileSystem.mkdirs(fs, new Path(whDir), new FsPermission((short) 0777));
+
+    studentData = getPhysicalFileFromResource("student.txt");
+    voterData = getPhysicalFileFromResource("voter.txt");
+  }
+
+  protected static void startHiveMetaStore() throws Exception {
+    final int port = MetaStoreUtils.findFreePort();
+
+    hiveConf.set(METASTOREURIS.varname, "thrift://localhost:" + port);
+
+    MetaStoreUtils.startMetaStore(port, ShimLoader.getHadoopThriftAuthBridge(), hiveConf);
+  }
+
+  protected static HiveStoragePluginConfig createHiveStoragePlugin(final Map<String, String> hiveConfig) throws Exception {
+    HiveStoragePluginConfig pluginConfig = new HiveStoragePluginConfig(hiveConfig);
+    pluginConfig.setEnabled(true);
+    return pluginConfig;
+  }
+
+  protected static Path getWhPathForHiveObject(final String dbName, final String tableName) {
+    if (dbName == null) {
+      return new Path(whDir);
+    }
+
+    if (tableName == null) {
+      return new Path(whDir, dbName + ".db");
+    }
+
+    return new Path(new Path(whDir, dbName + ".db"), tableName);
+  }
+
+  protected static void addHiveStoragePlugin(final Map<String, String> hiveConfig) throws Exception {
+    getDrillbitContext().getStorage().createOrUpdate(hivePluginName, createHiveStoragePlugin(hiveConfig), true);
+  }
+
+  protected void showTablesHelper(final String db, List<String> expectedTables) throws Exception {
+    final String dbQualified = hivePluginName + "." + db;
+    final TestBuilder testBuilder = testBuilder()
+        .sqlQuery("SHOW TABLES IN " + dbQualified)
+        .unOrdered()
+        .baselineColumns("TABLE_SCHEMA", "TABLE_NAME");
+
+    if (expectedTables.size() == 0) {
+      testBuilder.expectsEmptyResultSet();
+    } else {
+      for (String tbl : expectedTables) {
+        testBuilder.baselineValues(dbQualified, tbl);
+      }
+    }
+
+    testBuilder.go();
+  }
+
+  protected static void createView(final String viewOwner, final String viewGroup, final String viewName,
+                                 final String viewDef) throws Exception {
+    updateClient(viewOwner);
+    test(String.format("ALTER SESSION SET `%s`='%o';", ExecConstants.NEW_VIEW_DEFAULT_PERMS_KEY, (short) 0750));
+    test("CREATE VIEW %s.%s.%s AS %s", MINIDFS_STORAGE_PLUGIN_NAME, "tmp", viewName, viewDef);
+    final Path viewFilePath = new Path("/tmp/", viewName + DotDrillType.VIEW.getEnding());
+    fs.setOwner(viewFilePath, viewOwner, viewGroup);
+  }
+
+  public static void stopHiveMetaStore() throws Exception {
+    // Unfortunately the Hive metastore doesn't provide an API to shut it down; it exits along with the test JVM.
+    // As each metastore server instance uses its own resources and doesn't share them with other metastore
+    // server instances, this should be OK.
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/c1b847ac/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/impersonation/hive/TestSqlStdBasedAuthorization.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/impersonation/hive/TestSqlStdBasedAuthorization.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/impersonation/hive/TestSqlStdBasedAuthorization.java
new file mode 100644
index 0000000..8dc292d
--- /dev/null
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/impersonation/hive/TestSqlStdBasedAuthorization.java
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.impersonation.hive;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import org.apache.drill.exec.store.dfs.WorkspaceConfig;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.security.SessionStateConfigUserAuthenticator;
+import org.apache.hadoop.hive.ql.security.SessionStateUserAuthenticator;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdConfOnlyAuthorizerFactory;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.apache.drill.exec.hive.HiveTestUtilities.executeQuery;
+import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_AUTHENTICATOR_MANAGER;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_AUTHORIZATION_ENABLED;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI;
+
+public class TestSqlStdBasedAuthorization extends BaseTestHiveImpersonation {
+
+  private static final String db_general = "db_general";
+
+  // Tables in "db_general"
+  private static final String g_student_user0 = "student_user0";
+  private static final String g_voter_role0 = "voter_role0";
+  private static final String g_student_user2 = "student_user2";
+
+
+  // Create a view on "g_student_user0". View is owned by user0:group0 and has permissions 750
+  private static final String v_student_u0g0_750 = "v_student_u0g0_750";
+
+  // Create a view on "v_student_u0g0_750". View is owned by user1:group1 and has permissions 750
+  private static final String v_student_u1g1_750 = "v_student_u1g1_750";
+
+  private static final String query_v_student_u0g0_750 = String.format(
+      "SELECT rownum FROM %s.%s.%s ORDER BY rownum LIMIT 1", MINIDFS_STORAGE_PLUGIN_NAME, "tmp", v_student_u0g0_750);
+
+  private static final String query_v_student_u1g1_750 = String.format(
+      "SELECT rownum FROM %s.%s.%s ORDER BY rownum LIMIT 1", MINIDFS_STORAGE_PLUGIN_NAME, "tmp", v_student_u1g1_750);
+
+  // Role for testing purposes
+  private static final String test_role0 = "role0";
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    startMiniDfsCluster(TestSqlStdBasedAuthorization.class.getSimpleName());
+    prepHiveConfAndData();
+    setSqlStdBasedAuthorizationInHiveConf();
+    startHiveMetaStore();
+    startDrillCluster(true);
+    addHiveStoragePlugin(getHivePluginConfig());
+    addMiniDfsBasedStorage(Maps.<String, WorkspaceConfig>newHashMap());
+    generateTestData();
+  }
+
+  private static void setSqlStdBasedAuthorizationInHiveConf() {
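+    // The metastore session authenticates users from the session config and uses the config-only
+    // authorizer, which manages roles/grants without enforcing privileges; enforcement happens
+    // through the Drill plugin config below via SQLStdHiveAuthorizerFactory.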
+    hiveConf.set(HIVE_AUTHORIZATION_ENABLED.varname, "true");
+    hiveConf.set(HIVE_AUTHENTICATOR_MANAGER.varname, SessionStateConfigUserAuthenticator.class.getName());
+    hiveConf.set(HIVE_AUTHORIZATION_MANAGER.varname, SQLStdConfOnlyAuthorizerFactory.class.getName());
+    hiveConf.set(HIVE_SERVER2_ENABLE_DOAS.varname, "false");
+    hiveConf.set(METASTORE_EXECUTE_SET_UGI.varname, "false");
+    hiveConf.set(ConfVars.USERS_IN_ADMIN_ROLE.varname, processUser);
+  }
+
+  private static Map<String, String> getHivePluginConfig() {
+    final Map<String, String> hiveConfig = Maps.newHashMap();
+    hiveConfig.put(METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname));
+    hiveConfig.put(FS_DEFAULT_NAME_KEY, dfsConf.get(FS_DEFAULT_NAME_KEY));
+    hiveConfig.put(HIVE_SERVER2_ENABLE_DOAS.varname, hiveConf.get(HIVE_SERVER2_ENABLE_DOAS.varname));
+    hiveConfig.put(METASTORE_EXECUTE_SET_UGI.varname, hiveConf.get(METASTORE_EXECUTE_SET_UGI.varname));
+    hiveConfig.put(HIVE_AUTHORIZATION_ENABLED.varname, hiveConf.get(HIVE_AUTHORIZATION_ENABLED.varname));
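+    // The plugin-side config uses the SessionState-based authenticator and the full SQL-standard
+    // authorizer so that privileges are checked for the impersonated query user.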
+    hiveConfig.put(HIVE_AUTHENTICATOR_MANAGER.varname, SessionStateUserAuthenticator.class.getName());
+    hiveConfig.put(HIVE_AUTHORIZATION_MANAGER.varname, SQLStdHiveAuthorizerFactory.class.getName());
+    return hiveConfig;
+  }
+
+  private static void generateTestData() throws Exception {
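+    // The Hive driver runs in-process as the test process user, which setup placed in the admin
+    // role, so it can create the databases, roles and grants below.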
+    final SessionState ss = new SessionState(hiveConf);
+    SessionState.start(ss);
+    final Driver driver = new Driver(hiveConf);
+
+    executeQuery(driver, "CREATE DATABASE " + db_general);
+    createTbl(driver, db_general, g_student_user0, studentDef, studentData);
+    createTbl(driver, db_general, g_voter_role0, voterDef, voterData);
+    createTbl(driver, db_general, g_student_user2, studentDef, studentData);
+
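+    // Switch to the admin role, create a test role with user1 and user2 as members, and grant
+    // table-level SELECT to individual users and to the role.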
+    executeQuery(driver, "SET ROLE admin");
+    executeQuery(driver, "CREATE ROLE " + test_role0);
+    executeQuery(driver, "GRANT ROLE " + test_role0 + " TO USER " + org1Users[1]);
+    executeQuery(driver, "GRANT ROLE " + test_role0 + " TO USER " + org1Users[2]);
+
+    executeQuery(driver, String.format("GRANT SELECT ON %s.%s TO USER %s", db_general, g_student_user0, org1Users[0]));
+    executeQuery(driver, String.format("GRANT SELECT ON %s.%s TO ROLE %s", db_general, g_voter_role0, test_role0));
+    executeQuery(driver, String.format("GRANT SELECT ON %s.%s TO USER %s", db_general, g_student_user2, org1Users[2]));
+
+    createView(org1Users[0], org1Groups[0], v_student_u0g0_750,
+        String.format("SELECT rownum, name, age, studentnum FROM %s.%s.%s",
+            hivePluginName, db_general, g_student_user0));
+
+    createView(org1Users[1], org1Groups[1], v_student_u1g1_750,
+        String.format("SELECT rownum, name, age FROM %s.%s.%s",
+            MINIDFS_STORAGE_PLUGIN_NAME, "tmp", v_student_u0g0_750));
+  }
+
+  private static void createTbl(final Driver driver, final String db, final String tbl, final String tblDef,
+      final String data) throws Exception {
+    executeQuery(driver, String.format(tblDef, db, tbl));
+    executeQuery(driver, String.format("LOAD DATA LOCAL INPATH '%s' INTO TABLE %s.%s", data, db, tbl));
+  }
+
+  // Irrespective of each db's permissions, all dbs show up in "SHOW SCHEMAS"
+  @Test
+  public void showSchemas() throws Exception {
+    testBuilder()
+        .sqlQuery("SHOW SCHEMAS LIKE 'hive.%'")
+        .unOrdered()
+        .baselineColumns("SCHEMA_NAME")
+        .baselineValues("hive.db_general")
+        .baselineValues("hive.default")
+        .go();
+  }
+
+  @Test
+  public void showTables_user0() throws Exception {
+    updateClient(org1Users[0]);
+    showTablesHelper(db_general,
+        // Users are expected to see all tables in a database even if they don't have permissions to read from tables.
+        ImmutableList.of(
+            g_student_user0,
+            g_student_user2,
+            g_voter_role0
+        ));
+  }
+
+  @Test
+  public void showTables_user1() throws Exception {
+    updateClient(org1Users[1]);
+    showTablesHelper(db_general,
+        // Users are expected to see all tables in a database even if they don't have permissions to read from tables.
+        ImmutableList.of(
+            g_student_user0,
+            g_student_user2,
+            g_voter_role0
+        ));
+  }
+
+  @Test
+  public void select_user0_1() throws Exception {
+    // SELECT on "student_user0" table is granted to user "user0"
+    updateClient(org1Users[0]);
+    test("USE " + hivePluginName + "." + db_general);
+    test(String.format("SELECT * FROM %s ORDER BY name LIMIT 2", g_student_user0));
+  }
+
+  @Test
+  public void select_user0_2() throws Exception {
+    // SELECT on table "voter_role0" is NOT granted to user "user0", either directly or indirectly
+    // through role "role0", as user "user0" is not part of role "role0"
+    updateClient(org1Users[0]);
+    test("USE " + hivePluginName + "." + db_general);
+    final String query = String.format("SELECT * FROM %s ORDER BY name LIMIT 2", g_voter_role0);
+    errorMsgTestHelper(query, "Principal [name=user0_1, type=USER] does not have following privileges for " +
+        "operation QUERY [[SELECT] on Object [type=TABLE_OR_VIEW, name=db_general.voter_role0]]\n");
+  }
+
+  @Test
+  public void select_user1_1() throws Exception {
+    // SELECT on table "student_user0" is NOT granted to user "user1"
+    updateClient(org1Users[1]);
+    test("USE " + hivePluginName + "." + db_general);
+    final String query = String.format("SELECT * FROM %s ORDER BY name LIMIT 2", g_student_user0);
+    errorMsgTestHelper(query, "Principal [name=user1_1, type=USER] does not have following privileges for " +
+        "operation QUERY [[SELECT] on Object [type=TABLE_OR_VIEW, name=db_general.student_user0]]\n");
+  }
+
+  @Test
+  public void select_user1_2() throws Exception {
+    // SELECT on "voter_role0" table is granted to role "role0" and user "user1" is part the role "role0"
+    updateClient(org1Users[1]);
+    test("USE " + hivePluginName + "." + db_general);
+    test(String.format("SELECT * FROM %s ORDER BY name LIMIT 2", g_voter_role0));
+  }
+
+  @Test
+  public void select_user1_3() throws Exception {
+    // SELECT on "voter_role0" table is granted to role "role0" and user "user1" is part the role "role0"
+    // SELECT on "student_user2" table is NOT granted to either role "role0" or user "user1"
+    updateClient(org1Users[1]);
+    test("USE " + hivePluginName + "." + db_general);
+    final String query =
+        String.format("SELECT * FROM %s v JOIN %s s on v.name = s.name limit 2;", g_voter_role0, g_student_user2);
+    errorMsgTestHelper(query, "Principal [name=user1_1, type=USER] does not have following privileges for " +
+        "operation QUERY [[SELECT] on Object [type=TABLE_OR_VIEW, name=db_general.student_user2]]");
+  }
+
+  @Test
+  public void select_user2_1() throws Exception {
+    // SELECT on "voter_role0" table is granted to role "role0" and user "user2" is part the role "role0"
+    updateClient(org1Users[2]);
+    test("USE " + hivePluginName + "." + db_general);
+    test(String.format("SELECT * FROM %s ORDER BY name LIMIT 2", g_voter_role0));
+  }
+
+  @Test
+  public void select_user2_2() throws Exception {
+    // SELECT on "student_user2" table is granted to user "user2"
+    updateClient(org1Users[2]);
+    test("USE " + hivePluginName + "." + db_general);
+    test(String.format("SELECT * FROM %s ORDER BY name LIMIT 2", g_student_user2));
+  }
+
+  @Test
+  public void select_user2_3() throws Exception {
+    // SELECT on "voter_role0" table is granted to role "role0" and user "user2" is part the role "role0"
+    // SELECT on "student_user2" table is granted to user "user2"
+    updateClient(org1Users[2]);
+    test("USE " + hivePluginName + "." + db_general);
+    test(String.format("SELECT * FROM %s v JOIN %s s on v.name = s.name limit 2;", g_voter_role0, g_student_user2));
+  }
+
+  private static void queryViewHelper(final String queryUser, final String query) throws Exception {
+    updateClient(queryUser);
+    testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("rownum")
+        .baselineValues(1)
+        .go();
+  }
+
+  @Test
+  public void selectUser0_v_student_u0g0_750() throws Exception {
+    queryViewHelper(org1Users[0], query_v_student_u0g0_750);
+  }
+
+  @Test
+  public void selectUser1_v_student_u0g0_750() throws Exception {
+    queryViewHelper(org1Users[1], query_v_student_u0g0_750);
+  }
+
+  @Test
+  public void selectUser2_v_student_u0g0_750() throws Exception {
+    updateClient(org1Users[2]);
+    errorMsgTestHelper(query_v_student_u0g0_750,
+        "Not authorized to read view [v_student_u0g0_750] in schema [miniDfsPlugin.tmp]");
+  }
+
+  @Test
+  public void selectUser0_v_student_u1g1_750() throws Exception {
+    updateClient(org1Users[0]);
+    errorMsgTestHelper(query_v_student_u1g1_750,
+        "Not authorized to read view [v_student_u1g1_750] in schema [miniDfsPlugin.tmp]");
+  }
+
+  @Test
+  public void selectUser1_v_student_u1g1_750() throws Exception {
+    queryViewHelper(org1Users[1], query_v_student_u1g1_750);
+  }
+
+  @Test
+  public void selectUser2_v_student_u1g1_750() throws Exception {
+    queryViewHelper(org1Users[2], query_v_student_u1g1_750);
+  }
+
+  @AfterClass
+  public static void shutdown() throws Exception {
+    stopMiniDfsCluster();
+    stopHiveMetaStore();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/c1b847ac/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/impersonation/hive/TestStorageBasedHiveAuthorization.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/impersonation/hive/TestStorageBasedHiveAuthorization.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/impersonation/hive/TestStorageBasedHiveAuthorization.java
new file mode 100644
index 0000000..69e4f8d
--- /dev/null
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/impersonation/hive/TestStorageBasedHiveAuthorization.java
@@ -0,0 +1,372 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.impersonation.hive;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.dotdrill.DotDrillType;
+import org.apache.drill.exec.store.dfs.WorkspaceConfig;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.security.HadoopDefaultMetastoreAuthenticator;
+import org.apache.hadoop.hive.ql.security.authorization.AuthorizationPreEventListener;
+import org.apache.hadoop.hive.ql.security.authorization.StorageBasedAuthorizationProvider;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Map;
+
+import static org.apache.drill.exec.hive.HiveTestUtilities.executeQuery;
+import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_METASTORE_AUTHENTICATOR_MANAGER;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_METASTORE_AUTHORIZATION_AUTH_READS;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_METASTORE_AUTHORIZATION_MANAGER;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTORE_PRE_EVENT_LISTENERS;
+
+public class TestStorageBasedHiveAuthorization extends BaseTestHiveImpersonation {
+
+  // DB whose warehouse directory has permissions 755, available for everyone to read
+  private static final String db_general = "db_general";
+
+  // Tables in "db_general"
+  private static final String g_student_u0_700 = "student_u0_700";
+  private static final String g_student_u0g0_750 = "student_u0g0_750";
+  private static final String g_student_all_755 = "student_all_755";
+  private static final String g_voter_u1_700 = "voter_u1_700";
+  private static final String g_voter_u2g1_750 = "voter_u2g1_750";
+  private static final String g_voter_all_755 = "voter_all_755";
+
+  // DB whose warehouse directory has permissions 700 and owned by user0
+  private static final String db_u0_only = "db_u0_only";
+
+  // Tables in "db_u0_only"
+  private static final String u0_student_all_755 = "student_all_755";
+  private static final String u0_voter_all_755 = "voter_all_755";
+
+  // DB whose warehouse directory has permissions 750 and owned by user1 and group1
+  private static final String db_u1g1_only = "db_u1g1_only";
+
+  // Tables in "db_u1g1_only"
+  private static final String u1g1_student_all_755 = "student_all_755";
+  private static final String u1g1_student_u1_700 = "student_u1_700";
+  private static final String u1g1_voter_all_755 = "voter_all_755";
+  private static final String u1g1_voter_u1_700 = "voter_u1_700";
+
+  // Create a view on "student_u0_700". View is owned by user0:group0 and has permissions 750
+  private static final String v_student_u0g0_750 = "v_student_u0g0_750";
+
+  // Create a view on "v_student_u0g0_750". View is owned by user1:group1 and has permissions 750
+  private static final String v_student_u1g1_750 = "v_student_u1g1_750";
+
+  private static final String query_v_student_u0g0_750 = String.format(
+      "SELECT rownum FROM %s.%s.%s ORDER BY rownum LIMIT 1", MINIDFS_STORAGE_PLUGIN_NAME, "tmp", v_student_u0g0_750);
+
+  private static final String query_v_student_u1g1_750 = String.format(
+      "SELECT rownum FROM %s.%s.%s ORDER BY rownum LIMIT 1", MINIDFS_STORAGE_PLUGIN_NAME, "tmp", v_student_u1g1_750);
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    startMiniDfsCluster(TestStorageBasedHiveAuthorization.class.getSimpleName());
+    prepHiveConfAndData();
+    setStorageBasedAuthorizationInHiveConf();
+    startHiveMetaStore();
+    startDrillCluster(true);
+    addHiveStoragePlugin(getHivePluginConfig());
+    addMiniDfsBasedStorage(Maps.<String, WorkspaceConfig>newHashMap());
+    generateTestData();
+  }
+
+  private static void setStorageBasedAuthorizationInHiveConf() {
+    // Turn on metastore-side authorization
+    hiveConf.set(METASTORE_PRE_EVENT_LISTENERS.varname, AuthorizationPreEventListener.class.getName());
+    hiveConf.set(HIVE_METASTORE_AUTHENTICATOR_MANAGER.varname, HadoopDefaultMetastoreAuthenticator.class.getName());
+    hiveConf.set(HIVE_METASTORE_AUTHORIZATION_MANAGER.varname, StorageBasedAuthorizationProvider.class.getName());
+    hiveConf.set(HIVE_METASTORE_AUTHORIZATION_AUTH_READS.varname, "true");
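+    // With EXECUTE_SET_UGI enabled the metastore performs file-system checks as the calling user,
+    // so HDFS permissions decide who can read table metadata.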
+    hiveConf.set(METASTORE_EXECUTE_SET_UGI.varname, "true");
+  }
+
+  private static Map<String, String> getHivePluginConfig() {
+    final Map<String, String> hiveConfig = Maps.newHashMap();
+    hiveConfig.put(METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname));
+    hiveConfig.put(FS_DEFAULT_NAME_KEY, dfsConf.get(FS_DEFAULT_NAME_KEY));
+    hiveConfig.put(HIVE_SERVER2_ENABLE_DOAS.varname, hiveConf.get(HIVE_SERVER2_ENABLE_DOAS.varname));
+    hiveConfig.put(METASTORE_EXECUTE_SET_UGI.varname, hiveConf.get(METASTORE_EXECUTE_SET_UGI.varname));
+    return hiveConfig;
+  }
+
+  private static void generateTestData() throws Exception {
+
+    // Generate Hive test tables
+    final SessionState ss = new SessionState(hiveConf);
+    SessionState.start(ss);
+    final Driver driver = new Driver(hiveConf);
+
+    executeQuery(driver, "CREATE DATABASE " + db_general);
+
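+    // Tables in db_general get a mix of owners, groups and permissions (700/750/755) so that each
+    // test user can read only a subset of them under storage-based authorization.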
+    createTable(driver,
+        db_general, g_student_u0_700, studentDef, studentData, org1Users[0], org1Groups[0], (short) 0700);
+    createTable(driver,
+        db_general, g_student_u0g0_750, studentDef, studentData, org1Users[0], org1Groups[0], (short) 0750);
+    createTable(driver,
+        db_general, g_student_all_755, studentDef, studentData, org1Users[2], org1Groups[2], (short) 0755);
+    createTable(driver,
+        db_general, g_voter_u1_700, voterDef, voterData, org1Users[1], org1Groups[1], (short) 0700);
+    createTable(driver,
+        db_general, g_voter_u2g1_750, voterDef, voterData, org1Users[2], org1Groups[1], (short) 0750);
+    createTable(driver,
+        db_general, g_voter_all_755, voterDef, voterData, org1Users[1], org1Groups[1], (short) 0755);
+
+    changeDBPermissions(db_general, (short) 0755, org1Users[0], org1Groups[0]);
+
+    executeQuery(driver, "CREATE DATABASE " + db_u1g1_only);
+
+    createTable(driver,
+        db_u1g1_only, u1g1_student_all_755, studentDef, studentData, org1Users[1], org1Groups[1], (short) 0755);
+    createTable(driver,
+        db_u1g1_only, u1g1_student_u1_700, studentDef, studentData, org1Users[1], org1Groups[1], (short) 0700);
+    createTable(driver,
+        db_u1g1_only, u1g1_voter_all_755, voterDef, voterData, org1Users[1], org1Groups[1], (short) 0755);
+    createTable(driver,
+        db_u1g1_only, u1g1_voter_u1_700, voterDef, voterData, org1Users[1], org1Groups[1], (short) 0700);
+
+    changeDBPermissions(db_u1g1_only, (short) 0750, org1Users[1], org1Groups[1]);
+
+    executeQuery(driver, "CREATE DATABASE " + db_u0_only);
+
+    createTable(driver, db_u0_only, u0_student_all_755, studentDef, studentData, org1Users[0], org1Groups[0], (short) 0755);
+    createTable(driver, db_u0_only, u0_voter_all_755, voterDef, voterData, org1Users[0], org1Groups[0], (short) 0755);
+
+    changeDBPermissions(db_u0_only, (short) 0700, org1Users[0], org1Groups[0]);
+
+    createView(org1Users[0], org1Groups[0], v_student_u0g0_750,
+        String.format("SELECT rownum, name, age, studentnum FROM %s.%s.%s",
+            hivePluginName, db_general, g_student_u0_700));
+
+    createView(org1Users[1], org1Groups[1], v_student_u1g1_750,
+        String.format("SELECT rownum, name, age FROM %s.%s.%s",
+            MINIDFS_STORAGE_PLUGIN_NAME, "tmp", v_student_u0g0_750));
+  }
+
+  private static void createTable(final Driver hiveDriver, final String db, final String tbl, final String tblDef,
+      final String tblData, final String user, final String group, final short permissions) throws Exception {
+    executeQuery(hiveDriver, String.format(tblDef, db, tbl));
+    executeQuery(hiveDriver, String.format("LOAD DATA LOCAL INPATH '%s' INTO TABLE %s.%s", tblData, db, tbl));
+    final Path p = getWhPathForHiveObject(db, tbl);
+    fs.setPermission(p, new FsPermission(permissions));
+    fs.setOwner(p, user, group);
+  }
+
+  private static void changeDBPermissions(final String db, final short perm, final String u, final String g)
+      throws Exception {
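+    // Under storage-based authorization, db-level access follows the permissions on the db's
+    // warehouse directory.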
+    Path p = getWhPathForHiveObject(db, null);
+    fs.setPermission(p, new FsPermission(perm));
+    fs.setOwner(p, u, g);
+  }
+
+  // Irrespective of each db's permissions, all dbs show up in "SHOW SCHEMAS"
+  @Test
+  public void showSchemas() throws Exception {
+    testBuilder()
+        .sqlQuery("SHOW SCHEMAS LIKE 'hive.%'")
+        .unOrdered()
+        .baselineColumns("SCHEMA_NAME")
+        .baselineValues("hive.db_general")
+        .baselineValues("hive.db_u0_only")
+        .baselineValues("hive.db_u1g1_only")
+        .baselineValues("hive.default")
+        .go();
+  }
+
+  /**
+   * "SHOW TABLE" output for a db, should only contain the tables that the user
+   * has access to read. If the user has no read access to the db, the list will be always empty even if the user has
+   * read access to the tables inside the db.
+   * @throws Exception
+   */
+  @Test
+  public void showTablesUser0() throws Exception {
+    updateClient(org1Users[0]);
+
+    showTablesHelper(db_general,
+        ImmutableList.of(
+            g_student_u0_700,
+            g_student_u0g0_750,
+            g_student_all_755,
+            g_voter_all_755
+        ));
+
+    showTablesHelper(db_u0_only,
+        ImmutableList.of(
+            u0_student_all_755,
+            u0_voter_all_755
+        ));
+
+    showTablesHelper(db_u1g1_only, Collections.<String>emptyList());
+  }
+
+  @Test
+  public void showTablesUser1() throws Exception {
+    updateClient(org1Users[1]);
+
+    showTablesHelper(db_general,
+        ImmutableList.of(
+            g_student_u0g0_750,
+            g_student_all_755,
+            g_voter_u1_700,
+            g_voter_u2g1_750,
+            g_voter_all_755
+        ));
+
+    showTablesHelper(db_u1g1_only,
+        ImmutableList.of(
+            u1g1_student_all_755,
+            u1g1_student_u1_700,
+            u1g1_voter_all_755,
+            u1g1_voter_u1_700
+        ));
+
+    showTablesHelper(db_u0_only, Collections.<String>emptyList());
+  }
+
+  @Test
+  public void showTablesUser2() throws Exception {
+    updateClient(org1Users[2]);
+
+    showTablesHelper(db_general,
+        ImmutableList.of(
+            g_student_all_755,
+            g_voter_u2g1_750,
+            g_voter_all_755
+        ));
+
+    showTablesHelper(db_u1g1_only,
+        ImmutableList.of(
+            u1g1_student_all_755,
+            u1g1_voter_all_755
+        ));
+
+    showTablesHelper(db_u0_only, Collections.<String>emptyList());
+  }
+
+  // Try to read the tables "user0" has access to read in db_general.
+  @Test
+  public void selectUser0_db_general() throws Exception {
+    updateClient(org1Users[0]);
+
+    test(String.format("SELECT * FROM hive.%s.%s ORDER BY gpa DESC LIMIT 2", db_general, g_student_u0_700));
+    test(String.format("SELECT * FROM hive.%s.%s ORDER BY gpa DESC LIMIT 2", db_general, g_student_all_755));
+    test(String.format("SELECT * FROM hive.%s.%s ORDER BY name DESC LIMIT 2", db_general, g_voter_all_755));
+  }
+
+  // Try to read the tables "user0" has access to read in db_u0_only
+  @Test
+  public void selectUser0_db_u0_only() throws Exception {
+    updateClient(org1Users[0]);
+
+    test(String.format("SELECT * FROM hive.%s.%s ORDER BY gpa DESC LIMIT 2", db_u0_only, u0_student_all_755));
+    test(String.format("SELECT * FROM hive.%s.%s ORDER BY name DESC LIMIT 2", db_u0_only, u0_voter_all_755));
+  }
+
+  // Try to read the tables "user0" has no access to read in db_u1g1_only
+  @Test
+  public void selectUser0_db_u1g1_only() throws Exception {
+    updateClient(org1Users[0]);
+
+    errorMsgTestHelper(
+        String.format("SELECT * FROM hive.%s.%s ORDER BY gpa DESC LIMIT 2", db_u1g1_only, u1g1_student_all_755),
+        String.format("Table 'hive.%s.%s' not found", db_u1g1_only, u1g1_student_all_755));
+  }
+
+  // Try to read the tables "user1" has access to read in db_general.
+  @Test
+  public void selectUser1_db_general() throws Exception {
+    updateClient(org1Users[1]);
+
+    test(String.format("SELECT * FROM hive.%s.%s ORDER BY gpa DESC LIMIT 2", db_general, g_student_u0g0_750));
+    test(String.format("SELECT * FROM hive.%s.%s ORDER BY gpa DESC LIMIT 2", db_general, g_student_all_755));
+    test(String.format("SELECT * FROM hive.%s.%s ORDER BY name DESC LIMIT 2", db_general, g_voter_u2g1_750));
+  }
+
+  // Try to read the tables "user1" has no access to read in db_u0_only
+  @Test
+  public void selectUser1_db_u0_only() throws Exception {
+    updateClient(org1Users[1]);
+
+    errorMsgTestHelper(
+        String.format("SELECT * FROM hive.%s.%s ORDER BY gpa DESC LIMIT 2", db_u0_only, u0_student_all_755),
+        String.format("Table 'hive.%s.%s' not found", db_u0_only, u0_student_all_755));
+  }
+
+  private static void queryViewHelper(final String queryUser, final String query) throws Exception {
+    updateClient(queryUser);
+    testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("rownum")
+        .baselineValues(1)
+        .go();
+  }
+
+  @Test
+  public void selectUser0_v_student_u0g0_750() throws Exception {
+    queryViewHelper(org1Users[0], query_v_student_u0g0_750);
+  }
+
+  @Test
+  public void selectUser1_v_student_u0g0_750() throws Exception {
+    queryViewHelper(org1Users[1], query_v_student_u0g0_750);
+  }
+
+  @Test
+  public void selectUser2_v_student_u0g0_750() throws Exception {
+    updateClient(org1Users[2]);
+    errorMsgTestHelper(query_v_student_u0g0_750,
+        "Not authorized to read view [v_student_u0g0_750] in schema [miniDfsPlugin.tmp]");
+  }
+
+  @Test
+  public void selectUser0_v_student_u1g1_750() throws Exception {
+    updateClient(org1Users[0]);
+    errorMsgTestHelper(query_v_student_u1g1_750,
+        "Not authorized to read view [v_student_u1g1_750] in schema [miniDfsPlugin.tmp]");
+  }
+
+  @Test
+  public void selectUser1_v_student_u1g1_750() throws Exception {
+    queryViewHelper(org1Users[1], query_v_student_u1g1_750);
+  }
+
+  @Test
+  public void selectUser2_v_student_u1g1_750() throws Exception {
+    queryViewHelper(org1Users[2], query_v_student_u1g1_750);
+  }
+
+  @AfterClass
+  public static void shutdown() throws Exception {
+    stopMiniDfsCluster();
+    stopHiveMetaStore();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/c1b847ac/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
index ea8d90f..965a863 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
@@ -24,7 +24,6 @@ import java.sql.Date;
 import java.sql.Timestamp;
 import java.util.Map;
 
-import com.google.common.io.Files;
 import org.apache.commons.io.FileUtils;
 import org.apache.drill.BaseTestQuery;
 import org.apache.drill.common.exceptions.DrillException;
@@ -32,16 +31,16 @@ import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Driver;
-import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 
 import com.google.common.collect.Maps;
 
+import static org.apache.drill.BaseTestQuery.getTempDir;
+import static org.apache.drill.exec.hive.HiveTestUtilities.executeQuery;
+
 public class HiveTestDataGenerator {
   private static final String HIVE_TEST_PLUGIN_NAME = "hive";
-  private static final int RETRIES = 5;
   private static HiveTestDataGenerator instance;
 
   private final String dbDir;
@@ -50,13 +49,8 @@ public class HiveTestDataGenerator {
 
   public static synchronized HiveTestDataGenerator getInstance() throws Exception {
     if (instance == null) {
-      final File db = Files.createTempDir();
-      db.deleteOnExit();
-      final String dbDir = db.getAbsolutePath() + File.separator + "metastore_db";
-
-      final File wh = Files.createTempDir();
-      wh.deleteOnExit();
-      final String whDir = wh.getAbsolutePath();
+      final String dbDir = getTempDir("metastore_db");
+      final String whDir = getTempDir("warehouse");
 
       instance = new HiveTestDataGenerator(dbDir, whDir);
       instance.generateTestData();
@@ -121,7 +115,8 @@ public class HiveTestDataGenerator {
     conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
     conf.set("hive.metastore.warehouse.dir", whDir);
     conf.set("mapred.job.tracker", "local");
-    conf.set("hive.exec.scratchdir",  Files.createTempDir().getAbsolutePath() + File.separator + "scratch_dir");
+    conf.set(ConfVars.SCRATCHDIR.varname, getTempDir("scratch_dir"));
+    conf.set(ConfVars.LOCALSCRATCHDIR.varname, getTempDir("local_scratch_dir"));
 
     SessionState ss = new SessionState(conf);
     SessionState.start(ss);
@@ -151,7 +146,8 @@ public class HiveTestDataGenerator {
         "ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' " +
         "STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat' " +
         "OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat' " +
-        "TBLPROPERTIES ('avro.schema.url'='file:///%s')", getSchemaFile("avro_test_schema.json"));
+        "TBLPROPERTIES ('avro.schema.url'='file:///%s')",
+        BaseTestQuery.getPhysicalFileFromResource("avro_test_schema.json"));
 
     executeQuery(hiveDriver, avroCreateQuery);
     executeQuery(hiveDriver, "INSERT INTO TABLE db1.avro SELECT * FROM default.kv");
@@ -320,15 +316,6 @@ public class HiveTestDataGenerator {
     return file.getPath();
   }
 
-  private String getSchemaFile(final String resource) throws Exception {
-    final File file = getTempFile();
-    PrintWriter printWriter = new PrintWriter(file);
-    printWriter.write(BaseTestQuery.getFile(resource));
-    printWriter.close();
-
-    return file.getPath();
-  }
-
   private String generateTestDataFileWithDate() throws Exception {
     final File file = getTempFile();
 
@@ -355,24 +342,4 @@ public class HiveTestDataGenerator {
 
     return file.getPath();
   }
-
-  private void executeQuery(Driver hiveDriver, String query) {
-    CommandProcessorResponse response = null;
-    boolean failed = false;
-    int retryCount = RETRIES;
-
-    try {
-      response = hiveDriver.run(query);
-    } catch(CommandNeedRetryException ex) {
-      if (--retryCount == 0) {
-        failed = true;
-      }
-    }
-
-    if (failed || response.getResponseCode() != 0 ) {
-      throw new RuntimeException(String.format("Failed to execute command '%s', errorMsg = '%s'",
-        query, (response != null ? response.getErrorMessage() : "")));
-    }
-  }
-
 }