Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2022/01/05 04:04:57 UTC

[GitHub] [drill] vdiravka opened a new pull request #2422: DRILL-8061: Add Impersonation Support for Phoenix

vdiravka opened a new pull request #2422:
URL: https://github.com/apache/drill/pull/2422


   # [DRILL-8061](https://issues.apache.org/jira/browse/DRILL-8061): Add Impersonation Support for Phoenix
   
   ## Description
   
   Adds Drill Hadoop user impersonation support for the Apache Phoenix storage plugin.
   
   ## Documentation
   
   ### Impersonation
   Configuration:
   1. Enable [Drill User Impersonation](https://drill.apache.org/docs/configuring-user-impersonation/)
   2. Enable [PQS Impersonation](https://phoenix.apache.org/server.html#Impersonation)
   3. Configure the PQS URL in one of two ways (a sketch of both options follows this list):
      1. Provide `host` and `port`, and Drill will generate the PQS URL with a `doAs` parameter set to the current session user
      2. Provide the `jdbcURL` with a `doAs` URL parameter and the `$user` placeholder as its value, for instance:
         `jdbc:phoenix:thin:url=http://localhost:8765?doAs=$user`. If Drill impersonation is enabled but `doAs=$user`
         is missing, a UserException is thrown.
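   
   A minimal sketch of the two options above, assuming the `$user` and `doAs` constants quoted from `PhoenixDataSource` later in this thread; the helper names `buildImpersonatedUrl` and `substituteUser` are hypothetical:
   
   ```java
   // Sketch only: illustrates how the PQS URL gains its doAs parameter.
   class PqsUrlSketch {
     // These constants mirror IMPERSONATED_USER_VARIABLE and
     // DEFAULT_QUERY_SERVER_REMOTEUSEREXTRACTOR_PARAM in PhoenixDataSource.
     static final String IMPERSONATED_USER_VARIABLE = "$user";
     static final String DOAS_PARAM = "doAs";
   
     // Option 3.i: Drill generates the URL from host and port and appends
     // doAs=<current session user> itself.
     static String buildImpersonatedUrl(String host, int port, String sessionUser) {
       return "jdbc:phoenix:thin:url=http://" + host + ":" + port
           + "?" + DOAS_PARAM + "=" + sessionUser + ";serialization=PROTOBUF";
     }
   
     // Option 3.ii: the user supplies jdbcURL containing doAs=$user and Drill
     // only replaces the placeholder with the current session user.
     static String substituteUser(String jdbcUrl, String sessionUser) {
       return jdbcUrl.replace(IMPERSONATED_USER_VARIABLE, sessionUser);
     }
   }
   ```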
   
   ## Testing
   Added integration (IT) test cases for the kerberized `hadoop-minikdc` Phoenix + Drill cluster.
   Tests run for 3 clients:
   * the first has all permissions
   * the second is impersonated, but has no permissions to access the tables
   * the third is not impersonated by the admin
   
   Draft until all test cases pass.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] Z0ltrix commented on a change in pull request #2422: DRILL-8061: Add Impersonation Support for Phoenix

Posted by GitBox <gi...@apache.org>.
Z0ltrix commented on a change in pull request #2422:
URL: https://github.com/apache/drill/pull/2422#discussion_r787684404



##########
File path: contrib/storage-phoenix/README.md
##########
@@ -87,6 +79,16 @@ Tips :
 }
 ```
 
+### Impersonation
+Configurations :
+1. Enable [Drill User Impersonation](https://drill.apache.org/docs/configuring-user-impersonation/)
+2. Enable [PQS Impersonation](https://phoenix.apache.org/server.html#Impersonation)
+3. PQS URL:
+  1. Provide `host` and `port` and Drill will generate the PQS URL with a doAs parameter of current session user
+  2. Provide the `jdbcURL` with a `doAs` url param and `$user` placeholder as a value, for instance:

Review comment:
       Wouldn't it be better if Drill added `doAs=$user` automatically when impersonation is enabled?
   
   It is confusing if you have to change both the storage config AND `drill-override.conf` to enable impersonation without getting an error.







[GitHub] [drill] luocooong merged pull request #2422: DRILL-8061: Add Impersonation Support for Phoenix

Posted by GitBox <gi...@apache.org>.
luocooong merged pull request #2422:
URL: https://github.com/apache/drill/pull/2422


   





[GitHub] [drill] luocooong commented on a change in pull request #2422: DRILL-8061: Add Impersonation Support for Phoenix

Posted by GitBox <gi...@apache.org>.
luocooong commented on a change in pull request #2422:
URL: https://github.com/apache/drill/pull/2422#discussion_r793149883



##########
File path: contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/HttpParamImpersonationQueryServerIT.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.phoenix.secured;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.security.access.AccessControlClient;
+import org.apache.hadoop.hbase.security.access.AccessController;
+import org.apache.hadoop.hbase.security.access.Permission.Action;
+import org.apache.hadoop.hbase.security.token.TokenProvider;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.queryserver.QueryServerOptions;
+import org.apache.phoenix.queryserver.QueryServerProperties;
+import org.apache.phoenix.queryserver.client.Driver;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.drill.exec.store.phoenix.secured.QueryServerEnvironment.LOGIN_USER;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class HttpParamImpersonationQueryServerIT {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HttpParamImpersonationQueryServerIT.class);
+    public static QueryServerEnvironment environment;
+
+    private static final List<TableName> SYSTEM_TABLE_NAMES = Arrays.asList(
+        PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME,
+        PhoenixDatabaseMetaData.SYSTEM_MUTEX_HBASE_TABLE_NAME,
+        PhoenixDatabaseMetaData.SYSTEM_FUNCTION_HBASE_TABLE_NAME,
+        PhoenixDatabaseMetaData.SYSTEM_SCHEMA_HBASE_TABLE_NAME,
+        PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_HBASE_TABLE_NAME,
+        PhoenixDatabaseMetaData.SYSTEM_STATS_HBASE_TABLE_NAME);
+
+    public static synchronized void startQueryServerEnvironment() throws Exception {
+        //Clean up previous environment if any (Junit 4.13 @BeforeParam / @AfterParam would be an alternative)
+        if(environment != null) {
+            stopEnvironment();
+        }
+
+        final Configuration conf = new Configuration();
+        conf.setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, AccessController.class.getName());
+        conf.setStrings(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, AccessController.class.getName());
+        conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, AccessController.class.getName(), TokenProvider.class.getName());
+
+        // Set the proxyuser settings,
+        // so that the user who is running the Drillbits/MiniDfs can impersonate user1 and user2 (not user3)
+        conf.set(String.format("hadoop.proxyuser.%s.hosts", LOGIN_USER), "*");
+        conf.set(String.format("hadoop.proxyuser.%s.users", LOGIN_USER), "user1,user2");
+        conf.setBoolean(QueryServerProperties.QUERY_SERVER_WITH_REMOTEUSEREXTRACTOR_ATTRIB, true);
+        environment = new QueryServerEnvironment(conf, 3, false);
+    }
+
+    public static synchronized void stopEnvironment() throws Exception {
+        environment.stop();
+        environment = null;
+    }
+
+    static public String getUrlTemplate() {
+        String url = Driver.CONNECT_STRING_PREFIX + "url=%s://localhost:" + environment.getPqsPort() + "?"
+                + QueryServerOptions.DEFAULT_QUERY_SERVER_REMOTEUSEREXTRACTOR_PARAM + "=%s;authentication=SPNEGO;serialization=PROTOBUF%s";
+        if (environment.getTls()) {
+            return String.format(url, "https", "%s", ";truststore=" + TlsUtil.getTrustStoreFile().getAbsolutePath()
+                + ";truststore_password="+TlsUtil.getTrustStorePassword());

Review comment:
       this one.







[GitHub] [drill] vdiravka commented on a change in pull request #2422: DRILL-8061: Add Impersonation Support for Phoenix

Posted by GitBox <gi...@apache.org>.
vdiravka commented on a change in pull request #2422:
URL: https://github.com/apache/drill/pull/2422#discussion_r790288210



##########
File path: contrib/storage-phoenix/README.md
##########
@@ -87,6 +79,16 @@ Tips :
 }
 ```
 
+### Impersonation
+Configurations :
+1. Enable [Drill User Impersonation](https://drill.apache.org/docs/configuring-user-impersonation/)
+2. Enable [PQS Impersonation](https://phoenix.apache.org/server.html#Impersonation)
+3. PQS URL:
+  1. Provide `host` and `port` and Drill will generate the PQS URL with a doAs parameter of current session user
+  2. Provide the `jdbcURL` with a `doAs` url param and `$user` placeholder as a value, for instance:

Review comment:
       Hi Christian,
   
   `doAs=$user` is added automatically when the first approach to configuring PQS is chosen (`host` and `port`). As Paul mentioned [earlier](https://github.com/apache/drill/pull/2332#discussion_r758116891) in the original PR #2332 for the Phoenix plugin, the second approach is the more advanced case, intended for more manual configuration. So I preferred to avoid an automagic change of the final URL.







[GitHub] [drill] luocooong commented on a change in pull request #2422: DRILL-8061: Add Impersonation Support for Phoenix

Posted by GitBox <gi...@apache.org>.
luocooong commented on a change in pull request #2422:
URL: https://github.com/apache/drill/pull/2422#discussion_r793150318



##########
File path: contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/QueryServerEnvironment.java
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.phoenix.secured;
+
+import static org.apache.hadoop.hbase.HConstants.HBASE_DIR;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.InetAddress;
+import java.security.PrivilegedAction;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.LocalHBaseCluster;
+import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.util.KerberosName;
+import org.apache.phoenix.query.ConfigurationFactory;
+import org.apache.phoenix.queryserver.QueryServerProperties;
+import org.apache.phoenix.queryserver.server.QueryServer;
+import org.apache.phoenix.util.InstanceResolver;
+import org.apache.phoenix.util.ThinClientUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Due to this bug https://bugzilla.redhat.com/show_bug.cgi?id=668830 we need to use
+ * `localhost.localdomain` as the host name when running these tests on Jenkins (CentOS), but for Mac OS
+ * it should be `localhost` to pass. The reason is that the Kerberos principals in these tests are looked up
+ * from /etc/hosts, and a reverse DNS lookup of 127.0.0.1 resolves to `localhost.localdomain`
+ * rather than `localhost` on CentOS. The KDC sees `localhost` != `localhost.localdomain` and as a
+ * result the test fails with an authentication error. It is also important to note that these principals are
+ * shared between HDFS and HBase in this mini HBase cluster. Some more reading:
+ * https://access.redhat.com/solutions/57330
+ */
+public class QueryServerEnvironment {
+  private static final Logger LOG = LoggerFactory.getLogger(QueryServerEnvironment.class);
+
+  private final File TEMP_DIR = new File(getTempDir());
+  private final File KEYTAB_DIR = new File(TEMP_DIR, "keytabs");
+  private final List<File> USER_KEYTAB_FILES = new ArrayList<>();
+
+  private static final String LOCAL_HOST_REVERSE_DNS_LOOKUP_NAME;
+  static final String LOGIN_USER;
+
+  static {
+    try {
+      System.setProperty("sun.security.krb5.debug", "true");
+      LOCAL_HOST_REVERSE_DNS_LOOKUP_NAME = InetAddress.getByName("127.0.0.1").getCanonicalHostName();
+      String userName = System.getProperty("user.name");
+      LOGIN_USER = userName != null ? userName : "securecluster";
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static final String SPNEGO_PRINCIPAL = "HTTP/" + LOCAL_HOST_REVERSE_DNS_LOOKUP_NAME;
+  private static final String PQS_PRINCIPAL = "phoenixqs/" + LOCAL_HOST_REVERSE_DNS_LOOKUP_NAME;
+  private static final String SERVICE_PRINCIPAL = LOGIN_USER + "/" + LOCAL_HOST_REVERSE_DNS_LOOKUP_NAME;
+  private File KEYTAB;
+
+  private MiniKdc KDC;
+  private HBaseTestingUtility UTIL = new HBaseTestingUtility();
+  private LocalHBaseCluster HBASE_CLUSTER;
+  private int NUM_CREATED_USERS;
+
+  private ExecutorService PQS_EXECUTOR;
+  private QueryServer PQS;
+  private int PQS_PORT;
+  private String PQS_URL;
+
+  private boolean tls;
+
+  private static String getTempDir() {
+    StringBuilder sb = new StringBuilder(32);
+    sb.append(System.getProperty("user.dir")).append(File.separator);
+    sb.append("target").append(File.separator);
+    sb.append(QueryServerEnvironment.class.getSimpleName());
+    sb.append("-").append(UUID.randomUUID());
+    return sb.toString();
+  }
+
+  public int getPqsPort() {
+    return PQS_PORT;
+  }
+
+  public String getPqsUrl() {
+    return PQS_URL;
+  }
+
+  public boolean getTls() {
+    return tls;
+  }
+
+  public HBaseTestingUtility getUtil() {
+    return UTIL;
+  }
+
+  public String getServicePrincipal() {
+    return SERVICE_PRINCIPAL;
+  }
+
+  public File getServiceKeytab() {
+    return KEYTAB;
+  }
+
+  private static void updateDefaultRealm() throws Exception {
+    // (at least) one other phoenix test triggers the caching of this field before the KDC is up
+    // which causes principal parsing to fail.
+    Field f = KerberosName.class.getDeclaredField("defaultRealm");
+    f.setAccessible(true);
+    // Default realm for MiniKDC
+    f.set(null, "EXAMPLE.COM");
+  }
+
+  private void createUsers(int numUsers) throws Exception {
+    assertNotNull("KDC is null, was setup method called?", KDC);
+    NUM_CREATED_USERS = numUsers;
+    for (int i = 1; i <= numUsers; i++) {
+      String principal = "user" + i;
+      File keytabFile = new File(KEYTAB_DIR, principal + ".keytab");
+      KDC.createPrincipal(keytabFile, principal);
+      USER_KEYTAB_FILES.add(keytabFile);
+    }
+  }
+
+  public Map.Entry<String, File> getUser(int offset) {
+    if (!(offset > 0 && offset <= NUM_CREATED_USERS)) {
+      throw new IllegalArgumentException();
+    }
+    return new AbstractMap.SimpleImmutableEntry<String, File>("user" + offset, USER_KEYTAB_FILES.get(offset - 1));
+  }
+
+  /**
+   * Setup the security configuration for hdfs.
+   */
+  private void setHdfsSecuredConfiguration(Configuration conf) throws Exception {
+    // Set principal+keytab configuration for HDFS
+    conf.set(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY,
+      SERVICE_PRINCIPAL + "@" + KDC.getRealm());
+    conf.set(DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY, KEYTAB.getAbsolutePath());
+    conf.set(DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY,
+      SERVICE_PRINCIPAL + "@" + KDC.getRealm());
+    conf.set(DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY, KEYTAB.getAbsolutePath());
+    conf.set(DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY,
+      SPNEGO_PRINCIPAL + "@" + KDC.getRealm());
+    // Enable token access for HDFS blocks
+    conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+    // Only use HTTPS (required because we aren't using "secure" ports)
+    conf.set(DFSConfigKeys.DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());
+    // Bind on localhost for spnego to have a chance at working
+    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");
+    conf.set(DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0");
+
+    // Generate SSL certs
+    File keystoresDir = new File(UTIL.getDataTestDir("keystore").toUri().getPath());
+    keystoresDir.mkdirs();
+    String sslConfDir = TlsUtil.getClasspathDir(QueryServerEnvironment.class);
+    TlsUtil.setupSSLConfig(keystoresDir.getAbsolutePath(), sslConfDir, conf, false);
+
+    // Magic flag to tell hdfs to not fail on using ports above 1024
+    conf.setBoolean("ignore.secure.ports.for.testing", true);
+  }
+
+  private static void ensureIsEmptyDirectory(File f) throws IOException {
+    if (f.exists()) {
+      if (f.isDirectory()) {
+        FileUtils.deleteDirectory(f);
+      } else {
+        assertTrue("Failed to delete keytab directory", f.delete());
+      }
+    }
+    assertTrue("Failed to create keytab directory", f.mkdirs());
+  }
+
+  /**
+   * Set up and start kerberized HBase
+   * @throws Exception
+   */
+  public QueryServerEnvironment(final Configuration confIn, int numberOfUsers, boolean tls)

Review comment:
       this one.







[GitHub] [drill] vdiravka commented on a change in pull request #2422: DRILL-8061: Add Impersonation Support for Phoenix

Posted by GitBox <gi...@apache.org>.
vdiravka commented on a change in pull request #2422:
URL: https://github.com/apache/drill/pull/2422#discussion_r794507192



##########
File path: contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/QueryServerEnvironment.java
##########
@@ -0,0 +1,360 @@
+  /**
+   * Set up and start kerberized HBase
+   * @throws Exception
+   */
+  public QueryServerEnvironment(final Configuration confIn, int numberOfUsers, boolean tls)

Review comment:
       Sure. I made a similar one for `QueryServerBasicsIT`, and I will also explain why the original one is not used.
   I have also decided to add the dependency and remove several unneeded classes, which also allows using `@link` in Javadoc.







[GitHub] [drill] lgtm-com[bot] commented on pull request #2422: DRILL-8061: Add Impersonation Support for Phoenix

Posted by GitBox <gi...@apache.org>.
lgtm-com[bot] commented on pull request #2422:
URL: https://github.com/apache/drill/pull/2422#issuecomment-1005407994


   This pull request **introduces 1 alert** when merging 5040d117a2515677df6d4cf2c7c174edb8994a89 into fa2cb0f4937c0d8e797a675d8d6c13c316e48d4c - [view on LGTM.com](https://lgtm.com/projects/g/apache/drill/rev/pr-18b01353d3c800d75945d94dc316664ad25dc4f4)
   
   **new alerts:**
   
   * 1 for Potential database resource leak





[GitHub] [drill] jnturton commented on a change in pull request #2422: DRILL-8061: Add Impersonation Support for Phoenix

Posted by GitBox <gi...@apache.org>.
jnturton commented on a change in pull request #2422:
URL: https://github.com/apache/drill/pull/2422#discussion_r787682748



##########
File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixBatchReader.java
##########
@@ -67,42 +74,55 @@
 
   public PhoenixBatchReader(PhoenixSubScan subScan) {
     this.subScan = subScan;
+    this.impersonationEnabled = subScan.getPlugin().getContext().getConfig().getBoolean(ExecConstants.IMPERSONATION_ENABLED);
   }
 
   @Override
   public boolean open(SchemaNegotiator negotiator) {
+    return impersonationEnabled
+      ? ugi.doAs((PrivilegedAction<Boolean>) () -> processOpen(negotiator))
+      : processOpen(negotiator);
+  }
+
+  private boolean processOpen(SchemaNegotiator negotiator) {
     try {
       errorContext = negotiator.parentErrorContext();
-      conn = subScan.getPlugin().getDataSource().getConnection();
-      pstmt = conn.prepareStatement(subScan.getSql());
+      DataSource ds = subScan.getPlugin().getDataSource(negotiator.userName());
+      PreparedStatement pstmt = ds.getConnection().prepareStatement(subScan.getSql());
       results = pstmt.executeQuery();
       meta = pstmt.getMetaData();
     } catch (SQLException e) {
       throw UserException
-              .dataReadError(e)
-              .message("Failed to execute the phoenix sql query. " + e.getMessage())
-              .addContext(errorContext)
-              .build(logger);
+        .dataReadError(e)
+        .message("Failed to execute the phoenix sql query. " + e.getMessage())
+        .addContext(errorContext)
+        .build(logger);
     }

Review comment:
       ```suggestion
       } finally {
         pstmt.close();
       }
       ```
   
   Something flagged by LGTM.
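   
   For reference, one conventional way to avoid this class of leak in a streaming reader, where the `ResultSet` must outlive `open()`, is to keep the connection and statement as fields and release them in `close()`. A minimal sketch under that assumption (class and method names here are illustrative, not the PR's code):
   
   ```java
   import java.sql.Connection;
   import java.sql.PreparedStatement;
   import java.sql.ResultSet;
   import java.sql.SQLException;
   
   // Sketch: hold every JDBC resource acquired in open() so close() can release it.
   class JdbcReaderResources implements AutoCloseable {
     private Connection conn;
     private PreparedStatement pstmt;
     private ResultSet results;
   
     void open(javax.sql.DataSource ds, String sql) throws SQLException {
       conn = ds.getConnection();       // kept as a field, not an inline call, so it can be closed
       pstmt = conn.prepareStatement(sql);
       results = pstmt.executeQuery();  // must stay open while rows are being read
     }
   
     @Override
     public void close() throws SQLException {
       // Close in reverse order of acquisition; null checks tolerate a failed open().
       if (results != null) { results.close(); }
       if (pstmt != null) { pstmt.close(); }
       if (conn != null) { conn.close(); }
     }
   }
   ```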







[GitHub] [drill] vdiravka commented on a change in pull request #2422: DRILL-8061: Add Impersonation Support for Phoenix

Posted by GitBox <gi...@apache.org>.
vdiravka commented on a change in pull request #2422:
URL: https://github.com/apache/drill/pull/2422#discussion_r785419426



##########
File path: contrib/storage-phoenix/pom.xml
##########
@@ -33,6 +33,7 @@
     <phoenix.version>5.1.2</phoenix.version>
     <!-- Keep the 2.4.2 to reduce dependency conflict -->
     <hbase.minicluster.version>2.4.2</hbase.minicluster.version>
+    <jetty.version>9.4.31.v20200723</jetty.version>

Review comment:
       Jetty is used by the PQS tests:
   https://github.com/apache/phoenix-queryserver/blob/master/pom.xml#L83
   The tests failed with Drill's Jetty version `9.4.41.v20210516`.

##########
File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixBatchReader.java
##########
@@ -71,58 +74,64 @@ public PhoenixBatchReader(PhoenixSubScan subScan) {
 
   @Override
   public boolean open(SchemaNegotiator negotiator) {
-    try {
+    UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();
+    return ugi.doAs((PrivilegedAction<Boolean>) () -> {
       errorContext = negotiator.parentErrorContext();
-      conn = subScan.getPlugin().getDataSource().getConnection();
-      pstmt = conn.prepareStatement(subScan.getSql());
-      results = pstmt.executeQuery();
-      meta = pstmt.getMetaData();
-    } catch (SQLException e) {
-      throw UserException
-              .dataReadError(e)
-              .message("Failed to execute the phoenix sql query. " + e.getMessage())
-              .addContext(errorContext)
-              .build(logger);
-    }
-    try {
-      negotiator.tableSchema(defineMetadata(), true);
-      reader = new PhoenixReader(negotiator.build(), columns, results);
-      bindColumns(reader.getStorage());
-    } catch (SQLException e) {
-      throw UserException
-              .dataReadError(e)
-              .message("Failed to get type of columns from metadata. " + e.getMessage())
-              .addContext(errorContext)
-              .build(logger);
-    }
-    watch = Stopwatch.createStarted();
-    return true;
+      try {
+        pstmt =
+          subScan.getPlugin().getDataSource(negotiator.userName()).getConnection().prepareStatement(subScan.getSql());

Review comment:
       ok

##########
File path: contrib/storage-phoenix/pom.xml
##########
@@ -187,12 +194,79 @@
       <version>${hbase.minicluster.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-asyncfs</artifactId>
+      <type>test-jar</type>
+      <version>${hbase.minicluster.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-hdfs-client</artifactId>
       <version>${hadoop.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minikdc</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-testing-util</artifactId>
+      <version>${hbase.minicluster.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-protocol-shaded</artifactId>

Review comment:
       Agreed. It is already declared. Thanks

##########
File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixBatchReader.java
##########
@@ -71,58 +74,64 @@ public PhoenixBatchReader(PhoenixSubScan subScan) {
 
   @Override
   public boolean open(SchemaNegotiator negotiator) {
-    try {
+    UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();
+    return ugi.doAs((PrivilegedAction<Boolean>) () -> {
       errorContext = negotiator.parentErrorContext();
-      conn = subScan.getPlugin().getDataSource().getConnection();
-      pstmt = conn.prepareStatement(subScan.getSql());
-      results = pstmt.executeQuery();
-      meta = pstmt.getMetaData();
-    } catch (SQLException e) {
-      throw UserException
-              .dataReadError(e)
-              .message("Failed to execute the phoenix sql query. " + e.getMessage())
-              .addContext(errorContext)
-              .build(logger);
-    }
-    try {
-      negotiator.tableSchema(defineMetadata(), true);
-      reader = new PhoenixReader(negotiator.build(), columns, results);
-      bindColumns(reader.getStorage());
-    } catch (SQLException e) {
-      throw UserException
-              .dataReadError(e)
-              .message("Failed to get type of columns from metadata. " + e.getMessage())
-              .addContext(errorContext)
-              .build(logger);
-    }
-    watch = Stopwatch.createStarted();
-    return true;
+      try {
+        pstmt =
+          subScan.getPlugin().getDataSource(negotiator.userName()).getConnection().prepareStatement(subScan.getSql());
+        results = pstmt.executeQuery();
+        meta = pstmt.getMetaData();
+      } catch (SQLException e) {
+        throw UserException
+          .dataReadError(e)
+          .message("Failed to execute the phoenix sql query. " + e.getMessage())
+          .addContext(negotiator.parentErrorContext())
+          .build(logger);
+      }
+      try {
+        negotiator.tableSchema(defineMetadata(), true);
+        reader = new PhoenixReader(negotiator.build(), columns, results);
+        bindColumns(reader.getStorage());
+      } catch (SQLException e) {
+        throw UserException
+          .dataReadError(e)
+          .message("Failed to get type of columns from metadata. " + e.getMessage())
+          .addContext(errorContext)
+          .build(logger);
+      }
+      watch = Stopwatch.createStarted();
+      return true;
+    });
   }
 
   @Override
   public boolean next() {
+    UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();

Review comment:
       sure. Done
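   
   For context, a minimal sketch of the pattern the updated code appears to follow, caching the process UGI in a field instead of fetching it inside every `next()` call; the guard on `impersonationEnabled` mirrors the `open()` hunk quoted earlier, the rest is an assumption:
   
   ```java
   import java.security.PrivilegedAction;
   
   import org.apache.drill.exec.util.ImpersonationUtil;
   import org.apache.hadoop.security.UserGroupInformation;
   
   // Sketch: fetch the process UGI once and reuse it for every privileged call.
   abstract class ImpersonatingReaderSketch {
     private final UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();
     private final boolean impersonationEnabled;
   
     ImpersonatingReaderSketch(boolean impersonationEnabled) {
       this.impersonationEnabled = impersonationEnabled;
     }
   
     public boolean next() {
       // Wrap the real work in doAs() only when impersonation is enabled.
       return impersonationEnabled
           ? ugi.doAs((PrivilegedAction<Boolean>) this::processNext)
           : processNext();
     }
   
     protected abstract boolean processNext();
   }
   ```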

##########
File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixDataSource.java
##########
@@ -39,44 +40,49 @@
  * is not always left in a healthy state by the previous user. It is better to
  * create new Phoenix Connections to ensure that you avoid any potential issues.
  */
+@Slf4j
 public class PhoenixDataSource implements DataSource {
 
   private static final String DEFAULT_URL_HEADER = "jdbc:phoenix:thin:url=http://";
   private static final String DEFAULT_SERIALIZATION = "serialization=PROTOBUF";
+  private static final String IMPERSONATED_USER_VARIABLE = "$user";
+  private static final String DEFAULT_QUERY_SERVER_REMOTEUSEREXTRACTOR_PARAM = "doAs";
 
-  private String url;
+  private final String url;
   private Map<String, Object> connectionProperties;
-  private boolean isFatClient; // Is a fat client
-
-  public PhoenixDataSource(String url) {
-    Preconditions.checkNotNull(url);
-    this.url = url;
-  }
-
-  public PhoenixDataSource(String host, int port) {
-    Preconditions.checkNotNull(host);
-    Preconditions.checkArgument(port > 0, "Please set the correct port.");
-    this.url = new StringBuilder()
-        .append(DEFAULT_URL_HEADER)
-        .append(host)
-        .append(":")
-        .append(port)
-        .append(";")
-        .append(DEFAULT_SERIALIZATION)
-        .toString();
-  }
-
-  public PhoenixDataSource(String url, Map<String, Object> connectionProperties) {
-    this(url);
+  private boolean isFatClient;
+  private String user;

Review comment:
       done. thanks

##########
File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSchemaFactory.java
##########
@@ -18,75 +18,100 @@
 package org.apache.drill.exec.store.phoenix;
 
 import java.io.IOException;
-import java.sql.Connection;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import javax.sql.DataSource;
 
 import org.apache.calcite.adapter.jdbc.JdbcSchema;
 import org.apache.calcite.schema.Schema;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Table;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.AbstractSchemaFactory;
 import org.apache.drill.exec.store.SchemaConfig;
-import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import static org.apache.drill.exec.util.ImpersonationUtil.getProcessUserName;
 
 public class PhoenixSchemaFactory extends AbstractSchemaFactory {
 
   private final PhoenixStoragePlugin plugin;
   private final Map<String, PhoenixSchema> schemaMap;
   private PhoenixSchema rootSchema;
+  private final boolean isDrillImpersonationEnabled;
 
   public PhoenixSchemaFactory(PhoenixStoragePlugin plugin) {
     super(plugin.getName());
     this.plugin = plugin;
-    this.schemaMap = Maps.newHashMap();
+    this.schemaMap = new HashMap<>();
+    isDrillImpersonationEnabled = plugin.getContext().getConfig().getBoolean(ExecConstants.IMPERSONATION_ENABLED);
   }
 
   @Override
   public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
-    rootSchema = new PhoenixSchema(plugin, Collections.emptyList(), plugin.getName());
-    locateSchemas();
+    try {
+      rootSchema = new PhoenixSchema(schemaConfig, plugin, Collections.emptyList(), plugin.getName());
+      String schemaUser = schemaConfig.getUserName();
+      locateSchemas(schemaConfig, rootSchema.getUser(schemaUser, getProcessUserName()));
+    } catch (SQLException e) {
+      throw new IOException(e);
+    }
     parent.add(getName(), rootSchema); // resolve the top-level schema.
     for (String schemaName : rootSchema.getSubSchemaNames()) {
       PhoenixSchema schema = (PhoenixSchema) rootSchema.getSubSchema(schemaName);
       parent.add(schemaName, schema); // provide all available schemas for calcite.
     }
   }
 
-  private void locateSchemas() {
-    DataSource ds = plugin.getDataSource();
-    try (Connection conn = ds.getConnection();
-          ResultSet rs = ds.getConnection().getMetaData().getSchemas()) {
-      while (rs.next()) {
-        final String schemaName = rs.getString(1); // lookup the schema (or called database).
-        PhoenixSchema schema = new PhoenixSchema(plugin, Arrays.asList(getName()), schemaName);
-        schemaMap.put(schemaName, schema);
-      }
-      rootSchema.addSchemas(schemaMap);
-    } catch (SQLException e) {
-      throw new DrillRuntimeException(e.getMessage(), e);
+  private void locateSchemas(SchemaConfig schemaConfig, String userName) throws SQLException {
+    UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();
+    try {
+      ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
+        try (ResultSet rs = plugin.getDataSource(userName).getConnection().getMetaData().getSchemas()) {
+          while (rs.next()) {
+            final String schemaName = rs.getString(1); // lookup the schema (or called database).
+            PhoenixSchema schema = new PhoenixSchema(schemaConfig, plugin, Arrays.asList(getName()), schemaName);
+            schemaMap.put(schemaName, schema);
+          }
+          rootSchema.addSchemas(schemaMap);
+        }
+        return null;
+      });
+    } catch (IOException | InterruptedException e) {
+      throw new SQLException(e);
     }
   }
 
-  protected static class PhoenixSchema extends AbstractSchema {
+  @Override
+  public boolean needToImpersonateReadingData() {

Review comment:
       I am trying to factor the logic shared with `HiveSchemaFactory` out into an interface, to implement similar impersonation logic for these and future datasources. Currently we don't control enabling impersonation in Phoenix via Drill, but that is not the case for Hive, and the same may hold for other datasources. A sketch of the idea follows.
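   
   A minimal sketch of that factoring, assuming a common hook that schema factories override; the Phoenix override mirrors the hunk above, while the default and the wiring are assumptions:
   
   ```java
   // Sketch: a shared hook telling Drill whether reads must run as the query user.
   interface ImpersonationAwareSchemaFactory {
     // Default: no impersonation is needed when reading data.
     default boolean needToImpersonateReadingData() {
       return false;
     }
   }
   
   class PhoenixSchemaFactorySketch implements ImpersonationAwareSchemaFactory {
     private final boolean isDrillImpersonationEnabled;
   
     PhoenixSchemaFactorySketch(boolean isDrillImpersonationEnabled) {
       this.isDrillImpersonationEnabled = isDrillImpersonationEnabled;
     }
   
     @Override
     public boolean needToImpersonateReadingData() {
       // Assumption: Phoenix needs impersonated reads exactly when Drill
       // impersonation is enabled, since enforcement happens in PQS via doAs.
       return isDrillImpersonationEnabled;
     }
   }
   ```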

##########
File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSchemaFactory.java
##########
@@ -101,21 +126,26 @@ public Schema getSubSchema(String name) {
 
     @Override
     public Table getTable(String name) {
-      Table table = jdbcSchema.getTable(StringUtils.upperCase(name));
-      return table;
+      final UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();

Review comment:
       It is possible. Thanks

##########
File path: contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/PhoenixBaseTest.java
##########
@@ -38,51 +38,63 @@
 
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.ClusterFixtureBuilder;
 import org.apache.drill.test.ClusterTest;
 import org.apache.hadoop.fs.Path;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.slf4j.LoggerFactory;
 
 import com.univocity.parsers.csv.CsvParser;
 import com.univocity.parsers.csv.CsvParserSettings;
 
-public class PhoenixBaseTest extends ClusterTest {
+public abstract class PhoenixBaseTest extends ClusterTest {
 
   private static final org.slf4j.Logger logger = LoggerFactory.getLogger(PhoenixBaseTest.class);
 
-  private static AtomicInteger initCount = new AtomicInteger(0);
-  protected static String U_U_I_D = UUID.randomUUID().toString();
+  public final static String U_U_I_D = UUID.randomUUID().toString();
+  private final static AtomicInteger initCount = new AtomicInteger(0);
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
+    PhoenixTestSuite.initPhoenixQueryServer();
     if (PhoenixTestSuite.isRunningSuite()) {
       QueryServerBasicsIT.testCatalogs();
     }
-    bootMiniCluster();
+    bootDrillMiniCluster();
+    dirTestWatcher.copyResourceToRoot(Paths.get(""));

Review comment:
       Done.
   Also I renamed `bootSecuredDrillMiniCluster` -> `startSecuredDrillCluster`

##########
File path: contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/HttpParamImpersonationQueryServerIT.java
##########
@@ -0,0 +1,113 @@
+    static public String getUrlTemplate() {
+        String url = Driver.CONNECT_STRING_PREFIX + "url=%s://localhost:" + environment.getPqsPort() + "?"
+                + QueryServerOptions.DEFAULT_QUERY_SERVER_REMOTEUSEREXTRACTOR_PARAM + "=%s;authentication=SPNEGO;serialization=PROTOBUF%s";
+        if (environment.getTls()) {
+            return String.format(url, "https", "%s", ";truststore=" + TlsUtil.getTrustStoreFile().getAbsolutePath()
+                + ";truststore_password="+TlsUtil.getTrustStorePassword());

Review comment:
       Done

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
##########
@@ -98,12 +98,7 @@ public RESULT call() throws Exception {
         currentThread.setName(proxyUgi.getUserName() + ":task-delegate-thread");
         final RESULT result;
         try {
-          result = proxyUgi.doAs(new PrivilegedExceptionAction<RESULT>() {
-            @Override
-            public RESULT run() throws Exception {
-              return callable.call();

Review comment:
       Lambda usage is preferable to anonymous classes on JDK 8+, and this is a very simple function, so it is fine to use the new style.
   Regarding breakpoints, we can replace the method reference with a lambda, which allows setting specific breakpoints:
   ![Screenshot from 2022-01-18 20-18-04](https://user-images.githubusercontent.com/11904420/149996184-5177e076-fe41-4011-9e59-e4b9d396b0d5.png)
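   
   To make the comparison concrete, the two equivalent forms under discussion; `proxyUgi` and `callable` follow the hunk above, the wrapper class is illustrative:
   
   ```java
   import java.security.PrivilegedExceptionAction;
   import java.util.concurrent.Callable;
   
   import org.apache.hadoop.security.UserGroupInformation;
   
   class DoAsStyles {
     // Anonymous-class form removed by this PR.
     static <T> T oldStyle(UserGroupInformation proxyUgi, Callable<T> callable) throws Exception {
       return proxyUgi.doAs(new PrivilegedExceptionAction<T>() {
         @Override
         public T run() throws Exception {
           return callable.call();
         }
       });
     }
   
     // Lambda form preferred on JDK 8+; unlike a bare method reference, the lambda
     // body gives the IDE a distinct line for a breakpoint, as the screenshot shows.
     static <T> T newStyle(UserGroupInformation proxyUgi, Callable<T> callable) throws Exception {
       return proxyUgi.doAs((PrivilegedExceptionAction<T>) () -> callable.call());
     }
   }
   ```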
   
   

##########
File path: contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/SecuredPhoenixBaseTest.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.phoenix.secured;
+
+import ch.qos.logback.classic.Level;
+import com.sun.security.auth.module.Krb5LoginModule;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.drill.common.config.DrillProperties;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.rpc.security.ServerAuthenticationHandler;
+import org.apache.drill.exec.rpc.security.kerberos.KerberosFactory;
+import org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl;
+import org.apache.drill.exec.server.TestDrillbitResilience;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.phoenix.PhoenixStoragePluginConfig;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.LogFixture;
+import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.phoenix.queryserver.server.QueryServer;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+
+import java.io.File;
+import java.nio.file.Paths;
+import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.TimeZone;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.drill.exec.store.phoenix.PhoenixBaseTest.createSampleData;
+import static org.apache.drill.exec.store.phoenix.PhoenixBaseTest.createSchema;
+import static org.apache.drill.exec.store.phoenix.PhoenixBaseTest.createTables;
+import static org.apache.drill.exec.store.phoenix.secured.HttpParamImpersonationQueryServerIT.environment;
+import static org.apache.drill.exec.store.phoenix.secured.HttpParamImpersonationQueryServerIT.getUrlTemplate;
+import static org.apache.drill.exec.store.phoenix.secured.HttpParamImpersonationQueryServerIT.grantUsersToGlobalPhoenixUserTables;
+import static org.apache.drill.exec.store.phoenix.secured.HttpParamImpersonationQueryServerIT.grantUsersToPhoenixSystemTables;
+import static org.apache.drill.exec.store.phoenix.secured.SecuredPhoenixTestSuite.initPhoenixQueryServer;
+
+@Slf4j
+public abstract class SecuredPhoenixBaseTest extends ClusterTest {
+  protected static LogFixture logFixture;
+  private final static Level CURRENT_LOG_LEVEL = Level.INFO;
+
+  private final static AtomicInteger initCount = new AtomicInteger(0);
+
+  @BeforeAll
+  public static void setUpBeforeClass() throws Exception {
+    TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
+    initPhoenixQueryServer();
+    bootSecuredDrillMiniCluster();
+    initializeDatabase();
+  }
+
+  private static void bootSecuredDrillMiniCluster() throws Exception {
+    logFixture = LogFixture.builder()
+      .toConsole()
+      .logger(QueryServerEnvironment.class, CURRENT_LOG_LEVEL)
+      .logger(SecuredPhoenixBaseTest.class, CURRENT_LOG_LEVEL)
+      .logger(KerberosFactory.class, CURRENT_LOG_LEVEL)
+      .logger(Krb5LoginModule.class, CURRENT_LOG_LEVEL)
+      .logger(QueryServer.class, CURRENT_LOG_LEVEL)
+      .logger(ServerAuthenticationHandler.class, CURRENT_LOG_LEVEL)
+      .build();
+
+    Map.Entry<String, File> user1 = environment.getUser(1);
+    Map.Entry<String, File> user2 = environment.getUser(2);
+    Map.Entry<String, File> user3 = environment.getUser(3);
+
+    dirTestWatcher.start(TestDrillbitResilience.class); // until DirTestWatcher ClassRule is implemented for JUnit5
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+        .configProperty(ExecConstants.USER_AUTHENTICATION_ENABLED, true)
+        .configProperty(ExecConstants.USER_AUTHENTICATOR_IMPL, UserAuthenticatorTestImpl.TYPE)
+        .configNonStringProperty(ExecConstants.AUTHENTICATION_MECHANISMS, Lists.newArrayList("kerberos"))
+        .configProperty(ExecConstants.IMPERSONATION_ENABLED, true)
+        .configProperty(ExecConstants.BIT_AUTHENTICATION_ENABLED, true)
+        .configProperty(ExecConstants.BIT_AUTHENTICATION_MECHANISM, "kerberos")
+        .configProperty(ExecConstants.SERVICE_PRINCIPAL, HBaseKerberosUtils.getPrincipalForTesting())
+        .configProperty(ExecConstants.SERVICE_KEYTAB_LOCATION, environment.getServiceKeytab().getAbsolutePath())
+        .configClientProperty(DrillProperties.SERVICE_PRINCIPAL, HBaseKerberosUtils.getPrincipalForTesting())
+        .configClientProperty(DrillProperties.USER, user1.getKey())
+        .configClientProperty(DrillProperties.KEYTAB, user1.getValue().getAbsolutePath());
+    startCluster(builder);
+    Properties user2ClientProperties = new Properties();
+    user2ClientProperties.setProperty(DrillProperties.SERVICE_PRINCIPAL, HBaseKerberosUtils.getPrincipalForTesting());
+    user2ClientProperties.setProperty(DrillProperties.USER, user2.getKey());
+    user2ClientProperties.setProperty(DrillProperties.KEYTAB, user2.getValue().getAbsolutePath());
+    cluster.addClientFixture(user2ClientProperties);
+    Properties user3ClientProperties = new Properties();
+    user3ClientProperties.setProperty(DrillProperties.SERVICE_PRINCIPAL, HBaseKerberosUtils.getPrincipalForTesting());
+    user3ClientProperties.setProperty(DrillProperties.USER, user3.getKey());
+    user3ClientProperties.setProperty(DrillProperties.KEYTAB, user3.getValue().getAbsolutePath());
+    cluster.addClientFixture(user3ClientProperties);
+
+    Map<String, Object> phoenixProps = new HashMap<>();
+    phoenixProps.put("phoenix.query.timeoutMs", 90000);
+    phoenixProps.put("phoenix.query.keepAliveMs", "30000");
+    phoenixProps.put("phoenix.queryserver.withRemoteUserExtractor", true);
+    StoragePluginRegistry registry = cluster.drillbit().getContext().getStorage();
+    final String doAsUrl = String.format(getUrlTemplate(), "$user");
+    logger.debug("Phoenix Query Server URL: {}", environment.getPqsUrl());
+    PhoenixStoragePluginConfig config = new PhoenixStoragePluginConfig(null, 0, null, null,
+      doAsUrl, null, phoenixProps);
+    config.setEnabled(true);
+    registry.put(PhoenixStoragePluginConfig.NAME + "123", config);
+  }
+
+
+  /**
+   * Initialize HBase via Phoenix
+   */
+  private static void initializeDatabase() throws Exception {
+    dirTestWatcher.copyResourceToRoot(Paths.get(""));
+    if (initCount.incrementAndGet() == 1) {
+      final Map.Entry<String, File> user1 = environment.getUser(1);
+      final Map.Entry<String, File> user2 = environment.getUser(2);
+      // Build the JDBC URL by hand with the doAs
+      final UserGroupInformation serviceUgi = ImpersonationUtil.getProcessUserUGI();
+      serviceUgi.doAs((PrivilegedExceptionAction<Void>) () -> {
+        logger.debug("Phoenix conn url: {}", environment.getPqsUrl());
+        createSchema(environment.getPqsUrl());
+        createTables(environment.getPqsUrl());
+        createSampleData(environment.getPqsUrl());
+        grantUsersToPhoenixSystemTables(Arrays.asList(user1.getKey(), user2.getKey()));
+        grantUsersToGlobalPhoenixUserTables(Arrays.asList(user1.getKey()));
+        return null;
+      });
+    }
+  }
+
+  protected interface TestWrapper {
+    void apply() throws Exception;
+  }
+
+  public void runForThreeClients(SecuredPhoenixSQLTest.TestWrapper wrapper) throws Exception {
+    runForThreeClients(wrapper, UserRemoteException.class, RuntimeException.class);
+  }
+
+  /**
+   * @param wrapper actual test case execution
+   * @param user2ExpectedException the expected Exception for user2, which can be impersonated, but hasn't permissions to the tables
+   * @param user3ExpectedException the expected Exception for user3, isn't impersonated
+   */
+  public void runForThreeClients(SecuredPhoenixSQLTest.TestWrapper wrapper, Class user2ExpectedException, Class user3ExpectedException) throws Exception {

Review comment:
       Done

##########
File path: contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/QueryServerEnvironment.java
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.phoenix.secured;
+
+import static org.apache.hadoop.hbase.HConstants.HBASE_DIR;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.InetAddress;
+import java.security.PrivilegedAction;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.LocalHBaseCluster;
+import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.util.KerberosName;
+import org.apache.phoenix.query.ConfigurationFactory;
+import org.apache.phoenix.queryserver.QueryServerProperties;
+import org.apache.phoenix.queryserver.server.QueryServer;
+import org.apache.phoenix.util.InstanceResolver;
+import org.apache.phoenix.util.ThinClientUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Due to this bug https://bugzilla.redhat.com/show_bug.cgi?id=668830 We need to use
+ * `localhost.localdomain` as host name when running these tests on Jenkins (Centos) but for Mac OS
+ * it should be `localhost` to pass. The reason is kerberos principals in this tests are looked up
+ * from /etc/hosts and a reverse DNS lookup of 127.0.0.1 is resolved to `localhost.localdomain`
+ * rather than `localhost` on Centos. KDC sees `localhost` != `localhost.localdomain` and as the
+ * result test fails with authentication error. It's also important to note these principals are
+ * shared between HDFs and HBase in this mini HBase cluster. Some more reading
+ * https://access.redhat.com/solutions/57330
+ */
+public class QueryServerEnvironment {
+  private static final Logger LOG = LoggerFactory.getLogger(QueryServerEnvironment.class);
+
+  private final File TEMP_DIR = new File(getTempDir());
+  private final File KEYTAB_DIR = new File(TEMP_DIR, "keytabs");
+  private final List<File> USER_KEYTAB_FILES = new ArrayList<>();
+
+  private static final String LOCAL_HOST_REVERSE_DNS_LOOKUP_NAME;
+  static final String LOGIN_USER;
+
+  static {
+    try {
+       System.setProperty("sun.security.krb5.debug", "true");
+      LOCAL_HOST_REVERSE_DNS_LOOKUP_NAME = InetAddress.getByName("127.0.0.1").getCanonicalHostName();
+      String userName = System.getProperty("user.name");
+      LOGIN_USER = userName != null ? userName : "securecluster";
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static final String SPNEGO_PRINCIPAL = "HTTP/" + LOCAL_HOST_REVERSE_DNS_LOOKUP_NAME;
+  private static final String PQS_PRINCIPAL = "phoenixqs/" + LOCAL_HOST_REVERSE_DNS_LOOKUP_NAME;
+  private static final String SERVICE_PRINCIPAL = LOGIN_USER + "/" + LOCAL_HOST_REVERSE_DNS_LOOKUP_NAME;
+  private File KEYTAB;
+
+  private MiniKdc KDC;
+  private HBaseTestingUtility UTIL = new HBaseTestingUtility();
+  private LocalHBaseCluster HBASE_CLUSTER;
+  private int NUM_CREATED_USERS;
+
+  private ExecutorService PQS_EXECUTOR;
+  private QueryServer PQS;
+  private int PQS_PORT;
+  private String PQS_URL;
+
+  private boolean tls;
+
+  private static String getTempDir() {
+    StringBuilder sb = new StringBuilder(32);
+    sb.append(System.getProperty("user.dir")).append(File.separator);
+    sb.append("target").append(File.separator);
+    sb.append(QueryServerEnvironment.class.getSimpleName());
+    sb.append("-").append(UUID.randomUUID());
+    return sb.toString();
+  }
+
+  public int getPqsPort() {
+    return PQS_PORT;
+  }
+
+  public String getPqsUrl() {
+    return PQS_URL;
+  }
+
+  public boolean getTls() {
+    return tls;
+  }
+
+  public HBaseTestingUtility getUtil() {
+    return UTIL;
+  }
+
+  public String getServicePrincipal() {
+    return SERVICE_PRINCIPAL;
+  }
+
+  public File getServiceKeytab() {
+    return KEYTAB;
+  }
+
+  private static void updateDefaultRealm() throws Exception {
+    // (at least) one other phoenix test triggers the caching of this field before the KDC is up
+    // which causes principal parsing to fail.
+    Field f = KerberosName.class.getDeclaredField("defaultRealm");
+    f.setAccessible(true);
+    // Default realm for MiniKDC
+    f.set(null, "EXAMPLE.COM");
+  }
+
+  private void createUsers(int numUsers) throws Exception {
+    assertNotNull("KDC is null, was setup method called?", KDC);
+    NUM_CREATED_USERS = numUsers;
+    for (int i = 1; i <= numUsers; i++) {
+      String principal = "user" + i;
+      File keytabFile = new File(KEYTAB_DIR, principal + ".keytab");
+      KDC.createPrincipal(keytabFile, principal);
+      USER_KEYTAB_FILES.add(keytabFile);
+    }
+  }
+
+  public Map.Entry<String, File> getUser(int offset) {
+    if (!(offset > 0 && offset <= NUM_CREATED_USERS)) {
+      throw new IllegalArgumentException();
+    }
+    return new AbstractMap.SimpleImmutableEntry<String, File>("user" + offset, USER_KEYTAB_FILES.get(offset - 1));
+  }
+
+  /**
+   * Setup the security configuration for hdfs.
+   */
+  private void setHdfsSecuredConfiguration(Configuration conf) throws Exception {
+    // Set principal+keytab configuration for HDFS
+    conf.set(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY,
+      SERVICE_PRINCIPAL + "@" + KDC.getRealm());
+    conf.set(DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY, KEYTAB.getAbsolutePath());
+    conf.set(DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY,
+      SERVICE_PRINCIPAL + "@" + KDC.getRealm());
+    conf.set(DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY, KEYTAB.getAbsolutePath());
+    conf.set(DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY,
+      SPNEGO_PRINCIPAL + "@" + KDC.getRealm());
+    // Enable token access for HDFS blocks
+    conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+    // Only use HTTPS (required because we aren't using "secure" ports)
+    conf.set(DFSConfigKeys.DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());
+    // Bind on localhost for spnego to have a chance at working
+    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");
+    conf.set(DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0");
+
+    // Generate SSL certs
+    File keystoresDir = new File(UTIL.getDataTestDir("keystore").toUri().getPath());
+    keystoresDir.mkdirs();
+    String sslConfDir = TlsUtil.getClasspathDir(QueryServerEnvironment.class);
+    TlsUtil.setupSSLConfig(keystoresDir.getAbsolutePath(), sslConfDir, conf, false);
+
+    // Magic flag to tell hdfs to not fail on using ports above 1024
+    conf.setBoolean("ignore.secure.ports.for.testing", true);
+  }
+
+  private static void ensureIsEmptyDirectory(File f) throws IOException {
+    if (f.exists()) {
+      if (f.isDirectory()) {
+        FileUtils.deleteDirectory(f);
+      } else {
+        assertTrue("Failed to delete keytab directory", f.delete());
+      }
+    }
+    assertTrue("Failed to create keytab directory", f.mkdirs());
+  }
+
+  /**
+   * Setup and start kerberosed, hbase
+   * @throws Exception
+   */
+  public QueryServerEnvironment(final Configuration confIn, int numberOfUsers, boolean tls)

Review comment:
       This is almost the original `org.apache.phoenix.end2end.QueryServerEnvironment`. 
   To make future updates from upstream easier, it is better not to move its methods around.

##########
File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixBatchReader.java
##########
@@ -71,58 +74,64 @@ public PhoenixBatchReader(PhoenixSubScan subScan) {
 
   @Override
   public boolean open(SchemaNegotiator negotiator) {
-    try {
+    UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();
+    return ugi.doAs((PrivilegedAction<Boolean>) () -> {
       errorContext = negotiator.parentErrorContext();
-      conn = subScan.getPlugin().getDataSource().getConnection();
-      pstmt = conn.prepareStatement(subScan.getSql());
-      results = pstmt.executeQuery();
-      meta = pstmt.getMetaData();
-    } catch (SQLException e) {
-      throw UserException
-              .dataReadError(e)
-              .message("Failed to execute the phoenix sql query. " + e.getMessage())
-              .addContext(errorContext)
-              .build(logger);
-    }
-    try {
-      negotiator.tableSchema(defineMetadata(), true);
-      reader = new PhoenixReader(negotiator.build(), columns, results);
-      bindColumns(reader.getStorage());
-    } catch (SQLException e) {
-      throw UserException
-              .dataReadError(e)
-              .message("Failed to get type of columns from metadata. " + e.getMessage())
-              .addContext(errorContext)
-              .build(logger);
-    }
-    watch = Stopwatch.createStarted();
-    return true;
+      try {
+        pstmt =
+          subScan.getPlugin().getDataSource(negotiator.userName()).getConnection().prepareStatement(subScan.getSql());
+        results = pstmt.executeQuery();
+        meta = pstmt.getMetaData();
+      } catch (SQLException e) {
+        throw UserException
+          .dataReadError(e)
+          .message("Failed to execute the phoenix sql query. " + e.getMessage())
+          .addContext(negotiator.parentErrorContext())
+          .build(logger);
+      }
+      try {
+        negotiator.tableSchema(defineMetadata(), true);
+        reader = new PhoenixReader(negotiator.build(), columns, results);
+        bindColumns(reader.getStorage());
+      } catch (SQLException e) {
+        throw UserException
+          .dataReadError(e)
+          .message("Failed to get type of columns from metadata. " + e.getMessage())
+          .addContext(errorContext)
+          .build(logger);
+      }
+      watch = Stopwatch.createStarted();
+      return true;
+    });
   }
 
   @Override
   public boolean next() {
+    UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();
     try {
-      while(!reader.getStorage().isFull()) {
-        if (!reader.processRow()) { // return true if one row is processed.
-          watch.stop();
-          logger.debug("Phoenix fetch total record numbers : {}", reader.getRowCount());
-          return false; // the EOF is reached.
+      return ugi.doAs((PrivilegedExceptionAction<Boolean>) () -> {

Review comment:
       Looks like we only need this when impersonation is enabled. 
   Changed; a minimal sketch of the guarded version follows.
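   
   A sketch of the guard, assuming a boolean `impersonationEnabled` field derived from `ExecConstants.IMPERSONATION_ENABLED` and a hypothetical `processRows()` helper that holds the original read loop; the merged code may differ in detail:
   
   ```java
   @Override
   public boolean next() {
     if (!impersonationEnabled) {
       return processRows();  // common path: no proxy-user switch needed
     }
     UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();
     try {
       // processRows() is a hypothetical helper holding the original loop body.
       return ugi.doAs((PrivilegedExceptionAction<Boolean>) this::processRows);
     } catch (IOException | InterruptedException e) {
       throw UserException.dataReadError(e)
           .addContext(errorContext)
           .build(logger);
     }
   }
   ```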

##########
File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixDataSource.java
##########
@@ -39,44 +40,49 @@
  * is not always left in a healthy state by the previous user. It is better to
  * create new Phoenix Connections to ensure that you avoid any potential issues.
  */
+@Slf4j
 public class PhoenixDataSource implements DataSource {
 
   private static final String DEFAULT_URL_HEADER = "jdbc:phoenix:thin:url=http://";
   private static final String DEFAULT_SERIALIZATION = "serialization=PROTOBUF";
+  private static final String IMPERSONATED_USER_VARIABLE = "$user";
+  private static final String DEFAULT_QUERY_SERVER_REMOTEUSEREXTRACTOR_PARAM = "doAs";
 
-  private String url;
+  private final String url;
   private Map<String, Object> connectionProperties;
-  private boolean isFatClient; // Is a fat client
-
-  public PhoenixDataSource(String url) {
-    Preconditions.checkNotNull(url);
-    this.url = url;
-  }
-
-  public PhoenixDataSource(String host, int port) {
-    Preconditions.checkNotNull(host);
-    Preconditions.checkArgument(port > 0, "Please set the correct port.");
-    this.url = new StringBuilder()
-        .append(DEFAULT_URL_HEADER)
-        .append(host)
-        .append(":")
-        .append(port)
-        .append(";")
-        .append(DEFAULT_SERIALIZATION)
-        .toString();
-  }
-
-  public PhoenixDataSource(String url, Map<String, Object> connectionProperties) {
-    this(url);
+  private boolean isFatClient;
+  private String user;
+
+  public PhoenixDataSource(String url,
+                           String userName,
+                           Map<String, Object> connectionProperties,
+                           boolean impersonationEnabled) {
+    Preconditions.checkNotNull(url, userName);
     Preconditions.checkNotNull(connectionProperties);
     connectionProperties.forEach((k, v)
         -> Preconditions.checkArgument(v != null, String.format("does not accept null values : %s", k)));
+    this.url = impersonationEnabled ? doAsUserUrl(url, userName) : url;
+    this.user = userName;
     this.connectionProperties = connectionProperties;
   }
 
-  public PhoenixDataSource(String host, int port, Map<String, Object> connectionProperties) {
-    this(host, port);
-    Preconditions.checkNotNull(connectionProperties);
+  public PhoenixDataSource(String host,
+                           int port,
+                           String userName,
+                           Map<String, Object> connectionProperties,
+                           boolean impersonationEnabled) {
+    Preconditions.checkNotNull(host, userName);
+    Preconditions.checkArgument(port > 0, "Please set the correct port.");
+    this.url = new StringBuilder()
+      .append(DEFAULT_URL_HEADER)
+      .append(host)
+      .append(":")
+      .append(port)
+      .append(impersonationEnabled ? "?doAs=" + userName : "")
+      .append(";")
+      .append(DEFAULT_SERIALIZATION)
+      .toString();
+    this.user = userName;
     connectionProperties.forEach((k, v)

Review comment:
       Agreed, thanks.
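   
   For reference, a usage sketch of the host/port constructor above; the host, port and user values are illustrative:
   
   ```java
   Map<String, Object> props = new HashMap<>();
   // With impersonation enabled and session user "alice", the generated URL is:
   //   jdbc:phoenix:thin:url=http://localhost:8765?doAs=alice;serialization=PROTOBUF
   PhoenixDataSource dataSource =
       new PhoenixDataSource("localhost", 8765, "alice", props, true);
   ```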

##########
File path: contrib/storage-phoenix/README.md
##########
@@ -150,4 +155,13 @@ apache drill (phoenix123.v1)> select n_name, n_regionkey from nation limit 3 off
 | JAPAN  | 2           |
 +--------+-------------+
 3 rows selected (0.77 seconds)
-```
\ No newline at end of file
+```
+### Impersonation

Review comment:
       Agreed. Done

##########
File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixDataSource.java
##########
@@ -21,13 +21,14 @@
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
-import java.sql.SQLFeatureNotSupportedException;
 import java.util.Map;
 import java.util.Properties;
 import java.util.logging.Logger;
 
 import javax.sql.DataSource;
 
+import lombok.extern.slf4j.Slf4j;

Review comment:
       It sounds like an objection to using Lombok :)
   Lombok makes programming easier, but I have to admit I share the same misgivings. It is a bit like the questionable Guava situation: the library helps with day-to-day coding, yet it also brings a lot of difficulty when different libs depend on different Guava versions. I would rather never have started using Guava than end up shading and patching it.
   On the other hand, Drill with an old JDK language level and without modern libraries starts to feel boring, old and legacy for development.
   I am not introducing a new thing here, so let's leave it as is and keep such discussions with the whole Drill dev community, if needed.

##########
File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSchemaFactory.java
##########
@@ -18,75 +18,100 @@
 package org.apache.drill.exec.store.phoenix;
 
 import java.io.IOException;
-import java.sql.Connection;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import javax.sql.DataSource;
 
 import org.apache.calcite.adapter.jdbc.JdbcSchema;
 import org.apache.calcite.schema.Schema;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Table;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.AbstractSchemaFactory;
 import org.apache.drill.exec.store.SchemaConfig;
-import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import static org.apache.drill.exec.util.ImpersonationUtil.getProcessUserName;
 
 public class PhoenixSchemaFactory extends AbstractSchemaFactory {
 
   private final PhoenixStoragePlugin plugin;
   private final Map<String, PhoenixSchema> schemaMap;
   private PhoenixSchema rootSchema;
+  private final boolean isDrillImpersonationEnabled;
 
   public PhoenixSchemaFactory(PhoenixStoragePlugin plugin) {
     super(plugin.getName());
     this.plugin = plugin;
-    this.schemaMap = Maps.newHashMap();
+    this.schemaMap = new HashMap<>();
+    isDrillImpersonationEnabled = plugin.getContext().getConfig().getBoolean(ExecConstants.IMPERSONATION_ENABLED);
   }
 
   @Override
   public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
-    rootSchema = new PhoenixSchema(plugin, Collections.emptyList(), plugin.getName());
-    locateSchemas();
+    try {
+      rootSchema = new PhoenixSchema(schemaConfig, plugin, Collections.emptyList(), plugin.getName());
+      String schemaUser = schemaConfig.getUserName();
+      locateSchemas(schemaConfig, rootSchema.getUser(schemaUser, getProcessUserName()));
+    } catch (SQLException e) {
+      throw new IOException(e);

Review comment:
       Done for `registerSchemas`.
   `locateSchemas` changed a bit differently because it needs to check whether impersonation is enabled.

##########
File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixDataSource.java
##########
@@ -169,12 +174,23 @@ private void useDriverClass() throws SQLException {
    */
   private Properties useConfProperties() {
     Properties props = new Properties();
-    props.put("phoenix.trace.frequency", "never");
-    props.put("phoenix.query.timeoutMs", 30000);
-    props.put("phoenix.query.keepAliveMs", 120000);
     if (getConnectionProperties() != null) {
       props.putAll(getConnectionProperties());
     }
+    props.putIfAbsent("phoenix.trace.frequency", "never");
+    props.putIfAbsent("phoenix.query.timeoutMs", 30000);
+    props.putIfAbsent("phoenix.query.keepAliveMs", 120000);
     return props;
   }
+
+  private String doAsUserUrl(String url, String userName) {
+    if (url.contains(DEFAULT_QUERY_SERVER_REMOTEUSEREXTRACTOR_PARAM)) {
+      return url.replace(IMPERSONATED_USER_VARIABLE, userName);
+    } else {
+      throw UserException
+        .connectionError()
+        .message("Invalid PQS URL. Please add `doAs=$user` parameter value in case Drill Impersonation enabled")

Review comment:
       Done
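   
   For illustration, how `doAsUserUrl` above resolves the `$user` placeholder at query time:
   
   ```java
   // Template as stored in the storage plugin config:
   String template = "jdbc:phoenix:thin:url=http://localhost:8765?doAs=$user";
   
   // Per-session substitution performed by doAsUserUrl():
   String aliceUrl = template.replace("$user", "alice");
   // -> jdbc:phoenix:thin:url=http://localhost:8765?doAs=alice
   
   // If the configured URL has no doAs parameter while impersonation is
   // enabled, doAsUserUrl() fails fast with the UserException above.
   ```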

##########
File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSchemaFactory.java
##########
@@ -18,75 +18,100 @@
 package org.apache.drill.exec.store.phoenix;
 
 import java.io.IOException;
-import java.sql.Connection;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import javax.sql.DataSource;
 
 import org.apache.calcite.adapter.jdbc.JdbcSchema;
 import org.apache.calcite.schema.Schema;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Table;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.AbstractSchemaFactory;
 import org.apache.drill.exec.store.SchemaConfig;
-import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import static org.apache.drill.exec.util.ImpersonationUtil.getProcessUserName;
 
 public class PhoenixSchemaFactory extends AbstractSchemaFactory {
 
   private final PhoenixStoragePlugin plugin;
   private final Map<String, PhoenixSchema> schemaMap;
   private PhoenixSchema rootSchema;
+  private final boolean isDrillImpersonationEnabled;
 
   public PhoenixSchemaFactory(PhoenixStoragePlugin plugin) {
     super(plugin.getName());
     this.plugin = plugin;
-    this.schemaMap = Maps.newHashMap();
+    this.schemaMap = new HashMap<>();
+    isDrillImpersonationEnabled = plugin.getContext().getConfig().getBoolean(ExecConstants.IMPERSONATION_ENABLED);
   }
 
   @Override
   public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
-    rootSchema = new PhoenixSchema(plugin, Collections.emptyList(), plugin.getName());
-    locateSchemas();
+    try {
+      rootSchema = new PhoenixSchema(schemaConfig, plugin, Collections.emptyList(), plugin.getName());
+      String schemaUser = schemaConfig.getUserName();
+      locateSchemas(schemaConfig, rootSchema.getUser(schemaUser, getProcessUserName()));
+    } catch (SQLException e) {
+      throw new IOException(e);
+    }
     parent.add(getName(), rootSchema); // resolve the top-level schema.
     for (String schemaName : rootSchema.getSubSchemaNames()) {
       PhoenixSchema schema = (PhoenixSchema) rootSchema.getSubSchema(schemaName);
       parent.add(schemaName, schema); // provide all available schemas for calcite.
     }
   }
 
-  private void locateSchemas() {
-    DataSource ds = plugin.getDataSource();
-    try (Connection conn = ds.getConnection();
-          ResultSet rs = ds.getConnection().getMetaData().getSchemas()) {
-      while (rs.next()) {
-        final String schemaName = rs.getString(1); // lookup the schema (or called database).
-        PhoenixSchema schema = new PhoenixSchema(plugin, Arrays.asList(getName()), schemaName);
-        schemaMap.put(schemaName, schema);
-      }
-      rootSchema.addSchemas(schemaMap);
-    } catch (SQLException e) {
-      throw new DrillRuntimeException(e.getMessage(), e);
+  private void locateSchemas(SchemaConfig schemaConfig, String userName) throws SQLException {
+    UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();
+    try {
+      ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
+        try (ResultSet rs = plugin.getDataSource(userName).getConnection().getMetaData().getSchemas()) {
+          while (rs.next()) {
+            final String schemaName = rs.getString(1); // lookup the schema (or called database).
+            PhoenixSchema schema = new PhoenixSchema(schemaConfig, plugin, Arrays.asList(getName()), schemaName);
+            schemaMap.put(schemaName, schema);
+          }
+          rootSchema.addSchemas(schemaMap);
+        }
+        return null;
+      });
+    } catch (IOException | InterruptedException e) {
+      throw new SQLException(e);
     }
   }
 
-  protected static class PhoenixSchema extends AbstractSchema {
+  @Override
+  public boolean needToImpersonateReadingData() {
+    return isDrillImpersonationEnabled;
+  }
 
+  class PhoenixSchema extends AbstractSchema {
+    private final SchemaConfig schemaConfig;

Review comment:
       right. Thanks

##########
File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixStoragePlugin.java
##########
@@ -62,8 +77,12 @@ public StoragePluginConfig getConfig() {
     return config;
   }
 
-  public DataSource getDataSource() {
-    return dataSource;
+  public PhoenixDataSource getDataSource(String userName) throws SQLException {

Review comment:
       Agreed.
   I moved it along with the `getDialect` and `getConvention` methods.

##########
File path: contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/PhoenixBaseTest.java
##########
@@ -38,51 +38,63 @@
 
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.ClusterFixtureBuilder;
 import org.apache.drill.test.ClusterTest;
 import org.apache.hadoop.fs.Path;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.slf4j.LoggerFactory;
 
 import com.univocity.parsers.csv.CsvParser;
 import com.univocity.parsers.csv.CsvParserSettings;
 
-public class PhoenixBaseTest extends ClusterTest {
+public abstract class PhoenixBaseTest extends ClusterTest {

Review comment:
       Agreed. Thanks

##########
File path: contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/HttpParamImpersonationQueryServerIT.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.phoenix.secured;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.security.access.AccessControlClient;
+import org.apache.hadoop.hbase.security.access.AccessController;
+import org.apache.hadoop.hbase.security.access.Permission.Action;
+import org.apache.hadoop.hbase.security.token.TokenProvider;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.queryserver.QueryServerOptions;
+import org.apache.phoenix.queryserver.QueryServerProperties;
+import org.apache.phoenix.queryserver.client.Driver;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.drill.exec.store.phoenix.secured.QueryServerEnvironment.LOGIN_USER;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class HttpParamImpersonationQueryServerIT {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HttpParamImpersonationQueryServerIT.class);

Review comment:
       Removed. Thanks

##########
File path: contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/QueryServerEnvironment.java
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.phoenix.secured;
+
+import static org.apache.hadoop.hbase.HConstants.HBASE_DIR;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.InetAddress;
+import java.security.PrivilegedAction;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.LocalHBaseCluster;
+import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.util.KerberosName;
+import org.apache.phoenix.query.ConfigurationFactory;
+import org.apache.phoenix.queryserver.QueryServerProperties;
+import org.apache.phoenix.queryserver.server.QueryServer;
+import org.apache.phoenix.util.InstanceResolver;
+import org.apache.phoenix.util.ThinClientUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Due to this bug https://bugzilla.redhat.com/show_bug.cgi?id=668830 We need to use
+ * `localhost.localdomain` as host name when running these tests on Jenkins (Centos) but for Mac OS
+ * it should be `localhost` to pass. The reason is kerberos principals in this tests are looked up
+ * from /etc/hosts and a reverse DNS lookup of 127.0.0.1 is resolved to `localhost.localdomain`
+ * rather than `localhost` on Centos. KDC sees `localhost` != `localhost.localdomain` and as the
+ * result test fails with authentication error. It's also important to note these principals are
+ * shared between HDFs and HBase in this mini HBase cluster. Some more reading
+ * https://access.redhat.com/solutions/57330
+ */
+public class QueryServerEnvironment {
+  private static final Logger LOG = LoggerFactory.getLogger(QueryServerEnvironment.class);
+
+  private final File TEMP_DIR = new File(getTempDir());
+  private final File KEYTAB_DIR = new File(TEMP_DIR, "keytabs");
+  private final List<File> USER_KEYTAB_FILES = new ArrayList<>();
+
+  private static final String LOCAL_HOST_REVERSE_DNS_LOOKUP_NAME;
+  static final String LOGIN_USER;
+
+  static {
+    try {
+       System.setProperty("sun.security.krb5.debug", "true");

Review comment:
       Commented it out so it can be re-enabled for debugging purposes only; see the sketch below.
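   
   A small sketch of keeping the flag handy without enabling it by default; the `phoenix.test.krb5.debug` property name here is hypothetical:
   
   ```java
   static {
     // Kerberos protocol tracing is very noisy; enable it only on demand,
     // e.g. with -Dphoenix.test.krb5.debug=true on the Maven command line.
     if (Boolean.getBoolean("phoenix.test.krb5.debug")) {
       System.setProperty("sun.security.krb5.debug", "true");
     }
   }
   ```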

##########
File path: contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/SecuredPhoenixBaseTest.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.phoenix.secured;
+
+import ch.qos.logback.classic.Level;
+import com.sun.security.auth.module.Krb5LoginModule;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.drill.common.config.DrillProperties;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.rpc.security.ServerAuthenticationHandler;
+import org.apache.drill.exec.rpc.security.kerberos.KerberosFactory;
+import org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl;
+import org.apache.drill.exec.server.TestDrillbitResilience;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.phoenix.PhoenixStoragePluginConfig;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.LogFixture;
+import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.phoenix.queryserver.server.QueryServer;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+
+import java.io.File;
+import java.nio.file.Paths;
+import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.TimeZone;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.drill.exec.store.phoenix.PhoenixBaseTest.createSampleData;
+import static org.apache.drill.exec.store.phoenix.PhoenixBaseTest.createSchema;
+import static org.apache.drill.exec.store.phoenix.PhoenixBaseTest.createTables;
+import static org.apache.drill.exec.store.phoenix.secured.HttpParamImpersonationQueryServerIT.environment;
+import static org.apache.drill.exec.store.phoenix.secured.HttpParamImpersonationQueryServerIT.getUrlTemplate;
+import static org.apache.drill.exec.store.phoenix.secured.HttpParamImpersonationQueryServerIT.grantUsersToGlobalPhoenixUserTables;
+import static org.apache.drill.exec.store.phoenix.secured.HttpParamImpersonationQueryServerIT.grantUsersToPhoenixSystemTables;
+import static org.apache.drill.exec.store.phoenix.secured.SecuredPhoenixTestSuite.initPhoenixQueryServer;
+
+@Slf4j
+public abstract class SecuredPhoenixBaseTest extends ClusterTest {
+  protected static LogFixture logFixture;
+  private final static Level CURRENT_LOG_LEVEL = Level.INFO;
+
+  private final static AtomicInteger initCount = new AtomicInteger(0);
+
+  @BeforeAll
+  public static void setUpBeforeClass() throws Exception {
+    TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
+    initPhoenixQueryServer();
+    bootSecuredDrillMiniCluster();
+    initializeDatabase();
+  }
+
+  private static void bootSecuredDrillMiniCluster() throws Exception {
+    logFixture = LogFixture.builder()
+      .toConsole()
+      .logger(QueryServerEnvironment.class, CURRENT_LOG_LEVEL)
+      .logger(SecuredPhoenixBaseTest.class, CURRENT_LOG_LEVEL)
+      .logger(KerberosFactory.class, CURRENT_LOG_LEVEL)
+      .logger(Krb5LoginModule.class, CURRENT_LOG_LEVEL)
+      .logger(QueryServer.class, CURRENT_LOG_LEVEL)
+      .logger(ServerAuthenticationHandler.class, CURRENT_LOG_LEVEL)
+      .build();
+
+    Map.Entry<String, File> user1 = environment.getUser(1);
+    Map.Entry<String, File> user2 = environment.getUser(2);
+    Map.Entry<String, File> user3 = environment.getUser(3);
+
+    dirTestWatcher.start(TestDrillbitResilience.class); // until DirTestWatcher ClassRule is implemented for JUnit5
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+        .configProperty(ExecConstants.USER_AUTHENTICATION_ENABLED, true)
+        .configProperty(ExecConstants.USER_AUTHENTICATOR_IMPL, UserAuthenticatorTestImpl.TYPE)
+        .configNonStringProperty(ExecConstants.AUTHENTICATION_MECHANISMS, Lists.newArrayList("kerberos"))
+        .configProperty(ExecConstants.IMPERSONATION_ENABLED, true)
+        .configProperty(ExecConstants.BIT_AUTHENTICATION_ENABLED, true)
+        .configProperty(ExecConstants.BIT_AUTHENTICATION_MECHANISM, "kerberos")
+        .configProperty(ExecConstants.SERVICE_PRINCIPAL, HBaseKerberosUtils.getPrincipalForTesting())
+        .configProperty(ExecConstants.SERVICE_KEYTAB_LOCATION, environment.getServiceKeytab().getAbsolutePath())
+        .configClientProperty(DrillProperties.SERVICE_PRINCIPAL, HBaseKerberosUtils.getPrincipalForTesting())
+        .configClientProperty(DrillProperties.USER, user1.getKey())
+        .configClientProperty(DrillProperties.KEYTAB, user1.getValue().getAbsolutePath());
+    startCluster(builder);
+    Properties user2ClientProperties = new Properties();
+    user2ClientProperties.setProperty(DrillProperties.SERVICE_PRINCIPAL, HBaseKerberosUtils.getPrincipalForTesting());
+    user2ClientProperties.setProperty(DrillProperties.USER, user2.getKey());
+    user2ClientProperties.setProperty(DrillProperties.KEYTAB, user2.getValue().getAbsolutePath());
+    cluster.addClientFixture(user2ClientProperties);
+    Properties user3ClientProperties = new Properties();
+    user3ClientProperties.setProperty(DrillProperties.SERVICE_PRINCIPAL, HBaseKerberosUtils.getPrincipalForTesting());
+    user3ClientProperties.setProperty(DrillProperties.USER, user3.getKey());
+    user3ClientProperties.setProperty(DrillProperties.KEYTAB, user3.getValue().getAbsolutePath());
+    cluster.addClientFixture(user3ClientProperties);
+
+    Map<String, Object> phoenixProps = new HashMap<>();
+    phoenixProps.put("phoenix.query.timeoutMs", 90000);
+    phoenixProps.put("phoenix.query.keepAliveMs", "30000");
+    phoenixProps.put("phoenix.queryserver.withRemoteUserExtractor", true);
+    StoragePluginRegistry registry = cluster.drillbit().getContext().getStorage();
+    final String doAsUrl = String.format(getUrlTemplate(), "$user");
+    logger.debug("Phoenix Query Server URL: {}", environment.getPqsUrl());
+    PhoenixStoragePluginConfig config = new PhoenixStoragePluginConfig(null, 0, null, null,
+      doAsUrl, null, phoenixProps);
+    config.setEnabled(true);
+    registry.put(PhoenixStoragePluginConfig.NAME + "123", config);
+  }
+
+
+  /**
+   * Initialize HBase via Phoenix
+   */
+  private static void initializeDatabase() throws Exception {
+    dirTestWatcher.copyResourceToRoot(Paths.get(""));
+    if (initCount.incrementAndGet() == 1) {
+      final Map.Entry<String, File> user1 = environment.getUser(1);
+      final Map.Entry<String, File> user2 = environment.getUser(2);
+      // Build the JDBC URL by hand with the doAs
+      final UserGroupInformation serviceUgi = ImpersonationUtil.getProcessUserUGI();
+      serviceUgi.doAs((PrivilegedExceptionAction<Void>) () -> {
+        logger.debug("Phoenix conn url: {}", environment.getPqsUrl());

Review comment:
       Done

##########
File path: contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/HttpParamImpersonationQueryServerIT.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.phoenix.secured;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.security.access.AccessControlClient;
+import org.apache.hadoop.hbase.security.access.AccessController;
+import org.apache.hadoop.hbase.security.access.Permission.Action;
+import org.apache.hadoop.hbase.security.token.TokenProvider;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.queryserver.QueryServerOptions;
+import org.apache.phoenix.queryserver.QueryServerProperties;
+import org.apache.phoenix.queryserver.client.Driver;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.drill.exec.store.phoenix.secured.QueryServerEnvironment.LOGIN_USER;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class HttpParamImpersonationQueryServerIT {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HttpParamImpersonationQueryServerIT.class);
+    public static QueryServerEnvironment environment;
+
+    private static final List<TableName> SYSTEM_TABLE_NAMES = Arrays.asList(
+        PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME,
+        PhoenixDatabaseMetaData.SYSTEM_MUTEX_HBASE_TABLE_NAME,
+        PhoenixDatabaseMetaData.SYSTEM_FUNCTION_HBASE_TABLE_NAME,
+        PhoenixDatabaseMetaData.SYSTEM_SCHEMA_HBASE_TABLE_NAME,
+        PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_HBASE_TABLE_NAME,
+        PhoenixDatabaseMetaData.SYSTEM_STATS_HBASE_TABLE_NAME);
+
+    public static synchronized void startQueryServerEnvironment() throws Exception {
+        //Clean up previous environment if any (Junit 4.13 @BeforeParam / @AfterParam would be an alternative)

Review comment:
       I wanted to keep the original `org.apache.phoenix.end2end.HttpParamImpersonationQueryServerIT` class from the `phoenix-queryserver` project as-is. But it looks like we need to bring it in line with Drill's style.

##########
File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSchemaFactory.java
##########
@@ -18,75 +18,100 @@
 package org.apache.drill.exec.store.phoenix;
 
 import java.io.IOException;
-import java.sql.Connection;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import javax.sql.DataSource;
 
 import org.apache.calcite.adapter.jdbc.JdbcSchema;
 import org.apache.calcite.schema.Schema;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Table;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.AbstractSchemaFactory;
 import org.apache.drill.exec.store.SchemaConfig;
-import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import static org.apache.drill.exec.util.ImpersonationUtil.getProcessUserName;
 
 public class PhoenixSchemaFactory extends AbstractSchemaFactory {
 
   private final PhoenixStoragePlugin plugin;
   private final Map<String, PhoenixSchema> schemaMap;
   private PhoenixSchema rootSchema;
+  private final boolean isDrillImpersonationEnabled;
 
   public PhoenixSchemaFactory(PhoenixStoragePlugin plugin) {
     super(plugin.getName());
     this.plugin = plugin;
-    this.schemaMap = Maps.newHashMap();
+    this.schemaMap = new HashMap<>();
+    isDrillImpersonationEnabled = plugin.getContext().getConfig().getBoolean(ExecConstants.IMPERSONATION_ENABLED);
   }
 
   @Override
   public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
-    rootSchema = new PhoenixSchema(plugin, Collections.emptyList(), plugin.getName());
-    locateSchemas();
+    try {
+      rootSchema = new PhoenixSchema(schemaConfig, plugin, Collections.emptyList(), plugin.getName());
+      String schemaUser = schemaConfig.getUserName();
+      locateSchemas(schemaConfig, rootSchema.getUser(schemaUser, getProcessUserName()));

Review comment:
       Changed

##########
File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSchemaFactory.java
##########
@@ -101,21 +126,26 @@ public Schema getSubSchema(String name) {
 
     @Override
     public Table getTable(String name) {
-      Table table = jdbcSchema.getTable(StringUtils.upperCase(name));
-      return table;
+      final UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();
+      return ugi.doAs((PrivilegedAction<Table>) () -> jdbcSchema.getTable(StringUtils.upperCase(name)));
     }
 
     @Override
     public Set<String> getTableNames() {
-      Set<String> tables = jdbcSchema.getTableNames();
-      return tables;
+      final UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();
+      return ugi.doAs((PrivilegedAction<Set<String>>) jdbcSchema::getTableNames);
     }
 
     @Override
     public String getTypeName() {
       return PhoenixStoragePluginConfig.NAME;
     }
 
+    @Override
+    public String getUser(String impersonated, String notImpersonated) {

Review comment:
       Agreed.
   I also moved `addSchemas` to the end, after the overridden methods.

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/conversion/DrillValidator.java
##########
@@ -66,7 +69,11 @@ protected void validateFrom(SqlNode node, RelDataType targetRowType, SqlValidato
           }
       }
     }
-    super.validateFrom(node, targetRowType, scope);
+    UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();
+      ugi.doAs((PrivilegedAction<Void>) () -> {

Review comment:
       Fixed. Thanks

##########
File path: contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/SecuredPhoenixDataTypeTest.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.phoenix.secured;
+
+import org.apache.drill.categories.RowSetTests;
+import org.apache.drill.categories.SlowTest;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.QueryBuilder;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+
+import static org.apache.drill.exec.store.phoenix.PhoenixBaseTest.U_U_I_D;
+import static org.apache.drill.test.rowSet.RowSetUtilities.boolArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.byteArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.doubleArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.intArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.longArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.shortArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
+
+@Tag(SlowTest.TAG)
+@Tag(RowSetTests.TAG)
+public class SecuredPhoenixDataTypeTest extends SecuredPhoenixBaseTest {
+
+  @Test
+  public void testDataType() throws Exception {
+    runForThreeClients(this::doTestDataType);
+  }
+
+  public void doTestDataType() throws Exception {

Review comment:
       Missed this method. Done

##########
File path: contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/TlsUtil.java
##########
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.phoenix.secured;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.ssl.FileBasedKeyStoresFactory;
+import org.apache.hadoop.security.ssl.SSLFactory;
+import org.bouncycastle.x509.X509V1CertificateGenerator;

Review comment:
       `TlsUtil` comes from the `phoenix-queryserver-it` testing module, similar to `QueryServerThread`, which you used.
   I don't think we need to modify it unless any critical vulnerabilities are introduced.
   The other approach is to depend on `phoenix-queryserver-it` directly. That way might be cleaner.

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordCollector.java
##########
@@ -181,22 +184,25 @@ public BasicRecordCollector(FilterEvaluator filterEvaluator, OptionManager optio
     @Override
     public List<Records.Column> columns(String schemaPath, SchemaPlus schema) {
       AbstractSchema drillSchema = schema.unwrap(AbstractSchema.class);
-      List<Records.Column> records = new ArrayList<>();
-      for (Pair<String, ? extends Table> tableNameToTable : drillSchema.getTablesByNames(schema.getTableNames())) {
-        String tableName = tableNameToTable.getKey();
-        Table table = tableNameToTable.getValue();
-        Schema.TableType tableType = table.getJdbcTableType();
-
-        if (filterEvaluator.shouldVisitTable(schemaPath, tableName, tableType)) {
-          RelDataType tableRow = table.getRowType(new JavaTypeFactoryImpl(DRILL_REL_DATATYPE_SYSTEM));
-          for (RelDataTypeField field : tableRow.getFieldList()) {
-            if (filterEvaluator.shouldVisitColumn(schemaPath, tableName, field.getName())) {
-              records.add(new Records.Column(IS_CATALOG_NAME, schemaPath, tableName, field));
+      UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();
+      return ugi.doAs((PrivilegedAction<List<Records.Column>>) () -> {

Review comment:
       Added a check that impersonation is enabled.
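   For reference, a minimal self-contained sketch of that guard, matching the `impersonationEnabled ? ugi.doAs(...) : ...` pattern visible in the `PhoenixBatchReader` diff elsewhere in this review; the helper class and `Supplier` parameter are illustrative, not the PR's code:

    ```java
    import java.security.PrivilegedAction;
    import java.util.function.Supplier;

    import org.apache.hadoop.security.UserGroupInformation;

    class ConditionalDoAs {
      // Wrap the body in ugi.doAs() only when impersonation is enabled;
      // otherwise run it directly under the current login context.
      static <T> T run(boolean impersonationEnabled, UserGroupInformation ugi, Supplier<T> body) {
        return impersonationEnabled
            ? ugi.doAs((PrivilegedAction<T>) body::get)
            : body.get();
      }
    }
    ```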




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] luocooong commented on a change in pull request #2422: DRILL-8061: Add Impersonation Support for Phoenix

Posted by GitBox <gi...@apache.org>.
luocooong commented on a change in pull request #2422:
URL: https://github.com/apache/drill/pull/2422#discussion_r790134460



##########
File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixBatchReader.java
##########
@@ -71,58 +74,64 @@ public PhoenixBatchReader(PhoenixSubScan subScan) {
 
   @Override
   public boolean open(SchemaNegotiator negotiator) {
-    try {
+    UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();
+    return ugi.doAs((PrivilegedAction<Boolean>) () -> {
       errorContext = negotiator.parentErrorContext();
-      conn = subScan.getPlugin().getDataSource().getConnection();
-      pstmt = conn.prepareStatement(subScan.getSql());
-      results = pstmt.executeQuery();
-      meta = pstmt.getMetaData();
-    } catch (SQLException e) {
-      throw UserException
-              .dataReadError(e)
-              .message("Failed to execute the phoenix sql query. " + e.getMessage())
-              .addContext(errorContext)
-              .build(logger);
-    }
-    try {
-      negotiator.tableSchema(defineMetadata(), true);
-      reader = new PhoenixReader(negotiator.build(), columns, results);
-      bindColumns(reader.getStorage());
-    } catch (SQLException e) {
-      throw UserException
-              .dataReadError(e)
-              .message("Failed to get type of columns from metadata. " + e.getMessage())
-              .addContext(errorContext)
-              .build(logger);
-    }
-    watch = Stopwatch.createStarted();
-    return true;
+      try {
+        pstmt =
+          subScan.getPlugin().getDataSource(negotiator.userName()).getConnection().prepareStatement(subScan.getSql());
+        results = pstmt.executeQuery();
+        meta = pstmt.getMetaData();
+      } catch (SQLException e) {
+        throw UserException
+          .dataReadError(e)
+          .message("Failed to execute the phoenix sql query. " + e.getMessage())
+          .addContext(negotiator.parentErrorContext())
+          .build(logger);
+      }
+      try {
+        negotiator.tableSchema(defineMetadata(), true);
+        reader = new PhoenixReader(negotiator.build(), columns, results);
+        bindColumns(reader.getStorage());
+      } catch (SQLException e) {
+        throw UserException
+          .dataReadError(e)
+          .message("Failed to get type of columns from metadata. " + e.getMessage())
+          .addContext(errorContext)
+          .build(logger);
+      }
+      watch = Stopwatch.createStarted();
+      return true;
+    });
   }
 
   @Override
   public boolean next() {
+    UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();
     try {
-      while(!reader.getStorage().isFull()) {
-        if (!reader.processRow()) { // return true if one row is processed.
-          watch.stop();
-          logger.debug("Phoenix fetch total record numbers : {}", reader.getRowCount());
-          return false; // the EOF is reached.
+      return ugi.doAs((PrivilegedExceptionAction<Boolean>) () -> {

Review comment:
       I fully agree that we should not use the impersonation feature unless it is enabled, thank you.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] luocooong commented on a change in pull request #2422: DRILL-8061: Add Impersonation Support for Phoenix

Posted by GitBox <gi...@apache.org>.
luocooong commented on a change in pull request #2422:
URL: https://github.com/apache/drill/pull/2422#discussion_r790134758



##########
File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixDataSource.java
##########
@@ -21,13 +21,14 @@
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
-import java.sql.SQLFeatureNotSupportedException;
 import java.util.Map;
 import java.util.Properties;
 import java.util.logging.Logger;
 
 import javax.sql.DataSource;
 
+import lombok.extern.slf4j.Slf4j;

Review comment:
       It seems that we have the same understanding of the situation. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] jnturton commented on a change in pull request #2422: DRILL-8061: Add Impersonation Support for Phoenix

Posted by GitBox <gi...@apache.org>.
jnturton commented on a change in pull request #2422:
URL: https://github.com/apache/drill/pull/2422#discussion_r787682748



##########
File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixBatchReader.java
##########
@@ -67,42 +74,55 @@
 
   public PhoenixBatchReader(PhoenixSubScan subScan) {
     this.subScan = subScan;
+    this.impersonationEnabled = subScan.getPlugin().getContext().getConfig().getBoolean(ExecConstants.IMPERSONATION_ENABLED);
   }
 
   @Override
   public boolean open(SchemaNegotiator negotiator) {
+    return impersonationEnabled
+      ? ugi.doAs((PrivilegedAction<Boolean>) () -> processOpen(negotiator))
+      : processOpen(negotiator);
+  }
+
+  private boolean processOpen(SchemaNegotiator negotiator) {
     try {
       errorContext = negotiator.parentErrorContext();
-      conn = subScan.getPlugin().getDataSource().getConnection();
-      pstmt = conn.prepareStatement(subScan.getSql());
+      DataSource ds = subScan.getPlugin().getDataSource(negotiator.userName());
+      PreparedStatement pstmt = ds.getConnection().prepareStatement(subScan.getSql());
       results = pstmt.executeQuery();
       meta = pstmt.getMetaData();
     } catch (SQLException e) {
       throw UserException
-              .dataReadError(e)
-              .message("Failed to execute the phoenix sql query. " + e.getMessage())
-              .addContext(errorContext)
-              .build(logger);
+        .dataReadError(e)
+        .message("Failed to execute the phoenix sql query. " + e.getMessage())
+        .addContext(errorContext)
+        .build(logger);
     }

Review comment:
       ```suggestion
       } finally {
         pstmt.close();
       }
   ```
   
   Something flagged by LGTM.
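   For reference, a self-contained sketch of one way to address the leak LGTM flags while keeping the statement open for subsequent reads: hold references to the `Connection` and `PreparedStatement` and release them in `close()` (and on a failed open). This illustrates the pattern only; it is not the PR's actual fix:

    ```java
    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;

    import javax.sql.DataSource;

    class LeakFreeReader implements AutoCloseable {
      private Connection conn;
      private PreparedStatement pstmt;
      private ResultSet results;

      void open(DataSource ds, String sql) throws SQLException {
        try {
          conn = ds.getConnection();          // keep a reference so it can be closed later
          pstmt = conn.prepareStatement(sql);
          results = pstmt.executeQuery();
        } catch (SQLException e) {
          try {
            close();                          // release anything opened before the failure
          } catch (SQLException suppressed) {
            e.addSuppressed(suppressed);
          }
          throw e;
        }
      }

      @Override
      public void close() throws SQLException {
        // Close in reverse order of acquisition; skip objects that were never opened.
        if (results != null) { results.close(); }
        if (pstmt != null) { pstmt.close(); }
        if (conn != null) { conn.close(); }
      }
    }
    ```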




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] jnturton commented on pull request #2422: DRILL-8061: Add Impersonation Support for Phoenix

Posted by GitBox <gi...@apache.org>.
jnturton commented on pull request #2422:
URL: https://github.com/apache/drill/pull/2422#issuecomment-1017358574


   Hi @luocooong!  Has @vdiravka addressed everything with this latest push?  Excuse my nudging but this is one of two open PRs for 1.20.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] Z0ltrix commented on a change in pull request #2422: DRILL-8061: Add Impersonation Support for Phoenix

Posted by GitBox <gi...@apache.org>.
Z0ltrix commented on a change in pull request #2422:
URL: https://github.com/apache/drill/pull/2422#discussion_r790289150



##########
File path: contrib/storage-phoenix/README.md
##########
@@ -87,6 +79,16 @@ Tips :
 }
 ```
 
+### Impersonation
+Configurations :
+1. Enable [Drill User Impersonation](https://drill.apache.org/docs/configuring-user-impersonation/)
+2. Enable [PQS Impersonation](https://phoenix.apache.org/server.html#Impersonation)
+3. PQS URL:
+  1. Provide `host` and `port` and Drill will generate the PQS URL with a doAs parameter of current session user
+  2. Provide the `jdbcURL` with a `doAs` url param and `$user` placeholder as a value, for instance:

Review comment:
       Ah... Sounds legit. Thanks.
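   For reference, a rough, self-contained sketch of the two URL modes described in the README section above, reusing the constant values shown in the `PhoenixDataSource` diff later in this review; the actual PR logic differs in details (for example, it throws a `UserException` when impersonation is enabled but the `doAs=$user` placeholder is missing):

    ```java
    class PqsUrlSketch {
      private static final String DEFAULT_URL_HEADER = "jdbc:phoenix:thin:url=http://";
      private static final String DEFAULT_SERIALIZATION = "serialization=PROTOBUF";
      private static final String IMPERSONATED_USER_VARIABLE = "$user";
      private static final String DO_AS_PARAM = "doAs"; // illustrative name for the remote-user-extractor parameter

      static String resolveUrl(String jdbcUrl, String host, int port, String sessionUser) {
        if (jdbcUrl != null) {
          // User-supplied URL: substitute the session user into the doAs=$user placeholder.
          return jdbcUrl.replace(IMPERSONATED_USER_VARIABLE, sessionUser);
        }
        // Generated URL: append a doAs parameter carrying the current session user.
        return DEFAULT_URL_HEADER + host + ":" + port
            + "?" + DO_AS_PARAM + "=" + sessionUser
            + ";" + DEFAULT_SERIALIZATION;
      }
    }
    ```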




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] luocooong commented on a change in pull request #2422: DRILL-8061: Add Impersonation Support for Phoenix

Posted by GitBox <gi...@apache.org>.
luocooong commented on a change in pull request #2422:
URL: https://github.com/apache/drill/pull/2422#discussion_r790244115



##########
File path: contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/QueryServerEnvironment.java
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.phoenix.secured;
+
+import static org.apache.hadoop.hbase.HConstants.HBASE_DIR;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.InetAddress;
+import java.security.PrivilegedAction;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.LocalHBaseCluster;
+import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.util.KerberosName;
+import org.apache.phoenix.query.ConfigurationFactory;
+import org.apache.phoenix.queryserver.QueryServerProperties;
+import org.apache.phoenix.queryserver.server.QueryServer;
+import org.apache.phoenix.util.InstanceResolver;
+import org.apache.phoenix.util.ThinClientUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Due to this bug https://bugzilla.redhat.com/show_bug.cgi?id=668830 We need to use
+ * `localhost.localdomain` as host name when running these tests on Jenkins (Centos) but for Mac OS
+ * it should be `localhost` to pass. The reason is kerberos principals in this tests are looked up
+ * from /etc/hosts and a reverse DNS lookup of 127.0.0.1 is resolved to `localhost.localdomain`
+ * rather than `localhost` on Centos. KDC sees `localhost` != `localhost.localdomain` and as the
+ * result test fails with authentication error. It's also important to note these principals are
+ * shared between HDFs and HBase in this mini HBase cluster. Some more reading
+ * https://access.redhat.com/solutions/57330
+ */
+public class QueryServerEnvironment {
+  private static final Logger LOG = LoggerFactory.getLogger(QueryServerEnvironment.class);
+
+  private final File TEMP_DIR = new File(getTempDir());
+  private final File KEYTAB_DIR = new File(TEMP_DIR, "keytabs");
+  private final List<File> USER_KEYTAB_FILES = new ArrayList<>();
+
+  private static final String LOCAL_HOST_REVERSE_DNS_LOOKUP_NAME;
+  static final String LOGIN_USER;
+
+  static {
+    try {
+       System.setProperty("sun.security.krb5.debug", "true");
+      LOCAL_HOST_REVERSE_DNS_LOOKUP_NAME = InetAddress.getByName("127.0.0.1").getCanonicalHostName();
+      String userName = System.getProperty("user.name");
+      LOGIN_USER = userName != null ? userName : "securecluster";
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static final String SPNEGO_PRINCIPAL = "HTTP/" + LOCAL_HOST_REVERSE_DNS_LOOKUP_NAME;
+  private static final String PQS_PRINCIPAL = "phoenixqs/" + LOCAL_HOST_REVERSE_DNS_LOOKUP_NAME;
+  private static final String SERVICE_PRINCIPAL = LOGIN_USER + "/" + LOCAL_HOST_REVERSE_DNS_LOOKUP_NAME;
+  private File KEYTAB;
+
+  private MiniKdc KDC;
+  private HBaseTestingUtility UTIL = new HBaseTestingUtility();
+  private LocalHBaseCluster HBASE_CLUSTER;
+  private int NUM_CREATED_USERS;
+
+  private ExecutorService PQS_EXECUTOR;
+  private QueryServer PQS;
+  private int PQS_PORT;
+  private String PQS_URL;
+
+  private boolean tls;
+
+  private static String getTempDir() {
+    StringBuilder sb = new StringBuilder(32);
+    sb.append(System.getProperty("user.dir")).append(File.separator);
+    sb.append("target").append(File.separator);
+    sb.append(QueryServerEnvironment.class.getSimpleName());
+    sb.append("-").append(UUID.randomUUID());
+    return sb.toString();
+  }
+
+  public int getPqsPort() {
+    return PQS_PORT;
+  }
+
+  public String getPqsUrl() {
+    return PQS_URL;
+  }
+
+  public boolean getTls() {
+    return tls;
+  }
+
+  public HBaseTestingUtility getUtil() {
+    return UTIL;
+  }
+
+  public String getServicePrincipal() {
+    return SERVICE_PRINCIPAL;
+  }
+
+  public File getServiceKeytab() {
+    return KEYTAB;
+  }
+
+  private static void updateDefaultRealm() throws Exception {
+    // (at least) one other phoenix test triggers the caching of this field before the KDC is up
+    // which causes principal parsing to fail.
+    Field f = KerberosName.class.getDeclaredField("defaultRealm");
+    f.setAccessible(true);
+    // Default realm for MiniKDC
+    f.set(null, "EXAMPLE.COM");
+  }
+
+  private void createUsers(int numUsers) throws Exception {
+    assertNotNull("KDC is null, was setup method called?", KDC);
+    NUM_CREATED_USERS = numUsers;
+    for (int i = 1; i <= numUsers; i++) {
+      String principal = "user" + i;
+      File keytabFile = new File(KEYTAB_DIR, principal + ".keytab");
+      KDC.createPrincipal(keytabFile, principal);
+      USER_KEYTAB_FILES.add(keytabFile);
+    }
+  }
+
+  public Map.Entry<String, File> getUser(int offset) {
+    if (!(offset > 0 && offset <= NUM_CREATED_USERS)) {
+      throw new IllegalArgumentException();
+    }
+    return new AbstractMap.SimpleImmutableEntry<String, File>("user" + offset, USER_KEYTAB_FILES.get(offset - 1));
+  }
+
+  /**
+   * Setup the security configuration for hdfs.
+   */
+  private void setHdfsSecuredConfiguration(Configuration conf) throws Exception {
+    // Set principal+keytab configuration for HDFS
+    conf.set(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY,
+      SERVICE_PRINCIPAL + "@" + KDC.getRealm());
+    conf.set(DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY, KEYTAB.getAbsolutePath());
+    conf.set(DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY,
+      SERVICE_PRINCIPAL + "@" + KDC.getRealm());
+    conf.set(DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY, KEYTAB.getAbsolutePath());
+    conf.set(DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY,
+      SPNEGO_PRINCIPAL + "@" + KDC.getRealm());
+    // Enable token access for HDFS blocks
+    conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+    // Only use HTTPS (required because we aren't using "secure" ports)
+    conf.set(DFSConfigKeys.DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());
+    // Bind on localhost for spnego to have a chance at working
+    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");
+    conf.set(DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0");
+
+    // Generate SSL certs
+    File keystoresDir = new File(UTIL.getDataTestDir("keystore").toUri().getPath());
+    keystoresDir.mkdirs();
+    String sslConfDir = TlsUtil.getClasspathDir(QueryServerEnvironment.class);
+    TlsUtil.setupSSLConfig(keystoresDir.getAbsolutePath(), sslConfDir, conf, false);
+
+    // Magic flag to tell hdfs to not fail on using ports above 1024
+    conf.setBoolean("ignore.secure.ports.for.testing", true);
+  }
+
+  private static void ensureIsEmptyDirectory(File f) throws IOException {
+    if (f.exists()) {
+      if (f.isDirectory()) {
+        FileUtils.deleteDirectory(f);
+      } else {
+        assertTrue("Failed to delete keytab directory", f.delete());
+      }
+    }
+    assertTrue("Failed to create keytab directory", f.mkdirs());
+  }
+
+  /**
+   * Setup and start kerberosed, hbase
+   * @throws Exception
+   */
+  public QueryServerEnvironment(final Configuration confIn, int numberOfUsers, boolean tls)

Review comment:
       I have an idea: could we add a comment at the head of every copied class? For example:
   ```java
   public class QueryServerEnvironment { // This is a copy of {@link org.apache.phoenix.end2end.XXX}
   ```
    This would answer the same questions for new developers, thank you.
   ```
   TlsUtil.java
   QueryServerEnvironment.java
   HttpParamImpersonationQueryServerIT.java
   ...
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] luocooong commented on a change in pull request #2422: DRILL-8061: Add Impersonation Support for Phoenix

Posted by GitBox <gi...@apache.org>.
luocooong commented on a change in pull request #2422:
URL: https://github.com/apache/drill/pull/2422#discussion_r790245946



##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
##########
@@ -98,12 +98,7 @@ public RESULT call() throws Exception {
         currentThread.setName(proxyUgi.getUserName() + ":task-delegate-thread");
         final RESULT result;
         try {
-          result = proxyUgi.doAs(new PrivilegedExceptionAction<RESULT>() {
-            @Override
-            public RESULT run() throws Exception {
-              return callable.call();

Review comment:
       Perhaps we should consider the users of Eclipse and IDEA.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] luocooong commented on pull request #2422: DRILL-8061: Add Impersonation Support for Phoenix

Posted by GitBox <gi...@apache.org>.
luocooong commented on pull request #2422:
URL: https://github.com/apache/drill/pull/2422#issuecomment-1022718968


   @vdiravka Thanks for the revision; the results are better now.
   There are a few small suggestions left to address (marked "this one"), and we can squash all commits once the new revision is complete.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] jnturton commented on a change in pull request #2422: DRILL-8061: Add Impersonation Support for Phoenix

Posted by GitBox <gi...@apache.org>.
jnturton commented on a change in pull request #2422:
URL: https://github.com/apache/drill/pull/2422#discussion_r787651312



##########
File path: contrib/storage-phoenix/pom.xml
##########
@@ -33,6 +33,7 @@
     <phoenix.version>5.1.2</phoenix.version>
     <!-- Keep the 2.4.2 to reduce dependency conflict -->
     <hbase.minicluster.version>2.4.2</hbase.minicluster.version>
+    <jetty.version>9.4.31.v20200723</jetty.version>

Review comment:
       @vdiravka I cannot find any pom file in the Drill tree that actually _uses_ the property jetty.version.  Or does it get used in the pom files of some of Drill's dependencies?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] jnturton commented on a change in pull request #2422: DRILL-8061: Add Impersonation Support for Phoenix

Posted by GitBox <gi...@apache.org>.
jnturton commented on a change in pull request #2422:
URL: https://github.com/apache/drill/pull/2422#discussion_r787670219



##########
File path: contrib/storage-phoenix/pom.xml
##########
@@ -33,6 +33,7 @@
     <phoenix.version>5.1.2</phoenix.version>
     <!-- Keep the 2.4.2 to reduce dependency conflict -->
     <hbase.minicluster.version>2.4.2</hbase.minicluster.version>
+    <jetty.version>9.4.31.v20200723</jetty.version>

Review comment:
       Nevermind, my search was just broken!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] lgtm-com[bot] commented on pull request #2422: DRILL-8061: Add Impersonation Support for Phoenix

Posted by GitBox <gi...@apache.org>.
lgtm-com[bot] commented on pull request #2422:
URL: https://github.com/apache/drill/pull/2422#issuecomment-1016404261


   This pull request **introduces 1 alert** when merging 2a2e37ca73f127731cf849bcd8bce006f5c8c7a0 into ee874af953ed99614c8e7291ea9164b4ddc5ff24 - [view on LGTM.com](https://lgtm.com/projects/g/apache/drill/rev/pr-9217048ea766a8f5841fc4cdae019a262ea5c758)
   
   **new alerts:**
   
   * 1 for Potential database resource leak


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] luocooong commented on a change in pull request #2422: DRILL-8061: Add Impersonation Support for Phoenix

Posted by GitBox <gi...@apache.org>.
luocooong commented on a change in pull request #2422:
URL: https://github.com/apache/drill/pull/2422#discussion_r790242245



##########
File path: contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/HttpParamImpersonationQueryServerIT.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.phoenix.secured;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.security.access.AccessControlClient;
+import org.apache.hadoop.hbase.security.access.AccessController;
+import org.apache.hadoop.hbase.security.access.Permission.Action;
+import org.apache.hadoop.hbase.security.token.TokenProvider;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.queryserver.QueryServerOptions;
+import org.apache.phoenix.queryserver.QueryServerProperties;
+import org.apache.phoenix.queryserver.client.Driver;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.drill.exec.store.phoenix.secured.QueryServerEnvironment.LOGIN_USER;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class HttpParamImpersonationQueryServerIT {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HttpParamImpersonationQueryServerIT.class);
+    public static QueryServerEnvironment environment;
+
+    private static final List<TableName> SYSTEM_TABLE_NAMES = Arrays.asList(
+        PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME,
+        PhoenixDatabaseMetaData.SYSTEM_MUTEX_HBASE_TABLE_NAME,
+        PhoenixDatabaseMetaData.SYSTEM_FUNCTION_HBASE_TABLE_NAME,
+        PhoenixDatabaseMetaData.SYSTEM_SCHEMA_HBASE_TABLE_NAME,
+        PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_HBASE_TABLE_NAME,
+        PhoenixDatabaseMetaData.SYSTEM_STATS_HBASE_TABLE_NAME);
+
+    public static synchronized void startQueryServerEnvironment() throws Exception {
+        //Clean up previous environment if any (Junit 4.13 @BeforeParam / @AfterParam would be an alternative)
+        if(environment != null) {
+            stopEnvironment();
+        }
+
+        final Configuration conf = new Configuration();
+        conf.setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, AccessController.class.getName());
+        conf.setStrings(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, AccessController.class.getName());
+        conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, AccessController.class.getName(), TokenProvider.class.getName());
+
+        // Set the proxyuser settings,
+        // so that the user who is running the Drillbits/MiniDfs can impersonate user1 and user2 (not user3)
+        conf.set(String.format("hadoop.proxyuser.%s.hosts", LOGIN_USER), "*");
+        conf.set(String.format("hadoop.proxyuser.%s.users", LOGIN_USER), "user1,user2");
+        conf.setBoolean(QueryServerProperties.QUERY_SERVER_WITH_REMOTEUSEREXTRACTOR_ATTRIB, true);
+        environment = new QueryServerEnvironment(conf, 3, false);
+    }
+
+    public static synchronized void stopEnvironment() throws Exception {
+        environment.stop();
+        environment = null;
+    }
+
+    static public String getUrlTemplate() {
+        String url = Driver.CONNECT_STRING_PREFIX + "url=%s://localhost:" + environment.getPqsPort() + "?"
+                + QueryServerOptions.DEFAULT_QUERY_SERVER_REMOTEUSEREXTRACTOR_PARAM + "=%s;authentication=SPNEGO;serialization=PROTOBUF%s";
+        if (environment.getTls()) {
+            return String.format(url, "https", "%s", ";truststore=" + TlsUtil.getTrustStoreFile().getAbsolutePath()
+                + ";truststore_password="+TlsUtil.getTrustStorePassword());

Review comment:
       ```
   ";truststore_password="+TlsUtil.getTrustStorePassword()
   ```
    It's best to add whitespace before and after `+`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] luocooong commented on a change in pull request #2422: DRILL-8061: Add Impersonation Support for Phoenix

Posted by GitBox <gi...@apache.org>.
luocooong commented on a change in pull request #2422:
URL: https://github.com/apache/drill/pull/2422#discussion_r780727669



##########
File path: contrib/storage-phoenix/README.md
##########
@@ -150,4 +155,13 @@ apache drill (phoenix123.v1)> select n_name, n_regionkey from nation limit 3 off
 | JAPAN  | 2           |
 +--------+-------------+
 3 rows selected (0.77 seconds)
-```
\ No newline at end of file
+```
+### Impersonation

Review comment:
       Is it possible to put this section after the `Configuration` section?

##########
File path: contrib/storage-phoenix/pom.xml
##########
@@ -33,6 +33,7 @@
     <phoenix.version>5.1.2</phoenix.version>
     <!-- Keep the 2.4.2 to reduce dependency conflict -->
     <hbase.minicluster.version>2.4.2</hbase.minicluster.version>
+    <jetty.version>9.4.31.v20200723</jetty.version>

Review comment:
       If we do not actually use Jetty, this can be removed.

##########
File path: contrib/storage-phoenix/pom.xml
##########
@@ -187,12 +194,79 @@
       <version>${hbase.minicluster.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-asyncfs</artifactId>
+      <type>test-jar</type>
+      <version>${hbase.minicluster.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-hdfs-client</artifactId>
       <version>${hadoop.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minikdc</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-testing-util</artifactId>
+      <version>${hbase.minicluster.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-protocol-shaded</artifactId>

Review comment:
       Duplicate declaration of dependencies, at line 266.

##########
File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixDataSource.java
##########
@@ -21,13 +21,14 @@
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
-import java.sql.SQLFeatureNotSupportedException;
 import java.util.Map;
 import java.util.Properties;
 import java.util.logging.Logger;
 
 import javax.sql.DataSource;
 
+import lombok.extern.slf4j.Slf4j;

Review comment:
       I have no objection to using Lombok.
   But usability that depends on an IDE plugin (not built into the JDK) is difficult for me to accept personally.
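   For comparison, the plain-SLF4J equivalent of Lombok's `@Slf4j` annotation: one extra line of boilerplate, but no annotation processing or IDE plugin required (the class name here is illustrative):

    ```java
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class PhoenixDataSourceLoggerExample {
      // What @Slf4j generates for you, written out by hand.
      private static final Logger log = LoggerFactory.getLogger(PhoenixDataSourceLoggerExample.class);
    }
    ```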

##########
File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixBatchReader.java
##########
@@ -71,58 +74,64 @@ public PhoenixBatchReader(PhoenixSubScan subScan) {
 
   @Override
   public boolean open(SchemaNegotiator negotiator) {
-    try {
+    UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();
+    return ugi.doAs((PrivilegedAction<Boolean>) () -> {
       errorContext = negotiator.parentErrorContext();
-      conn = subScan.getPlugin().getDataSource().getConnection();
-      pstmt = conn.prepareStatement(subScan.getSql());
-      results = pstmt.executeQuery();
-      meta = pstmt.getMetaData();
-    } catch (SQLException e) {
-      throw UserException
-              .dataReadError(e)
-              .message("Failed to execute the phoenix sql query. " + e.getMessage())
-              .addContext(errorContext)
-              .build(logger);
-    }
-    try {
-      negotiator.tableSchema(defineMetadata(), true);
-      reader = new PhoenixReader(negotiator.build(), columns, results);
-      bindColumns(reader.getStorage());
-    } catch (SQLException e) {
-      throw UserException
-              .dataReadError(e)
-              .message("Failed to get type of columns from metadata. " + e.getMessage())
-              .addContext(errorContext)
-              .build(logger);
-    }
-    watch = Stopwatch.createStarted();
-    return true;
+      try {
+        pstmt =
+          subScan.getPlugin().getDataSource(negotiator.userName()).getConnection().prepareStatement(subScan.getSql());
+        results = pstmt.executeQuery();
+        meta = pstmt.getMetaData();
+      } catch (SQLException e) {
+        throw UserException
+          .dataReadError(e)
+          .message("Failed to execute the phoenix sql query. " + e.getMessage())
+          .addContext(negotiator.parentErrorContext())
+          .build(logger);
+      }
+      try {
+        negotiator.tableSchema(defineMetadata(), true);
+        reader = new PhoenixReader(negotiator.build(), columns, results);
+        bindColumns(reader.getStorage());
+      } catch (SQLException e) {
+        throw UserException
+          .dataReadError(e)
+          .message("Failed to get type of columns from metadata. " + e.getMessage())
+          .addContext(errorContext)
+          .build(logger);
+      }
+      watch = Stopwatch.createStarted();
+      return true;
+    });
   }
 
   @Override
   public boolean next() {
+    UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();

Review comment:
       Is it possible to define the `ugi` variable at class scope? We already use this variable in `open()`.

##########
File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixDataSource.java
##########
@@ -39,44 +40,49 @@
  * is not always left in a healthy state by the previous user. It is better to
  * create new Phoenix Connections to ensure that you avoid any potential issues.
  */
+@Slf4j
 public class PhoenixDataSource implements DataSource {
 
   private static final String DEFAULT_URL_HEADER = "jdbc:phoenix:thin:url=http://";
   private static final String DEFAULT_SERIALIZATION = "serialization=PROTOBUF";
+  private static final String IMPERSONATED_USER_VARIABLE = "$user";
+  private static final String DEFAULT_QUERY_SERVER_REMOTEUSEREXTRACTOR_PARAM = "doAs";
 
-  private String url;
+  private final String url;
   private Map<String, Object> connectionProperties;
-  private boolean isFatClient; // Is a fat client
-
-  public PhoenixDataSource(String url) {
-    Preconditions.checkNotNull(url);
-    this.url = url;
-  }
-
-  public PhoenixDataSource(String host, int port) {
-    Preconditions.checkNotNull(host);
-    Preconditions.checkArgument(port > 0, "Please set the correct port.");
-    this.url = new StringBuilder()
-        .append(DEFAULT_URL_HEADER)
-        .append(host)
-        .append(":")
-        .append(port)
-        .append(";")
-        .append(DEFAULT_SERIALIZATION)
-        .toString();
-  }
-
-  public PhoenixDataSource(String url, Map<String, Object> connectionProperties) {
-    this(url);
+  private boolean isFatClient;
+  private String user;

Review comment:
       We can also make the `user` variable final (and put it after the `url` variable).

##########
File path: contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/PhoenixBaseTest.java
##########
@@ -38,51 +38,63 @@
 
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.ClusterFixtureBuilder;
 import org.apache.drill.test.ClusterTest;
 import org.apache.hadoop.fs.Path;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.slf4j.LoggerFactory;
 
 import com.univocity.parsers.csv.CsvParser;
 import com.univocity.parsers.csv.CsvParserSettings;
 
-public class PhoenixBaseTest extends ClusterTest {
+public abstract class PhoenixBaseTest extends ClusterTest {
 
   private static final org.slf4j.Logger logger = LoggerFactory.getLogger(PhoenixBaseTest.class);
 
-  private static AtomicInteger initCount = new AtomicInteger(0);
-  protected static String U_U_I_D = UUID.randomUUID().toString();
+  public final static String U_U_I_D = UUID.randomUUID().toString();
+  private final static AtomicInteger initCount = new AtomicInteger(0);
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
+    PhoenixTestSuite.initPhoenixQueryServer();
     if (PhoenixTestSuite.isRunningSuite()) {
       QueryServerBasicsIT.testCatalogs();
     }
-    bootMiniCluster();
+    bootDrillMiniCluster();
+    dirTestWatcher.copyResourceToRoot(Paths.get(""));

Review comment:
       We can move the copy step into `bootDrillMiniCluster()`, then rename the function to `startDrillCluster()` for clarity.

##########
File path: contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/PhoenixBaseTest.java
##########
@@ -38,51 +38,63 @@
 
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.ClusterFixtureBuilder;
 import org.apache.drill.test.ClusterTest;
 import org.apache.hadoop.fs.Path;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.slf4j.LoggerFactory;
 
 import com.univocity.parsers.csv.CsvParser;
 import com.univocity.parsers.csv.CsvParserSettings;
 
-public class PhoenixBaseTest extends ClusterTest {
+public abstract class PhoenixBaseTest extends ClusterTest {
 
   private static final org.slf4j.Logger logger = LoggerFactory.getLogger(PhoenixBaseTest.class);
 
-  private static AtomicInteger initCount = new AtomicInteger(0);
-  protected static String U_U_I_D = UUID.randomUUID().toString();
+  public final static String U_U_I_D = UUID.randomUUID().toString();
+  private final static AtomicInteger initCount = new AtomicInteger(0);
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
+    PhoenixTestSuite.initPhoenixQueryServer();
     if (PhoenixTestSuite.isRunningSuite()) {
       QueryServerBasicsIT.testCatalogs();
     }
-    bootMiniCluster();
+    bootDrillMiniCluster();
+    dirTestWatcher.copyResourceToRoot(Paths.get(""));
     if (initCount.incrementAndGet() == 1) {
-      createSchema();
-      createTables();
-      createSampleData();
+      createSchema(QueryServerBasicsIT.CONN_STRING);
+      createTables(QueryServerBasicsIT.CONN_STRING);
+      createSampleData(QueryServerBasicsIT.CONN_STRING);
     }
   }
 
-  public static void bootMiniCluster() throws Exception {
-    ClusterFixtureBuilder builder = new ClusterFixtureBuilder(dirTestWatcher);
+  @AfterClass
+  public static void tearDownCluster() throws Exception {
+    if (!PhoenixTestSuite.isRunningSuite()) {
+      PhoenixTestSuite.tearDownCluster();
+    }
+  }
+
+  public static void bootDrillMiniCluster() throws Exception {
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
     startCluster(builder);
-    dirTestWatcher.copyResourceToRoot(Paths.get(""));
     Map<String, Object> props = Maps.newHashMap();
     props.put("phoenix.query.timeoutMs", 90000);
     props.put("phoenix.query.keepAliveMs", "30000");
     StoragePluginRegistry registry = cluster.drillbit().getContext().getStorage();
-    PhoenixStoragePluginConfig config = new PhoenixStoragePluginConfig(null, 0, null, null, QueryServerBasicsIT.CONN_STRING, null, props);
+    PhoenixStoragePluginConfig config = new PhoenixStoragePluginConfig(null, 0, null, null,
+      QueryServerBasicsIT.CONN_STRING, null, props);
     config.setEnabled(true);
     registry.put(PhoenixStoragePluginConfig.NAME + "123", config);
   }
 
-  public static void createSchema() throws Exception {
-    try (final Connection connection = DriverManager.getConnection(QueryServerBasicsIT.CONN_STRING)) {
+  public static void createSchema(String connString) throws Exception {
+    try (final Connection connection = DriverManager.getConnection(connString)) {
+      logger.debug("Phoenix connection established for createSchema with url {}", connString);

Review comment:
       ```suggestion
         logger.debug("Phoenix connection established with the specified url : {}", connString);
   ```

##########
File path: contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/HttpParamImpersonationQueryServerIT.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.phoenix.secured;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.security.access.AccessControlClient;
+import org.apache.hadoop.hbase.security.access.AccessController;
+import org.apache.hadoop.hbase.security.access.Permission.Action;
+import org.apache.hadoop.hbase.security.token.TokenProvider;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.queryserver.QueryServerOptions;
+import org.apache.phoenix.queryserver.QueryServerProperties;
+import org.apache.phoenix.queryserver.client.Driver;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.drill.exec.store.phoenix.secured.QueryServerEnvironment.LOGIN_USER;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class HttpParamImpersonationQueryServerIT {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HttpParamImpersonationQueryServerIT.class);
+    public static QueryServerEnvironment environment;
+
+    private static final List<TableName> SYSTEM_TABLE_NAMES = Arrays.asList(
+        PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME,
+        PhoenixDatabaseMetaData.SYSTEM_MUTEX_HBASE_TABLE_NAME,
+        PhoenixDatabaseMetaData.SYSTEM_FUNCTION_HBASE_TABLE_NAME,
+        PhoenixDatabaseMetaData.SYSTEM_SCHEMA_HBASE_TABLE_NAME,
+        PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_HBASE_TABLE_NAME,
+        PhoenixDatabaseMetaData.SYSTEM_STATS_HBASE_TABLE_NAME);
+
+    public static synchronized void startQueryServerEnvironment() throws Exception {
+        //Clean up previous environment if any (Junit 4.13 @BeforeParam / @AfterParam would be an alternative)
+        if(environment != null) {
+            stopEnvironment();
+        }
+
+        final Configuration conf = new Configuration();
+        conf.setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, AccessController.class.getName());
+        conf.setStrings(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, AccessController.class.getName());
+        conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, AccessController.class.getName(), TokenProvider.class.getName());
+
+        // Set the proxyuser settings,
+        // so that the user who is running the Drillbits/MiniDfs can impersonate user1 and user2 (not user3)
+        conf.set(String.format("hadoop.proxyuser.%s.hosts", LOGIN_USER), "*");
+        conf.set(String.format("hadoop.proxyuser.%s.users", LOGIN_USER), "user1,user2");
+        conf.setBoolean(QueryServerProperties.QUERY_SERVER_WITH_REMOTEUSEREXTRACTOR_ATTRIB, true);
+        environment = new QueryServerEnvironment(conf, 3, false);
+    }
+
+    public static synchronized void stopEnvironment() throws Exception {
+        environment.stop();
+        environment = null;
+    }
+
+    static public String getUrlTemplate() {
+        String url = Driver.CONNECT_STRING_PREFIX + "url=%s://localhost:" + environment.getPqsPort() + "?"
+                + QueryServerOptions.DEFAULT_QUERY_SERVER_REMOTEUSEREXTRACTOR_PARAM + "=%s;authentication=SPNEGO;serialization=PROTOBUF%s";
+        if (environment.getTls()) {
+            return String.format(url, "https", "%s", ";truststore=" + TlsUtil.getTrustStoreFile().getAbsolutePath()
+                + ";truststore_password="+TlsUtil.getTrustStorePassword());

Review comment:
       We can add whitespace before and after `+`.
   Also, the code indentation in this class (we usually use two spaces) looks inconsistent and needs to be updated.

##########
File path: contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/HttpParamImpersonationQueryServerIT.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.phoenix.secured;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.security.access.AccessControlClient;
+import org.apache.hadoop.hbase.security.access.AccessController;
+import org.apache.hadoop.hbase.security.access.Permission.Action;
+import org.apache.hadoop.hbase.security.token.TokenProvider;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.queryserver.QueryServerOptions;
+import org.apache.phoenix.queryserver.QueryServerProperties;
+import org.apache.phoenix.queryserver.client.Driver;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.drill.exec.store.phoenix.secured.QueryServerEnvironment.LOGIN_USER;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class HttpParamImpersonationQueryServerIT {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HttpParamImpersonationQueryServerIT.class);

Review comment:
       The value of the field HttpParamImpersonationQueryServerIT.LOG is not used.

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
##########
@@ -98,12 +98,7 @@ public RESULT call() throws Exception {
         currentThread.setName(proxyUgi.getUserName() + ":task-delegate-thread");
         final RESULT result;
         try {
-          result = proxyUgi.doAs(new PrivilegedExceptionAction<RESULT>() {
-            @Override
-            public RESULT run() throws Exception {
-              return callable.call();

Review comment:
       +1.
   But this style is easier to understand, and it is easier to set breakpoints in it.
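   For reference, a self-contained sketch contrasting the two equivalent styles being discussed; the generic helper and `Callable` parameter are placeholders, not the PR's actual types:

    ```java
    import java.security.PrivilegedExceptionAction;
    import java.util.concurrent.Callable;

    import org.apache.hadoop.security.UserGroupInformation;

    class DoAsStyles {
      // Lambda/method-reference form, as in the diff above.
      static <T> T lambdaStyle(UserGroupInformation ugi, Callable<T> callable) throws Exception {
        return ugi.doAs((PrivilegedExceptionAction<T>) callable::call);
      }

      // Anonymous-class form: more verbose, but it is easy to set a
      // breakpoint on the run() body in Eclipse or IDEA.
      static <T> T anonymousClassStyle(UserGroupInformation ugi, Callable<T> callable) throws Exception {
        return ugi.doAs(new PrivilegedExceptionAction<T>() {
          @Override
          public T run() throws Exception {
            return callable.call();
          }
        });
      }
    }
    ```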

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordCollector.java
##########
@@ -181,22 +184,25 @@ public BasicRecordCollector(FilterEvaluator filterEvaluator, OptionManager optio
     @Override
     public List<Records.Column> columns(String schemaPath, SchemaPlus schema) {
       AbstractSchema drillSchema = schema.unwrap(AbstractSchema.class);
-      List<Records.Column> records = new ArrayList<>();
-      for (Pair<String, ? extends Table> tableNameToTable : drillSchema.getTablesByNames(schema.getTableNames())) {
-        String tableName = tableNameToTable.getKey();
-        Table table = tableNameToTable.getValue();
-        Schema.TableType tableType = table.getJdbcTableType();
-
-        if (filterEvaluator.shouldVisitTable(schemaPath, tableName, tableType)) {
-          RelDataType tableRow = table.getRowType(new JavaTypeFactoryImpl(DRILL_REL_DATATYPE_SYSTEM));
-          for (RelDataTypeField field : tableRow.getFieldList()) {
-            if (filterEvaluator.shouldVisitColumn(schemaPath, tableName, field.getName())) {
-              records.add(new Records.Column(IS_CATALOG_NAME, schemaPath, tableName, field));
+      UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();
+      return ugi.doAs((PrivilegedAction<List<Records.Column>>) () -> {

Review comment:
       Does this mean that this handler always runs under `doAs`, even if the impersonation feature is not enabled?

##########
File path: contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/HttpParamImpersonationQueryServerIT.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.phoenix.secured;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.security.access.AccessControlClient;
+import org.apache.hadoop.hbase.security.access.AccessController;
+import org.apache.hadoop.hbase.security.access.Permission.Action;
+import org.apache.hadoop.hbase.security.token.TokenProvider;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.queryserver.QueryServerOptions;
+import org.apache.phoenix.queryserver.QueryServerProperties;
+import org.apache.phoenix.queryserver.client.Driver;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.drill.exec.store.phoenix.secured.QueryServerEnvironment.LOGIN_USER;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class HttpParamImpersonationQueryServerIT {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HttpParamImpersonationQueryServerIT.class);
+    public static QueryServerEnvironment environment;
+
+    private static final List<TableName> SYSTEM_TABLE_NAMES = Arrays.asList(
+        PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME,
+        PhoenixDatabaseMetaData.SYSTEM_MUTEX_HBASE_TABLE_NAME,
+        PhoenixDatabaseMetaData.SYSTEM_FUNCTION_HBASE_TABLE_NAME,
+        PhoenixDatabaseMetaData.SYSTEM_SCHEMA_HBASE_TABLE_NAME,
+        PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_HBASE_TABLE_NAME,
+        PhoenixDatabaseMetaData.SYSTEM_STATS_HBASE_TABLE_NAME);
+
+    public static synchronized void startQueryServerEnvironment() throws Exception {
+        //Clean up previous environment if any (Junit 4.13 @BeforeParam / @AfterParam would be an alternative)

Review comment:
       Missing a space after `//` at the start of the comment.

##########
File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSchemaFactory.java
##########
@@ -101,21 +126,26 @@ public Schema getSubSchema(String name) {
 
     @Override
     public Table getTable(String name) {
-      Table table = jdbcSchema.getTable(StringUtils.upperCase(name));
-      return table;
+      final UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();
+      return ugi.doAs((PrivilegedAction<Table>) () -> jdbcSchema.getTable(StringUtils.upperCase(name)));
     }
 
     @Override
     public Set<String> getTableNames() {
-      Set<String> tables = jdbcSchema.getTableNames();
-      return tables;
+      final UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();
+      return ugi.doAs((PrivilegedAction<Set<String>>) jdbcSchema::getTableNames);
     }
 
     @Override
     public String getTypeName() {
       return PhoenixStoragePluginConfig.NAME;
     }
 
+    @Override
+    public String getUser(String impersonated, String notImpersonated) {

Review comment:
       We can put this function after `areTableNamesCaseSensitive()`.

##########
File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixBatchReader.java
##########
@@ -71,58 +74,64 @@ public PhoenixBatchReader(PhoenixSubScan subScan) {
 
   @Override
   public boolean open(SchemaNegotiator negotiator) {
-    try {
+    UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();
+    return ugi.doAs((PrivilegedAction<Boolean>) () -> {
       errorContext = negotiator.parentErrorContext();
-      conn = subScan.getPlugin().getDataSource().getConnection();
-      pstmt = conn.prepareStatement(subScan.getSql());
-      results = pstmt.executeQuery();
-      meta = pstmt.getMetaData();
-    } catch (SQLException e) {
-      throw UserException
-              .dataReadError(e)
-              .message("Failed to execute the phoenix sql query. " + e.getMessage())
-              .addContext(errorContext)
-              .build(logger);
-    }
-    try {
-      negotiator.tableSchema(defineMetadata(), true);
-      reader = new PhoenixReader(negotiator.build(), columns, results);
-      bindColumns(reader.getStorage());
-    } catch (SQLException e) {
-      throw UserException
-              .dataReadError(e)
-              .message("Failed to get type of columns from metadata. " + e.getMessage())
-              .addContext(errorContext)
-              .build(logger);
-    }
-    watch = Stopwatch.createStarted();
-    return true;
+      try {
+        pstmt =
+          subScan.getPlugin().getDataSource(negotiator.userName()).getConnection().prepareStatement(subScan.getSql());

Review comment:
       ```suggestion
           DataSource ds = subScan.getPlugin().getDataSource(negotiator.userName());
           pstmt = ds.getConnection().prepareStatement(subScan.getSql());
   ```

##########
File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixBatchReader.java
##########
@@ -71,58 +74,64 @@ public PhoenixBatchReader(PhoenixSubScan subScan) {
 
   @Override
   public boolean open(SchemaNegotiator negotiator) {
-    try {
+    UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();
+    return ugi.doAs((PrivilegedAction<Boolean>) () -> {
       errorContext = negotiator.parentErrorContext();
-      conn = subScan.getPlugin().getDataSource().getConnection();
-      pstmt = conn.prepareStatement(subScan.getSql());
-      results = pstmt.executeQuery();
-      meta = pstmt.getMetaData();
-    } catch (SQLException e) {
-      throw UserException
-              .dataReadError(e)
-              .message("Failed to execute the phoenix sql query. " + e.getMessage())
-              .addContext(errorContext)
-              .build(logger);
-    }
-    try {
-      negotiator.tableSchema(defineMetadata(), true);
-      reader = new PhoenixReader(negotiator.build(), columns, results);
-      bindColumns(reader.getStorage());
-    } catch (SQLException e) {
-      throw UserException
-              .dataReadError(e)
-              .message("Failed to get type of columns from metadata. " + e.getMessage())
-              .addContext(errorContext)
-              .build(logger);
-    }
-    watch = Stopwatch.createStarted();
-    return true;
+      try {
+        pstmt =
+          subScan.getPlugin().getDataSource(negotiator.userName()).getConnection().prepareStatement(subScan.getSql());
+        results = pstmt.executeQuery();
+        meta = pstmt.getMetaData();
+      } catch (SQLException e) {
+        throw UserException
+          .dataReadError(e)
+          .message("Failed to execute the phoenix sql query. " + e.getMessage())
+          .addContext(negotiator.parentErrorContext())
+          .build(logger);
+      }
+      try {
+        negotiator.tableSchema(defineMetadata(), true);
+        reader = new PhoenixReader(negotiator.build(), columns, results);
+        bindColumns(reader.getStorage());
+      } catch (SQLException e) {
+        throw UserException
+          .dataReadError(e)
+          .message("Failed to get type of columns from metadata. " + e.getMessage())
+          .addContext(errorContext)
+          .build(logger);
+      }
+      watch = Stopwatch.createStarted();
+      return true;
+    });
   }
 
   @Override
   public boolean next() {
+    UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();
     try {
-      while(!reader.getStorage().isFull()) {
-        if (!reader.processRow()) { // return true if one row is processed.
-          watch.stop();
-          logger.debug("Phoenix fetch total record numbers : {}", reader.getRowCount());
-          return false; // the EOF is reached.
+      return ugi.doAs((PrivilegedExceptionAction<Boolean>) () -> {

Review comment:
       Do we really need `doAs()` in `ResultSet#next()`? For a large table, I'm concerned about the performance.
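   Note that the wrap is per batch (the `while` loop fills a whole batch), not per row, so the cost may already be modest. Still, if it shows up in profiles, one option is to resolve the UGI once in `open()` and reuse it; a rough sketch, assuming the process user cannot change mid-scan:
   ```java
   private UserGroupInformation ugi; // resolved once in open(), reused per batch
   
   @Override
   public boolean open(SchemaNegotiator negotiator) {
     ugi = ImpersonationUtil.getProcessUserUGI();
     return ugi.doAs((PrivilegedAction<Boolean>) () -> {
       // ... existing open logic ...
       return true;
     });
   }
   
   // next() then reuses the cached field instead of calling
   // ImpersonationUtil.getProcessUserUGI() on every batch; doAs() itself only
   // swaps the Subject on the current thread, it does not re-login.
   ```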

##########
File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSchemaFactory.java
##########
@@ -101,21 +126,26 @@ public Schema getSubSchema(String name) {
 
     @Override
     public Table getTable(String name) {
-      Table table = jdbcSchema.getTable(StringUtils.upperCase(name));
-      return table;
+      final UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();

Review comment:
       Is it possible to define the `ugi` variable at class scope?
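   E.g., a sketch of what that could look like, reusing the calls from the diff:
   ```java
   class PhoenixSchema extends AbstractSchema {
     // resolved once per schema instance instead of once per method call
     private final UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();
   
     @Override
     public Table getTable(String name) {
       return ugi.doAs((PrivilegedAction<Table>) () -> jdbcSchema.getTable(StringUtils.upperCase(name)));
     }
   
     @Override
     public Set<String> getTableNames() {
       return ugi.doAs((PrivilegedAction<Set<String>>) jdbcSchema::getTableNames);
     }
   }
   ```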

##########
File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSchemaFactory.java
##########
@@ -18,75 +18,100 @@
 package org.apache.drill.exec.store.phoenix;
 
 import java.io.IOException;
-import java.sql.Connection;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import javax.sql.DataSource;
 
 import org.apache.calcite.adapter.jdbc.JdbcSchema;
 import org.apache.calcite.schema.Schema;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Table;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.AbstractSchemaFactory;
 import org.apache.drill.exec.store.SchemaConfig;
-import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import static org.apache.drill.exec.util.ImpersonationUtil.getProcessUserName;
 
 public class PhoenixSchemaFactory extends AbstractSchemaFactory {
 
   private final PhoenixStoragePlugin plugin;
   private final Map<String, PhoenixSchema> schemaMap;
   private PhoenixSchema rootSchema;
+  private final boolean isDrillImpersonationEnabled;
 
   public PhoenixSchemaFactory(PhoenixStoragePlugin plugin) {
     super(plugin.getName());
     this.plugin = plugin;
-    this.schemaMap = Maps.newHashMap();
+    this.schemaMap = new HashMap<>();
+    isDrillImpersonationEnabled = plugin.getContext().getConfig().getBoolean(ExecConstants.IMPERSONATION_ENABLED);
   }
 
   @Override
   public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
-    rootSchema = new PhoenixSchema(plugin, Collections.emptyList(), plugin.getName());
-    locateSchemas();
+    try {
+      rootSchema = new PhoenixSchema(schemaConfig, plugin, Collections.emptyList(), plugin.getName());
+      String schemaUser = schemaConfig.getUserName();
+      locateSchemas(schemaConfig, rootSchema.getUser(schemaUser, getProcessUserName()));
+    } catch (SQLException e) {
+      throw new IOException(e);

Review comment:
       I'd recommend something like this:
   ```java
   public void registerSchemas(..) throws IOException {
     try {
       // ...
       locateSchemas(..);
     } catch (SQLException | InterruptedException e) {
       throw new IOException(e); // catch both SQLException and InterruptedException here
     }
   }
   
   private void locateSchemas(..) throws IOException, InterruptedException { // throw the exceptions directly from here
     // ...
   }
   ```

##########
File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixDataSource.java
##########
@@ -39,44 +40,49 @@
  * is not always left in a healthy state by the previous user. It is better to
  * create new Phoenix Connections to ensure that you avoid any potential issues.
  */
+@Slf4j
 public class PhoenixDataSource implements DataSource {
 
   private static final String DEFAULT_URL_HEADER = "jdbc:phoenix:thin:url=http://";
   private static final String DEFAULT_SERIALIZATION = "serialization=PROTOBUF";
+  private static final String IMPERSONATED_USER_VARIABLE = "$user";
+  private static final String DEFAULT_QUERY_SERVER_REMOTEUSEREXTRACTOR_PARAM = "doAs";
 
-  private String url;
+  private final String url;
   private Map<String, Object> connectionProperties;
-  private boolean isFatClient; // Is a fat client
-
-  public PhoenixDataSource(String url) {
-    Preconditions.checkNotNull(url);
-    this.url = url;
-  }
-
-  public PhoenixDataSource(String host, int port) {
-    Preconditions.checkNotNull(host);
-    Preconditions.checkArgument(port > 0, "Please set the correct port.");
-    this.url = new StringBuilder()
-        .append(DEFAULT_URL_HEADER)
-        .append(host)
-        .append(":")
-        .append(port)
-        .append(";")
-        .append(DEFAULT_SERIALIZATION)
-        .toString();
-  }
-
-  public PhoenixDataSource(String url, Map<String, Object> connectionProperties) {
-    this(url);
+  private boolean isFatClient;
+  private String user;
+
+  public PhoenixDataSource(String url,
+                           String userName,
+                           Map<String, Object> connectionProperties,
+                           boolean impersonationEnabled) {
+    Preconditions.checkNotNull(url, userName);
     Preconditions.checkNotNull(connectionProperties);
     connectionProperties.forEach((k, v)
         -> Preconditions.checkArgument(v != null, String.format("does not accept null values : %s", k)));
+    this.url = impersonationEnabled ? doAsUserUrl(url, userName) : url;
+    this.user = userName;
     this.connectionProperties = connectionProperties;
   }
 
-  public PhoenixDataSource(String host, int port, Map<String, Object> connectionProperties) {
-    this(host, port);
-    Preconditions.checkNotNull(connectionProperties);
+  public PhoenixDataSource(String host,
+                           int port,
+                           String userName,
+                           Map<String, Object> connectionProperties,
+                           boolean impersonationEnabled) {
+    Preconditions.checkNotNull(host, userName);
+    Preconditions.checkArgument(port > 0, "Please set the correct port.");
+    this.url = new StringBuilder()
+      .append(DEFAULT_URL_HEADER)
+      .append(host)
+      .append(":")
+      .append(port)
+      .append(impersonationEnabled ? "?doAs=" + userName : "")
+      .append(";")
+      .append(DEFAULT_SERIALIZATION)
+      .toString();
+    this.user = userName;
     connectionProperties.forEach((k, v)

Review comment:
       We can move the properties check up, right after the port check.
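   I.e., reusing the checks already in the diff:
   ```java
   public PhoenixDataSource(String host,
                            int port,
                            String userName,
                            Map<String, Object> connectionProperties,
                            boolean impersonationEnabled) {
     Preconditions.checkNotNull(host, userName);
     Preconditions.checkArgument(port > 0, "Please set the correct port.");
     connectionProperties.forEach((k, v)
         -> Preconditions.checkArgument(v != null, String.format("does not accept null values : %s", k)));
     // ... build the URL and assign the fields as before ...
   }
   ```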

##########
File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSchemaFactory.java
##########
@@ -18,75 +18,100 @@
 package org.apache.drill.exec.store.phoenix;
 
 import java.io.IOException;
-import java.sql.Connection;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import javax.sql.DataSource;
 
 import org.apache.calcite.adapter.jdbc.JdbcSchema;
 import org.apache.calcite.schema.Schema;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Table;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.AbstractSchemaFactory;
 import org.apache.drill.exec.store.SchemaConfig;
-import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import static org.apache.drill.exec.util.ImpersonationUtil.getProcessUserName;
 
 public class PhoenixSchemaFactory extends AbstractSchemaFactory {
 
   private final PhoenixStoragePlugin plugin;
   private final Map<String, PhoenixSchema> schemaMap;
   private PhoenixSchema rootSchema;
+  private final boolean isDrillImpersonationEnabled;
 
   public PhoenixSchemaFactory(PhoenixStoragePlugin plugin) {
     super(plugin.getName());
     this.plugin = plugin;
-    this.schemaMap = Maps.newHashMap();
+    this.schemaMap = new HashMap<>();
+    isDrillImpersonationEnabled = plugin.getContext().getConfig().getBoolean(ExecConstants.IMPERSONATION_ENABLED);
   }
 
   @Override
   public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
-    rootSchema = new PhoenixSchema(plugin, Collections.emptyList(), plugin.getName());
-    locateSchemas();
+    try {
+      rootSchema = new PhoenixSchema(schemaConfig, plugin, Collections.emptyList(), plugin.getName());
+      String schemaUser = schemaConfig.getUserName();
+      locateSchemas(schemaConfig, rootSchema.getUser(schemaUser, getProcessUserName()));
+    } catch (SQLException e) {
+      throw new IOException(e);
+    }
     parent.add(getName(), rootSchema); // resolve the top-level schema.
     for (String schemaName : rootSchema.getSubSchemaNames()) {
       PhoenixSchema schema = (PhoenixSchema) rootSchema.getSubSchema(schemaName);
       parent.add(schemaName, schema); // provide all available schemas for calcite.
     }
   }
 
-  private void locateSchemas() {
-    DataSource ds = plugin.getDataSource();
-    try (Connection conn = ds.getConnection();
-          ResultSet rs = ds.getConnection().getMetaData().getSchemas()) {
-      while (rs.next()) {
-        final String schemaName = rs.getString(1); // lookup the schema (or called database).
-        PhoenixSchema schema = new PhoenixSchema(plugin, Arrays.asList(getName()), schemaName);
-        schemaMap.put(schemaName, schema);
-      }
-      rootSchema.addSchemas(schemaMap);
-    } catch (SQLException e) {
-      throw new DrillRuntimeException(e.getMessage(), e);
+  private void locateSchemas(SchemaConfig schemaConfig, String userName) throws SQLException {
+    UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();
+    try {
+      ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
+        try (ResultSet rs = plugin.getDataSource(userName).getConnection().getMetaData().getSchemas()) {
+          while (rs.next()) {
+            final String schemaName = rs.getString(1); // lookup the schema (or called database).
+            PhoenixSchema schema = new PhoenixSchema(schemaConfig, plugin, Arrays.asList(getName()), schemaName);
+            schemaMap.put(schemaName, schema);
+          }
+          rootSchema.addSchemas(schemaMap);
+        }
+        return null;
+      });
+    } catch (IOException | InterruptedException e) {
+      throw new SQLException(e);
     }
   }
 
-  protected static class PhoenixSchema extends AbstractSchema {
+  @Override
+  public boolean needToImpersonateReadingData() {

Review comment:
       Is it possible to use `isDrillImpersonationEnabled` directly as the function name?

##########
File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixDataSource.java
##########
@@ -169,12 +174,23 @@ private void useDriverClass() throws SQLException {
    */
   private Properties useConfProperties() {
     Properties props = new Properties();
-    props.put("phoenix.trace.frequency", "never");
-    props.put("phoenix.query.timeoutMs", 30000);
-    props.put("phoenix.query.keepAliveMs", 120000);
     if (getConnectionProperties() != null) {
       props.putAll(getConnectionProperties());
     }
+    props.putIfAbsent("phoenix.trace.frequency", "never");
+    props.putIfAbsent("phoenix.query.timeoutMs", 30000);
+    props.putIfAbsent("phoenix.query.keepAliveMs", 120000);
     return props;
   }
+
+  private String doAsUserUrl(String url, String userName) {
+    if (url.contains(DEFAULT_QUERY_SERVER_REMOTEUSEREXTRACTOR_PARAM)) {
+      return url.replace(IMPERSONATED_USER_VARIABLE, userName);
+    } else {
+      throw UserException
+        .connectionError()
+        .message("Invalid PQS URL. Please add `doAs=$user` parameter value in case Drill Impersonation enabled")

Review comment:
       ```suggestion
           .message("Invalid PQS URL. Please add the value of the `doAs=$user` parameter if Impersonation is enabled.")
   ```

##########
File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixStoragePlugin.java
##########
@@ -62,8 +77,12 @@ public StoragePluginConfig getConfig() {
     return config;
   }
 
-  public DataSource getDataSource() {
-    return dataSource;
+  public PhoenixDataSource getDataSource(String userName) throws SQLException {

Review comment:
       We can put this function after `close()`, before `createDataSource()`.

##########
File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSchemaFactory.java
##########
@@ -18,75 +18,100 @@
 package org.apache.drill.exec.store.phoenix;
 
 import java.io.IOException;
-import java.sql.Connection;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import javax.sql.DataSource;
 
 import org.apache.calcite.adapter.jdbc.JdbcSchema;
 import org.apache.calcite.schema.Schema;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Table;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.AbstractSchemaFactory;
 import org.apache.drill.exec.store.SchemaConfig;
-import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import static org.apache.drill.exec.util.ImpersonationUtil.getProcessUserName;
 
 public class PhoenixSchemaFactory extends AbstractSchemaFactory {
 
   private final PhoenixStoragePlugin plugin;
   private final Map<String, PhoenixSchema> schemaMap;
   private PhoenixSchema rootSchema;
+  private final boolean isDrillImpersonationEnabled;
 
   public PhoenixSchemaFactory(PhoenixStoragePlugin plugin) {
     super(plugin.getName());
     this.plugin = plugin;
-    this.schemaMap = Maps.newHashMap();
+    this.schemaMap = new HashMap<>();
+    isDrillImpersonationEnabled = plugin.getContext().getConfig().getBoolean(ExecConstants.IMPERSONATION_ENABLED);
   }
 
   @Override
   public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
-    rootSchema = new PhoenixSchema(plugin, Collections.emptyList(), plugin.getName());
-    locateSchemas();
+    try {
+      rootSchema = new PhoenixSchema(schemaConfig, plugin, Collections.emptyList(), plugin.getName());
+      String schemaUser = schemaConfig.getUserName();
+      locateSchemas(schemaConfig, rootSchema.getUser(schemaUser, getProcessUserName()));
+    } catch (SQLException e) {
+      throw new IOException(e);
+    }
     parent.add(getName(), rootSchema); // resolve the top-level schema.
     for (String schemaName : rootSchema.getSubSchemaNames()) {
       PhoenixSchema schema = (PhoenixSchema) rootSchema.getSubSchema(schemaName);
       parent.add(schemaName, schema); // provide all available schemas for calcite.
     }
   }
 
-  private void locateSchemas() {
-    DataSource ds = plugin.getDataSource();
-    try (Connection conn = ds.getConnection();
-          ResultSet rs = ds.getConnection().getMetaData().getSchemas()) {
-      while (rs.next()) {
-        final String schemaName = rs.getString(1); // lookup the schema (or called database).
-        PhoenixSchema schema = new PhoenixSchema(plugin, Arrays.asList(getName()), schemaName);
-        schemaMap.put(schemaName, schema);
-      }
-      rootSchema.addSchemas(schemaMap);
-    } catch (SQLException e) {
-      throw new DrillRuntimeException(e.getMessage(), e);
+  private void locateSchemas(SchemaConfig schemaConfig, String userName) throws SQLException {
+    UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();
+    try {
+      ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
+        try (ResultSet rs = plugin.getDataSource(userName).getConnection().getMetaData().getSchemas()) {
+          while (rs.next()) {
+            final String schemaName = rs.getString(1); // lookup the schema (or called database).
+            PhoenixSchema schema = new PhoenixSchema(schemaConfig, plugin, Arrays.asList(getName()), schemaName);
+            schemaMap.put(schemaName, schema);
+          }
+          rootSchema.addSchemas(schemaMap);
+        }
+        return null;
+      });
+    } catch (IOException | InterruptedException e) {
+      throw new SQLException(e);
     }
   }
 
-  protected static class PhoenixSchema extends AbstractSchema {
+  @Override
+  public boolean needToImpersonateReadingData() {
+    return isDrillImpersonationEnabled;
+  }
 
+  class PhoenixSchema extends AbstractSchema {
+    private final SchemaConfig schemaConfig;

Review comment:
       The field `schemaConfig` is never used and can be removed.

##########
File path: contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/PhoenixBaseTest.java
##########
@@ -38,51 +38,63 @@
 
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.ClusterFixtureBuilder;
 import org.apache.drill.test.ClusterTest;
 import org.apache.hadoop.fs.Path;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.slf4j.LoggerFactory;
 
 import com.univocity.parsers.csv.CsvParser;
 import com.univocity.parsers.csv.CsvParserSettings;
 
-public class PhoenixBaseTest extends ClusterTest {
+public abstract class PhoenixBaseTest extends ClusterTest {

Review comment:
       We can remove the `abstract` modifier if we do not define any abstract methods.

##########
File path: contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/SecuredPhoenixBaseTest.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.phoenix.secured;
+
+import ch.qos.logback.classic.Level;
+import com.sun.security.auth.module.Krb5LoginModule;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.drill.common.config.DrillProperties;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.rpc.security.ServerAuthenticationHandler;
+import org.apache.drill.exec.rpc.security.kerberos.KerberosFactory;
+import org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl;
+import org.apache.drill.exec.server.TestDrillbitResilience;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.phoenix.PhoenixStoragePluginConfig;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.LogFixture;
+import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.phoenix.queryserver.server.QueryServer;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+
+import java.io.File;
+import java.nio.file.Paths;
+import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.TimeZone;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.drill.exec.store.phoenix.PhoenixBaseTest.createSampleData;
+import static org.apache.drill.exec.store.phoenix.PhoenixBaseTest.createSchema;
+import static org.apache.drill.exec.store.phoenix.PhoenixBaseTest.createTables;
+import static org.apache.drill.exec.store.phoenix.secured.HttpParamImpersonationQueryServerIT.environment;
+import static org.apache.drill.exec.store.phoenix.secured.HttpParamImpersonationQueryServerIT.getUrlTemplate;
+import static org.apache.drill.exec.store.phoenix.secured.HttpParamImpersonationQueryServerIT.grantUsersToGlobalPhoenixUserTables;
+import static org.apache.drill.exec.store.phoenix.secured.HttpParamImpersonationQueryServerIT.grantUsersToPhoenixSystemTables;
+import static org.apache.drill.exec.store.phoenix.secured.SecuredPhoenixTestSuite.initPhoenixQueryServer;
+
+@Slf4j
+public abstract class SecuredPhoenixBaseTest extends ClusterTest {
+  protected static LogFixture logFixture;
+  private final static Level CURRENT_LOG_LEVEL = Level.INFO;
+
+  private final static AtomicInteger initCount = new AtomicInteger(0);
+
+  @BeforeAll
+  public static void setUpBeforeClass() throws Exception {
+    TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
+    initPhoenixQueryServer();
+    bootSecuredDrillMiniCluster();
+    initializeDatabase();
+  }
+
+  private static void bootSecuredDrillMiniCluster() throws Exception {
+    logFixture = LogFixture.builder()
+      .toConsole()
+      .logger(QueryServerEnvironment.class, CURRENT_LOG_LEVEL)
+      .logger(SecuredPhoenixBaseTest.class, CURRENT_LOG_LEVEL)
+      .logger(KerberosFactory.class, CURRENT_LOG_LEVEL)
+      .logger(Krb5LoginModule.class, CURRENT_LOG_LEVEL)
+      .logger(QueryServer.class, CURRENT_LOG_LEVEL)
+      .logger(ServerAuthenticationHandler.class, CURRENT_LOG_LEVEL)
+      .build();
+
+    Map.Entry<String, File> user1 = environment.getUser(1);
+    Map.Entry<String, File> user2 = environment.getUser(2);
+    Map.Entry<String, File> user3 = environment.getUser(3);
+
+    dirTestWatcher.start(TestDrillbitResilience.class); // until DirTestWatcher ClassRule is implemented for JUnit5
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+        .configProperty(ExecConstants.USER_AUTHENTICATION_ENABLED, true)
+        .configProperty(ExecConstants.USER_AUTHENTICATOR_IMPL, UserAuthenticatorTestImpl.TYPE)
+        .configNonStringProperty(ExecConstants.AUTHENTICATION_MECHANISMS, Lists.newArrayList("kerberos"))
+        .configProperty(ExecConstants.IMPERSONATION_ENABLED, true)
+        .configProperty(ExecConstants.BIT_AUTHENTICATION_ENABLED, true)
+        .configProperty(ExecConstants.BIT_AUTHENTICATION_MECHANISM, "kerberos")
+        .configProperty(ExecConstants.SERVICE_PRINCIPAL, HBaseKerberosUtils.getPrincipalForTesting())
+        .configProperty(ExecConstants.SERVICE_KEYTAB_LOCATION, environment.getServiceKeytab().getAbsolutePath())
+        .configClientProperty(DrillProperties.SERVICE_PRINCIPAL, HBaseKerberosUtils.getPrincipalForTesting())
+        .configClientProperty(DrillProperties.USER, user1.getKey())
+        .configClientProperty(DrillProperties.KEYTAB, user1.getValue().getAbsolutePath());
+    startCluster(builder);
+    Properties user2ClientProperties = new Properties();
+    user2ClientProperties.setProperty(DrillProperties.SERVICE_PRINCIPAL, HBaseKerberosUtils.getPrincipalForTesting());
+    user2ClientProperties.setProperty(DrillProperties.USER, user2.getKey());
+    user2ClientProperties.setProperty(DrillProperties.KEYTAB, user2.getValue().getAbsolutePath());
+    cluster.addClientFixture(user2ClientProperties);
+    Properties user3ClientProperties = new Properties();
+    user3ClientProperties.setProperty(DrillProperties.SERVICE_PRINCIPAL, HBaseKerberosUtils.getPrincipalForTesting());
+    user3ClientProperties.setProperty(DrillProperties.USER, user3.getKey());
+    user3ClientProperties.setProperty(DrillProperties.KEYTAB, user3.getValue().getAbsolutePath());
+    cluster.addClientFixture(user3ClientProperties);
+
+    Map<String, Object> phoenixProps = new HashMap<>();
+    phoenixProps.put("phoenix.query.timeoutMs", 90000);
+    phoenixProps.put("phoenix.query.keepAliveMs", "30000");
+    phoenixProps.put("phoenix.queryserver.withRemoteUserExtractor", true);
+    StoragePluginRegistry registry = cluster.drillbit().getContext().getStorage();
+    final String doAsUrl = String.format(getUrlTemplate(), "$user");
+    logger.debug("Phoenix Query Server URL: {}", environment.getPqsUrl());
+    PhoenixStoragePluginConfig config = new PhoenixStoragePluginConfig(null, 0, null, null,
+      doAsUrl, null, phoenixProps);
+    config.setEnabled(true);
+    registry.put(PhoenixStoragePluginConfig.NAME + "123", config);
+  }
+
+
+  /**
+   * Initialize HBase via Phoenix
+   */
+  private static void initializeDatabase() throws Exception {
+    dirTestWatcher.copyResourceToRoot(Paths.get(""));
+    if (initCount.incrementAndGet() == 1) {
+      final Map.Entry<String, File> user1 = environment.getUser(1);
+      final Map.Entry<String, File> user2 = environment.getUser(2);
+      // Build the JDBC URL by hand with the doAs
+      final UserGroupInformation serviceUgi = ImpersonationUtil.getProcessUserUGI();
+      serviceUgi.doAs((PrivilegedExceptionAction<Void>) () -> {
+        logger.debug("Phoenix conn url: {}", environment.getPqsUrl());

Review comment:
       This URL is already logged once in the code above, so this line can be removed.

##########
File path: contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/QueryServerEnvironment.java
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.phoenix.secured;
+
+import static org.apache.hadoop.hbase.HConstants.HBASE_DIR;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.InetAddress;
+import java.security.PrivilegedAction;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.LocalHBaseCluster;
+import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.util.KerberosName;
+import org.apache.phoenix.query.ConfigurationFactory;
+import org.apache.phoenix.queryserver.QueryServerProperties;
+import org.apache.phoenix.queryserver.server.QueryServer;
+import org.apache.phoenix.util.InstanceResolver;
+import org.apache.phoenix.util.ThinClientUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Due to this bug https://bugzilla.redhat.com/show_bug.cgi?id=668830 We need to use
+ * `localhost.localdomain` as host name when running these tests on Jenkins (Centos) but for Mac OS
+ * it should be `localhost` to pass. The reason is kerberos principals in this tests are looked up
+ * from /etc/hosts and a reverse DNS lookup of 127.0.0.1 is resolved to `localhost.localdomain`
+ * rather than `localhost` on Centos. KDC sees `localhost` != `localhost.localdomain` and as the
+ * result test fails with authentication error. It's also important to note these principals are
+ * shared between HDFs and HBase in this mini HBase cluster. Some more reading
+ * https://access.redhat.com/solutions/57330
+ */
+public class QueryServerEnvironment {
+  private static final Logger LOG = LoggerFactory.getLogger(QueryServerEnvironment.class);
+
+  private final File TEMP_DIR = new File(getTempDir());
+  private final File KEYTAB_DIR = new File(TEMP_DIR, "keytabs");
+  private final List<File> USER_KEYTAB_FILES = new ArrayList<>();
+
+  private static final String LOCAL_HOST_REVERSE_DNS_LOOKUP_NAME;
+  static final String LOGIN_USER;
+
+  static {
+    try {
+       System.setProperty("sun.security.krb5.debug", "true");

Review comment:
       There is an extra leading space on this line.

##########
File path: contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/QueryServerEnvironment.java
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.phoenix.secured;
+
+import static org.apache.hadoop.hbase.HConstants.HBASE_DIR;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.InetAddress;
+import java.security.PrivilegedAction;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.LocalHBaseCluster;
+import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.util.KerberosName;
+import org.apache.phoenix.query.ConfigurationFactory;
+import org.apache.phoenix.queryserver.QueryServerProperties;
+import org.apache.phoenix.queryserver.server.QueryServer;
+import org.apache.phoenix.util.InstanceResolver;
+import org.apache.phoenix.util.ThinClientUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Due to this bug https://bugzilla.redhat.com/show_bug.cgi?id=668830 We need to use
+ * `localhost.localdomain` as host name when running these tests on Jenkins (Centos) but for Mac OS
+ * it should be `localhost` to pass. The reason is kerberos principals in this tests are looked up
+ * from /etc/hosts and a reverse DNS lookup of 127.0.0.1 is resolved to `localhost.localdomain`
+ * rather than `localhost` on Centos. KDC sees `localhost` != `localhost.localdomain` and as the
+ * result test fails with authentication error. It's also important to note these principals are
+ * shared between HDFs and HBase in this mini HBase cluster. Some more reading
+ * https://access.redhat.com/solutions/57330
+ */
+public class QueryServerEnvironment {
+  private static final Logger LOG = LoggerFactory.getLogger(QueryServerEnvironment.class);
+
+  private final File TEMP_DIR = new File(getTempDir());
+  private final File KEYTAB_DIR = new File(TEMP_DIR, "keytabs");
+  private final List<File> USER_KEYTAB_FILES = new ArrayList<>();
+
+  private static final String LOCAL_HOST_REVERSE_DNS_LOOKUP_NAME;
+  static final String LOGIN_USER;
+
+  static {
+    try {
+       System.setProperty("sun.security.krb5.debug", "true");
+      LOCAL_HOST_REVERSE_DNS_LOOKUP_NAME = InetAddress.getByName("127.0.0.1").getCanonicalHostName();
+      String userName = System.getProperty("user.name");
+      LOGIN_USER = userName != null ? userName : "securecluster";
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static final String SPNEGO_PRINCIPAL = "HTTP/" + LOCAL_HOST_REVERSE_DNS_LOOKUP_NAME;
+  private static final String PQS_PRINCIPAL = "phoenixqs/" + LOCAL_HOST_REVERSE_DNS_LOOKUP_NAME;
+  private static final String SERVICE_PRINCIPAL = LOGIN_USER + "/" + LOCAL_HOST_REVERSE_DNS_LOOKUP_NAME;
+  private File KEYTAB;
+
+  private MiniKdc KDC;
+  private HBaseTestingUtility UTIL = new HBaseTestingUtility();
+  private LocalHBaseCluster HBASE_CLUSTER;
+  private int NUM_CREATED_USERS;
+
+  private ExecutorService PQS_EXECUTOR;
+  private QueryServer PQS;
+  private int PQS_PORT;
+  private String PQS_URL;
+
+  private boolean tls;
+
+  private static String getTempDir() {
+    StringBuilder sb = new StringBuilder(32);
+    sb.append(System.getProperty("user.dir")).append(File.separator);
+    sb.append("target").append(File.separator);
+    sb.append(QueryServerEnvironment.class.getSimpleName());
+    sb.append("-").append(UUID.randomUUID());
+    return sb.toString();
+  }
+
+  public int getPqsPort() {
+    return PQS_PORT;
+  }
+
+  public String getPqsUrl() {
+    return PQS_URL;
+  }
+
+  public boolean getTls() {
+    return tls;
+  }
+
+  public HBaseTestingUtility getUtil() {
+    return UTIL;
+  }
+
+  public String getServicePrincipal() {
+    return SERVICE_PRINCIPAL;
+  }
+
+  public File getServiceKeytab() {
+    return KEYTAB;
+  }
+
+  private static void updateDefaultRealm() throws Exception {
+    // (at least) one other phoenix test triggers the caching of this field before the KDC is up
+    // which causes principal parsing to fail.
+    Field f = KerberosName.class.getDeclaredField("defaultRealm");
+    f.setAccessible(true);
+    // Default realm for MiniKDC
+    f.set(null, "EXAMPLE.COM");
+  }
+
+  private void createUsers(int numUsers) throws Exception {
+    assertNotNull("KDC is null, was setup method called?", KDC);
+    NUM_CREATED_USERS = numUsers;
+    for (int i = 1; i <= numUsers; i++) {
+      String principal = "user" + i;
+      File keytabFile = new File(KEYTAB_DIR, principal + ".keytab");
+      KDC.createPrincipal(keytabFile, principal);
+      USER_KEYTAB_FILES.add(keytabFile);
+    }
+  }
+
+  public Map.Entry<String, File> getUser(int offset) {
+    if (!(offset > 0 && offset <= NUM_CREATED_USERS)) {
+      throw new IllegalArgumentException();
+    }
+    return new AbstractMap.SimpleImmutableEntry<String, File>("user" + offset, USER_KEYTAB_FILES.get(offset - 1));
+  }
+
+  /**
+   * Setup the security configuration for hdfs.
+   */
+  private void setHdfsSecuredConfiguration(Configuration conf) throws Exception {
+    // Set principal+keytab configuration for HDFS
+    conf.set(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY,
+      SERVICE_PRINCIPAL + "@" + KDC.getRealm());
+    conf.set(DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY, KEYTAB.getAbsolutePath());
+    conf.set(DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY,
+      SERVICE_PRINCIPAL + "@" + KDC.getRealm());
+    conf.set(DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY, KEYTAB.getAbsolutePath());
+    conf.set(DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY,
+      SPNEGO_PRINCIPAL + "@" + KDC.getRealm());
+    // Enable token access for HDFS blocks
+    conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+    // Only use HTTPS (required because we aren't using "secure" ports)
+    conf.set(DFSConfigKeys.DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());
+    // Bind on localhost for spnego to have a chance at working
+    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");
+    conf.set(DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0");
+
+    // Generate SSL certs
+    File keystoresDir = new File(UTIL.getDataTestDir("keystore").toUri().getPath());
+    keystoresDir.mkdirs();
+    String sslConfDir = TlsUtil.getClasspathDir(QueryServerEnvironment.class);
+    TlsUtil.setupSSLConfig(keystoresDir.getAbsolutePath(), sslConfDir, conf, false);
+
+    // Magic flag to tell hdfs to not fail on using ports above 1024
+    conf.setBoolean("ignore.secure.ports.for.testing", true);
+  }
+
+  private static void ensureIsEmptyDirectory(File f) throws IOException {
+    if (f.exists()) {
+      if (f.isDirectory()) {
+        FileUtils.deleteDirectory(f);
+      } else {
+        assertTrue("Failed to delete keytab directory", f.delete());
+      }
+    }
+    assertTrue("Failed to create keytab directory", f.mkdirs());
+  }
+
+  /**
+   * Setup and start kerberosed, hbase
+   * @throws Exception
+   */
+  public QueryServerEnvironment(final Configuration confIn, int numberOfUsers, boolean tls)

Review comment:
       Is it possible to organise the order of the functions?
   For example: first all the private (or public) functions, then the public (or private) ones, with the constructor as close to the front as possible.
   
   ![image](https://user-images.githubusercontent.com/50079619/148733342-c9596cfd-9c9f-477d-9f8b-6d15c6c5bb3a.png)

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/conversion/DrillValidator.java
##########
@@ -66,7 +69,11 @@ protected void validateFrom(SqlNode node, RelDataType targetRowType, SqlValidato
           }
       }
     }
-    super.validateFrom(node, targetRowType, scope);
+    UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();
+      ugi.doAs((PrivilegedAction<Void>) () -> {

Review comment:
       Please adjust the indentation of this code.
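   E.g. (body elided, just showing the alignment):
   ```java
   UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();
   ugi.doAs((PrivilegedAction<Void>) () -> {
     // ...
     return null;
   });
   ```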

##########
File path: contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/TlsUtil.java
##########
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.phoenix.secured;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.ssl.FileBasedKeyStoresFactory;
+import org.apache.hadoop.security.ssl.SSLFactory;
+import org.bouncycastle.x509.X509V1CertificateGenerator;

Review comment:
       The type `X509V1CertificateGenerator` is deprecated. I suggest updating it, or filing a follow-up issue for newcomers.
   Also, the indentation in this class doesn't follow the usual two-space convention and needs to be updated.
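   For reference, the non-deprecated builder API lives in `bcpkix`; a self-signed certificate can be produced roughly like this (the DN and validity window below are placeholders):
   ```java
   import java.math.BigInteger;
   import java.security.KeyPair;
   import java.security.cert.X509Certificate;
   import java.util.Date;
   
   import org.bouncycastle.asn1.x500.X500Name;
   import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter;
   import org.bouncycastle.cert.jcajce.JcaX509v3CertificateBuilder;
   import org.bouncycastle.operator.ContentSigner;
   import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder;
   
   static X509Certificate selfSigned(KeyPair keyPair) throws Exception {
     X500Name subject = new X500Name("CN=localhost");   // placeholder DN
     long now = System.currentTimeMillis();
     JcaX509v3CertificateBuilder builder = new JcaX509v3CertificateBuilder(
         subject,                                       // issuer == subject for self-signed
         BigInteger.valueOf(now),                       // serial number
         new Date(now - 86_400_000L),                   // notBefore: a day ago
         new Date(now + 365L * 86_400_000L),            // notAfter: ~1 year
         subject,
         keyPair.getPublic());
     ContentSigner signer = new JcaContentSignerBuilder("SHA256withRSA").build(keyPair.getPrivate());
     return new JcaX509CertificateConverter().getCertificate(builder.build(signer));
   }
   ```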

##########
File path: contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/SecuredPhoenixBaseTest.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.phoenix.secured;
+
+import ch.qos.logback.classic.Level;
+import com.sun.security.auth.module.Krb5LoginModule;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.drill.common.config.DrillProperties;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.rpc.security.ServerAuthenticationHandler;
+import org.apache.drill.exec.rpc.security.kerberos.KerberosFactory;
+import org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl;
+import org.apache.drill.exec.server.TestDrillbitResilience;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.phoenix.PhoenixStoragePluginConfig;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.LogFixture;
+import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.phoenix.queryserver.server.QueryServer;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+
+import java.io.File;
+import java.nio.file.Paths;
+import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.TimeZone;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.drill.exec.store.phoenix.PhoenixBaseTest.createSampleData;
+import static org.apache.drill.exec.store.phoenix.PhoenixBaseTest.createSchema;
+import static org.apache.drill.exec.store.phoenix.PhoenixBaseTest.createTables;
+import static org.apache.drill.exec.store.phoenix.secured.HttpParamImpersonationQueryServerIT.environment;
+import static org.apache.drill.exec.store.phoenix.secured.HttpParamImpersonationQueryServerIT.getUrlTemplate;
+import static org.apache.drill.exec.store.phoenix.secured.HttpParamImpersonationQueryServerIT.grantUsersToGlobalPhoenixUserTables;
+import static org.apache.drill.exec.store.phoenix.secured.HttpParamImpersonationQueryServerIT.grantUsersToPhoenixSystemTables;
+import static org.apache.drill.exec.store.phoenix.secured.SecuredPhoenixTestSuite.initPhoenixQueryServer;
+
+@Slf4j
+public abstract class SecuredPhoenixBaseTest extends ClusterTest {
+  protected static LogFixture logFixture;
+  private final static Level CURRENT_LOG_LEVEL = Level.INFO;
+
+  private final static AtomicInteger initCount = new AtomicInteger(0);
+
+  @BeforeAll
+  public static void setUpBeforeClass() throws Exception {
+    TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
+    initPhoenixQueryServer();
+    bootSecuredDrillMiniCluster();
+    initializeDatabase();
+  }
+
+  private static void bootSecuredDrillMiniCluster() throws Exception {
+    logFixture = LogFixture.builder()
+      .toConsole()
+      .logger(QueryServerEnvironment.class, CURRENT_LOG_LEVEL)
+      .logger(SecuredPhoenixBaseTest.class, CURRENT_LOG_LEVEL)
+      .logger(KerberosFactory.class, CURRENT_LOG_LEVEL)
+      .logger(Krb5LoginModule.class, CURRENT_LOG_LEVEL)
+      .logger(QueryServer.class, CURRENT_LOG_LEVEL)
+      .logger(ServerAuthenticationHandler.class, CURRENT_LOG_LEVEL)
+      .build();
+
+    Map.Entry<String, File> user1 = environment.getUser(1);
+    Map.Entry<String, File> user2 = environment.getUser(2);
+    Map.Entry<String, File> user3 = environment.getUser(3);
+
+    dirTestWatcher.start(TestDrillbitResilience.class); // until DirTestWatcher ClassRule is implemented for JUnit5
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+        .configProperty(ExecConstants.USER_AUTHENTICATION_ENABLED, true)
+        .configProperty(ExecConstants.USER_AUTHENTICATOR_IMPL, UserAuthenticatorTestImpl.TYPE)
+        .configNonStringProperty(ExecConstants.AUTHENTICATION_MECHANISMS, Lists.newArrayList("kerberos"))
+        .configProperty(ExecConstants.IMPERSONATION_ENABLED, true)
+        .configProperty(ExecConstants.BIT_AUTHENTICATION_ENABLED, true)
+        .configProperty(ExecConstants.BIT_AUTHENTICATION_MECHANISM, "kerberos")
+        .configProperty(ExecConstants.SERVICE_PRINCIPAL, HBaseKerberosUtils.getPrincipalForTesting())
+        .configProperty(ExecConstants.SERVICE_KEYTAB_LOCATION, environment.getServiceKeytab().getAbsolutePath())
+        .configClientProperty(DrillProperties.SERVICE_PRINCIPAL, HBaseKerberosUtils.getPrincipalForTesting())
+        .configClientProperty(DrillProperties.USER, user1.getKey())
+        .configClientProperty(DrillProperties.KEYTAB, user1.getValue().getAbsolutePath());
+    startCluster(builder);
+    Properties user2ClientProperties = new Properties();
+    user2ClientProperties.setProperty(DrillProperties.SERVICE_PRINCIPAL, HBaseKerberosUtils.getPrincipalForTesting());
+    user2ClientProperties.setProperty(DrillProperties.USER, user2.getKey());
+    user2ClientProperties.setProperty(DrillProperties.KEYTAB, user2.getValue().getAbsolutePath());
+    cluster.addClientFixture(user2ClientProperties);
+    Properties user3ClientProperties = new Properties();
+    user3ClientProperties.setProperty(DrillProperties.SERVICE_PRINCIPAL, HBaseKerberosUtils.getPrincipalForTesting());
+    user3ClientProperties.setProperty(DrillProperties.USER, user3.getKey());
+    user3ClientProperties.setProperty(DrillProperties.KEYTAB, user3.getValue().getAbsolutePath());
+    cluster.addClientFixture(user3ClientProperties);
+
+    Map<String, Object> phoenixProps = new HashMap<>();
+    phoenixProps.put("phoenix.query.timeoutMs", 90000);
+    phoenixProps.put("phoenix.query.keepAliveMs", "30000");
+    phoenixProps.put("phoenix.queryserver.withRemoteUserExtractor", true);
+    StoragePluginRegistry registry = cluster.drillbit().getContext().getStorage();
+    final String doAsUrl = String.format(getUrlTemplate(), "$user");
+    logger.debug("Phoenix Query Server URL: {}", environment.getPqsUrl());
+    PhoenixStoragePluginConfig config = new PhoenixStoragePluginConfig(null, 0, null, null,
+      doAsUrl, null, phoenixProps);
+    config.setEnabled(true);
+    registry.put(PhoenixStoragePluginConfig.NAME + "123", config);
+  }
+
+
+  /**
+   * Initialize HBase via Phoenix
+   */
+  private static void initializeDatabase() throws Exception {
+    dirTestWatcher.copyResourceToRoot(Paths.get(""));
+    if (initCount.incrementAndGet() == 1) {
+      final Map.Entry<String, File> user1 = environment.getUser(1);
+      final Map.Entry<String, File> user2 = environment.getUser(2);
+      // Build the JDBC URL by hand with the doAs
+      final UserGroupInformation serviceUgi = ImpersonationUtil.getProcessUserUGI();
+      serviceUgi.doAs((PrivilegedExceptionAction<Void>) () -> {
+        logger.debug("Phoenix conn url: {}", environment.getPqsUrl());
+        createSchema(environment.getPqsUrl());
+        createTables(environment.getPqsUrl());
+        createSampleData(environment.getPqsUrl());
+        grantUsersToPhoenixSystemTables(Arrays.asList(user1.getKey(), user2.getKey()));
+        grantUsersToGlobalPhoenixUserTables(Arrays.asList(user1.getKey()));
+        return null;
+      });
+    }
+  }
+
+  protected interface TestWrapper {
+    void apply() throws Exception;
+  }
+
+  public void runForThreeClients(SecuredPhoenixSQLTest.TestWrapper wrapper) throws Exception {
+    runForThreeClients(wrapper, UserRemoteException.class, RuntimeException.class);
+  }
+
+  /**
+   * @param wrapper the actual test case to execute
+   * @param user2ExpectedException the exception expected for user2, who can be impersonated but has no permissions on the tables
+   * @param user3ExpectedException the exception expected for user3, who is not impersonated
+   */
+  public void runForThreeClients(SecuredPhoenixSQLTest.TestWrapper wrapper, Class user2ExpectedException, Class user3ExpectedException) throws Exception {

Review comment:
       There are a few compiler warnings here (raw use of `Class` and unchecked calls); either suppress them as below, or see the generic variant sketched after the suggestion.
   ```suggestion
     @SuppressWarnings({ "unchecked", "rawtypes" })
     public void runForThreeClients(SecuredPhoenixSQLTest.TestWrapper wrapper, Class user2ExpectedException, Class user3ExpectedException) throws Exception {
   ```
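   
   Alternatively, if you'd rather remove the warnings than suppress them, a minimal sketch with bounded type parameters (client switching between the three fixtures is elided here, and `assertThrows` is the JUnit 5 one; this is illustrative, not part of the PR):
   ```java
   import static org.junit.jupiter.api.Assertions.assertThrows;
   
   // Sketch only: bounded generics make the Class parameters type-safe,
   // so neither "unchecked" nor "rawtypes" warnings are produced.
   public void runForThreeClients(SecuredPhoenixSQLTest.TestWrapper wrapper,
       Class<? extends Throwable> user2ExpectedException,
       Class<? extends Throwable> user3ExpectedException) throws Exception {
     wrapper.apply();                                      // user1: has all permissions
     assertThrows(user2ExpectedException, wrapper::apply); // user2: impersonated, no table grants
     assertThrows(user3ExpectedException, wrapper::apply); // user3: not impersonated
   }
   ```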
   

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/kerberos/KerberosFactory.java
##########
@@ -138,21 +138,12 @@ public SaslClient createSaslClient(final UserGroupInformation ugi, final Map<Str
 
     // ignore parts[2]; GSSAPI gets the realm info from the ticket
     try {
-      final SaslClient saslClient = ugi.doAs(new PrivilegedExceptionAction<SaslClient>() {
-
-        @Override
-        public SaslClient run() throws Exception {
-          return FastSaslClientFactory.getInstance().createSaslClient(new String[]{KerberosUtil.KERBEROS_SASL_NAME},
-              null /** authorization ID */, serviceName, serviceHostName, properties,
-              new CallbackHandler() {
-                @Override
-                public void handle(final Callback[] callbacks)
-                    throws IOException, UnsupportedCallbackException {
-                  throw new UnsupportedCallbackException(callbacks[0]);
-                }
-              });
-        }
-      });
+      final SaslClient saslClient = ugi.doAs((PrivilegedExceptionAction<SaslClient>) () ->
+        FastSaslClientFactory.getInstance().createSaslClient(new String[]{KerberosUtil.KERBEROS_SASL_NAME},

Review comment:
       ```suggestion
           FastSaslClientFactory.getInstance().createSaslClient(new String[] { KerberosUtil.KERBEROS_SASL_NAME },
   ```
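   
   For anyone reading this later: the explicit cast in the lambda is load-bearing. A tiny standalone sketch of the pattern (names are illustrative):
   ```java
   import java.security.PrivilegedExceptionAction;
   import org.apache.hadoop.security.UserGroupInformation;
   
   // The PrivilegedExceptionAction cast selects the doAs overload that
   // propagates checked exceptions; the PrivilegedAction overload would
   // not compile for a body that throws.
   static String currentDoAsUser() throws Exception {
     UserGroupInformation ugi = UserGroupInformation.getLoginUser();
     return ugi.doAs((PrivilegedExceptionAction<String>) () ->
         UserGroupInformation.getCurrentUser().getUserName());
   }
   ```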

##########
File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSchemaFactory.java
##########
@@ -18,75 +18,100 @@
 package org.apache.drill.exec.store.phoenix;
 
 import java.io.IOException;
-import java.sql.Connection;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import javax.sql.DataSource;
 
 import org.apache.calcite.adapter.jdbc.JdbcSchema;
 import org.apache.calcite.schema.Schema;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Table;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.AbstractSchemaFactory;
 import org.apache.drill.exec.store.SchemaConfig;
-import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import static org.apache.drill.exec.util.ImpersonationUtil.getProcessUserName;
 
 public class PhoenixSchemaFactory extends AbstractSchemaFactory {
 
   private final PhoenixStoragePlugin plugin;
   private final Map<String, PhoenixSchema> schemaMap;
   private PhoenixSchema rootSchema;
+  private final boolean isDrillImpersonationEnabled;
 
   public PhoenixSchemaFactory(PhoenixStoragePlugin plugin) {
     super(plugin.getName());
     this.plugin = plugin;
-    this.schemaMap = Maps.newHashMap();
+    this.schemaMap = new HashMap<>();
+    isDrillImpersonationEnabled = plugin.getContext().getConfig().getBoolean(ExecConstants.IMPERSONATION_ENABLED);
   }
 
   @Override
   public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
-    rootSchema = new PhoenixSchema(plugin, Collections.emptyList(), plugin.getName());
-    locateSchemas();
+    try {
+      rootSchema = new PhoenixSchema(schemaConfig, plugin, Collections.emptyList(), plugin.getName());
+      String schemaUser = schemaConfig.getUserName();
+      locateSchemas(schemaConfig, rootSchema.getUser(schemaUser, getProcessUserName()));

Review comment:
       ```suggestion
         locateSchemas(schemaConfig, rootSchema.getUser(schemaConfig.getUserName(), getProcessUserName()));
   ```

##########
File path: contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/SecuredPhoenixDataTypeTest.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.phoenix.secured;
+
+import org.apache.drill.categories.RowSetTests;
+import org.apache.drill.categories.SlowTest;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.QueryBuilder;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+
+import static org.apache.drill.exec.store.phoenix.PhoenixBaseTest.U_U_I_D;
+import static org.apache.drill.test.rowSet.RowSetUtilities.boolArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.byteArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.doubleArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.intArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.longArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.shortArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
+
+@Tag(SlowTest.TAG)
+@Tag(RowSetTests.TAG)
+public class SecuredPhoenixDataTypeTest extends SecuredPhoenixBaseTest {
+
+  @Test
+  public void testDataType() throws Exception {
+    runForThreeClients(this::doTestDataType);
+  }
+
+  public void doTestDataType() throws Exception {

Review comment:
       ```suggestion
     private void doTestDataType() throws Exception {
   ```
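   
   Note that `this::doTestDataType` keeps compiling after this change, since a method reference may target a private method of its own class. Tiny illustration (hypothetical class, not from the PR):
   ```java
   public class Demo {
     private void work() {
       System.out.println("ok");
     }
   
     public void run() {
       Runnable r = this::work; // legal: the reference site is inside Demo
       r.run();
     }
   }
   ```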




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org