You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by cg...@apache.org on 2023/02/07 12:22:07 UTC

[drill] branch master updated: DRILL-8387: Add Support for User Translation to ElasticSearch Plugin (#2739)

This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new a7b7735c51 DRILL-8387: Add Support for User Translation to ElasticSearch Plugin (#2739)
a7b7735c51 is described below

commit a7b7735c51cd92667f7ac023044ad7b3fbeb8002
Author: Charles S. Givre <cg...@apache.org>
AuthorDate: Tue Feb 7 07:21:59 2023 -0500

    DRILL-8387: Add Support for User Translation to ElasticSearch Plugin (#2739)
---
 contrib/storage-elasticsearch/.gitignore           |   1 +
 contrib/storage-elasticsearch/README.md            |  18 +-
 contrib/storage-elasticsearch/pom.xml              |  22 +-
 .../elasticsearch/ElasticsearchStorageConfig.java  |  86 +++-
 .../elasticsearch/ElasticsearchStoragePlugin.java  |  20 +
 .../schema/ElasticsearchDrillSchemaFactory.java    |  22 +-
 .../elasticsearch/ElasticComplexTypesTest.java     | 135 +++--
 .../store/elasticsearch/ElasticInfoSchemaTest.java | 118 +++--
 .../store/elasticsearch/ElasticSearchPlanTest.java |  87 ++--
 .../elasticsearch/ElasticSearchQueryTest.java      | 565 ++++++++++++---------
 .../ElasticSearchUserTranslationTest.java          | 245 +++++++++
 .../elasticsearch/TestElasticsearchSuite.java      | 123 ++++-
 12 files changed, 1006 insertions(+), 436 deletions(-)

diff --git a/contrib/storage-elasticsearch/.gitignore b/contrib/storage-elasticsearch/.gitignore
new file mode 100644
index 0000000000..05c3e7ac9a
--- /dev/null
+++ b/contrib/storage-elasticsearch/.gitignore
@@ -0,0 +1 @@
+/src/test/resources/logback-test.xml
diff --git a/contrib/storage-elasticsearch/README.md b/contrib/storage-elasticsearch/README.md
index 2ef77adb61..c55f7ee0b3 100644
--- a/contrib/storage-elasticsearch/README.md
+++ b/contrib/storage-elasticsearch/README.md
@@ -5,7 +5,7 @@ This storage plugin implementation is based on [Apache Calcite adapter for Elast
 
 For more details about supported versions please refer to [Supported versions](https://calcite.apache.org/docs/elasticsearch_adapter.html#supported-versions) page.
 
-### Supported optimizations and features
+## Supported Optimizations and Features
 
 This storage plugin supports the following optimizations:
 
@@ -19,7 +19,7 @@ This storage plugin supports the following optimizations:
 Besides these optimizations, ElasticSearch storage plugin supports the schema provisioning feature.
 For more details please refer to [Specifying the Schema as Table Function Parameter](https://drill.apache.org/docs/plugin-configuration-basics/#specifying-the-schema-as-table-function-parameter).
 
-### Plugin registration
+## Plugin Registration
 
 The plugin can be registered in Apache Drill using the drill web interface by navigating to the `storage` page.
 Following is the default registration configuration.
@@ -36,7 +36,19 @@ Following is the default registration configuration.
 }
 ```
 
-### Developer notes
+## User Translation
+The ElasticSearch plugin supports user translation, which allows each individual user to access ElasticSearch with their own credentials. 
+
+Simply add the following to the storage plugin configuration:
+```json
+  "authType": "USER_TRANSLATION",
+  "credentialsProvider": {
+    "credentialsProviderType": "PlainCredentialsProvider"
+  }
+```
+
+
+## Developer Notes
 
 Most of the common classes required for creating storage plugins based on Calcite adapters are placed in the 
 `java-exec` module, so they can be reused in future plugin implementations.
diff --git a/contrib/storage-elasticsearch/pom.xml b/contrib/storage-elasticsearch/pom.xml
index 7576216757..51a6cada62 100644
--- a/contrib/storage-elasticsearch/pom.xml
+++ b/contrib/storage-elasticsearch/pom.xml
@@ -67,10 +67,28 @@
     </dependency>
     <dependency>
       <groupId>org.elasticsearch.client</groupId>
-      <artifactId>elasticsearch-rest-high-level-client</artifactId>
-      <version>7.17.5</version>
+      <artifactId>elasticsearch-rest-client</artifactId>
+      <version>8.6.0</version>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>co.elastic.clients</groupId>
+      <artifactId>elasticsearch-java</artifactId>
+      <version>8.6.0</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.http-client</groupId>
+      <artifactId>google-http-client</artifactId>
+      <version>1.42.3</version>
       <scope>test</scope>
     </dependency>
+
     <dependency>
       <groupId>net.hydromatic</groupId>
       <artifactId>foodmart-data-json</artifactId>
diff --git a/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/ElasticsearchStorageConfig.java b/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/ElasticsearchStorageConfig.java
index cc1c165cd4..f38246b796 100644
--- a/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/ElasticsearchStorageConfig.java
+++ b/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/ElasticsearchStorageConfig.java
@@ -24,7 +24,9 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectWriter;
+import org.apache.drill.common.PlanStringBuilder;
 import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
 import org.apache.drill.exec.store.security.CredentialProviderUtils;
 import org.apache.drill.common.logical.security.CredentialsProvider;
 import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
@@ -63,36 +65,27 @@ public class ElasticsearchStorageConfig extends StoragePluginConfig {
       @JsonProperty(USERNAME) String username,
       @JsonProperty(PASSWORD) String password,
       @JsonProperty(PATH_PREFIX) String pathPrefix,
+      @JsonProperty("authMode") String authMode,
       @JsonProperty(CREDENTIALS_PROVIDER) CredentialsProvider credentialsProvider) {
-    super(CredentialProviderUtils.getCredentialsProvider(username, password, credentialsProvider), credentialsProvider == null);
+    super(CredentialProviderUtils.getCredentialsProvider(username, password, credentialsProvider),
+        credentialsProvider == null, AuthMode.parseOrDefault(authMode, AuthMode.SHARED_USER));
     this.hosts = hosts;
     this.pathPrefix = pathPrefix;
   }
 
-  public List<String> getHosts() {
-    return hosts;
+  private ElasticsearchStorageConfig(ElasticsearchStorageConfig that, CredentialsProvider credentialsProvider) {
+    super(getCredentialsProvider(credentialsProvider), credentialsProvider == null, that.authMode);
+    this.hosts = that.hosts;
+    this.pathPrefix = that.pathPrefix;
   }
 
-  public String getPathPrefix() {
-    return pathPrefix;
+  @Override
+  public ElasticsearchStorageConfig updateCredentialProvider(CredentialsProvider credentialsProvider) {
+    return new ElasticsearchStorageConfig(this, credentialsProvider);
   }
 
-  public String getUsername() {
-    if (!directCredentials) {
-      return null;
-    }
-    return getUsernamePasswordCredentials()
-      .map(UsernamePasswordCredentials::getUsername)
-      .orElse(null);
-  }
-
-  public String getPassword() {
-    if (!directCredentials) {
-      return null;
-    }
-    return getUsernamePasswordCredentials()
-      .map(UsernamePasswordCredentials::getPassword)
-      .orElse(null);
+  private static CredentialsProvider getCredentialsProvider(CredentialsProvider credentialsProvider) {
+    return credentialsProvider != null ? credentialsProvider : PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER;
   }
 
   @JsonIgnore
@@ -102,6 +95,18 @@ public class ElasticsearchStorageConfig extends StoragePluginConfig {
       .build();
   }
 
+  /**
+   * Gets the credentials. This method is used when user translation is enabled.
+   * @return An {@link Optional} containing {@link UsernamePasswordCredentials} from the config.
+   */
+  @JsonIgnore
+  public Optional<UsernamePasswordCredentials> getUsernamePasswordCredentials(String username) {
+    return new UsernamePasswordCredentials.Builder()
+        .setCredentialsProvider(credentialsProvider)
+        .setQueryUser(username)
+        .build();
+  }
+
   @JsonIgnore
   public Map<String, Object> toConfigMap()
       throws JsonProcessingException {
@@ -118,6 +123,29 @@ public class ElasticsearchStorageConfig extends StoragePluginConfig {
     return builder.build();
   }
 
+  /**
+   * This method is used when user translation is enabled.
+   * @param queryUser The user who submitted the query
+   * @return A map of the configuration details.
+   * @throws JsonProcessingException If JSON is unparsable, throw exception
+   */
+  @JsonIgnore
+  public Map<String, Object> toConfigMap(String queryUser)
+      throws JsonProcessingException {
+    Map<String, String> credentials = new HashMap<>(credentialsProvider.getUserCredentials(queryUser));
+    ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder();
+    builder.put(HOSTS, OBJECT_WRITER.writeValueAsString(hosts));
+    builder.put(PATH_PREFIX, pathPrefix != null ? pathPrefix : EMPTY_STRING);
+    builder.put(USERNAME, credentials.getOrDefault(USERNAME, EMPTY_STRING));
+    builder.put(PASSWORD, credentials.getOrDefault(PASSWORD, EMPTY_STRING));
+
+    credentials.remove(USERNAME);
+    credentials.remove(PASSWORD);
+    builder.putAll(credentials);
+    return builder.build();
+  }
+
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
@@ -127,12 +155,22 @@ public class ElasticsearchStorageConfig extends StoragePluginConfig {
       return false;
     }
     ElasticsearchStorageConfig that = (ElasticsearchStorageConfig) o;
-    return Objects.equals(hosts, that.hosts)
-        && Objects.equals(credentialsProvider, that.credentialsProvider);
+    return Objects.equals(hosts, that.hosts) &&
+        Objects.equals(pathPrefix, that.pathPrefix) &&
+        Objects.equals(credentialsProvider, that.credentialsProvider);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(hosts, credentialsProvider);
+    return Objects.hash(hosts, pathPrefix, credentialsProvider);
+  }
+
+  @Override
+  public String toString() {
+    return new PlanStringBuilder(this)
+        .field("hosts", hosts)
+        .field("pathPrefix", pathPrefix)
+        .field("credentialsProvider", credentialsProvider)
+        .toString();
   }
 }
diff --git a/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/ElasticsearchStoragePlugin.java b/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/ElasticsearchStoragePlugin.java
index 95cdbee18a..27629ac7a8 100644
--- a/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/ElasticsearchStoragePlugin.java
+++ b/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/ElasticsearchStoragePlugin.java
@@ -21,17 +21,23 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.calcite.adapter.elasticsearch.CalciteUtils;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
 import org.apache.drill.exec.ops.OptimizerRulesContext;
 import org.apache.drill.exec.planner.PlannerPhase;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
 import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.elasticsearch.schema.ElasticsearchDrillSchemaFactory;
+import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
+import java.util.Optional;
 import java.util.Set;
 
 public class ElasticsearchStoragePlugin extends AbstractStoragePlugin {
+  private static final Logger logger = LoggerFactory.getLogger(ElasticsearchStoragePlugin.class);
   private final ElasticsearchStorageConfig config;
   private final ElasticsearchDrillSchemaFactory schemaFactory;
 
@@ -44,6 +50,20 @@ public class ElasticsearchStoragePlugin extends AbstractStoragePlugin {
 
   @Override
   public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws JsonProcessingException {
+    // Check to see if user translation is enabled.  If so, and creds are
+    // not present, then do not register any schemata.  This prevents
+    // info schema errors.
+    if (config.getAuthMode() == AuthMode.USER_TRANSLATION) {
+      Optional<UsernamePasswordCredentials> userCreds = config.getUsernamePasswordCredentials(schemaConfig.getUserName());
+      if (! userCreds.isPresent()) {
+        logger.debug(
+            "No schemas will be registered in {} for query user {}.",
+            getName(), schemaConfig.getUserName()
+        );
+        return;
+      }
+    }
+
     schemaFactory.registerSchemas(schemaConfig, parent);
   }
 
diff --git a/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/schema/ElasticsearchDrillSchemaFactory.java b/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/schema/ElasticsearchDrillSchemaFactory.java
index 5430743149..15005ec638 100644
--- a/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/schema/ElasticsearchDrillSchemaFactory.java
+++ b/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/schema/ElasticsearchDrillSchemaFactory.java
@@ -19,13 +19,20 @@ package org.apache.drill.exec.store.elasticsearch.schema;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.calcite.adapter.elasticsearch.ElasticsearchSchemaFactory;
+import org.apache.calcite.schema.Schema;
 import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
 import org.apache.drill.exec.store.AbstractSchemaFactory;
 import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.elasticsearch.ElasticsearchStoragePlugin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 
 public class ElasticsearchDrillSchemaFactory extends AbstractSchemaFactory {
 
+  private static final Logger logger = LoggerFactory.getLogger(ElasticsearchDrillSchemaFactory.class);
   private final ElasticsearchStoragePlugin plugin;
   private final ElasticsearchSchemaFactory delegate;
 
@@ -37,8 +44,19 @@ public class ElasticsearchDrillSchemaFactory extends AbstractSchemaFactory {
 
   @Override
   public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws JsonProcessingException {
-    ElasticsearchDrillSchema schema = new ElasticsearchDrillSchema(getName(), plugin,
-        delegate.create(parent, getName(), plugin.getConfig().toConfigMap()));
+    ElasticsearchDrillSchema schema;
+    if (plugin.getConfig().getAuthMode() == AuthMode.SHARED_USER) {
+      Schema elasticsearchSchema = delegate.create(parent, getName(), plugin.getConfig().toConfigMap());
+      schema = new ElasticsearchDrillSchema(getName(), plugin, elasticsearchSchema);
+    } else if (plugin.getConfig().getAuthMode() == AuthMode.USER_TRANSLATION) {
+      // Get user's info
+      Schema elasticsearchUTSchema = delegate.create(parent, getName(), plugin.getConfig().toConfigMap(schemaConfig.getUserName()));
+      schema = new ElasticsearchDrillSchema(getName(), plugin, elasticsearchUTSchema);
+    } else {
+      throw UserException.internalError()
+          .message("User Impersonation not supported as an authentication mode for ElasticSearch.  The only authentication modes supported are SHARED_USER and USER_TRANSLATION")
+          .build(logger);
+    }
     parent.add(getName(), schema);
   }
 }
diff --git a/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticComplexTypesTest.java b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticComplexTypesTest.java
index a7109b2725..49826ffef7 100644
--- a/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticComplexTypesTest.java
+++ b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticComplexTypesTest.java
@@ -17,38 +17,41 @@
  */
 package org.apache.drill.exec.store.elasticsearch;
 
-import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
 import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.ClusterTest;
-import org.apache.http.HttpHost;
-import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
-import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestClient;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.client.indices.CreateIndexRequest;
-import org.elasticsearch.xcontent.XContentBuilder;
-import org.elasticsearch.xcontent.XContentFactory;
+import org.json.simple.JSONObject;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import co.elastic.clients.elasticsearch.ElasticsearchClient;
+import co.elastic.clients.elasticsearch.core.IndexRequest;
+import co.elastic.clients.elasticsearch.indices.CreateIndexRequest;
+import co.elastic.clients.elasticsearch.indices.DeleteIndexRequest;
+import co.elastic.clients.elasticsearch.indices.RefreshRequest;
+import co.elastic.clients.json.JsonData;
 
 import java.io.IOException;
-import java.util.ArrayList;
+import java.io.Reader;
+import java.io.StringReader;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.drill.test.TestBuilder.listOf;
 import static org.apache.drill.test.TestBuilder.mapOf;
 
 public class ElasticComplexTypesTest extends ClusterTest {
 
-  private static final List<String> indexNames = new ArrayList<>();
-
-  public static RestHighLevelClient restHighLevelClient;
+  private static final Logger logger = LoggerFactory.getLogger(ElasticComplexTypesTest.class);
+  private static final List<String> indexNames = new LinkedList<>();
+  private static ElasticsearchClient elasticsearchClient;
 
   @BeforeClass
   public static void init() throws Exception {
@@ -57,57 +60,81 @@ public class ElasticComplexTypesTest extends ClusterTest {
 
     ElasticsearchStorageConfig config = new ElasticsearchStorageConfig(
         Collections.singletonList(TestElasticsearchSuite.getAddress()),
-        null, null, null, PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
+        TestElasticsearchSuite.ELASTICSEARCH_USERNAME,
+        TestElasticsearchSuite.ELASTICSEARCH_PASSWORD,
+        null, AuthMode.SHARED_USER.name(),
+        null
+    );
     config.setEnabled(true);
     cluster.defineStoragePlugin("elastic", config);
 
+    elasticsearchClient = TestElasticsearchSuite.getESClient();
     prepareData();
   }
 
   @AfterClass
   public static void cleanUp() throws IOException {
-    for (String indexName : indexNames) {
-      restHighLevelClient.indices().delete(new DeleteIndexRequest(indexName), RequestOptions.DEFAULT);
-    }
+    DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest.Builder()
+      .index(indexNames)
+      .build();
+
+    elasticsearchClient.indices().delete(deleteIndexRequest);
     TestElasticsearchSuite.tearDownCluster();
   }
 
   private static void prepareData() throws IOException {
-    restHighLevelClient = new RestHighLevelClient(
-      RestClient.builder(HttpHost.create(TestElasticsearchSuite.elasticsearch.getHttpHostAddress())));
-
-    String indexName = "arr";
-    indexNames.add(indexName);
-    CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
-
-    restHighLevelClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
-
-    XContentBuilder builder = XContentFactory.jsonBuilder();
-    builder.startObject();
-    builder.field("string_arr", Arrays.asList("a", "b", "c", "d"));
-    builder.field("int_arr", Arrays.asList(1, 2, 3, 4, 0));
-    builder.field("nest_int_arr", Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4, 0)));
-    builder.endObject();
-    IndexRequest indexRequest = new IndexRequest(indexName).source(builder);
-    restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
-    restHighLevelClient.indices().refresh(new RefreshRequest(indexName), RequestOptions.DEFAULT);
-
-    indexName = "map";
-    indexNames.add(indexName);
-    createIndexRequest = new CreateIndexRequest(indexName);
-
-    restHighLevelClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
-
-    builder = XContentFactory.jsonBuilder();
-    builder.startObject();
-    builder.field("prim_field", 321);
-    builder.field("nest_field", ImmutableMap.of("a", 123, "b", "abc"));
-    builder.field("more_nest_field", ImmutableMap.of("a", 123, "b", ImmutableMap.of("c", "abc")));
-    builder.field("map_arr", Collections.singletonList(ImmutableMap.of("a", 123, "b", ImmutableMap.of("c", "abc"))));
-    builder.endObject();
-    indexRequest = new IndexRequest(indexName).source(builder);
-    restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
-    restHighLevelClient.indices().refresh(new RefreshRequest(indexName), RequestOptions.DEFAULT);
+    {
+      indexNames.add("arr");
+      CreateIndexRequest createIndexRequest = new CreateIndexRequest.Builder()
+          .index("arr")
+          .build();
+      elasticsearchClient.indices().create(createIndexRequest);
+
+      final Reader input1 = new StringReader(
+        JSONObject.toJSONString(ImmutableMap.of(
+          "string_arr", Arrays.asList("a", "b", "c", "d"),
+          "int_arr", Arrays.asList(1, 2, 3, 4, 0),
+          "nest_int_arr", Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4, 0))
+        ))
+      );
+      IndexRequest<JsonData> request = IndexRequest.of(i -> i
+          .index("arr")
+          .withJson(input1)
+      );
+      logger.debug("Insert response {}", elasticsearchClient.index(request));
+    }
+    {
+      indexNames.add("map");
+      CreateIndexRequest createIndexRequest = new CreateIndexRequest.Builder()
+          .index("map")
+          .build();
+      elasticsearchClient.indices().create(createIndexRequest);
+
+      Map<String, Object> map = ImmutableMap.of("a", 123, "b", "abc");
+      Map<String, Object> nested_map = ImmutableMap.of(
+        "a", 123,
+        "b", ImmutableMap.of("c", "abc")
+      );
+      final Reader input1 = new StringReader(
+        JSONObject.toJSONString(ImmutableMap.of(
+          "prim_field", 321,
+          "nest_field", map,
+          "more_nest_field", nested_map,
+          "map_arr", Collections.singletonList(nested_map)
+        ))
+      );
+      IndexRequest<JsonData> request = IndexRequest.of(i -> i
+          .index("map")
+          .withJson(input1)
+      );
+      logger.debug("Insert response {}", elasticsearchClient.index(request));
+
+      RefreshRequest refreshRequest = new RefreshRequest.Builder()
+          .index(indexNames)
+          .build();
+      elasticsearchClient.indices().refresh(refreshRequest);
+      logger.debug("Data preparation complete.");
+    }
   }
 
   @Test
diff --git a/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticInfoSchemaTest.java b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticInfoSchemaTest.java
index dba28c0fb5..d28f3133d4 100644
--- a/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticInfoSchemaTest.java
+++ b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticInfoSchemaTest.java
@@ -17,33 +17,33 @@
  */
 package org.apache.drill.exec.store.elasticsearch;
 
-import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
 import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.ClusterTest;
-import org.apache.http.HttpHost;
-import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
-import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestClient;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.client.indices.CreateIndexRequest;
-import org.elasticsearch.xcontent.XContentBuilder;
-import org.elasticsearch.xcontent.XContentFactory;
+import org.json.simple.JSONObject;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import co.elastic.clients.elasticsearch.ElasticsearchClient;
+import co.elastic.clients.elasticsearch.core.IndexRequest;
+import co.elastic.clients.elasticsearch.indices.CreateIndexRequest;
+import co.elastic.clients.elasticsearch.indices.DeleteIndexRequest;
+import co.elastic.clients.elasticsearch.indices.RefreshRequest;
+import co.elastic.clients.json.JsonData;
+
 import java.io.IOException;
-import java.util.ArrayList;
+import java.io.Reader;
+import java.io.StringReader;
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 
 public class ElasticInfoSchemaTest extends ClusterTest {
 
-  private static final List<String> indexNames = new ArrayList<>();
-
-  public static RestHighLevelClient restHighLevelClient;
+  private static final List<String> indexNames = new LinkedList<>();
+  private static ElasticsearchClient elasticsearchClient;
 
   @BeforeClass
   public static void init() throws Exception {
@@ -52,53 +52,71 @@ public class ElasticInfoSchemaTest extends ClusterTest {
 
     ElasticsearchStorageConfig config = new ElasticsearchStorageConfig(
         Collections.singletonList(TestElasticsearchSuite.getAddress()),
-        null, null, null, PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
+        TestElasticsearchSuite.ELASTICSEARCH_USERNAME,
+        TestElasticsearchSuite.ELASTICSEARCH_PASSWORD,
+        null, AuthMode.SHARED_USER.name(),
+        null
+    );
+
     config.setEnabled(true);
     cluster.defineStoragePlugin("elastic", config);
 
+    elasticsearchClient = TestElasticsearchSuite.getESClient();
     prepareData();
   }
 
   @AfterClass
   public static void cleanUp() throws IOException {
-    for (String indexName : indexNames) {
-      restHighLevelClient.indices().delete(new DeleteIndexRequest(indexName), RequestOptions.DEFAULT);
-    }
+    DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest.Builder()
+      .index(indexNames)
+      .build();
+
+    elasticsearchClient.indices().delete(deleteIndexRequest);
     TestElasticsearchSuite.tearDownCluster();
   }
 
   private static void prepareData() throws IOException {
-    restHighLevelClient = new RestHighLevelClient(RestClient.builder(HttpHost.create(TestElasticsearchSuite.getAddress())));
-
-    String indexName = "t1";
-    indexNames.add(indexName);
-    CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
-
-    restHighLevelClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
-
-    XContentBuilder builder = XContentFactory.jsonBuilder();
-    builder.startObject();
-    builder.field("string_field", "a");
-    builder.field("int_field", 123);
-    builder.endObject();
-    IndexRequest indexRequest = new IndexRequest(indexName).source(builder);
-    restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
-    restHighLevelClient.indices().refresh(new RefreshRequest(indexName), RequestOptions.DEFAULT);
-
-    indexName = "t2";
-    indexNames.add(indexName);
-    createIndexRequest = new CreateIndexRequest(indexName);
-
-    restHighLevelClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
-
-    builder = XContentFactory.jsonBuilder();
-    builder.startObject();
-    builder.field("another_int_field", 321);
-    builder.field("another_string_field", "b");
-    builder.endObject();
-    indexRequest = new IndexRequest(indexName).source(builder);
-    restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
-    restHighLevelClient.indices().refresh(new RefreshRequest(indexName), RequestOptions.DEFAULT);
+    {
+      indexNames.add("t1");
+      CreateIndexRequest createIndexRequest = new CreateIndexRequest.Builder()
+          .index("t1")
+          .build();
+      elasticsearchClient.indices().create(createIndexRequest);
+
+      final Reader input1 = new StringReader(
+        JSONObject.toJSONString(
+          ImmutableMap.of("string_field", "a", "int_field", 123)
+        )
+      );
+      IndexRequest<JsonData> request = IndexRequest.of(i -> i
+          .index("t1")
+          .withJson(input1)
+      );
+      elasticsearchClient.index(request);
+    }
+    {
+      indexNames.add("t2");
+      CreateIndexRequest createIndexRequest = new CreateIndexRequest.Builder()
+          .index("t2")
+          .build();
+      elasticsearchClient.indices().create(createIndexRequest);
+
+      final Reader input2 = new StringReader(
+        JSONObject.toJSONString(
+          ImmutableMap.of("another_string_field", "b", "another_int_field", 321)
+        )
+      );
+      IndexRequest<JsonData> request = IndexRequest.of(i -> i
+          .index("t2")
+          .withJson(input2)
+      );
+      elasticsearchClient.index(request);
+
+      RefreshRequest refreshRequest = new RefreshRequest.Builder()
+          .index(indexNames)
+          .build();
+      elasticsearchClient.indices().refresh(refreshRequest);
+    }
   }
 
   @Test
diff --git a/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchPlanTest.java b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchPlanTest.java
index 99e520d0a9..01fa7546ec 100644
--- a/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchPlanTest.java
+++ b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchPlanTest.java
@@ -17,31 +17,33 @@
  */
 package org.apache.drill.exec.store.elasticsearch;
 
-import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
 import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.ClusterTest;
-import org.apache.http.HttpHost;
-import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
-import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestClient;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.client.indices.CreateIndexRequest;
-import org.elasticsearch.xcontent.XContentBuilder;
-import org.elasticsearch.xcontent.XContentFactory;
+import org.json.simple.JSONObject;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import co.elastic.clients.elasticsearch.ElasticsearchClient;
+import co.elastic.clients.elasticsearch.core.IndexRequest;
+import co.elastic.clients.elasticsearch.indices.CreateIndexRequest;
+import co.elastic.clients.elasticsearch.indices.DeleteIndexRequest;
+import co.elastic.clients.elasticsearch.indices.RefreshRequest;
+import co.elastic.clients.json.JsonData;
+
 import java.io.IOException;
+import java.io.Reader;
+import java.io.StringReader;
 import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
 
 public class ElasticSearchPlanTest extends ClusterTest {
 
-  public static RestHighLevelClient restHighLevelClient;
-
-  private static String indexName;
+  private static ElasticsearchClient elasticsearchClient;
+  private static final List<String> indexNames = new LinkedList<>();
 
   @BeforeClass
   public static void init() throws Exception {
@@ -49,38 +51,55 @@ public class ElasticSearchPlanTest extends ClusterTest {
     startCluster(ClusterFixture.builder(dirTestWatcher));
 
     ElasticsearchStorageConfig config = new ElasticsearchStorageConfig(
-        Collections.singletonList(TestElasticsearchSuite.getAddress()),
-        null, null, null, PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
+      Collections.singletonList(TestElasticsearchSuite.getAddress()),
+      TestElasticsearchSuite.ELASTICSEARCH_USERNAME,
+      TestElasticsearchSuite.ELASTICSEARCH_PASSWORD,
+      null,
+      AuthMode.SHARED_USER.name(),
+      null
+    );
     config.setEnabled(true);
     cluster.defineStoragePlugin("elastic", config);
 
+    elasticsearchClient = TestElasticsearchSuite.getESClient();
     prepareData();
   }
 
   @AfterClass
   public static void cleanUp() throws IOException {
-    restHighLevelClient.indices().delete(new DeleteIndexRequest(indexName), RequestOptions.DEFAULT);
+    DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest.Builder()
+      .index(indexNames)
+      .build();
+
+    elasticsearchClient.indices().delete(deleteIndexRequest);
     TestElasticsearchSuite.tearDownCluster();
   }
 
   private static void prepareData() throws IOException {
-    restHighLevelClient = new RestHighLevelClient(RestClient.builder(HttpHost.create(TestElasticsearchSuite.getAddress())));
-
-    indexName = "nation";
-    CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
-
-    restHighLevelClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
-
-    XContentBuilder builder = XContentFactory.jsonBuilder();
-    builder.startObject();
-    builder.field("n_nationkey", 0);
-    builder.field("n_name", "ALGERIA");
-    builder.field("n_regionkey", 1);
-    builder.endObject();
-    IndexRequest indexRequest = new IndexRequest(indexName).source(builder);
-    restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
-
-    restHighLevelClient.indices().refresh(new RefreshRequest(indexName), RequestOptions.DEFAULT);
+    indexNames.add("nation");
+    CreateIndexRequest createIndexRequest = new CreateIndexRequest.Builder()
+        .index("nation")
+        .build();
+    elasticsearchClient.indices().create(createIndexRequest);
+
+    final Reader input1 = new StringReader(
+      JSONObject.toJSONString(ImmutableMap.of(
+        "n_nationkey", 0,
+        "n_name", "ALGERIA",
+        "n_regionkey", 1
+      ))
+    );
+    IndexRequest<JsonData> request = IndexRequest.of(i -> i
+        .index("nation")
+        .withJson(input1)
+    );
+    elasticsearchClient.index(request);
+
+    RefreshRequest refreshRequest = new RefreshRequest.Builder()
+        .index(indexNames)
+        .build();
+
+    elasticsearchClient.indices().refresh(refreshRequest);
   }
 
   @Test
diff --git a/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchQueryTest.java b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchQueryTest.java
index 9a0f64ecf7..704332ad05 100644
--- a/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchQueryTest.java
+++ b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchQueryTest.java
@@ -18,29 +18,33 @@
 package org.apache.drill.exec.store.elasticsearch;
 
 import org.apache.drill.common.exceptions.UserRemoteException;
-import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
 import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.ClusterTest;
-import org.apache.http.HttpHost;
-import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
-import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestClient;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.client.indices.CreateIndexRequest;
-import org.elasticsearch.xcontent.XContentBuilder;
-import org.elasticsearch.xcontent.XContentFactory;
+import org.json.simple.JSONObject;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import co.elastic.clients.elasticsearch.ElasticsearchClient;
+import co.elastic.clients.elasticsearch.core.IndexRequest;
+import co.elastic.clients.elasticsearch.indices.CreateIndexRequest;
+import co.elastic.clients.elasticsearch.indices.DeleteIndexRequest;
+import co.elastic.clients.elasticsearch.indices.RefreshRequest;
+import co.elastic.clients.json.JsonData;
+
 import java.io.IOException;
+import java.io.Reader;
+import java.io.StringReader;
 import java.math.BigDecimal;
 import java.nio.charset.StandardCharsets;
 import java.time.LocalDate;
 import java.util.Base64;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
 
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -48,9 +52,8 @@ import static org.junit.Assert.fail;
 
 public class ElasticSearchQueryTest extends ClusterTest {
 
-  public static RestHighLevelClient restHighLevelClient;
-
-  private static String indexName;
+  private static ElasticsearchClient elasticsearchClient;
+  private static final List<String> indexNames = new LinkedList<>();
 
   @BeforeClass
   public static void init() throws Exception {
@@ -58,257 +61,311 @@ public class ElasticSearchQueryTest extends ClusterTest {
     startCluster(ClusterFixture.builder(dirTestWatcher));
 
     ElasticsearchStorageConfig config = new ElasticsearchStorageConfig(
-        Collections.singletonList(TestElasticsearchSuite.getAddress()),
-        null, null, null, PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
+      Collections.singletonList(TestElasticsearchSuite.getAddress()),
+      TestElasticsearchSuite.ELASTICSEARCH_USERNAME,
+      TestElasticsearchSuite.ELASTICSEARCH_PASSWORD,
+      null, AuthMode.SHARED_USER.name(),
+      null
+    );
     config.setEnabled(true);
     cluster.defineStoragePlugin("elastic", config);
 
+    elasticsearchClient = TestElasticsearchSuite.getESClient();
     prepareData();
   }
 
   @AfterClass
   public static void cleanUp() throws IOException {
-    restHighLevelClient.indices().delete(new DeleteIndexRequest(indexName), RequestOptions.DEFAULT);
+    DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest.Builder()
+      .index(indexNames)
+      .build();
+
+    elasticsearchClient.indices().delete(deleteIndexRequest);
     TestElasticsearchSuite.tearDownCluster();
   }
 
-  private static void prepareData() throws IOException {
-    restHighLevelClient = new RestHighLevelClient(RestClient.builder(HttpHost.create(TestElasticsearchSuite.getAddress())));
-
-    indexName = "employee";
-    CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
-
-    restHighLevelClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
-
-    XContentBuilder builder = XContentFactory.jsonBuilder();
-    builder.startObject();
-    builder.field("employee_id", 1);
-    builder.field("full_name", "Sheri Nowmer");
-    builder.field("first_name", "Sheri");
-    builder.field("last_name", "Nowmer");
-    builder.field("position_id", 1);
-    builder.field("position_title", "President");
-    builder.field("store_id", 0);
-    builder.field("department_id", 1);
-    builder.field("birth_date", "1961-08-26");
-    builder.field("hire_date", "1994-12-01 00:00:00.0");
-    builder.field("salary", 80000.0);
-    builder.field("supervisor_id", 0);
-    builder.field("education_level", "Graduate Degree");
-    builder.field("marital_status", "S");
-    builder.field("gender", "F");
-    builder.field("management_role", "Senior Management");
-    builder.field("binary_field", "Senior Management".getBytes(StandardCharsets.UTF_8));
-    builder.field("boolean_field", true);
-    builder.timeField("date_field", "2015/01/01 12:10:30");
-    builder.field("byte_field", (byte) 123);
-    builder.field("long_field", 123L);
-    builder.field("float_field", 123F);
-    builder.field("short_field", (short) 123);
-    builder.field("decimal_field", new BigDecimal("123.45"));
-    builder.endObject();
-    IndexRequest indexRequest = new IndexRequest(indexName).source(builder);
-    restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
-
-    builder = XContentFactory.jsonBuilder();
-    builder.startObject();
-    builder.field("employee_id", 2);
-    builder.field("full_name", "Derrick Whelply");
-    builder.field("first_name", "Derrick");
-    builder.field("last_name", "Whelply");
-    builder.field("position_id", 2);
-    builder.field("position_title", "VP Country Manager");
-    builder.field("store_id", 0);
-    builder.field("department_id", 1);
-    builder.field("birth_date", "1915-07-03");
-    builder.field("hire_date", "1994-12-01 00:00:00.0");
-    builder.field("salary", 40000.0);
-    builder.field("supervisor_id", 1);
-    builder.field("education_level", "Graduate Degree");
-    builder.field("marital_status", "M");
-    builder.field("gender", "M");
-    builder.field("management_role", "Senior Management");
-    builder.endObject();
-    indexRequest = new IndexRequest(indexName).source(builder);
-    restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
-
-    builder = XContentFactory.jsonBuilder();
-    builder.startObject();
-    builder.field("employee_id", 4);
-    builder.field("full_name", "Michael Spence");
-    builder.field("first_name", "Michael");
-    builder.field("last_name", "Spence");
-    builder.field("position_id", 2);
-    builder.field("position_title", "VP Country Manager");
-    builder.field("store_id", 0);
-    builder.field("department_id", 1);
-    builder.field("birth_date", "1969-06-20");
-    builder.field("hire_date", "1998-01-01 00:00:00.0");
-    builder.field("salary", 40000.0);
-    builder.field("supervisor_id", 1);
-    builder.field("education_level", "Graduate Degree");
-    builder.field("marital_status", "S");
-    builder.field("gender", "M");
-    builder.field("management_role", "Senior Management");
-    builder.endObject();
-    indexRequest = new IndexRequest(indexName).source(builder);
-    restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
-
-    builder = XContentFactory.jsonBuilder();
-    builder.startObject();
-    builder.field("employee_id", 5);
-    builder.field("full_name", "Maya Gutierrez");
-    builder.field("first_name", "Maya");
-    builder.field("last_name", "Gutierrez");
-    builder.field("position_id", 2);
-    builder.field("position_title", "VP Country Manager");
-    builder.field("store_id", 0);
-    builder.field("department_id", 1);
-    builder.field("birth_date", "1951-05-10");
-    builder.field("hire_date", "1998-01-01 00:00:00.0");
-    builder.field("salary", 35000.0);
-    builder.field("supervisor_id", 1);
-    builder.field("education_level", "Bachelors Degree");
-    builder.field("marital_status", "M");
-    builder.field("gender", "F");
-    builder.field("management_role", "Senior Management");
-    builder.endObject();
-    indexRequest = new IndexRequest(indexName).source(builder);
-    restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
-
-    builder = XContentFactory.jsonBuilder();
-    builder.startObject();
-    builder.field("employee_id", 6);
-    builder.field("full_name", "Roberta Damstra");
-    builder.field("first_name", "Roberta");
-    builder.field("last_name", "Damstra");
-    builder.field("position_id", 3);
-    builder.field("position_title", "VP Information Systems");
-    builder.field("store_id", 0);
-    builder.field("department_id", 2);
-    builder.field("birth_date", "1942-10-08");
-    builder.field("hire_date", "1994-12-01 00:00:00.0");
-    builder.field("salary", 25000.0);
-    builder.field("supervisor_id", 1);
-    builder.field("education_level", "Bachelors Degree");
-    builder.field("marital_status", "M");
-    builder.field("gender", "F");
-    builder.field("management_role", "Senior Management");
-    builder.endObject();
-    indexRequest = new IndexRequest(indexName).source(builder);
-    restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
-
-    builder = XContentFactory.jsonBuilder();
-    builder.startObject();
-    builder.field("employee_id", 7);
-    builder.field("full_name", "Rebecca Kanagaki");
-    builder.field("first_name", "Rebecca");
-    builder.field("last_name", "Kanagaki");
-    builder.field("position_id", 4);
-    builder.field("position_title", "VP Human Resources");
-    builder.field("store_id", 0);
-    builder.field("department_id", 3);
-    builder.field("birth_date", "1949-03-27");
-    builder.field("hire_date", "1994-12-01 00:00:00.0");
-    builder.field("salary", 15000.0);
-    builder.field("supervisor_id", 1);
-    builder.field("education_level", "Bachelors Degree");
-    builder.field("marital_status", "M");
-    builder.field("gender", "F");
-    builder.field("management_role", "Senior Management");
-    builder.endObject();
-    indexRequest = new IndexRequest(indexName).source(builder);
-    restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
-
-    builder = XContentFactory.jsonBuilder();
-    builder.startObject();
-    builder.field("employee_id", 8);
-    builder.field("full_name", "Kim Brunner");
-    builder.field("first_name", "Kim");
-    builder.field("last_name", "Brunner");
-    builder.field("position_id", 11);
-    builder.field("position_title", "Store Manager");
-    builder.field("store_id", 9);
-    builder.field("department_id", 11);
-    builder.field("birth_date", "1922-08-10");
-    builder.field("hire_date", "1998-01-01 00:00:00.0");
-    builder.field("salary", 10000.0);
-    builder.field("supervisor_id", 5);
-    builder.field("education_level", "Bachelors Degree");
-    builder.field("marital_status", "S");
-    builder.field("gender", "F");
-    builder.field("management_role", "Store Management");
-    builder.endObject();
-    indexRequest = new IndexRequest(indexName).source(builder);
-    restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
-
-    builder = XContentFactory.jsonBuilder();
-    builder.startObject();
-    builder.field("employee_id", 9);
-    builder.field("full_name", "Brenda Blumberg");
-    builder.field("first_name", "Brenda");
-    builder.field("last_name", "Blumberg");
-    builder.field("position_id", 11);
-    builder.field("position_title", "Store Manager");
-    builder.field("store_id", 21);
-    builder.field("department_id", 11);
-    builder.field("birth_date", "1979-06-23");
-    builder.field("hire_date", "1998-01-01 00:00:00.0");
-    builder.field("salary", 17000.0);
-    builder.field("supervisor_id", 5);
-    builder.field("education_level", "Graduate Degree");
-    builder.field("marital_status", "M");
-    builder.field("gender", "F");
-    builder.field("management_role", "Store Management");
-    builder.endObject();
-    indexRequest = new IndexRequest(indexName).source(builder);
-    restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
-
-    builder = XContentFactory.jsonBuilder();
-    builder.startObject();
-    builder.field("employee_id", 10);
-    builder.field("full_name", "Darren Stanz");
-    builder.field("first_name", "Darren");
-    builder.field("last_name", "Stanz");
-    builder.field("position_id", 5);
-    builder.field("position_title", "VP Finance");
-    builder.field("store_id", 0);
-    builder.field("department_id", 5);
-    builder.field("birth_date", "1949-08-26");
-    builder.field("hire_date", "1994-12-01 00:00:00.0");
-    builder.field("salary", 50000.0);
-    builder.field("supervisor_id", 1);
-    builder.field("education_level", "Partial College");
-    builder.field("marital_status", "M");
-    builder.field("gender", "M");
-    builder.field("management_role", "Senior Management");
-    builder.endObject();
-    indexRequest = new IndexRequest(indexName).source(builder);
-    restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
-
-    builder = XContentFactory.jsonBuilder();
-    builder.startObject();
-    builder.field("employee_id", 11);
-    builder.field("full_name", "Jonathan Murraiin");
-    builder.field("first_name", "Jonathan");
-    builder.field("last_name", "Murraiin");
-    builder.field("position_id", 11);
-    builder.field("position_title", "Store Manager");
-    builder.field("store_id", 1);
-    builder.field("department_id", 11);
-    builder.field("birth_date", "1967-06-20");
-    builder.field("hire_date", "1998-01-01 00:00:00.0");
-    builder.field("salary", 15000.0);
-    builder.field("supervisor_id", 5);
-    builder.field("education_level", "Graduate Degree");
-    builder.field("marital_status", "S");
-    builder.field("gender", "M");
-    builder.field("management_role", "Store Management");
-    builder.endObject();
-    indexRequest = new IndexRequest(indexName).source(builder);
-    restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
-
-    restHighLevelClient.indices().refresh(new RefreshRequest(indexName), RequestOptions.DEFAULT);
+  public static void prepareData() throws IOException {
+    indexNames.add("employee");
+    CreateIndexRequest createIndexRequest = new CreateIndexRequest.Builder()
+        .index("employee")
+        .build();
+    elasticsearchClient.indices().create(createIndexRequest);
+
+    {
+      Map<String, Object> inputMap = new HashMap<>();
+      inputMap.put("employee_id", 1);
+      inputMap.put("full_name", "Sheri Nowmer");
+      inputMap.put("first_name", "Sheri");
+      inputMap.put("last_name", "Nowmer");
+      inputMap.put("position_id", 1);
+      inputMap.put("position_title", "President");
+      inputMap.put("store_id", 0);
+      inputMap.put("department_id", 1);
+      inputMap.put("birth_date", "1961-08-26");
+      inputMap.put("hire_date", "1994-12-01 00:00:00.0");
+      inputMap.put("salary", 80000.0);
+      inputMap.put("supervisor_id", 0);
+      inputMap.put("education_level", "Graduate Degree");
+      inputMap.put("marital_status", "S");
+      inputMap.put("gender", "F");
+      inputMap.put("management_role", "Senior Management");
+      inputMap.put("binary_field", Base64.getEncoder().encodeToString(
+        "Senior Management".getBytes(StandardCharsets.UTF_8)
+      ));
+      inputMap.put("boolean_field", true);
+      inputMap.put("date_field", "2015/01/01 12:10:30");
+      inputMap.put("byte_field", (byte) 123);
+      inputMap.put("long_field", 123L);
+      inputMap.put("float_field", 123F);
+      inputMap.put("short_field", (short) 123);
+      inputMap.put("decimal_field", new BigDecimal("123.45"));
+
+      final Reader input1 = new StringReader(JSONObject.toJSONString(inputMap));
+      IndexRequest<JsonData> request = IndexRequest.of(i -> i
+          .index("employee")
+          .withJson(input1)
+      );
+      elasticsearchClient.index(request);
+    }
+    {
+      Map<String, Object> inputMap = new HashMap<>();
+      inputMap.put("employee_id", 2);
+      inputMap.put("full_name", "Derrick Whelply");
+      inputMap.put("first_name", "Derrick");
+      inputMap.put("last_name", "Whelply");
+      inputMap.put("position_id", 2);
+      inputMap.put("position_title", "VP Country Manager");
+      inputMap.put("store_id", 0);
+      inputMap.put("department_id", 1);
+      inputMap.put("birth_date", "1915-07-03");
+      inputMap.put("hire_date", "1994-12-01 00:00:00.0");
+      inputMap.put("salary", 40000.0);
+      inputMap.put("supervisor_id", 1);
+      inputMap.put("education_level", "Graduate Degree");
+      inputMap.put("marital_status", "M");
+      inputMap.put("gender", "M");
+      inputMap.put("management_role", "Senior Management");
+
+      final Reader input1 = new StringReader(JSONObject.toJSONString(inputMap));
+      IndexRequest<JsonData> request = IndexRequest.of(i -> i
+          .index("employee")
+          .withJson(input1)
+      );
+      elasticsearchClient.index(request);
+    }
+    {
+      Map<String, Object> inputMap = new HashMap<>();
+      inputMap.put("employee_id", 4);
+      inputMap.put("full_name", "Michael Spence");
+      inputMap.put("first_name", "Michael");
+      inputMap.put("last_name", "Spence");
+      inputMap.put("position_id", 2);
+      inputMap.put("position_title", "VP Country Manager");
+      inputMap.put("store_id", 0);
+      inputMap.put("department_id", 1);
+      inputMap.put("birth_date", "1969-06-20");
+      inputMap.put("hire_date", "1998-01-01 00:00:00.0");
+      inputMap.put("salary", 40000.0);
+      inputMap.put("supervisor_id", 1);
+      inputMap.put("education_level", "Graduate Degree");
+      inputMap.put("marital_status", "S");
+      inputMap.put("gender", "M");
+      inputMap.put("management_role", "Senior Management");
+      final Reader input1 = new StringReader(JSONObject.toJSONString(inputMap));
+
+      IndexRequest<JsonData> request = IndexRequest.of(i -> i
+          .index("employee")
+          .withJson(input1)
+      );
+      elasticsearchClient.index(request);
+    }
+    {
+      Map<String, Object> inputMap = new HashMap<>();
+      inputMap.put("employee_id", 5);
+      inputMap.put("full_name", "Maya Gutierrez");
+      inputMap.put("first_name", "Maya");
+      inputMap.put("last_name", "Gutierrez");
+      inputMap.put("position_id", 2);
+      inputMap.put("position_title", "VP Country Manager");
+      inputMap.put("store_id", 0);
+      inputMap.put("department_id", 1);
+      inputMap.put("birth_date", "1951-05-10");
+      inputMap.put("hire_date", "1998-01-01 00:00:00.0");
+      inputMap.put("salary", 35000.0);
+      inputMap.put("supervisor_id", 1);
+      inputMap.put("education_level", "Bachelors Degree");
+      inputMap.put("marital_status", "M");
+      inputMap.put("gender", "F");
+      inputMap.put("management_role", "Senior Management");
+      final Reader input1 = new StringReader(JSONObject.toJSONString(inputMap));
+
+      IndexRequest<JsonData> request = IndexRequest.of(i -> i
+          .index("employee")
+          .withJson(input1)
+      );
+      elasticsearchClient.index(request);
+    }
+    {
+      Map<String, Object> inputMap = new HashMap<>();
+      inputMap.put("employee_id", 6);
+      inputMap.put("full_name", "Roberta Damstra");
+      inputMap.put("first_name", "Roberta");
+      inputMap.put("last_name", "Damstra");
+      inputMap.put("position_id", 3);
+      inputMap.put("position_title", "VP Information Systems");
+      inputMap.put("store_id", 0);
+      inputMap.put("department_id", 2);
+      inputMap.put("birth_date", "1942-10-08");
+      inputMap.put("hire_date", "1994-12-01 00:00:00.0");
+      inputMap.put("salary", 25000.0);
+      inputMap.put("supervisor_id", 1);
+      inputMap.put("education_level", "Bachelors Degree");
+      inputMap.put("marital_status", "M");
+      inputMap.put("gender", "F");
+      inputMap.put("management_role", "Senior Management");
+      final Reader input1 = new StringReader(JSONObject.toJSONString(inputMap));
+
+      IndexRequest<JsonData> request = IndexRequest.of(i -> i
+          .index("employee")
+          .withJson(input1)
+      );
+      elasticsearchClient.index(request);
+    }
+    {
+      Map<String, Object> inputMap = new HashMap<>();
+      inputMap.put("employee_id", 7);
+      inputMap.put("full_name", "Rebecca Kanagaki");
+      inputMap.put("first_name", "Rebecca");
+      inputMap.put("last_name", "Kanagaki");
+      inputMap.put("position_id", 4);
+      inputMap.put("position_title", "VP Human Resources");
+      inputMap.put("store_id", 0);
+      inputMap.put("department_id", 3);
+      inputMap.put("birth_date", "1949-03-27");
+      inputMap.put("hire_date", "1994-12-01 00:00:00.0");
+      inputMap.put("salary", 15000.0);
+      inputMap.put("supervisor_id", 1);
+      inputMap.put("education_level", "Bachelors Degree");
+      inputMap.put("marital_status", "M");
+      inputMap.put("gender", "F");
+      inputMap.put("management_role", "Senior Management");
+      final Reader input1 = new StringReader(JSONObject.toJSONString(inputMap));
+
+      IndexRequest<JsonData> request = IndexRequest.of(i -> i
+          .index("employee")
+          .withJson(input1)
+      );
+      elasticsearchClient.index(request);
+    }
+    {
+      Map<String, Object> inputMap = new HashMap<>();
+      inputMap.put("employee_id", 8);
+      inputMap.put("full_name", "Kim Brunner");
+      inputMap.put("first_name", "Kim");
+      inputMap.put("last_name", "Brunner");
+      inputMap.put("position_id", 11);
+      inputMap.put("position_title", "Store Manager");
+      inputMap.put("store_id", 9);
+      inputMap.put("department_id", 11);
+      inputMap.put("birth_date", "1922-08-10");
+      inputMap.put("hire_date", "1998-01-01 00:00:00.0");
+      inputMap.put("salary", 10000.0);
+      inputMap.put("supervisor_id", 5);
+      inputMap.put("education_level", "Bachelors Degree");
+      inputMap.put("marital_status", "S");
+      inputMap.put("gender", "F");
+      inputMap.put("management_role", "Store Management");
+      final Reader input1 = new StringReader(JSONObject.toJSONString(inputMap));
+
+      IndexRequest<JsonData> request = IndexRequest.of(i -> i
+          .index("employee")
+          .withJson(input1)
+      );
+      elasticsearchClient.index(request);
+    }
+    {
+      Map<String, Object> inputMap = new HashMap<>();
+      inputMap.put("employee_id", 9);
+      inputMap.put("full_name", "Brenda Blumberg");
+      inputMap.put("first_name", "Brenda");
+      inputMap.put("last_name", "Blumberg");
+      inputMap.put("position_id", 11);
+      inputMap.put("position_title", "Store Manager");
+      inputMap.put("store_id", 21);
+      inputMap.put("department_id", 11);
+      inputMap.put("birth_date", "1979-06-23");
+      inputMap.put("hire_date", "1998-01-01 00:00:00.0");
+      inputMap.put("salary", 17000.0);
+      inputMap.put("supervisor_id", 5);
+      inputMap.put("education_level", "Graduate Degree");
+      inputMap.put("marital_status", "M");
+      inputMap.put("gender", "F");
+      inputMap.put("management_role", "Store Management");
+      final Reader input1 = new StringReader(JSONObject.toJSONString(inputMap));
+
+      IndexRequest<JsonData> request = IndexRequest.of(i -> i
+          .index("employee")
+          .withJson(input1)
+      );
+      elasticsearchClient.index(request);
+    }
+    {
+      Map<String, Object> inputMap = new HashMap<>();
+      inputMap.put("employee_id", 10);
+      inputMap.put("full_name", "Darren Stanz");
+      inputMap.put("first_name", "Darren");
+      inputMap.put("last_name", "Stanz");
+      inputMap.put("position_id", 5);
+      inputMap.put("position_title", "VP Finance");
+      inputMap.put("store_id", 0);
+      inputMap.put("department_id", 5);
+      inputMap.put("birth_date", "1949-08-26");
+      inputMap.put("hire_date", "1994-12-01 00:00:00.0");
+      inputMap.put("salary", 50000.0);
+      inputMap.put("supervisor_id", 1);
+      inputMap.put("education_level", "Partial College");
+      inputMap.put("marital_status", "M");
+      inputMap.put("gender", "M");
+      inputMap.put("management_role", "Senior Management");
+      final Reader input1 = new StringReader(JSONObject.toJSONString(inputMap));
+
+      IndexRequest<JsonData> request = IndexRequest.of(i -> i
+          .index("employee")
+          .withJson(input1)
+      );
+      elasticsearchClient.index(request);
+    }
+    {
+      Map<String, Object> inputMap = new HashMap<>();
+      inputMap.put("employee_id", 11);
+      inputMap.put("full_name", "Jonathan Murraiin");
+      inputMap.put("first_name", "Jonathan");
+      inputMap.put("last_name", "Murraiin");
+      inputMap.put("position_id", 11);
+      inputMap.put("position_title", "Store Manager");
+      inputMap.put("store_id", 1);
+      inputMap.put("department_id", 11);
+      inputMap.put("birth_date", "1967-06-20");
+      inputMap.put("hire_date", "1998-01-01 00:00:00.0");
+      inputMap.put("salary", 15000.0);
+      inputMap.put("supervisor_id", 5);
+      inputMap.put("education_level", "Graduate Degree");
+      inputMap.put("marital_status", "S");
+      inputMap.put("gender", "M");
+      inputMap.put("management_role", "Store Management");
+      final Reader input1 = new StringReader(JSONObject.toJSONString(inputMap));
+
+      IndexRequest<JsonData> request = IndexRequest.of(i -> i
+          .index("employee")
+          .withJson(input1)
+      );
+      elasticsearchClient.index(request);
+    }
+
+    RefreshRequest refreshRequest = new RefreshRequest.Builder()
+        .index(indexNames)
+        .build();
+    elasticsearchClient.indices().refresh(refreshRequest);
   }
 
   @Test
diff --git a/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchUserTranslationTest.java b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchUserTranslationTest.java
new file mode 100644
index 0000000000..3574060be6
--- /dev/null
+++ b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchUserTranslationTest.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.elasticsearch;
+
+import co.elastic.clients.elasticsearch.ElasticsearchClient;
+import co.elastic.clients.elasticsearch.core.IndexRequest;
+import co.elastic.clients.elasticsearch.core.IndexResponse;
+import co.elastic.clients.elasticsearch.indices.CreateIndexRequest;
+import co.elastic.clients.elasticsearch.indices.DeleteIndexRequest;
+import co.elastic.clients.elasticsearch.indices.RefreshRequest;
+import co.elastic.clients.json.JsonData;
+import org.apache.drill.common.config.DrillProperties;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.test.ClientFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.hamcrest.CoreMatchers;
+import org.json.simple.JSONObject;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.io.StringReader;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.ADMIN_USER;
+import static org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.ADMIN_USER_PASSWORD;
+import static org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.TEST_USER_1;
+import static org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.TEST_USER_1_PASSWORD;
+import static org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.TEST_USER_2;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+
+public class ElasticSearchUserTranslationTest extends ClusterTest {
+  private static final Logger logger = LoggerFactory.getLogger(ElasticSearchUserTranslationTest.class);
+  private static final List<String> indexNames = new LinkedList<>();
+  private static ElasticsearchClient elasticsearchClient;
+
+  @BeforeClass
+  public static void init() throws Exception {
+    TestElasticsearchSuite.initElasticsearch();
+    ClusterFixtureBuilder builder = new ClusterFixtureBuilder(dirTestWatcher)
+        .configProperty(ExecConstants.HTTP_ENABLE, true)
+        .configProperty(ExecConstants.HTTP_PORT_HUNT, true)
+        .configProperty(ExecConstants.IMPERSONATION_ENABLED, true);
+
+    startCluster(builder);
+
+
+    PlainCredentialsProvider credentialsProvider = new PlainCredentialsProvider(new HashMap<>());
+    // Add authorized user
+    credentialsProvider.setUserCredentials(TestElasticsearchSuite.ELASTICSEARCH_USERNAME,
+        TestElasticsearchSuite.ELASTICSEARCH_PASSWORD, TEST_USER_1);
+    // Add unauthorized user
+    credentialsProvider.setUserCredentials("nope", "no way dude", TEST_USER_2);
+
+    ElasticsearchStorageConfig config = new ElasticsearchStorageConfig(
+        Collections.singletonList(TestElasticsearchSuite.getAddress()),
+        TestElasticsearchSuite.ELASTICSEARCH_USERNAME,
+        TestElasticsearchSuite.ELASTICSEARCH_PASSWORD,
+        null,
+        AuthMode.SHARED_USER.name(),
+        null
+    );
+
+    config.setEnabled(true);
+    cluster.defineStoragePlugin("elastic", config);
+
+    ElasticsearchStorageConfig ut_config = new ElasticsearchStorageConfig(
+        Collections.singletonList(TestElasticsearchSuite.getAddress()),
+        null,
+        null,
+        null,
+        AuthMode.USER_TRANSLATION.name(),
+        credentialsProvider);
+
+    ut_config.setEnabled(true);
+    cluster.defineStoragePlugin("ut_elastic", ut_config);
+
+    elasticsearchClient = TestElasticsearchSuite.getESClient();
+    prepareData();
+  }
+
+  @AfterClass
+  public static void cleanUp() throws IOException {
+    DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest.Builder()
+        .index(indexNames)
+        .build();
+    elasticsearchClient.indices().delete(deleteIndexRequest);
+    TestElasticsearchSuite.tearDownCluster();
+  }
+
+  private static void prepareData() throws IOException {
+    String indexName = "t1";
+    indexNames.add(indexName);
+    CreateIndexRequest createIndexRequest = new CreateIndexRequest.Builder()
+        .index(indexName)
+        .build();
+
+    elasticsearchClient.indices().create(createIndexRequest);
+
+    Map<String, Object> map = new HashMap<>();
+    map.put("string_field", "a");
+    map.put("int_field", 123);
+    JSONObject jsonObject = new JSONObject(map);
+
+    Reader input = new StringReader(jsonObject.toJSONString());
+    IndexRequest<JsonData> request = IndexRequest.of(i -> i
+        .index("t1")
+        .withJson(input)
+    );
+
+    IndexResponse response = elasticsearchClient.index(request);
+    logger.debug("Insert response: {}", response.toString() );
+
+    RefreshRequest refreshRequest = new RefreshRequest.Builder()
+        .index(indexNames)
+        .build();
+    elasticsearchClient.indices().refresh(refreshRequest);
+
+    indexName = "t2";
+    indexNames.add(indexName);
+    createIndexRequest = new CreateIndexRequest.Builder()
+        .index(indexName)
+        .build();
+
+    elasticsearchClient.indices().create(createIndexRequest);
+
+    map = new HashMap<>();
+    map.put("another_int_field", 321);
+    map.put("another_string_field", "b");
+    jsonObject = new JSONObject(map);
+
+    Reader input2 = new StringReader(jsonObject.toJSONString());
+    request = IndexRequest.of(i -> i
+        .index("t2")
+        .withJson(input2)
+    );
+
+    response = elasticsearchClient.index(request);
+    logger.debug("Insert response: {}", response.toString() );
+
+    refreshRequest = new RefreshRequest.Builder()
+        .index(indexNames)
+        .build();
+    elasticsearchClient.indices().refresh(refreshRequest);
+    logger.debug("Data preparation complete.");
+  }
+
+  @Test
+  public void testInfoSchemaQueryWithMissingCredentials() throws Exception {
+    // This test validates that the correct credentials are sent down to ElasticSearch.
+    // This user should not see the ut_elastic because they do not have valid credentials.
+    ClientFixture client = cluster
+        .clientBuilder()
+        .property(DrillProperties.USER, ADMIN_USER)
+        .property(DrillProperties.PASSWORD, ADMIN_USER_PASSWORD)
+        .build();
+
+    String sql = "SHOW DATABASES WHERE schema_name LIKE '%elastic%'";
+
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    results.print();
+    assertEquals(1, results.rowCount());
+    results.clear();
+  }
+
+  @Test
+  public void testInfoSchemaQueryWithValidCredentials() throws Exception {
+    // This test validates that the ElasticSearch connection with user translation appears when the user is
+    // authenticated.
+    ClientFixture client = cluster
+        .clientBuilder()
+        .property(DrillProperties.USER, TEST_USER_1)
+        .property(DrillProperties.PASSWORD, TEST_USER_1_PASSWORD)
+        .build();
+
+    String sql = "SHOW DATABASES WHERE schema_name LIKE '%elastic%'";
+
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    assertEquals(2, results.rowCount());
+    results.clear();
+  }
+
+  @Test
+  public void testQueryWithUserTranslation() throws Exception {
+    ClientFixture client = cluster
+        .clientBuilder()
+        .property(DrillProperties.USER, TEST_USER_1)
+        .property(DrillProperties.PASSWORD, TEST_USER_1_PASSWORD)
+        .build();
+
+    String sql = "select * from ut_elastic.t1";
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    assertEquals(1, results.rowCount());
+    results.clear();
+  }
+
+  @Test
+  public void testQueryWithUserTranslationAndInvalidCredentials() throws Exception {
+    ClientFixture client = cluster
+        .clientBuilder()
+        .property(DrillProperties.USER, ADMIN_USER)
+        .property(DrillProperties.PASSWORD, ADMIN_USER_PASSWORD)
+        .build();
+
+    String sql = "select * from ut_elastic.t1";
+    try {
+      client.queryBuilder().sql(sql).rowSet();
+      fail();
+    } catch (UserRemoteException e) {
+      assertThat(e.getMessage(), CoreMatchers.containsString("Object 'ut_elastic' not found"));
+    }
+  }
+}
diff --git a/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/TestElasticsearchSuite.java b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/TestElasticsearchSuite.java
index dec9b7659e..12fbe72e6f 100644
--- a/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/TestElasticsearchSuite.java
+++ b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/TestElasticsearchSuite.java
@@ -17,32 +17,65 @@
  */
 package org.apache.drill.exec.store.elasticsearch;
 
+import co.elastic.clients.elasticsearch.ElasticsearchClient;
+import co.elastic.clients.json.jackson.JacksonJsonpMapper;
+import co.elastic.clients.transport.ElasticsearchTransport;
+import co.elastic.clients.transport.rest_client.RestClientTransport;
+import org.apache.commons.io.IOUtils;
 import org.apache.drill.categories.SlowTest;
 import org.apache.drill.test.BaseTest;
+
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.http.ssl.SSLContexts;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
 import org.testcontainers.elasticsearch.ElasticsearchContainer;
-import org.testcontainers.utility.DockerImageName;
 
+import com.google.api.client.util.SslUtils;
+
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLContext;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateFactory;
 import java.time.Duration;
 import java.util.concurrent.atomic.AtomicInteger;
 
+
 @Category(SlowTest.class)
 @RunWith(Suite.class)
-@Suite.SuiteClasses({ElasticComplexTypesTest.class, ElasticInfoSchemaTest.class, ElasticSearchPlanTest.class, ElasticSearchQueryTest.class})
+@Suite.SuiteClasses({
+    ElasticComplexTypesTest.class,
+    ElasticInfoSchemaTest.class,
+    ElasticSearchPlanTest.class,
+    ElasticSearchQueryTest.class,
+    ElasticSearchUserTranslationTest.class})
 public class TestElasticsearchSuite extends BaseTest {
 
   protected static ElasticsearchContainer elasticsearch;
+  public static final String ELASTICSEARCH_USERNAME = "elastic";
+  public static final String ELASTICSEARCH_PASSWORD = "s3cret";
+  private static final String IMAGE_NAME = "docker.elastic.co/elasticsearch/elasticsearch:8.6.0";
 
   private static final AtomicInteger initCount = new AtomicInteger(0);
 
   private static volatile boolean runningSuite = false;
 
   @BeforeClass
-  public static void initElasticsearch() {
+  public static void initElasticsearch() throws IOException, GeneralSecurityException {
     synchronized (TestElasticsearchSuite.class) {
       if (initCount.get() == 0) {
         startElasticsearch();
@@ -52,25 +85,89 @@ public class TestElasticsearchSuite extends BaseTest {
     }
   }
 
-  public static boolean isRunningSuite() {
-    return runningSuite;
-  }
-
   @AfterClass
   public static void tearDownCluster() {
     synchronized (TestElasticsearchSuite.class) {
       if (initCount.decrementAndGet() == 0 && elasticsearch != null) {
         elasticsearch.stop();
+        elasticsearch.close();
       }
     }
   }
 
-  private static void startElasticsearch() {
-    DockerImageName imageName = DockerImageName.parse("elasticsearch:7.14.2")
-      .asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch");
-    TestElasticsearchSuite.elasticsearch = new ElasticsearchContainer(imageName)
-      .withStartupTimeout(Duration.ofMinutes(2));
-    TestElasticsearchSuite.elasticsearch.start();
+  public static boolean isRunningSuite() {
+    return runningSuite;
+  }
+
+  /**
+   * Returns an {@link CredentialsProvider} for use with the ElasticSearch test container.
+   * @return A {@link CredentialsProvider} with the default credentials.
+   */
+  public static CredentialsProvider getCredentialsProvider() {
+    CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+    credentialsProvider.setCredentials(AuthScope.ANY,
+        new UsernamePasswordCredentials(ELASTICSEARCH_USERNAME, ELASTICSEARCH_PASSWORD));
+    return credentialsProvider;
+  }
+
+  public static byte[] getCertAsBytes(ElasticsearchContainer container) {
+    return container.copyFileFromContainer(
+        "/usr/share/elasticsearch/config/certs/http_ca.crt",
+        IOUtils::toByteArray);
+  }
+
+  public static SSLContext createContextFromCaCert(byte[] certAsBytes) {
+    try {
+      CertificateFactory factory = CertificateFactory.getInstance("X.509");
+      Certificate trustedCa = factory.generateCertificate(
+          new ByteArrayInputStream(certAsBytes)
+      );
+      KeyStore trustStore = KeyStore.getInstance("pkcs12");
+      trustStore.load(null, null);
+      trustStore.setCertificateEntry("ca", trustedCa);
+      SSLContextBuilder sslContextBuilder =
+          SSLContexts.custom().loadTrustMaterial(trustStore, null);
+      return sslContextBuilder.build();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static ElasticsearchClient getESClient() {
+    HttpHost host = new HttpHost(
+      elasticsearch.getHost(),
+      elasticsearch.getMappedPort(9200),
+      "http"
+    );
+
+    final RestClientBuilder builder = RestClient.builder(host);
+
+    builder.setHttpClientConfigCallback(clientBuilder -> {
+      //clientBuilder.setSSLContext(TestElasticsearchSuite.createContextFromCaCert(getCertAsBytes(elasticsearch)));
+      clientBuilder.setDefaultCredentialsProvider(getCredentialsProvider());
+      return clientBuilder;
+    });
+
+    ElasticsearchTransport transport = new RestClientTransport(
+        builder.build(), new JacksonJsonpMapper());
+
+    return new ElasticsearchClient(transport);
+  }
+
+
+
+  private static void startElasticsearch() throws GeneralSecurityException {
+    elasticsearch = new ElasticsearchContainer(IMAGE_NAME)
+        .withExposedPorts(9200)
+        .withStartupTimeout(Duration.ofMinutes(2))
+        .withStartupAttempts(5)
+        .withPassword(ELASTICSEARCH_PASSWORD)
+        .withEnv("xpack.security.enabled", "true")
+        .withEnv("xpack.security.transport.ssl.enabled", "false")
+        .withEnv("ES_JAVA_OPTS", "-Xmx1g"); // ES gobbles up lots of RAM under defaults.
+
+    HttpsURLConnection.setDefaultSSLSocketFactory(SslUtils.trustAllSSLContext().getSocketFactory());
+    elasticsearch.start();
   }
 
   public static String getAddress() {