Posted to commits@drill.apache.org by lu...@apache.org on 2022/01/29 02:37:18 UTC
[drill] branch master updated: DRILL-8061: Add Impersonation Support for Phoenix (#2422)
This is an automated email from the ASF dual-hosted git repository.
luoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 2decae1 DRILL-8061: Add Impersonation Support for Phoenix (#2422)
2decae1 is described below
commit 2decae18b85eeda51816e92d5a9e9e6e2f9ce8d5
Author: Vitalii Diravka <vi...@apache.org>
AuthorDate: Sat Jan 29 04:36:42 2022 +0200
DRILL-8061: Add Impersonation Support for Phoenix (#2422)
---
.../org/apache/drill/common/PlanStringBuilder.java | 2 +-
.../drill/common/config/DrillProperties.java | 3 +
.../org/apache/drill/categories/RowSetTests.java | 9 +-
.../exec/store/hive/schema/HiveSchemaFactory.java | 21 +-
contrib/storage-phoenix/README.md | 37 +-
contrib/storage-phoenix/pom.xml | 96 ++++-
.../exec/store/phoenix/PhoenixBatchReader.java | 56 ++-
.../exec/store/phoenix/PhoenixDataSource.java | 106 +++---
.../drill/exec/store/phoenix/PhoenixGroupScan.java | 14 +-
.../drill/exec/store/phoenix/PhoenixReader.java | 7 +-
.../exec/store/phoenix/PhoenixSchemaFactory.java | 81 +++--
.../exec/store/phoenix/PhoenixStoragePlugin.java | 95 +++--
.../drill/exec/store/phoenix/PhoenixSubScan.java | 12 +-
.../exec/store/phoenix/rules/PhoenixPrel.java | 11 +-
.../drill/exec/store/phoenix/PhoenixBaseTest.java | 44 ++-
.../exec/store/phoenix/PhoenixCommandTest.java | 1 +
.../drill/exec/store/phoenix/PhoenixTestSuite.java | 6 +-
.../exec/store/phoenix/QueryServerBasicsIT.java | 5 +
.../exec/store/phoenix/QueryServerThread.java | 45 ---
.../HttpParamImpersonationQueryServerIT.java | 115 ++++++
.../phoenix/secured/QueryServerEnvironment.java | 367 +++++++++++++++++++
.../phoenix/secured/SecuredPhoenixBaseTest.java | 188 ++++++++++
.../SecuredPhoenixCommandTest.java} | 55 ++-
.../secured/SecuredPhoenixDataTypeTest.java | 136 +++++++
.../phoenix/secured/SecuredPhoenixSQLTest.java | 390 +++++++++++++++++++++
.../SecuredPhoenixTestSuite.java} | 56 +--
.../apache/drill/exec/ops/OperatorContextImpl.java | 7 +-
.../drill/exec/physical/base/PhysicalOperator.java | 1 -
.../drill/exec/physical/impl/ImplCreator.java | 23 +-
.../planner/sql/conversion/DrillValidator.java | 16 +-
.../exec/planner/sql/conversion/SqlConverter.java | 9 +-
.../rpc/security/kerberos/KerberosFactory.java | 21 +-
.../exec/rpc/user/InboundImpersonationManager.java | 11 +-
.../drill/exec/rpc/user/UserConnectionConfig.java | 12 +-
.../apache/drill/exec/store/AbstractSchema.java | 17 +
.../drill/exec/store/ischema/RecordCollector.java | 10 +
.../apache/drill/exec/util/ImpersonationUtil.java | 4 +-
.../drill/exec/work/fragment/FragmentExecutor.java | 33 +-
.../drill/exec/rpc/security/KerberosHelper.java | 2 +-
.../java/org/apache/drill/test/ClientFixture.java | 5 +
.../java/org/apache/drill/test/ClusterFixture.java | 23 ++
.../apache/drill/test/ClusterFixtureBuilder.java | 13 +-
pom.xml | 29 ++
43 files changed, 1817 insertions(+), 377 deletions(-)
diff --git a/common/src/main/java/org/apache/drill/common/PlanStringBuilder.java b/common/src/main/java/org/apache/drill/common/PlanStringBuilder.java
index ef04a9a..c4f58fd 100644
--- a/common/src/main/java/org/apache/drill/common/PlanStringBuilder.java
+++ b/common/src/main/java/org/apache/drill/common/PlanStringBuilder.java
@@ -79,7 +79,7 @@ public class PlanStringBuilder {
public PlanStringBuilder field(String key, Object value) {
if (value != null) {
startField(key);
- buf.append(value.toString());
+ buf.append(value);
}
return this;
}
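
The one-line change above works because StringBuilder.append(Object) delegates to String.valueOf, which tolerates null; a minimal standalone sketch (not Drill code) of that behavior:

public class AppendDemo {
  public static void main(String[] args) {
    Object value = 42;
    StringBuilder buf = new StringBuilder();
    buf.append(value);         // same as buf.append(String.valueOf(value))
    buf.append((Object) null); // appends the literal "null" rather than throwing NPE
    System.out.println(buf);   // prints "42null"
  }
}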
diff --git a/common/src/main/java/org/apache/drill/common/config/DrillProperties.java b/common/src/main/java/org/apache/drill/common/config/DrillProperties.java
index 4df66f9..7726808 100644
--- a/common/src/main/java/org/apache/drill/common/config/DrillProperties.java
+++ b/common/src/main/java/org/apache/drill/common/config/DrillProperties.java
@@ -46,6 +46,9 @@ public final class DrillProperties extends Properties {
public static final String PASSWORD = "password";
+ /**
+ * Impersonation target name for Drill Inbound Impersonation
+ */
public static final String IMPERSONATION_TARGET = "impersonation_target";
public static final String AUTH_MECHANISM = "auth";
diff --git a/common/src/test/java/org/apache/drill/categories/RowSetTests.java b/common/src/test/java/org/apache/drill/categories/RowSetTests.java
index 98ad675..eb83006 100644
--- a/common/src/test/java/org/apache/drill/categories/RowSetTests.java
+++ b/common/src/test/java/org/apache/drill/categories/RowSetTests.java
@@ -18,9 +18,12 @@
package org.apache.drill.categories;
/**
- * A category for tests that test the RowSet, ResultSetLoader
- * and related mechanisms.
+ * Junit category marker. <br>
+ * A category for tests that test the RowSet, ResultSetLoader and related mechanisms.
*/
public interface RowSetTests {
- // Junit category marker
+ /**
+ * tag for JUnit5
+ */
+ String TAG = "row-set-tests";
}
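
The new TAG constant lets the same marker drive JUnit 5 tag filtering alongside the existing JUnit 4 category; a hypothetical usage sketch (the test class name is illustrative):

package org.apache.drill.categories;

import org.junit.experimental.categories.Category;

@Category(RowSetTests.class) // JUnit 4 category, selectable with -Dgroups=org.apache.drill.categories.RowSetTests
public class ExampleRowSetTest {
  // Under JUnit 5, the equivalent marker would be @Tag(RowSetTests.TAG) on the class.
}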
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
index 36e32bd..e82274e 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
@@ -95,22 +95,11 @@ public class HiveSchemaFactory extends AbstractSchemaFactory {
}
/**
- * Does Drill needs to impersonate as user connected to Drill when reading data from Hive warehouse location?
- * @return True when both Drill impersonation and Hive impersonation are enabled.
- */
- private boolean needToImpersonateReadingData() {
- return isDrillImpersonationEnabled && isHS2DoAsSet;
- }
-
- /**
* Close this schema factory in preparation for retrying. Attempt to close
* connections, but just ignore any errors.
*/
public void close() {
- AutoCloseables.closeSilently(
- processUserMetastoreClient::close,
- metaStoreClientLoadingCache::invalidateAll
- );
+ AutoCloseables.closeSilently(processUserMetastoreClient, metaStoreClientLoadingCache::invalidateAll);
}
@Override
@@ -220,7 +209,8 @@ public class HiveSchemaFactory extends AbstractSchemaFactory {
: new DrillHiveTable(getName(), plugin, getUser(schemaUser, getProcessUserName()), entry);
}
- private String getUser(String impersonated, String notImpersonated) {
+ @Override
+ public String getUser(String impersonated, String notImpersonated) {
return needToImpersonateReadingData() ? impersonated : notImpersonated;
}
@@ -245,6 +235,11 @@ public class HiveSchemaFactory extends AbstractSchemaFactory {
public String getTypeName() {
return HiveStoragePluginConfig.NAME;
}
+
+ @Override
+ public boolean needToImpersonateReadingData() {
+ return isDrillImpersonationEnabled && isHS2DoAsSet;
+ }
}
}
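
The Hive change above turns the impersonation decision into a pair of overridable hooks, getUser and needToImpersonateReadingData, now declared on AbstractSchema. A minimal sketch of the hook pair (class name and the conservative default are illustrative, not Drill's exact declarations):

abstract class SchemaSketch {
  // Subclasses decide when reads should run as the session user.
  public boolean needToImpersonateReadingData() {
    return false; // default for schemas that never impersonate
  }

  public String getUser(String impersonated, String notImpersonated) {
    return needToImpersonateReadingData() ? impersonated : notImpersonated;
  }
}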
diff --git a/contrib/storage-phoenix/README.md b/contrib/storage-phoenix/README.md
index ab178b3..01278b5 100644
--- a/contrib/storage-phoenix/README.md
+++ b/contrib/storage-phoenix/README.md
@@ -11,32 +11,24 @@
Features :
- Full support for Enhanced Vector Framework.
-
- Tested in phoenix 4.14 and 5.1.2.
-
- Support the array data type.
-
- Support the pushdown (Project, Limit, Filter, Aggregate, Join, CrossJoin, Join_Filter, GroupBy, Distinct and more).
-
- Use the PQS client (6.0).
Related Information :
1. PHOENIX-6398: Returns uniform SQL dialect in calcite for the PQS
-
2. PHOENIX-6582: Bump default HBase version to 2.3.7 and 2.4.8
-
3. PHOENIX-6605, PHOENIX-6606 and PHOENIX-6607.
-
4. DRILL-8060, DRILL-8061 and DRILL-8062.
-
5. [QueryServer 6.0.0-drill-r1](https://github.com/luocooong/phoenix-queryserver/releases/tag/6.0.0-drill-r1)
## Configuration
To connect Drill to Phoenix, create a new storage plugin with the following configuration :
-Option 1 (Use the host and port) :
+Option 1 (Use the host and port):
```json
{
@@ -74,7 +66,7 @@ Use the connection properties :
Tips :
 * For more connection properties, see also [PQS Configuration](http://phoenix.apache.org/server.html).
* If you provide the `jdbcURL`, the connection will ignore the value of `host` and `port`.
- * If you extended the authentication of QueryServer, you can also pass the `userName` and `password`.
+ * If you [extended the authentication of QueryServer](https://github.com/Boostport/avatica/issues/28), you can also pass the `userName` and `password`.
```json
{
@@ -87,6 +79,16 @@ Tips :
}
```
+### Impersonation
+Configurations :
+1. Enable [Drill User Impersonation](https://drill.apache.org/docs/configuring-user-impersonation/)
+2. Enable [PQS Impersonation](https://phoenix.apache.org/server.html#Impersonation)
+3. PQS URL:
+ 1. Provide `host` and `port`, and Drill will generate the PQS URL with a `doAs` parameter set to the current session user
+ 2. Provide the `jdbcURL` with a `doAs` URL parameter whose value is the `$user` placeholder (see the example
+    configuration after this list), for instance: `jdbc:phoenix:thin:url=http://localhost:8765?doAs=$user`.
+    If Drill impersonation is enabled but `doAs=$user` is missing, a UserException is thrown.
+
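+For illustration only, a plugin configuration that supplies the impersonation-ready URL directly might look
+like this (host, port and the plugin name are placeholders):
+
+```json
+{
+  "type": "phoenix",
+  "jdbcURL": "jdbc:phoenix:thin:url=http://localhost:8765?doAs=$user",
+  "enabled": true
+}
+```
+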
## Testing
The Phoenix Query Server test framework requires Hadoop 3, but `PHOENIX-5993` and `HBASE-22394` are still open:
@@ -101,10 +103,23 @@ requires a recompilation of HBase because of incompatible changes between Hadoop
1. Download HBase 2.4.2 sources and rebuild with Hadoop 3.
- 2. Remove the `Ignore` annotation in `PhoenixTestSuite.java`.
+ ```mvn clean install -DskipTests -Dhadoop.profile=3.0 -Dhadoop-three.version=3.2.2```
+ 2. Remove the `Ignore` annotation in `PhoenixTestSuite.java`.
+
+ ```
+ @Ignore
+ @Category({ SlowTest.class })
+ public class PhoenixTestSuite extends ClusterTest {
+ ```
+
3. Go to the phoenix root folder and run test.
+ ```
+ cd contrib/storage-phoenix/
+ mvn test
+ ```
+
### To Add Features
- Don't forget to add a test function to the test class.
diff --git a/contrib/storage-phoenix/pom.xml b/contrib/storage-phoenix/pom.xml
index 5d85575..277641e 100644
--- a/contrib/storage-phoenix/pom.xml
+++ b/contrib/storage-phoenix/pom.xml
@@ -33,6 +33,7 @@
<phoenix.version>5.1.2</phoenix.version>
<!-- Keep the 2.4.2 to reduce dependency conflict -->
<hbase.minicluster.version>2.4.2</hbase.minicluster.version>
+ <jetty.test.version>9.4.31.v20200723</jetty.test.version>
</properties>
<dependencies>
@@ -92,6 +93,14 @@
</exclusions>
</dependency>
<dependency>
+ <!-- PHOENIX-6605 -->
+ <groupId>com.github.luocooong.phoenix-queryserver</groupId>
+ <artifactId>phoenix-queryserver-it</artifactId>
+ <version>6.0.0-drill-r1</version>
+ <scope>test</scope>
+ <classifier>tests</classifier>
+ </dependency>
+ <dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-core</artifactId>
<version>${phoenix.version}</version>
@@ -110,7 +119,7 @@
<artifactId>commons-logging</artifactId>
<groupId>commons-logging</groupId>
</exclusion>
- <exclusion>
+ <exclusion>
<groupId>javax.servlet</groupId>
<artifactId>*</artifactId>
</exclusion>
@@ -143,6 +152,12 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.apache.kerby</groupId>
+ <artifactId>kerb-core</artifactId>
+ <version>1.0.1</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-it</artifactId>
<version>${hbase.minicluster.version}</version>
@@ -188,17 +203,96 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-asyncfs</artifactId>
+ <type>test-jar</type>
+ <version>${hbase.minicluster.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
<version>${hadoop.version}</version>
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minikdc</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-testing-util</artifactId>
+ <version>${hbase.minicluster.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
<groupId>com.univocity</groupId>
<artifactId>univocity-parsers</artifactId>
<version>${univocity-parsers.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ <version>${jetty.test.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-http</artifactId>
+ <version>${jetty.test.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-servlet</artifactId>
+ <version>${jetty.test.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixBatchReader.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixBatchReader.java
index c56369b..db48603 100644
--- a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixBatchReader.java
+++ b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixBatchReader.java
@@ -17,7 +17,7 @@
*/
package org.apache.drill.exec.store.phoenix;
-import java.sql.Connection;
+import java.security.PrivilegedAction;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
@@ -30,6 +30,7 @@ import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
@@ -48,17 +49,23 @@ import org.apache.drill.exec.store.phoenix.PhoenixReader.GenericDateDefn;
import org.apache.drill.exec.store.phoenix.PhoenixReader.GenericDefn;
import org.apache.drill.exec.store.phoenix.PhoenixReader.GenericTimeDefn;
import org.apache.drill.exec.store.phoenix.PhoenixReader.GenericTimestampDefn;
+import org.apache.drill.exec.util.ImpersonationUtil;
import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.LoggerFactory;
+import javax.sql.DataSource;
+
+
public class PhoenixBatchReader implements ManagedReader<SchemaNegotiator> {
private static final org.slf4j.Logger logger = LoggerFactory.getLogger(PhoenixBatchReader.class);
private final PhoenixSubScan subScan;
+ private final boolean impersonationEnabled;
+ private final UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();
private CustomErrorContext errorContext;
private PhoenixReader reader;
- private Connection conn;
private PreparedStatement pstmt;
private ResultSet results;
private ResultSetMetaData meta;
@@ -67,22 +74,29 @@ public class PhoenixBatchReader implements ManagedReader<SchemaNegotiator> {
public PhoenixBatchReader(PhoenixSubScan subScan) {
this.subScan = subScan;
+ this.impersonationEnabled = subScan.getPlugin().getContext().getConfig().getBoolean(ExecConstants.IMPERSONATION_ENABLED);
}
@Override
public boolean open(SchemaNegotiator negotiator) {
+ return impersonationEnabled
+ ? ugi.doAs((PrivilegedAction<Boolean>) () -> processOpen(negotiator))
+ : processOpen(negotiator);
+ }
+
+ private boolean processOpen(SchemaNegotiator negotiator) {
try {
errorContext = negotiator.parentErrorContext();
- conn = subScan.getPlugin().getDataSource().getConnection();
- pstmt = conn.prepareStatement(subScan.getSql());
+ DataSource ds = subScan.getPlugin().getDataSource(negotiator.userName());
+ pstmt = ds.getConnection().prepareStatement(subScan.getSql());
results = pstmt.executeQuery();
meta = pstmt.getMetaData();
} catch (SQLException e) {
throw UserException
- .dataReadError(e)
- .message("Failed to execute the phoenix sql query. " + e.getMessage())
- .addContext(errorContext)
- .build(logger);
+ .dataReadError(e)
+ .message("Failed to execute the phoenix sql query. " + e.getMessage())
+ .addContext(errorContext)
+ .build(logger);
}
try {
negotiator.tableSchema(defineMetadata(), true);
@@ -90,10 +104,10 @@ public class PhoenixBatchReader implements ManagedReader<SchemaNegotiator> {
bindColumns(reader.getStorage());
} catch (SQLException e) {
throw UserException
- .dataReadError(e)
- .message("Failed to get type of columns from metadata. " + e.getMessage())
- .addContext(errorContext)
- .build(logger);
+ .dataReadError(e)
+ .message("Failed to get type of columns from metadata. " + e.getMessage())
+ .addContext(errorContext)
+ .build(logger);
}
watch = Stopwatch.createStarted();
return true;
@@ -101,8 +115,14 @@ public class PhoenixBatchReader implements ManagedReader<SchemaNegotiator> {
@Override
public boolean next() {
+ return impersonationEnabled
+ ? ugi.doAs((PrivilegedAction<Boolean>) this::processNext)
+ : processNext();
+ }
+
+ private boolean processNext() {
try {
- while(!reader.getStorage().isFull()) {
+ while (!reader.getStorage().isFull()) {
if (!reader.processRow()) { // return true if one row is processed.
watch.stop();
logger.debug("Phoenix fetch total record numbers : {}", reader.getRowCount());
@@ -112,17 +132,17 @@ public class PhoenixBatchReader implements ManagedReader<SchemaNegotiator> {
return true; // batch full but not reached the EOF.
} catch (SQLException e) {
throw UserException
- .dataReadError(e)
- .message("Failed to get the data from the result set. " + e.getMessage())
- .addContext(errorContext)
- .build(logger);
+ .dataReadError(e)
+ .message("Failed to get the data from the result set. " + e.getMessage())
+ .addContext(errorContext)
+ .build(logger);
}
}
@Override
public void close() {
logger.debug("Phoenix fetch batch size : {}, took {} ms. ", reader.getBatchCount(), watch.elapsed(TimeUnit.MILLISECONDS));
- AutoCloseables.closeSilently(results, pstmt, conn);
+ AutoCloseables.closeSilently(results, pstmt, reader);
}
private TupleMetadata defineMetadata() throws SQLException {
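
Both open() and next() above use the same wrapper: run the JDBC work under the process user's UGI only when impersonation is enabled. A standalone sketch of that pattern (assumes Hadoop's UserGroupInformation on the classpath; names are illustrative):

import java.security.PrivilegedAction;

import org.apache.hadoop.security.UserGroupInformation;

public class DoAsSketch {
  private static final boolean IMPERSONATION_ENABLED = true; // would come from Drill's ExecConstants.IMPERSONATION_ENABLED

  static <T> T runMaybeImpersonated(UserGroupInformation ugi, PrivilegedAction<T> action) {
    // Wrap in doAs() only when impersonation is on; otherwise run directly.
    return IMPERSONATION_ENABLED ? ugi.doAs(action) : action.run();
  }

  public static void main(String[] args) throws Exception {
    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
    System.out.println(runMaybeImpersonated(ugi, () -> System.getProperty("user.name")));
  }
}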
diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixDataSource.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixDataSource.java
index 29512a3..ed8c670 100644
--- a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixDataSource.java
+++ b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixDataSource.java
@@ -21,13 +21,14 @@ import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
-import java.sql.SQLFeatureNotSupportedException;
import java.util.Map;
import java.util.Properties;
import java.util.logging.Logger;
import javax.sql.DataSource;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
/**
@@ -39,46 +40,51 @@ import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
* is not always left in a healthy state by the previous user. It is better to
* create new Phoenix Connections to ensure that you avoid any potential issues.
*/
+@Slf4j
public class PhoenixDataSource implements DataSource {
private static final String DEFAULT_URL_HEADER = "jdbc:phoenix:thin:url=http://";
private static final String DEFAULT_SERIALIZATION = "serialization=PROTOBUF";
+ private static final String IMPERSONATED_USER_VARIABLE = "$user";
+ private static final String DEFAULT_QUERY_SERVER_REMOTEUSEREXTRACTOR_PARAM = "doAs";
- private String url;
+ private final String url;
+ private final String user;
private Map<String, Object> connectionProperties;
- private boolean isFatClient; // Is a fat client
+ private boolean isFatClient;
- public PhoenixDataSource(String url) {
- Preconditions.checkNotNull(url);
- this.url = url;
- }
-
- public PhoenixDataSource(String host, int port) {
- Preconditions.checkNotNull(host);
- Preconditions.checkArgument(port > 0, "Please set the correct port.");
- this.url = new StringBuilder()
- .append(DEFAULT_URL_HEADER)
- .append(host)
- .append(":")
- .append(port)
- .append(";")
- .append(DEFAULT_SERIALIZATION)
- .toString();
- }
-
- public PhoenixDataSource(String url, Map<String, Object> connectionProperties) {
- this(url);
+ public PhoenixDataSource(String url,
+ String userName,
+ Map<String, Object> connectionProperties,
+ boolean impersonationEnabled) {
+ Preconditions.checkNotNull(url, userName);
Preconditions.checkNotNull(connectionProperties);
connectionProperties.forEach((k, v)
-> Preconditions.checkArgument(v != null, String.format("does not accept null values : %s", k)));
+ this.url = impersonationEnabled ? doAsUserUrl(url, userName) : url;
+ this.user = userName;
this.connectionProperties = connectionProperties;
}
- public PhoenixDataSource(String host, int port, Map<String, Object> connectionProperties) {
- this(host, port);
- Preconditions.checkNotNull(connectionProperties);
+ public PhoenixDataSource(String host,
+ int port,
+ String userName,
+ Map<String, Object> connectionProperties,
+ boolean impersonationEnabled) {
+ Preconditions.checkNotNull(host, userName);
+ Preconditions.checkArgument(port > 0, "Please set the correct port.");
connectionProperties.forEach((k, v)
- -> Preconditions.checkArgument(v != null, String.format("does not accept null values : %s", k)));
+ -> Preconditions.checkArgument(v != null, String.format("does not accept null values : %s", k)));
+ this.url = new StringBuilder()
+ .append(DEFAULT_URL_HEADER)
+ .append(host)
+ .append(":")
+ .append(port)
+ .append(impersonationEnabled ? "?doAs=" + userName : "")
+ .append(";")
+ .append(DEFAULT_SERIALIZATION)
+ .toString();
+ this.user = userName;
this.connectionProperties = connectionProperties;
}
@@ -91,27 +97,27 @@ public class PhoenixDataSource implements DataSource {
}
@Override
- public PrintWriter getLogWriter() throws SQLException {
+ public PrintWriter getLogWriter() {
throw new UnsupportedOperationException("getLogWriter");
}
@Override
- public void setLogWriter(PrintWriter out) throws SQLException {
+ public void setLogWriter(PrintWriter out) {
throw new UnsupportedOperationException("setLogWriter");
}
@Override
- public void setLoginTimeout(int seconds) throws SQLException {
+ public void setLoginTimeout(int seconds) {
throw new UnsupportedOperationException("setLoginTimeout");
}
@Override
- public int getLoginTimeout() throws SQLException {
+ public int getLoginTimeout() {
return 0;
}
@Override
- public Logger getParentLogger() throws SQLFeatureNotSupportedException {
+ public Logger getParentLogger() {
throw new UnsupportedOperationException("getParentLogger");
}
@@ -126,22 +132,21 @@ public class PhoenixDataSource implements DataSource {
}
@Override
- public boolean isWrapperFor(Class<?> iface) throws SQLException {
+ public boolean isWrapperFor(Class<?> iface) {
return iface.isInstance(this);
}
@Override
public Connection getConnection() throws SQLException {
useDriverClass();
- Connection conn = DriverManager.getConnection(url, useConfProperties());
- return conn;
+ return getConnection(this.user, null);
}
@Override
- public Connection getConnection(String username, String password) throws SQLException {
+ public Connection getConnection(String userName, String password) throws SQLException {
useDriverClass();
- Connection conn = DriverManager.getConnection(url, username, password);
- return conn;
+ logger.debug("Drill/Phoenix connection url: {}", url);
+ return DriverManager.getConnection(url, useConfProperties());
}
/**
@@ -150,12 +155,12 @@ public class PhoenixDataSource implements DataSource {
*
* @throws SQLException
*/
- private void useDriverClass() throws SQLException {
+ public Class<?> useDriverClass() throws SQLException {
try {
- if (!isFatClient) {
- Class.forName(PhoenixStoragePluginConfig.THIN_DRIVER_CLASS);
+ if (isFatClient) {
+ return Class.forName(PhoenixStoragePluginConfig.FAT_DRIVER_CLASS);
} else {
- Class.forName(PhoenixStoragePluginConfig.FAT_DRIVER_CLASS);
+ return Class.forName(PhoenixStoragePluginConfig.THIN_DRIVER_CLASS);
}
} catch (ClassNotFoundException e) {
throw new SQLException("Caused by: " + e.getMessage());
@@ -169,12 +174,23 @@ public class PhoenixDataSource implements DataSource {
*/
private Properties useConfProperties() {
Properties props = new Properties();
- props.put("phoenix.trace.frequency", "never");
- props.put("phoenix.query.timeoutMs", 30000);
- props.put("phoenix.query.keepAliveMs", 120000);
if (getConnectionProperties() != null) {
props.putAll(getConnectionProperties());
}
+ props.putIfAbsent("phoenix.trace.frequency", "never");
+ props.putIfAbsent("phoenix.query.timeoutMs", 30000);
+ props.putIfAbsent("phoenix.query.keepAliveMs", 120000);
return props;
}
+
+ private String doAsUserUrl(String url, String userName) {
+ if (url.contains(DEFAULT_QUERY_SERVER_REMOTEUSEREXTRACTOR_PARAM)) {
+ return url.replace(IMPERSONATED_USER_VARIABLE, userName);
+ } else {
+ throw UserException
+ .connectionError()
+ .message("Invalid PQS URL. Please add the value of the `doAs=$user` parameter if Impersonation is enabled.")
+ .build(logger);
+ }
+ }
}
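
doAsUserUrl above is the other half of the impersonation plumbing: substitute the `$user` placeholder in the doAs parameter with the session user, and reject URLs that lack the parameter. A standalone sketch (IllegalArgumentException stands in for Drill's UserException):

public class DoAsUrlSketch {
  static String doAsUserUrl(String url, String userName) {
    if (!url.contains("doAs")) {
      throw new IllegalArgumentException(
          "Invalid PQS URL. Add a doAs=$user parameter when impersonation is enabled.");
    }
    return url.replace("$user", userName);
  }

  public static void main(String[] args) {
    // prints: jdbc:phoenix:thin:url=http://localhost:8765?doAs=alice
    System.out.println(doAsUserUrl("jdbc:phoenix:thin:url=http://localhost:8765?doAs=$user", "alice"));
  }
}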
diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixGroupScan.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixGroupScan.java
index ae79570..6045a3d 100644
--- a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixGroupScan.java
+++ b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixGroupScan.java
@@ -50,20 +50,21 @@ public class PhoenixGroupScan extends AbstractGroupScan {
@JsonCreator
public PhoenixGroupScan(
+ @JsonProperty("userName") String userName,
@JsonProperty("sql") String sql,
@JsonProperty("columns") List<SchemaPath> columns,
@JsonProperty("scanSpec") PhoenixScanSpec scanSpec,
@JsonProperty("config") PhoenixStoragePluginConfig config,
@JacksonInject StoragePluginRegistry plugins) {
- super("no-user");
+ super(userName);
this.sql = sql;
this.columns = columns;
this.scanSpec = scanSpec;
this.plugin = plugins.resolve(config, PhoenixStoragePlugin.class);
}
- public PhoenixGroupScan(PhoenixScanSpec scanSpec, PhoenixStoragePlugin plugin) {
- super("no-user");
+ public PhoenixGroupScan(String userName, PhoenixScanSpec scanSpec, PhoenixStoragePlugin plugin) {
+ super(userName);
this.sql = scanSpec.getSql();
this.columns = ALL_COLUMNS;
this.scanSpec = scanSpec;
@@ -86,8 +87,9 @@ public class PhoenixGroupScan extends AbstractGroupScan {
this.plugin = scan.plugin;
}
- public PhoenixGroupScan(String sql, List<SchemaPath> columns, PhoenixScanSpec scanSpec, PhoenixStoragePlugin plugin) {
- super("no-user");
+ public PhoenixGroupScan(String user, String sql, List<SchemaPath> columns, PhoenixScanSpec scanSpec,
+ PhoenixStoragePlugin plugin) {
+ super(user);
this.sql = sql;
this.columns = columns;
this.scanSpec = scanSpec;
@@ -124,7 +126,7 @@ public class PhoenixGroupScan extends AbstractGroupScan {
@Override
public SubScan getSpecificScan(int minorFragmentId) throws ExecutionSetupException {
- return new PhoenixSubScan(sql, columns, scanSpec, plugin);
+ return new PhoenixSubScan(userName, sql, columns, scanSpec, plugin);
}
@Override
diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixReader.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixReader.java
index 7e2e6d3..0ecc4ae 100644
--- a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixReader.java
+++ b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixReader.java
@@ -38,7 +38,7 @@ import org.apache.drill.exec.vector.accessor.ScalarWriter;
import org.apache.drill.exec.vector.accessor.writer.ScalarArrayWriter;
import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
-public class PhoenixReader {
+public class PhoenixReader implements AutoCloseable {
private final RowSetLoader writer;
private final ColumnDefn[] columns;
@@ -104,6 +104,11 @@ public class PhoenixReader {
COLUMN_TYPE_MAP.put(Types.BOOLEAN, MinorType.BIT);
}
+ @Override
+ public void close() throws Exception {
+ results.close();
+ }
+
protected abstract static class ColumnDefn {
final String name;
diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSchemaFactory.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSchemaFactory.java
index af82ea8..5c012bc 100644
--- a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSchemaFactory.java
+++ b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSchemaFactory.java
@@ -18,45 +18,63 @@
package org.apache.drill.exec.store.phoenix;
import java.io.IOException;
-import java.sql.Connection;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import javax.sql.DataSource;
import org.apache.calcite.adapter.jdbc.JdbcSchema;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Table;
import org.apache.commons.lang3.StringUtils;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.store.AbstractSchema;
import org.apache.drill.exec.store.AbstractSchemaFactory;
import org.apache.drill.exec.store.SchemaConfig;
-import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import static org.apache.drill.exec.util.ImpersonationUtil.getProcessUserName;
public class PhoenixSchemaFactory extends AbstractSchemaFactory {
private final PhoenixStoragePlugin plugin;
private final Map<String, PhoenixSchema> schemaMap;
private PhoenixSchema rootSchema;
+ private final boolean isDrillImpersonationEnabled;
+ private final UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();
public PhoenixSchemaFactory(PhoenixStoragePlugin plugin) {
super(plugin.getName());
this.plugin = plugin;
- this.schemaMap = Maps.newHashMap();
+ this.schemaMap = new HashMap<>();
+ isDrillImpersonationEnabled = plugin.getContext().getConfig().getBoolean(ExecConstants.IMPERSONATION_ENABLED);
}
@Override
public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
- rootSchema = new PhoenixSchema(plugin, Collections.emptyList(), plugin.getName());
- locateSchemas();
+ try {
+ rootSchema = new PhoenixSchema(schemaConfig, plugin, Collections.emptyList(), plugin.getName());
+ if (isDrillImpersonationEnabled) {
+ ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
+ locateSchemas(schemaConfig, rootSchema.getUser(schemaConfig.getUserName(), getProcessUserName()));
+ return null;
+ });
+ } else {
+ locateSchemas(schemaConfig, rootSchema.getUser(schemaConfig.getUserName(), getProcessUserName()));
+ }
+ } catch (SQLException | InterruptedException e) {
+ throw new IOException(e);
+ }
parent.add(getName(), rootSchema); // resolve the top-level schema.
for (String schemaName : rootSchema.getSubSchemaNames()) {
PhoenixSchema schema = (PhoenixSchema) rootSchema.getSubSchema(schemaName);
@@ -64,29 +82,28 @@ public class PhoenixSchemaFactory extends AbstractSchemaFactory {
}
}
- private void locateSchemas() {
- DataSource ds = plugin.getDataSource();
- try (Connection conn = ds.getConnection();
- ResultSet rs = ds.getConnection().getMetaData().getSchemas()) {
+ private void locateSchemas(SchemaConfig schemaConfig, String userName) throws SQLException {
+ try (ResultSet rs = plugin.getDataSource(userName).getConnection().getMetaData().getSchemas()) {
while (rs.next()) {
final String schemaName = rs.getString(1); // lookup the schema (or called database).
- PhoenixSchema schema = new PhoenixSchema(plugin, Arrays.asList(getName()), schemaName);
+ PhoenixSchema schema = new PhoenixSchema(schemaConfig, plugin, Arrays.asList(getName()), schemaName);
schemaMap.put(schemaName, schema);
}
rootSchema.addSchemas(schemaMap);
- } catch (SQLException e) {
- throw new DrillRuntimeException(e.getMessage(), e);
}
}
- protected static class PhoenixSchema extends AbstractSchema {
-
+ class PhoenixSchema extends AbstractSchema {
private final JdbcSchema jdbcSchema;
private final Map<String, PhoenixSchema> schemaMap = CaseInsensitiveMap.newHashMap();
- public PhoenixSchema(PhoenixStoragePlugin plugin, List<String> parentSchemaPath, String schemaName) {
+ public PhoenixSchema(SchemaConfig schemaConfig,
+ PhoenixStoragePlugin plugin,
+ List<String> parentSchemaPath,
+ String schemaName) throws SQLException {
super(parentSchemaPath, schemaName);
- this.jdbcSchema = new JdbcSchema(plugin.getDataSource(), plugin.getDialect(), plugin.getConvention(), null, schemaName);
+ this.jdbcSchema = new JdbcSchema(plugin.getDataSource(schemaConfig.getUserName()), plugin.getDialect(),
+ plugin.getConvention(), null, schemaName);
}
@Override
@@ -101,14 +118,16 @@ public class PhoenixSchemaFactory extends AbstractSchemaFactory {
@Override
public Table getTable(String name) {
- Table table = jdbcSchema.getTable(StringUtils.upperCase(name));
- return table;
+ return isDrillImpersonationEnabled
+ ? ugi.doAs((PrivilegedAction<Table>) () -> jdbcSchema.getTable(StringUtils.upperCase(name)))
+ : jdbcSchema.getTable(StringUtils.upperCase(name));
}
@Override
public Set<String> getTableNames() {
- Set<String> tables = jdbcSchema.getTableNames();
- return tables;
+ return isDrillImpersonationEnabled
+ ? ugi.doAs((PrivilegedAction<Set<String>>) jdbcSchema::getTableNames)
+ : jdbcSchema.getTableNames();
}
@Override
@@ -116,13 +135,23 @@ public class PhoenixSchemaFactory extends AbstractSchemaFactory {
return PhoenixStoragePluginConfig.NAME;
}
- public void addSchemas(Map<String, PhoenixSchema> schemas) {
- schemaMap.putAll(schemas);
- }
-
@Override
public boolean areTableNamesCaseSensitive() {
return false;
}
+
+ @Override
+ public String getUser(String impersonated, String notImpersonated) {
+ return needToImpersonateReadingData() ? impersonated : notImpersonated;
+ }
+
+ @Override
+ public boolean needToImpersonateReadingData() {
+ return isDrillImpersonationEnabled;
+ }
+
+ public void addSchemas(Map<String, PhoenixSchema> schemas) {
+ schemaMap.putAll(schemas);
+ }
}
}
diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixStoragePlugin.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixStoragePlugin.java
index 15b4103..b46fef2 100644
--- a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixStoragePlugin.java
+++ b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixStoragePlugin.java
@@ -18,43 +18,63 @@
package org.apache.drill.exec.store.phoenix;
import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.sql.SQLException;
+import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
-import javax.sql.DataSource;
-
-import org.apache.calcite.adapter.jdbc.JdbcSchema;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.SqlDialect;
-import org.apache.calcite.sql.SqlDialectFactoryImpl;
+import org.apache.calcite.sql.dialect.PhoenixSqlDialect;
import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.JSONOptions;
import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.OptimizerRulesContext;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.AbstractStoragePlugin;
import org.apache.drill.exec.store.SchemaConfig;
import org.apache.drill.exec.store.phoenix.rules.PhoenixConvention;
-import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.shaded.guava.com.google.common.cache.CacheBuilder;
+import org.apache.drill.shaded.guava.com.google.common.cache.CacheLoader;
+import org.apache.drill.shaded.guava.com.google.common.cache.LoadingCache;
+import org.apache.hadoop.security.UserGroupInformation;
public class PhoenixStoragePlugin extends AbstractStoragePlugin {
private final PhoenixStoragePluginConfig config;
- private final DataSource dataSource;
private final SqlDialect dialect;
private final PhoenixConvention convention;
private final PhoenixSchemaFactory schemaFactory;
+ private final boolean impersonationEnabled;
+ private final LoadingCache<String, PhoenixDataSource> CACHE = CacheBuilder.newBuilder()
+ .maximumSize(5) // Up to 5 clients for impersonation-enabled.
+ .expireAfterAccess(10, TimeUnit.MINUTES)
+ .build(new CacheLoader<String, PhoenixDataSource>() {
+ @Override
+ public PhoenixDataSource load(String userName) {
+ UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();
+ return impersonationEnabled
+ ? ugi.doAs((PrivilegedAction<PhoenixDataSource>) () -> createDataSource(userName))
+ : createDataSource(userName);
+ }
+ });
public PhoenixStoragePlugin(PhoenixStoragePluginConfig config, DrillbitContext context, String name) {
super(context, name);
this.config = config;
- this.dataSource = initNoPoolingDataSource(config);
- this.dialect = JdbcSchema.createDialect(SqlDialectFactoryImpl.INSTANCE, dataSource);
+ this.dialect = PhoenixSqlDialect.DEFAULT;
this.convention = new PhoenixConvention(dialect, name, this);
this.schemaFactory = new PhoenixSchemaFactory(this);
+ this.impersonationEnabled = context.getConfig().getBoolean(ExecConstants.IMPERSONATION_ENABLED);
}
@Override
@@ -62,18 +82,6 @@ public class PhoenixStoragePlugin extends AbstractStoragePlugin {
return config;
}
- public DataSource getDataSource() {
- return dataSource;
- }
-
- public SqlDialect getDialect() {
- return dialect;
- }
-
- public PhoenixConvention getConvention() {
- return convention;
- }
-
@Override
public boolean supportsRead() {
return true;
@@ -91,25 +99,42 @@ public class PhoenixStoragePlugin extends AbstractStoragePlugin {
@Override
public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
- PhoenixScanSpec scanSpec = selection.getListWith(context.getLpPersistence().getMapper(), new TypeReference<PhoenixScanSpec>() {});
- return new PhoenixGroupScan(scanSpec, this);
+ PhoenixScanSpec scanSpec =
+ selection.getListWith(context.getLpPersistence().getMapper(), new TypeReference<PhoenixScanSpec>() {});
+ return new PhoenixGroupScan(userName, scanSpec, this);
}
- private static DataSource initNoPoolingDataSource(PhoenixStoragePluginConfig config) {
- // Don't use the pool with the connection
- PhoenixDataSource dataSource = null;
- if (StringUtils.isNotBlank(config.getJdbcURL())) {
- dataSource = new PhoenixDataSource(config.getJdbcURL(), config.getProps()); // the props is initiated.
- } else {
- dataSource = new PhoenixDataSource(config.getHost(), config.getPort(), config.getProps());
+ @Override
+ public void close() {
+ AutoCloseables.closeSilently(CACHE::invalidateAll);
+ }
+
+ public SqlDialect getDialect() {
+ return dialect;
+ }
+
+ public PhoenixConvention getConvention() {
+ return convention;
+ }
+
+ public PhoenixDataSource getDataSource(String userName) throws SQLException {
+ try {
+ return CACHE.get(userName);
+ } catch (final ExecutionException e) {
+ throw new SQLException("Failure setting up Phoenix DataSource (PQS client)", e);
}
+ }
+
+ private PhoenixDataSource createDataSource(String userName) {
+ // Don't use the pool with the connection
+ Map<String, Object> props = config.getProps();
if (config.getUsername() != null && config.getPassword() != null) {
- if (dataSource.getConnectionProperties() == null) {
- dataSource.setConnectionProperties(Maps.newHashMap());
- }
- dataSource.getConnectionProperties().put("user", config.getUsername());
- dataSource.getConnectionProperties().put("password", config.getPassword());
+ props.put("user", config.getUsername());
+ props.put("password", config.getPassword());
}
- return dataSource;
+ boolean impersonationEnabled = context.getConfig().getBoolean(ExecConstants.IMPERSONATION_ENABLED);
+ return StringUtils.isNotBlank(config.getJdbcURL())
+ ? new PhoenixDataSource(config.getJdbcURL(), userName, props, impersonationEnabled) // the props is initiated.
+ : new PhoenixDataSource(config.getHost(), config.getPort(), userName, props, impersonationEnabled);
}
}
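
The CACHE field above keeps at most one DataSource per user name. A standalone sketch of the same Guava pattern (unshaded Guava here for brevity, with a String standing in for PhoenixDataSource):

import java.util.concurrent.TimeUnit;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

public class PerUserCacheSketch {
  private static final LoadingCache<String, String> CACHE = CacheBuilder.newBuilder()
      .maximumSize(5)                           // bound the number of per-user clients
      .expireAfterAccess(10, TimeUnit.MINUTES)  // drop idle entries
      .build(new CacheLoader<String, String>() {
        @Override
        public String load(String userName) {
          return "datasource-for-" + userName;  // stands in for createDataSource(userName)
        }
      });

  public static void main(String[] args) throws Exception {
    System.out.println(CACHE.get("alice")); // built on first access, cached afterwards
  }
}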
diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSubScan.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSubScan.java
index 2c81d4e..a57d5e3 100644
--- a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSubScan.java
+++ b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSubScan.java
@@ -21,7 +21,6 @@ import java.util.Iterator;
import java.util.List;
import org.apache.drill.common.PlanStringBuilder;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.physical.base.AbstractBase;
@@ -47,20 +46,21 @@ public class PhoenixSubScan extends AbstractBase implements SubScan {
@JsonCreator
public PhoenixSubScan(
+ @JsonProperty("userName") String userName,
@JsonProperty("sql") String sql,
@JsonProperty("columns") List<SchemaPath> columns,
@JsonProperty("scanSpec") PhoenixScanSpec scanSpec,
@JsonProperty("config") StoragePluginConfig config,
@JacksonInject StoragePluginRegistry registry) {
- super("no-user");
+ super(userName);
this.sql = sql;
this.columns = columns;
this.scanSpec = scanSpec;
this.plugin = registry.resolve(config, PhoenixStoragePlugin.class);
}
- public PhoenixSubScan(String sql, List<SchemaPath> columns, PhoenixScanSpec scanSpec, PhoenixStoragePlugin plugin) {
- super("no-user");
+ public PhoenixSubScan(String userName, String sql, List<SchemaPath> columns, PhoenixScanSpec scanSpec, PhoenixStoragePlugin plugin) {
+ super(userName);
this.sql = sql;
this.columns = columns;
this.scanSpec = scanSpec;
@@ -98,8 +98,8 @@ public class PhoenixSubScan extends AbstractBase implements SubScan {
}
@Override
- public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
- return new PhoenixSubScan(sql, columns, scanSpec, plugin);
+ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+ return new PhoenixSubScan(userName, sql, columns, scanSpec, plugin);
}
@Override
diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/rules/PhoenixPrel.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/rules/PhoenixPrel.java
index 725a93b..47a71e6 100644
--- a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/rules/PhoenixPrel.java
+++ b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/rules/PhoenixPrel.java
@@ -17,7 +17,6 @@
*/
package org.apache.drill.exec.store.phoenix.rules;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
@@ -67,15 +66,15 @@ public class PhoenixPrel extends AbstractRelNode implements Prel {
}
@Override
- public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator)
- throws IOException {
- List<SchemaPath> schemaPaths = new ArrayList<SchemaPath>(rowType.getFieldCount());
- List<String> columns = new ArrayList<String>(rowType.getFieldCount());
+ public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) {
+ List<SchemaPath> schemaPaths = new ArrayList<>(rowType.getFieldCount());
+ List<String> columns = new ArrayList<>(rowType.getFieldCount());
for (String col : rowType.getFieldNames()) {
schemaPaths.add(SchemaPath.getSimplePath(col));
columns.add(SchemaPath.getSimplePath(col).rootName());
}
- PhoenixGroupScan output = new PhoenixGroupScan(sql, schemaPaths, new PhoenixScanSpec(sql, columns, rows), convention.getPlugin());
+ PhoenixGroupScan output = new PhoenixGroupScan(creator.getContext().getQueryUserName(), sql, schemaPaths,
+ new PhoenixScanSpec(sql, columns, rows), convention.getPlugin());
return creator.addMetadata(this, output);
}
diff --git a/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/PhoenixBaseTest.java b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/PhoenixBaseTest.java
index fd41b70..9aea261 100644
--- a/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/PhoenixBaseTest.java
+++ b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/PhoenixBaseTest.java
@@ -38,9 +38,11 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import org.apache.drill.test.ClusterFixture;
import org.apache.drill.test.ClusterFixtureBuilder;
import org.apache.drill.test.ClusterTest;
import org.apache.hadoop.fs.Path;
+import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.slf4j.LoggerFactory;
@@ -51,38 +53,48 @@ public class PhoenixBaseTest extends ClusterTest {
private static final org.slf4j.Logger logger = LoggerFactory.getLogger(PhoenixBaseTest.class);
- private static AtomicInteger initCount = new AtomicInteger(0);
- protected static String U_U_I_D = UUID.randomUUID().toString();
+ public final static String U_U_I_D = UUID.randomUUID().toString();
+ private final static AtomicInteger initCount = new AtomicInteger(0);
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
+ PhoenixTestSuite.initPhoenixQueryServer();
if (PhoenixTestSuite.isRunningSuite()) {
QueryServerBasicsIT.testCatalogs();
}
- bootMiniCluster();
+ startDrillCluster();
if (initCount.incrementAndGet() == 1) {
- createSchema();
- createTables();
- createSampleData();
+ createSchema(QueryServerBasicsIT.CONN_STRING);
+ createTables(QueryServerBasicsIT.CONN_STRING);
+ createSampleData(QueryServerBasicsIT.CONN_STRING);
}
}
- public static void bootMiniCluster() throws Exception {
- ClusterFixtureBuilder builder = new ClusterFixtureBuilder(dirTestWatcher);
+ @AfterClass
+ public static void tearDownCluster() throws Exception {
+ if (!PhoenixTestSuite.isRunningSuite()) {
+ PhoenixTestSuite.tearDownCluster();
+ }
+ }
+
+ public static void startDrillCluster() throws Exception {
+ ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
startCluster(builder);
- dirTestWatcher.copyResourceToRoot(Paths.get(""));
Map<String, Object> props = Maps.newHashMap();
props.put("phoenix.query.timeoutMs", 90000);
props.put("phoenix.query.keepAliveMs", "30000");
StoragePluginRegistry registry = cluster.drillbit().getContext().getStorage();
- PhoenixStoragePluginConfig config = new PhoenixStoragePluginConfig(null, 0, null, null, QueryServerBasicsIT.CONN_STRING, null, props);
+ PhoenixStoragePluginConfig config = new PhoenixStoragePluginConfig(null, 0, null, null,
+ QueryServerBasicsIT.CONN_STRING, null, props);
config.setEnabled(true);
registry.put(PhoenixStoragePluginConfig.NAME + "123", config);
+ dirTestWatcher.copyResourceToRoot(Paths.get(""));
}
- public static void createSchema() throws Exception {
- try (final Connection connection = DriverManager.getConnection(QueryServerBasicsIT.CONN_STRING)) {
+ public static void createSchema(String connString) throws Exception {
+ try (final Connection connection = DriverManager.getConnection(connString)) {
+ logger.debug("Phoenix connection established with the specified url : {}", connString);
assertFalse(connection.isClosed());
connection.setAutoCommit(true);
try (final Statement stmt = connection.createStatement()) {
@@ -91,8 +103,8 @@ public class PhoenixBaseTest extends ClusterTest {
}
}
- public static void createTables() throws Exception {
- try (final Connection connection = DriverManager.getConnection(QueryServerBasicsIT.CONN_STRING)) {
+ public static void createTables(String connString) throws Exception {
+ try (final Connection connection = DriverManager.getConnection(connString)) {
assertFalse(connection.isClosed());
connection.setAutoCommit(true);
try (final Statement stmt = connection.createStatement()) {
@@ -149,7 +161,7 @@ public class PhoenixBaseTest extends ClusterTest {
}
}
- public static void createSampleData() throws Exception {
+ public static void createSampleData(String connString) throws Exception {
final String[] paths = new String[] { "data/region.tbl", "data/nation.tbl" };
final String[] sqls = new String[] {
"UPSERT INTO V1.REGION VALUES(?,?,?)",
@@ -163,7 +175,7 @@ public class PhoenixBaseTest extends ClusterTest {
logger.info("Loading the .tbl file : " + Arrays.toString(paths));
List<String[]> allRows = parseTblFile(String.valueOf(region_path));
- try (final Connection connection = DriverManager.getConnection(QueryServerBasicsIT.CONN_STRING)) {
+ try (final Connection connection = DriverManager.getConnection(connString)) {
assertFalse(connection.isClosed());
connection.setAutoCommit(false);
// region table
diff --git a/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/PhoenixCommandTest.java b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/PhoenixCommandTest.java
index 4fa5ac5..43fc645 100644
--- a/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/PhoenixCommandTest.java
+++ b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/PhoenixCommandTest.java
@@ -39,6 +39,7 @@ public class PhoenixCommandTest extends PhoenixBaseTest {
@Test
public void testShowTablesLike() throws Exception {
+ runAndPrint("SHOW SCHEMAS");
run("USE phoenix123.v1");
assertEquals(1, queryBuilder().sql("SHOW TABLES LIKE '%REGION%'").run().recordCount());
}
diff --git a/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/PhoenixTestSuite.java b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/PhoenixTestSuite.java
index 1817b76..a0f19ff 100644
--- a/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/PhoenixTestSuite.java
+++ b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/PhoenixTestSuite.java
@@ -21,7 +21,7 @@ import java.util.TimeZone;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.drill.categories.SlowTest;
-import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.BaseTest;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
@@ -31,6 +31,7 @@ import org.junit.runners.Suite;
import org.junit.runners.Suite.SuiteClasses;
import org.slf4j.LoggerFactory;
+
@RunWith(Suite.class)
@SuiteClasses ({
PhoenixDataTypeTest.class,
@@ -39,7 +40,7 @@ import org.slf4j.LoggerFactory;
})
@Ignore
@Category({ SlowTest.class })
-public class PhoenixTestSuite extends ClusterTest {
+public class PhoenixTestSuite extends BaseTest {
private static final org.slf4j.Logger logger = LoggerFactory.getLogger(PhoenixTestSuite.class);
@@ -65,7 +66,6 @@ public class PhoenixTestSuite extends ClusterTest {
if (initCount.decrementAndGet() == 0) {
logger.info("Shutdown all instances of test cluster.");
QueryServerBasicsIT.afterClass();
- shutdown();
}
}
}
diff --git a/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/QueryServerBasicsIT.java b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/QueryServerBasicsIT.java
index 5105cbd..4a48bfc 100644
--- a/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/QueryServerBasicsIT.java
+++ b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/QueryServerBasicsIT.java
@@ -29,12 +29,17 @@ import java.sql.ResultSetMetaData;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.end2end.QueryServerThread;
import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.queryserver.QueryServerProperties;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.ThinClientUtil;
import org.slf4j.LoggerFactory;
+/**
+ * This is a copy of {@link org.apache.phoenix.end2end.QueryServerBasicsIT} until
+ * <a href="https://issues.apache.org/jira/browse/PHOENIX-6613">PHOENIX-6613</a> is fixed
+ */
public class QueryServerBasicsIT extends BaseTest {
private static final org.slf4j.Logger logger = LoggerFactory.getLogger(QueryServerBasicsIT.class);
diff --git a/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/QueryServerThread.java b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/QueryServerThread.java
deleted file mode 100644
index 84c9bc3..0000000
--- a/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/QueryServerThread.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.phoenix;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.phoenix.queryserver.server.QueryServer;
-
-/** Wraps up the query server for tests. */
-public class QueryServerThread extends Thread {
-
- private final QueryServer main;
-
- public QueryServerThread(String[] argv, Configuration conf) {
- this(argv, conf, null);
- }
-
- public QueryServerThread(String[] argv, Configuration conf, String name) {
- this(new QueryServer(argv, conf), name);
- }
-
- private QueryServerThread(QueryServer m, String name) {
- super(m, "query server" + (name == null ? "" : (" - " + name)));
- this.main = m;
- setDaemon(true);
- }
-
- public QueryServer getQueryServer() {
- return main;
- }
-}
diff --git a/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/HttpParamImpersonationQueryServerIT.java b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/HttpParamImpersonationQueryServerIT.java
new file mode 100644
index 0000000..f0f0e11
--- /dev/null
+++ b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/HttpParamImpersonationQueryServerIT.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.phoenix.secured;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.security.access.AccessControlClient;
+import org.apache.hadoop.hbase.security.access.AccessController;
+import org.apache.hadoop.hbase.security.access.Permission.Action;
+import org.apache.hadoop.hbase.security.token.TokenProvider;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.end2end.TlsUtil;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.queryserver.QueryServerOptions;
+import org.apache.phoenix.queryserver.QueryServerProperties;
+import org.apache.phoenix.queryserver.client.Driver;
+import org.junit.experimental.categories.Category;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.drill.exec.store.phoenix.secured.QueryServerEnvironment.LOGIN_USER;
+
+/**
+ * This is a copy of {@link org.apache.phoenix.end2end.HttpParamImpersonationQueryServerIT},
+ * but customized for three users; see {@link SecuredPhoenixBaseTest#runForThreeClients} for details.
+ */
+@Category(NeedsOwnMiniClusterTest.class)
+public class HttpParamImpersonationQueryServerIT {
+
+ public static QueryServerEnvironment environment;
+
+ private static final List<TableName> SYSTEM_TABLE_NAMES = Arrays.asList(
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME,
+ PhoenixDatabaseMetaData.SYSTEM_MUTEX_HBASE_TABLE_NAME,
+ PhoenixDatabaseMetaData.SYSTEM_FUNCTION_HBASE_TABLE_NAME,
+ PhoenixDatabaseMetaData.SYSTEM_SCHEMA_HBASE_TABLE_NAME,
+ PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_HBASE_TABLE_NAME,
+ PhoenixDatabaseMetaData.SYSTEM_STATS_HBASE_TABLE_NAME);
+
+ public static synchronized void startQueryServerEnvironment() throws Exception {
+ // Clean up the previous environment, if any (JUnit 4.13 @BeforeParam / @AfterParam would be an alternative)
+ if (environment != null) {
+ stopEnvironment();
+ }
+
+ final Configuration conf = new Configuration();
+ conf.setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, AccessController.class.getName());
+ conf.setStrings(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, AccessController.class.getName());
+ conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, AccessController.class.getName(), TokenProvider.class.getName());
+
+ // Set the proxyuser settings so that the user who runs the Drillbits/MiniDFS
+ // can impersonate user1 and user2 (but not user3)
+ conf.set(String.format("hadoop.proxyuser.%s.hosts", LOGIN_USER), "*");
+ conf.set(String.format("hadoop.proxyuser.%s.users", LOGIN_USER), "user1,user2");
+ conf.setBoolean(QueryServerProperties.QUERY_SERVER_WITH_REMOTEUSEREXTRACTOR_ATTRIB, true);
+ environment = new QueryServerEnvironment(conf, 3, false);
+ }
+
+ public static synchronized void stopEnvironment() throws Exception {
+ environment.stop();
+ environment = null;
+ }
+
+ public static String getUrlTemplate() {
+ String url = Driver.CONNECT_STRING_PREFIX + "url=%s://localhost:" + environment.getPqsPort() + "?"
+ + QueryServerOptions.DEFAULT_QUERY_SERVER_REMOTEUSEREXTRACTOR_PARAM + "=%s;authentication=SPNEGO;serialization=PROTOBUF%s";
+ if (environment.getTls()) {
+ return String.format(url, "https", "%s", ";truststore=" + TlsUtil.getTrustStoreFile().getAbsolutePath()
+ + ";truststore_password=" + TlsUtil.getTrustStorePassword());
+ } else {
+ return String.format(url, "http", "%s", "");
+ }
+ }
+
+ static void grantUsersToPhoenixSystemTables(List<String> usersToGrant) throws Exception {
+ // Grant the given users permission to access the Phoenix system tables
+ try {
+ for (String user : usersToGrant) {
+ for (TableName tn : SYSTEM_TABLE_NAMES) {
+ AccessControlClient.grant(environment.getUtil().getConnection(), tn, user, null, null, Action.READ, Action.EXEC);
+ }
+ }
+ } catch (Throwable e) {
+ throw new Exception(e);
+ }
+ }
+
+ static void grantUsersToGlobalPhoenixUserTables(List<String> usersToGrant) throws Exception {
+ // Grant the given users permission to access the global Phoenix user tables
+ try {
+ for (String user : usersToGrant) {
+ AccessControlClient.grant(environment.getUtil().getConnection(), user, Action.READ, Action.EXEC);
+ }
+ } catch (Throwable e) {
+ throw new Exception(e);
+ }
+ }
+}
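
A note on the configuration above: the hadoop.proxyuser.* keys are what gate impersonation in these tests, since Hadoop only lets LOGIN_USER act on behalf of user1 and user2. The following standalone sketch (illustrative only, not part of this patch; the "tester" superuser name is made up) shows how Hadoop enforces such settings:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.ProxyUsers;

public class ProxyUserSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Mirrors startQueryServerEnvironment(): "tester" may impersonate
    // user1 and user2 from any host, and nobody else.
    conf.set("hadoop.proxyuser.tester.hosts", "*");
    conf.set("hadoop.proxyuser.tester.users", "user1,user2");
    ProxyUsers.refreshSuperUserGroupsConfiguration(conf);

    UserGroupInformation real = UserGroupInformation.createRemoteUser("tester");
    UserGroupInformation user1 = UserGroupInformation.createProxyUser("user1", real);
    ProxyUsers.authorize(user1, "127.0.0.1"); // allowed

    UserGroupInformation user3 = UserGroupInformation.createProxyUser("user3", real);
    try {
      ProxyUsers.authorize(user3, "127.0.0.1"); // rejected: user3 is not listed
    } catch (AuthorizationException expected) {
      System.out.println("user3 rejected as expected: " + expected.getMessage());
    }
  }
}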
diff --git a/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/QueryServerEnvironment.java b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/QueryServerEnvironment.java
new file mode 100644
index 0000000..6cbe8df
--- /dev/null
+++ b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/QueryServerEnvironment.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.phoenix.secured;
+
+import static org.apache.hadoop.hbase.HConstants.HBASE_DIR;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.InetAddress;
+import java.security.PrivilegedAction;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.LocalHBaseCluster;
+import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.util.KerberosName;
+import org.apache.phoenix.end2end.TlsUtil;
+import org.apache.phoenix.query.ConfigurationFactory;
+import org.apache.phoenix.queryserver.QueryServerProperties;
+import org.apache.phoenix.queryserver.server.QueryServer;
+import org.apache.phoenix.util.InstanceResolver;
+import org.apache.phoenix.util.ThinClientUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is a copy of the class from `org.apache.phoenix:phoenix-queryserver-it`;
+ * see the original javadoc in {@link org.apache.phoenix.end2end.QueryServerEnvironment}.
+ *
+ * It is possible to use the original QueryServerEnvironment, but several issues would need to be solved first:
+ * <ul>
+ * <li>TlsUtil.getClasspathDir(QueryServerEnvironment.class) in QueryServerEnvironment fails because the path comes
+ * from a jar. This can be fixed by copying TlsUtil into the Drill project and changing the getClasspathDir method
+ * to use SecuredPhoenixTestSuite.class instead of QueryServerEnvironment.class</li>
+ * <li>SERVICE_PRINCIPAL from QueryServerEnvironment is for `securecluster`, not the system user. The test then
+ * fails later, while the Drill cluster creates the udf area in RemoteFunctionRegistry#createArea: the Precondition
+ * check ImpersonationUtil.getProcessUserName().equals(fileStatus.getOwner()) fails because
+ * ImpersonationUtil.getProcessUserName() is 'securecluster' while fileStatus.getOwner() is
+ * the local machine login user</li>
+ * </ul>
+ */
+public class QueryServerEnvironment {
+ private static final Logger LOG = LoggerFactory.getLogger(QueryServerEnvironment.class);
+
+ private final File TEMP_DIR = new File(getTempDir());
+ private final File KEYTAB_DIR = new File(TEMP_DIR, "keytabs");
+ private final List<File> USER_KEYTAB_FILES = new ArrayList<>();
+
+ private static final String LOCAL_HOST_REVERSE_DNS_LOOKUP_NAME;
+ static final String LOGIN_USER;
+
+ static {
+ try {
+ // uncomment it for debugging purposes
+ // System.setProperty("sun.security.krb5.debug", "true");
+ LOCAL_HOST_REVERSE_DNS_LOOKUP_NAME = InetAddress.getByName("127.0.0.1").getCanonicalHostName();
+ String userName = System.getProperty("user.name");
+ LOGIN_USER = userName != null ? userName : "securecluster";
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static final String SPNEGO_PRINCIPAL = "HTTP/" + LOCAL_HOST_REVERSE_DNS_LOOKUP_NAME;
+ private static final String PQS_PRINCIPAL = "phoenixqs/" + LOCAL_HOST_REVERSE_DNS_LOOKUP_NAME;
+ private static final String SERVICE_PRINCIPAL = LOGIN_USER + "/" + LOCAL_HOST_REVERSE_DNS_LOOKUP_NAME;
+ private File KEYTAB;
+
+ private MiniKdc KDC;
+ private HBaseTestingUtility UTIL = new HBaseTestingUtility();
+ private LocalHBaseCluster HBASE_CLUSTER;
+ private int NUM_CREATED_USERS;
+
+ private ExecutorService PQS_EXECUTOR;
+ private QueryServer PQS;
+ private int PQS_PORT;
+ private String PQS_URL;
+
+ private boolean tls;
+
+ private static String getTempDir() {
+ StringBuilder sb = new StringBuilder(32);
+ sb.append(System.getProperty("user.dir")).append(File.separator);
+ sb.append("target").append(File.separator);
+ sb.append(QueryServerEnvironment.class.getSimpleName());
+ sb.append("-").append(UUID.randomUUID());
+ return sb.toString();
+ }
+
+ public int getPqsPort() {
+ return PQS_PORT;
+ }
+
+ public String getPqsUrl() {
+ return PQS_URL;
+ }
+
+ public boolean getTls() {
+ return tls;
+ }
+
+ public HBaseTestingUtility getUtil() {
+ return UTIL;
+ }
+
+ public String getServicePrincipal() {
+ return SERVICE_PRINCIPAL;
+ }
+
+ public File getServiceKeytab() {
+ return KEYTAB;
+ }
+
+ private static void updateDefaultRealm() throws Exception {
+ // (at least) one other phoenix test triggers the caching of this field before the KDC is up
+ // which causes principal parsing to fail.
+ Field f = KerberosName.class.getDeclaredField("defaultRealm");
+ f.setAccessible(true);
+ // Default realm for MiniKDC
+ f.set(null, "EXAMPLE.COM");
+ }
+
+ private void createUsers(int numUsers) throws Exception {
+ assertNotNull("KDC is null, was setup method called?", KDC);
+ NUM_CREATED_USERS = numUsers;
+ for (int i = 1; i <= numUsers; i++) {
+ String principal = "user" + i;
+ File keytabFile = new File(KEYTAB_DIR, principal + ".keytab");
+ KDC.createPrincipal(keytabFile, principal);
+ USER_KEYTAB_FILES.add(keytabFile);
+ }
+ }
+
+ public Map.Entry<String, File> getUser(int offset) {
+ if (!(offset > 0 && offset <= NUM_CREATED_USERS)) {
+ throw new IllegalArgumentException("User offset must be between 1 and " + NUM_CREATED_USERS);
+ }
+ return new AbstractMap.SimpleImmutableEntry<String, File>("user" + offset, USER_KEYTAB_FILES.get(offset - 1));
+ }
+
+ /**
+ * Setup the security configuration for hdfs.
+ */
+ private void setHdfsSecuredConfiguration(Configuration conf) throws Exception {
+ // Set principal+keytab configuration for HDFS
+ conf.set(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY,
+ SERVICE_PRINCIPAL + "@" + KDC.getRealm());
+ conf.set(DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY, KEYTAB.getAbsolutePath());
+ conf.set(DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY,
+ SERVICE_PRINCIPAL + "@" + KDC.getRealm());
+ conf.set(DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY, KEYTAB.getAbsolutePath());
+ conf.set(DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY,
+ SPNEGO_PRINCIPAL + "@" + KDC.getRealm());
+ // Enable token access for HDFS blocks
+ conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+ // Only use HTTPS (required because we aren't using "secure" ports)
+ conf.set(DFSConfigKeys.DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());
+ // Bind on localhost for spnego to have a chance at working
+ conf.set(DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");
+ conf.set(DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0");
+
+ // Generate SSL certs
+ File keystoresDir = new File(UTIL.getDataTestDir("keystore").toUri().getPath());
+ keystoresDir.mkdirs();
+ String sslConfDir = TlsUtil.getClasspathDir(QueryServerEnvironment.class);
+ TlsUtil.setupSSLConfig(keystoresDir.getAbsolutePath(), sslConfDir, conf, false);
+
+ // Magic flag to tell hdfs to not fail on using ports above 1024
+ conf.setBoolean("ignore.secure.ports.for.testing", true);
+ }
+
+ private static void ensureIsEmptyDirectory(File f) throws IOException {
+ if (f.exists()) {
+ if (f.isDirectory()) {
+ FileUtils.deleteDirectory(f);
+ } else {
+ assertTrue("Failed to delete keytab directory", f.delete());
+ }
+ }
+ assertTrue("Failed to create keytab directory", f.mkdirs());
+ }
+
+ /**
+ * Set up and start a Kerberized HDFS/HBase mini cluster and the Phoenix Query Server.
+ * @throws Exception on any failure during environment startup
+ */
+ public QueryServerEnvironment(final Configuration confIn, int numberOfUsers, boolean tls)
+ throws Exception {
+ this.tls = tls;
+
+ Configuration conf = UTIL.getConfiguration();
+ conf.addResource(confIn);
+ // Ensure the dirs we need are created/empty
+ ensureIsEmptyDirectory(TEMP_DIR);
+ ensureIsEmptyDirectory(KEYTAB_DIR);
+ KEYTAB = new File(KEYTAB_DIR, "test.keytab");
+ // Start a MiniKDC
+ KDC = UTIL.setupMiniKdc(KEYTAB);
+ // Create a service principal and spnego principal in one keytab
+ // NB. Due to some apparent limitations between HDFS and HBase in the same JVM, trying to
+ // use separate identities for HBase and HDFS results in a GSS initiate error. The quick
+ // solution is to just use a single "service" principal instead of "hbase" and "hdfs"
+ // (or "dn" and "nn") per usual.
+ KDC.createPrincipal(KEYTAB, SPNEGO_PRINCIPAL, PQS_PRINCIPAL, SERVICE_PRINCIPAL);
+ // Start ZK by hand
+ UTIL.startMiniZKCluster();
+
+ // Create a number of unprivileged users
+ createUsers(numberOfUsers);
+
+ // Set configuration for HBase
+ HBaseKerberosUtils.setPrincipalForTesting(SERVICE_PRINCIPAL + "@" + KDC.getRealm());
+ HBaseKerberosUtils.setSecuredConfiguration(conf);
+ setHdfsSecuredConfiguration(conf);
+ UserGroupInformation.setConfiguration(conf);
+ conf.setInt(HConstants.MASTER_PORT, 0);
+ conf.setInt(HConstants.MASTER_INFO_PORT, 0);
+ conf.setInt(HConstants.REGIONSERVER_PORT, 0);
+ conf.setInt(HConstants.REGIONSERVER_INFO_PORT, 0);
+
+ if (tls) {
+ conf.setBoolean(QueryServerProperties.QUERY_SERVER_TLS_ENABLED, true);
+ conf.set(QueryServerProperties.QUERY_SERVER_TLS_KEYSTORE,
+ TlsUtil.getKeyStoreFile().getAbsolutePath());
+ conf.set(QueryServerProperties.QUERY_SERVER_TLS_KEYSTORE_PASSWORD,
+ TlsUtil.getKeyStorePassword());
+ conf.set(QueryServerProperties.QUERY_SERVER_TLS_TRUSTSTORE,
+ TlsUtil.getTrustStoreFile().getAbsolutePath());
+ conf.set(QueryServerProperties.QUERY_SERVER_TLS_TRUSTSTORE_PASSWORD,
+ TlsUtil.getTrustStorePassword());
+ }
+
+ // Secure Phoenix setup
+ conf.set(QueryServerProperties.QUERY_SERVER_KERBEROS_HTTP_PRINCIPAL_ATTRIB_LEGACY,
+ SPNEGO_PRINCIPAL + "@" + KDC.getRealm());
+ conf.set(QueryServerProperties.QUERY_SERVER_HTTP_KEYTAB_FILENAME_ATTRIB,
+ KEYTAB.getAbsolutePath());
+ conf.set(QueryServerProperties.QUERY_SERVER_KERBEROS_PRINCIPAL_ATTRIB,
+ PQS_PRINCIPAL + "@" + KDC.getRealm());
+ conf.set(QueryServerProperties.QUERY_SERVER_KEYTAB_FILENAME_ATTRIB,
+ KEYTAB.getAbsolutePath());
+ conf.setBoolean(QueryServerProperties.QUERY_SERVER_DISABLE_KERBEROS_LOGIN, true);
+ conf.setInt(QueryServerProperties.QUERY_SERVER_HTTP_PORT_ATTRIB, 0);
+ // Required so that PQS can impersonate the end-users to HBase
+ conf.set("hadoop.proxyuser.phoenixqs.groups", "*");
+ conf.set("hadoop.proxyuser.phoenixqs.hosts", "*");
+
+ // Clear the cached singletons so we can inject our own.
+ InstanceResolver.clearSingletons();
+ // Make sure the ConnectionInfo doesn't try to pull a default Configuration
+ InstanceResolver.getSingleton(ConfigurationFactory.class, new ConfigurationFactory() {
+ @Override
+ public Configuration getConfiguration() {
+ return conf;
+ }
+
+ @Override
+ public Configuration getConfiguration(Configuration confToClone) {
+ Configuration copy = new Configuration(conf);
+ copy.addResource(confToClone);
+ return copy;
+ }
+ });
+ updateDefaultRealm();
+
+ // Start HDFS
+ UTIL.startMiniDFSCluster(1);
+ // Use LocalHBaseCluster to keep HBaseTestingUtility from doing something wrong
+ // NB. I'm not actually sure what HTU does incorrectly, but this was pulled from some test
+ // classes in HBase itself. I couldn't get HTU to work myself (2017/07/06)
+ Path rootdir = UTIL.getDataTestDirOnTestFS(QueryServerEnvironment.class.getSimpleName());
+ // There is no setRootdir method that is available in all supported HBase versions.
+ conf.set(HBASE_DIR, rootdir.toString());
+ HBASE_CLUSTER = new LocalHBaseCluster(conf, 1);
+ HBASE_CLUSTER.startup();
+
+ // Then fork a thread with PQS in it.
+ configureAndStartQueryServer(tls);
+ }
+
+ private void configureAndStartQueryServer(boolean tls) throws Exception {
+ PQS = new QueryServer(new String[0], UTIL.getConfiguration());
+ // Get the PQS ident for PQS to use
+ final UserGroupInformation ugi =
+ UserGroupInformation.loginUserFromKeytabAndReturnUGI(PQS_PRINCIPAL,
+ KEYTAB.getAbsolutePath());
+ PQS_EXECUTOR = Executors.newSingleThreadExecutor();
+ // Launch PQS, performing the Kerberos login ourselves instead of letting PQS do it (which would
+ // break the HBase/HDFS logins also running in the same test case).
+ PQS_EXECUTOR.submit(() -> ugi.doAs((PrivilegedAction<Void>) () -> {
+ PQS.run();
+ return null;
+ }));
+ PQS.awaitRunning();
+ PQS_PORT = PQS.getPort();
+ PQS_URL =
+ ThinClientUtil.getConnectionUrl(tls ? "https" : "http", "localhost", PQS_PORT)
+ + ";authentication=SPNEGO" + (tls
+ ? ";truststore=" + TlsUtil.getTrustStoreFile().getAbsolutePath()
+ + ";truststore_password=" + TlsUtil.getTrustStorePassword()
+ : "");
+ LOG.debug("Phoenix Query Server URL: {}", PQS_URL);
+ }
+
+ public void stop() throws Exception {
+ // Remove our custom ConfigurationFactory for future tests
+ InstanceResolver.clearSingletons();
+ if (PQS_EXECUTOR != null) {
+ PQS.stop();
+ PQS_EXECUTOR.shutdown();
+ if (!PQS_EXECUTOR.awaitTermination(5, TimeUnit.SECONDS)) {
+ LOG.info("PQS didn't exit in 5 seconds, proceeding anyways.");
+ }
+ }
+ if (HBASE_CLUSTER != null) {
+ HBASE_CLUSTER.shutdown();
+ HBASE_CLUSTER.join();
+ }
+ if (UTIL != null) {
+ UTIL.shutdownMiniZKCluster();
+ }
+ if (KDC != null) {
+ KDC.stop();
+ }
+ }
+}
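
configureAndStartQueryServer() above relies on a pattern worth calling out: log in from the keytab explicitly and run the server inside ugi.doAs(), so the JVM-wide HBase/HDFS logins in the same test are left untouched. A stripped-down sketch of that pattern (principal, keytab path and task body are placeholders, not values from this patch):

import java.security.PrivilegedAction;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.security.UserGroupInformation;

public class DoAsServiceSketch {
  public static void main(String[] args) throws Exception {
    // Obtain a UGI for a dedicated principal without replacing the current JVM login
    UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
        "service/localhost@EXAMPLE.COM", "/tmp/service.keytab");

    ExecutorService executor = Executors.newSingleThreadExecutor();
    executor.submit(() -> ugi.doAs((PrivilegedAction<Void>) () -> {
      // long-running service work executes with the service identity here
      return null;
    }));
    executor.shutdown();
  }
}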
diff --git a/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/SecuredPhoenixBaseTest.java b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/SecuredPhoenixBaseTest.java
new file mode 100644
index 0000000..bf541a9
--- /dev/null
+++ b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/SecuredPhoenixBaseTest.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.phoenix.secured;
+
+import ch.qos.logback.classic.Level;
+import com.sun.security.auth.module.Krb5LoginModule;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.drill.common.config.DrillProperties;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.rpc.security.ServerAuthenticationHandler;
+import org.apache.drill.exec.rpc.security.kerberos.KerberosFactory;
+import org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.phoenix.PhoenixStoragePluginConfig;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.LogFixture;
+import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.phoenix.queryserver.server.QueryServer;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+
+import java.io.File;
+import java.nio.file.Paths;
+import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.TimeZone;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.drill.exec.store.phoenix.PhoenixBaseTest.createSampleData;
+import static org.apache.drill.exec.store.phoenix.PhoenixBaseTest.createSchema;
+import static org.apache.drill.exec.store.phoenix.PhoenixBaseTest.createTables;
+import static org.apache.drill.exec.store.phoenix.secured.HttpParamImpersonationQueryServerIT.environment;
+import static org.apache.drill.exec.store.phoenix.secured.HttpParamImpersonationQueryServerIT.getUrlTemplate;
+import static org.apache.drill.exec.store.phoenix.secured.HttpParamImpersonationQueryServerIT.grantUsersToGlobalPhoenixUserTables;
+import static org.apache.drill.exec.store.phoenix.secured.HttpParamImpersonationQueryServerIT.grantUsersToPhoenixSystemTables;
+import static org.apache.drill.exec.store.phoenix.secured.SecuredPhoenixTestSuite.initPhoenixQueryServer;
+
+@Slf4j
+public abstract class SecuredPhoenixBaseTest extends ClusterTest {
+ protected static LogFixture logFixture;
+ private static final Level CURRENT_LOG_LEVEL = Level.INFO;
+
+ private static final AtomicInteger initCount = new AtomicInteger(0);
+
+ @BeforeAll
+ public static void setUpBeforeClass() throws Exception {
+ TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
+ initPhoenixQueryServer();
+ startSecuredDrillCluster();
+ initializeDatabase();
+ }
+
+ private static void startSecuredDrillCluster() throws Exception {
+ logFixture = LogFixture.builder()
+ .toConsole()
+ .logger(QueryServerEnvironment.class, CURRENT_LOG_LEVEL)
+ .logger(SecuredPhoenixBaseTest.class, CURRENT_LOG_LEVEL)
+ .logger(KerberosFactory.class, CURRENT_LOG_LEVEL)
+ .logger(Krb5LoginModule.class, CURRENT_LOG_LEVEL)
+ .logger(QueryServer.class, CURRENT_LOG_LEVEL)
+ .logger(ServerAuthenticationHandler.class, CURRENT_LOG_LEVEL)
+ .build();
+
+ Map.Entry<String, File> user1 = environment.getUser(1);
+ Map.Entry<String, File> user2 = environment.getUser(2);
+ Map.Entry<String, File> user3 = environment.getUser(3);
+
+ dirTestWatcher.start(SecuredPhoenixTestSuite.class); // until DirTestWatcher ClassRule is implemented for JUnit5
+ ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+ .configProperty(ExecConstants.USER_AUTHENTICATION_ENABLED, true)
+ .configProperty(ExecConstants.USER_AUTHENTICATOR_IMPL, UserAuthenticatorTestImpl.TYPE)
+ .configNonStringProperty(ExecConstants.AUTHENTICATION_MECHANISMS, Lists.newArrayList("kerberos"))
+ .configProperty(ExecConstants.IMPERSONATION_ENABLED, true)
+ .configProperty(ExecConstants.BIT_AUTHENTICATION_ENABLED, true)
+ .configProperty(ExecConstants.BIT_AUTHENTICATION_MECHANISM, "kerberos")
+ .configProperty(ExecConstants.SERVICE_PRINCIPAL, HBaseKerberosUtils.getPrincipalForTesting())
+ .configProperty(ExecConstants.SERVICE_KEYTAB_LOCATION, environment.getServiceKeytab().getAbsolutePath())
+ .configClientProperty(DrillProperties.SERVICE_PRINCIPAL, HBaseKerberosUtils.getPrincipalForTesting())
+ .configClientProperty(DrillProperties.USER, user1.getKey())
+ .configClientProperty(DrillProperties.KEYTAB, user1.getValue().getAbsolutePath());
+ startCluster(builder);
+ Properties user2ClientProperties = new Properties();
+ user2ClientProperties.setProperty(DrillProperties.SERVICE_PRINCIPAL, HBaseKerberosUtils.getPrincipalForTesting());
+ user2ClientProperties.setProperty(DrillProperties.USER, user2.getKey());
+ user2ClientProperties.setProperty(DrillProperties.KEYTAB, user2.getValue().getAbsolutePath());
+ cluster.addClientFixture(user2ClientProperties);
+ Properties user3ClientProperties = new Properties();
+ user3ClientProperties.setProperty(DrillProperties.SERVICE_PRINCIPAL, HBaseKerberosUtils.getPrincipalForTesting());
+ user3ClientProperties.setProperty(DrillProperties.USER, user3.getKey());
+ user3ClientProperties.setProperty(DrillProperties.KEYTAB, user3.getValue().getAbsolutePath());
+ cluster.addClientFixture(user3ClientProperties);
+
+ Map<String, Object> phoenixProps = new HashMap<>();
+ phoenixProps.put("phoenix.query.timeoutMs", 90000);
+ phoenixProps.put("phoenix.query.keepAliveMs", "30000");
+ phoenixProps.put("phoenix.queryserver.withRemoteUserExtractor", true);
+ StoragePluginRegistry registry = cluster.drillbit().getContext().getStorage();
+ final String doAsUrl = String.format(getUrlTemplate(), "$user");
+ logger.debug("Phoenix Query Server URL: {}", environment.getPqsUrl());
+ PhoenixStoragePluginConfig config = new PhoenixStoragePluginConfig(null, 0, null, null,
+ doAsUrl, null, phoenixProps);
+ config.setEnabled(true);
+ registry.put(PhoenixStoragePluginConfig.NAME + "123", config);
+ }
+
+ /**
+ * Initialize HBase via Phoenix
+ */
+ private static void initializeDatabase() throws Exception {
+ dirTestWatcher.copyResourceToRoot(Paths.get(""));
+ if (initCount.incrementAndGet() == 1) {
+ final Map.Entry<String, File> user1 = environment.getUser(1);
+ final Map.Entry<String, File> user2 = environment.getUser(2);
+ // Run the DDL, data loading and grants as the process (service) user
+ final UserGroupInformation serviceUgi = ImpersonationUtil.getProcessUserUGI();
+ serviceUgi.doAs((PrivilegedExceptionAction<Void>) () -> {
+ createSchema(environment.getPqsUrl());
+ createTables(environment.getPqsUrl());
+ createSampleData(environment.getPqsUrl());
+ grantUsersToPhoenixSystemTables(Arrays.asList(user1.getKey(), user2.getKey()));
+ grantUsersToGlobalPhoenixUserTables(Arrays.asList(user1.getKey()));
+ return null;
+ });
+ }
+ }
+
+ protected interface TestWrapper {
+ void apply() throws Exception;
+ }
+
+ public void runForThreeClients(TestWrapper wrapper) throws Exception {
+ runForThreeClients(wrapper, UserRemoteException.class, RuntimeException.class);
+ }
+
+ /**
+ * @param wrapper actual test case execution
+ * @param user2ExpectedException the expected Exception for user2, who can be impersonated but has no permissions on the tables
+ * @param user3ExpectedException the expected Exception for user3, who isn't impersonated at all
+ */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public void runForThreeClients(TestWrapper wrapper, Class user2ExpectedException, Class user3ExpectedException) throws Exception {
+ try {
+ client = cluster.client(0);
+ wrapper.apply();
+ client = cluster.client(1);
+ // the original cause is AccessDeniedException: Insufficient permissions for user 'user2'
+ Assertions.assertThrows(user2ExpectedException, wrapper::apply);
+ client = cluster.client(2);
+ // user3 gets a RuntimeException: Failed to execute HTTP Request, got HTTP/401
+ Assertions.assertThrows(user3ExpectedException, wrapper::apply);
+ } finally {
+ client = cluster.client(0);
+ }
+ }
+
+ @AfterAll
+ public static void tearDownCluster() throws Exception {
+ if (!SecuredPhoenixTestSuite.isRunningSuite() && environment != null) {
+ HttpParamImpersonationQueryServerIT.stopEnvironment();
+ }
+ }
+}
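
Further secured tests all follow the same recipe: extend SecuredPhoenixBaseTest and wrap the assertion body in runForThreeClients, which replays it as user1 (authorized), user2 (impersonated but unprivileged) and user3 (not impersonated). A hypothetical minimal test, for illustration only:

package org.apache.drill.exec.store.phoenix.secured;

import org.apache.drill.categories.SlowTest;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

@Tag(SlowTest.TAG)
public class SecuredPhoenixSmokeTest extends SecuredPhoenixBaseTest { // hypothetical, not in this commit

  @Test
  public void testCountNation() throws Exception {
    runForThreeClients(this::doTestCountNation);
  }

  private void doTestCountNation() throws Exception {
    // Succeeds for user1; runForThreeClients asserts the failures for user2 and user3
    queryBuilder().sql("select count(*) from phoenix123.v1.nation").run();
  }
}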
diff --git a/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/PhoenixCommandTest.java b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/SecuredPhoenixCommandTest.java
similarity index 61%
copy from contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/PhoenixCommandTest.java
copy to contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/SecuredPhoenixCommandTest.java
index 4fa5ac5..e0f1cce 100644
--- a/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/PhoenixCommandTest.java
+++ b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/SecuredPhoenixCommandTest.java
@@ -15,9 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.drill.exec.store.phoenix;
-
-import static org.junit.Assert.assertEquals;
+package org.apache.drill.exec.store.phoenix.secured;
import org.apache.drill.categories.RowSetTests;
import org.apache.drill.categories.SlowTest;
@@ -28,23 +26,31 @@ import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.test.QueryBuilder;
import org.apache.drill.test.rowSet.RowSetComparison;
-import org.junit.FixMethodOrder;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runners.MethodSorters;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
-@FixMethodOrder(MethodSorters.JVM)
-@Category({ SlowTest.class, RowSetTests.class })
-public class PhoenixCommandTest extends PhoenixBaseTest {
+@Tag(SlowTest.TAG)
+@Tag(RowSetTests.TAG)
+public class SecuredPhoenixCommandTest extends SecuredPhoenixBaseTest {
@Test
public void testShowTablesLike() throws Exception {
- run("USE phoenix123.v1");
- assertEquals(1, queryBuilder().sql("SHOW TABLES LIKE '%REGION%'").run().recordCount());
+ runForThreeClients(this::doTestShowTablesLike);
+ }
+
+ private void doTestShowTablesLike() throws Exception {
+ runAndPrint("SHOW SCHEMAS");
+ run("USE phoenix123.V1");
+ Assertions.assertEquals(1, queryBuilder().sql("SHOW TABLES LIKE '%REGION%'").run().recordCount());
}
@Test
public void testShowTables() throws Exception {
+ runForThreeClients(this::doTestShowTables);
+ }
+
+ private void doTestShowTables() throws Exception {
String sql = "SHOW TABLES FROM phoenix123.v1";
QueryBuilder builder = client.queryBuilder().sql(sql);
RowSet sets = builder.rowSet();
@@ -66,28 +72,11 @@ public class PhoenixCommandTest extends PhoenixBaseTest {
@Test
public void testDescribe() throws Exception {
- assertEquals(4, queryBuilder().sql("DESCRIBE phoenix123.v1.NATION").run().recordCount());
+ runForThreeClients(this::doTestDescribe);
}
- @Test
- public void testDescribeCaseInsensitive() throws Exception {
- String sql = "DESCRIBE phoenix123.v1.nation"; // use lowercase
- QueryBuilder builder = client.queryBuilder().sql(sql);
- RowSet sets = builder.rowSet();
-
- TupleMetadata schema = new SchemaBuilder()
- .addNullable("COLUMN_NAME", MinorType.VARCHAR)
- .addNullable("DATA_TYPE", MinorType.VARCHAR)
- .addNullable("IS_NULLABLE", MinorType.VARCHAR)
- .build();
-
- RowSet expected = new RowSetBuilder(client.allocator(), schema)
- .addRow("N_NATIONKEY", "BIGINT", "NO")
- .addRow("N_NAME", "CHARACTER VARYING", "YES")
- .addRow("N_REGIONKEY", "BIGINT", "YES")
- .addRow("N_COMMENT", "CHARACTER VARYING", "YES")
- .build();
-
- new RowSetComparison(expected).verifyAndClearAll(sets);
+ private void doTestDescribe() throws Exception {
+ run("USE phoenix123.v1");
+ Assertions.assertEquals(4, queryBuilder().sql("DESCRIBE NATION").run().recordCount());
}
}
diff --git a/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/SecuredPhoenixDataTypeTest.java b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/SecuredPhoenixDataTypeTest.java
new file mode 100644
index 0000000..5fd0962
--- /dev/null
+++ b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/SecuredPhoenixDataTypeTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.phoenix.secured;
+
+import org.apache.drill.categories.RowSetTests;
+import org.apache.drill.categories.SlowTest;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.QueryBuilder;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+
+import static org.apache.drill.exec.store.phoenix.PhoenixBaseTest.U_U_I_D;
+import static org.apache.drill.test.rowSet.RowSetUtilities.boolArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.byteArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.doubleArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.intArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.longArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.shortArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
+
+@Tag(SlowTest.TAG)
+@Tag(RowSetTests.TAG)
+public class SecuredPhoenixDataTypeTest extends SecuredPhoenixBaseTest {
+
+ @Test
+ public void testDataType() throws Exception {
+ runForThreeClients(this::doTestDataType);
+ }
+
+ private void doTestDataType() throws Exception {
+ String sql = "select * from phoenix123.v1.datatype";
+ QueryBuilder builder = queryBuilder().sql(sql);
+ RowSet sets = builder.rowSet();
+
+ TupleMetadata schema = new SchemaBuilder()
+ .addNullable("T_UUID", MinorType.VARCHAR)
+ .addNullable("T_VARCHAR", MinorType.VARCHAR)
+ .addNullable("T_CHAR", MinorType.VARCHAR)
+ .addNullable("T_BIGINT", MinorType.BIGINT)
+ .addNullable("T_INTEGER", MinorType.INT)
+ .addNullable("T_SMALLINT", MinorType.INT)
+ .addNullable("T_TINYINT", MinorType.INT)
+ .addNullable("T_DOUBLE", MinorType.FLOAT8)
+ .addNullable("T_FLOAT", MinorType.FLOAT4)
+ .addNullable("T_DECIMAL", MinorType.VARDECIMAL)
+ .addNullable("T_DATE", MinorType.DATE)
+ .addNullable("T_TIME", MinorType.TIME)
+ .addNullable("T_TIMESTAMP", MinorType.TIMESTAMP)
+ .addNullable("T_BINARY", MinorType.VARBINARY)
+ .addNullable("T_VARBINARY", MinorType.VARBINARY)
+ .addNullable("T_BOOLEAN", MinorType.BIT)
+ .build();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), schema)
+ .addRow(U_U_I_D,
+ "apache", "drill",
+ Long.MAX_VALUE,
+ Integer.MAX_VALUE,
+ Short.MAX_VALUE,
+ Byte.MAX_VALUE,
+ Double.MAX_VALUE,
+ Float.MAX_VALUE,
+ BigDecimal.valueOf(10.11),
+ LocalDate.parse("2021-12-12"),
+ LocalTime.parse("12:12:12"),
+ Instant.ofEpochMilli(1639311132000L),
+ "a_b_c_d_e_".getBytes(), "12345".getBytes(),
+ Boolean.TRUE)
+ .build();
+
+ new RowSetComparison(expected).verifyAndClearAll(sets);
+ }
+
+ @Test
+ public void testArrayType() throws Exception {
+ runForThreeClients(this::doTestArrayType);
+ }
+
+ private void doTestArrayType() throws Exception {
+ String sql = "select * from phoenix123.v1.arraytype";
+
+ QueryBuilder builder = queryBuilder().sql(sql);
+ RowSet sets = builder.rowSet();
+
+ TupleMetadata schema = new SchemaBuilder()
+ .addNullable("T_UUID", MinorType.VARCHAR)
+ .addArray("T_VARCHAR", MinorType.VARCHAR)
+ .addArray("T_CHAR", MinorType.VARCHAR)
+ .addArray("T_BIGINT", MinorType.BIGINT)
+ .addArray("T_INTEGER", MinorType.INT)
+ .addArray("T_DOUBLE", MinorType.FLOAT8)
+ .addArray("T_SMALLINT", MinorType.SMALLINT)
+ .addArray("T_TINYINT", MinorType.TINYINT)
+ .addArray("T_BOOLEAN", MinorType.BIT)
+ .build();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), schema)
+ .addRow(U_U_I_D,
+ strArray("apache", "drill", "1.20"),
+ strArray("a", "b", "c"),
+ longArray(Long.MIN_VALUE, Long.MAX_VALUE),
+ intArray(Integer.MIN_VALUE, Integer.MAX_VALUE),
+ doubleArray(Double.MIN_VALUE, Double.MAX_VALUE),
+ shortArray(Short.MIN_VALUE, Short.MAX_VALUE),
+ byteArray((int) Byte.MIN_VALUE, (int) Byte.MAX_VALUE),
+ boolArray(Boolean.TRUE, Boolean.FALSE))
+ .build();
+
+ new RowSetComparison(expected).verifyAndClearAll(sets);
+ }
+}
diff --git a/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/SecuredPhoenixSQLTest.java b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/SecuredPhoenixSQLTest.java
new file mode 100644
index 0000000..c2209ce
--- /dev/null
+++ b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/SecuredPhoenixSQLTest.java
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.phoenix.secured;
+
+import org.apache.drill.categories.RowSetTests;
+import org.apache.drill.categories.SlowTest;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.test.QueryBuilder;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Tag(SlowTest.TAG)
+@Tag(RowSetTests.TAG)
+public class SecuredPhoenixSQLTest extends SecuredPhoenixBaseTest {
+
+ @Test
+ public void testStarQuery() throws Exception {
+ runForThreeClients(this::doTestStarQuery);
+ }
+
+ private void doTestStarQuery() throws Exception {
+ String sql = "select * from phoenix123.v1.nation";
+ queryBuilder().sql(sql).run();
+ }
+
+ @Test
+ public void testExplicitQuery() throws Exception {
+ runForThreeClients(this::doTestExplicitQuery);
+ }
+
+ private void doTestExplicitQuery() throws Exception {
+ String sql = "select n_nationkey, n_regionkey, n_name from phoenix123.v1.nation";
+ QueryBuilder builder = queryBuilder().sql(sql);
+ RowSet sets = builder.rowSet();
+
+ TupleMetadata schema = new SchemaBuilder()
+ .addNullable("n_nationkey", MinorType.BIGINT)
+ .addNullable("n_regionkey", MinorType.BIGINT)
+ .addNullable("n_name", MinorType.VARCHAR)
+ .build();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), schema)
+ .addRow(0, 0, "ALGERIA")
+ .addRow(1, 1, "ARGENTINA")
+ .addRow(2, 1, "BRAZIL")
+ .addRow(3, 1, "CANADA")
+ .addRow(4, 4, "EGYPT")
+ .addRow(5, 0, "ETHIOPIA")
+ .addRow(6, 3, "FRANCE")
+ .addRow(7, 3, "GERMANY")
+ .addRow(8, 2, "INDIA")
+ .addRow(9, 2, "INDONESIA")
+ .addRow(10, 4, "IRAN")
+ .addRow(11, 4, "IRAQ")
+ .addRow(12, 2, "JAPAN")
+ .addRow(13, 4, "JORDAN")
+ .addRow(14, 0, "KENYA")
+ .addRow(15, 0, "MOROCCO")
+ .addRow(16, 0, "MOZAMBIQUE")
+ .addRow(17, 1, "PERU")
+ .addRow(18, 2, "CHINA")
+ .addRow(19, 3, "ROMANIA")
+ .addRow(20, 4, "SAUDI ARABIA")
+ .addRow(21, 2, "VIETNAM")
+ .addRow(22, 3, "RUSSIA")
+ .addRow(23, 3, "UNITED KINGDOM")
+ .addRow(24, 1, "UNITED STATES")
+ .build();
+
+ new RowSetComparison(expected).verifyAndClearAll(sets);
+ }
+
+ @Test
+ public void testLimitPushdown() throws Exception {
+ runForThreeClients(this::doTestLimitPushdown);
+ }
+
+ private void doTestLimitPushdown() throws Exception {
+ String sql = "select n_name, n_regionkey from phoenix123.v1.nation limit 20 offset 10";
+ QueryBuilder builder = client.queryBuilder().sql(sql);
+ RowSet sets = builder.rowSet();
+
+ builder.planMatcher()
+ .exclude("Limit")
+ .include("OFFSET .* ROWS FETCH NEXT .* ROWS ONLY")
+ .match();
+
+ assertEquals(15, sets.rowCount());
+
+ TupleMetadata schema = new SchemaBuilder()
+ .addNullable("n_name", MinorType.VARCHAR)
+ .addNullable("n_regionkey", MinorType.BIGINT)
+ .build();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), schema)
+ .addRow("IRAN", 4)
+ .addRow("IRAQ", 4)
+ .addRow("JAPAN", 2)
+ .addRow("JORDAN", 4)
+ .addRow("KENYA", 0)
+ .addRow("MOROCCO", 0)
+ .addRow("MOZAMBIQUE", 0)
+ .addRow("PERU", 1)
+ .addRow("CHINA", 2)
+ .addRow("ROMANIA", 3)
+ .addRow("SAUDI ARABIA", 4)
+ .addRow("VIETNAM", 2)
+ .addRow("RUSSIA", 3)
+ .addRow("UNITED KINGDOM", 3)
+ .addRow("UNITED STATES", 1)
+ .build();
+
+ new RowSetComparison(expected).verifyAndClearAll(sets);
+ }
+
+ @Test
+ public void testFilterPushdown() throws Exception {
+ runForThreeClients(this::doTestFilterPushdown);
+ }
+
+ private void doTestFilterPushdown() throws Exception {
+ String sql = "select * from phoenix123.v1.region where r_name = 'ASIA'";
+ QueryBuilder builder = client.queryBuilder().sql(sql);
+ RowSet sets = builder.rowSet();
+
+ builder.planMatcher()
+ .exclude("Filter")
+ .include("WHERE .* = 'ASIA'")
+ .match();
+
+ TupleMetadata schema = new SchemaBuilder()
+ .addNullable("R_REGIONKEY", MinorType.BIGINT)
+ .addNullable("R_NAME", MinorType.VARCHAR)
+ .addNullable("R_COMMENT", MinorType.VARCHAR)
+ .build();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), schema)
+ .addRow(2, "ASIA", "ges. thinly even pinto beans ca")
+ .build();
+
+ new RowSetComparison(expected).verifyAndClearAll(sets);
+ }
+
+ @Test
+ public void testSerDe() throws Exception {
+ runForThreeClients(this::doTestSerDe, RpcException.class, RpcException.class);
+ }
+
+ private void doTestSerDe() throws Exception {
+ String sql = "select count(*) as total from phoenix123.v1.nation";
+ String plan = queryBuilder().sql(sql).explainJson();
+ long cnt = queryBuilder().physical(plan).singletonLong();
+ assertEquals(25, cnt, "Counts should match");
+ }
+
+ @Test
+ public void testJoinPushdown() throws Exception {
+ runForThreeClients(this::doTestJoinPushdown);
+ }
+
+ private void doTestJoinPushdown() throws Exception {
+ String sql = "select a.n_name, b.r_name from phoenix123.v1.nation a join phoenix123.v1.region b "
+ + "on a.n_regionkey = b.r_regionkey";
+ QueryBuilder builder = client.queryBuilder().sql(sql);
+ RowSet sets = builder.rowSet();
+
+ builder.planMatcher()
+ .exclude("Join")
+ .include("Phoenix\\(.* INNER JOIN")
+ .match();
+
+ TupleMetadata schema = new SchemaBuilder()
+ .addNullable("n_name", MinorType.VARCHAR)
+ .addNullable("r_name", MinorType.VARCHAR)
+ .build();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), schema)
+ .addRow("ALGERIA", "AFRICA")
+ .addRow("ARGENTINA", "AMERICA")
+ .addRow("BRAZIL", "AMERICA")
+ .addRow("CANADA", "AMERICA")
+ .addRow("EGYPT", "MIDDLE EAST")
+ .addRow("ETHIOPIA", "AFRICA")
+ .addRow("FRANCE", "EUROPE")
+ .addRow("GERMANY", "EUROPE")
+ .addRow("INDIA", "ASIA")
+ .addRow("INDONESIA", "ASIA")
+ .addRow("IRAN", "MIDDLE EAST")
+ .addRow("IRAQ", "MIDDLE EAST")
+ .addRow("JAPAN", "ASIA")
+ .addRow("JORDAN", "MIDDLE EAST")
+ .addRow("KENYA", "AFRICA")
+ .addRow("MOROCCO", "AFRICA")
+ .addRow("MOZAMBIQUE", "AFRICA")
+ .addRow("PERU", "AMERICA")
+ .addRow("CHINA", "ASIA")
+ .addRow("ROMANIA", "EUROPE")
+ .addRow("SAUDI ARABIA", "MIDDLE EAST")
+ .addRow("VIETNAM", "ASIA")
+ .addRow("RUSSIA", "EUROPE")
+ .addRow("UNITED KINGDOM", "EUROPE")
+ .addRow("UNITED STATES", "AMERICA")
+ .build();
+
+ new RowSetComparison(expected).verifyAndClearAll(sets);
+ }
+
+ @Test
+ public void testCrossJoin() throws Exception {
+ runForThreeClients(this::doTestCrossJoin);
+ }
+
+ private void doTestCrossJoin() throws Exception {
+ String sql = "select a.n_name, b.n_comment from phoenix123.v1.nation a cross join phoenix123.v1.nation b";
+ client.alterSession(PlannerSettings.NLJOIN_FOR_SCALAR.getOptionName(), false);
+ QueryBuilder builder = client.queryBuilder().sql(sql);
+ RowSet sets = builder.rowSet();
+
+ builder.planMatcher().exclude("Join").match();
+
+ assertEquals(625, sets.rowCount(), "Counts should match");
+ sets.clear();
+ }
+
+ @Test
+ @Disabled("use the remote query server directly without minicluster")
+ public void testJoinWithFilterPushdown() throws Exception {
+ String sql = "select 10 as DRILL, a.n_name, b.r_name from phoenix123.v1.nation a join phoenix123.v1.region b "
+ + "on a.n_regionkey = b.r_regionkey where b.r_name = 'ASIA'";
+ QueryBuilder builder = client.queryBuilder().sql(sql);
+ RowSet sets = builder.rowSet();
+
+ builder.planMatcher()
+ .exclude("Join")
+ .exclude("Filter")
+ .include("Phoenix\\(.* INNER JOIN .* WHERE")
+ .match();
+
+ TupleMetadata schema = new SchemaBuilder()
+ .addNullable("DRILL", MinorType.INT)
+ .addNullable("n_name", MinorType.VARCHAR)
+ .addNullable("r_name", MinorType.VARCHAR)
+ .build();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), schema)
+ .addRow(10, "INDIA", "ASIA")
+ .addRow(10, "INDONESIA", "ASIA")
+ .addRow(10, "JAPAN", "ASIA")
+ .addRow(10, "CHINA", "ASIA")
+ .addRow(10, "VIETNAM", "ASIA")
+ .build();
+
+ new RowSetComparison(expected).verifyAndClearAll(sets);
+ }
+
+ @Test
+ public void testGroupByPushdown() throws Exception {
+ runForThreeClients(this::doTestGroupByPushdown);
+ }
+
+ private void doTestGroupByPushdown() throws Exception {
+ String sql = "select n_regionkey, count(1) as total from phoenix123.v1.nation group by n_regionkey";
+ QueryBuilder builder = client.queryBuilder().sql(sql);
+ RowSet sets = builder.rowSet();
+
+ builder.planMatcher()
+ .exclude("Aggregate")
+ .include("Phoenix\\(.* GROUP BY")
+ .match();
+
+ TupleMetadata schema = new SchemaBuilder()
+ .addNullable("n_regionkey", MinorType.BIGINT)
+ .addNullable("total", MinorType.BIGINT)
+ .build();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), schema)
+ .addRow(0, 5)
+ .addRow(1, 5)
+ .addRow(2, 5)
+ .addRow(3, 5)
+ .addRow(4, 5)
+ .build();
+
+ new RowSetComparison(expected).verifyAndClearAll(sets);
+ }
+
+ @Test
+ public void testDistinctPushdown() throws Exception {
+ runForThreeClients(this::doTestDistinctPushdown);
+ }
+
+ private void doTestDistinctPushdown() throws Exception {
+ String sql = "select distinct n_name from phoenix123.v1.nation"; // auto convert to group-by
+ QueryBuilder builder = client.queryBuilder().sql(sql);
+ RowSet sets = builder.rowSet();
+
+ builder.planMatcher()
+ .exclude("Aggregate")
+ .include("Phoenix\\(.* GROUP BY \"N_NAME")
+ .match();
+
+ TupleMetadata schema = new SchemaBuilder()
+ .addNullable("n_name", MinorType.VARCHAR)
+ .build();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), schema)
+ .addRow("ALGERIA")
+ .addRow("ARGENTINA")
+ .addRow("BRAZIL")
+ .addRow("CANADA")
+ .addRow("CHINA")
+ .addRow("EGYPT")
+ .addRow("ETHIOPIA")
+ .addRow("FRANCE")
+ .addRow("GERMANY")
+ .addRow("INDIA")
+ .addRow("INDONESIA")
+ .addRow("IRAN")
+ .addRow("IRAQ")
+ .addRow("JAPAN")
+ .addRow("JORDAN")
+ .addRow("KENYA")
+ .addRow("MOROCCO")
+ .addRow("MOZAMBIQUE")
+ .addRow("PERU")
+ .addRow("ROMANIA")
+ .addRow("RUSSIA")
+ .addRow("SAUDI ARABIA")
+ .addRow("UNITED KINGDOM")
+ .addRow("UNITED STATES")
+ .addRow("VIETNAM")
+ .build();
+
+ new RowSetComparison(expected).verifyAndClearAll(sets);
+ }
+
+ @Test
+ public void testHavingPushdown() throws Exception {
+ runForThreeClients(this::doTestHavingPushdown);
+ }
+
+ private void doTestHavingPushdown() throws Exception {
+ String sql = "select n_regionkey, max(n_nationkey) from phoenix123.v1.nation group by n_regionkey having max(n_nationkey) > 20";
+ QueryBuilder builder = client.queryBuilder().sql(sql);
+ RowSet sets = builder.rowSet();
+
+ builder.planMatcher()
+ .exclude("Aggregate")
+ .include("Phoenix\\(.* GROUP BY .* HAVING MAX")
+ .match();
+
+ TupleMetadata schema = new SchemaBuilder()
+ .addNullable("n_regionkey", MinorType.BIGINT)
+ .addNullable("EXPR$1", MinorType.BIGINT)
+ .build();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), schema)
+ .addRow(1, 24)
+ .addRow(2, 21)
+ .addRow(3, 23)
+ .build();
+
+ new RowSetComparison(expected).verifyAndClearAll(sets);
+ }
+}
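
The pushdown tests above share one idiom: assert that the Drill plan no longer contains the logical operator (Limit, Filter, Aggregate, Join) and that the generated Phoenix SQL does. Distilled into a fragment (a sketch assuming the ClusterTest 'client' fixture; the exact include pattern varies per query):

// exclude(): the operator must NOT appear in the Drill plan;
// include(): the pattern MUST appear in the pushed-down Phoenix SQL
QueryBuilder builder = client.queryBuilder()
    .sql("select n_name from phoenix123.v1.nation limit 5");
builder.planMatcher()
    .exclude("Limit")
    .include("FETCH NEXT .* ROWS ONLY")
    .match();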
diff --git a/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/PhoenixTestSuite.java b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/SecuredPhoenixTestSuite.java
similarity index 62%
copy from contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/PhoenixTestSuite.java
copy to contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/SecuredPhoenixTestSuite.java
index 1817b76..5d0451e 100644
--- a/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/PhoenixTestSuite.java
+++ b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/secured/SecuredPhoenixTestSuite.java
@@ -15,57 +15,59 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.drill.exec.store.phoenix;
+package org.apache.drill.exec.store.phoenix.secured;
+
+import org.apache.drill.categories.RowSetTests;
+import org.apache.drill.categories.SlowTest;
+import org.apache.drill.exec.store.phoenix.QueryServerBasicsIT;
+import org.apache.drill.test.BaseTest;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Tag;
+import org.junit.platform.suite.api.SelectClasses;
+import org.junit.platform.suite.api.Suite;
+import org.slf4j.LoggerFactory;
import java.util.TimeZone;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.drill.categories.SlowTest;
-import org.apache.drill.test.ClusterTest;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.Suite;
-import org.junit.runners.Suite.SuiteClasses;
-import org.slf4j.LoggerFactory;
-@RunWith(Suite.class)
-@SuiteClasses ({
- PhoenixDataTypeTest.class,
- PhoenixSQLTest.class,
- PhoenixCommandTest.class
+@Suite
+@SelectClasses({
+ SecuredPhoenixDataTypeTest.class,
+ SecuredPhoenixSQLTest.class,
+ SecuredPhoenixCommandTest.class
})
-@Ignore
-@Category({ SlowTest.class })
-public class PhoenixTestSuite extends ClusterTest {
+@Disabled
+@Tag(SlowTest.TAG)
+@Tag(RowSetTests.TAG)
+public class SecuredPhoenixTestSuite extends BaseTest {
- private static final org.slf4j.Logger logger = LoggerFactory.getLogger(PhoenixTestSuite.class);
+ private static final org.slf4j.Logger logger = LoggerFactory.getLogger(SecuredPhoenixTestSuite.class);
private static volatile boolean runningSuite = false;
- private static AtomicInteger initCount = new AtomicInteger(0);
+ private static final AtomicInteger initCount = new AtomicInteger(0);
- @BeforeClass
+ @BeforeAll
public static void initPhoenixQueryServer() throws Exception {
TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
- synchronized (PhoenixTestSuite.class) {
+ synchronized (SecuredPhoenixTestSuite.class) {
if (initCount.get() == 0) {
logger.info("Boot the test cluster...");
- QueryServerBasicsIT.doSetup();
+ HttpParamImpersonationQueryServerIT.startQueryServerEnvironment();
}
initCount.incrementAndGet();
runningSuite = true;
}
}
- @AfterClass
+ @AfterAll
public static void tearDownCluster() throws Exception {
- synchronized (PhoenixTestSuite.class) {
+ synchronized (SecuredPhoenixTestSuite.class) {
if (initCount.decrementAndGet() == 0) {
logger.info("Shutdown all instances of test cluster.");
QueryServerBasicsIT.afterClass();
- shutdown();
}
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
index d47e8d9..9e50f12 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
@@ -98,12 +98,7 @@ class OperatorContextImpl extends BaseOperatorContext implements AutoCloseable {
currentThread.setName(proxyUgi.getUserName() + ":task-delegate-thread");
final RESULT result;
try {
- result = proxyUgi.doAs(new PrivilegedExceptionAction<RESULT>() {
- @Override
- public RESULT run() throws Exception {
- return callable.call();
- }
- });
+ result = proxyUgi.doAs((PrivilegedExceptionAction<RESULT>) () -> callable.call());
} finally {
currentThread.setName(originalThreadName);
}
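
A side note on the cast that appears in this and the ImplCreator refactoring below: UserGroupInformation.doAs is overloaded for both PrivilegedAction and PrivilegedExceptionAction, so an implicitly typed lambda would be ambiguous and the target type has to be spelled out. A minimal illustration:

import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.security.UserGroupInformation;

public class DoAsOverloadSketch {
  public static void main(String[] args) throws Exception {
    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
    // ugi.doAs(() -> "x");  // does not compile: reference to doAs is ambiguous
    String a = ugi.doAs((PrivilegedAction<String>) () -> "x");
    String b = ugi.doAs((PrivilegedExceptionAction<String>) () -> "x");
    System.out.println(a + b);
  }
}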
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
index 325d587..bc98205 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
@@ -110,7 +110,6 @@ public interface PhysicalOperator extends GraphValue<PhysicalOperator> {
/**
* Name of the user whom to impersonate while setting up the implementation (RecordBatch) of this
* PhysicalOperator. Default value is "null" in which case we impersonate as user who launched the query.
- * @return
*/
@JsonProperty("userName")
String getUserName();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
index adfd766..cf0112d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
@@ -117,12 +117,8 @@ public class ImplCreator {
if (context.isImpersonationEnabled()) {
final UserGroupInformation proxyUgi = ImpersonationUtil.createProxyUgi(root.getUserName(), context.getQueryUserName());
try {
- return proxyUgi.doAs(new PrivilegedExceptionAction<RootExec>() {
- @Override
- public RootExec run() throws Exception {
- return ((RootCreator<PhysicalOperator>) getOpCreator(root, context)).getRoot(context, root, childRecordBatches);
- }
- });
+ return proxyUgi.doAs((PrivilegedExceptionAction<RootExec>) () ->
+ ((RootCreator<PhysicalOperator>) getOpCreator(root, context)).getRoot(context, root, childRecordBatches));
} catch (InterruptedException | IOException e) {
final String errMsg = String.format("Failed to create RootExec for operator with id '%d'", root.getOperatorId());
logger.error(errMsg, e);
@@ -145,15 +141,12 @@ public class ImplCreator {
if (context.isImpersonationEnabled()) {
final UserGroupInformation proxyUgi = ImpersonationUtil.createProxyUgi(op.getUserName(), context.getQueryUserName());
try {
- return proxyUgi.doAs(new PrivilegedExceptionAction<RecordBatch>() {
- @Override
- public RecordBatch run() throws Exception {
- @SuppressWarnings("unchecked")
- final CloseableRecordBatch batch = ((BatchCreator<PhysicalOperator>) getOpCreator(op, context)).getBatch(
- context, op, childRecordBatches);
- operators.addFirst(batch);
- return batch;
- }
+ return proxyUgi.doAs((PrivilegedExceptionAction<RecordBatch>) () -> {
+ @SuppressWarnings("unchecked")
+ final CloseableRecordBatch batch = ((BatchCreator<PhysicalOperator>) getOpCreator(op, context)).getBatch(
+ context, op, childRecordBatches);
+ operators.addFirst(batch);
+ return batch;
});
} catch (InterruptedException | IOException e) {
final String errMsg = String.format("Failed to create RecordBatch for operator with id '%d'", op.getOperatorId());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/conversion/DrillValidator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/conversion/DrillValidator.java
index 1e8237b..f23f425 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/conversion/DrillValidator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/conversion/DrillValidator.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.planner.sql.conversion;
+import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.List;
@@ -38,13 +39,17 @@ import org.apache.calcite.sql.validate.SqlValidatorUtil;
import org.apache.calcite.util.Static;
import org.apache.drill.common.expression.PathSegment;
import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.util.ImpersonationUtil;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
class DrillValidator extends SqlValidatorImpl {
+ private final boolean isImpersonationEnabled;
+
DrillValidator(SqlOperatorTable opTab, SqlValidatorCatalogReader catalogReader,
- RelDataTypeFactory typeFactory, SqlConformance conformance) {
+ RelDataTypeFactory typeFactory, SqlConformance conformance, boolean isImpersonationEnabled) {
super(opTab, catalogReader, typeFactory, conformance);
+ this.isImpersonationEnabled = isImpersonationEnabled;
}
@Override
@@ -66,7 +71,14 @@ class DrillValidator extends SqlValidatorImpl {
}
}
}
- super.validateFrom(node, targetRowType, scope);
+ if (isImpersonationEnabled) {
+ ImpersonationUtil.getProcessUserUGI().doAs((PrivilegedAction<Void>) () -> {
+ super.validateFrom(node, targetRowType, scope);
+ return null;
+ });
+ } else {
+ super.validateFrom(node, targetRowType, scope);
+ }
}
private void replaceAliasWithActualName(SqlIdentifier tempNode) {
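
Since validateFrom declares no checked exceptions, the wrapper above can use the unchecked PrivilegedAction variant. The same pattern in isolation (names illustrative):

    import java.security.PrivilegedAction;
    import org.apache.hadoop.security.UserGroupInformation;

    class ValidateAsProcessUser {
      void validate(boolean impersonationEnabled, UserGroupInformation processUgi, Runnable validation) {
        if (impersonationEnabled) {
          // PrivilegedAction (not PrivilegedExceptionAction): no checked exceptions,
          // so no try/catch is needed; Void forces the explicit "return null".
          processUgi.doAs((PrivilegedAction<Void>) () -> {
            validation.run();
            return null;
          });
        } else {
          validation.run();
        }
      }
    }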
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/conversion/SqlConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/conversion/SqlConverter.java
index 03ed970..ce3cf4d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/conversion/SqlConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/conversion/SqlConverter.java
@@ -50,7 +50,6 @@ import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.apache.drill.exec.ops.QueryContext;
-import org.apache.drill.exec.ops.UdfUtilities;
import org.apache.drill.exec.planner.cost.DrillCostBase;
import org.apache.drill.exec.planner.logical.DrillConstExecutor;
import org.apache.drill.exec.planner.logical.DrillRelFactories;
@@ -88,7 +87,7 @@ public class SqlConverter {
private final DrillValidator validator;
private final boolean isInnerQuery;
private final boolean isExpandedView;
- private final UdfUtilities util;
+ private final QueryContext util;
private final FunctionImplementationRegistry functions;
private final String temporarySchema;
private final UserSession session;
@@ -139,7 +138,8 @@ public class SqlConverter {
this::getDefaultSchema);
this.opTab = new ChainedSqlOperatorTable(Arrays.asList(context.getDrillOperatorTable(), catalog));
this.costFactory = (settings.useDefaultCosting()) ? null : new DrillCostBase.DrillCostFactory();
- this.validator = new DrillValidator(opTab, catalog, typeFactory, parserConfig.conformance());
+ this.validator =
+ new DrillValidator(opTab, catalog, typeFactory, parserConfig.conformance(), context.isImpersonationEnabled());
validator.setIdentifierExpansion(true);
cluster = null;
}
@@ -160,7 +160,8 @@ public class SqlConverter {
this.catalog = catalog;
this.opTab = parent.opTab;
this.planner = parent.planner;
- this.validator = new DrillValidator(opTab, catalog, typeFactory, parserConfig.conformance());
+ this.validator =
+ new DrillValidator(opTab, catalog, typeFactory, parserConfig.conformance(), util.isImpersonationEnabled());
this.temporarySchema = parent.temporarySchema;
this.session = parent.session;
this.drillConfig = parent.drillConfig;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/kerberos/KerberosFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/kerberos/KerberosFactory.java
index 3fe4bc8..98b4793 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/kerberos/KerberosFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/kerberos/KerberosFactory.java
@@ -138,21 +138,12 @@ public class KerberosFactory implements AuthenticatorFactory {
// ignore parts[2]; GSSAPI gets the realm info from the ticket
try {
- final SaslClient saslClient = ugi.doAs(new PrivilegedExceptionAction<SaslClient>() {
-
- @Override
- public SaslClient run() throws Exception {
- return FastSaslClientFactory.getInstance().createSaslClient(new String[]{KerberosUtil.KERBEROS_SASL_NAME},
- null /** authorization ID */, serviceName, serviceHostName, properties,
- new CallbackHandler() {
- @Override
- public void handle(final Callback[] callbacks)
- throws IOException, UnsupportedCallbackException {
- throw new UnsupportedCallbackException(callbacks[0]);
- }
- });
- }
- });
+ final SaslClient saslClient = ugi.doAs((PrivilegedExceptionAction<SaslClient>) () ->
+ FastSaslClientFactory.getInstance().createSaslClient(new String[]{ KerberosUtil.KERBEROS_SASL_NAME },
+ null /* authorization ID */, serviceName, serviceHostName, properties,
+ callbacks -> {
+ throw new UnsupportedCallbackException(callbacks[0]);
+ }));
logger.debug("GSSAPI SaslClient created to authenticate to {} running on {} with QOP value {}",
serviceName, serviceHostName, qopValue);
return saslClient;
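
FastSaslClientFactory is Drill-internal, but the collapsed lambda above mirrors the plain JDK SASL API. A sketch using javax.security.sasl.Sasl directly (service name and host are parameters, properties left empty):

    import java.util.Collections;
    import javax.security.sasl.Sasl;
    import javax.security.sasl.SaslClient;
    import javax.security.sasl.SaslException;
    import javax.security.sasl.UnsupportedCallbackException;

    class GssapiClientExample {
      static SaslClient create(String serviceName, String serviceHostName) throws SaslException {
        return Sasl.createSaslClient(
            new String[]{ "GSSAPI" },
            null /* authorization ID */,
            serviceName,
            serviceHostName,
            Collections.emptyMap(),
            callbacks -> {
              // GSSAPI takes credentials from the Kerberos ticket; no callback is expected.
              throw new UnsupportedCallbackException(callbacks[0]);
            });
      }
    }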
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/InboundImpersonationManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/InboundImpersonationManager.java
index cf0afcf..40ea0a3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/InboundImpersonationManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/InboundImpersonationManager.java
@@ -58,9 +58,11 @@ public class InboundImpersonationManager {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InboundImpersonationManager.class);
private static final String STAR = "*";
-
private static final ObjectMapper impersonationPolicyMapper = new ObjectMapper();
+ private List<ImpersonationPolicy> impersonationPolicies;
+ private String policiesString; // used to test if policies changed
+
static {
impersonationPolicyMapper.configure(JsonGenerator.Feature.QUOTE_FIELD_NAMES, false);
impersonationPolicyMapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
@@ -152,10 +154,6 @@ public class InboundImpersonationManager {
deserializeImpersonationPolicies(policiesString));
}
-
- private List<ImpersonationPolicy> impersonationPolicies;
- private String policiesString; // used to test if policies changed
-
/**
* Check if the current session user, as a proxy user, is authorized to impersonate the given target user
* based on the system's impersonation policies.
@@ -164,8 +162,7 @@ public class InboundImpersonationManager {
* @param session user session
*/
public void replaceUserOnSession(final String targetName, final UserSession session) {
- final String policiesString = session.getOptions()
- .getOption(ExecConstants.IMPERSONATION_POLICY_VALIDATOR);
+ final String policiesString = session.getOptions().getOption(ExecConstants.IMPERSONATION_POLICY_VALIDATOR);
if (!policiesString.equals(this.policiesString)) {
try {
impersonationPolicies = deserializeImpersonationPolicies(policiesString);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserConnectionConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserConnectionConfig.java
index 4e4c80e..da7352e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserConnectionConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserConnectionConfig.java
@@ -67,7 +67,6 @@ class UserConnectionConfig extends AbstractConnectionConfig {
maxWrappedSize, RpcConstants.MAX_RECOMMENDED_WRAPPED_SIZE);
}
encryptionContext.setMaxWrappedSize(maxWrappedSize);
-
logger.info("Configured all user connections to require authentication with encryption: {} using: {}",
encryptionContext.getEncryptionCtxtString(), authProvider.getAllFactoryNames());
} else if (config.getBoolean(ExecConstants.USER_ENCRYPTION_SASL_ENABLED)) {
@@ -76,16 +75,13 @@ class UserConnectionConfig extends AbstractConnectionConfig {
} else {
authEnabled = false;
}
- impersonationManager = !config.getBoolean(ExecConstants.IMPERSONATION_ENABLED)
- ? null
- : new InboundImpersonationManager();
-
+ impersonationManager = config.getBoolean(ExecConstants.IMPERSONATION_ENABLED)
+ ? new InboundImpersonationManager()
+ : null;
sslEnabled = config.getBoolean(ExecConstants.USER_SSL_ENABLED);
-
- if(isSSLEnabled() && isAuthEnabled() && isEncryptionEnabled()){
+ if (isSSLEnabled() && isAuthEnabled() && isEncryptionEnabled()) {
logger.warn("The server is configured to use both SSL and SASL encryption (only one should be configured).");
}
-
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
index 00c4755..28aee4e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
@@ -407,4 +407,21 @@ public abstract class AbstractSchema implements Schema, SchemaPartitionExplorer,
public boolean areTableNamesCaseSensitive() {
return true;
}
+
+ /**
+ * @param impersonated User whom to impersonate. Usually {@link SchemaConfig#getUserName()}
+ * @param notImpersonated Regular user (table owner for Views or Drillbit process user for Tables)
+ * @return the end user to perform the operation as: {@code impersonated} when impersonation applies, otherwise {@code notImpersonated}
+ */
+ public String getUser(String impersonated, String notImpersonated) {
+ return notImpersonated;
+ }
+
+ /**
+ * Does Drill need to impersonate the user connected to Drill when reading data from the DataSource?
+ * @return True when both Drill impersonation and DataSource impersonation are enabled.
+ */
+ public boolean needToImpersonateReadingData() {
+ return false;
+ }
}
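
To illustrate the two new hooks, a schema for an impersonation-capable storage plugin would override both, roughly as below. This is a hypothetical subclass; the Phoenix schema elsewhere in this patch is the real consumer:

    import java.util.List;
    import org.apache.drill.exec.store.AbstractSchema;

    // Hypothetical: shows how a plugin opts in to the new AbstractSchema hooks.
    public class ImpersonatingSchema extends AbstractSchema {
      private final boolean drillImpersonation;      // Drill impersonation enabled
      private final boolean dataSourceImpersonation; // plugin-level setting

      public ImpersonatingSchema(List<String> parentPath, String name,
                                 boolean drillImpersonation, boolean dataSourceImpersonation) {
        super(parentPath, name);
        this.drillImpersonation = drillImpersonation;
        this.dataSourceImpersonation = dataSourceImpersonation;
      }

      @Override
      public String getUser(String impersonated, String notImpersonated) {
        // Hand back the session user only when impersonation applies end to end.
        return needToImpersonateReadingData() ? impersonated : notImpersonated;
      }

      @Override
      public boolean needToImpersonateReadingData() {
        return drillImpersonation && dataSourceImpersonation;
      }

      @Override
      public String getTypeName() {
        return "example";
      }
    }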
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordCollector.java
index e916149..2068295 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordCollector.java
@@ -33,6 +33,7 @@ import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.AbstractSchema;
import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory;
import org.apache.drill.exec.util.FileSystemUtil;
+import org.apache.drill.exec.util.ImpersonationUtil;
import org.apache.drill.metastore.MetastoreColumn;
import org.apache.drill.metastore.Metastore;
import org.apache.drill.metastore.components.tables.BasicTablesRequests;
@@ -45,8 +46,10 @@ import org.apache.drill.metastore.metadata.MetadataType;
import org.apache.drill.metastore.statistics.ColumnStatistics;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
+import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -181,6 +184,13 @@ public interface RecordCollector {
@Override
public List<Records.Column> columns(String schemaPath, SchemaPlus schema) {
AbstractSchema drillSchema = schema.unwrap(AbstractSchema.class);
+ UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();
+ return drillSchema.needToImpersonateReadingData()
+ ? ugi.doAs((PrivilegedAction<List<Records.Column>>) () -> processColumns(schemaPath, schema, drillSchema))
+ : processColumns(schemaPath, schema, drillSchema);
+ }
+
+ private List<Records.Column> processColumns(String schemaPath, SchemaPlus schema, AbstractSchema drillSchema) {
List<Records.Column> records = new ArrayList<>();
for (Pair<String, ? extends Table> tableNameToTable : drillSchema.getTablesByNames(schema.getTableNames())) {
String tableName = tableNameToTable.getKey();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/ImpersonationUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/ImpersonationUtil.java
index 3edcb1d..d6854be 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/ImpersonationUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/ImpersonationUtil.java
@@ -99,8 +99,6 @@ public class ImpersonationUtil {
}
return true;
}
-
-
}
/**
* Create and return proxy user {@link org.apache.hadoop.security.UserGroupInformation} of operator owner if operator
@@ -127,7 +125,7 @@ public class ImpersonationUtil {
}
/**
- * Create and return proxy user {@link org.apache.hadoop.security.UserGroupInformation} for give user name.
+ * Create and return proxy user {@link org.apache.hadoop.security.UserGroupInformation} for given user name.
*
* @param proxyUserName Proxy user name (must be valid)
*/
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 0e03b01..e13857a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -307,27 +307,24 @@ public class FragmentExecutor implements Runnable {
ImpersonationUtil.createProxyUgi(fragmentContext.getQueryUserName()) :
ImpersonationUtil.getProcessUserUGI();
- queryUserUgi.doAs(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- injector.injectChecked(fragmentContext.getExecutionControls(), "fragment-execution", IOException.class);
-
- while (shouldContinue()) {
- // Fragment is not cancelled
-
- for (FragmentHandle fragmentHandle; (fragmentHandle = receiverFinishedQueue.poll()) != null;) {
- // See if we have any finished requests. If so execute them.
- root.receivingFragmentFinished(fragmentHandle);
- }
-
- if (!root.next()) {
- // Fragment has processed all of its data
- break;
- }
+ queryUserUgi.doAs((PrivilegedExceptionAction<Void>) () -> {
+ injector.injectChecked(fragmentContext.getExecutionControls(), "fragment-execution", IOException.class);
+
+ while (shouldContinue()) {
+ // Fragment is not cancelled
+
+ for (FragmentHandle fragmentHandle1; (fragmentHandle1 = receiverFinishedQueue.poll()) != null;) {
+ // See if we have any finished requests. If so execute them.
+ root.receivingFragmentFinished(fragmentHandle1);
}
- return null;
+ if (!root.next()) {
+ // Fragment has processed all of its data
+ break;
+ }
}
+
+ return null;
});
} catch (QueryCancelledException e) {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/security/KerberosHelper.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/security/KerberosHelper.java
index 570d20a..5670b0b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/security/KerberosHelper.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/security/KerberosHelper.java
@@ -152,4 +152,4 @@ public class KerberosHelper {
Files.deleteIfExists(file.toPath());
}
}
-}
\ No newline at end of file
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java
index 442fd8e..1a55c4f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java
@@ -66,6 +66,11 @@ public class ClientFixture implements AutoCloseable {
clientProps = cluster.getClientProps();
}
+ protected ClientBuilder(ClusterFixture cluster, Properties properties) {
+ this.cluster = cluster;
+ clientProps = properties;
+ }
+
/**
* Specify an optional client property.
* @param key property name
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
index 3a027ed..16f302e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
@@ -72,6 +72,8 @@ import org.apache.hadoop.fs.FileSystem;
* execute queries. Can be used in JUnit tests, or in ad-hoc programs. Provides
* a builder to set the necessary embedded Drillbit and client options, then
* creates the requested Drillbit and client.
+ * TODO: To support user impersonation, add a Hadoop Configuration (dfsConf) and set the hadoop.proxyuser settings,
+ * similar to {@link org.apache.drill.exec.impersonation.BaseTestImpersonation}
*/
public class ClusterFixture extends BaseFixture implements AutoCloseable {
public static final int MAX_WIDTH_PER_NODE = 2;
@@ -302,6 +304,10 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable {
return new ClientFixture.ClientBuilder(this);
}
+ public ClientFixture.ClientBuilder clientBuilder(Properties properties) {
+ return new ClientFixture.ClientBuilder(this, properties);
+ }
+
public RestClientFixture.Builder restClientBuilder() {
return new RestClientFixture.Builder(this);
}
@@ -316,6 +322,19 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable {
}
/**
+ * Adds one more client to this {@link ClusterFixture}. <br>
+ * See also: {@link ClusterTest#client}
+ *
+ * @param properties client properties (clientProps)
+ * @return a new ClientFixture for the current ClusterFixture
+ */
+ public ClientFixture addClientFixture(Properties properties) {
+ return clientBuilder(properties)
+ .property(DrillProperties.DRILLBIT_CONNECTION, String.format("localhost:%s", drillbit().getUserPort()))
+ .build();
+ }
+
+ /**
* Create a test client for a specific host and port.
*
* @param host host, must be one of those created by this
@@ -343,6 +362,10 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable {
return clientFixture().client();
}
+ public ClientFixture client(int number) {
+ return clients.get(number);
+ }
+
/**
* Return a JDBC connection to the default (first) Drillbit.
* Note that this code requires special setup of the test code.
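
A usage sketch of the new multi-client hooks in a test (credentials illustrative):

    import java.util.Properties;
    import org.apache.drill.common.config.DrillProperties;
    import org.apache.drill.test.ClientFixture;
    import org.apache.drill.test.ClusterFixture;

    class MultiClientExample {
      static ClientFixture openSecondClient(ClusterFixture cluster) {
        Properties props = new Properties();
        props.setProperty(DrillProperties.USER, "alice");      // illustrative credentials
        props.setProperty(DrillProperties.PASSWORD, "secret");
        // addClientFixture wires the connection to the first Drillbit's user port;
        // clients are indexed in creation order, so cluster.client(0) is the default one.
        return cluster.addClientFixture(props);
      }
    }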
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixtureBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixtureBuilder.java
index 4b2b4e0..a510c58 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixtureBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixtureBuilder.java
@@ -115,7 +115,7 @@ public class ClusterFixtureBuilder {
/**
* Add an additional boot-time property for the embedded Drillbit.
* @param key config property name
- * @param value property value
+ * @param value String property value
* @return this builder
*/
public ClusterFixtureBuilder configProperty(String key, Object value) {
@@ -123,6 +123,17 @@ public class ClusterFixtureBuilder {
return this;
}
+ /**
+ * Variant of {@link #configProperty} that stores the value as-is, without String conversion.
+ * @param key config property name
+ * @param value property value
+ * @return this builder
+ */
+ public ClusterFixtureBuilder configNonStringProperty(String key, Object value) {
+ configBuilder.put(key, value);
+ return this;
+ }
+
public ClusterFixtureBuilder putDefinition(OptionDefinition definition) {
configBuilder.putDefinition(definition);
return this;
diff --git a/pom.xml b/pom.xml
index 76e4bfc..2047d0d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -46,6 +46,7 @@
<proto.cas.path>${project.basedir}/src/main/protobuf/</proto.cas.path>
<junit.version>5.7.2</junit.version>
<junit4.version>4.13.2</junit4.version>
+ <junit.platform.version>1.8.2</junit.platform.version>
<slf4j.version>1.7.26</slf4j.version>
<shaded.guava.version>28.2-jre</shaded.guava.version>
<guava.version>30.1.1-jre</guava.version>
@@ -1143,6 +1144,30 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.junit.platform</groupId>
+ <artifactId>junit-platform-suite-api</artifactId>
+ <version>${junit.platform.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.platform</groupId>
+ <artifactId>junit-platform-suite-engine</artifactId>
+ <version>${junit.platform.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.platform</groupId>
+ <artifactId>junit-platform-engine</artifactId>
+ <version>${junit.platform.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.platform</groupId>
+ <artifactId>junit-platform-launcher</artifactId>
+ <version>${junit.platform.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
<scope>test</scope>
@@ -2269,6 +2294,10 @@
<groupId>com.nimbusds</groupId>
<artifactId>nimbus-jose-jwt</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>