You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by cg...@apache.org on 2023/02/07 12:22:07 UTC
[drill] branch master updated: DRILL-8387: Add Support for User Translation to ElasticSearch Plugin (#2739)
This is an automated email from the ASF dual-hosted git repository.
cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new a7b7735c51 DRILL-8387: Add Support for User Translation to ElasticSearch Plugin (#2739)
a7b7735c51 is described below
commit a7b7735c51cd92667f7ac023044ad7b3fbeb8002
Author: Charles S. Givre <cg...@apache.org>
AuthorDate: Tue Feb 7 07:21:59 2023 -0500
DRILL-8387: Add Support for User Translation to ElasticSearch Plugin (#2739)
---
contrib/storage-elasticsearch/.gitignore | 1 +
contrib/storage-elasticsearch/README.md | 18 +-
contrib/storage-elasticsearch/pom.xml | 22 +-
.../elasticsearch/ElasticsearchStorageConfig.java | 86 +++-
.../elasticsearch/ElasticsearchStoragePlugin.java | 20 +
.../schema/ElasticsearchDrillSchemaFactory.java | 22 +-
.../elasticsearch/ElasticComplexTypesTest.java | 135 +++--
.../store/elasticsearch/ElasticInfoSchemaTest.java | 118 +++--
.../store/elasticsearch/ElasticSearchPlanTest.java | 87 ++--
.../elasticsearch/ElasticSearchQueryTest.java | 565 ++++++++++++---------
.../ElasticSearchUserTranslationTest.java | 245 +++++++++
.../elasticsearch/TestElasticsearchSuite.java | 123 ++++-
12 files changed, 1006 insertions(+), 436 deletions(-)
diff --git a/contrib/storage-elasticsearch/.gitignore b/contrib/storage-elasticsearch/.gitignore
new file mode 100644
index 0000000000..05c3e7ac9a
--- /dev/null
+++ b/contrib/storage-elasticsearch/.gitignore
@@ -0,0 +1 @@
+/src/test/resources/logback-test.xml
diff --git a/contrib/storage-elasticsearch/README.md b/contrib/storage-elasticsearch/README.md
index 2ef77adb61..c55f7ee0b3 100644
--- a/contrib/storage-elasticsearch/README.md
+++ b/contrib/storage-elasticsearch/README.md
@@ -5,7 +5,7 @@ This storage plugin implementation is based on [Apache Calcite adapter for Elast
For more details about supported versions please refer to [Supported versions](https://calcite.apache.org/docs/elasticsearch_adapter.html#supported-versions) page.
-### Supported optimizations and features
+## Supported Optimizations and Features
This storage plugin supports the following optimizations:
@@ -19,7 +19,7 @@ This storage plugin supports the following optimizations:
Besides these optimizations, ElasticSearch storage plugin supports the schema provisioning feature.
For more details please refer to [Specifying the Schema as Table Function Parameter](https://drill.apache.org/docs/plugin-configuration-basics/#specifying-the-schema-as-table-function-parameter).
-### Plugin registration
+## Plugin Registration
The plugin can be registered in Apache Drill using the drill web interface by navigating to the `storage` page.
Following is the default registration configuration.
@@ -36,7 +36,19 @@ Following is the default registration configuration.
}
```
-### Developer notes
+## User Translation
+The ElasticSearch plugin supports user translation, which allows each individual user to access ElasticSearch with their own credentials.
+
+Simply add the following to the storage plugin configuration:
+```json
+ "authType": "USER_TRANSLATION",
+ "credentialsProvider": {
+ "credentialsProviderType": "PlainCredentialsProvider"
+ }
+```
+
+
+## Developer Notes
Most of the common classes required for creating storage plugins based on Calcite adapters are placed in the
`java-exec` module, so they can be reused in future plugin implementations.
diff --git a/contrib/storage-elasticsearch/pom.xml b/contrib/storage-elasticsearch/pom.xml
index 7576216757..51a6cada62 100644
--- a/contrib/storage-elasticsearch/pom.xml
+++ b/contrib/storage-elasticsearch/pom.xml
@@ -67,10 +67,28 @@
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
- <artifactId>elasticsearch-rest-high-level-client</artifactId>
- <version>7.17.5</version>
+ <artifactId>elasticsearch-rest-client</artifactId>
+ <version>8.6.0</version>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>co.elastic.clients</groupId>
+ <artifactId>elasticsearch-java</artifactId>
+ <version>8.6.0</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.http-client</groupId>
+ <artifactId>google-http-client</artifactId>
+ <version>1.42.3</version>
<scope>test</scope>
</dependency>
+
<dependency>
<groupId>net.hydromatic</groupId>
<artifactId>foodmart-data-json</artifactId>
diff --git a/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/ElasticsearchStorageConfig.java b/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/ElasticsearchStorageConfig.java
index cc1c165cd4..f38246b796 100644
--- a/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/ElasticsearchStorageConfig.java
+++ b/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/ElasticsearchStorageConfig.java
@@ -24,7 +24,9 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
+import org.apache.drill.common.PlanStringBuilder;
import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
import org.apache.drill.exec.store.security.CredentialProviderUtils;
import org.apache.drill.common.logical.security.CredentialsProvider;
import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
@@ -63,36 +65,27 @@ public class ElasticsearchStorageConfig extends StoragePluginConfig {
@JsonProperty(USERNAME) String username,
@JsonProperty(PASSWORD) String password,
@JsonProperty(PATH_PREFIX) String pathPrefix,
+ @JsonProperty("authMode") String authMode,
@JsonProperty(CREDENTIALS_PROVIDER) CredentialsProvider credentialsProvider) {
- super(CredentialProviderUtils.getCredentialsProvider(username, password, credentialsProvider), credentialsProvider == null);
+ super(CredentialProviderUtils.getCredentialsProvider(username, password, credentialsProvider),
+ credentialsProvider == null, AuthMode.parseOrDefault(authMode, AuthMode.SHARED_USER));
this.hosts = hosts;
this.pathPrefix = pathPrefix;
}
- public List<String> getHosts() {
- return hosts;
+ private ElasticsearchStorageConfig(ElasticsearchStorageConfig that, CredentialsProvider credentialsProvider) {
+ super(getCredentialsProvider(credentialsProvider), credentialsProvider == null, that.authMode);
+ this.hosts = that.hosts;
+ this.pathPrefix = that.pathPrefix;
}
- public String getPathPrefix() {
- return pathPrefix;
+ @Override
+ public ElasticsearchStorageConfig updateCredentialProvider(CredentialsProvider credentialsProvider) {
+ return new ElasticsearchStorageConfig(this, credentialsProvider);
}
- public String getUsername() {
- if (!directCredentials) {
- return null;
- }
- return getUsernamePasswordCredentials()
- .map(UsernamePasswordCredentials::getUsername)
- .orElse(null);
- }
-
- public String getPassword() {
- if (!directCredentials) {
- return null;
- }
- return getUsernamePasswordCredentials()
- .map(UsernamePasswordCredentials::getPassword)
- .orElse(null);
+ private static CredentialsProvider getCredentialsProvider(CredentialsProvider credentialsProvider) {
+ return credentialsProvider != null ? credentialsProvider : PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER;
}
@JsonIgnore
@@ -102,6 +95,18 @@ public class ElasticsearchStorageConfig extends StoragePluginConfig {
.build();
}
+ /**
+ * Gets the credentials. This method is used when user translation is enabled.
+ * @return An {@link Optional} containing {@link UsernamePasswordCredentials} from the config.
+ */
+ @JsonIgnore
+ public Optional<UsernamePasswordCredentials> getUsernamePasswordCredentials(String username) {
+ return new UsernamePasswordCredentials.Builder()
+ .setCredentialsProvider(credentialsProvider)
+ .setQueryUser(username)
+ .build();
+ }
+
@JsonIgnore
public Map<String, Object> toConfigMap()
throws JsonProcessingException {
@@ -118,6 +123,29 @@ public class ElasticsearchStorageConfig extends StoragePluginConfig {
return builder.build();
}
+ /**
+ * This method is used when user translation is enabled.
+ * @param queryUser The user who submitted the query
+ * @return A map of the configuration details.
+ * @throws JsonProcessingException If JSON is unparsable, throw exception
+ */
+ @JsonIgnore
+ public Map<String, Object> toConfigMap(String queryUser)
+ throws JsonProcessingException {
+ Map<String, String> credentials = new HashMap<>(credentialsProvider.getUserCredentials(queryUser));
+ ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder();
+ builder.put(HOSTS, OBJECT_WRITER.writeValueAsString(hosts));
+ builder.put(PATH_PREFIX, pathPrefix != null ? pathPrefix : EMPTY_STRING);
+ builder.put(USERNAME, credentials.getOrDefault(USERNAME, EMPTY_STRING));
+ builder.put(PASSWORD, credentials.getOrDefault(PASSWORD, EMPTY_STRING));
+
+ credentials.remove(USERNAME);
+ credentials.remove(PASSWORD);
+ builder.putAll(credentials);
+ return builder.build();
+ }
+
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -127,12 +155,22 @@ public class ElasticsearchStorageConfig extends StoragePluginConfig {
return false;
}
ElasticsearchStorageConfig that = (ElasticsearchStorageConfig) o;
- return Objects.equals(hosts, that.hosts)
- && Objects.equals(credentialsProvider, that.credentialsProvider);
+ return Objects.equals(hosts, that.hosts) &&
+ Objects.equals(pathPrefix, that.pathPrefix) &&
+ Objects.equals(credentialsProvider, that.credentialsProvider);
}
@Override
public int hashCode() {
- return Objects.hash(hosts, credentialsProvider);
+ return Objects.hash(hosts, pathPrefix, credentialsProvider);
+ }
+
+ @Override
+ public String toString() {
+ return new PlanStringBuilder(this)
+ .field("hosts", hosts)
+ .field("pathPrefix", pathPrefix)
+ .field("credentialsProvider", credentialsProvider)
+ .toString();
}
}
diff --git a/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/ElasticsearchStoragePlugin.java b/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/ElasticsearchStoragePlugin.java
index 95cdbee18a..27629ac7a8 100644
--- a/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/ElasticsearchStoragePlugin.java
+++ b/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/ElasticsearchStoragePlugin.java
@@ -21,17 +21,23 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.calcite.adapter.elasticsearch.CalciteUtils;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
import org.apache.drill.exec.ops.OptimizerRulesContext;
import org.apache.drill.exec.planner.PlannerPhase;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.AbstractStoragePlugin;
import org.apache.drill.exec.store.SchemaConfig;
import org.apache.drill.exec.store.elasticsearch.schema.ElasticsearchDrillSchemaFactory;
+import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Collections;
+import java.util.Optional;
import java.util.Set;
public class ElasticsearchStoragePlugin extends AbstractStoragePlugin {
+ private static final Logger logger = LoggerFactory.getLogger(ElasticsearchStoragePlugin.class);
private final ElasticsearchStorageConfig config;
private final ElasticsearchDrillSchemaFactory schemaFactory;
@@ -44,6 +50,20 @@ public class ElasticsearchStoragePlugin extends AbstractStoragePlugin {
@Override
public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws JsonProcessingException {
+ // Check to see if user translation is enabled. If so, and creds are
+ // not present, then do not register any schemata. This prevents
+ // info schema errors.
+ if (config.getAuthMode() == AuthMode.USER_TRANSLATION) {
+ Optional<UsernamePasswordCredentials> userCreds = config.getUsernamePasswordCredentials(schemaConfig.getUserName());
+ if (! userCreds.isPresent()) {
+ logger.debug(
+ "No schemas will be registered in {} for query user {}.",
+ getName(), schemaConfig.getUserName()
+ );
+ return;
+ }
+ }
+
schemaFactory.registerSchemas(schemaConfig, parent);
}
diff --git a/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/schema/ElasticsearchDrillSchemaFactory.java b/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/schema/ElasticsearchDrillSchemaFactory.java
index 5430743149..15005ec638 100644
--- a/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/schema/ElasticsearchDrillSchemaFactory.java
+++ b/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/schema/ElasticsearchDrillSchemaFactory.java
@@ -19,13 +19,20 @@ package org.apache.drill.exec.store.elasticsearch.schema;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.calcite.adapter.elasticsearch.ElasticsearchSchemaFactory;
+import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
import org.apache.drill.exec.store.AbstractSchemaFactory;
import org.apache.drill.exec.store.SchemaConfig;
import org.apache.drill.exec.store.elasticsearch.ElasticsearchStoragePlugin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
public class ElasticsearchDrillSchemaFactory extends AbstractSchemaFactory {
+ private static final Logger logger = LoggerFactory.getLogger(ElasticsearchDrillSchemaFactory.class);
private final ElasticsearchStoragePlugin plugin;
private final ElasticsearchSchemaFactory delegate;
@@ -37,8 +44,19 @@ public class ElasticsearchDrillSchemaFactory extends AbstractSchemaFactory {
@Override
public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws JsonProcessingException {
- ElasticsearchDrillSchema schema = new ElasticsearchDrillSchema(getName(), plugin,
- delegate.create(parent, getName(), plugin.getConfig().toConfigMap()));
+ ElasticsearchDrillSchema schema;
+ if (plugin.getConfig().getAuthMode() == AuthMode.SHARED_USER) {
+ Schema elasticsearchSchema = delegate.create(parent, getName(), plugin.getConfig().toConfigMap());
+ schema = new ElasticsearchDrillSchema(getName(), plugin, elasticsearchSchema);
+ } else if (plugin.getConfig().getAuthMode() == AuthMode.USER_TRANSLATION) {
+ // Get user's info
+ Schema elasticsearchUTSchema = delegate.create(parent, getName(), plugin.getConfig().toConfigMap(schemaConfig.getUserName()));
+ schema = new ElasticsearchDrillSchema(getName(), plugin, elasticsearchUTSchema);
+ } else {
+ throw UserException.internalError()
+ .message("User Impersonation not supported as an authentication mode for ElasticSearch. The only authentication modes supported are SHARED_USER and USER_TRANSLATION")
+ .build(logger);
+ }
parent.add(getName(), schema);
}
}
diff --git a/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticComplexTypesTest.java b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticComplexTypesTest.java
index a7109b2725..49826ffef7 100644
--- a/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticComplexTypesTest.java
+++ b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticComplexTypesTest.java
@@ -17,38 +17,41 @@
*/
package org.apache.drill.exec.store.elasticsearch;
-import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
import org.apache.drill.test.ClusterFixture;
import org.apache.drill.test.ClusterTest;
-import org.apache.http.HttpHost;
-import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
-import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestClient;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.client.indices.CreateIndexRequest;
-import org.elasticsearch.xcontent.XContentBuilder;
-import org.elasticsearch.xcontent.XContentFactory;
+import org.json.simple.JSONObject;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import co.elastic.clients.elasticsearch.ElasticsearchClient;
+import co.elastic.clients.elasticsearch.core.IndexRequest;
+import co.elastic.clients.elasticsearch.indices.CreateIndexRequest;
+import co.elastic.clients.elasticsearch.indices.DeleteIndexRequest;
+import co.elastic.clients.elasticsearch.indices.RefreshRequest;
+import co.elastic.clients.json.JsonData;
import java.io.IOException;
-import java.util.ArrayList;
+import java.io.Reader;
+import java.io.StringReader;
import java.util.Arrays;
import java.util.Collections;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import static org.apache.drill.test.TestBuilder.listOf;
import static org.apache.drill.test.TestBuilder.mapOf;
public class ElasticComplexTypesTest extends ClusterTest {
- private static final List<String> indexNames = new ArrayList<>();
-
- public static RestHighLevelClient restHighLevelClient;
+ private static final Logger logger = LoggerFactory.getLogger(ElasticComplexTypesTest.class);
+ private static final List<String> indexNames = new LinkedList<>();
+ private static ElasticsearchClient elasticsearchClient;
@BeforeClass
public static void init() throws Exception {
@@ -57,57 +60,81 @@ public class ElasticComplexTypesTest extends ClusterTest {
ElasticsearchStorageConfig config = new ElasticsearchStorageConfig(
Collections.singletonList(TestElasticsearchSuite.getAddress()),
- null, null, null, PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
+ TestElasticsearchSuite.ELASTICSEARCH_USERNAME,
+ TestElasticsearchSuite.ELASTICSEARCH_PASSWORD,
+ null, AuthMode.SHARED_USER.name(),
+ null
+ );
config.setEnabled(true);
cluster.defineStoragePlugin("elastic", config);
+ elasticsearchClient = TestElasticsearchSuite.getESClient();
prepareData();
}
@AfterClass
public static void cleanUp() throws IOException {
- for (String indexName : indexNames) {
- restHighLevelClient.indices().delete(new DeleteIndexRequest(indexName), RequestOptions.DEFAULT);
- }
+ DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest.Builder()
+ .index(indexNames)
+ .build();
+
+ elasticsearchClient.indices().delete(deleteIndexRequest);
TestElasticsearchSuite.tearDownCluster();
}
private static void prepareData() throws IOException {
- restHighLevelClient = new RestHighLevelClient(
- RestClient.builder(HttpHost.create(TestElasticsearchSuite.elasticsearch.getHttpHostAddress())));
-
- String indexName = "arr";
- indexNames.add(indexName);
- CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
-
- restHighLevelClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
-
- XContentBuilder builder = XContentFactory.jsonBuilder();
- builder.startObject();
- builder.field("string_arr", Arrays.asList("a", "b", "c", "d"));
- builder.field("int_arr", Arrays.asList(1, 2, 3, 4, 0));
- builder.field("nest_int_arr", Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4, 0)));
- builder.endObject();
- IndexRequest indexRequest = new IndexRequest(indexName).source(builder);
- restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
- restHighLevelClient.indices().refresh(new RefreshRequest(indexName), RequestOptions.DEFAULT);
-
- indexName = "map";
- indexNames.add(indexName);
- createIndexRequest = new CreateIndexRequest(indexName);
-
- restHighLevelClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
-
- builder = XContentFactory.jsonBuilder();
- builder.startObject();
- builder.field("prim_field", 321);
- builder.field("nest_field", ImmutableMap.of("a", 123, "b", "abc"));
- builder.field("more_nest_field", ImmutableMap.of("a", 123, "b", ImmutableMap.of("c", "abc")));
- builder.field("map_arr", Collections.singletonList(ImmutableMap.of("a", 123, "b", ImmutableMap.of("c", "abc"))));
- builder.endObject();
- indexRequest = new IndexRequest(indexName).source(builder);
- restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
- restHighLevelClient.indices().refresh(new RefreshRequest(indexName), RequestOptions.DEFAULT);
+ {
+ indexNames.add("arr");
+ CreateIndexRequest createIndexRequest = new CreateIndexRequest.Builder()
+ .index("arr")
+ .build();
+ elasticsearchClient.indices().create(createIndexRequest);
+
+ final Reader input1 = new StringReader(
+ JSONObject.toJSONString(ImmutableMap.of(
+ "string_arr", Arrays.asList("a", "b", "c", "d"),
+ "int_arr", Arrays.asList(1, 2, 3, 4, 0),
+ "nest_int_arr", Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4, 0))
+ ))
+ );
+ IndexRequest<JsonData> request = IndexRequest.of(i -> i
+ .index("arr")
+ .withJson(input1)
+ );
+ logger.debug("Insert response {}", elasticsearchClient.index(request));
+ }
+ {
+ indexNames.add("map");
+ CreateIndexRequest createIndexRequest = new CreateIndexRequest.Builder()
+ .index("map")
+ .build();
+ elasticsearchClient.indices().create(createIndexRequest);
+
+ Map<String, Object> map = ImmutableMap.of("a", 123, "b", "abc");
+ Map<String, Object> nested_map = ImmutableMap.of(
+ "a", 123,
+ "b", ImmutableMap.of("c", "abc")
+ );
+ final Reader input1 = new StringReader(
+ JSONObject.toJSONString(ImmutableMap.of(
+ "prim_field", 321,
+ "nest_field", map,
+ "more_nest_field", nested_map,
+ "map_arr", Collections.singletonList(nested_map)
+ ))
+ );
+ IndexRequest<JsonData> request = IndexRequest.of(i -> i
+ .index("map")
+ .withJson(input1)
+ );
+ logger.debug("Insert response {}", elasticsearchClient.index(request));
+
+ RefreshRequest refreshRequest = new RefreshRequest.Builder()
+ .index(indexNames)
+ .build();
+ elasticsearchClient.indices().refresh(refreshRequest);
+ logger.debug("Data preparation complete.");
+ }
}
@Test
diff --git a/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticInfoSchemaTest.java b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticInfoSchemaTest.java
index dba28c0fb5..d28f3133d4 100644
--- a/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticInfoSchemaTest.java
+++ b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticInfoSchemaTest.java
@@ -17,33 +17,33 @@
*/
package org.apache.drill.exec.store.elasticsearch;
-import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
import org.apache.drill.test.ClusterFixture;
import org.apache.drill.test.ClusterTest;
-import org.apache.http.HttpHost;
-import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
-import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestClient;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.client.indices.CreateIndexRequest;
-import org.elasticsearch.xcontent.XContentBuilder;
-import org.elasticsearch.xcontent.XContentFactory;
+import org.json.simple.JSONObject;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+import co.elastic.clients.elasticsearch.ElasticsearchClient;
+import co.elastic.clients.elasticsearch.core.IndexRequest;
+import co.elastic.clients.elasticsearch.indices.CreateIndexRequest;
+import co.elastic.clients.elasticsearch.indices.DeleteIndexRequest;
+import co.elastic.clients.elasticsearch.indices.RefreshRequest;
+import co.elastic.clients.json.JsonData;
+
import java.io.IOException;
-import java.util.ArrayList;
+import java.io.Reader;
+import java.io.StringReader;
import java.util.Collections;
+import java.util.LinkedList;
import java.util.List;
public class ElasticInfoSchemaTest extends ClusterTest {
- private static final List<String> indexNames = new ArrayList<>();
-
- public static RestHighLevelClient restHighLevelClient;
+ private static final List<String> indexNames = new LinkedList<>();
+ private static ElasticsearchClient elasticsearchClient;
@BeforeClass
public static void init() throws Exception {
@@ -52,53 +52,71 @@ public class ElasticInfoSchemaTest extends ClusterTest {
ElasticsearchStorageConfig config = new ElasticsearchStorageConfig(
Collections.singletonList(TestElasticsearchSuite.getAddress()),
- null, null, null, PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
+ TestElasticsearchSuite.ELASTICSEARCH_USERNAME,
+ TestElasticsearchSuite.ELASTICSEARCH_PASSWORD,
+ null, AuthMode.SHARED_USER.name(),
+ null
+ );
+
config.setEnabled(true);
cluster.defineStoragePlugin("elastic", config);
+ elasticsearchClient = TestElasticsearchSuite.getESClient();
prepareData();
}
@AfterClass
public static void cleanUp() throws IOException {
- for (String indexName : indexNames) {
- restHighLevelClient.indices().delete(new DeleteIndexRequest(indexName), RequestOptions.DEFAULT);
- }
+ DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest.Builder()
+ .index(indexNames)
+ .build();
+
+ elasticsearchClient.indices().delete(deleteIndexRequest);
TestElasticsearchSuite.tearDownCluster();
}
private static void prepareData() throws IOException {
- restHighLevelClient = new RestHighLevelClient(RestClient.builder(HttpHost.create(TestElasticsearchSuite.getAddress())));
-
- String indexName = "t1";
- indexNames.add(indexName);
- CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
-
- restHighLevelClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
-
- XContentBuilder builder = XContentFactory.jsonBuilder();
- builder.startObject();
- builder.field("string_field", "a");
- builder.field("int_field", 123);
- builder.endObject();
- IndexRequest indexRequest = new IndexRequest(indexName).source(builder);
- restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
- restHighLevelClient.indices().refresh(new RefreshRequest(indexName), RequestOptions.DEFAULT);
-
- indexName = "t2";
- indexNames.add(indexName);
- createIndexRequest = new CreateIndexRequest(indexName);
-
- restHighLevelClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
-
- builder = XContentFactory.jsonBuilder();
- builder.startObject();
- builder.field("another_int_field", 321);
- builder.field("another_string_field", "b");
- builder.endObject();
- indexRequest = new IndexRequest(indexName).source(builder);
- restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
- restHighLevelClient.indices().refresh(new RefreshRequest(indexName), RequestOptions.DEFAULT);
+ {
+ indexNames.add("t1");
+ CreateIndexRequest createIndexRequest = new CreateIndexRequest.Builder()
+ .index("t1")
+ .build();
+ elasticsearchClient.indices().create(createIndexRequest);
+
+ final Reader input1 = new StringReader(
+ JSONObject.toJSONString(
+ ImmutableMap.of("string_field", "a", "int_field", 123)
+ )
+ );
+ IndexRequest<JsonData> request = IndexRequest.of(i -> i
+ .index("t1")
+ .withJson(input1)
+ );
+ elasticsearchClient.index(request);
+ }
+ {
+ indexNames.add("t2");
+ CreateIndexRequest createIndexRequest = new CreateIndexRequest.Builder()
+ .index("t2")
+ .build();
+ elasticsearchClient.indices().create(createIndexRequest);
+
+ final Reader input2 = new StringReader(
+ JSONObject.toJSONString(
+ ImmutableMap.of("another_string_field", "b", "another_int_field", 321)
+ )
+ );
+ IndexRequest<JsonData> request = IndexRequest.of(i -> i
+ .index("t2")
+ .withJson(input2)
+ );
+ elasticsearchClient.index(request);
+
+ RefreshRequest refreshRequest = new RefreshRequest.Builder()
+ .index(indexNames)
+ .build();
+ elasticsearchClient.indices().refresh(refreshRequest);
+ }
}
@Test
diff --git a/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchPlanTest.java b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchPlanTest.java
index 99e520d0a9..01fa7546ec 100644
--- a/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchPlanTest.java
+++ b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchPlanTest.java
@@ -17,31 +17,33 @@
*/
package org.apache.drill.exec.store.elasticsearch;
-import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
import org.apache.drill.test.ClusterFixture;
import org.apache.drill.test.ClusterTest;
-import org.apache.http.HttpHost;
-import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
-import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestClient;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.client.indices.CreateIndexRequest;
-import org.elasticsearch.xcontent.XContentBuilder;
-import org.elasticsearch.xcontent.XContentFactory;
+import org.json.simple.JSONObject;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+import co.elastic.clients.elasticsearch.ElasticsearchClient;
+import co.elastic.clients.elasticsearch.core.IndexRequest;
+import co.elastic.clients.elasticsearch.indices.CreateIndexRequest;
+import co.elastic.clients.elasticsearch.indices.DeleteIndexRequest;
+import co.elastic.clients.elasticsearch.indices.RefreshRequest;
+import co.elastic.clients.json.JsonData;
+
import java.io.IOException;
+import java.io.Reader;
+import java.io.StringReader;
import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
public class ElasticSearchPlanTest extends ClusterTest {
- public static RestHighLevelClient restHighLevelClient;
-
- private static String indexName;
+ private static ElasticsearchClient elasticsearchClient;
+ private static final List<String> indexNames = new LinkedList<>();
@BeforeClass
public static void init() throws Exception {
@@ -49,38 +51,55 @@ public class ElasticSearchPlanTest extends ClusterTest {
startCluster(ClusterFixture.builder(dirTestWatcher));
ElasticsearchStorageConfig config = new ElasticsearchStorageConfig(
- Collections.singletonList(TestElasticsearchSuite.getAddress()),
- null, null, null, PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
+ Collections.singletonList(TestElasticsearchSuite.getAddress()),
+ TestElasticsearchSuite.ELASTICSEARCH_USERNAME,
+ TestElasticsearchSuite.ELASTICSEARCH_PASSWORD,
+ null,
+ AuthMode.SHARED_USER.name(),
+ null
+ );
config.setEnabled(true);
cluster.defineStoragePlugin("elastic", config);
+ elasticsearchClient = TestElasticsearchSuite.getESClient();
prepareData();
}
@AfterClass
public static void cleanUp() throws IOException {
- restHighLevelClient.indices().delete(new DeleteIndexRequest(indexName), RequestOptions.DEFAULT);
+ DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest.Builder()
+ .index(indexNames)
+ .build();
+
+ elasticsearchClient.indices().delete(deleteIndexRequest);
TestElasticsearchSuite.tearDownCluster();
}
private static void prepareData() throws IOException {
- restHighLevelClient = new RestHighLevelClient(RestClient.builder(HttpHost.create(TestElasticsearchSuite.getAddress())));
-
- indexName = "nation";
- CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
-
- restHighLevelClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
-
- XContentBuilder builder = XContentFactory.jsonBuilder();
- builder.startObject();
- builder.field("n_nationkey", 0);
- builder.field("n_name", "ALGERIA");
- builder.field("n_regionkey", 1);
- builder.endObject();
- IndexRequest indexRequest = new IndexRequest(indexName).source(builder);
- restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
-
- restHighLevelClient.indices().refresh(new RefreshRequest(indexName), RequestOptions.DEFAULT);
+ indexNames.add("nation");
+ CreateIndexRequest createIndexRequest = new CreateIndexRequest.Builder()
+ .index("nation")
+ .build();
+ elasticsearchClient.indices().create(createIndexRequest);
+
+ final Reader input1 = new StringReader(
+ JSONObject.toJSONString(ImmutableMap.of(
+ "n_nationkey", 0,
+ "n_name", "ALGERIA",
+ "n_regionkey", 1
+ ))
+ );
+ IndexRequest<JsonData> request = IndexRequest.of(i -> i
+ .index("nation")
+ .withJson(input1)
+ );
+ elasticsearchClient.index(request);
+
+ RefreshRequest refreshRequest = new RefreshRequest.Builder()
+ .index(indexNames)
+ .build();
+
+ elasticsearchClient.indices().refresh(refreshRequest);
}
@Test
diff --git a/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchQueryTest.java b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchQueryTest.java
index 9a0f64ecf7..704332ad05 100644
--- a/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchQueryTest.java
+++ b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchQueryTest.java
@@ -18,29 +18,33 @@
package org.apache.drill.exec.store.elasticsearch;
import org.apache.drill.common.exceptions.UserRemoteException;
-import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
import org.apache.drill.test.ClusterFixture;
import org.apache.drill.test.ClusterTest;
-import org.apache.http.HttpHost;
-import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
-import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestClient;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.client.indices.CreateIndexRequest;
-import org.elasticsearch.xcontent.XContentBuilder;
-import org.elasticsearch.xcontent.XContentFactory;
+import org.json.simple.JSONObject;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+import co.elastic.clients.elasticsearch.ElasticsearchClient;
+import co.elastic.clients.elasticsearch.core.IndexRequest;
+import co.elastic.clients.elasticsearch.indices.CreateIndexRequest;
+import co.elastic.clients.elasticsearch.indices.DeleteIndexRequest;
+import co.elastic.clients.elasticsearch.indices.RefreshRequest;
+import co.elastic.clients.json.JsonData;
+
import java.io.IOException;
+import java.io.Reader;
+import java.io.StringReader;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.util.Base64;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -48,9 +52,8 @@ import static org.junit.Assert.fail;
public class ElasticSearchQueryTest extends ClusterTest {
- public static RestHighLevelClient restHighLevelClient;
-
- private static String indexName;
+ private static ElasticsearchClient elasticsearchClient;
+ private static final List<String> indexNames = new LinkedList<>();
@BeforeClass
public static void init() throws Exception {
@@ -58,257 +61,311 @@ public class ElasticSearchQueryTest extends ClusterTest {
startCluster(ClusterFixture.builder(dirTestWatcher));
ElasticsearchStorageConfig config = new ElasticsearchStorageConfig(
- Collections.singletonList(TestElasticsearchSuite.getAddress()),
- null, null, null, PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
+ Collections.singletonList(TestElasticsearchSuite.getAddress()),
+ TestElasticsearchSuite.ELASTICSEARCH_USERNAME,
+ TestElasticsearchSuite.ELASTICSEARCH_PASSWORD,
+ null, AuthMode.SHARED_USER.name(),
+ null
+ );
config.setEnabled(true);
cluster.defineStoragePlugin("elastic", config);
+ elasticsearchClient = TestElasticsearchSuite.getESClient();
prepareData();
}
@AfterClass
public static void cleanUp() throws IOException {
- restHighLevelClient.indices().delete(new DeleteIndexRequest(indexName), RequestOptions.DEFAULT);
+ DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest.Builder()
+ .index(indexNames)
+ .build();
+
+ elasticsearchClient.indices().delete(deleteIndexRequest);
TestElasticsearchSuite.tearDownCluster();
}
- private static void prepareData() throws IOException {
- restHighLevelClient = new RestHighLevelClient(RestClient.builder(HttpHost.create(TestElasticsearchSuite.getAddress())));
-
- indexName = "employee";
- CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
-
- restHighLevelClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
-
- XContentBuilder builder = XContentFactory.jsonBuilder();
- builder.startObject();
- builder.field("employee_id", 1);
- builder.field("full_name", "Sheri Nowmer");
- builder.field("first_name", "Sheri");
- builder.field("last_name", "Nowmer");
- builder.field("position_id", 1);
- builder.field("position_title", "President");
- builder.field("store_id", 0);
- builder.field("department_id", 1);
- builder.field("birth_date", "1961-08-26");
- builder.field("hire_date", "1994-12-01 00:00:00.0");
- builder.field("salary", 80000.0);
- builder.field("supervisor_id", 0);
- builder.field("education_level", "Graduate Degree");
- builder.field("marital_status", "S");
- builder.field("gender", "F");
- builder.field("management_role", "Senior Management");
- builder.field("binary_field", "Senior Management".getBytes(StandardCharsets.UTF_8));
- builder.field("boolean_field", true);
- builder.timeField("date_field", "2015/01/01 12:10:30");
- builder.field("byte_field", (byte) 123);
- builder.field("long_field", 123L);
- builder.field("float_field", 123F);
- builder.field("short_field", (short) 123);
- builder.field("decimal_field", new BigDecimal("123.45"));
- builder.endObject();
- IndexRequest indexRequest = new IndexRequest(indexName).source(builder);
- restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
-
- builder = XContentFactory.jsonBuilder();
- builder.startObject();
- builder.field("employee_id", 2);
- builder.field("full_name", "Derrick Whelply");
- builder.field("first_name", "Derrick");
- builder.field("last_name", "Whelply");
- builder.field("position_id", 2);
- builder.field("position_title", "VP Country Manager");
- builder.field("store_id", 0);
- builder.field("department_id", 1);
- builder.field("birth_date", "1915-07-03");
- builder.field("hire_date", "1994-12-01 00:00:00.0");
- builder.field("salary", 40000.0);
- builder.field("supervisor_id", 1);
- builder.field("education_level", "Graduate Degree");
- builder.field("marital_status", "M");
- builder.field("gender", "M");
- builder.field("management_role", "Senior Management");
- builder.endObject();
- indexRequest = new IndexRequest(indexName).source(builder);
- restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
-
- builder = XContentFactory.jsonBuilder();
- builder.startObject();
- builder.field("employee_id", 4);
- builder.field("full_name", "Michael Spence");
- builder.field("first_name", "Michael");
- builder.field("last_name", "Spence");
- builder.field("position_id", 2);
- builder.field("position_title", "VP Country Manager");
- builder.field("store_id", 0);
- builder.field("department_id", 1);
- builder.field("birth_date", "1969-06-20");
- builder.field("hire_date", "1998-01-01 00:00:00.0");
- builder.field("salary", 40000.0);
- builder.field("supervisor_id", 1);
- builder.field("education_level", "Graduate Degree");
- builder.field("marital_status", "S");
- builder.field("gender", "M");
- builder.field("management_role", "Senior Management");
- builder.endObject();
- indexRequest = new IndexRequest(indexName).source(builder);
- restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
-
- builder = XContentFactory.jsonBuilder();
- builder.startObject();
- builder.field("employee_id", 5);
- builder.field("full_name", "Maya Gutierrez");
- builder.field("first_name", "Maya");
- builder.field("last_name", "Gutierrez");
- builder.field("position_id", 2);
- builder.field("position_title", "VP Country Manager");
- builder.field("store_id", 0);
- builder.field("department_id", 1);
- builder.field("birth_date", "1951-05-10");
- builder.field("hire_date", "1998-01-01 00:00:00.0");
- builder.field("salary", 35000.0);
- builder.field("supervisor_id", 1);
- builder.field("education_level", "Bachelors Degree");
- builder.field("marital_status", "M");
- builder.field("gender", "F");
- builder.field("management_role", "Senior Management");
- builder.endObject();
- indexRequest = new IndexRequest(indexName).source(builder);
- restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
-
- builder = XContentFactory.jsonBuilder();
- builder.startObject();
- builder.field("employee_id", 6);
- builder.field("full_name", "Roberta Damstra");
- builder.field("first_name", "Roberta");
- builder.field("last_name", "Damstra");
- builder.field("position_id", 3);
- builder.field("position_title", "VP Information Systems");
- builder.field("store_id", 0);
- builder.field("department_id", 2);
- builder.field("birth_date", "1942-10-08");
- builder.field("hire_date", "1994-12-01 00:00:00.0");
- builder.field("salary", 25000.0);
- builder.field("supervisor_id", 1);
- builder.field("education_level", "Bachelors Degree");
- builder.field("marital_status", "M");
- builder.field("gender", "F");
- builder.field("management_role", "Senior Management");
- builder.endObject();
- indexRequest = new IndexRequest(indexName).source(builder);
- restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
-
- builder = XContentFactory.jsonBuilder();
- builder.startObject();
- builder.field("employee_id", 7);
- builder.field("full_name", "Rebecca Kanagaki");
- builder.field("first_name", "Rebecca");
- builder.field("last_name", "Kanagaki");
- builder.field("position_id", 4);
- builder.field("position_title", "VP Human Resources");
- builder.field("store_id", 0);
- builder.field("department_id", 3);
- builder.field("birth_date", "1949-03-27");
- builder.field("hire_date", "1994-12-01 00:00:00.0");
- builder.field("salary", 15000.0);
- builder.field("supervisor_id", 1);
- builder.field("education_level", "Bachelors Degree");
- builder.field("marital_status", "M");
- builder.field("gender", "F");
- builder.field("management_role", "Senior Management");
- builder.endObject();
- indexRequest = new IndexRequest(indexName).source(builder);
- restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
-
- builder = XContentFactory.jsonBuilder();
- builder.startObject();
- builder.field("employee_id", 8);
- builder.field("full_name", "Kim Brunner");
- builder.field("first_name", "Kim");
- builder.field("last_name", "Brunner");
- builder.field("position_id", 11);
- builder.field("position_title", "Store Manager");
- builder.field("store_id", 9);
- builder.field("department_id", 11);
- builder.field("birth_date", "1922-08-10");
- builder.field("hire_date", "1998-01-01 00:00:00.0");
- builder.field("salary", 10000.0);
- builder.field("supervisor_id", 5);
- builder.field("education_level", "Bachelors Degree");
- builder.field("marital_status", "S");
- builder.field("gender", "F");
- builder.field("management_role", "Store Management");
- builder.endObject();
- indexRequest = new IndexRequest(indexName).source(builder);
- restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
-
- builder = XContentFactory.jsonBuilder();
- builder.startObject();
- builder.field("employee_id", 9);
- builder.field("full_name", "Brenda Blumberg");
- builder.field("first_name", "Brenda");
- builder.field("last_name", "Blumberg");
- builder.field("position_id", 11);
- builder.field("position_title", "Store Manager");
- builder.field("store_id", 21);
- builder.field("department_id", 11);
- builder.field("birth_date", "1979-06-23");
- builder.field("hire_date", "1998-01-01 00:00:00.0");
- builder.field("salary", 17000.0);
- builder.field("supervisor_id", 5);
- builder.field("education_level", "Graduate Degree");
- builder.field("marital_status", "M");
- builder.field("gender", "F");
- builder.field("management_role", "Store Management");
- builder.endObject();
- indexRequest = new IndexRequest(indexName).source(builder);
- restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
-
- builder = XContentFactory.jsonBuilder();
- builder.startObject();
- builder.field("employee_id", 10);
- builder.field("full_name", "Darren Stanz");
- builder.field("first_name", "Darren");
- builder.field("last_name", "Stanz");
- builder.field("position_id", 5);
- builder.field("position_title", "VP Finance");
- builder.field("store_id", 0);
- builder.field("department_id", 5);
- builder.field("birth_date", "1949-08-26");
- builder.field("hire_date", "1994-12-01 00:00:00.0");
- builder.field("salary", 50000.0);
- builder.field("supervisor_id", 1);
- builder.field("education_level", "Partial College");
- builder.field("marital_status", "M");
- builder.field("gender", "M");
- builder.field("management_role", "Senior Management");
- builder.endObject();
- indexRequest = new IndexRequest(indexName).source(builder);
- restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
-
- builder = XContentFactory.jsonBuilder();
- builder.startObject();
- builder.field("employee_id", 11);
- builder.field("full_name", "Jonathan Murraiin");
- builder.field("first_name", "Jonathan");
- builder.field("last_name", "Murraiin");
- builder.field("position_id", 11);
- builder.field("position_title", "Store Manager");
- builder.field("store_id", 1);
- builder.field("department_id", 11);
- builder.field("birth_date", "1967-06-20");
- builder.field("hire_date", "1998-01-01 00:00:00.0");
- builder.field("salary", 15000.0);
- builder.field("supervisor_id", 5);
- builder.field("education_level", "Graduate Degree");
- builder.field("marital_status", "S");
- builder.field("gender", "M");
- builder.field("management_role", "Store Management");
- builder.endObject();
- indexRequest = new IndexRequest(indexName).source(builder);
- restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
-
- restHighLevelClient.indices().refresh(new RefreshRequest(indexName), RequestOptions.DEFAULT);
+ public static void prepareData() throws IOException {
+ indexNames.add("employee");
+ CreateIndexRequest createIndexRequest = new CreateIndexRequest.Builder()
+ .index("employee")
+ .build();
+ elasticsearchClient.indices().create(createIndexRequest);
+
+ {
+ Map<String, Object> inputMap = new HashMap<>();
+ inputMap.put("employee_id", 1);
+ inputMap.put("full_name", "Sheri Nowmer");
+ inputMap.put("first_name", "Sheri");
+ inputMap.put("last_name", "Nowmer");
+ inputMap.put("position_id", 1);
+ inputMap.put("position_title", "President");
+ inputMap.put("store_id", 0);
+ inputMap.put("department_id", 1);
+ inputMap.put("birth_date", "1961-08-26");
+ inputMap.put("hire_date", "1994-12-01 00:00:00.0");
+ inputMap.put("salary", 80000.0);
+ inputMap.put("supervisor_id", 0);
+ inputMap.put("education_level", "Graduate Degree");
+ inputMap.put("marital_status", "S");
+ inputMap.put("gender", "F");
+ inputMap.put("management_role", "Senior Management");
+ inputMap.put("binary_field", Base64.getEncoder().encodeToString(
+ "Senior Management".getBytes(StandardCharsets.UTF_8)
+ ));
+ inputMap.put("boolean_field", true);
+ inputMap.put("date_field", "2015/01/01 12:10:30");
+ inputMap.put("byte_field", (byte) 123);
+ inputMap.put("long_field", 123L);
+ inputMap.put("float_field", 123F);
+ inputMap.put("short_field", (short) 123);
+ inputMap.put("decimal_field", new BigDecimal("123.45"));
+
+ final Reader input1 = new StringReader(JSONObject.toJSONString(inputMap));
+ IndexRequest<JsonData> request = IndexRequest.of(i -> i
+ .index("employee")
+ .withJson(input1)
+ );
+ elasticsearchClient.index(request);
+ }
+ {
+ Map<String, Object> inputMap = new HashMap<>();
+ inputMap.put("employee_id", 2);
+ inputMap.put("full_name", "Derrick Whelply");
+ inputMap.put("first_name", "Derrick");
+ inputMap.put("last_name", "Whelply");
+ inputMap.put("position_id", 2);
+ inputMap.put("position_title", "VP Country Manager");
+ inputMap.put("store_id", 0);
+ inputMap.put("department_id", 1);
+ inputMap.put("birth_date", "1915-07-03");
+ inputMap.put("hire_date", "1994-12-01 00:00:00.0");
+ inputMap.put("salary", 40000.0);
+ inputMap.put("supervisor_id", 1);
+ inputMap.put("education_level", "Graduate Degree");
+ inputMap.put("marital_status", "M");
+ inputMap.put("gender", "M");
+ inputMap.put("management_role", "Senior Management");
+
+ final Reader input1 = new StringReader(JSONObject.toJSONString(inputMap));
+ IndexRequest<JsonData> request = IndexRequest.of(i -> i
+ .index("employee")
+ .withJson(input1)
+ );
+ elasticsearchClient.index(request);
+ }
+ {
+ Map<String, Object> inputMap = new HashMap<>();
+ inputMap.put("employee_id", 4);
+ inputMap.put("full_name", "Michael Spence");
+ inputMap.put("first_name", "Michael");
+ inputMap.put("last_name", "Spence");
+ inputMap.put("position_id", 2);
+ inputMap.put("position_title", "VP Country Manager");
+ inputMap.put("store_id", 0);
+ inputMap.put("department_id", 1);
+ inputMap.put("birth_date", "1969-06-20");
+ inputMap.put("hire_date", "1998-01-01 00:00:00.0");
+ inputMap.put("salary", 40000.0);
+ inputMap.put("supervisor_id", 1);
+ inputMap.put("education_level", "Graduate Degree");
+ inputMap.put("marital_status", "S");
+ inputMap.put("gender", "M");
+ inputMap.put("management_role", "Senior Management");
+ final Reader input1 = new StringReader(JSONObject.toJSONString(inputMap));
+
+ IndexRequest<JsonData> request = IndexRequest.of(i -> i
+ .index("employee")
+ .withJson(input1)
+ );
+ elasticsearchClient.index(request);
+ }
+ {
+ Map<String, Object> inputMap = new HashMap<>();
+ inputMap.put("employee_id", 5);
+ inputMap.put("full_name", "Maya Gutierrez");
+ inputMap.put("first_name", "Maya");
+ inputMap.put("last_name", "Gutierrez");
+ inputMap.put("position_id", 2);
+ inputMap.put("position_title", "VP Country Manager");
+ inputMap.put("store_id", 0);
+ inputMap.put("department_id", 1);
+ inputMap.put("birth_date", "1951-05-10");
+ inputMap.put("hire_date", "1998-01-01 00:00:00.0");
+ inputMap.put("salary", 35000.0);
+ inputMap.put("supervisor_id", 1);
+ inputMap.put("education_level", "Bachelors Degree");
+ inputMap.put("marital_status", "M");
+ inputMap.put("gender", "F");
+ inputMap.put("management_role", "Senior Management");
+ final Reader input1 = new StringReader(JSONObject.toJSONString(inputMap));
+
+ IndexRequest<JsonData> request = IndexRequest.of(i -> i
+ .index("employee")
+ .withJson(input1)
+ );
+ elasticsearchClient.index(request);
+ }
+ {
+ Map<String, Object> inputMap = new HashMap<>();
+ inputMap.put("employee_id", 6);
+ inputMap.put("full_name", "Roberta Damstra");
+ inputMap.put("first_name", "Roberta");
+ inputMap.put("last_name", "Damstra");
+ inputMap.put("position_id", 3);
+ inputMap.put("position_title", "VP Information Systems");
+ inputMap.put("store_id", 0);
+ inputMap.put("department_id", 2);
+ inputMap.put("birth_date", "1942-10-08");
+ inputMap.put("hire_date", "1994-12-01 00:00:00.0");
+ inputMap.put("salary", 25000.0);
+ inputMap.put("supervisor_id", 1);
+ inputMap.put("education_level", "Bachelors Degree");
+ inputMap.put("marital_status", "M");
+ inputMap.put("gender", "F");
+ inputMap.put("management_role", "Senior Management");
+ final Reader input1 = new StringReader(JSONObject.toJSONString(inputMap));
+
+ IndexRequest<JsonData> request = IndexRequest.of(i -> i
+ .index("employee")
+ .withJson(input1)
+ );
+ elasticsearchClient.index(request);
+ }
+ {
+ Map<String, Object> inputMap = new HashMap<>();
+ inputMap.put("employee_id", 7);
+ inputMap.put("full_name", "Rebecca Kanagaki");
+ inputMap.put("first_name", "Rebecca");
+ inputMap.put("last_name", "Kanagaki");
+ inputMap.put("position_id", 4);
+ inputMap.put("position_title", "VP Human Resources");
+ inputMap.put("store_id", 0);
+ inputMap.put("department_id", 3);
+ inputMap.put("birth_date", "1949-03-27");
+ inputMap.put("hire_date", "1994-12-01 00:00:00.0");
+ inputMap.put("salary", 15000.0);
+ inputMap.put("supervisor_id", 1);
+ inputMap.put("education_level", "Bachelors Degree");
+ inputMap.put("marital_status", "M");
+ inputMap.put("gender", "F");
+ inputMap.put("management_role", "Senior Management");
+ final Reader input1 = new StringReader(JSONObject.toJSONString(inputMap));
+
+ IndexRequest<JsonData> request = IndexRequest.of(i -> i
+ .index("employee")
+ .withJson(input1)
+ );
+ elasticsearchClient.index(request);
+ }
+ {
+ Map<String, Object> inputMap = new HashMap<>();
+ inputMap.put("employee_id", 8);
+ inputMap.put("full_name", "Kim Brunner");
+ inputMap.put("first_name", "Kim");
+ inputMap.put("last_name", "Brunner");
+ inputMap.put("position_id", 11);
+ inputMap.put("position_title", "Store Manager");
+ inputMap.put("store_id", 9);
+ inputMap.put("department_id", 11);
+ inputMap.put("birth_date", "1922-08-10");
+ inputMap.put("hire_date", "1998-01-01 00:00:00.0");
+ inputMap.put("salary", 10000.0);
+ inputMap.put("supervisor_id", 5);
+ inputMap.put("education_level", "Bachelors Degree");
+ inputMap.put("marital_status", "S");
+ inputMap.put("gender", "F");
+ inputMap.put("management_role", "Store Management");
+ final Reader input1 = new StringReader(JSONObject.toJSONString(inputMap));
+
+ IndexRequest<JsonData> request = IndexRequest.of(i -> i
+ .index("employee")
+ .withJson(input1)
+ );
+ elasticsearchClient.index(request);
+ }
+ {
+ Map<String, Object> inputMap = new HashMap<>();
+ inputMap.put("employee_id", 9);
+ inputMap.put("full_name", "Brenda Blumberg");
+ inputMap.put("first_name", "Brenda");
+ inputMap.put("last_name", "Blumberg");
+ inputMap.put("position_id", 11);
+ inputMap.put("position_title", "Store Manager");
+ inputMap.put("store_id", 21);
+ inputMap.put("department_id", 11);
+ inputMap.put("birth_date", "1979-06-23");
+ inputMap.put("hire_date", "1998-01-01 00:00:00.0");
+ inputMap.put("salary", 17000.0);
+ inputMap.put("supervisor_id", 5);
+ inputMap.put("education_level", "Graduate Degree");
+ inputMap.put("marital_status", "M");
+ inputMap.put("gender", "F");
+ inputMap.put("management_role", "Store Management");
+ final Reader input1 = new StringReader(JSONObject.toJSONString(inputMap));
+
+ IndexRequest<JsonData> request = IndexRequest.of(i -> i
+ .index("employee")
+ .withJson(input1)
+ );
+ elasticsearchClient.index(request);
+ }
+ {
+ Map<String, Object> inputMap = new HashMap<>();
+ inputMap.put("employee_id", 10);
+ inputMap.put("full_name", "Darren Stanz");
+ inputMap.put("first_name", "Darren");
+ inputMap.put("last_name", "Stanz");
+ inputMap.put("position_id", 5);
+ inputMap.put("position_title", "VP Finance");
+ inputMap.put("store_id", 0);
+ inputMap.put("department_id", 5);
+ inputMap.put("birth_date", "1949-08-26");
+ inputMap.put("hire_date", "1994-12-01 00:00:00.0");
+ inputMap.put("salary", 50000.0);
+ inputMap.put("supervisor_id", 1);
+ inputMap.put("education_level", "Partial College");
+ inputMap.put("marital_status", "M");
+ inputMap.put("gender", "M");
+ inputMap.put("management_role", "Senior Management");
+ final Reader input1 = new StringReader(JSONObject.toJSONString(inputMap));
+
+ IndexRequest<JsonData> request = IndexRequest.of(i -> i
+ .index("employee")
+ .withJson(input1)
+ );
+ elasticsearchClient.index(request);
+ }
+ {
+ Map<String, Object> inputMap = new HashMap<>();
+ inputMap.put("employee_id", 11);
+ inputMap.put("full_name", "Jonathan Murraiin");
+ inputMap.put("first_name", "Jonathan");
+ inputMap.put("last_name", "Murraiin");
+ inputMap.put("position_id", 11);
+ inputMap.put("position_title", "Store Manager");
+ inputMap.put("store_id", 1);
+ inputMap.put("department_id", 11);
+ inputMap.put("birth_date", "1967-06-20");
+ inputMap.put("hire_date", "1998-01-01 00:00:00.0");
+ inputMap.put("salary", 15000.0);
+ inputMap.put("supervisor_id", 5);
+ inputMap.put("education_level", "Graduate Degree");
+ inputMap.put("marital_status", "S");
+ inputMap.put("gender", "M");
+ inputMap.put("management_role", "Store Management");
+ final Reader input1 = new StringReader(JSONObject.toJSONString(inputMap));
+
+ IndexRequest<JsonData> request = IndexRequest.of(i -> i
+ .index("employee")
+ .withJson(input1)
+ );
+ elasticsearchClient.index(request);
+ }
+
+ RefreshRequest refreshRequest = new RefreshRequest.Builder()
+ .index(indexNames)
+ .build();
+ elasticsearchClient.indices().refresh(refreshRequest);
}
@Test
diff --git a/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchUserTranslationTest.java b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchUserTranslationTest.java
new file mode 100644
index 0000000000..3574060be6
--- /dev/null
+++ b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchUserTranslationTest.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.elasticsearch;
+
+import co.elastic.clients.elasticsearch.ElasticsearchClient;
+import co.elastic.clients.elasticsearch.core.IndexRequest;
+import co.elastic.clients.elasticsearch.core.IndexResponse;
+import co.elastic.clients.elasticsearch.indices.CreateIndexRequest;
+import co.elastic.clients.elasticsearch.indices.DeleteIndexRequest;
+import co.elastic.clients.elasticsearch.indices.RefreshRequest;
+import co.elastic.clients.json.JsonData;
+import org.apache.drill.common.config.DrillProperties;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.test.ClientFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.hamcrest.CoreMatchers;
+import org.json.simple.JSONObject;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.io.StringReader;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.ADMIN_USER;
+import static org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.ADMIN_USER_PASSWORD;
+import static org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.TEST_USER_1;
+import static org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.TEST_USER_1_PASSWORD;
+import static org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.TEST_USER_2;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+
+public class ElasticSearchUserTranslationTest extends ClusterTest {
+ private static final Logger logger = LoggerFactory.getLogger(ElasticSearchUserTranslationTest.class);
+ private static final List<String> indexNames = new LinkedList<>();
+ private static ElasticsearchClient elasticsearchClient;
+
+ @BeforeClass
+ public static void init() throws Exception {
+ TestElasticsearchSuite.initElasticsearch();
+ ClusterFixtureBuilder builder = new ClusterFixtureBuilder(dirTestWatcher)
+ .configProperty(ExecConstants.HTTP_ENABLE, true)
+ .configProperty(ExecConstants.HTTP_PORT_HUNT, true)
+ .configProperty(ExecConstants.IMPERSONATION_ENABLED, true);
+
+ startCluster(builder);
+
+
+ PlainCredentialsProvider credentialsProvider = new PlainCredentialsProvider(new HashMap<>());
+ // Add authorized user
+ credentialsProvider.setUserCredentials(TestElasticsearchSuite.ELASTICSEARCH_USERNAME,
+ TestElasticsearchSuite.ELASTICSEARCH_PASSWORD, TEST_USER_1);
+ // Add unauthorized user
+ credentialsProvider.setUserCredentials("nope", "no way dude", TEST_USER_2);
+
+ ElasticsearchStorageConfig config = new ElasticsearchStorageConfig(
+ Collections.singletonList(TestElasticsearchSuite.getAddress()),
+ TestElasticsearchSuite.ELASTICSEARCH_USERNAME,
+ TestElasticsearchSuite.ELASTICSEARCH_PASSWORD,
+ null,
+ AuthMode.SHARED_USER.name(),
+ null
+ );
+
+ config.setEnabled(true);
+ cluster.defineStoragePlugin("elastic", config);
+
+ ElasticsearchStorageConfig ut_config = new ElasticsearchStorageConfig(
+ Collections.singletonList(TestElasticsearchSuite.getAddress()),
+ null,
+ null,
+ null,
+ AuthMode.USER_TRANSLATION.name(),
+ credentialsProvider);
+
+ ut_config.setEnabled(true);
+ cluster.defineStoragePlugin("ut_elastic", ut_config);
+
+ elasticsearchClient = TestElasticsearchSuite.getESClient();
+ prepareData();
+ }
+
+ @AfterClass
+ public static void cleanUp() throws IOException {
+ DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest.Builder()
+ .index(indexNames)
+ .build();
+ elasticsearchClient.indices().delete(deleteIndexRequest);
+ TestElasticsearchSuite.tearDownCluster();
+ }
+
+ private static void prepareData() throws IOException {
+ String indexName = "t1";
+ indexNames.add(indexName);
+ CreateIndexRequest createIndexRequest = new CreateIndexRequest.Builder()
+ .index(indexName)
+ .build();
+
+ elasticsearchClient.indices().create(createIndexRequest);
+
+ Map<String, Object> map = new HashMap<>();
+ map.put("string_field", "a");
+ map.put("int_field", 123);
+ JSONObject jsonObject = new JSONObject(map);
+
+ Reader input = new StringReader(jsonObject.toJSONString());
+ IndexRequest<JsonData> request = IndexRequest.of(i -> i
+ .index("t1")
+ .withJson(input)
+ );
+
+ IndexResponse response = elasticsearchClient.index(request);
+ logger.debug("Insert response: {}", response.toString() );
+
+ RefreshRequest refreshRequest = new RefreshRequest.Builder()
+ .index(indexNames)
+ .build();
+ elasticsearchClient.indices().refresh(refreshRequest);
+
+ indexName = "t2";
+ indexNames.add(indexName);
+ createIndexRequest = new CreateIndexRequest.Builder()
+ .index(indexName)
+ .build();
+
+ elasticsearchClient.indices().create(createIndexRequest);
+
+ map = new HashMap<>();
+ map.put("another_int_field", 321);
+ map.put("another_string_field", "b");
+ jsonObject = new JSONObject(map);
+
+ Reader input2 = new StringReader(jsonObject.toJSONString());
+ request = IndexRequest.of(i -> i
+ .index("t2")
+ .withJson(input2)
+ );
+
+ response = elasticsearchClient.index(request);
+ logger.debug("Insert response: {}", response.toString() );
+
+ refreshRequest = new RefreshRequest.Builder()
+ .index(indexNames)
+ .build();
+ elasticsearchClient.indices().refresh(refreshRequest);
+ logger.debug("Data preparation complete.");
+ }
+
+ @Test
+ public void testInfoSchemaQueryWithMissingCredentials() throws Exception {
+ // This test validates that the correct credentials are sent down to ElasticSearch.
+ // This user should not see the ut_elastic because they do not have valid credentials.
+ ClientFixture client = cluster
+ .clientBuilder()
+ .property(DrillProperties.USER, ADMIN_USER)
+ .property(DrillProperties.PASSWORD, ADMIN_USER_PASSWORD)
+ .build();
+
+ String sql = "SHOW DATABASES WHERE schema_name LIKE '%elastic%'";
+
+ RowSet results = client.queryBuilder().sql(sql).rowSet();
+ results.print();
+ assertEquals(1, results.rowCount());
+ results.clear();
+ }
+
+ @Test
+ public void testInfoSchemaQueryWithValidCredentials() throws Exception {
+ // This test validates that the ElasticSearch connection with user translation appears when the user is
+ // authenticated.
+ ClientFixture client = cluster
+ .clientBuilder()
+ .property(DrillProperties.USER, TEST_USER_1)
+ .property(DrillProperties.PASSWORD, TEST_USER_1_PASSWORD)
+ .build();
+
+ String sql = "SHOW DATABASES WHERE schema_name LIKE '%elastic%'";
+
+ RowSet results = client.queryBuilder().sql(sql).rowSet();
+ assertEquals(2, results.rowCount());
+ results.clear();
+ }
+
+ @Test
+ public void testQueryWithUserTranslation() throws Exception {
+ ClientFixture client = cluster
+ .clientBuilder()
+ .property(DrillProperties.USER, TEST_USER_1)
+ .property(DrillProperties.PASSWORD, TEST_USER_1_PASSWORD)
+ .build();
+
+ String sql = "select * from ut_elastic.t1";
+ RowSet results = client.queryBuilder().sql(sql).rowSet();
+ assertEquals(1, results.rowCount());
+ results.clear();
+ }
+
+ @Test
+ public void testQueryWithUserTranslationAndInvalidCredentials() throws Exception {
+ ClientFixture client = cluster
+ .clientBuilder()
+ .property(DrillProperties.USER, ADMIN_USER)
+ .property(DrillProperties.PASSWORD, ADMIN_USER_PASSWORD)
+ .build();
+
+ String sql = "select * from ut_elastic.t1";
+ try {
+ client.queryBuilder().sql(sql).rowSet();
+ fail();
+ } catch (UserRemoteException e) {
+ assertThat(e.getMessage(), CoreMatchers.containsString("Object 'ut_elastic' not found"));
+ }
+ }
+}
diff --git a/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/TestElasticsearchSuite.java b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/TestElasticsearchSuite.java
index dec9b7659e..12fbe72e6f 100644
--- a/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/TestElasticsearchSuite.java
+++ b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/TestElasticsearchSuite.java
@@ -17,32 +17,65 @@
*/
package org.apache.drill.exec.store.elasticsearch;
+import co.elastic.clients.elasticsearch.ElasticsearchClient;
+import co.elastic.clients.json.jackson.JacksonJsonpMapper;
+import co.elastic.clients.transport.ElasticsearchTransport;
+import co.elastic.clients.transport.rest_client.RestClientTransport;
+import org.apache.commons.io.IOUtils;
import org.apache.drill.categories.SlowTest;
import org.apache.drill.test.BaseTest;
+
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.http.ssl.SSLContexts;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
import org.testcontainers.elasticsearch.ElasticsearchContainer;
-import org.testcontainers.utility.DockerImageName;
+import com.google.api.client.util.SslUtils;
+
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLContext;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateFactory;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
+
@Category(SlowTest.class)
@RunWith(Suite.class)
-@Suite.SuiteClasses({ElasticComplexTypesTest.class, ElasticInfoSchemaTest.class, ElasticSearchPlanTest.class, ElasticSearchQueryTest.class})
+@Suite.SuiteClasses({
+ ElasticComplexTypesTest.class,
+ ElasticInfoSchemaTest.class,
+ ElasticSearchPlanTest.class,
+ ElasticSearchQueryTest.class,
+ ElasticSearchUserTranslationTest.class})
public class TestElasticsearchSuite extends BaseTest {
protected static ElasticsearchContainer elasticsearch;
+ public static final String ELASTICSEARCH_USERNAME = "elastic";
+ public static final String ELASTICSEARCH_PASSWORD = "s3cret";
+ private static final String IMAGE_NAME = "docker.elastic.co/elasticsearch/elasticsearch:8.6.0";
private static final AtomicInteger initCount = new AtomicInteger(0);
private static volatile boolean runningSuite = false;
@BeforeClass
- public static void initElasticsearch() {
+ public static void initElasticsearch() throws IOException, GeneralSecurityException {
synchronized (TestElasticsearchSuite.class) {
if (initCount.get() == 0) {
startElasticsearch();
@@ -52,25 +85,89 @@ public class TestElasticsearchSuite extends BaseTest {
}
}
- public static boolean isRunningSuite() {
- return runningSuite;
- }
-
@AfterClass
public static void tearDownCluster() {
synchronized (TestElasticsearchSuite.class) {
if (initCount.decrementAndGet() == 0 && elasticsearch != null) {
elasticsearch.stop();
+ elasticsearch.close();
}
}
}
- private static void startElasticsearch() {
- DockerImageName imageName = DockerImageName.parse("elasticsearch:7.14.2")
- .asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch");
- TestElasticsearchSuite.elasticsearch = new ElasticsearchContainer(imageName)
- .withStartupTimeout(Duration.ofMinutes(2));
- TestElasticsearchSuite.elasticsearch.start();
+ public static boolean isRunningSuite() {
+ return runningSuite;
+ }
+
+ /**
+ * Returns an {@link CredentialsProvider} for use with the ElasticSearch test container.
+ * @return A {@link CredentialsProvider} with the default credentials.
+ */
+ public static CredentialsProvider getCredentialsProvider() {
+ CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+ credentialsProvider.setCredentials(AuthScope.ANY,
+ new UsernamePasswordCredentials(ELASTICSEARCH_USERNAME, ELASTICSEARCH_PASSWORD));
+ return credentialsProvider;
+ }
+
+ public static byte[] getCertAsBytes(ElasticsearchContainer container) {
+ return container.copyFileFromContainer(
+ "/usr/share/elasticsearch/config/certs/http_ca.crt",
+ IOUtils::toByteArray);
+ }
+
+ public static SSLContext createContextFromCaCert(byte[] certAsBytes) {
+ try {
+ CertificateFactory factory = CertificateFactory.getInstance("X.509");
+ Certificate trustedCa = factory.generateCertificate(
+ new ByteArrayInputStream(certAsBytes)
+ );
+ KeyStore trustStore = KeyStore.getInstance("pkcs12");
+ trustStore.load(null, null);
+ trustStore.setCertificateEntry("ca", trustedCa);
+ SSLContextBuilder sslContextBuilder =
+ SSLContexts.custom().loadTrustMaterial(trustStore, null);
+ return sslContextBuilder.build();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static ElasticsearchClient getESClient() {
+ HttpHost host = new HttpHost(
+ elasticsearch.getHost(),
+ elasticsearch.getMappedPort(9200),
+ "http"
+ );
+
+ final RestClientBuilder builder = RestClient.builder(host);
+
+ builder.setHttpClientConfigCallback(clientBuilder -> {
+ //clientBuilder.setSSLContext(TestElasticsearchSuite.createContextFromCaCert(getCertAsBytes(elasticsearch)));
+ clientBuilder.setDefaultCredentialsProvider(getCredentialsProvider());
+ return clientBuilder;
+ });
+
+ ElasticsearchTransport transport = new RestClientTransport(
+ builder.build(), new JacksonJsonpMapper());
+
+ return new ElasticsearchClient(transport);
+ }
+
+
+
+ private static void startElasticsearch() throws GeneralSecurityException {
+ elasticsearch = new ElasticsearchContainer(IMAGE_NAME)
+ .withExposedPorts(9200)
+ .withStartupTimeout(Duration.ofMinutes(2))
+ .withStartupAttempts(5)
+ .withPassword(ELASTICSEARCH_PASSWORD)
+ .withEnv("xpack.security.enabled", "true")
+ .withEnv("xpack.security.transport.ssl.enabled", "false")
+ .withEnv("ES_JAVA_OPTS", "-Xmx1g"); // ES gobbles up lots of RAM under defaults.
+
+ HttpsURLConnection.setDefaultSSLSocketFactory(SslUtils.trustAllSSLContext().getSocketFactory());
+ elasticsearch.start();
}
public static String getAddress() {