You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by lu...@apache.org on 2022/01/29 02:37:18 UTC

[drill] branch master updated: DRILL-8061: Add Impersonation Support for Phoenix (#2422)

This is an automated email from the ASF dual-hosted git repository.

luoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 2decae1  DRILL-8061: Add Impersonation Support for Phoenix (#2422)
2decae1 is described below

commit 2decae18b85eeda51816e92d5a9e9e6e2f9ce8d5
Author: Vitalii Diravka <vi...@apache.org>
AuthorDate: Sat Jan 29 04:36:42 2022 +0200

    DRILL-8061: Add Impersonation Support for Phoenix (#2422)
---
 .../org/apache/drill/common/PlanStringBuilder.java |   2 +-
 .../drill/common/config/DrillProperties.java       |   3 +
 .../org/apache/drill/categories/RowSetTests.java   |   9 +-
 .../exec/store/hive/schema/HiveSchemaFactory.java  |  21 +-
 contrib/storage-phoenix/README.md                  |  37 +-
 contrib/storage-phoenix/pom.xml                    |  96 ++++-
 .../exec/store/phoenix/PhoenixBatchReader.java     |  56 ++-
 .../exec/store/phoenix/PhoenixDataSource.java      | 106 +++---
 .../drill/exec/store/phoenix/PhoenixGroupScan.java |  14 +-
 .../drill/exec/store/phoenix/PhoenixReader.java    |   7 +-
 .../exec/store/phoenix/PhoenixSchemaFactory.java   |  81 +++--
 .../exec/store/phoenix/PhoenixStoragePlugin.java   |  95 +++--
 .../drill/exec/store/phoenix/PhoenixSubScan.java   |  12 +-
 .../exec/store/phoenix/rules/PhoenixPrel.java      |  11 +-
 .../drill/exec/store/phoenix/PhoenixBaseTest.java  |  44 ++-
 .../exec/store/phoenix/PhoenixCommandTest.java     |   1 +
 .../drill/exec/store/phoenix/PhoenixTestSuite.java |   6 +-
 .../exec/store/phoenix/QueryServerBasicsIT.java    |   5 +
 .../exec/store/phoenix/QueryServerThread.java      |  45 ---
 .../HttpParamImpersonationQueryServerIT.java       | 115 ++++++
 .../phoenix/secured/QueryServerEnvironment.java    | 367 +++++++++++++++++++
 .../phoenix/secured/SecuredPhoenixBaseTest.java    | 188 ++++++++++
 .../SecuredPhoenixCommandTest.java}                |  55 ++-
 .../secured/SecuredPhoenixDataTypeTest.java        | 136 +++++++
 .../phoenix/secured/SecuredPhoenixSQLTest.java     | 390 +++++++++++++++++++++
 .../SecuredPhoenixTestSuite.java}                  |  56 +--
 .../apache/drill/exec/ops/OperatorContextImpl.java |   7 +-
 .../drill/exec/physical/base/PhysicalOperator.java |   1 -
 .../drill/exec/physical/impl/ImplCreator.java      |  23 +-
 .../planner/sql/conversion/DrillValidator.java     |  16 +-
 .../exec/planner/sql/conversion/SqlConverter.java  |   9 +-
 .../rpc/security/kerberos/KerberosFactory.java     |  21 +-
 .../exec/rpc/user/InboundImpersonationManager.java |  11 +-
 .../drill/exec/rpc/user/UserConnectionConfig.java  |  12 +-
 .../apache/drill/exec/store/AbstractSchema.java    |  17 +
 .../drill/exec/store/ischema/RecordCollector.java  |  10 +
 .../apache/drill/exec/util/ImpersonationUtil.java  |   4 +-
 .../drill/exec/work/fragment/FragmentExecutor.java |  33 +-
 .../drill/exec/rpc/security/KerberosHelper.java    |   2 +-
 .../java/org/apache/drill/test/ClientFixture.java  |   5 +
 .../java/org/apache/drill/test/ClusterFixture.java |  23 ++
 .../apache/drill/test/ClusterFixtureBuilder.java   |  13 +-
 pom.xml                                            |  29 ++
 43 files changed, 1817 insertions(+), 377 deletions(-)

diff --git a/common/src/main/java/org/apache/drill/common/PlanStringBuilder.java b/common/src/main/java/org/apache/drill/common/PlanStringBuilder.java
index ef04a9a..c4f58fd 100644
--- a/common/src/main/java/org/apache/drill/common/PlanStringBuilder.java
+++ b/common/src/main/java/org/apache/drill/common/PlanStringBuilder.java
@@ -79,7 +79,7 @@ public class PlanStringBuilder {
   public PlanStringBuilder field(String key, Object value) {
     if (value != null) {
       startField(key);
-      buf.append(value.toString());
+      buf.append(value);
     }
     return this;
   }
diff --git a/common/src/main/java/org/apache/drill/common/config/DrillProperties.java b/common/src/main/java/org/apache/drill/common/config/DrillProperties.java
index 4df66f9..7726808 100644
--- a/common/src/main/java/org/apache/drill/common/config/DrillProperties.java
+++ b/common/src/main/java/org/apache/drill/common/config/DrillProperties.java
@@ -46,6 +46,9 @@ public final class DrillProperties extends Properties {
 
   public static final String PASSWORD = "password";
 
+  /**
+   * Impersonation target name for Drill Inbound Impersonation
+   */
   public static final String IMPERSONATION_TARGET = "impersonation_target";
 
   public static final String AUTH_MECHANISM = "auth";
diff --git a/common/src/test/java/org/apache/drill/categories/RowSetTests.java b/common/src/test/java/org/apache/drill/categories/RowSetTests.java
index 98ad675..eb83006 100644
--- a/common/src/test/java/org/apache/drill/categories/RowSetTests.java
+++ b/common/src/test/java/org/apache/drill/categories/RowSetTests.java
@@ -18,9 +18,12 @@
 package org.apache.drill.categories;
 
 /**
- * A category for tests that test the RowSet, ResultSetLoader
- * and related mechanisms.
+ * Junit category marker. <br>
+ * A category for tests that test the RowSet, ResultSetLoader and related mechanisms.
  */
 public interface RowSetTests {
-  // Junit category marker
+  /**
+   * tag for JUnit5
+   */
+  String TAG = "row-set-tests";
 }
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
index 36e32bd..e82274e 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
@@ -95,22 +95,11 @@ public class HiveSchemaFactory extends AbstractSchemaFactory {
   }
 
   /**
-   * Does Drill needs to impersonate as user connected to Drill when reading data from Hive warehouse location?
-   * @return True when both Drill impersonation and Hive impersonation are enabled.
-   */
-  private boolean needToImpersonateReadingData() {
-    return isDrillImpersonationEnabled && isHS2DoAsSet;
-  }
-
-  /**
    * Close this schema factory in preparation for retrying. Attempt to close
    * connections, but just ignore any errors.
    */
   public void close() {
-    AutoCloseables.closeSilently(
-        processUserMetastoreClient::close,
-        metaStoreClientLoadingCache::invalidateAll
-    );
+    AutoCloseables.closeSilently(processUserMetastoreClient, metaStoreClientLoadingCache::invalidateAll);
   }
 
   @Override
@@ -220,7 +209,8 @@ public class HiveSchemaFactory extends AbstractSchemaFactory {
           : new DrillHiveTable(getName(), plugin, getUser(schemaUser, getProcessUserName()), entry);
     }
 
-    private String getUser(String impersonated, String notImpersonated) {
+    @Override
+    public String getUser(String impersonated, String notImpersonated) {
       return needToImpersonateReadingData() ? impersonated : notImpersonated;
     }
 
@@ -245,6 +235,11 @@ public class HiveSchemaFactory extends AbstractSchemaFactory {
     public String getTypeName() {
       return HiveStoragePluginConfig.NAME;
     }
+
+    @Override
+    public boolean needToImpersonateReadingData() {
+      return isDrillImpersonationEnabled && isHS2DoAsSet;
+    }
   }
 
 }
diff --git a/contrib/storage-phoenix/README.md b/contrib/storage-phoenix/README.md
index ab178b3..01278b5 100644
--- a/contrib/storage-phoenix/README.md
+++ b/contrib/storage-phoenix/README.md
@@ -11,32 +11,24 @@
 Features :
 
  - Full support for Enhanced Vector Framework.
- 
  - Tested in phoenix 4.14 and 5.1.2.
- 
  - Support the array data type.
- 
  - Support the pushdown (Project, Limit, Filter, Aggregate, Join, CrossJoin, Join_Filter, GroupBy, Distinct and more).
- 
  - Use the PQS client (6.0).
 
 Related Information :
 
  1. PHOENIX-6398: Returns uniform SQL dialect in calcite for the PQS
-
  2. PHOENIX-6582: Bump default HBase version to 2.3.7 and 2.4.8
-
  3. PHOENIX-6605, PHOENIX-6606 and PHOENIX-6607.
-
  4. DRILL-8060, DRILL-8061 and DRILL-8062.
-
  5. [QueryServer 6.0.0-drill-r1](https://github.com/luocooong/phoenix-queryserver/releases/tag/6.0.0-drill-r1)
 
 ## Configuration
 
  To connect Drill to Phoenix, create a new storage plugin with the following configuration :
 
-Option 1 (Use the host and port) :
+Option 1 (Use the host and port):
 
 ```json
 {
@@ -74,7 +66,7 @@ Use the connection properties :
 Tips :
  * More connection properties, see also [PQS Configuration](http://phoenix.apache.org/server.html).
  * If you provide the `jdbcURL`, the connection will ignore the value of `host` and `port`.
- * If you extended the authentication of QueryServer, you can also pass the `userName` and `password`.
+ * If you [extended the authentication of QueryServer](https://github.com/Boostport/avatica/issues/28), you can also pass the `userName` and `password`.
 
 ```json
 {
@@ -87,6 +79,16 @@ Tips :
 }
 ```
 
+### Impersonation
+Configurations :
+1. Enable [Drill User Impersonation](https://drill.apache.org/docs/configuring-user-impersonation/)
+2. Enable [PQS Impersonation](https://phoenix.apache.org/server.html#Impersonation)
+3. PQS URL:
+  1. Provide `host` and `port` and Drill will generate the PQS URL with a doAs parameter of current session user
+  2. Provide the `jdbcURL` with a `doAs` url param and `$user` placeholder as a value, for instance:
+     `jdbc:phoenix:thin:url=http://localhost:8765?doAs=$user`. In case Drill Impersonation is enabled, but `doAs=$user`
+     is missing the User Exception is thrown.
+
 ## Testing
 
  The test framework of phoenix queryserver required the Hadoop 3, but exist `PHOENIX-5993` and `HBASE-22394` :
@@ -101,10 +103,23 @@ requires a recompilation of HBase because of incompatible changes between Hadoop
 
  1. Download HBase 2.4.2 sources and rebuild with Hadoop 3.
 
- 2. Remove the `Ignore` annotation in `PhoenixTestSuite.java`.
+    ```mvn clean install -DskipTests -Dhadoop.profile=3.0 -Dhadoop-three.version=3.2.2```
 
+ 2. Remove the `Ignore` annotation in `PhoenixTestSuite.java`.
+    
+  ```
+    @Ignore
+    @Category({ SlowTest.class })
+    public class PhoenixTestSuite extends ClusterTest {
+   ```
+    
  3. Go to the phoenix root folder and run test.
 
+  ```
+  cd contrib/storage-phoenix/
+  mvn test
+  ```
+
 ### To Add Features
 
  - Don't forget to add a test function to the test class.
diff --git a/contrib/storage-phoenix/pom.xml b/contrib/storage-phoenix/pom.xml
index 5d85575..277641e 100644
--- a/contrib/storage-phoenix/pom.xml
+++ b/contrib/storage-phoenix/pom.xml
@@ -33,6 +33,7 @@
     <phoenix.version>5.1.2</phoenix.version>
     <!-- Keep the 2.4.2 to reduce dependency conflict -->
     <hbase.minicluster.version>2.4.2</hbase.minicluster.version>
+    <jetty.test.version>9.4.31.v20200723</jetty.test.version>
   </properties>
   
   <dependencies>
@@ -92,6 +93,14 @@
       </exclusions>
     </dependency>
     <dependency>
+      <!-- PHOENIX-6605 -->
+      <groupId>com.github.luocooong.phoenix-queryserver</groupId>
+      <artifactId>phoenix-queryserver-it</artifactId>
+      <version>6.0.0-drill-r1</version>
+      <scope>test</scope>
+      <classifier>tests</classifier>
+    </dependency>
+    <dependency>
       <groupId>org.apache.phoenix</groupId>
       <artifactId>phoenix-core</artifactId>
       <version>${phoenix.version}</version>
@@ -110,7 +119,7 @@
           <artifactId>commons-logging</artifactId>
           <groupId>commons-logging</groupId>
         </exclusion>
-         <exclusion>
+        <exclusion>
           <groupId>javax.servlet</groupId>
           <artifactId>*</artifactId>
         </exclusion>
@@ -143,6 +152,12 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.kerby</groupId>
+      <artifactId>kerb-core</artifactId>
+      <version>1.0.1</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-it</artifactId>
       <version>${hbase.minicluster.version}</version>
@@ -188,17 +203,96 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-asyncfs</artifactId>
+      <type>test-jar</type>
+      <version>${hbase.minicluster.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-hdfs-client</artifactId>
       <version>${hadoop.version}</version>
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minikdc</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-testing-util</artifactId>
+      <version>${hbase.minicluster.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
       <groupId>com.univocity</groupId>
       <artifactId>univocity-parsers</artifactId>
       <version>${univocity-parsers.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-server</artifactId>
+      <version>${jetty.test.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-http</artifactId>
+      <version>${jetty.test.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-servlet</artifactId>
+      <version>${jetty.test.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
     <plugins>
diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixBatchReader.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixBatchReader.java
index c56369b..db48603 100644
--- a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixBatchReader.java
+++ b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixBatchReader.java
@@ -17,7 +17,7 @@
  */
 package org.apache.drill.exec.store.phoenix;
 
-import java.sql.Connection;
+import java.security.PrivilegedAction;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
@@ -30,6 +30,7 @@ import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
 import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
 import org.apache.drill.exec.physical.resultSet.RowSetLoader;
@@ -48,17 +49,23 @@ import org.apache.drill.exec.store.phoenix.PhoenixReader.GenericDateDefn;
 import org.apache.drill.exec.store.phoenix.PhoenixReader.GenericDefn;
 import org.apache.drill.exec.store.phoenix.PhoenixReader.GenericTimeDefn;
 import org.apache.drill.exec.store.phoenix.PhoenixReader.GenericTimestampDefn;
+import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.LoggerFactory;
 
+import javax.sql.DataSource;
+
+
 public class PhoenixBatchReader implements ManagedReader<SchemaNegotiator> {
 
   private static final org.slf4j.Logger logger = LoggerFactory.getLogger(PhoenixBatchReader.class);
 
   private final PhoenixSubScan subScan;
+  private final boolean impersonationEnabled;
+  private final UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();
   private CustomErrorContext errorContext;
   private PhoenixReader reader;
-  private Connection conn;
   private PreparedStatement pstmt;
   private ResultSet results;
   private ResultSetMetaData meta;
@@ -67,22 +74,29 @@ public class PhoenixBatchReader implements ManagedReader<SchemaNegotiator> {
 
   public PhoenixBatchReader(PhoenixSubScan subScan) {
     this.subScan = subScan;
+    this.impersonationEnabled = subScan.getPlugin().getContext().getConfig().getBoolean(ExecConstants.IMPERSONATION_ENABLED);
   }
 
   @Override
   public boolean open(SchemaNegotiator negotiator) {
+    return impersonationEnabled
+      ? ugi.doAs((PrivilegedAction<Boolean>) () -> processOpen(negotiator))
+      : processOpen(negotiator);
+  }
+
+  private boolean processOpen(SchemaNegotiator negotiator) {
     try {
       errorContext = negotiator.parentErrorContext();
-      conn = subScan.getPlugin().getDataSource().getConnection();
-      pstmt = conn.prepareStatement(subScan.getSql());
+      DataSource ds = subScan.getPlugin().getDataSource(negotiator.userName());
+      pstmt = ds.getConnection().prepareStatement(subScan.getSql());
       results = pstmt.executeQuery();
       meta = pstmt.getMetaData();
     } catch (SQLException e) {
       throw UserException
-              .dataReadError(e)
-              .message("Failed to execute the phoenix sql query. " + e.getMessage())
-              .addContext(errorContext)
-              .build(logger);
+        .dataReadError(e)
+        .message("Failed to execute the phoenix sql query. " + e.getMessage())
+        .addContext(errorContext)
+        .build(logger);
     }
     try {
       negotiator.tableSchema(defineMetadata(), true);
@@ -90,10 +104,10 @@ public class PhoenixBatchReader implements ManagedReader<SchemaNegotiator> {
       bindColumns(reader.getStorage());
     } catch (SQLException e) {
       throw UserException
-              .dataReadError(e)
-              .message("Failed to get type of columns from metadata. " + e.getMessage())
-              .addContext(errorContext)
-              .build(logger);
+        .dataReadError(e)
+        .message("Failed to get type of columns from metadata. " + e.getMessage())
+        .addContext(errorContext)
+        .build(logger);
     }
     watch = Stopwatch.createStarted();
     return true;
@@ -101,8 +115,14 @@ public class PhoenixBatchReader implements ManagedReader<SchemaNegotiator> {
 
   @Override
   public boolean next() {
+    return impersonationEnabled
+      ? ugi.doAs((PrivilegedAction<Boolean>) this::processNext)
+      : processNext();
+  }
+
+  private boolean processNext() {
     try {
-      while(!reader.getStorage().isFull()) {
+      while (!reader.getStorage().isFull()) {
         if (!reader.processRow()) { // return true if one row is processed.
           watch.stop();
           logger.debug("Phoenix fetch total record numbers : {}", reader.getRowCount());
@@ -112,17 +132,17 @@ public class PhoenixBatchReader implements ManagedReader<SchemaNegotiator> {
       return true; // batch full but not reached the EOF.
     } catch (SQLException e) {
       throw UserException
-              .dataReadError(e)
-              .message("Failed to get the data from the result set. " + e.getMessage())
-              .addContext(errorContext)
-              .build(logger);
+        .dataReadError(e)
+        .message("Failed to get the data from the result set. " + e.getMessage())
+        .addContext(errorContext)
+        .build(logger);
     }
   }
 
   @Override
   public void close() {
     logger.debug("Phoenix fetch batch size : {}, took {} ms. ", reader.getBatchCount(), watch.elapsed(TimeUnit.MILLISECONDS));
-    AutoCloseables.closeSilently(results, pstmt, conn);
+    AutoCloseables.closeSilently(results, pstmt, reader);
   }
 
   private TupleMetadata defineMetadata() throws SQLException {
diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixDataSource.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixDataSource.java
index 29512a3..ed8c670 100644
--- a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixDataSource.java
+++ b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixDataSource.java
@@ -21,13 +21,14 @@ import java.io.PrintWriter;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
-import java.sql.SQLFeatureNotSupportedException;
 import java.util.Map;
 import java.util.Properties;
 import java.util.logging.Logger;
 
 import javax.sql.DataSource;
 
+import lombok.extern.slf4j.Slf4j;
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 
 /**
@@ -39,46 +40,51 @@ import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
  * is not always left in a healthy state by the previous user. It is better to
  * create new Phoenix Connections to ensure that you avoid any potential issues.
  */
+@Slf4j
 public class PhoenixDataSource implements DataSource {
 
   private static final String DEFAULT_URL_HEADER = "jdbc:phoenix:thin:url=http://";
   private static final String DEFAULT_SERIALIZATION = "serialization=PROTOBUF";
+  private static final String IMPERSONATED_USER_VARIABLE = "$user";
+  private static final String DEFAULT_QUERY_SERVER_REMOTEUSEREXTRACTOR_PARAM = "doAs";
 
-  private String url;
+  private final String url;
+  private final String user;
   private Map<String, Object> connectionProperties;
-  private boolean isFatClient; // Is a fat client
+  private boolean isFatClient;
 
-  public PhoenixDataSource(String url) {
-    Preconditions.checkNotNull(url);
-    this.url = url;
-  }
-
-  public PhoenixDataSource(String host, int port) {
-    Preconditions.checkNotNull(host);
-    Preconditions.checkArgument(port > 0, "Please set the correct port.");
-    this.url = new StringBuilder()
-        .append(DEFAULT_URL_HEADER)
-        .append(host)
-        .append(":")
-        .append(port)
-        .append(";")
-        .append(DEFAULT_SERIALIZATION)
-        .toString();
-  }
-
-  public PhoenixDataSource(String url, Map<String, Object> connectionProperties) {
-    this(url);
+  public PhoenixDataSource(String url,
+                           String userName,
+                           Map<String, Object> connectionProperties,
+                           boolean impersonationEnabled) {
+    Preconditions.checkNotNull(url, userName);
     Preconditions.checkNotNull(connectionProperties);
     connectionProperties.forEach((k, v)
         -> Preconditions.checkArgument(v != null, String.format("does not accept null values : %s", k)));
+    this.url = impersonationEnabled ? doAsUserUrl(url, userName) : url;
+    this.user = userName;
     this.connectionProperties = connectionProperties;
   }
 
-  public PhoenixDataSource(String host, int port, Map<String, Object> connectionProperties) {
-    this(host, port);
-    Preconditions.checkNotNull(connectionProperties);
+  public PhoenixDataSource(String host,
+                           int port,
+                           String userName,
+                           Map<String, Object> connectionProperties,
+                           boolean impersonationEnabled) {
+    Preconditions.checkNotNull(host, userName);
+    Preconditions.checkArgument(port > 0, "Please set the correct port.");
     connectionProperties.forEach((k, v)
-        -> Preconditions.checkArgument(v != null, String.format("does not accept null values : %s", k)));
+      -> Preconditions.checkArgument(v != null, String.format("does not accept null values : %s", k)));
+    this.url = new StringBuilder()
+      .append(DEFAULT_URL_HEADER)
+      .append(host)
+      .append(":")
+      .append(port)
+      .append(impersonationEnabled ? "?doAs=" + userName : "")
+      .append(";")
+      .append(DEFAULT_SERIALIZATION)
+      .toString();
+    this.user = userName;
     this.connectionProperties = connectionProperties;
   }
 
@@ -91,27 +97,27 @@ public class PhoenixDataSource implements DataSource {
   }
 
   @Override
-  public PrintWriter getLogWriter() throws SQLException {
+  public PrintWriter getLogWriter() {
     throw new UnsupportedOperationException("getLogWriter");
   }
 
   @Override
-  public void setLogWriter(PrintWriter out) throws SQLException {
+  public void setLogWriter(PrintWriter out) {
     throw new UnsupportedOperationException("setLogWriter");
   }
 
   @Override
-  public void setLoginTimeout(int seconds) throws SQLException {
+  public void setLoginTimeout(int seconds) {
     throw new UnsupportedOperationException("setLoginTimeout");
   }
 
   @Override
-  public int getLoginTimeout() throws SQLException {
+  public int getLoginTimeout() {
     return 0;
   }
 
   @Override
-  public Logger getParentLogger() throws SQLFeatureNotSupportedException {
+  public Logger getParentLogger() {
     throw new UnsupportedOperationException("getParentLogger");
   }
 
@@ -126,22 +132,21 @@ public class PhoenixDataSource implements DataSource {
   }
 
   @Override
-  public boolean isWrapperFor(Class<?> iface) throws SQLException {
+  public boolean isWrapperFor(Class<?> iface) {
     return iface.isInstance(this);
   }
 
   @Override
   public Connection getConnection() throws SQLException {
     useDriverClass();
-    Connection conn = DriverManager.getConnection(url, useConfProperties());
-    return conn;
+    return getConnection(this.user, null);
   }
 
   @Override
-  public Connection getConnection(String username, String password) throws SQLException {
+  public Connection getConnection(String userName, String password) throws SQLException {
     useDriverClass();
-    Connection conn = DriverManager.getConnection(url, username, password);
-    return conn;
+    logger.debug("Drill/Phoenix connection url: {}", url);
+    return DriverManager.getConnection(url, useConfProperties());
   }
 
   /**
@@ -150,12 +155,12 @@ public class PhoenixDataSource implements DataSource {
    *
    * @throws SQLException
    */
-  private void useDriverClass() throws SQLException {
+  public Class<?> useDriverClass() throws SQLException {
     try {
-      if (!isFatClient) {
-        Class.forName(PhoenixStoragePluginConfig.THIN_DRIVER_CLASS);
+      if (isFatClient) {
+        return Class.forName(PhoenixStoragePluginConfig.FAT_DRIVER_CLASS);
       } else {
-        Class.forName(PhoenixStoragePluginConfig.FAT_DRIVER_CLASS);
+        return Class.forName(PhoenixStoragePluginConfig.THIN_DRIVER_CLASS);
       }
     } catch (ClassNotFoundException e) {
       throw new SQLException("Cause by : " + e.getMessage());
@@ -169,12 +174,23 @@ public class PhoenixDataSource implements DataSource {
    */
   private Properties useConfProperties() {
     Properties props = new Properties();
-    props.put("phoenix.trace.frequency", "never");
-    props.put("phoenix.query.timeoutMs", 30000);
-    props.put("phoenix.query.keepAliveMs", 120000);
     if (getConnectionProperties() != null) {
       props.putAll(getConnectionProperties());
     }
+    props.putIfAbsent("phoenix.trace.frequency", "never");
+    props.putIfAbsent("phoenix.query.timeoutMs", 30000);
+    props.putIfAbsent("phoenix.query.keepAliveMs", 120000);
     return props;
   }
+
+  private String doAsUserUrl(String url, String userName) {
+    if (url.contains(DEFAULT_QUERY_SERVER_REMOTEUSEREXTRACTOR_PARAM)) {
+      return url.replace(IMPERSONATED_USER_VARIABLE, userName);
+    } else {
+      throw UserException
+        .connectionError()
+        .message("Invalid PQS URL. Please add the value of the `doAs=$user` parameter if Impersonation is enabled.")
+        .build(logger);
+    }
+  }
 }
diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixGroupScan.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixGroupScan.java
index ae79570..6045a3d 100644
--- a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixGroupScan.java
+++ b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixGroupScan.java
@@ -50,20 +50,21 @@ public class PhoenixGroupScan extends AbstractGroupScan {
 
   @JsonCreator
   public PhoenixGroupScan(
+      @JsonProperty("userName") String userName,
       @JsonProperty("sql") String sql,
       @JsonProperty("columns") List<SchemaPath> columns,
       @JsonProperty("scanSpec") PhoenixScanSpec scanSpec,
       @JsonProperty("config") PhoenixStoragePluginConfig config,
       @JacksonInject StoragePluginRegistry plugins) {
-    super("no-user");
+    super(userName);
     this.sql = sql;
     this.columns = columns;
     this.scanSpec = scanSpec;
     this.plugin = plugins.resolve(config, PhoenixStoragePlugin.class);
   }
 
-  public PhoenixGroupScan(PhoenixScanSpec scanSpec, PhoenixStoragePlugin plugin) {
-    super("no-user");
+  public PhoenixGroupScan(String userName, PhoenixScanSpec scanSpec, PhoenixStoragePlugin plugin) {
+    super(userName);
     this.sql = scanSpec.getSql();
     this.columns = ALL_COLUMNS;
     this.scanSpec = scanSpec;
@@ -86,8 +87,9 @@ public class PhoenixGroupScan extends AbstractGroupScan {
     this.plugin = scan.plugin;
   }
 
-  public PhoenixGroupScan(String sql, List<SchemaPath> columns, PhoenixScanSpec scanSpec, PhoenixStoragePlugin plugin) {
-    super("no-user");
+  public PhoenixGroupScan(String user, String sql, List<SchemaPath> columns, PhoenixScanSpec scanSpec,
+                          PhoenixStoragePlugin plugin) {
+    super(user);
     this.sql = sql;
     this.columns = columns;
     this.scanSpec = scanSpec;
@@ -124,7 +126,7 @@ public class PhoenixGroupScan extends AbstractGroupScan {
 
   @Override
   public SubScan getSpecificScan(int minorFragmentId) throws ExecutionSetupException {
-    return new PhoenixSubScan(sql, columns, scanSpec, plugin);
+    return new PhoenixSubScan(userName, sql, columns, scanSpec, plugin);
   }
 
   @Override
diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixReader.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixReader.java
index 7e2e6d3..0ecc4ae 100644
--- a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixReader.java
+++ b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixReader.java
@@ -38,7 +38,7 @@ import org.apache.drill.exec.vector.accessor.ScalarWriter;
 import org.apache.drill.exec.vector.accessor.writer.ScalarArrayWriter;
 import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
 
-public class PhoenixReader {
+public class PhoenixReader implements AutoCloseable {
 
   private final RowSetLoader writer;
   private final ColumnDefn[] columns;
@@ -104,6 +104,11 @@ public class PhoenixReader {
     COLUMN_TYPE_MAP.put(Types.BOOLEAN, MinorType.BIT);
   }
 
+  @Override
+  public void close() throws Exception {
+    results.close();
+  }
+
   protected abstract static class ColumnDefn {
 
     final String name;
diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSchemaFactory.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSchemaFactory.java
index af82ea8..5c012bc 100644
--- a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSchemaFactory.java
+++ b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSchemaFactory.java
@@ -18,45 +18,63 @@
 package org.apache.drill.exec.store.phoenix;
 
 import java.io.IOException;
-import java.sql.Connection;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import javax.sql.DataSource;
 
 import org.apache.calcite.adapter.jdbc.JdbcSchema;
 import org.apache.calcite.schema.Schema;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Table;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.AbstractSchemaFactory;
 import org.apache.drill.exec.store.SchemaConfig;
-import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import static org.apache.drill.exec.util.ImpersonationUtil.getProcessUserName;
 
 public class PhoenixSchemaFactory extends AbstractSchemaFactory {
 
   private final PhoenixStoragePlugin plugin;
   private final Map<String, PhoenixSchema> schemaMap;
   private PhoenixSchema rootSchema;
+  private final boolean isDrillImpersonationEnabled;
+  private final UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();
 
   public PhoenixSchemaFactory(PhoenixStoragePlugin plugin) {
     super(plugin.getName());
     this.plugin = plugin;
-    this.schemaMap = Maps.newHashMap();
+    this.schemaMap = new HashMap<>();
+    isDrillImpersonationEnabled = plugin.getContext().getConfig().getBoolean(ExecConstants.IMPERSONATION_ENABLED);
   }
 
   @Override
   public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
-    rootSchema = new PhoenixSchema(plugin, Collections.emptyList(), plugin.getName());
-    locateSchemas();
+    try {
+      rootSchema = new PhoenixSchema(schemaConfig, plugin, Collections.emptyList(), plugin.getName());
+      if (isDrillImpersonationEnabled) {
+        ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
+          locateSchemas(schemaConfig, rootSchema.getUser(schemaConfig.getUserName(), getProcessUserName()));
+          return null;
+        });
+      } else {
+        locateSchemas(schemaConfig, rootSchema.getUser(schemaConfig.getUserName(), getProcessUserName()));
+      }
+    } catch (SQLException | InterruptedException e) {
+      throw new IOException(e);
+    }
     parent.add(getName(), rootSchema); // resolve the top-level schema.
     for (String schemaName : rootSchema.getSubSchemaNames()) {
       PhoenixSchema schema = (PhoenixSchema) rootSchema.getSubSchema(schemaName);
@@ -64,29 +82,28 @@ public class PhoenixSchemaFactory extends AbstractSchemaFactory {
     }
   }
 
-  private void locateSchemas() {
-    DataSource ds = plugin.getDataSource();
-    try (Connection conn = ds.getConnection();
-          ResultSet rs = ds.getConnection().getMetaData().getSchemas()) {
+  private void locateSchemas(SchemaConfig schemaConfig, String userName) throws SQLException {
+    try (ResultSet rs = plugin.getDataSource(userName).getConnection().getMetaData().getSchemas()) {
       while (rs.next()) {
         final String schemaName = rs.getString(1); // lookup the schema (or called database).
-        PhoenixSchema schema = new PhoenixSchema(plugin, Arrays.asList(getName()), schemaName);
+        PhoenixSchema schema = new PhoenixSchema(schemaConfig, plugin, Arrays.asList(getName()), schemaName);
         schemaMap.put(schemaName, schema);
       }
       rootSchema.addSchemas(schemaMap);
-    } catch (SQLException e) {
-      throw new DrillRuntimeException(e.getMessage(), e);
     }
   }
 
-  protected static class PhoenixSchema extends AbstractSchema {
-
+  class PhoenixSchema extends AbstractSchema {
     private final JdbcSchema jdbcSchema;
     private final Map<String, PhoenixSchema> schemaMap = CaseInsensitiveMap.newHashMap();
 
-    public PhoenixSchema(PhoenixStoragePlugin plugin, List<String> parentSchemaPath, String schemaName) {
+    public PhoenixSchema(SchemaConfig schemaConfig,
+                         PhoenixStoragePlugin plugin,
+                         List<String> parentSchemaPath,
+                         String schemaName) throws SQLException {
       super(parentSchemaPath, schemaName);
-      this.jdbcSchema = new JdbcSchema(plugin.getDataSource(), plugin.getDialect(), plugin.getConvention(), null, schemaName);
+      this.jdbcSchema = new JdbcSchema(plugin.getDataSource(schemaConfig.getUserName()), plugin.getDialect(),
+        plugin.getConvention(), null, schemaName);
     }
 
     @Override
@@ -101,14 +118,16 @@ public class PhoenixSchemaFactory extends AbstractSchemaFactory {
 
     @Override
     public Table getTable(String name) {
-      Table table = jdbcSchema.getTable(StringUtils.upperCase(name));
-      return table;
+      return isDrillImpersonationEnabled
+        ? ugi.doAs((PrivilegedAction<Table>) () -> jdbcSchema.getTable(StringUtils.upperCase(name)))
+        : jdbcSchema.getTable(StringUtils.upperCase(name));
     }
 
     @Override
     public Set<String> getTableNames() {
-      Set<String> tables = jdbcSchema.getTableNames();
-      return tables;
+      return isDrillImpersonationEnabled
+        ? ugi.doAs((PrivilegedAction<Set<String>>) jdbcSchema::getTableNames)
+        : jdbcSchema.getTableNames();
     }
 
     @Override
@@ -116,13 +135,23 @@ public class PhoenixSchemaFactory extends AbstractSchemaFactory {
       return PhoenixStoragePluginConfig.NAME;
     }
 
-    public void addSchemas(Map<String, PhoenixSchema> schemas) {
-      schemaMap.putAll(schemas);
-    }
-
     @Override
     public boolean areTableNamesCaseSensitive() {
       return false;
     }
+
+    @Override
+    public String getUser(String impersonated, String notImpersonated) {
+      return needToImpersonateReadingData() ? impersonated : notImpersonated;
+    }
+
+    @Override
+    public boolean needToImpersonateReadingData() {
+      return isDrillImpersonationEnabled;
+    }
+
+    public void addSchemas(Map<String, PhoenixSchema> schemas) {
+      schemaMap.putAll(schemas);
+    }
   }
 }
diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixStoragePlugin.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixStoragePlugin.java
index 15b4103..b46fef2 100644
--- a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixStoragePlugin.java
+++ b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixStoragePlugin.java
@@ -18,43 +18,63 @@
 package org.apache.drill.exec.store.phoenix;
 
 import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.sql.SQLException;
+import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
-import javax.sql.DataSource;
-
-import org.apache.calcite.adapter.jdbc.JdbcSchema;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.sql.SqlDialect;
-import org.apache.calcite.sql.SqlDialectFactoryImpl;
+import org.apache.calcite.sql.dialect.PhoenixSqlDialect;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.OptimizerRulesContext;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
 import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.phoenix.rules.PhoenixConvention;
-import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
 
 import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.shaded.guava.com.google.common.cache.CacheBuilder;
+import org.apache.drill.shaded.guava.com.google.common.cache.CacheLoader;
+import org.apache.drill.shaded.guava.com.google.common.cache.LoadingCache;
+import org.apache.hadoop.security.UserGroupInformation;
 
 public class PhoenixStoragePlugin extends AbstractStoragePlugin {
 
   private final PhoenixStoragePluginConfig config;
-  private final DataSource dataSource;
   private final SqlDialect dialect;
   private final PhoenixConvention convention;
   private final PhoenixSchemaFactory schemaFactory;
+  private final boolean impersonationEnabled;
+  private final LoadingCache<String, PhoenixDataSource> CACHE = CacheBuilder.newBuilder()
+    .maximumSize(5) // Up to 5 clients for impersonation-enabled.
+    .expireAfterAccess(10, TimeUnit.MINUTES)
+    .build(new CacheLoader<String, PhoenixDataSource>() {
+      @Override
+      public PhoenixDataSource load(String userName) {
+        UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();
+        return impersonationEnabled
+          ? ugi.doAs((PrivilegedAction<PhoenixDataSource>) () -> createDataSource(userName))
+          : createDataSource(userName);
+      }
+    });
 
   public PhoenixStoragePlugin(PhoenixStoragePluginConfig config, DrillbitContext context, String name) {
     super(context, name);
     this.config = config;
-    this.dataSource = initNoPoolingDataSource(config);
-    this.dialect = JdbcSchema.createDialect(SqlDialectFactoryImpl.INSTANCE, dataSource);
+    this.dialect = PhoenixSqlDialect.DEFAULT;
     this.convention = new PhoenixConvention(dialect, name, this);
     this.schemaFactory = new PhoenixSchemaFactory(this);
+    this.impersonationEnabled = context.getConfig().getBoolean(ExecConstants.IMPERSONATION_ENABLED);
   }
 
   @Override
@@ -62,18 +82,6 @@ public class PhoenixStoragePlugin extends AbstractStoragePlugin {
     return config;
   }
 
-  public DataSource getDataSource() {
-    return dataSource;
-  }
-
-  public SqlDialect getDialect() {
-    return dialect;
-  }
-
-  public PhoenixConvention getConvention() {
-    return convention;
-  }
-
   @Override
   public boolean supportsRead() {
     return true;
@@ -91,25 +99,42 @@ public class PhoenixStoragePlugin extends AbstractStoragePlugin {
 
   @Override
   public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
-    PhoenixScanSpec scanSpec = selection.getListWith(context.getLpPersistence().getMapper(), new TypeReference<PhoenixScanSpec>() {});
-    return new PhoenixGroupScan(scanSpec, this);
+    PhoenixScanSpec scanSpec =
+      selection.getListWith(context.getLpPersistence().getMapper(), new TypeReference<PhoenixScanSpec>() {});
+    return new PhoenixGroupScan(userName, scanSpec, this);
   }
 
-  private static DataSource initNoPoolingDataSource(PhoenixStoragePluginConfig config) {
-    // Don't use the pool with the connection
-    PhoenixDataSource dataSource = null;
-    if (StringUtils.isNotBlank(config.getJdbcURL())) {
-      dataSource = new PhoenixDataSource(config.getJdbcURL(), config.getProps()); // the props is initiated.
-    } else {
-      dataSource = new PhoenixDataSource(config.getHost(), config.getPort(), config.getProps());
+  @Override
+  public void close() {
+    AutoCloseables.closeSilently(CACHE::invalidateAll);
+  }
+
+  public SqlDialect getDialect() {
+    return dialect;
+  }
+
+  public PhoenixConvention getConvention() {
+    return convention;
+  }
+
+  public PhoenixDataSource getDataSource(String userName) throws SQLException {
+    try {
+      return CACHE.get(userName);
+    } catch (final ExecutionException e) {
+      throw new SQLException("Failure setting up Phoenix DataSource (PQS client)", e);
     }
+  }
+
+  private PhoenixDataSource createDataSource(String userName) {
+    // Don't use the pool with the connection
+    Map<String, Object> props = config.getProps();
     if (config.getUsername() != null && config.getPassword() != null) {
-      if (dataSource.getConnectionProperties() == null) {
-        dataSource.setConnectionProperties(Maps.newHashMap());
-      }
-      dataSource.getConnectionProperties().put("user", config.getUsername());
-      dataSource.getConnectionProperties().put("password", config.getPassword());
+      props.put("user", config.getUsername());
+      props.put("password", config.getPassword());
     }
-    return dataSource;
+    boolean impersonationEnabled = context.getConfig().getBoolean(ExecConstants.IMPERSONATION_ENABLED);
+    return StringUtils.isNotBlank(config.getJdbcURL())
+      ? new PhoenixDataSource(config.getJdbcURL(), userName, props, impersonationEnabled) // the props is initiated.
+      : new PhoenixDataSource(config.getHost(), config.getPort(), userName, props, impersonationEnabled);
   }
 }
diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSubScan.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSubScan.java
index 2c81d4e..a57d5e3 100644
--- a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSubScan.java
+++ b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSubScan.java
@@ -21,7 +21,6 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.drill.common.PlanStringBuilder;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.physical.base.AbstractBase;
@@ -47,20 +46,21 @@ public class PhoenixSubScan extends AbstractBase implements SubScan {
 
   @JsonCreator
   public PhoenixSubScan(
+      @JsonProperty("userName") String userName,
       @JsonProperty("sql") String sql,
       @JsonProperty("columns") List<SchemaPath> columns,
       @JsonProperty("scanSpec") PhoenixScanSpec scanSpec,
       @JsonProperty("config") StoragePluginConfig config,
       @JacksonInject StoragePluginRegistry registry) {
-    super("no-user");
+    super(userName);
     this.sql = sql;
     this.columns = columns;
     this.scanSpec = scanSpec;
     this.plugin = registry.resolve(config, PhoenixStoragePlugin.class);
   }
 
-  public PhoenixSubScan(String sql, List<SchemaPath> columns, PhoenixScanSpec scanSpec, PhoenixStoragePlugin plugin) {
-    super("no-user");
+  public PhoenixSubScan(String userName, String sql, List<SchemaPath> columns, PhoenixScanSpec scanSpec, PhoenixStoragePlugin plugin) {
+    super(userName);
     this.sql = sql;
     this.columns = columns;
     this.scanSpec = scanSpec;
@@ -98,8 +98,8 @@ public class PhoenixSubScan extends AbstractBase implements SubScan {
   }
 
   @Override
-  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
-    return new PhoenixSubScan(sql, columns, scanSpec, plugin);
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    return new PhoenixSubScan(userName, sql, columns, scanSpec, plugin);
   }
 
   @Override
diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/rules/PhoenixPrel.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/rules/PhoenixPrel.java
index 725a93b..47a71e6 100644
--- a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/rules/PhoenixPrel.java
+++ b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/rules/PhoenixPrel.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.store.phoenix.rules;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
@@ -67,15 +66,15 @@ public class PhoenixPrel extends AbstractRelNode implements Prel {
   }
 
   @Override
-  public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator)
-      throws IOException {
-    List<SchemaPath> schemaPaths = new ArrayList<SchemaPath>(rowType.getFieldCount());
-    List<String> columns = new ArrayList<String>(rowType.getFieldCount());
+  public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) {
+    List<SchemaPath> schemaPaths = new ArrayList<>(rowType.getFieldCount());
+    List<String> columns = new ArrayList<>(rowType.getFieldCount());
     for (String col : rowType.getFieldNames()) {
       schemaPaths.add(SchemaPath.getSimplePath(col));
       columns.add(SchemaPath.getSimplePath(col).rootName());
     }
-    PhoenixGroupScan output = new PhoenixGroupScan(sql, schemaPaths, new PhoenixScanSpec(sql, columns, rows), convention.getPlugin());
+    PhoenixGroupScan output = new PhoenixGroupScan(creator.getContext().getQueryUserName(), sql, schemaPaths,
+      new PhoenixScanSpec(sql, columns, rows), convention.getPlugin());
     return creator.addMetadata(this, output);
   }
 
diff --git a/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/PhoenixBaseTest.java b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/PhoenixBaseTest.java
index fd41b70..9aea261 100644
--- a/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/PhoenixBaseTest.java
+++ b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/PhoenixBaseTest.java
@@ -38,9 +38,11 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.ClusterFixtureBuilder;
 import org.apache.drill.test.ClusterTest;
 import org.apache.hadoop.fs.Path;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.slf4j.LoggerFactory;
 
@@ -51,38 +53,48 @@ public class PhoenixBaseTest extends ClusterTest {
 
   private static final org.slf4j.Logger logger = LoggerFactory.getLogger(PhoenixBaseTest.class);
 
-  private static AtomicInteger initCount = new AtomicInteger(0);
-  protected static String U_U_I_D = UUID.randomUUID().toString();
+  public final static String U_U_I_D = UUID.randomUUID().toString();
+  private final static AtomicInteger initCount = new AtomicInteger(0);
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
+    PhoenixTestSuite.initPhoenixQueryServer();
     if (PhoenixTestSuite.isRunningSuite()) {
       QueryServerBasicsIT.testCatalogs();
     }
-    bootMiniCluster();
+    startDrillCluster();
     if (initCount.incrementAndGet() == 1) {
-      createSchema();
-      createTables();
-      createSampleData();
+      createSchema(QueryServerBasicsIT.CONN_STRING);
+      createTables(QueryServerBasicsIT.CONN_STRING);
+      createSampleData(QueryServerBasicsIT.CONN_STRING);
     }
   }
 
-  public static void bootMiniCluster() throws Exception {
-    ClusterFixtureBuilder builder = new ClusterFixtureBuilder(dirTestWatcher);
+  @AfterClass
+  public static void tearDownCluster() throws Exception {
+    if (!PhoenixTestSuite.isRunningSuite()) {
+      PhoenixTestSuite.tearDownCluster();
+    }
+  }
+
+  public static void startDrillCluster() throws Exception {
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
     startCluster(builder);
-    dirTestWatcher.copyResourceToRoot(Paths.get(""));
     Map<String, Object> props = Maps.newHashMap();
     props.put("phoenix.query.timeoutMs", 90000);
     props.put("phoenix.query.keepAliveMs", "30000");
     StoragePluginRegistry registry = cluster.drillbit().getContext().getStorage();
-    PhoenixStoragePluginConfig config = new PhoenixStoragePluginConfig(null, 0, null, null, QueryServerBasicsIT.CONN_STRING, null, props);
+    PhoenixStoragePluginConfig config = new PhoenixStoragePluginConfig(null, 0, null, null,
+      QueryServerBasicsIT.CONN_STRING, null, props);
     config.setEnabled(true);
     registry.put(PhoenixStoragePluginConfig.NAME + "123", config);
+    dirTestWatcher.copyResourceToRoot(Paths.get(""));
   }
 
-  public static void createSchema() throws Exception {
-    try (final Connection connection = DriverManager.getConnection(QueryServerBasicsIT.CONN_STRING)) {
+  public static void createSchema(String connString) throws Exception {
+    try (final Connection connection = DriverManager.getConnection(connString)) {
+      logger.debug("Phoenix connection established with the specified url : {}", connString);
       assertFalse(connection.isClosed());
       connection.setAutoCommit(true);
       try (final Statement stmt = connection.createStatement()) {
@@ -91,8 +103,8 @@ public class PhoenixBaseTest extends ClusterTest {
     }
   }
 
-  public static void createTables() throws Exception {
-    try (final Connection connection = DriverManager.getConnection(QueryServerBasicsIT.CONN_STRING)) {
+  public static void createTables(String connString) throws Exception {
+    try (final Connection connection = DriverManager.getConnection(connString)) {
       assertFalse(connection.isClosed());
       connection.setAutoCommit(true);
       try (final Statement stmt = connection.createStatement()) {
@@ -149,7 +161,7 @@ public class PhoenixBaseTest extends ClusterTest {
     }
   }
 
-  public static void createSampleData() throws Exception {
+  public static void createSampleData(String connString) throws Exception {
     final String[] paths = new String[] { "data/region.tbl", "data/nation.tbl" };
     final String[] sqls = new String[] {
         "UPSERT INTO V1.REGION VALUES(?,?,?)",
@@ -163,7 +175,7 @@ public class PhoenixBaseTest extends ClusterTest {
     logger.info("Loading the .tbl file : " + Arrays.toString(paths));
 
     List<String[]> allRows = parseTblFile(String.valueOf(region_path));
-    try (final Connection connection = DriverManager.getConnection(QueryServerBasicsIT.CONN_STRING)) {
+    try (final Connection connection = DriverManager.getConnection(connString)) {
       assertFalse(connection.isClosed());
       connection.setAutoCommit(false);
       // region table
diff --git a/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/PhoenixCommandTest.java b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/PhoenixCommandTest.java
index 4fa5ac5..43fc645 100644
--- a/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/PhoenixCommandTest.java
+++ b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/PhoenixCommandTest.java
@@ -39,6 +39,7 @@ public class PhoenixCommandTest extends PhoenixBaseTest {
 
   @Test
   public void testShowTablesLike() throws Exception {
+    runAndPrint("SHOW SCHEMAS");
     run("USE phoenix123.v1");
     assertEquals(1, queryBuilder().sql("SHOW TABLES LIKE '%REGION%'").run().recordCount());
   }
diff --git a/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/PhoenixTestSuite.java b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/PhoenixTestSuite.java
index 1817b76..a0f19ff 100644
--- a/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/PhoenixTestSuite.java
+++ b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/PhoenixTestSuite.java
@@ -21,7 +21,7 @@ import java.util.TimeZone;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.drill.categories.SlowTest;
-import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.BaseTest;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
@@ -31,6 +31,7 @@ import org.junit.runners.Suite;
 import org.junit.runners.Suite.SuiteClasses;
 import org.slf4j.LoggerFactory;
 
+
 @RunWith(Suite.class)
 @SuiteClasses ({
    PhoenixDataTypeTest.class,
@@ -39,7 +40,7 @@ import org.slf4j.LoggerFactory;
 })
 @Ignore
 @Category({ SlowTest.class })
-public class PhoenixTestSuite extends ClusterTest {
+public class PhoenixTestSuite extends BaseTest {
 
   private static final org.slf4j.Logger logger = LoggerFactory.getLogger(PhoenixTestSuite.class);
 
@@ -65,7 +66,6 @@ public class PhoenixTestSuite extends ClusterTest {
       if (initCount.decrementAndGet() == 0) {
         logger.info("Shutdown all instances of test cluster.");
         QueryServerBasicsIT.afterClass();
-        shutdown();
       }
     }
   }
diff --git a/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/QueryServerBasicsIT.java b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/QueryServerBasicsIT.java
index 5105cbd..4a48bfc 100644
--- a/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/QueryServerBasicsIT.java
+++ b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/QueryServerBasicsIT.java
@@ -29,12 +29,17 @@ import java.sql.ResultSetMetaData;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.end2end.QueryServerThread;
 import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.queryserver.QueryServerProperties;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.ThinClientUtil;
 import org.slf4j.LoggerFactory;
 
+/**
+ * This is a copy of {@link org.apache.phoenix.end2end.QueryServerBasicsIT} until
+ * <a href="https://issues.apache.org/jira/browse/PHOENIX-6613">PHOENIX-6613</a> is fixed
+ */
 public class QueryServerBasicsIT extends BaseTest {
 
   private static final org.slf4j.Logger logger = LoggerFactory.getLogger(QueryServerBasicsIT.class);
diff --git a/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/QueryServerThread.java b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/QueryServerThread.java
deleted file mode 100644
index 84c9bc3..0000000
--- a/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/QueryServerThread.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.phoenix;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.phoenix.queryserver.server.QueryServer;
-
-/** Wraps up the query server for tests. */
-public class QueryServerThread extends Thread {
-
-  private final QueryServer main;
-
-  public QueryServerThread(String[] argv, Configuration conf) {
-    this(argv, conf, null);
-  }
-
-  public QueryServerThread(String[] argv, Configuration conf, String name) {
-    this(new QueryServer(argv, conf), name);
-  }
-
-  private QueryServerThread(QueryServer m, String name) {
-    super(m, "query server" + (name == null ? "" : (" - " + name)));
-    this.main = m;
-    setDaemon(true);
-  }
-
-  public QueryServer getQueryServer() {
-    return main;
-  }
-}
diff --git a/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/HttpParamImpersonationQueryServerIT.java b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/HttpParamImpersonationQueryServerIT.java
new file mode 100644
index 0000000..f0f0e11
--- /dev/null
+++ b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/HttpParamImpersonationQueryServerIT.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.phoenix.secured;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.security.access.AccessControlClient;
+import org.apache.hadoop.hbase.security.access.AccessController;
+import org.apache.hadoop.hbase.security.access.Permission.Action;
+import org.apache.hadoop.hbase.security.token.TokenProvider;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.end2end.TlsUtil;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.queryserver.QueryServerOptions;
+import org.apache.phoenix.queryserver.QueryServerProperties;
+import org.apache.phoenix.queryserver.client.Driver;
+import org.junit.experimental.categories.Category;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.drill.exec.store.phoenix.secured.QueryServerEnvironment.LOGIN_USER;
+
+/**
+ * This is a copy of {@link org.apache.phoenix.end2end.HttpParamImpersonationQueryServerIT},
+ * but customized with 3 users, see {@link SecuredPhoenixBaseTest#runForThreeClients} for details
+ */
+@Category(NeedsOwnMiniClusterTest.class)
+public class HttpParamImpersonationQueryServerIT {
+
+  public static QueryServerEnvironment environment;
+
+  private static final List<TableName> SYSTEM_TABLE_NAMES = Arrays.asList(
+    PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME,
+    PhoenixDatabaseMetaData.SYSTEM_MUTEX_HBASE_TABLE_NAME,
+    PhoenixDatabaseMetaData.SYSTEM_FUNCTION_HBASE_TABLE_NAME,
+    PhoenixDatabaseMetaData.SYSTEM_SCHEMA_HBASE_TABLE_NAME,
+    PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_HBASE_TABLE_NAME,
+    PhoenixDatabaseMetaData.SYSTEM_STATS_HBASE_TABLE_NAME);
+
+  public static synchronized void startQueryServerEnvironment() throws Exception {
+    // Clean up previous environment if any (Junit 4.13 @BeforeParam / @AfterParam would be an alternative)
+    if(environment != null) {
+      stopEnvironment();
+    }
+
+    final Configuration conf = new Configuration();
+    conf.setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, AccessController.class.getName());
+    conf.setStrings(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, AccessController.class.getName());
+    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, AccessController.class.getName(), TokenProvider.class.getName());
+
+    // Set the proxyuser settings,
+    // so that the user who is running the Drillbits/MiniDfs can impersonate user1 and user2 (not user3)
+    conf.set(String.format("hadoop.proxyuser.%s.hosts", LOGIN_USER), "*");
+    conf.set(String.format("hadoop.proxyuser.%s.users", LOGIN_USER), "user1,user2");
+    conf.setBoolean(QueryServerProperties.QUERY_SERVER_WITH_REMOTEUSEREXTRACTOR_ATTRIB, true);
+    environment = new QueryServerEnvironment(conf, 3, false);
+  }
+
+  public static synchronized void stopEnvironment() throws Exception {
+    environment.stop();
+    environment = null;
+  }
+
+  static public String getUrlTemplate() {
+    String url = Driver.CONNECT_STRING_PREFIX + "url=%s://localhost:" + environment.getPqsPort() + "?"
+      + QueryServerOptions.DEFAULT_QUERY_SERVER_REMOTEUSEREXTRACTOR_PARAM + "=%s;authentication=SPNEGO;serialization=PROTOBUF%s";
+    if (environment.getTls()) {
+      return String.format(url, "https", "%s", ";truststore=" + TlsUtil.getTrustStoreFile().getAbsolutePath()
+        + ";truststore_password=" + TlsUtil.getTrustStorePassword());
+    } else {
+      return String.format(url, "http", "%s", "");
+    }
+  }
+
+  static void grantUsersToPhoenixSystemTables(List<String> usersToGrant) throws Exception {
+    // Grant permission to the user to access the system tables
+    try {
+      for (String user : usersToGrant) {
+        for (TableName tn : SYSTEM_TABLE_NAMES) {
+          AccessControlClient.grant(environment.getUtil().getConnection(), tn, user, null, null, Action.READ, Action.EXEC);
+        }
+      }
+    } catch (Throwable e) {
+      throw new Exception(e);
+    }
+  }
+
+  static void grantUsersToGlobalPhoenixUserTables(List<String> usersToGrant) throws Exception {
+    // Grant permission to the user to access the user tables
+    try {
+      for (String user : usersToGrant) {
+        AccessControlClient.grant(environment.getUtil().getConnection(), user, Action.READ, Action.EXEC);
+      }
+    } catch (Throwable e) {
+      throw new Exception(e);
+    }
+  }
+}
diff --git a/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/QueryServerEnvironment.java b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/QueryServerEnvironment.java
new file mode 100644
index 0000000..6cbe8df
--- /dev/null
+++ b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/QueryServerEnvironment.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.phoenix.secured;
+
+import static org.apache.hadoop.hbase.HConstants.HBASE_DIR;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.InetAddress;
+import java.security.PrivilegedAction;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.LocalHBaseCluster;
+import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.util.KerberosName;
+import org.apache.phoenix.end2end.TlsUtil;
+import org.apache.phoenix.query.ConfigurationFactory;
+import org.apache.phoenix.queryserver.QueryServerProperties;
+import org.apache.phoenix.queryserver.server.QueryServer;
+import org.apache.phoenix.util.InstanceResolver;
+import org.apache.phoenix.util.ThinClientUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is a copy of class from `org.apache.phoenix:phoenix-queryserver-it`,
+ * see original javadoc in {@link org.apache.phoenix.end2end.QueryServerEnvironment}.
+ *
+ * It is possible to use original QueryServerEnvironment, but need to solve several issues:
+ * <ul>
+ *   <li>TlsUtil.getClasspathDir(QueryServerEnvironment.class); in QueryServerEnvironment fails due to the path from jar.
+ *   Can be fixed with copying TlsUtil in Drill project and changing getClasspathDir method to use
+ *   SecuredPhoenixTestSuite.class instead of QueryServerEnvironment.class</li>
+ *   <li>SERVICE_PRINCIPAL from QueryServerEnvironment is for `securecluster` not system user. So Test fails later
+ *   in process of starting Drill cluster while creating udf area RemoteFunctionRegistry#createArea, it fails
+ *   on checking Precondition ImpersonationUtil.getProcessUserName().equals(fileStatus.getOwner()),
+ *   where ImpersonationUtil.getProcessUserName() is 'securecluster' and fileStatus.getOwner() is
+ *   your local machine login user</li>
+ * </ul>
+ */
+public class QueryServerEnvironment {
+  private static final Logger LOG = LoggerFactory.getLogger(QueryServerEnvironment.class);
+
+  private final File TEMP_DIR = new File(getTempDir());
+  private final File KEYTAB_DIR = new File(TEMP_DIR, "keytabs");
+  private final List<File> USER_KEYTAB_FILES = new ArrayList<>();
+
+  private static final String LOCAL_HOST_REVERSE_DNS_LOOKUP_NAME;
+  static final String LOGIN_USER;
+
+  static {
+    try {
+      // uncomment it for debugging purposes
+      // System.setProperty("sun.security.krb5.debug", "true");
+      LOCAL_HOST_REVERSE_DNS_LOOKUP_NAME = InetAddress.getByName("127.0.0.1").getCanonicalHostName();
+      String userName = System.getProperty("user.name");
+      LOGIN_USER = userName != null ? userName : "securecluster";
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static final String SPNEGO_PRINCIPAL = "HTTP/" + LOCAL_HOST_REVERSE_DNS_LOOKUP_NAME;
+  private static final String PQS_PRINCIPAL = "phoenixqs/" + LOCAL_HOST_REVERSE_DNS_LOOKUP_NAME;
+  private static final String SERVICE_PRINCIPAL = LOGIN_USER + "/" + LOCAL_HOST_REVERSE_DNS_LOOKUP_NAME;
+  private File KEYTAB;
+
+  private MiniKdc KDC;
+  private HBaseTestingUtility UTIL = new HBaseTestingUtility();
+  private LocalHBaseCluster HBASE_CLUSTER;
+  private int NUM_CREATED_USERS;
+
+  private ExecutorService PQS_EXECUTOR;
+  private QueryServer PQS;
+  private int PQS_PORT;
+  private String PQS_URL;
+
+  private boolean tls;
+
+  private static String getTempDir() {
+    StringBuilder sb = new StringBuilder(32);
+    sb.append(System.getProperty("user.dir")).append(File.separator);
+    sb.append("target").append(File.separator);
+    sb.append(QueryServerEnvironment.class.getSimpleName());
+    sb.append("-").append(UUID.randomUUID());
+    return sb.toString();
+  }
+
+  public int getPqsPort() {
+    return PQS_PORT;
+  }
+
+  public String getPqsUrl() {
+    return PQS_URL;
+  }
+
+  public boolean getTls() {
+    return tls;
+  }
+
+  public HBaseTestingUtility getUtil() {
+    return UTIL;
+  }
+
+  public String getServicePrincipal() {
+    return SERVICE_PRINCIPAL;
+  }
+
+  public File getServiceKeytab() {
+    return KEYTAB;
+  }
+
+  private static void updateDefaultRealm() throws Exception {
+    // (at least) one other phoenix test triggers the caching of this field before the KDC is up
+    // which causes principal parsing to fail.
+    Field f = KerberosName.class.getDeclaredField("defaultRealm");
+    f.setAccessible(true);
+    // Default realm for MiniKDC
+    f.set(null, "EXAMPLE.COM");
+  }
+
+  private void createUsers(int numUsers) throws Exception {
+    assertNotNull("KDC is null, was setup method called?", KDC);
+    NUM_CREATED_USERS = numUsers;
+    for (int i = 1; i <= numUsers; i++) {
+      String principal = "user" + i;
+      File keytabFile = new File(KEYTAB_DIR, principal + ".keytab");
+      KDC.createPrincipal(keytabFile, principal);
+      USER_KEYTAB_FILES.add(keytabFile);
+    }
+  }
+
+  public Map.Entry<String, File> getUser(int offset) {
+    if (!(offset > 0 && offset <= NUM_CREATED_USERS)) {
+      throw new IllegalArgumentException();
+    }
+    return new AbstractMap.SimpleImmutableEntry<String, File>("user" + offset, USER_KEYTAB_FILES.get(offset - 1));
+  }
+
+  /**
+   * Setup the security configuration for hdfs.
+   */
+  private void setHdfsSecuredConfiguration(Configuration conf) throws Exception {
+    // Set principal+keytab configuration for HDFS
+    conf.set(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY,
+      SERVICE_PRINCIPAL + "@" + KDC.getRealm());
+    conf.set(DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY, KEYTAB.getAbsolutePath());
+    conf.set(DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY,
+      SERVICE_PRINCIPAL + "@" + KDC.getRealm());
+    conf.set(DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY, KEYTAB.getAbsolutePath());
+    conf.set(DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY,
+      SPNEGO_PRINCIPAL + "@" + KDC.getRealm());
+    // Enable token access for HDFS blocks
+    conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+    // Only use HTTPS (required because we aren't using "secure" ports)
+    conf.set(DFSConfigKeys.DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());
+    // Bind on localhost for spnego to have a chance at working
+    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");
+    conf.set(DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0");
+
+    // Generate SSL certs
+    File keystoresDir = new File(UTIL.getDataTestDir("keystore").toUri().getPath());
+    keystoresDir.mkdirs();
+    String sslConfDir = TlsUtil.getClasspathDir(QueryServerEnvironment.class);
+    TlsUtil.setupSSLConfig(keystoresDir.getAbsolutePath(), sslConfDir, conf, false);
+
+    // Magic flag to tell hdfs to not fail on using ports above 1024
+    conf.setBoolean("ignore.secure.ports.for.testing", true);
+  }
+
+  private static void ensureIsEmptyDirectory(File f) throws IOException {
+    if (f.exists()) {
+      if (f.isDirectory()) {
+        FileUtils.deleteDirectory(f);
+      } else {
+        assertTrue("Failed to delete keytab directory", f.delete());
+      }
+    }
+    assertTrue("Failed to create keytab directory", f.mkdirs());
+  }
+
+  /**
+   * Setup and start kerberosed, hbase
+   * @throws Exception
+   */
+  public QueryServerEnvironment(final Configuration confIn, int numberOfUsers, boolean tls)
+    throws Exception {
+    this.tls = tls;
+
+    Configuration conf = UTIL.getConfiguration();
+    conf.addResource(confIn);
+    // Ensure the dirs we need are created/empty
+    ensureIsEmptyDirectory(TEMP_DIR);
+    ensureIsEmptyDirectory(KEYTAB_DIR);
+    KEYTAB = new File(KEYTAB_DIR, "test.keytab");
+    // Start a MiniKDC
+    KDC = UTIL.setupMiniKdc(KEYTAB);
+    // Create a service principal and spnego principal in one keytab
+    // NB. Due to some apparent limitations between HDFS and HBase in the same JVM, trying to
+    // use separate identies for HBase and HDFS results in a GSS initiate error. The quick
+    // solution is to just use a single "service" principal instead of "hbase" and "hdfs"
+    // (or "dn" and "nn") per usual.
+    KDC.createPrincipal(KEYTAB, SPNEGO_PRINCIPAL, PQS_PRINCIPAL, SERVICE_PRINCIPAL);
+    // Start ZK by hand
+    UTIL.startMiniZKCluster();
+
+    // Create a number of unprivileged users
+    createUsers(numberOfUsers);
+
+    // Set configuration for HBase
+    HBaseKerberosUtils.setPrincipalForTesting(SERVICE_PRINCIPAL + "@" + KDC.getRealm());
+    HBaseKerberosUtils.setSecuredConfiguration(conf);
+    setHdfsSecuredConfiguration(conf);
+    UserGroupInformation.setConfiguration(conf);
+    conf.setInt(HConstants.MASTER_PORT, 0);
+    conf.setInt(HConstants.MASTER_INFO_PORT, 0);
+    conf.setInt(HConstants.REGIONSERVER_PORT, 0);
+    conf.setInt(HConstants.REGIONSERVER_INFO_PORT, 0);
+
+    if (tls) {
+      conf.setBoolean(QueryServerProperties.QUERY_SERVER_TLS_ENABLED, true);
+      conf.set(QueryServerProperties.QUERY_SERVER_TLS_KEYSTORE,
+        TlsUtil.getKeyStoreFile().getAbsolutePath());
+      conf.set(QueryServerProperties.QUERY_SERVER_TLS_KEYSTORE_PASSWORD,
+        TlsUtil.getKeyStorePassword());
+      conf.set(QueryServerProperties.QUERY_SERVER_TLS_TRUSTSTORE,
+        TlsUtil.getTrustStoreFile().getAbsolutePath());
+      conf.set(QueryServerProperties.QUERY_SERVER_TLS_TRUSTSTORE_PASSWORD,
+        TlsUtil.getTrustStorePassword());
+    }
+
+    // Secure Phoenix setup
+    conf.set(QueryServerProperties.QUERY_SERVER_KERBEROS_HTTP_PRINCIPAL_ATTRIB_LEGACY,
+      SPNEGO_PRINCIPAL + "@" + KDC.getRealm());
+    conf.set(QueryServerProperties.QUERY_SERVER_HTTP_KEYTAB_FILENAME_ATTRIB,
+      KEYTAB.getAbsolutePath());
+    conf.set(QueryServerProperties.QUERY_SERVER_KERBEROS_PRINCIPAL_ATTRIB,
+      PQS_PRINCIPAL + "@" + KDC.getRealm());
+    conf.set(QueryServerProperties.QUERY_SERVER_KEYTAB_FILENAME_ATTRIB,
+      KEYTAB.getAbsolutePath());
+    conf.setBoolean(QueryServerProperties.QUERY_SERVER_DISABLE_KERBEROS_LOGIN, true);
+    conf.setInt(QueryServerProperties.QUERY_SERVER_HTTP_PORT_ATTRIB, 0);
+    // Required so that PQS can impersonate the end-users to HBase
+    conf.set("hadoop.proxyuser.phoenixqs.groups", "*");
+    conf.set("hadoop.proxyuser.phoenixqs.hosts", "*");
+
+    // Clear the cached singletons so we can inject our own.
+    InstanceResolver.clearSingletons();
+    // Make sure the ConnectionInfo doesn't try to pull a default Configuration
+    InstanceResolver.getSingleton(ConfigurationFactory.class, new ConfigurationFactory() {
+      @Override
+      public Configuration getConfiguration() {
+        return conf;
+      }
+
+      @Override
+      public Configuration getConfiguration(Configuration confToClone) {
+        Configuration copy = new Configuration(conf);
+        copy.addResource(confToClone);
+        return copy;
+      }
+    });
+    updateDefaultRealm();
+
+    // Start HDFS
+    UTIL.startMiniDFSCluster(1);
+    // Use LocalHBaseCluster to avoid HBaseTestingUtility from doing something wrong
+    // NB. I'm not actually sure what HTU does incorrect, but this was pulled from some test
+    // classes in HBase itself. I couldn't get HTU to work myself (2017/07/06)
+    Path rootdir = UTIL.getDataTestDirOnTestFS(QueryServerEnvironment.class.getSimpleName());
+    // There is no setRootdir method that is available in all supported HBase versions.
+    conf.set(HBASE_DIR, rootdir.toString());
+    HBASE_CLUSTER = new LocalHBaseCluster(conf, 1);
+    HBASE_CLUSTER.startup();
+
+    // Then fork a thread with PQS in it.
+    configureAndStartQueryServer(tls);
+  }
+
+  private void configureAndStartQueryServer(boolean tls) throws Exception {
+    PQS = new QueryServer(new String[0], UTIL.getConfiguration());
+    // Get the PQS ident for PQS to use
+    final UserGroupInformation ugi =
+      UserGroupInformation.loginUserFromKeytabAndReturnUGI(PQS_PRINCIPAL,
+        KEYTAB.getAbsolutePath());
+    PQS_EXECUTOR = Executors.newSingleThreadExecutor();
+    // Launch PQS, doing in the Kerberos login instead of letting PQS do it itself (which would
+    // break the HBase/HDFS logins also running in the same test case).
+    PQS_EXECUTOR.submit(new Runnable() {
+      @Override
+      public void run() {
+        ugi.doAs(new PrivilegedAction<Void>() {
+          @Override
+          public Void run() {
+            PQS.run();
+            return null;
+          }
+        });
+      }
+    });
+    PQS.awaitRunning();
+    PQS_PORT = PQS.getPort();
+    PQS_URL =
+      ThinClientUtil.getConnectionUrl(tls ? "https" : "http", "localhost", PQS_PORT)
+        + ";authentication=SPNEGO" + (tls
+        ? ";truststore=" + TlsUtil.getTrustStoreFile().getAbsolutePath()
+        + ";truststore_password=" + TlsUtil.getTrustStorePassword()
+        : "");
+    LOG.debug("Phoenix Query Server URL: {}", PQS_URL);
+  }
+
+  public void stop() throws Exception {
+    // Remove our custom ConfigurationFactory for future tests
+    InstanceResolver.clearSingletons();
+    if (PQS_EXECUTOR != null) {
+      PQS.stop();
+      PQS_EXECUTOR.shutdown();
+      if (!PQS_EXECUTOR.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOG.info("PQS didn't exit in 5 seconds, proceeding anyways.");
+      }
+    }
+    if (HBASE_CLUSTER != null) {
+      HBASE_CLUSTER.shutdown();
+      HBASE_CLUSTER.join();
+    }
+    if (UTIL != null) {
+      UTIL.shutdownMiniZKCluster();
+    }
+    if (KDC != null) {
+      KDC.stop();
+    }
+  }
+}
diff --git a/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/SecuredPhoenixBaseTest.java b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/SecuredPhoenixBaseTest.java
new file mode 100644
index 0000000..bf541a9
--- /dev/null
+++ b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/SecuredPhoenixBaseTest.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.phoenix.secured;
+
+import ch.qos.logback.classic.Level;
+import com.sun.security.auth.module.Krb5LoginModule;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.drill.common.config.DrillProperties;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.rpc.security.ServerAuthenticationHandler;
+import org.apache.drill.exec.rpc.security.kerberos.KerberosFactory;
+import org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.phoenix.PhoenixStoragePluginConfig;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.LogFixture;
+import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.phoenix.queryserver.server.QueryServer;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+
+import java.io.File;
+import java.nio.file.Paths;
+import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.TimeZone;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.drill.exec.store.phoenix.PhoenixBaseTest.createSampleData;
+import static org.apache.drill.exec.store.phoenix.PhoenixBaseTest.createSchema;
+import static org.apache.drill.exec.store.phoenix.PhoenixBaseTest.createTables;
+import static org.apache.drill.exec.store.phoenix.secured.HttpParamImpersonationQueryServerIT.environment;
+import static org.apache.drill.exec.store.phoenix.secured.HttpParamImpersonationQueryServerIT.getUrlTemplate;
+import static org.apache.drill.exec.store.phoenix.secured.HttpParamImpersonationQueryServerIT.grantUsersToGlobalPhoenixUserTables;
+import static org.apache.drill.exec.store.phoenix.secured.HttpParamImpersonationQueryServerIT.grantUsersToPhoenixSystemTables;
+import static org.apache.drill.exec.store.phoenix.secured.SecuredPhoenixTestSuite.initPhoenixQueryServer;
+
+@Slf4j
+public abstract class SecuredPhoenixBaseTest extends ClusterTest {
+  protected static LogFixture logFixture;
+  private final static Level CURRENT_LOG_LEVEL = Level.INFO;
+
+  private final static AtomicInteger initCount = new AtomicInteger(0);
+
+  @BeforeAll
+  public static void setUpBeforeClass() throws Exception {
+    TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
+    initPhoenixQueryServer();
+    startSecuredDrillCluster();
+    initializeDatabase();
+  }
+
+  private static void startSecuredDrillCluster() throws Exception {
+    logFixture = LogFixture.builder()
+      .toConsole()
+      .logger(QueryServerEnvironment.class, CURRENT_LOG_LEVEL)
+      .logger(SecuredPhoenixBaseTest.class, CURRENT_LOG_LEVEL)
+      .logger(KerberosFactory.class, CURRENT_LOG_LEVEL)
+      .logger(Krb5LoginModule.class, CURRENT_LOG_LEVEL)
+      .logger(QueryServer.class, CURRENT_LOG_LEVEL)
+      .logger(ServerAuthenticationHandler.class, CURRENT_LOG_LEVEL)
+      .build();
+
+    Map.Entry<String, File> user1 = environment.getUser(1);
+    Map.Entry<String, File> user2 = environment.getUser(2);
+    Map.Entry<String, File> user3 = environment.getUser(3);
+
+    dirTestWatcher.start(SecuredPhoenixTestSuite.class); // until DirTestWatcher ClassRule is implemented for JUnit5
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+        .configProperty(ExecConstants.USER_AUTHENTICATION_ENABLED, true)
+        .configProperty(ExecConstants.USER_AUTHENTICATOR_IMPL, UserAuthenticatorTestImpl.TYPE)
+        .configNonStringProperty(ExecConstants.AUTHENTICATION_MECHANISMS, Lists.newArrayList("kerberos"))
+        .configProperty(ExecConstants.IMPERSONATION_ENABLED, true)
+        .configProperty(ExecConstants.BIT_AUTHENTICATION_ENABLED, true)
+        .configProperty(ExecConstants.BIT_AUTHENTICATION_MECHANISM, "kerberos")
+        .configProperty(ExecConstants.SERVICE_PRINCIPAL, HBaseKerberosUtils.getPrincipalForTesting())
+        .configProperty(ExecConstants.SERVICE_KEYTAB_LOCATION, environment.getServiceKeytab().getAbsolutePath())
+        .configClientProperty(DrillProperties.SERVICE_PRINCIPAL, HBaseKerberosUtils.getPrincipalForTesting())
+        .configClientProperty(DrillProperties.USER, user1.getKey())
+        .configClientProperty(DrillProperties.KEYTAB, user1.getValue().getAbsolutePath());
+    startCluster(builder);
+    Properties user2ClientProperties = new Properties();
+    user2ClientProperties.setProperty(DrillProperties.SERVICE_PRINCIPAL, HBaseKerberosUtils.getPrincipalForTesting());
+    user2ClientProperties.setProperty(DrillProperties.USER, user2.getKey());
+    user2ClientProperties.setProperty(DrillProperties.KEYTAB, user2.getValue().getAbsolutePath());
+    cluster.addClientFixture(user2ClientProperties);
+    Properties user3ClientProperties = new Properties();
+    user3ClientProperties.setProperty(DrillProperties.SERVICE_PRINCIPAL, HBaseKerberosUtils.getPrincipalForTesting());
+    user3ClientProperties.setProperty(DrillProperties.USER, user3.getKey());
+    user3ClientProperties.setProperty(DrillProperties.KEYTAB, user3.getValue().getAbsolutePath());
+    cluster.addClientFixture(user3ClientProperties);
+
+    Map<String, Object> phoenixProps = new HashMap<>();
+    phoenixProps.put("phoenix.query.timeoutMs", 90000);
+    phoenixProps.put("phoenix.query.keepAliveMs", "30000");
+    phoenixProps.put("phoenix.queryserver.withRemoteUserExtractor", true);
+    StoragePluginRegistry registry = cluster.drillbit().getContext().getStorage();
+    final String doAsUrl = String.format(getUrlTemplate(), "$user");
+    logger.debug("Phoenix Query Server URL: {}", environment.getPqsUrl());
+    PhoenixStoragePluginConfig config = new PhoenixStoragePluginConfig(null, 0, null, null,
+      doAsUrl, null, phoenixProps);
+    config.setEnabled(true);
+    registry.put(PhoenixStoragePluginConfig.NAME + "123", config);
+  }
+
+
+  /**
+   * Initialize HBase via Phoenix
+   */
+  private static void initializeDatabase() throws Exception {
+    dirTestWatcher.copyResourceToRoot(Paths.get(""));
+    if (initCount.incrementAndGet() == 1) {
+      final Map.Entry<String, File> user1 = environment.getUser(1);
+      final Map.Entry<String, File> user2 = environment.getUser(2);
+      // Build the JDBC URL by hand with the doAs
+      final UserGroupInformation serviceUgi = ImpersonationUtil.getProcessUserUGI();
+      serviceUgi.doAs((PrivilegedExceptionAction<Void>) () -> {
+        createSchema(environment.getPqsUrl());
+        createTables(environment.getPqsUrl());
+        createSampleData(environment.getPqsUrl());
+        grantUsersToPhoenixSystemTables(Arrays.asList(user1.getKey(), user2.getKey()));
+        grantUsersToGlobalPhoenixUserTables(Arrays.asList(user1.getKey()));
+        return null;
+      });
+    }
+  }
+
+  protected interface TestWrapper {
+    void apply() throws Exception;
+  }
+
+  public void runForThreeClients(SecuredPhoenixSQLTest.TestWrapper wrapper) throws Exception {
+    runForThreeClients(wrapper, UserRemoteException.class, RuntimeException.class);
+  }
+
+  /**
+   * @param wrapper actual test case execution
+   * @param user2ExpectedException the expected Exception for user2, which can be impersonated, but hasn't permissions to the tables
+   * @param user3ExpectedException the expected Exception for user3, isn't impersonated
+   */
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  public void runForThreeClients(SecuredPhoenixSQLTest.TestWrapper wrapper, Class user2ExpectedException, Class user3ExpectedException) throws Exception {
+    try {
+      client = cluster.client(0);
+      wrapper.apply();
+      client = cluster.client(1);
+      // original is AccessDeniedException: Insufficient permissions for user 'user2'
+      Assertions.assertThrows(user2ExpectedException, wrapper::apply);
+      client = cluster.client(2);
+      // RuntimeException for user3, Failed to execute HTTP Request, got HTTP/401
+      Assertions.assertThrows(user3ExpectedException, wrapper::apply);
+    } finally {
+      client = cluster.client(0);
+    }
+  }
+
+  @AfterAll
+  public static void tearDownCluster() throws Exception {
+    if (!SecuredPhoenixTestSuite.isRunningSuite() && environment != null) {
+      HttpParamImpersonationQueryServerIT.stopEnvironment();
+    }
+  }
+}
diff --git a/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/PhoenixCommandTest.java b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/SecuredPhoenixCommandTest.java
similarity index 61%
copy from contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/PhoenixCommandTest.java
copy to contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/SecuredPhoenixCommandTest.java
index 4fa5ac5..e0f1cce 100644
--- a/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/PhoenixCommandTest.java
+++ b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/SecuredPhoenixCommandTest.java
@@ -15,9 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.phoenix;
-
-import static org.junit.Assert.assertEquals;
+package org.apache.drill.exec.store.phoenix.secured;
 
 import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.categories.SlowTest;
@@ -28,23 +26,31 @@ import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.test.QueryBuilder;
 import org.apache.drill.test.rowSet.RowSetComparison;
-import org.junit.FixMethodOrder;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runners.MethodSorters;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
 
-@FixMethodOrder(MethodSorters.JVM)
-@Category({ SlowTest.class, RowSetTests.class })
-public class PhoenixCommandTest extends PhoenixBaseTest {
+@Tag(SlowTest.TAG)
+@Tag(RowSetTests.TAG)
+public class SecuredPhoenixCommandTest extends SecuredPhoenixBaseTest {
 
   @Test
   public void testShowTablesLike() throws Exception {
-    run("USE phoenix123.v1");
-    assertEquals(1, queryBuilder().sql("SHOW TABLES LIKE '%REGION%'").run().recordCount());
+    runForThreeClients(this::doTestShowTablesLike);
+  }
+
+  private void doTestShowTablesLike() throws Exception {
+    runAndPrint("SHOW SCHEMAS");
+    run("USE phoenix123.V1");
+    Assertions.assertEquals(1, queryBuilder().sql("SHOW TABLES LIKE '%REGION%'").run().recordCount());
   }
 
   @Test
   public void testShowTables() throws Exception {
+    runForThreeClients(this::doTestShowTables);
+  }
+
+  private void doTestShowTables() throws Exception {
     String sql = "SHOW TABLES FROM phoenix123.v1";
     QueryBuilder builder = client.queryBuilder().sql(sql);
     RowSet sets = builder.rowSet();
@@ -66,28 +72,11 @@ public class PhoenixCommandTest extends PhoenixBaseTest {
 
   @Test
   public void testDescribe() throws Exception {
-    assertEquals(4, queryBuilder().sql("DESCRIBE phoenix123.v1.NATION").run().recordCount());
+    runForThreeClients(this::doTestDescribe);
   }
 
-  @Test
-  public void testDescribeCaseInsensitive() throws Exception {
-    String sql = "DESCRIBE phoenix123.v1.nation"; // use lowercase
-    QueryBuilder builder = client.queryBuilder().sql(sql);
-    RowSet sets = builder.rowSet();
-
-    TupleMetadata schema = new SchemaBuilder()
-        .addNullable("COLUMN_NAME", MinorType.VARCHAR)
-        .addNullable("DATA_TYPE", MinorType.VARCHAR)
-        .addNullable("IS_NULLABLE", MinorType.VARCHAR)
-        .build();
-
-    RowSet expected = new RowSetBuilder(client.allocator(), schema)
-        .addRow("N_NATIONKEY", "BIGINT", "NO")
-        .addRow("N_NAME", "CHARACTER VARYING", "YES")
-        .addRow("N_REGIONKEY", "BIGINT", "YES")
-        .addRow("N_COMMENT", "CHARACTER VARYING", "YES")
-        .build();
-
-    new RowSetComparison(expected).verifyAndClearAll(sets);
+  private void doTestDescribe() throws Exception {
+    run("USE phoenix123.v1");
+    Assertions.assertEquals(4, queryBuilder().sql("DESCRIBE NATION").run().recordCount());
   }
 }
diff --git a/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/SecuredPhoenixDataTypeTest.java b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/SecuredPhoenixDataTypeTest.java
new file mode 100644
index 0000000..5fd0962
--- /dev/null
+++ b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/SecuredPhoenixDataTypeTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.phoenix.secured;
+
+import org.apache.drill.categories.RowSetTests;
+import org.apache.drill.categories.SlowTest;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.QueryBuilder;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+
+import static org.apache.drill.exec.store.phoenix.PhoenixBaseTest.U_U_I_D;
+import static org.apache.drill.test.rowSet.RowSetUtilities.boolArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.byteArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.doubleArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.intArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.longArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.shortArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
+
+@Tag(SlowTest.TAG)
+@Tag(RowSetTests.TAG)
+public class SecuredPhoenixDataTypeTest extends SecuredPhoenixBaseTest {
+
+  @Test
+  public void testDataType() throws Exception {
+    runForThreeClients(this::doTestDataType);
+  }
+
+  private void doTestDataType() throws Exception {
+    String sql = "select * from phoenix123.v1.datatype";
+    QueryBuilder builder = queryBuilder().sql(sql);
+    RowSet sets = builder.rowSet();
+
+    TupleMetadata schema = new SchemaBuilder()
+        .addNullable("T_UUID", MinorType.VARCHAR)
+        .addNullable("T_VARCHAR", MinorType.VARCHAR)
+        .addNullable("T_CHAR", MinorType.VARCHAR)
+        .addNullable("T_BIGINT", MinorType.BIGINT)
+        .addNullable("T_INTEGER", MinorType.INT)
+        .addNullable("T_SMALLINT", MinorType.INT)
+        .addNullable("T_TINYINT", MinorType.INT)
+        .addNullable("T_DOUBLE", MinorType.FLOAT8)
+        .addNullable("T_FLOAT", MinorType.FLOAT4)
+        .addNullable("T_DECIMAL", MinorType.VARDECIMAL)
+        .addNullable("T_DATE", MinorType.DATE)
+        .addNullable("T_TIME", MinorType.TIME)
+        .addNullable("T_TIMESTAMP", MinorType.TIMESTAMP)
+        .addNullable("T_BINARY", MinorType.VARBINARY)
+        .addNullable("T_VARBINARY", MinorType.VARBINARY)
+        .addNullable("T_BOOLEAN", MinorType.BIT)
+        .build();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), schema)
+        .addRow(U_U_I_D,
+            "apache", "drill",
+            Long.MAX_VALUE,
+            Integer.MAX_VALUE,
+            Short.MAX_VALUE,
+            Byte.MAX_VALUE,
+            Double.MAX_VALUE,
+            Float.MAX_VALUE,
+            BigDecimal.valueOf(10.11),
+            LocalDate.parse("2021-12-12"),
+            LocalTime.parse("12:12:12"),
+            Instant.ofEpochMilli(1639311132000l),
+            "a_b_c_d_e_".getBytes(), "12345".getBytes(),
+            Boolean.TRUE)
+        .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(sets);
+  }
+
+  @Test
+  public void testArrayType() throws Exception {
+    runForThreeClients(this::doTestArrayType);
+  }
+
+  private void doTestArrayType() throws Exception {
+    String sql = "select * from phoenix123.v1.arraytype";
+
+    QueryBuilder builder = queryBuilder().sql(sql);
+    RowSet sets = builder.rowSet();
+
+    TupleMetadata schema = new SchemaBuilder()
+        .addNullable("T_UUID", MinorType.VARCHAR)
+        .addArray("T_VARCHAR", MinorType.VARCHAR)
+        .addArray("T_CHAR", MinorType.VARCHAR)
+        .addArray("T_BIGINT", MinorType.BIGINT)
+        .addArray("T_INTEGER", MinorType.INT)
+        .addArray("T_DOUBLE", MinorType.FLOAT8)
+        .addArray("T_SMALLINT", MinorType.SMALLINT)
+        .addArray("T_TINYINT", MinorType.TINYINT)
+        .addArray("T_BOOLEAN", MinorType.BIT)
+        .build();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), schema)
+        .addRow(U_U_I_D,
+            strArray("apache", "drill", "1.20"),
+            strArray("a", "b", "c"),
+            longArray(Long.MIN_VALUE, Long.MAX_VALUE),
+            intArray(Integer.MIN_VALUE, Integer.MAX_VALUE),
+            doubleArray(Double.MIN_VALUE, Double.MAX_VALUE),
+            shortArray(Short.MIN_VALUE, Short.MAX_VALUE),
+            byteArray((int) Byte.MIN_VALUE, (int) Byte.MAX_VALUE),
+            boolArray(Boolean.TRUE, Boolean.FALSE))
+        .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(sets);
+  }
+}
diff --git a/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/SecuredPhoenixSQLTest.java b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/SecuredPhoenixSQLTest.java
new file mode 100644
index 0000000..c2209ce
--- /dev/null
+++ b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/SecuredPhoenixSQLTest.java
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.phoenix.secured;
+
+import org.apache.drill.categories.RowSetTests;
+import org.apache.drill.categories.SlowTest;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.test.QueryBuilder;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Tag(SlowTest.TAG)
+@Tag(RowSetTests.TAG)
+public class SecuredPhoenixSQLTest extends SecuredPhoenixBaseTest {
+
+  @Test
+  public void testStarQuery() throws Exception {
+    runForThreeClients(this::doTestStarQuery);
+  }
+
+  private void doTestStarQuery() throws Exception {
+    String sql = "select * from phoenix123.v1.nation";
+    queryBuilder().sql(sql).run();
+  }
+
+  @Test
+  public void testExplicitQuery() throws Exception {
+    runForThreeClients(this::doTestExplicitQuery);
+  }
+
+  private void doTestExplicitQuery() throws Exception {
+    String sql = "select n_nationkey, n_regionkey, n_name from phoenix123.v1.nation";
+    QueryBuilder builder = queryBuilder().sql(sql);
+    RowSet sets = builder.rowSet();
+
+    TupleMetadata schema = new SchemaBuilder()
+        .addNullable("n_nationkey", MinorType.BIGINT)
+        .addNullable("n_regionkey", MinorType.BIGINT)
+        .addNullable("n_name", MinorType.VARCHAR)
+        .build();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), schema)
+        .addRow(0, 0, "ALGERIA")
+        .addRow(1, 1, "ARGENTINA")
+        .addRow(2, 1, "BRAZIL")
+        .addRow(3, 1, "CANADA")
+        .addRow(4, 4, "EGYPT")
+        .addRow(5, 0, "ETHIOPIA")
+        .addRow(6, 3, "FRANCE")
+        .addRow(7, 3, "GERMANY")
+        .addRow(8, 2, "INDIA")
+        .addRow(9, 2, "INDONESIA")
+        .addRow(10, 4, "IRAN")
+        .addRow(11, 4, "IRAQ")
+        .addRow(12, 2, "JAPAN")
+        .addRow(13, 4, "JORDAN")
+        .addRow(14, 0, "KENYA")
+        .addRow(15, 0, "MOROCCO")
+        .addRow(16, 0, "MOZAMBIQUE")
+        .addRow(17, 1, "PERU")
+        .addRow(18, 2, "CHINA")
+        .addRow(19, 3, "ROMANIA")
+        .addRow(20, 4, "SAUDI ARABIA")
+        .addRow(21, 2, "VIETNAM")
+        .addRow(22, 3, "RUSSIA")
+        .addRow(23, 3, "UNITED KINGDOM")
+        .addRow(24, 1, "UNITED STATES")
+        .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(sets);
+  }
+
+  @Test
+  public void testLimitPushdown() throws Exception {
+    runForThreeClients(this::doTestLimitPushdown);
+  }
+
+  private void doTestLimitPushdown() throws Exception {
+    String sql = "select n_name, n_regionkey from phoenix123.v1.nation limit 20 offset 10";
+    QueryBuilder builder = client.queryBuilder().sql(sql);
+    RowSet sets = builder.rowSet();
+
+    builder.planMatcher()
+        .exclude("Limit")
+        .include("OFFSET .* ROWS FETCH NEXT .* ROWS ONLY")
+        .match();
+
+    assertEquals(15, sets.rowCount());
+
+    TupleMetadata schema = new SchemaBuilder()
+        .addNullable("n_name", MinorType.VARCHAR)
+        .addNullable("n_regionkey", MinorType.BIGINT)
+        .build();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), schema)
+        .addRow("IRAN", 4)
+        .addRow("IRAQ", 4)
+        .addRow("JAPAN", 2)
+        .addRow("JORDAN", 4)
+        .addRow("KENYA", 0)
+        .addRow("MOROCCO", 0)
+        .addRow("MOZAMBIQUE", 0)
+        .addRow("PERU", 1)
+        .addRow("CHINA", 2)
+        .addRow("ROMANIA", 3)
+        .addRow("SAUDI ARABIA", 4)
+        .addRow("VIETNAM", 2)
+        .addRow("RUSSIA", 3)
+        .addRow("UNITED KINGDOM", 3)
+        .addRow("UNITED STATES", 1)
+        .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(sets);
+  }
+
+  @Test
+  public void testFilterPushdown() throws Exception {
+    runForThreeClients(this::doTestFilterPushdown);
+  }
+
+  private void doTestFilterPushdown() throws Exception {
+    String sql = "select * from phoenix123.v1.region where r_name = 'ASIA'";
+    QueryBuilder builder = client.queryBuilder().sql(sql);
+    RowSet sets = builder.rowSet();
+
+    builder.planMatcher()
+        .exclude("Filter")
+        .include("WHERE .* = 'ASIA'")
+        .match();
+
+    TupleMetadata schema = new SchemaBuilder()
+        .addNullable("R_REGIONKEY", MinorType.BIGINT)
+        .addNullable("R_NAME", MinorType.VARCHAR)
+        .addNullable("R_COMMENT", MinorType.VARCHAR)
+        .build();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), schema)
+        .addRow(2, "ASIA", "ges. thinly even pinto beans ca")
+        .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(sets);
+  }
+
+  @Test
+  public void testSerDe() throws Exception {
+    runForThreeClients(this::doTestSerDe, RpcException.class, RpcException.class);
+  }
+
+  private void doTestSerDe() throws Exception {
+    String sql = "select count(*) as total from phoenix123.v1.nation";
+    String plan = queryBuilder().sql(sql).explainJson();
+    long cnt = queryBuilder().physical(plan).singletonLong();
+    assertEquals(25, cnt, "Counts should match");
+  }
+
+  @Test
+  public void testJoinPushdown() throws Exception {
+    runForThreeClients(this::doTestJoinPushdown);
+  }
+
+  private void doTestJoinPushdown() throws Exception {
+    String sql = "select a.n_name, b.r_name from phoenix123.v1.nation a join phoenix123.v1.region b "
+        + "on a.n_regionkey = b.r_regionkey";
+    QueryBuilder builder = client.queryBuilder().sql(sql);
+    RowSet sets = builder.rowSet();
+
+    builder.planMatcher()
+        .exclude("Join")
+        .include("Phoenix\\(.* INNER JOIN")
+        .match();
+
+    TupleMetadata schema = new SchemaBuilder()
+        .addNullable("n_name", MinorType.VARCHAR)
+        .addNullable("r_name", MinorType.VARCHAR)
+        .build();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), schema)
+        .addRow("ALGERIA", "AFRICA")
+        .addRow("ARGENTINA", "AMERICA")
+        .addRow("BRAZIL", "AMERICA")
+        .addRow("CANADA", "AMERICA")
+        .addRow("EGYPT", "MIDDLE EAST")
+        .addRow("ETHIOPIA", "AFRICA")
+        .addRow("FRANCE", "EUROPE")
+        .addRow("GERMANY", "EUROPE")
+        .addRow("INDIA", "ASIA")
+        .addRow("INDONESIA", "ASIA")
+        .addRow("IRAN", "MIDDLE EAST")
+        .addRow("IRAQ", "MIDDLE EAST")
+        .addRow("JAPAN", "ASIA")
+        .addRow("JORDAN", "MIDDLE EAST")
+        .addRow("KENYA", "AFRICA")
+        .addRow("MOROCCO", "AFRICA")
+        .addRow("MOZAMBIQUE", "AFRICA")
+        .addRow("PERU", "AMERICA")
+        .addRow("CHINA", "ASIA")
+        .addRow("ROMANIA", "EUROPE")
+        .addRow("SAUDI ARABIA", "MIDDLE EAST")
+        .addRow("VIETNAM", "ASIA")
+        .addRow("RUSSIA", "EUROPE")
+        .addRow("UNITED KINGDOM", "EUROPE")
+        .addRow("UNITED STATES", "AMERICA")
+        .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(sets);
+  }
+
+  @Test
+  public void testCrossJoin() throws Exception {
+    runForThreeClients(this::doTestCrossJoin);
+  }
+
+  private void doTestCrossJoin() throws Exception {
+    String sql = "select a.n_name, b.n_comment from phoenix123.v1.nation a cross join phoenix123.v1.nation b";
+    client.alterSession(PlannerSettings.NLJOIN_FOR_SCALAR.getOptionName(), false);
+    QueryBuilder builder = client.queryBuilder().sql(sql);
+    RowSet sets = builder.rowSet();
+
+    builder.planMatcher().exclude("Join").match();
+
+    assertEquals(625, sets.rowCount(), "Counts should match");
+    sets.clear();
+  }
+
+  @Test
+  @Disabled("use the remote query server directly without minicluster")
+  public void testJoinWithFilterPushdown() throws Exception {
+    String sql = "select 10 as DRILL, a.n_name, b.r_name from phoenix123.v1.nation a join phoenix123.v1.region b "
+        + "on a.n_regionkey = b.r_regionkey where b.r_name = 'ASIA'";
+    QueryBuilder builder = client.queryBuilder().sql(sql);
+    RowSet sets = builder.rowSet();
+
+    builder.planMatcher()
+        .exclude("Join")
+        .exclude("Filter")
+        .include("Phoenix\\(.* INNER JOIN .* WHERE")
+        .match();
+
+    TupleMetadata schema = new SchemaBuilder()
+        .addNullable("DRILL", MinorType.INT)
+        .addNullable("n_name", MinorType.VARCHAR)
+        .addNullable("r_name", MinorType.VARCHAR)
+        .build();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), schema)
+        .addRow(10, "INDIA", "ASIA")
+        .addRow(10, "INDONESIA", "ASIA")
+        .addRow(10, "JAPAN", "ASIA")
+        .addRow(10, "CHINA", "ASIA")
+        .addRow(10, "VIETNAM", "ASIA")
+        .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(sets);
+  }
+
+  @Test
+  public void testGroupByPushdown() throws Exception {
+    runForThreeClients(this::doTestGroupByPushdown);
+  }
+
+  private void doTestGroupByPushdown() throws Exception {
+    String sql = "select n_regionkey, count(1) as total from phoenix123.v1.nation group by n_regionkey";
+    QueryBuilder builder = client.queryBuilder().sql(sql);
+    RowSet sets = builder.rowSet();
+
+    builder.planMatcher()
+        .exclude("Aggregate")
+        .include("Phoenix\\(.* GROUP BY")
+        .match();
+
+    TupleMetadata schema = new SchemaBuilder()
+        .addNullable("n_regionkey", MinorType.BIGINT)
+        .addNullable("total", MinorType.BIGINT)
+        .build();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), schema)
+        .addRow(0, 5)
+        .addRow(1, 5)
+        .addRow(2, 5)
+        .addRow(3, 5)
+        .addRow(4, 5)
+        .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(sets);
+  }
+
+  @Test
+  public void testDistinctPushdown() throws Exception {
+    runForThreeClients(this::doTestDistinctPushdown);
+  }
+
+  private void doTestDistinctPushdown() throws Exception {
+    String sql = "select distinct n_name from phoenix123.v1.nation"; // auto convert to group-by
+    QueryBuilder builder = client.queryBuilder().sql(sql);
+    RowSet sets = builder.rowSet();
+
+    builder.planMatcher()
+        .exclude("Aggregate")
+        .include("Phoenix\\(.* GROUP BY \"N_NAME")
+        .match();
+
+    TupleMetadata schema = new SchemaBuilder()
+        .addNullable("n_name", MinorType.VARCHAR)
+        .build();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), schema)
+        .addRow("ALGERIA")
+        .addRow("ARGENTINA")
+        .addRow("BRAZIL")
+        .addRow("CANADA")
+        .addRow("CHINA")
+        .addRow("EGYPT")
+        .addRow("ETHIOPIA")
+        .addRow("FRANCE")
+        .addRow("GERMANY")
+        .addRow("INDIA")
+        .addRow("INDONESIA")
+        .addRow("IRAN")
+        .addRow("IRAQ")
+        .addRow("JAPAN")
+        .addRow("JORDAN")
+        .addRow("KENYA")
+        .addRow("MOROCCO")
+        .addRow("MOZAMBIQUE")
+        .addRow("PERU")
+        .addRow("ROMANIA")
+        .addRow("RUSSIA")
+        .addRow("SAUDI ARABIA")
+        .addRow("UNITED KINGDOM")
+        .addRow("UNITED STATES")
+        .addRow("VIETNAM")
+        .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(sets);
+  }
+
+  @Test
+  public void testHavingPushdown() throws Exception {
+    runForThreeClients(this::doTestHavingPushdown);
+  }
+
+  private void doTestHavingPushdown() throws Exception {
+    String sql = "select n_regionkey, max(n_nationkey) from phoenix123.v1.nation group by n_regionkey having max(n_nationkey) > 20";
+    QueryBuilder builder = client.queryBuilder().sql(sql);
+    RowSet sets = builder.rowSet();
+
+    builder.planMatcher()
+        .exclude("Aggregate")
+        .include("Phoenix\\(.* GROUP BY .* HAVING MAX")
+        .match();
+
+    TupleMetadata schema = new SchemaBuilder()
+        .addNullable("n_regionkey", MinorType.BIGINT)
+        .addNullable("EXPR$1", MinorType.BIGINT)
+        .build();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), schema)
+        .addRow(1, 24)
+        .addRow(2, 21)
+        .addRow(3, 23)
+        .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(sets);
+  }
+}
diff --git a/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/PhoenixTestSuite.java b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/SecuredPhoenixTestSuite.java
similarity index 62%
copy from contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/PhoenixTestSuite.java
copy to contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/SecuredPhoenixTestSuite.java
index 1817b76..5d0451e 100644
--- a/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/PhoenixTestSuite.java
+++ b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/SecuredPhoenixTestSuite.java
@@ -15,57 +15,59 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.phoenix;
+package org.apache.drill.exec.store.phoenix.secured;
+
+import org.apache.drill.categories.RowSetTests;
+import org.apache.drill.categories.SlowTest;
+import org.apache.drill.exec.store.phoenix.QueryServerBasicsIT;
+import org.apache.drill.test.BaseTest;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Tag;
+import org.junit.platform.suite.api.SelectClasses;
+import org.junit.platform.suite.api.Suite;
+import org.slf4j.LoggerFactory;
 
 import java.util.TimeZone;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.drill.categories.SlowTest;
-import org.apache.drill.test.ClusterTest;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.Suite;
-import org.junit.runners.Suite.SuiteClasses;
-import org.slf4j.LoggerFactory;
 
-@RunWith(Suite.class)
-@SuiteClasses ({
-   PhoenixDataTypeTest.class,
-   PhoenixSQLTest.class,
-   PhoenixCommandTest.class
+@Suite
+@SelectClasses({
+  SecuredPhoenixDataTypeTest.class,
+  SecuredPhoenixSQLTest.class,
+  SecuredPhoenixCommandTest.class
 })
-@Ignore
-@Category({ SlowTest.class })
-public class PhoenixTestSuite extends ClusterTest {
+@Disabled
+@Tag(SlowTest.TAG)
+@Tag(RowSetTests.TAG)
+public class SecuredPhoenixTestSuite extends BaseTest {
 
-  private static final org.slf4j.Logger logger = LoggerFactory.getLogger(PhoenixTestSuite.class);
+  private static final org.slf4j.Logger logger = LoggerFactory.getLogger(SecuredPhoenixTestSuite.class);
 
   private static volatile boolean runningSuite = false;
-  private static AtomicInteger initCount = new AtomicInteger(0);
+  private static final AtomicInteger initCount = new AtomicInteger(0);
 
-  @BeforeClass
+  @BeforeAll
   public static void initPhoenixQueryServer() throws Exception {
     TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
-    synchronized (PhoenixTestSuite.class) {
+    synchronized (SecuredPhoenixTestSuite.class) {
       if (initCount.get() == 0) {
         logger.info("Boot the test cluster...");
-        QueryServerBasicsIT.doSetup();
+        HttpParamImpersonationQueryServerIT.startQueryServerEnvironment();
       }
       initCount.incrementAndGet();
       runningSuite = true;
     }
   }
 
-  @AfterClass
+  @AfterAll
   public static void tearDownCluster() throws Exception {
-    synchronized (PhoenixTestSuite.class) {
+    synchronized (SecuredPhoenixTestSuite.class) {
       if (initCount.decrementAndGet() == 0) {
         logger.info("Shutdown all instances of test cluster.");
         QueryServerBasicsIT.afterClass();
-        shutdown();
       }
     }
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
index d47e8d9..9e50f12 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
@@ -98,12 +98,7 @@ class OperatorContextImpl extends BaseOperatorContext implements AutoCloseable {
         currentThread.setName(proxyUgi.getUserName() + ":task-delegate-thread");
         final RESULT result;
         try {
-          result = proxyUgi.doAs(new PrivilegedExceptionAction<RESULT>() {
-            @Override
-            public RESULT run() throws Exception {
-              return callable.call();
-            }
-          });
+          result = proxyUgi.doAs((PrivilegedExceptionAction<RESULT>) () -> callable.call());
         } finally {
           currentThread.setName(originalThreadName);
         }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
index 325d587..bc98205 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
@@ -110,7 +110,6 @@ public interface PhysicalOperator extends GraphValue<PhysicalOperator> {
   /**
    * Name of the user whom to impersonate while setting up the implementation (RecordBatch) of this
    * PhysicalOperator. Default value is "null" in which case we impersonate as user who launched the query.
-   * @return
    */
   @JsonProperty("userName")
   String getUserName();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
index adfd766..cf0112d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
@@ -117,12 +117,8 @@ public class ImplCreator {
     if (context.isImpersonationEnabled()) {
       final UserGroupInformation proxyUgi = ImpersonationUtil.createProxyUgi(root.getUserName(), context.getQueryUserName());
       try {
-        return proxyUgi.doAs(new PrivilegedExceptionAction<RootExec>() {
-          @Override
-          public RootExec run() throws Exception {
-            return ((RootCreator<PhysicalOperator>) getOpCreator(root, context)).getRoot(context, root, childRecordBatches);
-          }
-        });
+        return proxyUgi.doAs((PrivilegedExceptionAction<RootExec>) ()
+          -> ((RootCreator<PhysicalOperator>) getOpCreator(root, context)).getRoot(context, root, childRecordBatches));
       } catch (InterruptedException | IOException e) {
         final String errMsg = String.format("Failed to create RootExec for operator with id '%d'", root.getOperatorId());
         logger.error(errMsg, e);
@@ -145,15 +141,12 @@ public class ImplCreator {
     if (context.isImpersonationEnabled()) {
       final UserGroupInformation proxyUgi = ImpersonationUtil.createProxyUgi(op.getUserName(), context.getQueryUserName());
       try {
-        return proxyUgi.doAs(new PrivilegedExceptionAction<RecordBatch>() {
-          @Override
-          public RecordBatch run() throws Exception {
-            @SuppressWarnings("unchecked")
-            final CloseableRecordBatch batch = ((BatchCreator<PhysicalOperator>) getOpCreator(op, context)).getBatch(
-                context, op, childRecordBatches);
-            operators.addFirst(batch);
-            return batch;
-          }
+        return proxyUgi.doAs((PrivilegedExceptionAction<RecordBatch>) () -> {
+          @SuppressWarnings("unchecked")
+          final CloseableRecordBatch batch = ((BatchCreator<PhysicalOperator>) getOpCreator(op, context)).getBatch(
+              context, op, childRecordBatches);
+          operators.addFirst(batch);
+          return batch;
         });
       } catch (InterruptedException | IOException e) {
         final String errMsg = String.format("Failed to create RecordBatch for operator with id '%d'", op.getOperatorId());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/conversion/DrillValidator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/conversion/DrillValidator.java
index 1e8237b..f23f425 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/conversion/DrillValidator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/conversion/DrillValidator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.planner.sql.conversion;
 
+import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -38,13 +39,17 @@ import org.apache.calcite.sql.validate.SqlValidatorUtil;
 import org.apache.calcite.util.Static;
 import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
 class DrillValidator extends SqlValidatorImpl {
 
+  private final boolean isImpersonationEnabled;
+
   DrillValidator(SqlOperatorTable opTab, SqlValidatorCatalogReader catalogReader,
-                 RelDataTypeFactory typeFactory, SqlConformance conformance) {
+                 RelDataTypeFactory typeFactory, SqlConformance conformance, boolean isImpersonationEnabled) {
     super(opTab, catalogReader, typeFactory, conformance);
+    this.isImpersonationEnabled = isImpersonationEnabled;
   }
 
   @Override
@@ -66,7 +71,14 @@ class DrillValidator extends SqlValidatorImpl {
           }
       }
     }
-    super.validateFrom(node, targetRowType, scope);
+    if (isImpersonationEnabled) {
+      ImpersonationUtil.getProcessUserUGI().doAs((PrivilegedAction<Void>) () -> {
+        super.validateFrom(node, targetRowType, scope);
+        return null;
+      });
+    } else {
+      super.validateFrom(node, targetRowType, scope);
+    }
   }
 
   private void replaceAliasWithActualName(SqlIdentifier tempNode) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/conversion/SqlConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/conversion/SqlConverter.java
index 03ed970..ce3cf4d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/conversion/SqlConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/conversion/SqlConverter.java
@@ -50,7 +50,6 @@ import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
 import org.apache.drill.exec.ops.QueryContext;
-import org.apache.drill.exec.ops.UdfUtilities;
 import org.apache.drill.exec.planner.cost.DrillCostBase;
 import org.apache.drill.exec.planner.logical.DrillConstExecutor;
 import org.apache.drill.exec.planner.logical.DrillRelFactories;
@@ -88,7 +87,7 @@ public class SqlConverter {
   private final DrillValidator validator;
   private final boolean isInnerQuery;
   private final boolean isExpandedView;
-  private final UdfUtilities util;
+  private final QueryContext util;
   private final FunctionImplementationRegistry functions;
   private final String temporarySchema;
   private final UserSession session;
@@ -139,7 +138,8 @@ public class SqlConverter {
         this::getDefaultSchema);
     this.opTab = new ChainedSqlOperatorTable(Arrays.asList(context.getDrillOperatorTable(), catalog));
     this.costFactory = (settings.useDefaultCosting()) ? null : new DrillCostBase.DrillCostFactory();
-    this.validator = new DrillValidator(opTab, catalog, typeFactory, parserConfig.conformance());
+    this.validator =
+      new DrillValidator(opTab, catalog, typeFactory, parserConfig.conformance(), context.isImpersonationEnabled());
     validator.setIdentifierExpansion(true);
     cluster = null;
   }
@@ -160,7 +160,8 @@ public class SqlConverter {
     this.catalog = catalog;
     this.opTab = parent.opTab;
     this.planner = parent.planner;
-    this.validator = new DrillValidator(opTab, catalog, typeFactory, parserConfig.conformance());
+    this.validator =
+      new DrillValidator(opTab, catalog, typeFactory, parserConfig.conformance(), util.isImpersonationEnabled());
     this.temporarySchema = parent.temporarySchema;
     this.session = parent.session;
     this.drillConfig = parent.drillConfig;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/kerberos/KerberosFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/kerberos/KerberosFactory.java
index 3fe4bc8..98b4793 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/kerberos/KerberosFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/kerberos/KerberosFactory.java
@@ -138,21 +138,12 @@ public class KerberosFactory implements AuthenticatorFactory {
 
     // ignore parts[2]; GSSAPI gets the realm info from the ticket
     try {
-      final SaslClient saslClient = ugi.doAs(new PrivilegedExceptionAction<SaslClient>() {
-
-        @Override
-        public SaslClient run() throws Exception {
-          return FastSaslClientFactory.getInstance().createSaslClient(new String[]{KerberosUtil.KERBEROS_SASL_NAME},
-              null /** authorization ID */, serviceName, serviceHostName, properties,
-              new CallbackHandler() {
-                @Override
-                public void handle(final Callback[] callbacks)
-                    throws IOException, UnsupportedCallbackException {
-                  throw new UnsupportedCallbackException(callbacks[0]);
-                }
-              });
-        }
-      });
+      final SaslClient saslClient = ugi.doAs((PrivilegedExceptionAction<SaslClient>) () ->
+        FastSaslClientFactory.getInstance().createSaslClient(new String[]{ KerberosUtil.KERBEROS_SASL_NAME },
+          null /* authorization ID */, serviceName, serviceHostName, properties,
+          callbacks -> {
+            throw new UnsupportedCallbackException(callbacks[0]);
+          }));
       logger.debug("GSSAPI SaslClient created to authenticate to {} running on {} with QOP value {}",
           serviceName, serviceHostName, qopValue);
       return saslClient;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/InboundImpersonationManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/InboundImpersonationManager.java
index cf0afcf..40ea0a3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/InboundImpersonationManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/InboundImpersonationManager.java
@@ -58,9 +58,11 @@ public class InboundImpersonationManager {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InboundImpersonationManager.class);
 
   private static final String STAR = "*";
-
   private static final ObjectMapper impersonationPolicyMapper = new ObjectMapper();
 
+  private List<ImpersonationPolicy> impersonationPolicies;
+  private String policiesString; // used to test if policies changed
+
   static {
     impersonationPolicyMapper.configure(JsonGenerator.Feature.QUOTE_FIELD_NAMES, false);
     impersonationPolicyMapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
@@ -152,10 +154,6 @@ public class InboundImpersonationManager {
         deserializeImpersonationPolicies(policiesString));
   }
 
-
-  private List<ImpersonationPolicy> impersonationPolicies;
-  private String policiesString; // used to test if policies changed
-
   /**
    * Check if the current session user, as a proxy user, is authorized to impersonate the given target user
    * based on the system's impersonation policies.
@@ -164,8 +162,7 @@ public class InboundImpersonationManager {
    * @param session    user session
    */
   public void replaceUserOnSession(final String targetName, final UserSession session) {
-    final String policiesString = session.getOptions()
-        .getOption(ExecConstants.IMPERSONATION_POLICY_VALIDATOR);
+    final String policiesString = session.getOptions().getOption(ExecConstants.IMPERSONATION_POLICY_VALIDATOR);
     if (!policiesString.equals(this.policiesString)) {
       try {
         impersonationPolicies = deserializeImpersonationPolicies(policiesString);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserConnectionConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserConnectionConfig.java
index 4e4c80e..da7352e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserConnectionConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserConnectionConfig.java
@@ -67,7 +67,6 @@ class UserConnectionConfig extends AbstractConnectionConfig {
             maxWrappedSize, RpcConstants.MAX_RECOMMENDED_WRAPPED_SIZE);
       }
       encryptionContext.setMaxWrappedSize(maxWrappedSize);
-
       logger.info("Configured all user connections to require authentication with encryption: {} using: {}",
           encryptionContext.getEncryptionCtxtString(), authProvider.getAllFactoryNames());
     } else if (config.getBoolean(ExecConstants.USER_ENCRYPTION_SASL_ENABLED)) {
@@ -76,16 +75,13 @@ class UserConnectionConfig extends AbstractConnectionConfig {
     } else {
       authEnabled = false;
     }
-    impersonationManager = !config.getBoolean(ExecConstants.IMPERSONATION_ENABLED)
-        ? null
-        : new InboundImpersonationManager();
-
+    impersonationManager = config.getBoolean(ExecConstants.IMPERSONATION_ENABLED)
+        ? new InboundImpersonationManager()
+        : null;
     sslEnabled = config.getBoolean(ExecConstants.USER_SSL_ENABLED);
-
-    if(isSSLEnabled() && isAuthEnabled() && isEncryptionEnabled()){
+    if (isSSLEnabled() && isAuthEnabled() && isEncryptionEnabled()) {
       logger.warn("The server is configured to use both SSL and SASL encryption (only one should be configured).");
     }
-
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
index 00c4755..28aee4e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
@@ -407,4 +407,21 @@ public abstract class AbstractSchema implements Schema, SchemaPartitionExplorer,
   public boolean areTableNamesCaseSensitive() {
     return true;
   }
+
+  /**
+   * @param impersonated User whom to impersonate. Usually {@link SchemaConfig#getUserName()}
+   * @param notImpersonated Regular user (table owner for Views or Drillbit process user for Tables)
+   * @return endUser
+   */
+  public String getUser(String impersonated, String notImpersonated) {
+    return notImpersonated;
+  }
+
+  /**
+   * Does Drill needs to impersonate as user connected to Drill when reading data from DataSource?
+   * @return True when both Drill impersonation and DataSource impersonation are enabled.
+   */
+  public boolean needToImpersonateReadingData() {
+    return false;
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordCollector.java
index e916149..2068295 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordCollector.java
@@ -33,6 +33,7 @@ import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory;
 import org.apache.drill.exec.util.FileSystemUtil;
+import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.drill.metastore.MetastoreColumn;
 import org.apache.drill.metastore.Metastore;
 import org.apache.drill.metastore.components.tables.BasicTablesRequests;
@@ -45,8 +46,10 @@ import org.apache.drill.metastore.metadata.MetadataType;
 import org.apache.drill.metastore.statistics.ColumnStatistics;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 
+import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -181,6 +184,13 @@ public interface RecordCollector {
     @Override
     public List<Records.Column> columns(String schemaPath, SchemaPlus schema) {
       AbstractSchema drillSchema = schema.unwrap(AbstractSchema.class);
+      UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();
+      return drillSchema.needToImpersonateReadingData()
+        ? ugi.doAs((PrivilegedAction<List<Records.Column>>) () -> processColumns(schemaPath, schema, drillSchema))
+        : processColumns(schemaPath, schema, drillSchema);
+    }
+
+    private List<Records.Column> processColumns(String schemaPath, SchemaPlus schema, AbstractSchema drillSchema) {
       List<Records.Column> records = new ArrayList<>();
       for (Pair<String, ? extends Table> tableNameToTable : drillSchema.getTablesByNames(schema.getTableNames())) {
         String tableName = tableNameToTable.getKey();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/ImpersonationUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/ImpersonationUtil.java
index 3edcb1d..d6854be 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/ImpersonationUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/ImpersonationUtil.java
@@ -99,8 +99,6 @@ public class ImpersonationUtil {
       }
       return true;
     }
-
-
   }
   /**
    * Create and return proxy user {@link org.apache.hadoop.security.UserGroupInformation} of operator owner if operator
@@ -127,7 +125,7 @@ public class ImpersonationUtil {
   }
 
   /**
-   * Create and return proxy user {@link org.apache.hadoop.security.UserGroupInformation} for give user name.
+   * Create and return proxy user {@link org.apache.hadoop.security.UserGroupInformation} for given user name.
    *
    * @param proxyUserName Proxy user name (must be valid)
    */
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 0e03b01..e13857a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -307,27 +307,24 @@ public class FragmentExecutor implements Runnable {
           ImpersonationUtil.createProxyUgi(fragmentContext.getQueryUserName()) :
           ImpersonationUtil.getProcessUserUGI();
 
-      queryUserUgi.doAs(new PrivilegedExceptionAction<Void>() {
-        @Override
-        public Void run() throws Exception {
-          injector.injectChecked(fragmentContext.getExecutionControls(), "fragment-execution", IOException.class);
-
-          while (shouldContinue()) {
-            // Fragment is not cancelled
-
-            for (FragmentHandle fragmentHandle; (fragmentHandle = receiverFinishedQueue.poll()) != null;) {
-              // See if we have any finished requests. If so execute them.
-              root.receivingFragmentFinished(fragmentHandle);
-            }
-
-            if (!root.next()) {
-              // Fragment has processed all of its data
-              break;
-            }
+      queryUserUgi.doAs((PrivilegedExceptionAction<Void>) () -> {
+        injector.injectChecked(fragmentContext.getExecutionControls(), "fragment-execution", IOException.class);
+
+        while (shouldContinue()) {
+          // Fragment is not cancelled
+
+          for (FragmentHandle fragmentHandle1; (fragmentHandle1 = receiverFinishedQueue.poll()) != null;) {
+            // See if we have any finished requests. If so execute them.
+            root.receivingFragmentFinished(fragmentHandle1);
           }
 
-          return null;
+          if (!root.next()) {
+            // Fragment has processed all of its data
+            break;
+          }
         }
+
+        return null;
       });
 
     } catch (QueryCancelledException e) {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/security/KerberosHelper.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/security/KerberosHelper.java
index 570d20a..5670b0b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/security/KerberosHelper.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/security/KerberosHelper.java
@@ -152,4 +152,4 @@ public class KerberosHelper {
       Files.deleteIfExists(file.toPath());
     }
   }
-}
\ No newline at end of file
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java
index 442fd8e..1a55c4f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java
@@ -66,6 +66,11 @@ public class ClientFixture implements AutoCloseable {
       clientProps = cluster.getClientProps();
     }
 
+    protected ClientBuilder(ClusterFixture cluster, Properties properties) {
+      this.cluster = cluster;
+      clientProps = properties;
+    }
+
     /**
      * Specify an optional client property.
      * @param key property name
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
index 3a027ed..16f302e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
@@ -72,6 +72,8 @@ import org.apache.hadoop.fs.FileSystem;
  * execute queries. Can be used in JUnit tests, or in ad-hoc programs. Provides
  * a builder to set the necessary embedded Drillbit and client options, then
  * creates the requested Drillbit and client.
+ * TODO: To support User Impersonation add Configuration dfsConf and set hadoop.proxyuser settings
+ * similar to {@link org.apache.drill.exec.impersonation.BaseTestImpersonation}
  */
 public class ClusterFixture extends BaseFixture implements AutoCloseable {
   public static final int MAX_WIDTH_PER_NODE = 2;
@@ -302,6 +304,10 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable {
     return new ClientFixture.ClientBuilder(this);
   }
 
+  public ClientFixture.ClientBuilder clientBuilder(Properties properties) {
+    return new ClientFixture.ClientBuilder(this, properties);
+  }
+
   public RestClientFixture.Builder restClientBuilder() {
     return new RestClientFixture.Builder(this);
   }
@@ -316,6 +322,19 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable {
   }
 
   /**
+   * It can be used to add one more client for {@link ClusterFixture}. <br>
+   * Note: {@link ClusterTest#client}
+   *
+   * @param properties client Properties (clientProps)
+   * @return new ClientFixture for current ClusterFixture
+   */
+  public ClientFixture addClientFixture(Properties properties) {
+    return clientBuilder(properties)
+      .property(DrillProperties.DRILLBIT_CONNECTION, String.format("localhost:%s", drillbit().getUserPort()))
+      .build();
+  }
+
+  /**
    * Create a test client for a specific host and port.
    *
    * @param host host, must be one of those created by this
@@ -343,6 +362,10 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable {
     return clientFixture().client();
   }
 
+  public ClientFixture client(int number) {
+    return clients.get(number);
+  }
+
   /**
    * Return a JDBC connection to the default (first) Drillbit.
    * Note that this code requires special setup of the test code.
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixtureBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixtureBuilder.java
index 4b2b4e0..a510c58 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixtureBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixtureBuilder.java
@@ -115,7 +115,7 @@ public class ClusterFixtureBuilder {
   /**
    * Add an additional boot-time property for the embedded Drillbit.
    * @param key config property name
-   * @param value property value
+   * @param value String property value
    * @return this builder
    */
   public ClusterFixtureBuilder configProperty(String key, Object value) {
@@ -123,6 +123,17 @@ public class ClusterFixtureBuilder {
     return this;
   }
 
+  /**
+   * Non-string {@link #configProperty}
+   * @param key config property name
+   * @param value property value
+   * @return this builder
+   */
+  public ClusterFixtureBuilder configNonStringProperty(String key, Object value) {
+    configBuilder.put(key, value);
+    return this;
+  }
+
   public ClusterFixtureBuilder putDefinition(OptionDefinition definition) {
     configBuilder.putDefinition(definition);
     return this;
diff --git a/pom.xml b/pom.xml
index 76e4bfc..2047d0d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -46,6 +46,7 @@
     <proto.cas.path>${project.basedir}/src/main/protobuf/</proto.cas.path>
     <junit.version>5.7.2</junit.version>
     <junit4.version>4.13.2</junit4.version>
+    <junit.platform.version>1.8.2</junit.platform.version>
     <slf4j.version>1.7.26</slf4j.version>
     <shaded.guava.version>28.2-jre</shaded.guava.version>
     <guava.version>30.1.1-jre</guava.version>
@@ -1143,6 +1144,30 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.junit.platform</groupId>
+      <artifactId>junit-platform-suite-api</artifactId>
+      <version>${junit.platform.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.junit.platform</groupId>
+      <artifactId>junit-platform-suite-engine</artifactId>
+      <version>${junit.platform.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.junit.platform</groupId>
+      <artifactId>junit-platform-engine</artifactId>
+      <version>${junit.platform.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.junit.platform</groupId>
+      <artifactId>junit-platform-launcher</artifactId>
+      <version>${junit.platform.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.junit.vintage</groupId>
       <artifactId>junit-vintage-engine</artifactId>
       <scope>test</scope>
@@ -2269,6 +2294,10 @@
                 <groupId>com.nimbusds</groupId>
                 <artifactId>nimbus-jose-jwt</artifactId>
               </exclusion>
+              <exclusion>
+                <groupId>org.slf4j</groupId>
+                <artifactId>*</artifactId>
+              </exclusion>
             </exclusions>
           </dependency>
           <dependency>