You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2022/01/19 10:42:22 UTC

[GitHub] [drill] vdiravka commented on a change in pull request #2422: DRILL-8061: Add Impersonation Support for Phoenix

vdiravka commented on a change in pull request #2422:
URL: https://github.com/apache/drill/pull/2422#discussion_r785419426



##########
File path: contrib/storage-phoenix/pom.xml
##########
@@ -33,6 +33,7 @@
     <phoenix.version>5.1.2</phoenix.version>
     <!-- Keep the 2.4.2 to reduce dependency conflict -->
     <hbase.minicluster.version>2.4.2</hbase.minicluster.version>
+    <jetty.version>9.4.31.v20200723</jetty.version>

Review comment:
       Jetty is used for testing PQS:
   https://github.com/apache/phoenix-queryserver/blob/master/pom.xml#L83
   The tests failed with Drill's Jetty version `9.4.41.v20210516`.

##########
File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixBatchReader.java
##########
@@ -71,58 +74,64 @@ public PhoenixBatchReader(PhoenixSubScan subScan) {
 
   @Override
   public boolean open(SchemaNegotiator negotiator) {
-    try {
+    UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();
+    return ugi.doAs((PrivilegedAction<Boolean>) () -> {
       errorContext = negotiator.parentErrorContext();
-      conn = subScan.getPlugin().getDataSource().getConnection();
-      pstmt = conn.prepareStatement(subScan.getSql());
-      results = pstmt.executeQuery();
-      meta = pstmt.getMetaData();
-    } catch (SQLException e) {
-      throw UserException
-              .dataReadError(e)
-              .message("Failed to execute the phoenix sql query. " + e.getMessage())
-              .addContext(errorContext)
-              .build(logger);
-    }
-    try {
-      negotiator.tableSchema(defineMetadata(), true);
-      reader = new PhoenixReader(negotiator.build(), columns, results);
-      bindColumns(reader.getStorage());
-    } catch (SQLException e) {
-      throw UserException
-              .dataReadError(e)
-              .message("Failed to get type of columns from metadata. " + e.getMessage())
-              .addContext(errorContext)
-              .build(logger);
-    }
-    watch = Stopwatch.createStarted();
-    return true;
+      try {
+        pstmt =
+          subScan.getPlugin().getDataSource(negotiator.userName()).getConnection().prepareStatement(subScan.getSql());

Review comment:
       ok

##########
File path: contrib/storage-phoenix/pom.xml
##########
@@ -187,12 +194,79 @@
       <version>${hbase.minicluster.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-asyncfs</artifactId>
+      <type>test-jar</type>
+      <version>${hbase.minicluster.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-hdfs-client</artifactId>
       <version>${hadoop.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minikdc</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-testing-util</artifactId>
+      <version>${hbase.minicluster.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-protocol-shaded</artifactId>

Review comment:
       Agreed. It is already declared. Thanks

##########
File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixBatchReader.java
##########
@@ -71,58 +74,64 @@ public PhoenixBatchReader(PhoenixSubScan subScan) {
 
   @Override
   public boolean open(SchemaNegotiator negotiator) {
-    try {
+    UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();
+    return ugi.doAs((PrivilegedAction<Boolean>) () -> {
       errorContext = negotiator.parentErrorContext();
-      conn = subScan.getPlugin().getDataSource().getConnection();
-      pstmt = conn.prepareStatement(subScan.getSql());
-      results = pstmt.executeQuery();
-      meta = pstmt.getMetaData();
-    } catch (SQLException e) {
-      throw UserException
-              .dataReadError(e)
-              .message("Failed to execute the phoenix sql query. " + e.getMessage())
-              .addContext(errorContext)
-              .build(logger);
-    }
-    try {
-      negotiator.tableSchema(defineMetadata(), true);
-      reader = new PhoenixReader(negotiator.build(), columns, results);
-      bindColumns(reader.getStorage());
-    } catch (SQLException e) {
-      throw UserException
-              .dataReadError(e)
-              .message("Failed to get type of columns from metadata. " + e.getMessage())
-              .addContext(errorContext)
-              .build(logger);
-    }
-    watch = Stopwatch.createStarted();
-    return true;
+      try {
+        pstmt =
+          subScan.getPlugin().getDataSource(negotiator.userName()).getConnection().prepareStatement(subScan.getSql());
+        results = pstmt.executeQuery();
+        meta = pstmt.getMetaData();
+      } catch (SQLException e) {
+        throw UserException
+          .dataReadError(e)
+          .message("Failed to execute the phoenix sql query. " + e.getMessage())
+          .addContext(negotiator.parentErrorContext())
+          .build(logger);
+      }
+      try {
+        negotiator.tableSchema(defineMetadata(), true);
+        reader = new PhoenixReader(negotiator.build(), columns, results);
+        bindColumns(reader.getStorage());
+      } catch (SQLException e) {
+        throw UserException
+          .dataReadError(e)
+          .message("Failed to get type of columns from metadata. " + e.getMessage())
+          .addContext(errorContext)
+          .build(logger);
+      }
+      watch = Stopwatch.createStarted();
+      return true;
+    });
   }
 
   @Override
   public boolean next() {
+    UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();

Review comment:
       sure. Done

##########
File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixDataSource.java
##########
@@ -39,44 +40,49 @@
  * is not always left in a healthy state by the previous user. It is better to
  * create new Phoenix Connections to ensure that you avoid any potential issues.
  */
+@Slf4j
 public class PhoenixDataSource implements DataSource {
 
   private static final String DEFAULT_URL_HEADER = "jdbc:phoenix:thin:url=http://";
   private static final String DEFAULT_SERIALIZATION = "serialization=PROTOBUF";
+  private static final String IMPERSONATED_USER_VARIABLE = "$user";
+  private static final String DEFAULT_QUERY_SERVER_REMOTEUSEREXTRACTOR_PARAM = "doAs";
 
-  private String url;
+  private final String url;
   private Map<String, Object> connectionProperties;
-  private boolean isFatClient; // Is a fat client
-
-  public PhoenixDataSource(String url) {
-    Preconditions.checkNotNull(url);
-    this.url = url;
-  }
-
-  public PhoenixDataSource(String host, int port) {
-    Preconditions.checkNotNull(host);
-    Preconditions.checkArgument(port > 0, "Please set the correct port.");
-    this.url = new StringBuilder()
-        .append(DEFAULT_URL_HEADER)
-        .append(host)
-        .append(":")
-        .append(port)
-        .append(";")
-        .append(DEFAULT_SERIALIZATION)
-        .toString();
-  }
-
-  public PhoenixDataSource(String url, Map<String, Object> connectionProperties) {
-    this(url);
+  private boolean isFatClient;
+  private String user;

Review comment:
       done. thanks

##########
File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSchemaFactory.java
##########
@@ -18,75 +18,100 @@
 package org.apache.drill.exec.store.phoenix;
 
 import java.io.IOException;
-import java.sql.Connection;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import javax.sql.DataSource;
 
 import org.apache.calcite.adapter.jdbc.JdbcSchema;
 import org.apache.calcite.schema.Schema;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Table;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.AbstractSchemaFactory;
 import org.apache.drill.exec.store.SchemaConfig;
-import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import static org.apache.drill.exec.util.ImpersonationUtil.getProcessUserName;
 
 public class PhoenixSchemaFactory extends AbstractSchemaFactory {
 
   private final PhoenixStoragePlugin plugin;
   private final Map<String, PhoenixSchema> schemaMap;
   private PhoenixSchema rootSchema;
+  private final boolean isDrillImpersonationEnabled;
 
   public PhoenixSchemaFactory(PhoenixStoragePlugin plugin) {
     super(plugin.getName());
     this.plugin = plugin;
-    this.schemaMap = Maps.newHashMap();
+    this.schemaMap = new HashMap<>();
+    isDrillImpersonationEnabled = plugin.getContext().getConfig().getBoolean(ExecConstants.IMPERSONATION_ENABLED);
   }
 
   @Override
   public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
-    rootSchema = new PhoenixSchema(plugin, Collections.emptyList(), plugin.getName());
-    locateSchemas();
+    try {
+      rootSchema = new PhoenixSchema(schemaConfig, plugin, Collections.emptyList(), plugin.getName());
+      String schemaUser = schemaConfig.getUserName();
+      locateSchemas(schemaConfig, rootSchema.getUser(schemaUser, getProcessUserName()));
+    } catch (SQLException e) {
+      throw new IOException(e);
+    }
     parent.add(getName(), rootSchema); // resolve the top-level schema.
     for (String schemaName : rootSchema.getSubSchemaNames()) {
       PhoenixSchema schema = (PhoenixSchema) rootSchema.getSubSchema(schemaName);
       parent.add(schemaName, schema); // provide all available schemas for calcite.
     }
   }
 
-  private void locateSchemas() {
-    DataSource ds = plugin.getDataSource();
-    try (Connection conn = ds.getConnection();
-          ResultSet rs = ds.getConnection().getMetaData().getSchemas()) {
-      while (rs.next()) {
-        final String schemaName = rs.getString(1); // lookup the schema (or called database).
-        PhoenixSchema schema = new PhoenixSchema(plugin, Arrays.asList(getName()), schemaName);
-        schemaMap.put(schemaName, schema);
-      }
-      rootSchema.addSchemas(schemaMap);
-    } catch (SQLException e) {
-      throw new DrillRuntimeException(e.getMessage(), e);
+  private void locateSchemas(SchemaConfig schemaConfig, String userName) throws SQLException {
+    UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();
+    try {
+      ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
+        try (ResultSet rs = plugin.getDataSource(userName).getConnection().getMetaData().getSchemas()) {
+          while (rs.next()) {
+            final String schemaName = rs.getString(1); // lookup the schema (or called database).
+            PhoenixSchema schema = new PhoenixSchema(schemaConfig, plugin, Arrays.asList(getName()), schemaName);
+            schemaMap.put(schemaName, schema);
+          }
+          rootSchema.addSchemas(schemaMap);
+        }
+        return null;
+      });
+    } catch (IOException | InterruptedException e) {
+      throw new SQLException(e);
     }
   }
 
-  protected static class PhoenixSchema extends AbstractSchema {
+  @Override
+  public boolean needToImpersonateReadingData() {

Review comment:
       I am trying to factor the logic shared with `HiveSchemaFactory` out into an interface, so that similar impersonation logic can be implemented for these and future data sources. Currently we don't control whether impersonation is enabled in Phoenix via Drill, but that is not the case for Hive, and the same may be true of other data sources.

##########
File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSchemaFactory.java
##########
@@ -101,21 +126,26 @@ public Schema getSubSchema(String name) {
 
     @Override
     public Table getTable(String name) {
-      Table table = jdbcSchema.getTable(StringUtils.upperCase(name));
-      return table;
+      final UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();

Review comment:
       It is possible. Thanks

##########
File path: contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/PhoenixBaseTest.java
##########
@@ -38,51 +38,63 @@
 
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.ClusterFixtureBuilder;
 import org.apache.drill.test.ClusterTest;
 import org.apache.hadoop.fs.Path;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.slf4j.LoggerFactory;
 
 import com.univocity.parsers.csv.CsvParser;
 import com.univocity.parsers.csv.CsvParserSettings;
 
-public class PhoenixBaseTest extends ClusterTest {
+public abstract class PhoenixBaseTest extends ClusterTest {
 
   private static final org.slf4j.Logger logger = LoggerFactory.getLogger(PhoenixBaseTest.class);
 
-  private static AtomicInteger initCount = new AtomicInteger(0);
-  protected static String U_U_I_D = UUID.randomUUID().toString();
+  public final static String U_U_I_D = UUID.randomUUID().toString();
+  private final static AtomicInteger initCount = new AtomicInteger(0);
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
+    PhoenixTestSuite.initPhoenixQueryServer();
     if (PhoenixTestSuite.isRunningSuite()) {
       QueryServerBasicsIT.testCatalogs();
     }
-    bootMiniCluster();
+    bootDrillMiniCluster();
+    dirTestWatcher.copyResourceToRoot(Paths.get(""));

Review comment:
       Done.
   Also I renamed `bootSecuredDrillMiniCluster` -> `startSecuredDrillCluster`

##########
File path: contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/HttpParamImpersonationQueryServerIT.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.phoenix.secured;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.security.access.AccessControlClient;
+import org.apache.hadoop.hbase.security.access.AccessController;
+import org.apache.hadoop.hbase.security.access.Permission.Action;
+import org.apache.hadoop.hbase.security.token.TokenProvider;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.queryserver.QueryServerOptions;
+import org.apache.phoenix.queryserver.QueryServerProperties;
+import org.apache.phoenix.queryserver.client.Driver;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.drill.exec.store.phoenix.secured.QueryServerEnvironment.LOGIN_USER;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class HttpParamImpersonationQueryServerIT {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HttpParamImpersonationQueryServerIT.class);
+    public static QueryServerEnvironment environment;
+
+    private static final List<TableName> SYSTEM_TABLE_NAMES = Arrays.asList(
+        PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME,
+        PhoenixDatabaseMetaData.SYSTEM_MUTEX_HBASE_TABLE_NAME,
+        PhoenixDatabaseMetaData.SYSTEM_FUNCTION_HBASE_TABLE_NAME,
+        PhoenixDatabaseMetaData.SYSTEM_SCHEMA_HBASE_TABLE_NAME,
+        PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_HBASE_TABLE_NAME,
+        PhoenixDatabaseMetaData.SYSTEM_STATS_HBASE_TABLE_NAME);
+
+    public static synchronized void startQueryServerEnvironment() throws Exception {
+        //Clean up previous environment if any (Junit 4.13 @BeforeParam / @AfterParam would be an alternative)
+        if(environment != null) {
+            stopEnvironment();
+        }
+
+        final Configuration conf = new Configuration();
+        conf.setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, AccessController.class.getName());
+        conf.setStrings(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, AccessController.class.getName());
+        conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, AccessController.class.getName(), TokenProvider.class.getName());
+
+        // Set the proxyuser settings,
+        // so that the user who is running the Drillbits/MiniDfs can impersonate user1 and user2 (not user3)
+        conf.set(String.format("hadoop.proxyuser.%s.hosts", LOGIN_USER), "*");
+        conf.set(String.format("hadoop.proxyuser.%s.users", LOGIN_USER), "user1,user2");
+        conf.setBoolean(QueryServerProperties.QUERY_SERVER_WITH_REMOTEUSEREXTRACTOR_ATTRIB, true);
+        environment = new QueryServerEnvironment(conf, 3, false);
+    }
+
+    public static synchronized void stopEnvironment() throws Exception {
+        environment.stop();
+        environment = null;
+    }
+
+    static public String getUrlTemplate() {
+        String url = Driver.CONNECT_STRING_PREFIX + "url=%s://localhost:" + environment.getPqsPort() + "?"
+                + QueryServerOptions.DEFAULT_QUERY_SERVER_REMOTEUSEREXTRACTOR_PARAM + "=%s;authentication=SPNEGO;serialization=PROTOBUF%s";
+        if (environment.getTls()) {
+            return String.format(url, "https", "%s", ";truststore=" + TlsUtil.getTrustStoreFile().getAbsolutePath()
+                + ";truststore_password="+TlsUtil.getTrustStorePassword());

Review comment:
       Done

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
##########
@@ -98,12 +98,7 @@ public RESULT call() throws Exception {
         currentThread.setName(proxyUgi.getUserName() + ":task-delegate-thread");
         final RESULT result;
         try {
-          result = proxyUgi.doAs(new PrivilegedExceptionAction<RESULT>() {
-            @Override
-            public RESULT run() throws Exception {
-              return callable.call();

Review comment:
       Lambda usage is preferable to anonymous classes on JDK 8+. This is a very simple function, so it is fine to use the new style.
   As for breakpoints, we can replace the method reference with a lambda function, which allows setting specific breakpoints:
   ![Screenshot from 2022-01-18 20-18-04](https://user-images.githubusercontent.com/11904420/149996184-5177e076-fe41-4011-9e59-e4b9d396b0d5.png)
   
   

##########
File path: contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/SecuredPhoenixBaseTest.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.phoenix.secured;
+
+import ch.qos.logback.classic.Level;
+import com.sun.security.auth.module.Krb5LoginModule;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.drill.common.config.DrillProperties;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.rpc.security.ServerAuthenticationHandler;
+import org.apache.drill.exec.rpc.security.kerberos.KerberosFactory;
+import org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl;
+import org.apache.drill.exec.server.TestDrillbitResilience;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.phoenix.PhoenixStoragePluginConfig;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.LogFixture;
+import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.phoenix.queryserver.server.QueryServer;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+
+import java.io.File;
+import java.nio.file.Paths;
+import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.TimeZone;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.drill.exec.store.phoenix.PhoenixBaseTest.createSampleData;
+import static org.apache.drill.exec.store.phoenix.PhoenixBaseTest.createSchema;
+import static org.apache.drill.exec.store.phoenix.PhoenixBaseTest.createTables;
+import static org.apache.drill.exec.store.phoenix.secured.HttpParamImpersonationQueryServerIT.environment;
+import static org.apache.drill.exec.store.phoenix.secured.HttpParamImpersonationQueryServerIT.getUrlTemplate;
+import static org.apache.drill.exec.store.phoenix.secured.HttpParamImpersonationQueryServerIT.grantUsersToGlobalPhoenixUserTables;
+import static org.apache.drill.exec.store.phoenix.secured.HttpParamImpersonationQueryServerIT.grantUsersToPhoenixSystemTables;
+import static org.apache.drill.exec.store.phoenix.secured.SecuredPhoenixTestSuite.initPhoenixQueryServer;
+
+@Slf4j
+public abstract class SecuredPhoenixBaseTest extends ClusterTest {
+  protected static LogFixture logFixture;
+  private final static Level CURRENT_LOG_LEVEL = Level.INFO;
+
+  private final static AtomicInteger initCount = new AtomicInteger(0);
+
+  @BeforeAll
+  public static void setUpBeforeClass() throws Exception {
+    TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
+    initPhoenixQueryServer();
+    bootSecuredDrillMiniCluster();
+    initializeDatabase();
+  }
+
+  private static void bootSecuredDrillMiniCluster() throws Exception {
+    logFixture = LogFixture.builder()
+      .toConsole()
+      .logger(QueryServerEnvironment.class, CURRENT_LOG_LEVEL)
+      .logger(SecuredPhoenixBaseTest.class, CURRENT_LOG_LEVEL)
+      .logger(KerberosFactory.class, CURRENT_LOG_LEVEL)
+      .logger(Krb5LoginModule.class, CURRENT_LOG_LEVEL)
+      .logger(QueryServer.class, CURRENT_LOG_LEVEL)
+      .logger(ServerAuthenticationHandler.class, CURRENT_LOG_LEVEL)
+      .build();
+
+    Map.Entry<String, File> user1 = environment.getUser(1);
+    Map.Entry<String, File> user2 = environment.getUser(2);
+    Map.Entry<String, File> user3 = environment.getUser(3);
+
+    dirTestWatcher.start(TestDrillbitResilience.class); // until DirTestWatcher ClassRule is implemented for JUnit5
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+        .configProperty(ExecConstants.USER_AUTHENTICATION_ENABLED, true)
+        .configProperty(ExecConstants.USER_AUTHENTICATOR_IMPL, UserAuthenticatorTestImpl.TYPE)
+        .configNonStringProperty(ExecConstants.AUTHENTICATION_MECHANISMS, Lists.newArrayList("kerberos"))
+        .configProperty(ExecConstants.IMPERSONATION_ENABLED, true)
+        .configProperty(ExecConstants.BIT_AUTHENTICATION_ENABLED, true)
+        .configProperty(ExecConstants.BIT_AUTHENTICATION_MECHANISM, "kerberos")
+        .configProperty(ExecConstants.SERVICE_PRINCIPAL, HBaseKerberosUtils.getPrincipalForTesting())
+        .configProperty(ExecConstants.SERVICE_KEYTAB_LOCATION, environment.getServiceKeytab().getAbsolutePath())
+        .configClientProperty(DrillProperties.SERVICE_PRINCIPAL, HBaseKerberosUtils.getPrincipalForTesting())
+        .configClientProperty(DrillProperties.USER, user1.getKey())
+        .configClientProperty(DrillProperties.KEYTAB, user1.getValue().getAbsolutePath());
+    startCluster(builder);
+    Properties user2ClientProperties = new Properties();
+    user2ClientProperties.setProperty(DrillProperties.SERVICE_PRINCIPAL, HBaseKerberosUtils.getPrincipalForTesting());
+    user2ClientProperties.setProperty(DrillProperties.USER, user2.getKey());
+    user2ClientProperties.setProperty(DrillProperties.KEYTAB, user2.getValue().getAbsolutePath());
+    cluster.addClientFixture(user2ClientProperties);
+    Properties user3ClientProperties = new Properties();
+    user3ClientProperties.setProperty(DrillProperties.SERVICE_PRINCIPAL, HBaseKerberosUtils.getPrincipalForTesting());
+    user3ClientProperties.setProperty(DrillProperties.USER, user3.getKey());
+    user3ClientProperties.setProperty(DrillProperties.KEYTAB, user3.getValue().getAbsolutePath());
+    cluster.addClientFixture(user3ClientProperties);
+
+    Map<String, Object> phoenixProps = new HashMap<>();
+    phoenixProps.put("phoenix.query.timeoutMs", 90000);
+    phoenixProps.put("phoenix.query.keepAliveMs", "30000");
+    phoenixProps.put("phoenix.queryserver.withRemoteUserExtractor", true);
+    StoragePluginRegistry registry = cluster.drillbit().getContext().getStorage();
+    final String doAsUrl = String.format(getUrlTemplate(), "$user");
+    logger.debug("Phoenix Query Server URL: {}", environment.getPqsUrl());
+    PhoenixStoragePluginConfig config = new PhoenixStoragePluginConfig(null, 0, null, null,
+      doAsUrl, null, phoenixProps);
+    config.setEnabled(true);
+    registry.put(PhoenixStoragePluginConfig.NAME + "123", config);
+  }
+
+
+  /**
+   * Initialize HBase via Phoenix
+   */
+  private static void initializeDatabase() throws Exception {
+    dirTestWatcher.copyResourceToRoot(Paths.get(""));
+    if (initCount.incrementAndGet() == 1) {
+      final Map.Entry<String, File> user1 = environment.getUser(1);
+      final Map.Entry<String, File> user2 = environment.getUser(2);
+      // Build the JDBC URL by hand with the doAs
+      final UserGroupInformation serviceUgi = ImpersonationUtil.getProcessUserUGI();
+      serviceUgi.doAs((PrivilegedExceptionAction<Void>) () -> {
+        logger.debug("Phoenix conn url: {}", environment.getPqsUrl());
+        createSchema(environment.getPqsUrl());
+        createTables(environment.getPqsUrl());
+        createSampleData(environment.getPqsUrl());
+        grantUsersToPhoenixSystemTables(Arrays.asList(user1.getKey(), user2.getKey()));
+        grantUsersToGlobalPhoenixUserTables(Arrays.asList(user1.getKey()));
+        return null;
+      });
+    }
+  }
+
+  protected interface TestWrapper {
+    void apply() throws Exception;
+  }
+
+  public void runForThreeClients(SecuredPhoenixSQLTest.TestWrapper wrapper) throws Exception {
+    runForThreeClients(wrapper, UserRemoteException.class, RuntimeException.class);
+  }
+
+  /**
+   * @param wrapper actual test case execution
+   * @param user2ExpectedException the expected Exception for user2, which can be impersonated, but hasn't permissions to the tables
+   * @param user3ExpectedException the expected Exception for user3, isn't impersonated
+   */
+  public void runForThreeClients(SecuredPhoenixSQLTest.TestWrapper wrapper, Class user2ExpectedException, Class user3ExpectedException) throws Exception {

Review comment:
       Done

##########
File path: contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/QueryServerEnvironment.java
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.phoenix.secured;
+
+import static org.apache.hadoop.hbase.HConstants.HBASE_DIR;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.InetAddress;
+import java.security.PrivilegedAction;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.LocalHBaseCluster;
+import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.util.KerberosName;
+import org.apache.phoenix.query.ConfigurationFactory;
+import org.apache.phoenix.queryserver.QueryServerProperties;
+import org.apache.phoenix.queryserver.server.QueryServer;
+import org.apache.phoenix.util.InstanceResolver;
+import org.apache.phoenix.util.ThinClientUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Due to this bug https://bugzilla.redhat.com/show_bug.cgi?id=668830 We need to use
+ * `localhost.localdomain` as host name when running these tests on Jenkins (Centos) but for Mac OS
+ * it should be `localhost` to pass. The reason is kerberos principals in this tests are looked up
+ * from /etc/hosts and a reverse DNS lookup of 127.0.0.1 is resolved to `localhost.localdomain`
+ * rather than `localhost` on Centos. KDC sees `localhost` != `localhost.localdomain` and as the
+ * result test fails with authentication error. It's also important to note these principals are
+ * shared between HDFs and HBase in this mini HBase cluster. Some more reading
+ * https://access.redhat.com/solutions/57330
+ */
+public class QueryServerEnvironment {
+  private static final Logger LOG = LoggerFactory.getLogger(QueryServerEnvironment.class);
+
+  private final File TEMP_DIR = new File(getTempDir());
+  private final File KEYTAB_DIR = new File(TEMP_DIR, "keytabs");
+  private final List<File> USER_KEYTAB_FILES = new ArrayList<>();
+
+  private static final String LOCAL_HOST_REVERSE_DNS_LOOKUP_NAME;
+  static final String LOGIN_USER;
+
+  static {
+    try {
+       System.setProperty("sun.security.krb5.debug", "true");
+      LOCAL_HOST_REVERSE_DNS_LOOKUP_NAME = InetAddress.getByName("127.0.0.1").getCanonicalHostName();
+      String userName = System.getProperty("user.name");
+      LOGIN_USER = userName != null ? userName : "securecluster";
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static final String SPNEGO_PRINCIPAL = "HTTP/" + LOCAL_HOST_REVERSE_DNS_LOOKUP_NAME;
+  private static final String PQS_PRINCIPAL = "phoenixqs/" + LOCAL_HOST_REVERSE_DNS_LOOKUP_NAME;
+  private static final String SERVICE_PRINCIPAL = LOGIN_USER + "/" + LOCAL_HOST_REVERSE_DNS_LOOKUP_NAME;
+  private File KEYTAB;
+
+  private MiniKdc KDC;
+  private HBaseTestingUtility UTIL = new HBaseTestingUtility();
+  private LocalHBaseCluster HBASE_CLUSTER;
+  private int NUM_CREATED_USERS;
+
+  private ExecutorService PQS_EXECUTOR;
+  private QueryServer PQS;
+  private int PQS_PORT;
+  private String PQS_URL;
+
+  private boolean tls;
+
+  private static String getTempDir() {
+    StringBuilder sb = new StringBuilder(32);
+    sb.append(System.getProperty("user.dir")).append(File.separator);
+    sb.append("target").append(File.separator);
+    sb.append(QueryServerEnvironment.class.getSimpleName());
+    sb.append("-").append(UUID.randomUUID());
+    return sb.toString();
+  }
+
+  public int getPqsPort() {
+    return PQS_PORT;
+  }
+
+  public String getPqsUrl() {
+    return PQS_URL;
+  }
+
+  public boolean getTls() {
+    return tls;
+  }
+
+  public HBaseTestingUtility getUtil() {
+    return UTIL;
+  }
+
+  public String getServicePrincipal() {
+    return SERVICE_PRINCIPAL;
+  }
+
+  public File getServiceKeytab() {
+    return KEYTAB;
+  }
+
+  private static void updateDefaultRealm() throws Exception {
+    // (at least) one other phoenix test triggers the caching of this field before the KDC is up
+    // which causes principal parsing to fail.
+    Field f = KerberosName.class.getDeclaredField("defaultRealm");
+    f.setAccessible(true);
+    // Default realm for MiniKDC
+    f.set(null, "EXAMPLE.COM");
+  }
+
+  private void createUsers(int numUsers) throws Exception {
+    assertNotNull("KDC is null, was setup method called?", KDC);
+    NUM_CREATED_USERS = numUsers;
+    for (int i = 1; i <= numUsers; i++) {
+      String principal = "user" + i;
+      File keytabFile = new File(KEYTAB_DIR, principal + ".keytab");
+      KDC.createPrincipal(keytabFile, principal);
+      USER_KEYTAB_FILES.add(keytabFile);
+    }
+  }
+
+  public Map.Entry<String, File> getUser(int offset) {
+    if (!(offset > 0 && offset <= NUM_CREATED_USERS)) {
+      throw new IllegalArgumentException();
+    }
+    return new AbstractMap.SimpleImmutableEntry<String, File>("user" + offset, USER_KEYTAB_FILES.get(offset - 1));
+  }
+
+  /**
+   * Setup the security configuration for hdfs.
+   */
+  private void setHdfsSecuredConfiguration(Configuration conf) throws Exception {
+    // Set principal+keytab configuration for HDFS
+    conf.set(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY,
+      SERVICE_PRINCIPAL + "@" + KDC.getRealm());
+    conf.set(DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY, KEYTAB.getAbsolutePath());
+    conf.set(DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY,
+      SERVICE_PRINCIPAL + "@" + KDC.getRealm());
+    conf.set(DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY, KEYTAB.getAbsolutePath());
+    conf.set(DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY,
+      SPNEGO_PRINCIPAL + "@" + KDC.getRealm());
+    // Enable token access for HDFS blocks
+    conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+    // Only use HTTPS (required because we aren't using "secure" ports)
+    conf.set(DFSConfigKeys.DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());
+    // Bind on localhost for spnego to have a chance at working
+    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");
+    conf.set(DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0");
+
+    // Generate SSL certs
+    File keystoresDir = new File(UTIL.getDataTestDir("keystore").toUri().getPath());
+    keystoresDir.mkdirs();
+    String sslConfDir = TlsUtil.getClasspathDir(QueryServerEnvironment.class);
+    TlsUtil.setupSSLConfig(keystoresDir.getAbsolutePath(), sslConfDir, conf, false);
+
+    // Magic flag to tell hdfs to not fail on using ports above 1024
+    conf.setBoolean("ignore.secure.ports.for.testing", true);
+  }
+
+  private static void ensureIsEmptyDirectory(File f) throws IOException {
+    if (f.exists()) {
+      if (f.isDirectory()) {
+        FileUtils.deleteDirectory(f);
+      } else {
+        assertTrue("Failed to delete keytab directory", f.delete());
+      }
+    }
+    assertTrue("Failed to create keytab directory", f.mkdirs());
+  }
+
+  /**
+   * Setup and start kerberosed, hbase
+   * @throws Exception
+   */
+  public QueryServerEnvironment(final Configuration confIn, int numberOfUsers, boolean tls)

Review comment:
       This is almost the original `org.apache.phoenix.end2end.QueryServerEnvironment`.
   To keep it easy to update, it is better not to move the methods.

##########
File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixBatchReader.java
##########
@@ -71,58 +74,64 @@ public PhoenixBatchReader(PhoenixSubScan subScan) {
 
   @Override
   public boolean open(SchemaNegotiator negotiator) {
-    try {
+    UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();
+    return ugi.doAs((PrivilegedAction<Boolean>) () -> {
       errorContext = negotiator.parentErrorContext();
-      conn = subScan.getPlugin().getDataSource().getConnection();
-      pstmt = conn.prepareStatement(subScan.getSql());
-      results = pstmt.executeQuery();
-      meta = pstmt.getMetaData();
-    } catch (SQLException e) {
-      throw UserException
-              .dataReadError(e)
-              .message("Failed to execute the phoenix sql query. " + e.getMessage())
-              .addContext(errorContext)
-              .build(logger);
-    }
-    try {
-      negotiator.tableSchema(defineMetadata(), true);
-      reader = new PhoenixReader(negotiator.build(), columns, results);
-      bindColumns(reader.getStorage());
-    } catch (SQLException e) {
-      throw UserException
-              .dataReadError(e)
-              .message("Failed to get type of columns from metadata. " + e.getMessage())
-              .addContext(errorContext)
-              .build(logger);
-    }
-    watch = Stopwatch.createStarted();
-    return true;
+      try {
+        pstmt =
+          subScan.getPlugin().getDataSource(negotiator.userName()).getConnection().prepareStatement(subScan.getSql());
+        results = pstmt.executeQuery();
+        meta = pstmt.getMetaData();
+      } catch (SQLException e) {
+        throw UserException
+          .dataReadError(e)
+          .message("Failed to execute the phoenix sql query. " + e.getMessage())
+          .addContext(negotiator.parentErrorContext())
+          .build(logger);
+      }
+      try {
+        negotiator.tableSchema(defineMetadata(), true);
+        reader = new PhoenixReader(negotiator.build(), columns, results);
+        bindColumns(reader.getStorage());
+      } catch (SQLException e) {
+        throw UserException
+          .dataReadError(e)
+          .message("Failed to get type of columns from metadata. " + e.getMessage())
+          .addContext(errorContext)
+          .build(logger);
+      }
+      watch = Stopwatch.createStarted();
+      return true;
+    });
   }
 
   @Override
   public boolean next() {
+    UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();
     try {
-      while(!reader.getStorage().isFull()) {
-        if (!reader.processRow()) { // return true if one row is processed.
-          watch.stop();
-          logger.debug("Phoenix fetch total record numbers : {}", reader.getRowCount());
-          return false; // the EOF is reached.
+      return ugi.doAs((PrivilegedExceptionAction<Boolean>) () -> {

Review comment:
       Looks like we need to use this only when impersonation is enabled.
   Changed.

##########
File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixDataSource.java
##########
@@ -39,44 +40,49 @@
  * is not always left in a healthy state by the previous user. It is better to
  * create new Phoenix Connections to ensure that you avoid any potential issues.
  */
+@Slf4j
 public class PhoenixDataSource implements DataSource {
 
   private static final String DEFAULT_URL_HEADER = "jdbc:phoenix:thin:url=http://";
   private static final String DEFAULT_SERIALIZATION = "serialization=PROTOBUF";
+  private static final String IMPERSONATED_USER_VARIABLE = "$user";
+  private static final String DEFAULT_QUERY_SERVER_REMOTEUSEREXTRACTOR_PARAM = "doAs";
 
-  private String url;
+  private final String url;
   private Map<String, Object> connectionProperties;
-  private boolean isFatClient; // Is a fat client
-
-  public PhoenixDataSource(String url) {
-    Preconditions.checkNotNull(url);
-    this.url = url;
-  }
-
-  public PhoenixDataSource(String host, int port) {
-    Preconditions.checkNotNull(host);
-    Preconditions.checkArgument(port > 0, "Please set the correct port.");
-    this.url = new StringBuilder()
-        .append(DEFAULT_URL_HEADER)
-        .append(host)
-        .append(":")
-        .append(port)
-        .append(";")
-        .append(DEFAULT_SERIALIZATION)
-        .toString();
-  }
-
-  public PhoenixDataSource(String url, Map<String, Object> connectionProperties) {
-    this(url);
+  private boolean isFatClient;
+  private String user;
+
+  public PhoenixDataSource(String url,
+                           String userName,
+                           Map<String, Object> connectionProperties,
+                           boolean impersonationEnabled) {
+    Preconditions.checkNotNull(url, userName);
     Preconditions.checkNotNull(connectionProperties);
     connectionProperties.forEach((k, v)
         -> Preconditions.checkArgument(v != null, String.format("does not accept null values : %s", k)));
+    this.url = impersonationEnabled ? doAsUserUrl(url, userName) : url;
+    this.user = userName;
     this.connectionProperties = connectionProperties;
   }
 
-  public PhoenixDataSource(String host, int port, Map<String, Object> connectionProperties) {
-    this(host, port);
-    Preconditions.checkNotNull(connectionProperties);
+  public PhoenixDataSource(String host,
+                           int port,
+                           String userName,
+                           Map<String, Object> connectionProperties,
+                           boolean impersonationEnabled) {
+    Preconditions.checkNotNull(host, userName);
+    Preconditions.checkArgument(port > 0, "Please set the correct port.");
+    this.url = new StringBuilder()
+      .append(DEFAULT_URL_HEADER)
+      .append(host)
+      .append(":")
+      .append(port)
+      .append(impersonationEnabled ? "?doAs=" + userName : "")
+      .append(";")
+      .append(DEFAULT_SERIALIZATION)
+      .toString();
+    this.user = userName;
     connectionProperties.forEach((k, v)

Review comment:
       agreed, thanks

##########
File path: contrib/storage-phoenix/README.md
##########
@@ -150,4 +155,13 @@ apache drill (phoenix123.v1)> select n_name, n_regionkey from nation limit 3 off
 | JAPAN  | 2           |
 +--------+-------------+
 3 rows selected (0.77 seconds)
-```
\ No newline at end of file
+```
+### Impersonation

Review comment:
       Agreed. Done

##########
File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixDataSource.java
##########
@@ -21,13 +21,14 @@
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
-import java.sql.SQLFeatureNotSupportedException;
 import java.util.Map;
 import java.util.Properties;
 import java.util.logging.Logger;
 
 import javax.sql.DataSource;
 
+import lombok.extern.slf4j.Slf4j;

Review comment:
       It sounds like an objection to using Lombok :)
   Lombok makes programming easier.
   But I have to say I have the same feelings. It is questionable in the same way Guava is: it helps programming, but that lib also brings a lot of difficulties when different libs use different Guava versions. I would rather not have started using Guava than shade and patch it.
   On the other hand, Drill with an old JDK language level and without modern libs starts to feel boring, old and legacy to develop.
   I am not introducing a new thing here. So let's leave it as is and keep such discussions for the whole Drill dev community, if needed.

##########
File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSchemaFactory.java
##########
@@ -18,75 +18,100 @@
 package org.apache.drill.exec.store.phoenix;
 
 import java.io.IOException;
-import java.sql.Connection;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import javax.sql.DataSource;
 
 import org.apache.calcite.adapter.jdbc.JdbcSchema;
 import org.apache.calcite.schema.Schema;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Table;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.AbstractSchemaFactory;
 import org.apache.drill.exec.store.SchemaConfig;
-import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import static org.apache.drill.exec.util.ImpersonationUtil.getProcessUserName;
 
 public class PhoenixSchemaFactory extends AbstractSchemaFactory {
 
   private final PhoenixStoragePlugin plugin;
   private final Map<String, PhoenixSchema> schemaMap;
   private PhoenixSchema rootSchema;
+  private final boolean isDrillImpersonationEnabled;
 
   public PhoenixSchemaFactory(PhoenixStoragePlugin plugin) {
     super(plugin.getName());
     this.plugin = plugin;
-    this.schemaMap = Maps.newHashMap();
+    this.schemaMap = new HashMap<>();
+    isDrillImpersonationEnabled = plugin.getContext().getConfig().getBoolean(ExecConstants.IMPERSONATION_ENABLED);
   }
 
   @Override
   public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
-    rootSchema = new PhoenixSchema(plugin, Collections.emptyList(), plugin.getName());
-    locateSchemas();
+    try {
+      rootSchema = new PhoenixSchema(schemaConfig, plugin, Collections.emptyList(), plugin.getName());
+      String schemaUser = schemaConfig.getUserName();
+      locateSchemas(schemaConfig, rootSchema.getUser(schemaUser, getProcessUserName()));
+    } catch (SQLException e) {
+      throw new IOException(e);

Review comment:
       Done for `registerSchemas`.
   Slightly changed for `locateSchemas` because of the check for whether impersonation is enabled.

##########
File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixDataSource.java
##########
@@ -169,12 +174,23 @@ private void useDriverClass() throws SQLException {
    */
   private Properties useConfProperties() {
     Properties props = new Properties();
-    props.put("phoenix.trace.frequency", "never");
-    props.put("phoenix.query.timeoutMs", 30000);
-    props.put("phoenix.query.keepAliveMs", 120000);
     if (getConnectionProperties() != null) {
       props.putAll(getConnectionProperties());
     }
+    props.putIfAbsent("phoenix.trace.frequency", "never");
+    props.putIfAbsent("phoenix.query.timeoutMs", 30000);
+    props.putIfAbsent("phoenix.query.keepAliveMs", 120000);
     return props;
   }
+
+  private String doAsUserUrl(String url, String userName) {
+    if (url.contains(DEFAULT_QUERY_SERVER_REMOTEUSEREXTRACTOR_PARAM)) {
+      return url.replace(IMPERSONATED_USER_VARIABLE, userName);
+    } else {
+      throw UserException
+        .connectionError()
+        .message("Invalid PQS URL. Please add `doAs=$user` parameter value in case Drill Impersonation enabled")

Review comment:
       Done

##########
File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSchemaFactory.java
##########
@@ -18,75 +18,100 @@
 package org.apache.drill.exec.store.phoenix;
 
 import java.io.IOException;
-import java.sql.Connection;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import javax.sql.DataSource;
 
 import org.apache.calcite.adapter.jdbc.JdbcSchema;
 import org.apache.calcite.schema.Schema;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Table;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.AbstractSchemaFactory;
 import org.apache.drill.exec.store.SchemaConfig;
-import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import static org.apache.drill.exec.util.ImpersonationUtil.getProcessUserName;
 
 public class PhoenixSchemaFactory extends AbstractSchemaFactory {
 
   private final PhoenixStoragePlugin plugin;
   private final Map<String, PhoenixSchema> schemaMap;
   private PhoenixSchema rootSchema;
+  private final boolean isDrillImpersonationEnabled;
 
   public PhoenixSchemaFactory(PhoenixStoragePlugin plugin) {
     super(plugin.getName());
     this.plugin = plugin;
-    this.schemaMap = Maps.newHashMap();
+    this.schemaMap = new HashMap<>();
+    isDrillImpersonationEnabled = plugin.getContext().getConfig().getBoolean(ExecConstants.IMPERSONATION_ENABLED);
   }
 
   @Override
   public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
-    rootSchema = new PhoenixSchema(plugin, Collections.emptyList(), plugin.getName());
-    locateSchemas();
+    try {
+      rootSchema = new PhoenixSchema(schemaConfig, plugin, Collections.emptyList(), plugin.getName());
+      String schemaUser = schemaConfig.getUserName();
+      locateSchemas(schemaConfig, rootSchema.getUser(schemaUser, getProcessUserName()));
+    } catch (SQLException e) {
+      throw new IOException(e);
+    }
     parent.add(getName(), rootSchema); // resolve the top-level schema.
     for (String schemaName : rootSchema.getSubSchemaNames()) {
       PhoenixSchema schema = (PhoenixSchema) rootSchema.getSubSchema(schemaName);
       parent.add(schemaName, schema); // provide all available schemas for calcite.
     }
   }
 
-  private void locateSchemas() {
-    DataSource ds = plugin.getDataSource();
-    try (Connection conn = ds.getConnection();
-          ResultSet rs = ds.getConnection().getMetaData().getSchemas()) {
-      while (rs.next()) {
-        final String schemaName = rs.getString(1); // lookup the schema (or called database).
-        PhoenixSchema schema = new PhoenixSchema(plugin, Arrays.asList(getName()), schemaName);
-        schemaMap.put(schemaName, schema);
-      }
-      rootSchema.addSchemas(schemaMap);
-    } catch (SQLException e) {
-      throw new DrillRuntimeException(e.getMessage(), e);
+  private void locateSchemas(SchemaConfig schemaConfig, String userName) throws SQLException {
+    UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();
+    try {
+      ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
+        try (ResultSet rs = plugin.getDataSource(userName).getConnection().getMetaData().getSchemas()) {
+          while (rs.next()) {
+            final String schemaName = rs.getString(1); // lookup the schema (or called database).
+            PhoenixSchema schema = new PhoenixSchema(schemaConfig, plugin, Arrays.asList(getName()), schemaName);
+            schemaMap.put(schemaName, schema);
+          }
+          rootSchema.addSchemas(schemaMap);
+        }
+        return null;
+      });
+    } catch (IOException | InterruptedException e) {
+      throw new SQLException(e);
     }
   }
 
-  protected static class PhoenixSchema extends AbstractSchema {
+  @Override
+  public boolean needToImpersonateReadingData() {
+    return isDrillImpersonationEnabled;
+  }
 
+  class PhoenixSchema extends AbstractSchema {
+    private final SchemaConfig schemaConfig;

Review comment:
       right. Thanks

##########
File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixStoragePlugin.java
##########
@@ -62,8 +77,12 @@ public StoragePluginConfig getConfig() {
     return config;
   }
 
-  public DataSource getDataSource() {
-    return dataSource;
+  public PhoenixDataSource getDataSource(String userName) throws SQLException {

Review comment:
       Agreed.
   I moved it along with `getDialect` and `getConvention` methods

##########
File path: contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/PhoenixBaseTest.java
##########
@@ -38,51 +38,63 @@
 
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.ClusterFixtureBuilder;
 import org.apache.drill.test.ClusterTest;
 import org.apache.hadoop.fs.Path;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.slf4j.LoggerFactory;
 
 import com.univocity.parsers.csv.CsvParser;
 import com.univocity.parsers.csv.CsvParserSettings;
 
-public class PhoenixBaseTest extends ClusterTest {
+public abstract class PhoenixBaseTest extends ClusterTest {

Review comment:
       Agreed. Thanks

##########
File path: contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/HttpParamImpersonationQueryServerIT.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.phoenix.secured;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.security.access.AccessControlClient;
+import org.apache.hadoop.hbase.security.access.AccessController;
+import org.apache.hadoop.hbase.security.access.Permission.Action;
+import org.apache.hadoop.hbase.security.token.TokenProvider;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.queryserver.QueryServerOptions;
+import org.apache.phoenix.queryserver.QueryServerProperties;
+import org.apache.phoenix.queryserver.client.Driver;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.drill.exec.store.phoenix.secured.QueryServerEnvironment.LOGIN_USER;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class HttpParamImpersonationQueryServerIT {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HttpParamImpersonationQueryServerIT.class);

Review comment:
       removed. Thanks

##########
File path: contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/QueryServerEnvironment.java
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.phoenix.secured;
+
+import static org.apache.hadoop.hbase.HConstants.HBASE_DIR;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.InetAddress;
+import java.security.PrivilegedAction;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.LocalHBaseCluster;
+import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.util.KerberosName;
+import org.apache.phoenix.query.ConfigurationFactory;
+import org.apache.phoenix.queryserver.QueryServerProperties;
+import org.apache.phoenix.queryserver.server.QueryServer;
+import org.apache.phoenix.util.InstanceResolver;
+import org.apache.phoenix.util.ThinClientUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Due to this bug https://bugzilla.redhat.com/show_bug.cgi?id=668830 We need to use
+ * `localhost.localdomain` as host name when running these tests on Jenkins (Centos) but for Mac OS
+ * it should be `localhost` to pass. The reason is kerberos principals in this tests are looked up
+ * from /etc/hosts and a reverse DNS lookup of 127.0.0.1 is resolved to `localhost.localdomain`
+ * rather than `localhost` on Centos. KDC sees `localhost` != `localhost.localdomain` and as the
+ * result test fails with authentication error. It's also important to note these principals are
+ * shared between HDFs and HBase in this mini HBase cluster. Some more reading
+ * https://access.redhat.com/solutions/57330
+ */
+public class QueryServerEnvironment {
+  private static final Logger LOG = LoggerFactory.getLogger(QueryServerEnvironment.class);
+
+  private final File TEMP_DIR = new File(getTempDir());
+  private final File KEYTAB_DIR = new File(TEMP_DIR, "keytabs");
+  private final List<File> USER_KEYTAB_FILES = new ArrayList<>();
+
+  private static final String LOCAL_HOST_REVERSE_DNS_LOOKUP_NAME;
+  static final String LOGIN_USER;
+
+  static {
+    try {
+       System.setProperty("sun.security.krb5.debug", "true");

Review comment:
       To be commented out and used for debugging purposes only.

##########
File path: contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/SecuredPhoenixBaseTest.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.phoenix.secured;
+
+import ch.qos.logback.classic.Level;
+import com.sun.security.auth.module.Krb5LoginModule;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.drill.common.config.DrillProperties;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.rpc.security.ServerAuthenticationHandler;
+import org.apache.drill.exec.rpc.security.kerberos.KerberosFactory;
+import org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl;
+import org.apache.drill.exec.server.TestDrillbitResilience;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.phoenix.PhoenixStoragePluginConfig;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.LogFixture;
+import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.phoenix.queryserver.server.QueryServer;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+
+import java.io.File;
+import java.nio.file.Paths;
+import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.TimeZone;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.drill.exec.store.phoenix.PhoenixBaseTest.createSampleData;
+import static org.apache.drill.exec.store.phoenix.PhoenixBaseTest.createSchema;
+import static org.apache.drill.exec.store.phoenix.PhoenixBaseTest.createTables;
+import static org.apache.drill.exec.store.phoenix.secured.HttpParamImpersonationQueryServerIT.environment;
+import static org.apache.drill.exec.store.phoenix.secured.HttpParamImpersonationQueryServerIT.getUrlTemplate;
+import static org.apache.drill.exec.store.phoenix.secured.HttpParamImpersonationQueryServerIT.grantUsersToGlobalPhoenixUserTables;
+import static org.apache.drill.exec.store.phoenix.secured.HttpParamImpersonationQueryServerIT.grantUsersToPhoenixSystemTables;
+import static org.apache.drill.exec.store.phoenix.secured.SecuredPhoenixTestSuite.initPhoenixQueryServer;
+
+@Slf4j
+public abstract class SecuredPhoenixBaseTest extends ClusterTest {
+  protected static LogFixture logFixture;
+  private final static Level CURRENT_LOG_LEVEL = Level.INFO;
+
+  private final static AtomicInteger initCount = new AtomicInteger(0);
+
+  @BeforeAll
+  public static void setUpBeforeClass() throws Exception {
+    TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
+    initPhoenixQueryServer();
+    bootSecuredDrillMiniCluster();
+    initializeDatabase();
+  }
+
+  private static void bootSecuredDrillMiniCluster() throws Exception {
+    logFixture = LogFixture.builder()
+      .toConsole()
+      .logger(QueryServerEnvironment.class, CURRENT_LOG_LEVEL)
+      .logger(SecuredPhoenixBaseTest.class, CURRENT_LOG_LEVEL)
+      .logger(KerberosFactory.class, CURRENT_LOG_LEVEL)
+      .logger(Krb5LoginModule.class, CURRENT_LOG_LEVEL)
+      .logger(QueryServer.class, CURRENT_LOG_LEVEL)
+      .logger(ServerAuthenticationHandler.class, CURRENT_LOG_LEVEL)
+      .build();
+
+    Map.Entry<String, File> user1 = environment.getUser(1);
+    Map.Entry<String, File> user2 = environment.getUser(2);
+    Map.Entry<String, File> user3 = environment.getUser(3);
+
+    dirTestWatcher.start(TestDrillbitResilience.class); // until DirTestWatcher ClassRule is implemented for JUnit5
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+        .configProperty(ExecConstants.USER_AUTHENTICATION_ENABLED, true)
+        .configProperty(ExecConstants.USER_AUTHENTICATOR_IMPL, UserAuthenticatorTestImpl.TYPE)
+        .configNonStringProperty(ExecConstants.AUTHENTICATION_MECHANISMS, Lists.newArrayList("kerberos"))
+        .configProperty(ExecConstants.IMPERSONATION_ENABLED, true)
+        .configProperty(ExecConstants.BIT_AUTHENTICATION_ENABLED, true)
+        .configProperty(ExecConstants.BIT_AUTHENTICATION_MECHANISM, "kerberos")
+        .configProperty(ExecConstants.SERVICE_PRINCIPAL, HBaseKerberosUtils.getPrincipalForTesting())
+        .configProperty(ExecConstants.SERVICE_KEYTAB_LOCATION, environment.getServiceKeytab().getAbsolutePath())
+        .configClientProperty(DrillProperties.SERVICE_PRINCIPAL, HBaseKerberosUtils.getPrincipalForTesting())
+        .configClientProperty(DrillProperties.USER, user1.getKey())
+        .configClientProperty(DrillProperties.KEYTAB, user1.getValue().getAbsolutePath());
+    startCluster(builder);
+    Properties user2ClientProperties = new Properties();
+    user2ClientProperties.setProperty(DrillProperties.SERVICE_PRINCIPAL, HBaseKerberosUtils.getPrincipalForTesting());
+    user2ClientProperties.setProperty(DrillProperties.USER, user2.getKey());
+    user2ClientProperties.setProperty(DrillProperties.KEYTAB, user2.getValue().getAbsolutePath());
+    cluster.addClientFixture(user2ClientProperties);
+    Properties user3ClientProperties = new Properties();
+    user3ClientProperties.setProperty(DrillProperties.SERVICE_PRINCIPAL, HBaseKerberosUtils.getPrincipalForTesting());
+    user3ClientProperties.setProperty(DrillProperties.USER, user3.getKey());
+    user3ClientProperties.setProperty(DrillProperties.KEYTAB, user3.getValue().getAbsolutePath());
+    cluster.addClientFixture(user3ClientProperties);
+
+    Map<String, Object> phoenixProps = new HashMap<>();
+    phoenixProps.put("phoenix.query.timeoutMs", 90000);
+    phoenixProps.put("phoenix.query.keepAliveMs", "30000");
+    phoenixProps.put("phoenix.queryserver.withRemoteUserExtractor", true);
+    StoragePluginRegistry registry = cluster.drillbit().getContext().getStorage();
+    final String doAsUrl = String.format(getUrlTemplate(), "$user");
+    logger.debug("Phoenix Query Server URL: {}", environment.getPqsUrl());
+    PhoenixStoragePluginConfig config = new PhoenixStoragePluginConfig(null, 0, null, null,
+      doAsUrl, null, phoenixProps);
+    config.setEnabled(true);
+    registry.put(PhoenixStoragePluginConfig.NAME + "123", config);
+  }
+
+
+  /**
+   * Initialize HBase via Phoenix
+   */
+  private static void initializeDatabase() throws Exception {
+    dirTestWatcher.copyResourceToRoot(Paths.get(""));
+    if (initCount.incrementAndGet() == 1) {
+      final Map.Entry<String, File> user1 = environment.getUser(1);
+      final Map.Entry<String, File> user2 = environment.getUser(2);
+      // Build the JDBC URL by hand with the doAs
+      final UserGroupInformation serviceUgi = ImpersonationUtil.getProcessUserUGI();
+      serviceUgi.doAs((PrivilegedExceptionAction<Void>) () -> {
+        logger.debug("Phoenix conn url: {}", environment.getPqsUrl());

Review comment:
       Done

##########
File path: contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/HttpParamImpersonationQueryServerIT.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.phoenix.secured;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.security.access.AccessControlClient;
+import org.apache.hadoop.hbase.security.access.AccessController;
+import org.apache.hadoop.hbase.security.access.Permission.Action;
+import org.apache.hadoop.hbase.security.token.TokenProvider;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.queryserver.QueryServerOptions;
+import org.apache.phoenix.queryserver.QueryServerProperties;
+import org.apache.phoenix.queryserver.client.Driver;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.drill.exec.store.phoenix.secured.QueryServerEnvironment.LOGIN_USER;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class HttpParamImpersonationQueryServerIT {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HttpParamImpersonationQueryServerIT.class);
+    public static QueryServerEnvironment environment;
+
+    private static final List<TableName> SYSTEM_TABLE_NAMES = Arrays.asList(
+        PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME,
+        PhoenixDatabaseMetaData.SYSTEM_MUTEX_HBASE_TABLE_NAME,
+        PhoenixDatabaseMetaData.SYSTEM_FUNCTION_HBASE_TABLE_NAME,
+        PhoenixDatabaseMetaData.SYSTEM_SCHEMA_HBASE_TABLE_NAME,
+        PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_HBASE_TABLE_NAME,
+        PhoenixDatabaseMetaData.SYSTEM_STATS_HBASE_TABLE_NAME);
+
+    public static synchronized void startQueryServerEnvironment() throws Exception {
+        //Clean up previous environment if any (Junit 4.13 @BeforeParam / @AfterParam would be an alternative)

Review comment:
       I wanted to keep the original `org.apache.phoenix.end2end.HttpParamImpersonationQueryServerIT` class from the `phoenix-queryserver` project, but it looks like we need to bring it in line with Drill's style.

##########
File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSchemaFactory.java
##########
@@ -18,75 +18,100 @@
 package org.apache.drill.exec.store.phoenix;
 
 import java.io.IOException;
-import java.sql.Connection;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import javax.sql.DataSource;
 
 import org.apache.calcite.adapter.jdbc.JdbcSchema;
 import org.apache.calcite.schema.Schema;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Table;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.AbstractSchemaFactory;
 import org.apache.drill.exec.store.SchemaConfig;
-import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import static org.apache.drill.exec.util.ImpersonationUtil.getProcessUserName;
 
 public class PhoenixSchemaFactory extends AbstractSchemaFactory {
 
   private final PhoenixStoragePlugin plugin;
   private final Map<String, PhoenixSchema> schemaMap;
   private PhoenixSchema rootSchema;
+  private final boolean isDrillImpersonationEnabled;
 
   public PhoenixSchemaFactory(PhoenixStoragePlugin plugin) {
     super(plugin.getName());
     this.plugin = plugin;
-    this.schemaMap = Maps.newHashMap();
+    this.schemaMap = new HashMap<>();
+    isDrillImpersonationEnabled = plugin.getContext().getConfig().getBoolean(ExecConstants.IMPERSONATION_ENABLED);
   }
 
   @Override
   public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
-    rootSchema = new PhoenixSchema(plugin, Collections.emptyList(), plugin.getName());
-    locateSchemas();
+    try {
+      rootSchema = new PhoenixSchema(schemaConfig, plugin, Collections.emptyList(), plugin.getName());
+      String schemaUser = schemaConfig.getUserName();
+      locateSchemas(schemaConfig, rootSchema.getUser(schemaUser, getProcessUserName()));

Review comment:
       Changed

##########
File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSchemaFactory.java
##########
@@ -101,21 +126,26 @@ public Schema getSubSchema(String name) {
 
     @Override
     public Table getTable(String name) {
-      Table table = jdbcSchema.getTable(StringUtils.upperCase(name));
-      return table;
+      final UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();
+      return ugi.doAs((PrivilegedAction<Table>) () -> jdbcSchema.getTable(StringUtils.upperCase(name)));
     }
 
     @Override
     public Set<String> getTableNames() {
-      Set<String> tables = jdbcSchema.getTableNames();
-      return tables;
+      final UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();
+      return ugi.doAs((PrivilegedAction<Set<String>>) jdbcSchema::getTableNames);
     }
 
     @Override
     public String getTypeName() {
       return PhoenixStoragePluginConfig.NAME;
     }
 
+    @Override
+    public String getUser(String impersonated, String notImpersonated) {

Review comment:
       Agreed.
   Also, I moved `addSchemas` to the end, after the overridden methods.

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/conversion/DrillValidator.java
##########
@@ -66,7 +69,11 @@ protected void validateFrom(SqlNode node, RelDataType targetRowType, SqlValidato
           }
       }
     }
-    super.validateFrom(node, targetRowType, scope);
+    UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();
+      ugi.doAs((PrivilegedAction<Void>) () -> {

Review comment:
       fixed. Thanks

##########
File path: contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/SecuredPhoenixDataTypeTest.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.phoenix.secured;
+
+import org.apache.drill.categories.RowSetTests;
+import org.apache.drill.categories.SlowTest;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.QueryBuilder;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+
+import static org.apache.drill.exec.store.phoenix.PhoenixBaseTest.U_U_I_D;
+import static org.apache.drill.test.rowSet.RowSetUtilities.boolArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.byteArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.doubleArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.intArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.longArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.shortArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
+
+@Tag(SlowTest.TAG)
+@Tag(RowSetTests.TAG)
+public class SecuredPhoenixDataTypeTest extends SecuredPhoenixBaseTest {
+
+  @Test
+  public void testDataType() throws Exception {
+    runForThreeClients(this::doTestDataType);
+  }
+
+  public void doTestDataType() throws Exception {

Review comment:
       Missed this method. Done

##########
File path: contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/TlsUtil.java
##########
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.phoenix.secured;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.ssl.FileBasedKeyStoresFactory;
+import org.apache.hadoop.security.ssl.SSLFactory;
+import org.bouncycastle.x509.X509V1CertificateGenerator;

Review comment:
       `TlsUtil` comes from the `phoenix-queryserver-it` testing module, just like `QueryServerThread`, which you used.
   I don't think we need to modify it unless a critical vulnerability is discovered.
   The other approach is to add a dependency on `phoenix-queryserver-it`. That way would possibly be cleaner.

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordCollector.java
##########
@@ -181,22 +184,25 @@ public BasicRecordCollector(FilterEvaluator filterEvaluator, OptionManager optio
     @Override
     public List<Records.Column> columns(String schemaPath, SchemaPlus schema) {
       AbstractSchema drillSchema = schema.unwrap(AbstractSchema.class);
-      List<Records.Column> records = new ArrayList<>();
-      for (Pair<String, ? extends Table> tableNameToTable : drillSchema.getTablesByNames(schema.getTableNames())) {
-        String tableName = tableNameToTable.getKey();
-        Table table = tableNameToTable.getValue();
-        Schema.TableType tableType = table.getJdbcTableType();
-
-        if (filterEvaluator.shouldVisitTable(schemaPath, tableName, tableType)) {
-          RelDataType tableRow = table.getRowType(new JavaTypeFactoryImpl(DRILL_REL_DATATYPE_SYSTEM));
-          for (RelDataTypeField field : tableRow.getFieldList()) {
-            if (filterEvaluator.shouldVisitColumn(schemaPath, tableName, field.getName())) {
-              records.add(new Records.Column(IS_CATALOG_NAME, schemaPath, tableName, field));
+      UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();
+      return ugi.doAs((PrivilegedAction<List<Records.Column>>) () -> {

Review comment:
       Added a check for whether impersonation is enabled.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org