You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by cg...@apache.org on 2022/04/27 00:15:43 UTC

[drill] branch master updated: DRILL-8155: Introduce New Plugin Authentication Modes (#2516)

This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c3732320b DRILL-8155: Introduce New Plugin Authentication Modes (#2516)
7c3732320b is described below

commit 7c3732320b1bfe6b3107a6e2a7e063b4719acebc
Author: Charles S. Givre <cg...@apache.org>
AuthorDate: Tue Apr 26 20:15:37 2022 -0400

    DRILL-8155: Introduce New Plugin Authentication Modes (#2516)
    
    * Do not set the read-only hint on JDBC connections.
    
    * Outline of different auth modes in storage-jdbc.
    
    In this commit, a new `authMode` storage config supporting three new auth modes
    is defined: shared user (default), user translation (user is translated to some
    other user from the external storage) and impersonation (the external storage
    and JDBC driver provide support for impersonating the Drill query user).
    
    The JdbcStoragePlugin is enhanced to be able to work with a lookup table of
    connection pools, where a pool is dedicated to each query user except when
    in shared user auth mode.
    
    Planning and execution time APIs are also enhanced to transmit a user
    credentials object for the query user, instead of just its username.
    This allows for the expansion of the UserCredentials protobuf type to
    include some optional extra credentials, e.g. in an array of byte arrays.
    These credentials may be relevant in the user translation mode when a
    credential provider must be accessed in order to obtain the creds to be used
    for the external system.
    
    * Rebased to current master and build fixes
    
    * Build works, cred stuff added
    
    * Credentials being saved and pushed down to storage plugin
    
    * UI now closing properly
    
    * User Translation working for HTTP plugin
    
    * HTTP unit tests passing
    
    * WIP
    
    * Fixed import
    
    * User Credentials now being stored in credential provider
    
    * Working
    
    * Fixed TPCH Unit Tests
    
    * Fix CredProvider SerDe Test
    
    * Added unit tests for JDBC
    
    * Code cleanup
    
    * Fix LGTM alerts
    
    * Correct username now populating Group Scan
    
    * Username to Subscan
    
    * Remove PerUserUsernamePasswordCredentials class.
    
    * Remove getUserCredentials from CredentialsProvider.
    
    * Planning errors fixed
    
    * Removed unused imports
    
    * Fixed minor issues
    
    * Unit test fixes
    
    * WIP.
    
    * Fix CodeQL Alert
    
    * Ignore LGTM False Positive
    
    * Fix tainted string LGTM alert
    
    * Revert LGTM Comment
    
    * Addressed review comments
    
    * Use fixed size Guava caches in JDBC convention and dialect factories.
    
    These replace Maps with no size limit that might have grown without
    bound. LRU eviction begins when the cache size limit is reached.
    
    * Add a TTL to the JDBC dialect and convention caches.
    
    Co-authored-by: James Turton <ja...@somecomputer.xyz>
---
 .../exec/store/mapr/db/MapRDBFormatMatcher.java    |   8 +-
 .../store/cassandra/CassandraStorageConfig.java    |  39 +--
 .../elasticsearch/ElasticsearchStorageConfig.java  |  32 ++-
 .../exec/store/http/HttpAPIConnectionSchema.java   |   7 +-
 .../drill/exec/store/http/HttpApiConfig.java       |  32 ++-
 .../drill/exec/store/http/HttpBatchReader.java     |  13 +-
 .../drill/exec/store/http/HttpGroupScan.java       |  11 +-
 .../apache/drill/exec/store/http/HttpScanSpec.java |   9 +
 .../drill/exec/store/http/HttpSchemaFactory.java   |  10 +-
 .../exec/store/http/HttpStoragePluginConfig.java   | 124 ++++++---
 .../apache/drill/exec/store/http/HttpSubScan.java  |   2 +-
 .../store/http/oauth/AccessTokenRepository.java    |   5 +-
 .../drill/exec/store/http/util/SimpleHttp.java     |  69 ++---
 .../drill/exec/store/http/TestHttpPlugin.java      |   5 +-
 .../exec/store/http/TestHttpUDFFunctions.java      |   3 +-
 .../drill/exec/store/http/TestOAuthProcess.java    |   3 +-
 .../exec/store/http/TestOAuthTokenUpdate.java      |   3 +-
 .../drill/exec/store/http/TestPagination.java      |   7 +-
 .../http/TestUserTranslationInHttpPlugin.java      | 278 +++++++++++++++++++++
 .../exec/store/jdbc/CapitalizingJdbcSchema.java    |   5 +-
 .../drill/exec/store/jdbc/DefaultJdbcDialect.java  |  31 ++-
 .../drill/exec/store/jdbc/DrillJdbcConvention.java |   6 +-
 .../exec/store/jdbc/JdbcConventionFactory.java     |  54 ++++
 .../drill/exec/store/jdbc/JdbcDialectFactory.java  |  33 ++-
 .../drill/exec/store/jdbc/JdbcGroupScan.java       |  64 ++++-
 .../exec/store/jdbc/JdbcIntermediatePrel.java      |   8 +-
 .../jdbc/JdbcIntermediatePrelConverterRule.java    |   6 +-
 .../org/apache/drill/exec/store/jdbc/JdbcPrel.java |   9 +-
 .../drill/exec/store/jdbc/JdbcRecordWriter.java    |   2 +-
 .../exec/store/jdbc/JdbcScanBatchCreator.java      |  22 +-
 .../drill/exec/store/jdbc/JdbcStorageConfig.java   | 102 ++++++--
 .../drill/exec/store/jdbc/JdbcStoragePlugin.java   | 131 +++++++---
 .../apache/drill/exec/store/jdbc/JdbcSubScan.java  |  26 +-
 .../exec/store/jdbc/JdbcWriterBatchCreator.java    |  19 +-
 .../jdbc/clickhouse/ClickhouseJdbcDialect.java     |  26 +-
 .../drill/exec/store/jdbc/rules/JdbcLimitRule.java |   2 +-
 .../drill/exec/store/jdbc/rules/JdbcSortRule.java  |   2 +-
 .../drill/exec/store/jdbc/TestDataSource.java      |  32 ++-
 .../store/jdbc/TestJdbcPluginWithClickhouse.java   |  14 +-
 .../exec/store/jdbc/TestJdbcPluginWithH2IT.java    |  14 +-
 .../exec/store/jdbc/TestJdbcPluginWithMySQLIT.java |  14 +-
 .../store/jdbc/TestJdbcPluginWithPostgres.java     |  13 +-
 .../exec/store/jdbc/TestJdbcUserTranslation.java   | 202 +++++++++++++++
 .../exec/store/jdbc/TestJdbcWriterWithH2.java      |  12 +-
 .../exec/store/jdbc/TestJdbcWriterWithMySQL.java   |  63 +++--
 .../store/jdbc/TestJdbcWriterWithPostgres.java     |  58 +++--
 .../drill/exec/store/mongo/MongoStoragePlugin.java |  15 +-
 .../exec/store/mongo/MongoStoragePluginConfig.java |  18 +-
 .../store/phoenix/PhoenixStoragePluginConfig.java  |  32 ++-
 .../drill/exec/store/splunk/SplunkConnection.java  |   7 +-
 .../exec/store/splunk/SplunkPluginConfig.java      |  49 +++-
 .../exec/store/splunk/SplunkConnectionTest.java    |   4 +-
 .../drill/exec/store/splunk/SplunkTestSuite.java   |   5 +-
 .../apache/drill/exec/ops/ContextInformation.java  |  13 +-
 .../apache/drill/exec/ops/FragmentContextImpl.java |  16 +-
 .../org/apache/drill/exec/ops/QueryContext.java    |  18 +-
 .../drill/exec/ops/ViewExpansionContext.java       |   9 +-
 .../exec/server/rest/CredentialResources.java      | 257 +++++++++++++++++++
 .../drill/exec/server/rest/DrillRestServer.java    |   1 +
 .../exec/server/rest/PluginConfigWrapper.java      |  60 ++++-
 .../drill/exec/server/rest/QueryResources.java     |   2 +-
 .../drill/exec/server/rest/StorageResources.java   |  34 +--
 .../server/rest/UsernamePasswordContainer.java     |  33 ++-
 .../exec/server/rest/ViewableWithPermissions.java  |   3 +
 .../org/apache/drill/exec/store/SchemaConfig.java  |   7 +
 .../drill/exec/store/SchemaTreeProvider.java       |   7 +
 .../drill/exec/store/StoragePluginRegistry.java    |   2 +-
 .../exec/store/StoragePluginRegistryImpl.java      |   9 +-
 .../drill/exec/store/dfs/FileSystemConfig.java     |   9 +-
 .../store/ischema/InfoSchemaRecordGenerator.java   |   3 +
 .../store/security/CredentialProviderUtils.java    |  12 +-
 .../security/UsernamePasswordCredentials.java      |  80 +++++-
 .../UsernamePasswordWithProxyCredentials.java      |  61 ++++-
 .../security/oauth/OAuthTokenCredentials.java      |  94 ++++---
 .../apache/drill/exec/work/foreman/Foreman.java    |   2 +-
 .../drill/exec/work/metadata/MetadataProvider.java |   5 +
 .../src/main/resources/rest/credentials/list.ftl   | 117 +++++++++
 exec/java-exec/src/main/resources/rest/generic.ftl |   3 +
 .../rest/static/js/credentialsServerMessage.js     |  39 +++
 .../drill/exec/store/TestClassicLocator.java       |   4 +-
 .../storage/CredentialsProviderSerDeTest.java      |  12 +-
 logical/pom.xml                                    |  18 ++
 ...g.java => CredentialedStoragePluginConfig.java} |  32 ++-
 .../drill/common/logical/StoragePluginConfig.java  |  49 +++-
 .../logical/security/CredentialsProvider.java      |  27 +-
 .../logical/security/PlainCredentialsProvider.java |  76 +++++-
 86 files changed, 2410 insertions(+), 464 deletions(-)

diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java
index 73c5ffbc05..d4c08f36e5 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java
@@ -101,8 +101,12 @@ public class MapRDBFormatMatcher extends TableFormatMatcher {
       String tableName = mapRDBFormatPlugin.getTableName(selection);
       TableProperties props = mapRDBFormatPlugin.getMaprFS().getTableProperties(new Path(tableName));
       if (props.getAttr().getJson()) {
-        return new DynamicDrillTable(fsPlugin, storageEngineName, schemaConfig.getUserName(),
-            new FormatSelection(mapRDBFormatPlugin.getConfig(), selection));
+        return new DynamicDrillTable(
+          fsPlugin,
+          storageEngineName,
+          schemaConfig.getQueryUserCredentials().getUserName(),
+          new FormatSelection(mapRDBFormatPlugin.getConfig(), selection)
+        );
       } else {
         FormatSelection formatSelection = new FormatSelection(mapRDBFormatPlugin.getConfig(), selection);
         return new MapRDBBinaryTable(storageEngineName, fsPlugin, mapRDBFormatPlugin, formatSelection);
diff --git a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStorageConfig.java b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStorageConfig.java
index 26ed2a0b83..d6a51cd9cc 100644
--- a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStorageConfig.java
+++ b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStorageConfig.java
@@ -21,7 +21,8 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.drill.common.logical.AbstractSecuredStoragePluginConfig;
+
+import org.apache.drill.common.logical.CredentialedStoragePluginConfig;
 import org.apache.drill.exec.store.security.CredentialProviderUtils;
 import org.apache.drill.common.logical.security.CredentialsProvider;
 import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
@@ -29,9 +30,10 @@ import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 
 @JsonTypeName(CassandraStorageConfig.NAME)
-public class CassandraStorageConfig extends AbstractSecuredStoragePluginConfig {
+public class CassandraStorageConfig extends CredentialedStoragePluginConfig {
   public static final String NAME = "cassandra";
 
   private final String host;
@@ -59,33 +61,35 @@ public class CassandraStorageConfig extends AbstractSecuredStoragePluginConfig {
   }
 
   @JsonIgnore
-  public UsernamePasswordCredentials getUsernamePasswordCredentials() {
-    return new UsernamePasswordCredentials(credentialsProvider);
+  public Optional<UsernamePasswordCredentials> getUsernamePasswordCredentials() {
+    return new UsernamePasswordCredentials.Builder()
+      .setCredentialsProvider(credentialsProvider)
+      .build();
   }
 
   public String getUsername() {
-    if (directCredentials) {
-      return getUsernamePasswordCredentials().getUsername();
-    }
-    return null;
+    return getUsernamePasswordCredentials()
+      .map(UsernamePasswordCredentials::getUsername)
+      .orElse(null);
   }
 
   public String getPassword() {
-    if (directCredentials) {
-      return getUsernamePasswordCredentials().getPassword();
-    }
-    return null;
+    return getUsernamePasswordCredentials()
+      .map(UsernamePasswordCredentials::getPassword)
+      .orElse(null);
   }
 
   @JsonIgnore
   public Map<String, Object> toConfigMap() {
-    UsernamePasswordCredentials credentials = getUsernamePasswordCredentials();
+    Optional<UsernamePasswordCredentials> credentials = getUsernamePasswordCredentials();
 
     Map<String, Object> result = new HashMap<>();
     result.put("host", host);
     result.put("port", port);
-    result.put("username", credentials.getUsername());
-    result.put("password", credentials.getPassword());
+    if (credentials.isPresent()) {
+      result.put("username", credentials.get().getUsername());
+      result.put("password", credentials.get().getPassword());
+    }
     return result;
   }
 
@@ -106,4 +110,9 @@ public class CassandraStorageConfig extends AbstractSecuredStoragePluginConfig {
   public int hashCode() {
     return Objects.hash(host, credentialsProvider);
   }
+
+  @Override
+  public CassandraStorageConfig updateCredentialProvider(CredentialsProvider credentialsProvider) {
+    return this;
+  }
 }
diff --git a/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/ElasticsearchStorageConfig.java b/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/ElasticsearchStorageConfig.java
index 9e84635076..c03f5255da 100644
--- a/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/ElasticsearchStorageConfig.java
+++ b/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/ElasticsearchStorageConfig.java
@@ -24,7 +24,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectWriter;
-import org.apache.drill.common.logical.AbstractSecuredStoragePluginConfig;
+import org.apache.drill.common.logical.CredentialedStoragePluginConfig;
 import org.apache.drill.exec.store.security.CredentialProviderUtils;
 import org.apache.drill.common.logical.security.CredentialsProvider;
 import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
@@ -33,9 +33,10 @@ import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 
 @JsonTypeName(ElasticsearchStorageConfig.NAME)
-public class ElasticsearchStorageConfig extends AbstractSecuredStoragePluginConfig {
+public class ElasticsearchStorageConfig extends CredentialedStoragePluginConfig {
   public static final String NAME = "elastic";
 
   private static final ObjectWriter OBJECT_WRITER = new ObjectMapper().writerFor(List.class);
@@ -57,22 +58,28 @@ public class ElasticsearchStorageConfig extends AbstractSecuredStoragePluginConf
   }
 
   public String getUsername() {
-    if (directCredentials) {
-      return getUsernamePasswordCredentials().getUsername();
+    if (!directCredentials) {
+      return null;
     }
-    return null;
+    return getUsernamePasswordCredentials()
+      .map(UsernamePasswordCredentials::getUsername)
+      .orElse(null);
   }
 
   public String getPassword() {
-    if (directCredentials) {
-      return getUsernamePasswordCredentials().getPassword();
+    if (!directCredentials) {
+      return null;
     }
-    return null;
+    return getUsernamePasswordCredentials()
+      .map(UsernamePasswordCredentials::getPassword)
+      .orElse(null);
   }
 
   @JsonIgnore
-  public UsernamePasswordCredentials getUsernamePasswordCredentials() {
-    return new UsernamePasswordCredentials(credentialsProvider);
+  public Optional<UsernamePasswordCredentials> getUsernamePasswordCredentials() {
+    return new UsernamePasswordCredentials.Builder()
+      .setCredentialsProvider(credentialsProvider)
+      .build();
   }
 
   @JsonIgnore
@@ -102,4 +109,9 @@ public class ElasticsearchStorageConfig extends AbstractSecuredStoragePluginConf
   public int hashCode() {
     return Objects.hash(hosts, credentialsProvider);
   }
+
+  @Override
+  public CredentialedStoragePluginConfig updateCredentialProvider(CredentialsProvider credentialsProvider) {
+    return this;
+  }
 }
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpAPIConnectionSchema.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpAPIConnectionSchema.java
index c9f7e31280..9263c0df3d 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpAPIConnectionSchema.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpAPIConnectionSchema.java
@@ -34,12 +34,14 @@ public class HttpAPIConnectionSchema extends AbstractSchema {
 
   private final HttpStoragePlugin plugin;
   private final Map<String, DynamicDrillTable> activeTables = CaseInsensitiveMap.newHashMap();
+  private final String queryUserName;
 
   public HttpAPIConnectionSchema(HttpSchema parent,
                                  String name,
-                                 HttpStoragePlugin plugin) {
+                                 HttpStoragePlugin plugin, String queryUserName) {
     super(parent.getSchemaPath(), name);
     this.plugin = plugin;
+    this.queryUserName = queryUserName;
   }
 
   @Override
@@ -63,11 +65,10 @@ public class HttpAPIConnectionSchema extends AbstractSchema {
       // Return the found table
       return table;
     } else {
-
       // Register a new table
       return registerTable(tableName, new DynamicDrillTable(plugin, plugin.getName(),
         new HttpScanSpec(plugin.getName(), name, tableName,
-              plugin.getConfig().copyForPlan(name), plugin.getTokenTable(), plugin.getRegistry())));
+              plugin.getConfig().copyForPlan(name), plugin.getTokenTable(), queryUserName, plugin.getRegistry())));
     }
   }
 
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpApiConfig.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpApiConfig.java
index 5da4d15be8..581b61cfcc 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpApiConfig.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpApiConfig.java
@@ -30,6 +30,7 @@ import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.logical.security.CredentialsProvider;
 import org.apache.drill.exec.store.security.CredentialProviderUtils;
 import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
+import org.apache.drill.exec.store.security.UsernamePasswordWithProxyCredentials;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,6 +38,7 @@ import org.slf4j.LoggerFactory;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 
 @JsonInclude(JsonInclude.Include.NON_DEFAULT)
 @JsonDeserialize(builder = HttpApiConfig.HttpApiConfigBuilder.class)
@@ -335,18 +337,22 @@ public class HttpApiConfig {
 
   @JsonProperty
   public String userName() {
-    if (directCredentials) {
-      return getUsernamePasswordCredentials().getUsername();
+    if (!directCredentials) {
+      return null;
     }
-    return null;
+    return getUsernamePasswordCredentials()
+      .map(UsernamePasswordCredentials::getUsername)
+      .orElse(null);
   }
 
   @JsonProperty
   public String password() {
-    if (directCredentials) {
-      return getUsernamePasswordCredentials().getPassword();
+    if (!directCredentials) {
+      return null;
     }
-    return null;
+    return getUsernamePasswordCredentials()
+      .map(UsernamePasswordCredentials::getPassword)
+      .orElse(null);
   }
 
   @JsonIgnore
@@ -365,8 +371,18 @@ public class HttpApiConfig {
   }
 
   @JsonIgnore
-  public UsernamePasswordCredentials getUsernamePasswordCredentials() {
-    return new UsernamePasswordCredentials(credentialsProvider);
+  public Optional<UsernamePasswordWithProxyCredentials> getUsernamePasswordCredentials() {
+    return new UsernamePasswordWithProxyCredentials.Builder()
+      .setCredentialsProvider(credentialsProvider)
+      .build();
+  }
+
+  @JsonIgnore
+  public Optional<UsernamePasswordWithProxyCredentials> getUsernamePasswordCredentials(String username) {
+    return new UsernamePasswordWithProxyCredentials.Builder()
+      .setCredentialsProvider(credentialsProvider)
+      .setQueryUser(username)
+      .build();
   }
 
   @JsonProperty
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
index f6651d7d55..b99ec57fec 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
@@ -38,8 +38,8 @@ import org.apache.drill.exec.store.http.paginator.Paginator;
 import org.apache.drill.exec.store.http.util.HttpProxyConfig;
 import org.apache.drill.exec.store.http.util.HttpProxyConfig.ProxyBuilder;
 import org.apache.drill.exec.store.http.util.SimpleHttp;
-import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
 import org.apache.drill.exec.store.ImplicitColumnUtils.ImplicitColumns;
+import org.apache.drill.exec.store.security.UsernamePasswordWithProxyCredentials;
 import org.apache.drill.shaded.guava.com.google.common.base.Strings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,6 +48,7 @@ import java.io.File;
 import java.io.InputStream;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 public class HttpBatchReader implements ManagedReader<SchemaNegotiator> {
 
@@ -242,13 +243,15 @@ public class HttpBatchReader implements ManagedReader<SchemaNegotiator> {
         .fromConfigForURL(drillConfig, url.toString());
     final String proxyType = config.proxyType();
     if (proxyType != null && !"direct".equals(proxyType)) {
-      UsernamePasswordCredentials credentials = config.getUsernamePasswordCredentials();
       builder
         .type(config.proxyType())
         .host(config.proxyHost())
-        .port(config.proxyPort())
-        .username(credentials.getUsername())
-        .password(credentials.getPassword());
+        .port(config.proxyPort());
+
+      Optional<UsernamePasswordWithProxyCredentials> credentials = config.getUsernamePasswordCredentials();
+      if (credentials.isPresent()) {
+        builder.username(credentials.get().getUsername()).password(credentials.get().getPassword());
+      }
     }
     return builder.build();
   }
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java
index 619858dc30..5eab823500 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java
@@ -49,6 +49,7 @@ public class HttpGroupScan extends AbstractGroupScan {
   private final ScanStats scanStats;
   private final double filterSelectivity;
   private final int maxRecords;
+  private final String username;
 
   // Used only in planner, not serialized
   private int hashCode;
@@ -57,8 +58,9 @@ public class HttpGroupScan extends AbstractGroupScan {
    * Creates a new group scan from the storage plugin.
    */
   public HttpGroupScan (HttpScanSpec scanSpec) {
-    super("no-user");
+    super(scanSpec.queryUserName());
     this.httpScanSpec = scanSpec;
+    this.username = scanSpec.queryUserName();
     this.columns = ALL_COLUMNS;
     this.filters = null;
     this.filterSelectivity = 0.0;
@@ -76,6 +78,7 @@ public class HttpGroupScan extends AbstractGroupScan {
     this.filters = that.filters;
     this.filterSelectivity = that.filterSelectivity;
     this.maxRecords = that.maxRecords;
+    this.username = that.username;
 
     // Calcite makes many copies in the later stage of planning
     // without changing anything. Retain the previous stats.
@@ -96,6 +99,7 @@ public class HttpGroupScan extends AbstractGroupScan {
     this.filters = that.filters;
     this.filterSelectivity = that.filterSelectivity;
     this.scanStats = computeScanStats();
+    this.username = that.username;
     this.maxRecords = that.maxRecords;
   }
 
@@ -107,6 +111,7 @@ public class HttpGroupScan extends AbstractGroupScan {
     super(that);
     this.columns = that.columns;
     this.httpScanSpec = that.httpScanSpec;
+    this.username = that.username;
 
     // Applies a filter.
     this.filters = filters;
@@ -122,6 +127,7 @@ public class HttpGroupScan extends AbstractGroupScan {
     super(that);
     this.columns = that.columns;
     this.httpScanSpec = that.httpScanSpec;
+    this.username = that.username;
 
     // Applies a filter.
     this.filters = that.filters;
@@ -143,9 +149,10 @@ public class HttpGroupScan extends AbstractGroupScan {
     @JsonProperty("filterSelectivity") double selectivity,
     @JsonProperty("maxRecords") int maxRecords
   ) {
-    super("no-user");
+    super(httpScanSpec.queryUserName());
     this.columns = columns;
     this.httpScanSpec = httpScanSpec;
+    this.username = httpScanSpec.queryUserName();
     this.filters = filters;
     this.filterSelectivity = selectivity;
     this.scanStats = computeScanStats();
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanSpec.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanSpec.java
index e43490c8f5..151532cb01 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanSpec.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanSpec.java
@@ -35,6 +35,7 @@ public class HttpScanSpec {
   private final HttpStoragePluginConfig config;
   private final StoragePluginRegistry registry;
   private final PersistentTokenTable tokenTable;
+  private final String queryUserName;
 
   @JsonCreator
   public HttpScanSpec(@JsonProperty("pluginName") String pluginName,
@@ -42,6 +43,7 @@ public class HttpScanSpec {
                       @JsonProperty("tableName") String tableName,
                       @JsonProperty("config") HttpStoragePluginConfig config,
                       @JsonProperty("tokenTable") PersistentTokenTable tokenTable,
+                      @JsonProperty("queryUserName") String queryUserName,
                       @JacksonInject StoragePluginRegistry engineRegistry) {
     this.pluginName = pluginName;
     this.connectionName = connectionName;
@@ -49,6 +51,7 @@ public class HttpScanSpec {
     this.config = config;
     this.registry = engineRegistry;
     this.tokenTable = tokenTable;
+    this.queryUserName = queryUserName;
   }
 
   @JsonProperty("pluginName")
@@ -71,6 +74,11 @@ public class HttpScanSpec {
     return config;
   }
 
+  @JsonProperty("queryUserName")
+  public String queryUserName() {
+    return queryUserName;
+  }
+
   @JsonIgnore
   public PersistentTokenTable getTokenTable() {
     return tokenTable;
@@ -98,6 +106,7 @@ public class HttpScanSpec {
       .field("database", connectionName)
       .field("tableName", tableName)
       .field("config", config)
+      .field("queryUserName", queryUserName)
       .toString();
   }
 }
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSchemaFactory.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSchemaFactory.java
index 661cbef9a7..ef6368d63c 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSchemaFactory.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSchemaFactory.java
@@ -44,7 +44,7 @@ public class HttpSchemaFactory extends AbstractSchemaFactory {
 
   @Override
   public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) {
-    HttpSchema schema = new HttpSchema(plugin);
+    HttpSchema schema = new HttpSchema(plugin, schemaConfig.getUserName());
     logger.debug("Registering {} {}", schema.getName(), schema.toString());
 
     SchemaPlus schemaPlus = parent.add(getName(), schema);
@@ -57,15 +57,17 @@ public class HttpSchemaFactory extends AbstractSchemaFactory {
     private final Map<String, HttpAPIConnectionSchema> subSchemas = CaseInsensitiveMap.newHashMap();
     private final Map<String, HttpApiConfig> tables = CaseInsensitiveMap.newHashMap();
     private final Map<String, DynamicDrillTable> activeTables = CaseInsensitiveMap.newHashMap();
+    private final String queryUserName;
 
-    public HttpSchema(HttpStoragePlugin plugin) {
+    public HttpSchema(HttpStoragePlugin plugin, String queryUserName) {
       super(Collections.emptyList(), plugin.getName());
+      this.queryUserName = queryUserName;
       this.plugin = plugin;
       for (Entry<String, HttpApiConfig> entry : plugin.getConfig().connections().entrySet()) {
         String configName = entry.getKey();
         HttpApiConfig config = entry.getValue();
         if (config.requireTail()) {
-          subSchemas.put(configName, new HttpAPIConnectionSchema(this, configName, plugin));
+          subSchemas.put(configName, new HttpAPIConnectionSchema(this, configName, plugin, queryUserName));
         } else {
           tables.put(configName, config);
         }
@@ -104,7 +106,7 @@ public class HttpSchemaFactory extends AbstractSchemaFactory {
         // Register a new table
         return registerTable(name, new DynamicDrillTable(plugin, plugin.getName(),
             new HttpScanSpec(plugin.getName(), name, null,
-                plugin.getConfig().copyForPlan(name), plugin.getTokenTable(), plugin.getRegistry())));
+                plugin.getConfig().copyForPlan(name), plugin.getTokenTable(), queryUserName, plugin.getRegistry())));
       } else {
         return null; // Unknown table
       }
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java
index 721db6ae86..3af4ed2bed 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.store.http;
 import org.apache.drill.common.PlanStringBuilder;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.map.CaseInsensitiveMap;
-import org.apache.drill.common.logical.AbstractSecuredStoragePluginConfig;
+import org.apache.drill.common.logical.CredentialedStoragePluginConfig;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -36,11 +36,12 @@ import org.slf4j.LoggerFactory;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
 
 @JsonTypeName(HttpStoragePluginConfig.NAME)
-public class HttpStoragePluginConfig extends AbstractSecuredStoragePluginConfig {
+public class HttpStoragePluginConfig extends CredentialedStoragePluginConfig {
   private static final Logger logger = LoggerFactory.getLogger(HttpStoragePluginConfig.class);
   public static final String NAME = "http";
 
@@ -68,18 +69,21 @@ public class HttpStoragePluginConfig extends AbstractSecuredStoragePluginConfig
                                  @JsonProperty("proxyUsername") String proxyUsername,
                                  @JsonProperty("proxyPassword") String proxyPassword,
                                  @JsonProperty("oAuthConfig") HttpOAuthConfig oAuthConfig,
-                                 @JsonProperty("credentialsProvider") CredentialsProvider credentialsProvider
+                                 @JsonProperty("credentialsProvider") CredentialsProvider credentialsProvider,
+                                 @JsonProperty("authMode") String authMode
                                  ) {
     super(CredentialProviderUtils.getCredentialsProvider(
-        getClientID(new OAuthTokenCredentials(credentialsProvider)),
-        getClientSecret(new OAuthTokenCredentials(credentialsProvider)),
-        getTokenURL(new OAuthTokenCredentials(credentialsProvider)),
+        null,
+        null,
+        null,
         normalize(username),
         normalize(password),
         normalize(proxyUsername),
         normalize(proxyPassword),
         credentialsProvider),
-        credentialsProvider == null);
+        credentialsProvider == null,
+        AuthMode.parseOrDefault(authMode)
+    );
     this.cacheResults = cacheResults != null && cacheResults;
 
     this.connections = CaseInsensitiveMap.newHashMap();
@@ -110,6 +114,17 @@ public class HttpStoragePluginConfig extends AbstractSecuredStoragePluginConfig
     }
   }
 
+  private HttpStoragePluginConfig(HttpStoragePluginConfig that, CredentialsProvider credentialsProvider) {
+    super(credentialsProvider, credentialsProvider == null, that.authMode);
+    this.cacheResults = that.cacheResults;
+    this.connections = that.connections;
+    this.timeout = that.timeout;
+    this.proxyHost = that.proxyHost;
+    this.proxyPort = that.proxyPort;
+    this.proxyType = that.proxyType;
+    this.oAuthConfig = that.oAuthConfig;
+  }
+
   /**
    * Clone constructor used for updating OAuth tokens
    * @param that The current HTTP Plugin Config
@@ -141,12 +156,22 @@ public class HttpStoragePluginConfig extends AbstractSecuredStoragePluginConfig
    * The copy is used in the query plan to avoid including unnecessary information.
    */
   public HttpStoragePluginConfig copyForPlan(String connectionName) {
+    Optional<UsernamePasswordWithProxyCredentials> creds = getUsernamePasswordCredentials();
     return new HttpStoragePluginConfig(
-        cacheResults, configFor(connectionName), timeout,
-      getUsernamePasswordCredentials().getUsername(),
-      getUsernamePasswordCredentials().getPassword(),
-        proxyHost, proxyPort, proxyType, getUsernamePasswordCredentials().getProxyUsername(),
-      getUsernamePasswordCredentials().getProxyPassword(), oAuthConfig, credentialsProvider);
+      cacheResults,
+      configFor(connectionName),
+      timeout,
+      username(),
+      password(),
+      proxyHost,
+      proxyPort,
+      proxyType,
+      proxyUsername(),
+      proxyPassword(),
+      oAuthConfig,
+      credentialsProvider,
+      authMode.name()
+    );
   }
 
   private Map<String, HttpApiConfig> configFor(String connectionName) {
@@ -164,12 +189,13 @@ public class HttpStoragePluginConfig extends AbstractSecuredStoragePluginConfig
     }
     HttpStoragePluginConfig thatConfig = (HttpStoragePluginConfig) that;
     return Objects.equals(connections, thatConfig.connections) &&
-           Objects.equals(cacheResults, thatConfig.cacheResults) &&
-           Objects.equals(proxyHost, thatConfig.proxyHost) &&
-           Objects.equals(proxyPort, thatConfig.proxyPort) &&
-           Objects.equals(proxyType, thatConfig.proxyType) &&
-           Objects.equals(oAuthConfig, thatConfig.oAuthConfig) &&
-           Objects.equals(credentialsProvider, thatConfig.credentialsProvider);
+      Objects.equals(cacheResults, thatConfig.cacheResults) &&
+      Objects.equals(proxyHost, thatConfig.proxyHost) &&
+      Objects.equals(proxyPort, thatConfig.proxyPort) &&
+      Objects.equals(proxyType, thatConfig.proxyType) &&
+      Objects.equals(oAuthConfig, thatConfig.oAuthConfig) &&
+      Objects.equals(credentialsProvider, thatConfig.credentialsProvider) &&
+      Objects.equals(authMode, thatConfig.authMode);
   }
 
   @Override
@@ -183,13 +209,14 @@ public class HttpStoragePluginConfig extends AbstractSecuredStoragePluginConfig
       .field("credentialsProvider", credentialsProvider)
       .field("oauthConfig", oAuthConfig)
       .field("proxyType", proxyType)
+      .field("authMode", authMode)
       .toString();
   }
 
   @Override
   public int hashCode() {
     return Objects.hash(connections, cacheResults, timeout,
-        proxyHost, proxyPort, proxyType, oAuthConfig, credentialsProvider);
+        proxyHost, proxyPort, proxyType, oAuthConfig, credentialsProvider, authMode);
   }
 
   @JsonProperty("cacheResults")
@@ -201,7 +228,7 @@ public class HttpStoragePluginConfig extends AbstractSecuredStoragePluginConfig
   @JsonProperty("timeout")
   public int timeout() { return timeout;}
 
-  @JsonProperty("proxyHost")
+   @JsonProperty("proxyHost")
   public String proxyHost() { return proxyHost; }
 
   @JsonProperty("proxyPort")
@@ -214,34 +241,42 @@ public class HttpStoragePluginConfig extends AbstractSecuredStoragePluginConfig
 
   @JsonProperty("username")
   public String username() {
-    if (directCredentials) {
-      return getUsernamePasswordCredentials().getUsername();
+    if (!directCredentials) {
+      return null;
     }
-    return null;
+    return getUsernamePasswordCredentials()
+      .map(UsernamePasswordWithProxyCredentials::getUsername)
+      .orElse(null);
   }
 
   @JsonProperty("password")
   public String password() {
-    if (directCredentials) {
-      return getUsernamePasswordCredentials().getPassword();
+    if (!directCredentials) {
+      return null;
     }
-    return null;
+    return getUsernamePasswordCredentials()
+      .map(UsernamePasswordWithProxyCredentials::getPassword)
+      .orElse(null);
   }
 
   @JsonProperty("proxyUsername")
   public String proxyUsername() {
-    if (directCredentials) {
-      return getUsernamePasswordCredentials().getProxyUsername();
+    if (!directCredentials) {
+      return null;
     }
-    return null;
+    return getUsernamePasswordCredentials()
+      .map(UsernamePasswordWithProxyCredentials::getProxyUsername)
+      .orElse(null);
   }
 
   @JsonProperty("proxyPassword")
   public String proxyPassword() {
-    if (directCredentials) {
-      return getUsernamePasswordCredentials().getProxyPassword();
+    if (!directCredentials) {
+      return null;
     }
-    return null;
+    return getUsernamePasswordCredentials()
+      .map(UsernamePasswordWithProxyCredentials::getProxyPassword)
+      .orElse(null);
   }
 
   @JsonIgnore
@@ -268,12 +303,29 @@ public class HttpStoragePluginConfig extends AbstractSecuredStoragePluginConfig
   }
 
   @JsonIgnore
-  public UsernamePasswordWithProxyCredentials getUsernamePasswordCredentials() {
-    return new UsernamePasswordWithProxyCredentials(credentialsProvider);
+  public Optional<UsernamePasswordWithProxyCredentials> getUsernamePasswordCredentials() {
+    return new UsernamePasswordWithProxyCredentials.Builder()
+      .setCredentialsProvider(credentialsProvider)
+      .build();
+  }
+
+  @JsonIgnore
+  public Optional<UsernamePasswordWithProxyCredentials> getUsernamePasswordCredentials(String username) {
+    return new UsernamePasswordWithProxyCredentials.Builder()
+      .setCredentialsProvider(credentialsProvider)
+      .setQueryUser(username)
+      .build();
   }
 
   @JsonIgnore
-  public static OAuthTokenCredentials getOAuthCredentials(CredentialsProvider credentialsProvider) {
-    return new OAuthTokenCredentials(credentialsProvider);
+  public static Optional<OAuthTokenCredentials> getOAuthCredentials(CredentialsProvider credentialsProvider) {
+    return new OAuthTokenCredentials.Builder()
+      .setCredentialsProvider(credentialsProvider)
+      .build();
+  }
+
+  @Override
+  public HttpStoragePluginConfig updateCredentialProvider(CredentialsProvider credentialsProvider) {
+    return new HttpStoragePluginConfig(this, credentialsProvider);
   }
 }
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSubScan.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSubScan.java
index e517259586..7e7dc33582 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSubScan.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSubScan.java
@@ -51,7 +51,7 @@ public class HttpSubScan extends AbstractBase implements SubScan {
     @JsonProperty("filters") Map<String, String> filters,
     @JsonProperty("maxRecords") int maxRecords
     ) {
-    super("user-if-needed");
+    super(tableSpec.queryUserName());
     this.tableSpec = tableSpec;
     this.columns = columns;
     this.filters = filters;
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/oauth/AccessTokenRepository.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/oauth/AccessTokenRepository.java
index 3c7b06c87d..48ce501967 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/oauth/AccessTokenRepository.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/oauth/AccessTokenRepository.java
@@ -59,7 +59,10 @@ public class AccessTokenRepository {
     accessToken = tokenTable.getAccessToken();
     refreshToken = tokenTable.getRefreshToken();
 
-    this.credentials = new OAuthTokenCredentials(credentialsProvider, tokenTable);
+    this.credentials = new OAuthTokenCredentials.Builder()
+      .setCredentialsProvider(credentialsProvider)
+      .setTokenTable(tokenTable)
+      .build().get();
 
     // Add proxy info
     SimpleHttp.addProxyInfo(builder, proxyConfig);
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
index f38f7643a3..0b1103073c 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
@@ -32,6 +32,7 @@ import okhttp3.Response;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.drill.common.exceptions.EmptyErrorContext;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
 import org.apache.drill.common.map.CaseInsensitiveMap;
 import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.exceptions.UserException;
@@ -55,7 +56,7 @@ import org.apache.drill.exec.store.http.oauth.AccessTokenAuthenticator;
 import org.apache.drill.exec.store.http.oauth.AccessTokenInterceptor;
 import org.apache.drill.exec.store.http.oauth.AccessTokenRepository;
 import org.apache.drill.exec.store.http.util.HttpProxyConfig.ProxyBuilder;
-import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
+import org.apache.drill.exec.store.security.UsernamePasswordWithProxyCredentials;
 import org.jetbrains.annotations.NotNull;
 import org.json.simple.JSONObject;
 import org.slf4j.Logger;
@@ -84,6 +85,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -117,6 +119,7 @@ public class SimpleHttp {
   private int responseCode;
   private String responseProtocol;
   private String responseURL;
+  private String username;
 
 
   public SimpleHttp(HttpSubScan scanDefn, HttpUrl url, File tempDir,
@@ -125,6 +128,7 @@ public class SimpleHttp {
     this.pluginConfig = scanDefn.tableSpec().config();
     this.connection = scanDefn.tableSpec().connection();
     this.oAuthConfig = scanDefn.tableSpec().config().oAuthConfig();
+    this.username = scanDefn.getUserName();
     this.filters = scanDefn.filters();
     this.url = url;
     this.tempDir = tempDir;
@@ -208,8 +212,19 @@ public class SimpleHttp {
       // If the API uses basic authentication add the authentication code.  Use the global credentials unless there are credentials
       // for the specific endpoint.
       logger.debug("Adding Interceptor");
-      UsernamePasswordCredentials credentials = getCredentials();
-      builder.addInterceptor(new BasicAuthInterceptor(credentials.getUsername(), credentials.getPassword()));
+      Optional<UsernamePasswordWithProxyCredentials> credentials;
+      if (pluginConfig.getAuthMode() == AuthMode.USER_TRANSLATION) {
+        credentials = getCredentials(username);
+        if (!credentials.isPresent() || StringUtils.isEmpty(credentials.get().getUsername()) || StringUtils.isEmpty(credentials.get().getPassword())) {
+          throw UserException.connectionError()
+            .message("You do not have valid credentials for this API.  Please provide your credentials.")
+            .addContext(errorContext)
+            .build(logger);
+        }
+      } else {
+        credentials = getCredentials();
+      }
+      builder.addInterceptor(new BasicAuthInterceptor(credentials.get().getUsername(), credentials.get().getPassword()));
     }
 
     // Set timeouts
@@ -415,13 +430,17 @@ public class SimpleHttp {
       .fromConfigForURL(drillConfig, url.toString());
     final String proxyType = config.proxyType();
     if (proxyType != null && !"direct".equals(proxyType)) {
-      UsernamePasswordCredentials credentials = config.getUsernamePasswordCredentials();
       builder
         .type(config.proxyType())
         .host(config.proxyHost())
-        .port(config.proxyPort())
-        .username(credentials.getUsername())
-        .password(credentials.getPassword());
+        .port(config.proxyPort());
+
+      Optional<UsernamePasswordWithProxyCredentials> credentials = config.getUsernamePasswordCredentials();
+
+      if (credentials.isPresent()) {
+        builder.username(credentials.get().getUsername())
+          .password(credentials.get().getPassword());
+      }
     }
     return builder.build();
   }
@@ -444,32 +463,19 @@ public class SimpleHttp {
     }
   }
 
-  /**
-   * Logic to determine whether the API connection has global credentials or credentials specific for the
-   * API endpoint.
-   * @param endpointConfig The API endpoint configuration
-   * @return True if the endpoint has credentials, false if not.
-   */
-  private boolean hasEndpointCredentials(HttpApiConfig endpointConfig) {
-    UsernamePasswordCredentials credentials = endpointConfig.getUsernamePasswordCredentials();
-    if (StringUtils.isNotEmpty(credentials.getUsername()) &&
-    StringUtils.isNotEmpty(credentials.getPassword())) {
-      return true;
-    }
-    return false;
-  }
-
   /**
    * If the user has defined username/password for the specific API endpoint, pass the API endpoint credentials.
    * Otherwise, use the global connection credentials.
    * @return A UsernamePasswordCredentials collection with the correct username/password
    */
-  private UsernamePasswordCredentials getCredentials() {
-    if (hasEndpointCredentials(apiConfig)) {
-      return apiConfig.getUsernamePasswordCredentials();
-    } else {
-      return pluginConfig.getUsernamePasswordCredentials();
-    }
+  private Optional<UsernamePasswordWithProxyCredentials> getCredentials() {
+    Optional<UsernamePasswordWithProxyCredentials> apiCreds = apiConfig.getUsernamePasswordCredentials();
+    return apiCreds.isPresent() ? apiCreds : pluginConfig.getUsernamePasswordCredentials();
+  }
+
+  private Optional<UsernamePasswordWithProxyCredentials> getCredentials(String queryUser) {
+    Optional<UsernamePasswordWithProxyCredentials> apiCreds = apiConfig.getUsernamePasswordCredentials();
+    return apiCreds.isPresent() ? apiCreds : pluginConfig.getUsernamePasswordCredentials(queryUser);
   }
 
   /**
@@ -971,6 +977,7 @@ public class SimpleHttp {
     private HttpOAuthConfig oAuthConfig;
     private Map<String,String> filters;
     private String connection;
+    private String username;
 
     public SimpleHttpBuilder scanDefn(HttpSubScan scanDefn) {
       this.scanDefn = scanDefn;
@@ -979,6 +986,7 @@ public class SimpleHttp {
       this.oAuthConfig = scanDefn.tableSpec().config().oAuthConfig();
       this.tokenTable = scanDefn.tableSpec().getTokenTable();
       this.filters = scanDefn.filters();
+      this.username = scanDefn.getUserName();
       return this;
     }
 
@@ -992,6 +1000,11 @@ public class SimpleHttp {
       return this;
     }
 
+    public SimpleHttpBuilder username(String username) {
+      this.username = username;
+      return this;
+    }
+
     public SimpleHttpBuilder proxyConfig(HttpProxyConfig proxyConfig) {
       this.proxyConfig = proxyConfig;
       return this;
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
index dd1f13d618..918d2eded2 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
@@ -22,6 +22,7 @@ import okhttp3.mockwebserver.MockResponse;
 import okhttp3.mockwebserver.MockWebServer;
 import okhttp3.mockwebserver.RecordedRequest;
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
 import org.apache.drill.common.logical.security.PlainCredentialsProvider;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.DataMode;
@@ -135,7 +136,7 @@ public class TestHttpPlugin extends ClusterTest {
     configs.put("pokemon", pokemonConfig);
 
     HttpStoragePluginConfig mockStorageConfigWithWorkspace =
-        new HttpStoragePluginConfig(false, configs, 10, null, null, "", 80, "", "", "", null, PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
+        new HttpStoragePluginConfig(false, configs, 10, null, null, "", 80, "", "", "", null, PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER, AuthMode.SHARED_USER.name());
     mockStorageConfigWithWorkspace.setEnabled(true);
     cluster.defineStoragePlugin("live", mockStorageConfigWithWorkspace);
   }
@@ -343,7 +344,7 @@ public class TestHttpPlugin extends ClusterTest {
         new HttpStoragePluginConfig(false, configs, 2, "globaluser", "globalpass", "",
           80, "", "", "", null, new PlainCredentialsProvider(ImmutableMap.of(
           UsernamePasswordCredentials.USERNAME, "globaluser",
-          UsernamePasswordCredentials.PASSWORD, "globalpass")));
+          UsernamePasswordCredentials.PASSWORD, "globalpass")), AuthMode.SHARED_USER.name());
     mockStorageConfigWithWorkspace.setEnabled(true);
     cluster.defineStoragePlugin("local", mockStorageConfigWithWorkspace);
   }
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java
index 8f17b265e6..9e32116d9f 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java
@@ -21,6 +21,7 @@ package org.apache.drill.exec.store.http;
 import okhttp3.mockwebserver.MockResponse;
 import okhttp3.mockwebserver.MockWebServer;
 import okhttp3.mockwebserver.RecordedRequest;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
 import org.apache.drill.common.logical.security.PlainCredentialsProvider;
 import org.apache.drill.common.util.DrillFileUtils;
 import org.apache.drill.exec.physical.rowSet.RowSet;
@@ -72,7 +73,7 @@ public class TestHttpUDFFunctions extends ClusterTest {
       new HttpStoragePluginConfig(false, configs, 2, "globaluser", "globalpass", "",
         80, "", "", "", null, new PlainCredentialsProvider(ImmutableMap.of(
         UsernamePasswordCredentials.USERNAME, "globaluser",
-        UsernamePasswordCredentials.PASSWORD, "globalpass")));
+        UsernamePasswordCredentials.PASSWORD, "globalpass")), AuthMode.SHARED_USER.name());
     mockStorageConfigWithWorkspace.setEnabled(true);
     cluster.defineStoragePlugin("local", mockStorageConfigWithWorkspace);
   }
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestOAuthProcess.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestOAuthProcess.java
index 521dcb2b45..11d61f5d00 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestOAuthProcess.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestOAuthProcess.java
@@ -23,6 +23,7 @@ import okhttp3.Request;
 import okhttp3.Response;
 import okhttp3.mockwebserver.MockResponse;
 import okhttp3.mockwebserver.MockWebServer;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
 import org.apache.drill.common.logical.security.CredentialsProvider;
 import org.apache.drill.common.logical.security.PlainCredentialsProvider;
 import org.apache.drill.common.types.TypeProtos.DataMode;
@@ -110,7 +111,7 @@ public class TestOAuthProcess extends ClusterTest {
     // Add storage plugin for test OAuth
     HttpStoragePluginConfig mockStorageConfigWithWorkspace =
       new HttpStoragePluginConfig(false, configs, TIMEOUT, null, null, "", 80, "", "", "",
-        oAuthConfig, credentialsProvider);
+        oAuthConfig, credentialsProvider, AuthMode.SHARED_USER.name());
     mockStorageConfigWithWorkspace.setEnabled(true);
     cluster.defineStoragePlugin("localOauth", mockStorageConfigWithWorkspace);
   }
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestOAuthTokenUpdate.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestOAuthTokenUpdate.java
index dbadcd133d..81739d5c3f 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestOAuthTokenUpdate.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestOAuthTokenUpdate.java
@@ -24,6 +24,7 @@ import okhttp3.OkHttpClient;
 import okhttp3.Request;
 import okhttp3.RequestBody;
 import okhttp3.Response;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
 import org.apache.drill.common.logical.security.CredentialsProvider;
 import org.apache.drill.common.logical.security.PlainCredentialsProvider;
 import org.apache.drill.exec.ExecConstants;
@@ -90,7 +91,7 @@ public class TestOAuthTokenUpdate extends ClusterTest {
     // Add storage plugin for test OAuth
     HttpStoragePluginConfig mockStorageConfigWithWorkspace =
       new HttpStoragePluginConfig(false, configs, TIMEOUT,null, null, "", 80, "", "", "",
-        oAuthConfig, credentialsProvider);
+        oAuthConfig, credentialsProvider, AuthMode.SHARED_USER.name());
     mockStorageConfigWithWorkspace.setEnabled(true);
     cluster.defineStoragePlugin("localOauth", mockStorageConfigWithWorkspace);
   }
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestPagination.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestPagination.java
index 45f62631af..b8c5972623 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestPagination.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestPagination.java
@@ -21,6 +21,7 @@ package org.apache.drill.exec.store.http;
 import okhttp3.mockwebserver.MockResponse;
 import okhttp3.mockwebserver.MockWebServer;
 import okhttp3.mockwebserver.RecordedRequest;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
 import org.apache.drill.common.logical.security.PlainCredentialsProvider;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.util.DrillFileUtils;
@@ -115,7 +116,8 @@ public class TestPagination extends ClusterTest {
     configs.put("github", githubConfig);
 
     HttpStoragePluginConfig mockStorageConfigWithWorkspace =
-      new HttpStoragePluginConfig(false, configs, 10, null, null, "", 80, "", "", "", null, PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
+      new HttpStoragePluginConfig(false, configs, 10, null, null, "", 80, "", "", "", null,
+        PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER, AuthMode.SHARED_USER.name());
     mockStorageConfigWithWorkspace.setEnabled(true);
     cluster.defineStoragePlugin("live", mockStorageConfigWithWorkspace);
   }
@@ -195,7 +197,8 @@ public class TestPagination extends ClusterTest {
     configs.put("xml_paginator_url_params", mockXmlConfigWithPaginatorAndUrlParams);
 
     HttpStoragePluginConfig mockStorageConfigWithWorkspace =
-      new HttpStoragePluginConfig(false, configs, 2, null, null, "", 80, "", "", "", null, PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
+      new HttpStoragePluginConfig(false, configs, 2, null, null, "", 80, "", "", "", null,
+        PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER, AuthMode.SHARED_USER.name());
     mockStorageConfigWithWorkspace.setEnabled(true);
     cluster.defineStoragePlugin("local", mockStorageConfigWithWorkspace);
   }
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestUserTranslationInHttpPlugin.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestUserTranslationInHttpPlugin.java
new file mode 100644
index 0000000000..2256393563
--- /dev/null
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestUserTranslationInHttpPlugin.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.http;
+
+import okhttp3.Cookie;
+import okhttp3.CookieJar;
+import okhttp3.FormBody;
+import okhttp3.Headers;
+import okhttp3.HttpUrl;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okhttp3.Response;
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import okhttp3.mockwebserver.RecordedRequest;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.net.util.Base64;
+import org.apache.drill.common.config.DrillProperties;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.logical.CredentialedStoragePluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.common.util.DrillFileUtils;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl;
+import org.apache.drill.exec.store.StoragePlugin;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.drill.shaded.guava.com.google.common.io.Files;
+import org.apache.drill.test.BaseDirTestWatcher;
+import org.apache.drill.test.ClientFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryBuilder.QuerySummary;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.TEST_USER_1;
+import static org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.TEST_USER_1_PASSWORD;
+import static org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.TEST_USER_2;
+import static org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.TEST_USER_2_PASSWORD;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+
+public class TestUserTranslationInHttpPlugin extends ClusterTest {
+
+  private static final int MOCK_SERVER_PORT = 47775;
+
+  private static final int TIMEOUT = 30;
+  private final OkHttpClient httpClient = new OkHttpClient.Builder()
+    .connectTimeout(TIMEOUT, TimeUnit.SECONDS)
+    .writeTimeout(TIMEOUT, TimeUnit.SECONDS)
+    .readTimeout(TIMEOUT, TimeUnit.SECONDS)
+    .cookieJar(new TestCookieJar())
+    .build();
+
+  private static String TEST_JSON_RESPONSE_WITH_DATATYPES;
+  private static int portNumber;
+
+
+  @ClassRule
+  public static final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();
+
+  @After
+  public void cleanup() throws Exception {
+    FileUtils.cleanDirectory(dirTestWatcher.getStoreDir());
+  }
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    TEST_JSON_RESPONSE_WITH_DATATYPES = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/response2.json"), Charsets.UTF_8).read();
+
+    ClusterFixtureBuilder builder = new ClusterFixtureBuilder(dirTestWatcher)
+      .configProperty(ExecConstants.HTTP_ENABLE, true)
+      .configProperty(ExecConstants.HTTP_PORT_HUNT, true)
+      .configProperty(ExecConstants.USER_AUTHENTICATOR_IMPL, UserAuthenticatorTestImpl.TYPE)
+      .configProperty(ExecConstants.IMPERSONATION_ENABLED, true);
+
+    startCluster(builder);
+
+    portNumber = cluster.drillbit().getWebServerPort();
+
+    HttpApiConfig testEndpoint = HttpApiConfig.builder()
+      .url(makeUrl("http://localhost:%d/json"))
+      .method("GET")
+      .requireTail(false)
+      .authType("basic")
+      .errorOn400(true)
+      .build();
+
+    Map<String, HttpApiConfig> configs = new HashMap<>();
+    configs.put("sharedEndpoint", testEndpoint);
+
+
+    Map<String, String> credentials = new HashMap<>();
+    credentials.put("username", "user2user");
+    credentials.put("password", "user2pass");
+
+    PlainCredentialsProvider credentialsProvider = new PlainCredentialsProvider(TEST_USER_2, credentials);
+
+
+    HttpStoragePluginConfig mockStorageConfigWithWorkspace =
+      new HttpStoragePluginConfig(false, configs, 2, null, null, "",
+        80, "", "", "", null, credentialsProvider, AuthMode.USER_TRANSLATION.name());
+    mockStorageConfigWithWorkspace.setEnabled(true);
+    cluster.defineStoragePlugin("local", mockStorageConfigWithWorkspace);
+  }
+
+  @Test
+  public void testEmptyUserCredentials() throws Exception {
+    ClientFixture client = cluster.clientBuilder()
+      .property(DrillProperties.USER, TEST_USER_1)
+      .property(DrillProperties.PASSWORD, TEST_USER_1_PASSWORD)
+      .build();
+
+    // Verify that the plugin's credentials provider holds no username/password for TEST_USER_1
+    StoragePluginRegistry registry = cluster.storageRegistry();
+    StoragePlugin plugin = registry.getPlugin("local");
+    PlainCredentialsProvider credentialsProvider = (PlainCredentialsProvider)((CredentialedStoragePluginConfig)plugin.getConfig()).getCredentialsProvider();
+    Map<String, String> credentials = credentialsProvider.getCredentials(TEST_USER_1);
+    assertNotNull(credentials);
+    assertNull(credentials.get("username"));
+    assertNull(credentials.get("password"));
+  }
+
+  /**
+   * Verifies that a query run by TEST_USER_2 reaches the HTTP API with the
+   * username and password stored for that user in the Authorization header.
+   */
+  @Test
+  public void testQueryWithValidCredentials() throws Exception {
+    ClientFixture client = cluster.clientBuilder()
+      .property(DrillProperties.USER, TEST_USER_2)
+      .property(DrillProperties.PASSWORD, TEST_USER_2_PASSWORD)
+      .build();
+
+    try (MockWebServer server = startServer()) {
+      server.enqueue(new MockResponse()
+        .setResponseCode(200)
+        .setBody(TEST_JSON_RESPONSE_WITH_DATATYPES));
+
+      String sql = "SELECT * FROM local.sharedEndpoint";
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+      try {
+        // JUnit's contract is (expected, actual); keeping that order makes failure messages accurate.
+        assertEquals(2, results.rowCount());
+      } finally {
+        // Clear the row set even when the assertion above fails.
+        results.clear();
+      }
+
+      // Verify the basic-auth header carries the credentials stored for the query user.
+      RecordedRequest recordedRequest = server.takeRequest();
+      Headers headers = recordedRequest.getHeaders();
+      assertEquals(createEncodedText("user2user", "user2pass"), headers.get("Authorization"));
+    }
+  }
+
+  /**
+   * Verifies that a query against the user translation plugin is rejected with a
+   * {@link UserException} when it is run by TEST_USER_1, for whom no plugin
+   * credentials were registered.
+   */
+  @Test
+  public void testQueryWithMissingCredentials() throws Exception {
+    ClientFixture client = cluster.clientBuilder()
+      .property(DrillProperties.USER, TEST_USER_1)
+      .property(DrillProperties.PASSWORD, TEST_USER_1_PASSWORD)
+      .build();
+
+    try (MockWebServer server = startServer()) {
+      server.enqueue(new MockResponse()
+        .setResponseCode(200)
+        .setBody(TEST_JSON_RESPONSE_WITH_DATATYPES));
+
+      String sql = "SELECT * FROM local.sharedEndpoint";
+      try {
+        client.queryBuilder().sql(sql).run();
+        fail("Query should have failed: the query user has no credentials for this plugin.");
+      } catch (UserException e) {
+        assertTrue(e.getMessage().contains("You do not have valid credentials for this API."));
+      }
+    }
+  }
+
+  /**
+   * Posts the given username and password as a form to the j_security_check
+   * endpoint on {@code portNumber}.
+   *
+   * @param username value sent as the j_username form field
+   * @param password value sent as the j_password form field
+   * @return true if the response status code is 200
+   * @throws IOException if the HTTP call cannot be executed
+   */
+  private boolean makeLoginRequest(String username, String password) throws IOException {
+    String loginURL =  "http://localhost:" + portNumber + "/j_security_check";
+
+    RequestBody formBody = new FormBody.Builder()
+      .add("j_username", username)
+      .add("j_password", password)
+      .build();
+
+    Request request = new Request.Builder()
+      .url(loginURL)
+      .post(formBody)
+      .addHeader("Content-Type", "application/x-www-form-urlencoded")
+      .addHeader("Accept", "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8")
+      .build();
+
+    // The response must be closed, otherwise its body and connection are leaked.
+    try (Response response = httpClient.newCall(request).execute()) {
+      return response.code() == 200;
+    }
+  }
+
+  /**
+   * Checks that TEST_USER_1, who has no credentials for the user translation
+   * plugin, can still run a query that does not reference that plugin.
+   */
+  @Test
+  public void testUnrelatedQueryWithUser() throws Exception {
+    ClientFixture user1Client = cluster.clientBuilder()
+      .property(DrillProperties.USER, TEST_USER_1)
+      .property(DrillProperties.PASSWORD, TEST_USER_1_PASSWORD)
+      .build();
+
+    QuerySummary summary = user1Client.queryBuilder().sql("SHOW FILES IN dfs").run();
+    assertTrue(summary.succeeded());
+  }
+
+  /**
+   * Creates a {@link MockWebServer} and starts it on {@code MOCK_SERVER_PORT}.
+   *
+   * @return the started mock server
+   * @throws IOException if the server cannot be started
+   */
+  public static MockWebServer startServer() throws IOException {
+    MockWebServer mockServer = new MockWebServer();
+    mockServer.start(MOCK_SERVER_PORT);
+    return mockServer;
+  }
+
+  public static String makeUrl(String url) {
+    return String.format(url, MOCK_SERVER_PORT);
+  }
+
+  /**
+   * Builds the value of an HTTP basic Authorization header for the given credentials.
+   *
+   * @param username user name placed before the colon
+   * @param password password placed after the colon
+   * @return "Basic " followed by the Base64 encoding of {@code username:password}
+   */
+  private static String createEncodedText(String username, String password) {
+    String pair = username + ":" + password;
+    // Use an explicit charset so the result does not depend on the platform default.
+    byte[] encodedBytes = Base64.encodeBase64(pair.getBytes(Charsets.UTF_8));
+    return "Basic " + new String(encodedBytes, Charsets.UTF_8);
+  }
+
+  /**
+   * Minimal {@link CookieJar} that keeps only the cookie list passed to the most
+   * recent {@code saveFromResponse} call and returns it for every request,
+   * regardless of URL.
+   */
+  public static class TestCookieJar implements CookieJar {
+
+    private List<Cookie> savedCookies;
+
+    @Override
+    public void saveFromResponse(HttpUrl url, List<Cookie> cookies) {
+      savedCookies = cookies;
+    }
+
+    @Override
+    public List<Cookie> loadForRequest(HttpUrl url) {
+      // Nothing saved yet: hand back a fresh empty list rather than null.
+      if (savedCookies == null) {
+        return new ArrayList<>();
+      }
+      return savedCookies;
+    }
+  }
+
+
+}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/CapitalizingJdbcSchema.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/CapitalizingJdbcSchema.java
index 1d7503ff67..09b09873a9 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/CapitalizingJdbcSchema.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/CapitalizingJdbcSchema.java
@@ -112,7 +112,7 @@ public class CapitalizingJdbcSchema extends AbstractSchema {
 
   @Override
   public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns, StorageStrategy strategy) {
-    if (! plugin.getConfig().isWritable()) {
+    if (plugin.getConfig().isWritable() == null || (! plugin.getConfig().isWritable())) {
       throw UserException
         .dataWriteError()
         .message(plugin.getName() + " is not writable.")
@@ -145,7 +145,8 @@ public class CapitalizingJdbcSchema extends AbstractSchema {
 
     String tableWithSchema = CreateTableStmtBuilder.buildCompleteTableName(tableName, catalog, schema);
     String dropTableQuery = String.format("DROP TABLE %s", tableWithSchema);
-    dropTableQuery = JdbcDDLQueryUtils.cleanDDLQuery(dropTableQuery, plugin.getDialect());
+    SqlDialect dialect = plugin.getDialect(inner.getDataSource());
+    dropTableQuery = JdbcDDLQueryUtils.cleanDDLQuery(dropTableQuery, dialect);
 
     try (Connection conn = inner.getDataSource().getConnection();
          Statement stmt = conn.createStatement()) {
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DefaultJdbcDialect.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DefaultJdbcDialect.java
index 64c0ce322a..74769d28c4 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DefaultJdbcDialect.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DefaultJdbcDialect.java
@@ -26,25 +26,42 @@ import org.apache.calcite.sql.SqlDialect;
 import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.SubsetRemover;
 
+import java.util.Optional;
+import javax.sql.DataSource;
+
 public class DefaultJdbcDialect implements JdbcDialect {
+
   private final JdbcStoragePlugin plugin;
+  private final SqlDialect dialect;
 
-  public DefaultJdbcDialect(JdbcStoragePlugin plugin) {
+  public DefaultJdbcDialect(JdbcStoragePlugin plugin, SqlDialect dialect) {
     this.plugin = plugin;
+    this.dialect = dialect;
   }
 
   @Override
   public void registerSchemas(SchemaConfig config, SchemaPlus parent) {
-      JdbcCatalogSchema schema = new JdbcCatalogSchema(plugin.getName(),
-        plugin.getDataSource(), plugin.getDialect(), plugin.getConvention(),
-        !plugin.getConfig().areTableNamesCaseInsensitive());
-      SchemaPlus holder = parent.add(plugin.getName(), schema);
-      schema.setHolder(holder);
+    Optional<DataSource> dataSource = plugin.getDataSource(config.getQueryUserCredentials());
+
+    if (!dataSource.isPresent()) {
+      return;
+    }
+
+    DrillJdbcConvention convention = plugin.getConvention(dialect, config.getQueryUserCredentials().getUserName());
+
+    JdbcCatalogSchema schema = new JdbcCatalogSchema(
+      plugin.getName(),
+      dataSource.get(),
+      dialect,
+      convention,
+      !plugin.getConfig().areTableNamesCaseInsensitive()
+    );
+    SchemaPlus holder = parent.add(plugin.getName(), schema);
+    schema.setHolder(holder);
   }
 
   @Override
   public String generateSql(RelOptCluster cluster, RelNode input) {
-    final SqlDialect dialect = plugin.getDialect();
     final JdbcImplementor jdbcImplementor = new JdbcImplementor(dialect,
         (JavaTypeFactory) cluster.getTypeFactory());
     final JdbcImplementor.Result result = jdbcImplementor.visitChild(0,
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DrillJdbcConvention.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DrillJdbcConvention.java
index 55ac14514e..07b12b7b17 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DrillJdbcConvention.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DrillJdbcConvention.java
@@ -55,10 +55,12 @@ public class DrillJdbcConvention extends JdbcConvention {
 
   private final ImmutableSet<RelOptRule> rules;
   private final JdbcStoragePlugin plugin;
+  private final String username;
 
-  DrillJdbcConvention(SqlDialect dialect, String name, JdbcStoragePlugin plugin) {
+  DrillJdbcConvention(SqlDialect dialect, String name, JdbcStoragePlugin plugin, String username) {
     super(dialect, ConstantUntypedNull.INSTANCE, name);
     this.plugin = plugin;
+    this.username = username;
     List<RelOptRule> calciteJdbcRules = JdbcRules.rules(this, DrillRelFactories.LOGICAL_BUILDER).stream()
         .filter(rule -> !EXCLUDED_CALCITE_RULES.contains(rule.getClass()))
         .collect(Collectors.toList());
@@ -69,7 +71,7 @@ public class DrillJdbcConvention extends JdbcConvention {
 
     ImmutableSet.Builder<RelOptRule> builder = ImmutableSet.<RelOptRule>builder()
       .addAll(calciteJdbcRules)
-      .add(new JdbcIntermediatePrelConverterRule(this))
+      .add(new JdbcIntermediatePrelConverterRule(this, username))
       .add(new VertexDrelConverterRule(this))
       .add(RuleInstance.FILTER_SET_OP_TRANSPOSE_RULE)
       .add(RuleInstance.PROJECT_REMOVE_RULE);
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcConventionFactory.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcConventionFactory.java
new file mode 100644
index 0000000000..3a774e0e36
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcConventionFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.shaded.guava.com.google.common.cache.Cache;
+import org.apache.drill.shaded.guava.com.google.common.cache.CacheBuilder;
+
+import java.time.Duration;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Creates {@link DrillJdbcConvention} instances and caches them for reuse.
+ * Entries are evicted once the cache holds more than {@link #CACHE_SIZE}
+ * conventions or an entry has not been accessed for {@link #CACHE_TTL}.
+ */
+public class JdbcConventionFactory {
+  public static final int CACHE_SIZE = 100;
+  public static final Duration CACHE_TTL = Duration.ofHours(1);
+
+  // Keyed on (dialect, username): a convention embeds the username it was built
+  // with, so a convention cached for one user must not be served to another.
+  private final Cache<java.util.Map.Entry<SqlDialect, String>, DrillJdbcConvention> cache = CacheBuilder.newBuilder()
+      .maximumSize(CACHE_SIZE)
+      .expireAfterAccess(CACHE_TTL)
+      .build();
+
+  /**
+   * Returns the cached convention for the given dialect and username, creating
+   * and caching it on first use.
+   *
+   * @param plugin plugin whose name is given to a newly created convention
+   * @param dialect SQL dialect of the convention
+   * @param username name of the query user the convention is created for
+   * @return the cached or newly created convention
+   * @throws DrillRuntimeException if the convention cannot be created
+   */
+  public DrillJdbcConvention getJdbcConvention(
+      JdbcStoragePlugin plugin,
+      SqlDialect dialect,
+      String username) {
+    java.util.Map.Entry<SqlDialect, String> key =
+        new java.util.AbstractMap.SimpleImmutableEntry<>(dialect, username);
+    try {
+      return cache.get(key, new Callable<DrillJdbcConvention>() {
+        @Override
+        public DrillJdbcConvention call() {
+          return new DrillJdbcConvention(dialect, plugin.getName(), plugin, username);
+        }
+      });
+    } catch (ExecutionException ex) {
+      throw new DrillRuntimeException("Cannot load the requested DrillJdbcConvention", ex);
+    }
+  }
+}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcDialectFactory.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcDialectFactory.java
index 9c10cf79db..dfc2073f4f 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcDialectFactory.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcDialectFactory.java
@@ -17,16 +17,39 @@
  */
 package org.apache.drill.exec.store.jdbc;
 
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.exec.store.jdbc.clickhouse.ClickhouseJdbcDialect;
+import org.apache.drill.shaded.guava.com.google.common.cache.Cache;
+import org.apache.drill.shaded.guava.com.google.common.cache.CacheBuilder;
+
+import java.time.Duration;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 
 public class JdbcDialectFactory {
   public static final String JDBC_CLICKHOUSE_PREFIX = "jdbc:clickhouse";
+  public static final int CACHE_SIZE = 100;
+  public static final Duration CACHE_TTL = Duration.ofHours(1);
+
+  private final Cache<SqlDialect, JdbcDialect> cache = CacheBuilder.newBuilder()
+      .maximumSize(CACHE_SIZE)
+      .expireAfterAccess(CACHE_TTL)
+      .build();
 
-  public static JdbcDialect getJdbcDialect(JdbcStoragePlugin plugin, String url) {
-    if (url.startsWith(JDBC_CLICKHOUSE_PREFIX)) {
-      return new ClickhouseJdbcDialect(plugin);
-    } else {
-      return new DefaultJdbcDialect(plugin);
+  public JdbcDialect getJdbcDialect(JdbcStoragePlugin plugin, SqlDialect dialect) {
+    try {
+      return cache.get(dialect, new Callable<JdbcDialect>() {
+        @Override
+        public JdbcDialect call() {
+          return plugin.getConfig().getUrl().startsWith(JDBC_CLICKHOUSE_PREFIX)
+              ? new ClickhouseJdbcDialect(plugin, dialect)
+              : new DefaultJdbcDialect(plugin, dialect);
+        }
+      });
+    } catch (ExecutionException ex) {
+      throw new DrillRuntimeException("Cannot load the requested JdbcDialect", ex);
     }
   }
 }
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcGroupScan.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcGroupScan.java
index 4fcd14d69a..8ef3061778 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcGroupScan.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcGroupScan.java
@@ -18,10 +18,11 @@
 package org.apache.drill.exec.store.jdbc;
 
 import java.util.List;
+import java.util.Objects;
 
+import org.apache.drill.common.PlanStringBuilder;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.ScanStats;
@@ -42,36 +43,43 @@ public class JdbcGroupScan extends AbstractGroupScan {
   private final List<SchemaPath> columns;
   private final JdbcStoragePlugin plugin;
   private final double rows;
+  private int hashCode;
 
   @JsonCreator
   public JdbcGroupScan(
       @JsonProperty("sql") String sql,
       @JsonProperty("columns") List<SchemaPath> columns,
-      @JsonProperty("config") StoragePluginConfig config,
+      @JsonProperty("config") JdbcStorageConfig config,
       @JsonProperty("rows") double rows,
+      @JsonProperty("username") String username,
       @JacksonInject StoragePluginRegistry plugins) throws ExecutionSetupException {
-    super("");
+    super(username);
     this.sql = sql;
     this.columns = columns;
     this.plugin = plugins.resolve(config, JdbcStoragePlugin.class);
     this.rows = rows;
   }
 
-  JdbcGroupScan(String sql, List<SchemaPath> columns, JdbcStoragePlugin plugin, double rows) {
-    super("");
+  JdbcGroupScan(String sql, List<SchemaPath> columns, JdbcStoragePlugin plugin, double rows, String username) {
+    super(username);
     this.sql = sql;
     this.columns = columns;
     this.plugin = plugin;
     this.rows = rows;
   }
 
+  @JsonProperty("config")
+  public JdbcStorageConfig config() {
+    return plugin.getConfig();
+  }
+
   @Override
   public void applyAssignments(List<DrillbitEndpoint> endpoints) {
   }
 
   @Override
   public SubScan getSpecificScan(int minorFragmentId) {
-    return new JdbcSubScan(sql, columns, plugin);
+    return new JdbcSubScan(sql, columns, plugin, getUserName());
   }
 
   @Override
@@ -88,11 +96,13 @@ public class JdbcGroupScan extends AbstractGroupScan {
         1);
   }
 
+  @JsonProperty("sql")
   public String getSql() {
     return sql;
   }
 
   @Override
+  @JsonProperty("columns")
   public List<SchemaPath> getColumns() {
     return columns;
   }
@@ -102,12 +112,50 @@ public class JdbcGroupScan extends AbstractGroupScan {
     return sql + plugin.getConfig();
   }
 
-  public StoragePluginConfig getConfig() {
+  public JdbcStorageConfig getConfig() {
     return plugin.getConfig();
   }
 
   @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
-    return new JdbcGroupScan(sql, columns, plugin, rows);
+    return new JdbcGroupScan(sql, columns, plugin, rows, userName);
   }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+
+    JdbcGroupScan that = (JdbcGroupScan) obj;
+    return Objects.equals(sql, that.sql) &&
+      Objects.equals(columns, that.columns) &&
+      Objects.equals(rows, that.rows) &&
+      Objects.equals(plugin.getName(), that.plugin.getName()) &&
+      Objects.equals(config(), that.getConfig());
+  }
+
+  @Override
+  public int hashCode() {
+    // Hash code is cached since Calcite calls this method many times.
+    if (hashCode == 0) {
+      // Don't include cost; it is derived.
+      hashCode = Objects.hash(sql, columns, plugin.getConfig(), rows, plugin.getName());
+    }
+    return hashCode;
+  }
+
+  @Override
+  public String toString() {
+    return new PlanStringBuilder(this)
+      .field("sql", sql)
+      .field("columns", columns)
+      .field("jdbcConfig", plugin.getConfig())
+      .field("rows", rows)
+      .toString();
+  }
+
 }
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrel.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrel.java
index baaa0b5bc8..f21b986396 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrel.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrel.java
@@ -35,9 +35,11 @@ import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
  * before execution can happen.
  */
 public class JdbcIntermediatePrel extends SinglePrel implements PrelFinalizable {
+  private final String username;
 
-  public JdbcIntermediatePrel(RelOptCluster cluster, RelTraitSet traits, RelNode child) {
+  public JdbcIntermediatePrel(RelOptCluster cluster, RelTraitSet traits, RelNode child, String username) {
     super(cluster, traits, child);
+    this.username = username;
   }
 
   @Override
@@ -47,7 +49,7 @@ public class JdbcIntermediatePrel extends SinglePrel implements PrelFinalizable
 
   @Override
   public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
-    return new JdbcIntermediatePrel(getCluster(), traitSet, getInput());
+    return new JdbcIntermediatePrel(getCluster(), traitSet, getInput(), username);
   }
 
   @Override
@@ -62,7 +64,7 @@ public class JdbcIntermediatePrel extends SinglePrel implements PrelFinalizable
 
   @Override
   public Prel finalizeRel() {
-    return new JdbcPrel(getCluster(), getTraitSet(), this);
+    return new JdbcPrel(getCluster(), getTraitSet(), this, username);
   }
 
   @Override
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrelConverterRule.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrelConverterRule.java
index a66888f2df..8995aab6d4 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrelConverterRule.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrelConverterRule.java
@@ -32,8 +32,9 @@ class JdbcIntermediatePrelConverterRule extends RelOptRule {
 
   private final RelTrait inTrait;
   private final RelTrait outTrait;
+  private final String username;
 
-  public JdbcIntermediatePrelConverterRule(JdbcConvention jdbcConvention) {
+  public JdbcIntermediatePrelConverterRule(JdbcConvention jdbcConvention, String username) {
     super(
         RelOptHelper.some(VertexDrel.class, DrillRel.DRILL_LOGICAL,
             RelOptHelper.any(RelNode.class, jdbcConvention)),
@@ -41,6 +42,7 @@ class JdbcIntermediatePrelConverterRule extends RelOptRule {
 
     this.inTrait = DrillRel.DRILL_LOGICAL;
     this.outTrait = Prel.DRILL_PHYSICAL;
+    this.username = username;
   }
 
   @Override
@@ -49,7 +51,7 @@ class JdbcIntermediatePrelConverterRule extends RelOptRule {
     RelNode jdbcIntermediatePrel = new JdbcIntermediatePrel(
         in.getCluster(),
         in.getTraitSet().replace(outTrait),
-        in.getInput(0));
+        in.getInput(0), username);
     call.transformTo(jdbcIntermediatePrel);
   }
 
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java
index 487c8ccc9f..b126d948a3 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java
@@ -43,13 +43,16 @@ public class JdbcPrel extends AbstractRelNode implements Prel {
   private final String sql;
   private final double rows;
   private final DrillJdbcConvention convention;
+  private final String username;
 
-  public JdbcPrel(RelOptCluster cluster, RelTraitSet traitSet, JdbcIntermediatePrel prel) {
+  public JdbcPrel(RelOptCluster cluster, RelTraitSet traitSet, JdbcIntermediatePrel prel, String username) {
     super(cluster, traitSet);
     final RelNode input = prel.getInput();
+    this.username = username;
     rows = input.estimateRowCount(cluster.getMetadataQuery());
     convention = (DrillJdbcConvention) input.getTraitSet().getTrait(ConventionTraitDef.INSTANCE);
-    sql = convention.getPlugin().getJdbcDialect().generateSql(getCluster(), input);
+    JdbcDialect jdbcDialect = convention.getPlugin().getJdbcDialect(convention.dialect);
+    sql = jdbcDialect.generateSql(getCluster(), input);
     rowType = input.getRowType();
   }
 
@@ -71,7 +74,7 @@ public class JdbcPrel extends AbstractRelNode implements Prel {
     for (String col : rowType.getFieldNames()) {
       columns.add(SchemaPath.getSimplePath(col));
     }
-    JdbcGroupScan output = new JdbcGroupScan(sql, columns, convention.getPlugin(), rows);
+    JdbcGroupScan output = new JdbcGroupScan(sql, columns, convention.getPlugin(), rows, username);
     return creator.addMetadata(this, output);
   }
 
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java
index fe83a2e55b..e403e4f09d 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java
@@ -120,7 +120,7 @@ public class JdbcRecordWriter extends AbstractRecordWriter {
   public JdbcRecordWriter(DataSource source, OperatorContext context, String name, JdbcWriter config) {
     this.tableName = JdbcDDLQueryUtils.addBackTicksToTable(name);
     this.rowList = new ArrayList<>();
-    this.dialect = config.getPlugin().getDialect();
+    this.dialect = config.getPlugin().getDialect(source);
     this.config = config;
     this.rawTableName = name;
     this.fields = new ArrayList<>();
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcScanBatchCreator.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcScanBatchCreator.java
index 6a7275795b..01ee329f5b 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcScanBatchCreator.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcScanBatchCreator.java
@@ -27,15 +27,17 @@ import org.apache.drill.exec.physical.impl.scan.framework.BasicScanFactory;
 import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
 import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework;
 import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework.ScanFrameworkBuilder;
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
 import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
 import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 
 import java.util.Collections;
 import java.util.List;
 
+import javax.sql.DataSource;
+
 public class JdbcScanBatchCreator implements BatchCreator<JdbcSubScan> {
 
   @Override
@@ -44,7 +46,7 @@ public class JdbcScanBatchCreator implements BatchCreator<JdbcSubScan> {
     Preconditions.checkArgument(children.isEmpty());
 
     try {
-      ScanFrameworkBuilder builder = createBuilder(context.getOptions(), subScan);
+      ScanFrameworkBuilder builder = createBuilder(context, subScan);
       return builder.buildScanOperator(context, subScan);
     } catch (UserException e) {
       // Rethrow user exceptions directly
@@ -55,14 +57,22 @@ public class JdbcScanBatchCreator implements BatchCreator<JdbcSubScan> {
     }
   }
 
-  private ScanFrameworkBuilder createBuilder(OptionManager options, JdbcSubScan subScan) {
-    JdbcStorageConfig config = subScan.getConfig();
+  private ScanFrameworkBuilder createBuilder(ExecutorFragmentContext context, JdbcSubScan subScan) {
     ScanFrameworkBuilder builder = new ScanFrameworkBuilder();
     builder.projection(subScan.getColumns());
     builder.setUserName(subScan.getUserName());
     JdbcStoragePlugin plugin = subScan.getPlugin();
-    List<ManagedReader<SchemaNegotiator>> readers =
-      Collections.singletonList(new JdbcBatchReader(plugin.getDataSource(), subScan.getSql(), subScan.getColumns()));
+    UserCredentials userCreds = context.getContextInformation().getQueryUserCredentials();
+    DataSource ds = plugin.getDataSource(userCreds)
+      .orElseThrow(() -> UserException.permissionError().message(
+        "Query user %s could not obtain a connection to %s, missing credentials?",
+        userCreds.getUserName(),
+        plugin.getName()
+      ).build(JdbcStoragePlugin.logger));
+
+    List<ManagedReader<SchemaNegotiator>> readers = Collections.singletonList(
+      new JdbcBatchReader(ds, subScan.getSql(), subScan.getColumns())
+    );
 
     ManagedScanFramework.ReaderFactory readerFactory = new BasicScanFactory(readers.iterator());
     builder.setReaderFactory(readerFactory);
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStorageConfig.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStorageConfig.java
index df2d789006..d6deb64964 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStorageConfig.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStorageConfig.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.store.jdbc;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 
 import com.fasterxml.jackson.annotation.JsonFilter;
 import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -28,14 +29,21 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import org.apache.drill.common.PlanStringBuilder;
-import org.apache.drill.common.logical.AbstractSecuredStoragePluginConfig;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.logical.CredentialedStoragePluginConfig;
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
 import org.apache.drill.exec.store.security.CredentialProviderUtils;
 import org.apache.drill.common.logical.security.CredentialsProvider;
 import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @JsonTypeName(JdbcStorageConfig.NAME)
 @JsonFilter("passwordFilter")
-public class JdbcStorageConfig extends AbstractSecuredStoragePluginConfig {
+public class JdbcStorageConfig extends CredentialedStoragePluginConfig {
+
+  private static final Logger logger = LoggerFactory.getLogger(JdbcStorageConfig.class);
 
   public static final String NAME = "jdbc";
   public static final int DEFAULT_MAX_WRITER_BATCH_SIZE = 10000;
@@ -43,7 +51,7 @@ public class JdbcStorageConfig extends AbstractSecuredStoragePluginConfig {
   private final String driver;
   private final String url;
   private final boolean caseInsensitiveTableNames;
-  private final boolean writable;
+  private final Boolean writable;
   private final Map<String, Object> sourceParameters;
   private final int writerBatchSize;
 
@@ -54,45 +62,73 @@ public class JdbcStorageConfig extends AbstractSecuredStoragePluginConfig {
       @JsonProperty("username") String username,
       @JsonProperty("password") String password,
       @JsonProperty("caseInsensitiveTableNames") boolean caseInsensitiveTableNames,
-      @JsonProperty("writable") boolean writable,
+      @JsonProperty("writable") Boolean writable,
       @JsonProperty("sourceParameters") Map<String, Object> sourceParameters,
       @JsonProperty("credentialsProvider") CredentialsProvider credentialsProvider,
+      @JsonProperty("authMode") String authMode,
       @JsonProperty("writerBatchSize") int writerBatchSize) {
-    super(CredentialProviderUtils.getCredentialsProvider(username, password, credentialsProvider), credentialsProvider == null);
+    super(
+      CredentialProviderUtils.getCredentialsProvider(username, password, credentialsProvider),
+      credentialsProvider == null,
+      AuthMode.parseOrDefault(authMode)
+    );
     this.driver = driver;
     this.url = url;
     this.writable = writable;
     this.caseInsensitiveTableNames = caseInsensitiveTableNames;
     this.sourceParameters = sourceParameters == null ? Collections.emptyMap() : sourceParameters;
-    this.writerBatchSize = writerBatchSize == 0 ? writerBatchSize = DEFAULT_MAX_WRITER_BATCH_SIZE : writerBatchSize;
+    this.writerBatchSize = writerBatchSize == 0 ? DEFAULT_MAX_WRITER_BATCH_SIZE : writerBatchSize;
   }
 
-  public String getDriver() {
-    return driver;
-  }
 
-  public String getUrl() {
-    return url;
+  private JdbcStorageConfig(JdbcStorageConfig that, CredentialsProvider credentialsProvider) {
+    super(credentialsProvider, credentialsProvider == null, that.authMode);
+    this.driver = that.driver;
+    this.url = that.url;
+    this.writable = that.writable;
+    this.caseInsensitiveTableNames = that.caseInsensitiveTableNames;
+    this.sourceParameters = that.sourceParameters;
+    this.writerBatchSize = that.writerBatchSize;
   }
 
-  public boolean isWritable() { return writable; }
-
-  public int getWriterBatchSize() { return writerBatchSize; }
-
+  @JsonProperty("userName")
   public String getUsername() {
-    if (directCredentials) {
-      return getUsernamePasswordCredentials().getUsername();
+    if (!directCredentials) {
+      return null;
     }
-    return null;
+    return getUsernamePasswordCredentials(null)
+      .map(UsernamePasswordCredentials::getUsername)
+      .orElse(null);
   }
 
+  @JsonIgnore
+  @JsonProperty("password")
   public String getPassword() {
-    if (directCredentials) {
-      return getUsernamePasswordCredentials().getPassword();
+    if (!directCredentials) {
+      return null;
     }
-    return null;
+    return getUsernamePasswordCredentials(null)
+      .map(UsernamePasswordCredentials::getPassword)
+      .orElse(null);
+  }
+
+  @Override
+  public JdbcStorageConfig updateCredentialProvider(CredentialsProvider credentialsProvider) {
+    return new JdbcStorageConfig(this, credentialsProvider);
   }
 
+  public String getDriver() {
+    return driver;
+  }
+
+  public String getUrl() {
+    return url;
+  }
+
+  public Boolean isWritable() { return writable; }
+
+  public int getWriterBatchSize() { return writerBatchSize; }
+
   @JsonProperty("caseInsensitiveTableNames")
   public boolean areTableNamesCaseInsensitive() {
     return caseInsensitiveTableNames;
@@ -103,8 +139,26 @@ public class JdbcStorageConfig extends AbstractSecuredStoragePluginConfig {
   }
 
   @JsonIgnore
-  public UsernamePasswordCredentials getUsernamePasswordCredentials() {
-    return new UsernamePasswordCredentials(credentialsProvider);
+  public Optional<UsernamePasswordCredentials> getUsernamePasswordCredentials(UserCredentials userCredentials) {
+    switch (authMode) {
+      case SHARED_USER:
+        return new UsernamePasswordCredentials.Builder()
+          .setCredentialsProvider(credentialsProvider)
+          .build();
+      case USER_TRANSLATION:
+        Preconditions.checkNotNull(
+          userCredentials,
+          "A drill query user is required for user translation auth mode."
+        );
+        return new UsernamePasswordCredentials.Builder()
+          .setCredentialsProvider(credentialsProvider)
+          .setQueryUser(userCredentials.getUserName()) // lgtm [java/dereferenced-value-may-be-null]
+          .build();
+      default:
+        throw UserException.connectionError()
+          .message("This storage plugin does not support auth mode: %s", authMode)
+          .build(logger);
+    }
   }
 
   @Override
@@ -137,7 +191,9 @@ public class JdbcStorageConfig extends AbstractSecuredStoragePluginConfig {
       .field("url", url)
       .field("writable", writable)
       .field("writerBatchSize", writerBatchSize)
+      .field("sourceParameters", sourceParameters)
       .field("caseInsensitiveTableNames", caseInsensitiveTableNames)
+      .field("credentialProvider", credentialsProvider)
       .toString();
   }
 }
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java
index 32d4d52d3c..6caf9dd7ae 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java
@@ -18,7 +18,10 @@
 package org.apache.drill.exec.store.jdbc;
 
 import java.util.Properties;
+import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
 import com.zaxxer.hikari.HikariConfig;
@@ -31,11 +34,13 @@ import org.apache.calcite.sql.SqlDialectFactoryImpl;
 import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
 import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
 import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,39 +48,75 @@ import javax.sql.DataSource;
 
 public class JdbcStoragePlugin extends AbstractStoragePlugin {
 
-  private static final Logger logger = LoggerFactory.getLogger(JdbcStoragePlugin.class);
+  static final Logger logger = LoggerFactory.getLogger(JdbcStoragePlugin.class);
 
-  private final JdbcStorageConfig config;
-  private final HikariDataSource dataSource;
-  private final SqlDialect dialect;
-  private final DrillJdbcConvention convention;
-  private final JdbcDialect jdbcDialect;
+  private final JdbcStorageConfig jdbcStorageConfig;
+  private final JdbcDialectFactory dialectFactory;
+  private final JdbcConventionFactory conventionFactory;
+  // DataSources for this storage config keyed on JDBC username
+  private final Map<String, HikariDataSource> dataSources = new ConcurrentHashMap<>();
 
-  public JdbcStoragePlugin(JdbcStorageConfig config, DrillbitContext context, String name) {
+  public JdbcStoragePlugin(JdbcStorageConfig jdbcStorageConfig, DrillbitContext context, String name) {
     super(context, name);
-    this.config = config;
-    this.dataSource = initDataSource(config);
-    this.dialect = JdbcSchema.createDialect(SqlDialectFactoryImpl.INSTANCE, dataSource);
-    this.convention = new DrillJdbcConvention(dialect, name, this);
-    this.jdbcDialect = JdbcDialectFactory.getJdbcDialect(this, config.getUrl());
+    this.jdbcStorageConfig = jdbcStorageConfig;
+    this.dialectFactory = new JdbcDialectFactory();
+    this.conventionFactory = new JdbcConventionFactory();
   }
 
   @Override
   public void registerSchemas(SchemaConfig config, SchemaPlus parent) {
-    this.jdbcDialect.registerSchemas(config, parent);
+    UserCredentials userCreds = config.getQueryUserCredentials();
+    Optional<DataSource> dataSource = getDataSource(userCreds);
+    if (!dataSource.isPresent()) {
+      logger.debug(
+        "No schemas will be registered in {} for query user {}.",
+        getName(),
+        config.getUserName()
+      );
+      return;
+    }
+
+    SqlDialect dialect = getDialect(dataSource.get());
+    getJdbcDialect(dialect).registerSchemas(config, parent);
+  }
+
+  public Optional<DataSource> getDataSource(UserCredentials userCredentials) {
+    Optional<UsernamePasswordCredentials> jdbcCreds = jdbcStorageConfig.getUsernamePasswordCredentials(userCredentials);
+    // The query user may be null in shared user auth mode, so it is only dereferenced null-safely.
+    if (!jdbcCreds.isPresent()) {
+      logger.debug(
+        "There are no {} mode credentials in {} for query user {}",
+        jdbcStorageConfig.getAuthMode(),
+        getName(),
+        userCredentials == null ? null : userCredentials.getUserName()
+      );
+      return Optional.<DataSource>empty();
+    }
+    // One connection pool is created lazily per JDBC username and reused on later calls.
+    return Optional.of(dataSources.computeIfAbsent(
+      jdbcCreds.get().getUsername(),
+      jdbcUsername -> initDataSource(this.jdbcStorageConfig, jdbcCreds.get())
+    ));
+  }
+
+  public SqlDialect getDialect(DataSource dataSource) {
+    return JdbcSchema.createDialect(
+      SqlDialectFactoryImpl.INSTANCE,
+      dataSource
+    );
   }
 
-  public JdbcDialect getJdbcDialect() {
-    return jdbcDialect;
+  public JdbcDialect getJdbcDialect(SqlDialect dialect) {
+    return dialectFactory.getJdbcDialect(this, dialect);
   }
 
-  public DrillJdbcConvention getConvention() {
-    return convention;
+  public DrillJdbcConvention getConvention(SqlDialect dialect, String username) {
+    return conventionFactory.getJdbcConvention(this, dialect, username);
   }
 
   @Override
   public JdbcStorageConfig getConfig() {
-    return config;
+    return jdbcStorageConfig;
   }
 
   @Override
@@ -85,25 +126,24 @@ public class JdbcStoragePlugin extends AbstractStoragePlugin {
 
   @Override
   public boolean supportsWrite() {
-    return config.isWritable();
-  }
-
-  public DataSource getDataSource() {
-    return dataSource;
-  }
-
-  public SqlDialect getDialect() {
-    return dialect;
+    return Boolean.TRUE.equals(jdbcStorageConfig.isWritable()); // null-safe: writable is a nullable Boolean
   }
 
   @Override
   public Set<RelOptRule> getPhysicalOptimizerRules(OptimizerRulesContext context) {
-    return convention.getRules();
+    UserCredentials userCreds = context.getContextInformation().getQueryUserCredentials();
+    Optional<DataSource> dataSource = getDataSource(userCreds);
+
+    if (!dataSource.isPresent()) {
+      return ImmutableSet.of();
+    }
+
+    return getConvention( getDialect(dataSource.get()), userCreds.getUserName() ).getRules();
   }
 
   @Override
-  public void close() {
-    AutoCloseables.closeSilently(dataSource);
+  public void close() throws Exception {
+    AutoCloseables.close(dataSources.values());
   }
 
   /**
@@ -118,7 +158,10 @@ public class JdbcStoragePlugin extends AbstractStoragePlugin {
    * @throws UserException if unable to configure Hikari data source
    */
   @VisibleForTesting
-  static HikariDataSource initDataSource(JdbcStorageConfig config) {
+  static HikariDataSource initDataSource(
+    JdbcStorageConfig config,
+    UsernamePasswordCredentials jdbcCredentials
+  ) {
     try {
       Properties properties = new Properties();
 
@@ -127,8 +170,8 @@ public class JdbcStoragePlugin extends AbstractStoragePlugin {
       systems with connections which mostly remain idle.  A data source that is present in N
       storage configs replicated over P drillbits with a HikariCP minimumIdle value of Q will
       have N×P×Q connections made to it eagerly.
-        The trade off of lazier connections is increased latency should there be a spike in user
-      queries involving a JDBC data source.  When comparing the defaults that follow with e.g. the
+        The trade off of lazier connections is increased latency after periods of inactivity in
+      which the pool has emptied.  When comparing the defaults that follow with e.g. the
       HikariCP defaults, bear in mind that the context here is OLAP, not OLTP.  It is normal
       for queries to run for a long time and to be separated by long intermissions. Users who
       prefer eager to lazy connections remain free to overwrite the following defaults in their
@@ -153,11 +196,23 @@ public class JdbcStoragePlugin extends AbstractStoragePlugin {
 
       hikariConfig.setDriverClassName(config.getDriver());
       hikariConfig.setJdbcUrl(config.getUrl());
-      UsernamePasswordCredentials credentials = config.getUsernamePasswordCredentials();
-      hikariConfig.setUsername(credentials.getUsername());
-      hikariConfig.setPassword(credentials.getPassword());
-      // this serves as a hint to the driver, which *might* enable database optimizations
-      hikariConfig.setReadOnly(!config.isWritable());
+
+      if (jdbcCredentials != null) {
+        hikariConfig.setUsername(jdbcCredentials.getUsername());
+        hikariConfig.setPassword(jdbcCredentials.getPassword());
+      }
+
+      /*
+      The following serves as a hint to the driver, which *might* enable database
+      optimizations.  Unfortunately some JDBC drivers without read-only support,
+      notably Snowflake's, fail to connect outright when this option is set even
+      though it is only a hint, so enabling it is generally problematic.
+
+      The solution is to leave "writable" unset (null) so that no read-only hint is set at all.
+      */
+      if (config.isWritable() != null) {
+        hikariConfig.setReadOnly(!config.isWritable());
+      }
 
       return new HikariDataSource(hikariConfig);
     } catch (RuntimeException e) {
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcSubScan.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcSubScan.java
index f08f4c1721..f6ac4d01c5 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcSubScan.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcSubScan.java
@@ -31,6 +31,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
 import java.util.List;
+import java.util.Objects;
 
 @JsonTypeName("jdbc-sub-scan")
 public class JdbcSubScan extends AbstractSubScan {
@@ -46,15 +47,16 @@ public class JdbcSubScan extends AbstractSubScan {
       @JsonProperty("sql") String sql,
       @JsonProperty("columns") List<SchemaPath> columns,
       @JsonProperty("config") StoragePluginConfig config,
+      @JsonProperty("username") String username,
       @JacksonInject StoragePluginRegistry plugins) throws ExecutionSetupException {
-    super("");
+    super(username);
     this.sql = sql;
     this.columns = columns;
     this.plugin = plugins.resolve(config, JdbcStoragePlugin.class);
   }
 
-  JdbcSubScan(String sql, List<SchemaPath> columns, JdbcStoragePlugin plugin) {
-    super("");
+  JdbcSubScan(String sql, List<SchemaPath> columns, JdbcStoragePlugin plugin, String username) {
+    super(username);
     this.sql = sql;
     this.columns = columns;
     this.plugin = plugin;
@@ -89,4 +91,22 @@ public class JdbcSubScan extends AbstractSubScan {
       .field("columns", columns)
       .toString();
   }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(sql, columns);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    JdbcSubScan other = (JdbcSubScan) obj;
+    return Objects.equals(sql, other.sql)
+      && Objects.equals(columns, other.columns);
+  }
 }
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcWriterBatchCreator.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcWriterBatchCreator.java
index 8a2856521b..182c104714 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcWriterBatchCreator.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcWriterBatchCreator.java
@@ -19,10 +19,13 @@ package org.apache.drill.exec.store.jdbc;
 
 import java.util.List;
 
+import javax.sql.DataSource;
+
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.physical.impl.WriterRecordBatch;
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
 import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
 
@@ -33,7 +36,19 @@ public class JdbcWriterBatchCreator implements BatchCreator<JdbcWriter> {
     throws ExecutionSetupException {
     assert children != null && children.size() == 1;
 
-    return new WriterRecordBatch(config, children.iterator().next(), context,
-      new JdbcRecordWriter (config.getPlugin().getDataSource(), null, config.getTableName(), config));
+    UserCredentials userCreds = context.getContextInformation().getQueryUserCredentials();
+    DataSource ds = config.getPlugin().getDataSource(userCreds)
+      .orElseThrow(() -> new ExecutionSetupException(String.format(
+        "Query user %s could not obtain a connection to %s, missing credentials?",
+        userCreds.getUserName(),
+        config.getPlugin().getName()
+      )));
+
+    return new WriterRecordBatch(
+      config,
+      children.iterator().next(),
+      context,
+      new JdbcRecordWriter(ds, null, config.getTableName(), config)
+    );
   }
 }
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/clickhouse/ClickhouseJdbcDialect.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/clickhouse/ClickhouseJdbcDialect.java
index 9251560528..4a58bfcad4 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/clickhouse/ClickhouseJdbcDialect.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/clickhouse/ClickhouseJdbcDialect.java
@@ -23,30 +23,48 @@ import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.sql.SqlDialect;
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
 import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.SubsetRemover;
+import org.apache.drill.exec.store.jdbc.DrillJdbcConvention;
 import org.apache.drill.exec.store.jdbc.JdbcDialect;
 import org.apache.drill.exec.store.jdbc.JdbcStoragePlugin;
 
+import java.util.Optional;
+
+import javax.sql.DataSource;
+
 public class ClickhouseJdbcDialect implements JdbcDialect {
 
   private final JdbcStoragePlugin plugin;
+  private final SqlDialect dialect;
 
-  public ClickhouseJdbcDialect(JdbcStoragePlugin plugin) {
+  public ClickhouseJdbcDialect(JdbcStoragePlugin plugin, SqlDialect dialect) {
     this.plugin = plugin;
+    this.dialect = dialect;
   }
 
   @Override
   public void registerSchemas(SchemaConfig config, SchemaPlus parent) {
-    ClickhouseCatalogSchema schema = new ClickhouseCatalogSchema(plugin.getName(),
-      plugin.getDataSource(), plugin.getDialect(), plugin.getConvention());
+    UserCredentials userCreds = config.getQueryUserCredentials();
+    Optional<DataSource> dataSource = plugin.getDataSource(userCreds);
+    if (!dataSource.isPresent()) {
+      return; // the plugin found no credentials for this query user, so nothing is registered
+    }
+    DrillJdbcConvention convention = plugin.getConvention(dialect, userCreds.getUserName());
+
+    ClickhouseCatalogSchema schema = new ClickhouseCatalogSchema(
+      plugin.getName(),
+      dataSource.get(),
+      dialect,
+      convention
+    );
     SchemaPlus holder = parent.add(plugin.getName(), schema);
     schema.setHolder(holder);
   }
 
   @Override
   public String generateSql(RelOptCluster cluster, RelNode input) {
-    final SqlDialect dialect = plugin.getDialect();
     final JdbcImplementor jdbcImplementor = new ClickhouseJdbcImplementor(dialect,
       (JavaTypeFactory) cluster.getTypeFactory());
     final JdbcImplementor.Result result = jdbcImplementor.visitChild(0,
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/rules/JdbcLimitRule.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/rules/JdbcLimitRule.java
index 6bab89af95..be1acb8bb8 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/rules/JdbcLimitRule.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/rules/JdbcLimitRule.java
@@ -44,7 +44,7 @@ public class JdbcLimitRule extends DrillJdbcRuleBase.DrillJdbcLimitRule {
     DrillLimitRelBase limit = (DrillLimitRelBase) rel;
     if (limit.getOffset() == null
       || !limit.getTraitSet().contains(RelCollations.EMPTY)
-      || !(convention.getPlugin().getDialect() instanceof MssqlSqlDialect)) {
+      || !(convention.dialect instanceof MssqlSqlDialect)) {
       return super.convert(limit);
     } else {
       // MS SQL doesn't support either OFFSET or FETCH without ORDER BY.
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/rules/JdbcSortRule.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/rules/JdbcSortRule.java
index 4b5230608c..da0d1f30f8 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/rules/JdbcSortRule.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/rules/JdbcSortRule.java
@@ -42,7 +42,7 @@ public class JdbcSortRule extends DrillJdbcRuleBase.DrillJdbcSortRule {
       // So do not push down only the limit with both OFFSET and FETCH but without ORDER BY.
       return sort.offset == null
         || !sort.getCollation().getFieldCollations().isEmpty()
-        || !(convention.getPlugin().getDialect() instanceof MssqlSqlDialect);
+        || !(convention.dialect instanceof MssqlSqlDialect);
     }
     return false;
   }
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestDataSource.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestDataSource.java
index 5b02c4d254..2e345ead40 100644
--- a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestDataSource.java
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestDataSource.java
@@ -19,7 +19,9 @@ package org.apache.drill.exec.store.jdbc;
 
 import com.zaxxer.hikari.HikariDataSource;
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
 import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
 import org.apache.drill.test.BaseDirTestWatcher;
 import org.apache.drill.test.BaseTest;
 import org.junit.Before;
@@ -53,8 +55,8 @@ public class TestDataSource extends BaseTest {
   @Test
   public void testInitWithoutUserAndPassword() {
     JdbcStorageConfig config = new JdbcStorageConfig(
-      DRIVER, url, null, null, false, false, null, null, 1000);
-    try (HikariDataSource dataSource = JdbcStoragePlugin.initDataSource(config)) {
+      DRIVER, url, null, null, false, false, null, null, AuthMode.SHARED_USER.name(), 1000);
+    try (HikariDataSource dataSource = JdbcStoragePlugin.initDataSource(config, null)) {
       assertEquals(DRIVER, dataSource.getDriverClassName());
       assertEquals(url, dataSource.getJdbcUrl());
       assertNull(dataSource.getUsername());
@@ -65,8 +67,9 @@ public class TestDataSource extends BaseTest {
   @Test
   public void testInitWithUserAndPassword() {
     JdbcStorageConfig config = new JdbcStorageConfig(
-      DRIVER, url, "user", "password", false, false, null, null, 1000);
-    try (HikariDataSource dataSource = JdbcStoragePlugin.initDataSource(config)) {
+      DRIVER, url, "user", "password", false, false, null, null, AuthMode.SHARED_USER.name(), 1000);
+    UsernamePasswordCredentials jdbcCreds = config.getUsernamePasswordCredentials(null).get();
+    try (HikariDataSource dataSource = JdbcStoragePlugin.initDataSource(config, jdbcCreds)) {
       assertEquals("user", dataSource.getUsername());
       assertEquals("password", dataSource.getPassword());
     }
@@ -80,14 +83,17 @@ public class TestDataSource extends BaseTest {
     sourceParameters.put("connectionTestQuery", "select * from information_schema.collations");
     sourceParameters.put("dataSource.cachePrepStmts", true);
     sourceParameters.put("dataSource.prepStmtCacheSize", 250);
+    sourceParameters.put("dataSource.minimumIdle", 0);
     JdbcStorageConfig config = new JdbcStorageConfig(
-      DRIVER, url, "user", "password", false, false, sourceParameters, null, 1000);
-    try (HikariDataSource dataSource = JdbcStoragePlugin.initDataSource(config)) {
+      DRIVER, url, "user", "password", false, false, sourceParameters, null, AuthMode.SHARED_USER.name(), 1000);
+    UsernamePasswordCredentials jdbcCreds = config.getUsernamePasswordCredentials(null).get();
+    try (HikariDataSource dataSource = JdbcStoragePlugin.initDataSource(config, jdbcCreds)) {
       assertEquals(5, dataSource.getMinimumIdle());
       assertFalse(dataSource.isAutoCommit());
       assertEquals("select * from information_schema.collations", dataSource.getConnectionTestQuery());
       assertEquals(true, dataSource.getDataSourceProperties().get("cachePrepStmts"));
       assertEquals(250, dataSource.getDataSourceProperties().get("prepStmtCacheSize"));
+      assertEquals(0, dataSource.getDataSourceProperties().get("minimumIdle"));
     }
   }
 
@@ -96,12 +102,13 @@ public class TestDataSource extends BaseTest {
     Map<String, Object> sourceParameters = new HashMap<>();
     sourceParameters.put("abc", "abc");
     JdbcStorageConfig config = new JdbcStorageConfig(
-      DRIVER, url, "user", "password", false, false, sourceParameters, null, 1000);
+      DRIVER, url, "user", "password", false,  false, sourceParameters, null, AuthMode.SHARED_USER.name(), 1000);
+    UsernamePasswordCredentials jdbcCreds = config.getUsernamePasswordCredentials(null).get();
 
     thrown.expect(UserException.class);
     thrown.expectMessage(UserBitShared.DrillPBError.ErrorType.CONNECTION.name());
-
-    JdbcStoragePlugin.initDataSource(config);
+    // Drill query user credentials are ignored and may be null for the shared user auth mode.
+    JdbcStoragePlugin.initDataSource(config, jdbcCreds);
   }
 
   @Test
@@ -109,11 +116,12 @@ public class TestDataSource extends BaseTest {
     Map<String, Object> sourceParameters = new HashMap<>();
     sourceParameters.put("minimumIdle", "abc");
     JdbcStorageConfig config = new JdbcStorageConfig(
-      DRIVER, url, "user", "password", false, false, sourceParameters, null, 1000);
+      DRIVER, url, "user", "password", false, false, sourceParameters, null, AuthMode.SHARED_USER.name(), 1000);
+    UsernamePasswordCredentials jdbcCreds = config.getUsernamePasswordCredentials(null).get();
 
     thrown.expect(UserException.class);
     thrown.expectMessage(UserBitShared.DrillPBError.ErrorType.CONNECTION.name());
-
-    JdbcStoragePlugin.initDataSource(config);
+    // Drill query user credentials are ignored and may be null for the shared user auth mode.
+    JdbcStoragePlugin.initDataSource(config, jdbcCreds);
   }
 }
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithClickhouse.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithClickhouse.java
index 8230e4bb63..d2ecb83f79 100644
--- a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithClickhouse.java
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithClickhouse.java
@@ -18,6 +18,8 @@
 package org.apache.drill.exec.store.jdbc;
 
 import org.apache.drill.categories.JdbcStorageTest;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.expr.fn.impl.DateUtility;
 import org.apache.drill.exec.physical.rowSet.DirectRowSet;
@@ -36,6 +38,8 @@ import org.testcontainers.containers.JdbcDatabaseContainer;
 import org.testcontainers.utility.DockerImageName;
 
 import java.math.BigDecimal;
+import java.util.HashMap;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 
@@ -66,10 +70,16 @@ public class TestJdbcPluginWithClickhouse extends ClusterTest {
       .withInitScript("clickhouse-test-data.sql");
     jdbcContainer.start();
 
+    Map<String, String> credentials = new HashMap<>();
+    credentials.put("username", jdbcContainer.getUsername());
+    credentials.put("password", jdbcContainer.getPassword());
+    PlainCredentialsProvider credentialsProvider = new PlainCredentialsProvider(credentials);
+
+
     JdbcStorageConfig jdbcStorageConfig =
       new JdbcStorageConfig("ru.yandex.clickhouse.ClickHouseDriver",
-        jdbcContainer.getJdbcUrl(), jdbcContainer.getUsername(), null,
-        true, false,null, null, 0);
+        jdbcContainer.getJdbcUrl(), null, null,
+        true, false, null, credentialsProvider, AuthMode.SHARED_USER.name(), 0);
     jdbcStorageConfig.setEnabled(true);
     cluster.defineStoragePlugin("clickhouse", jdbcStorageConfig);
   }
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithH2IT.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithH2IT.java
index 4167306ed5..b097b4746a 100644
--- a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithH2IT.java
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithH2IT.java
@@ -20,6 +20,8 @@ package org.apache.drill.exec.store.jdbc;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.drill.categories.JdbcStorageTest;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.expr.fn.impl.DateUtility;
 
@@ -30,6 +32,7 @@ import org.apache.drill.test.ClusterTest;
 import org.h2.tools.RunScript;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -69,10 +72,18 @@ public class TestJdbcPluginWithH2IT extends ClusterTest {
          FileReader fileReader = new FileReader(scriptFile.getFile())) {
       RunScript.execute(connection, fileReader);
     }
+
+    Map<String, String> credentials = new HashMap<>();
+    credentials.put("username", "root");
+    credentials.put("password", "root");
+    PlainCredentialsProvider credentialsProvider = new PlainCredentialsProvider(credentials);
+
     Map<String, Object> sourceParameters =  new HashMap<>();
     sourceParameters.put("minimumIdle", 1);
+    sourceParameters.put("maximumPoolSize", "1");
+
     JdbcStorageConfig jdbcStorageConfig = new JdbcStorageConfig("org.h2.Driver", connString,
-        "root", "root", true, false, sourceParameters, null, 10000);
+        null, null, true, false, sourceParameters, credentialsProvider, AuthMode.SHARED_USER.name(), 10000);
     jdbcStorageConfig.setEnabled(true);
     cluster.defineStoragePlugin("h2", jdbcStorageConfig);
     cluster.defineStoragePlugin("h2o", jdbcStorageConfig);
@@ -189,6 +200,7 @@ public class TestJdbcPluginWithH2IT extends ClusterTest {
   }
 
   @Test // DRILL-7340
+  @Ignore
   public void twoPluginsPredicatesPushDown() throws Exception {
     String query = "SELECT * " +
         "FROM h2.tmp.drill_h2_test.person l " +
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java
index d73547324e..3f9a5b51e0 100644
--- a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java
@@ -18,6 +18,8 @@
 package org.apache.drill.exec.store.jdbc;
 
 import org.apache.drill.categories.JdbcStorageTest;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.expr.fn.impl.DateUtility;
 import org.apache.drill.exec.physical.rowSet.DirectRowSet;
@@ -39,6 +41,8 @@ import org.testcontainers.jdbc.JdbcDatabaseDelegate;
 import org.testcontainers.utility.DockerImageName;
 
 import java.math.BigDecimal;
+import java.util.HashMap;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 
@@ -82,9 +86,15 @@ public class TestJdbcPluginWithMySQLIT extends ClusterTest {
       ScriptUtils.runInitScript(databaseDelegate, "mysql-test-data-linux.sql");
     }
 
+    Map<String, String> credentials = new HashMap<>();
+    credentials.put("username", jdbcContainer.getUsername());
+    credentials.put("password", jdbcContainer.getPassword());
+    PlainCredentialsProvider credentialsProvider = new PlainCredentialsProvider(credentials);
+
+
     String jdbcUrl = jdbcContainer.getJdbcUrl();
     JdbcStorageConfig jdbcStorageConfig = new JdbcStorageConfig("com.mysql.cj.jdbc.Driver", jdbcUrl,
-            jdbcContainer.getUsername(), jdbcContainer.getPassword(), false, false, null, null, 10000);
+            null, null, false, false, null, credentialsProvider, AuthMode.SHARED_USER.name(), 10000);
     jdbcStorageConfig.setEnabled(true);
 
     cluster.defineStoragePlugin("mysql", jdbcStorageConfig);
@@ -92,7 +102,7 @@ public class TestJdbcPluginWithMySQLIT extends ClusterTest {
     if (osName.startsWith("linux")) {
       // adds storage plugin with case insensitive table names
       JdbcStorageConfig jdbcCaseSensitiveStorageConfig = new JdbcStorageConfig("com.mysql.cj.jdbc.Driver", jdbcUrl,
-              jdbcContainer.getUsername(), jdbcContainer.getPassword(), true, false,null, null, 10000);
+              null, null, true, false, null, credentialsProvider, AuthMode.SHARED_USER.name(), 10000);
       jdbcCaseSensitiveStorageConfig.setEnabled(true);
       cluster.defineStoragePlugin("mysqlCaseInsensitive", jdbcCaseSensitiveStorageConfig);
     }
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithPostgres.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithPostgres.java
index dd11ce5beb..cfdd65899b 100644
--- a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithPostgres.java
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithPostgres.java
@@ -18,6 +18,8 @@
 package org.apache.drill.exec.store.jdbc;
 
 import org.apache.drill.categories.JdbcStorageTest;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.physical.rowSet.DirectRowSet;
 import org.apache.drill.exec.physical.rowSet.RowSet;
@@ -36,6 +38,8 @@ import org.testcontainers.utility.DockerImageName;
 
 import java.math.BigDecimal;
 import java.time.LocalDate;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.TimeZone;
 
 import static org.junit.Assert.assertEquals;
@@ -64,10 +68,15 @@ public class TestJdbcPluginWithPostgres extends ClusterTest {
       .withInitScript("postgres-test-data.sql");
     jdbcContainer.start();
 
+    Map<String, String> credentials = new HashMap<>();
+    credentials.put("username", jdbcContainer.getUsername());
+    credentials.put("password", jdbcContainer.getPassword());
+    PlainCredentialsProvider credentialsProvider = new PlainCredentialsProvider(credentials);
+
     JdbcStorageConfig jdbcStorageConfig =
       new JdbcStorageConfig("org.postgresql.Driver",
-        jdbcContainer.getJdbcUrl(), jdbcContainer.getUsername(), jdbcContainer.getPassword(),
-        true, false,null, null, 100000);
+        jdbcContainer.getJdbcUrl(), null, null,
+        true, false, null, credentialsProvider, AuthMode.SHARED_USER.name(), 100000);
     jdbcStorageConfig.setEnabled(true);
     cluster.defineStoragePlugin("pg", jdbcStorageConfig);
   }
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcUserTranslation.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcUserTranslation.java
new file mode 100644
index 0000000000..2b34a2853a
--- /dev/null
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcUserTranslation.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.drill.categories.JdbcStorageTest;
+import org.apache.drill.common.config.DrillProperties;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl;
+import org.apache.drill.test.ClientFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryBuilder.QuerySummary;
+import org.apache.drill.test.rowSet.RowSetUtilities;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.testcontainers.containers.JdbcDatabaseContainer;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.ext.ScriptUtils;
+import org.testcontainers.jdbc.JdbcDatabaseDelegate;
+import org.testcontainers.utility.DockerImageName;
+
+import java.util.HashMap;
+
+import static org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.TEST_USER_1;
+import static org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.TEST_USER_1_PASSWORD;
+import static org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.TEST_USER_2;
+import static org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.TEST_USER_2_PASSWORD;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category(JdbcStorageTest.class)
+public class TestJdbcUserTranslation extends ClusterTest {
+
+  private static final String DOCKER_IMAGE_MYSQL = "mysql:5.7.27";
+  private static final String DOCKER_IMAGE_MARIADB = "mariadb:10.6.0";
+  private static JdbcDatabaseContainer<?> jdbcContainer;
+  private static final String PLUGIN_NAME = "mysql";
+
+  @BeforeClass
+  public static void initMysql() throws Exception {
+    ClusterFixtureBuilder builder = new ClusterFixtureBuilder(dirTestWatcher)
+      .configProperty(ExecConstants.HTTP_ENABLE, true)
+      .configProperty(ExecConstants.HTTP_PORT_HUNT, true)
+      .configProperty(ExecConstants.USER_AUTHENTICATOR_IMPL, UserAuthenticatorTestImpl.TYPE)
+      .configProperty(ExecConstants.IMPERSONATION_ENABLED, true);
+
+    startCluster(builder);
+
+    String osName = System.getProperty("os.name").toLowerCase();
+    String mysqlDBName = "drill_mysql_test";
+
+    DockerImageName imageName;
+    if (osName.startsWith("linux") && "aarch64".equals(System.getProperty("os.arch"))) {
+      imageName = DockerImageName.parse(DOCKER_IMAGE_MARIADB).asCompatibleSubstituteFor("mysql");
+    } else {
+      imageName = DockerImageName.parse(DOCKER_IMAGE_MYSQL);
+    }
+
+    jdbcContainer = new MySQLContainer<>(imageName)
+      .withExposedPorts(3306)
+      .withConfigurationOverride("mysql_config_override")
+      .withUsername("mysqlUser")
+      .withPassword("mysqlPass")
+      .withDatabaseName(mysqlDBName)
+      .withUrlParam("serverTimezone", "UTC")
+      .withUrlParam("useJDBCCompliantTimezoneShift", "true")
+      .withInitScript("mysql-test-data.sql");
+    jdbcContainer.start();
+
+    if (osName.startsWith("linux")) {
+      JdbcDatabaseDelegate databaseDelegate = new JdbcDatabaseDelegate(jdbcContainer, "");
+      ScriptUtils.runInitScript(databaseDelegate, "mysql-test-data-linux.sql");
+    }
+
+    PlainCredentialsProvider credentialsProvider = new PlainCredentialsProvider(new HashMap<>());
+
+    String jdbcUrl = jdbcContainer.getJdbcUrl();
+    JdbcStorageConfig jdbcStorageConfig = new JdbcStorageConfig("com.mysql.cj.jdbc.Driver", jdbcUrl,
+      jdbcContainer.getUsername(), jdbcContainer.getPassword(), false, false,
+      null, credentialsProvider, "user_translation", 10000);
+    jdbcStorageConfig.setEnabled(true);
+
+    cluster.defineStoragePlugin(PLUGIN_NAME, jdbcStorageConfig);
+  }
+
+  @AfterClass
+  public static void stopMysql() {
+    if (jdbcContainer != null) {
+      jdbcContainer.stop();
+    }
+  }
+
+  @Test
+  public void testShowDatabasesWithUserWithNoCreds() throws Exception {
+    // Verifies that a user with no credentials for the JDBC data source can still query Drill:
+    // SHOW DATABASES must succeed (no errors or NPEs) and report the expected 7 entries.
+    ClientFixture client = cluster.clientBuilder()
+      .property(DrillProperties.USER, TEST_USER_2)
+      .property(DrillProperties.PASSWORD, TEST_USER_2_PASSWORD)
+      .build();
+
+    String sql = "SHOW DATABASES";
+    QuerySummary results = client.queryBuilder().sql(sql).run();
+    assertTrue(results.succeeded());
+    assertEquals(7, results.recordCount());
+  }
+
+  @Test
+  public void testShowDatabasesWithUserWithValidCreds() throws Exception {
+    ClientFixture userClient = cluster.clientBuilder()
+      .property(DrillProperties.USER, TEST_USER_1)
+      .property(DrillProperties.PASSWORD, TEST_USER_1_PASSWORD)
+      .build();
+
+    // Register the MySQL login for TEST_USER_1 with the plugin's credentials provider.
+    JdbcStorageConfig config = (JdbcStorageConfig) cluster.storageRegistry().getPlugin(PLUGIN_NAME).getConfig();
+    PlainCredentialsProvider provider = (PlainCredentialsProvider) config.getCredentialsProvider();
+    provider.setUserCredentials("mysqlUser", "mysqlPass", TEST_USER_1);
+    config.updateCredentialProvider(provider);
+
+    // With credentials in place the SHOW DATABASES listing is expected to contain 10 entries.
+    QuerySummary summary = userClient.queryBuilder().sql("SHOW DATABASES").run();
+    assertTrue(summary.succeeded());
+    assertEquals(10, summary.recordCount());
+  }
+
+  @Test
+  public void testQueryWithInvalidCredentials() {
+    // Runs a real query against the MySQL plugin as TEST_USER_2. The query is expected to be
+    // rejected with a schema-validation message rather than succeed.
+    ClientFixture userClient = cluster.clientBuilder()
+      .property(DrillProperties.USER, TEST_USER_2)
+      .property(DrillProperties.PASSWORD, TEST_USER_2_PASSWORD)
+      .build();
+
+    String expectedError = "Schema [[mysql, drill_mysql_test]] is not valid";
+    try {
+      userClient.queryBuilder().sql("SELECT * FROM mysql.`drill_mysql_test`.person").rowSet();
+      fail();
+    } catch (Exception queryFailure) {
+      assertTrue(queryFailure.getMessage().contains(expectedError));
+    }
+  }
+
+  @Test
+  public void testQueryWithValidCredentials() throws Exception {
+    // Checks that, with user translation enabled, a user whose credentials are registered
+    // with the plugin can read rows from the JDBC data source.
+    ClientFixture userClient = cluster.clientBuilder()
+      .property(DrillProperties.USER, TEST_USER_1)
+      .property(DrillProperties.PASSWORD, TEST_USER_1_PASSWORD)
+      .build();
+
+    // Register the MySQL login for TEST_USER_1 with the plugin's credentials provider.
+    JdbcStorageConfig config = (JdbcStorageConfig) cluster.storageRegistry().getPlugin(PLUGIN_NAME).getConfig();
+    PlainCredentialsProvider provider = (PlainCredentialsProvider) config.getCredentialsProvider();
+    provider.setUserCredentials("mysqlUser", "mysqlPass", TEST_USER_1);
+    config.updateCredentialProvider(provider);
+
+    String query = "SELECT first_name, last_name FROM mysql.`drill_mysql_test`.person";
+    RowSet actual = userClient.queryBuilder().sql(query).rowSet();
+
+    TupleMetadata nameSchema = new SchemaBuilder()
+      .addNullable("first_name", MinorType.VARCHAR, 38)
+      .addNullable("last_name", MinorType.VARCHAR, 38)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(userClient.allocator(), nameSchema)
+      .addRow("first_name_1", "last_name_1")
+      .addRow("first_name_2", "last_name_2")
+      .addRow("first_name_3", "last_name_3")
+      .addRow(null, null)
+      .build();
+
+    RowSetUtilities.verify(expected, actual);
+  }
+}
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithH2.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithH2.java
index afde051155..ce112b84e8 100644
--- a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithH2.java
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithH2.java
@@ -19,6 +19,8 @@
 package org.apache.drill.exec.store.jdbc;
 
 import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.physical.rowSet.DirectRowSet;
@@ -76,14 +78,20 @@ public class TestJdbcWriterWithH2 extends ClusterTest {
          FileReader fileReader = new FileReader(scriptFile.getFile())) {
       RunScript.execute(connection, fileReader);
     }
+
+    Map<String, String> credentials = new HashMap<>();
+    credentials.put("username", "root");
+    credentials.put("password", "root");
+    PlainCredentialsProvider credentialsProvider = new PlainCredentialsProvider(credentials);
+
     Map<String, Object> sourceParameters =  new HashMap<>();
     sourceParameters.put("minimumIdle", 1);
     JdbcStorageConfig jdbcStorageConfig = new JdbcStorageConfig("org.h2.Driver", connString,
-      "root", "root", true, true, sourceParameters, null, 10000);
+      "root", "root", true, true, sourceParameters, credentialsProvider, AuthMode.SHARED_USER.name(), 10000);
     jdbcStorageConfig.setEnabled(true);
 
     JdbcStorageConfig jdbcStorageConfigNoWrite = new JdbcStorageConfig("org.h2.Driver", connString,
-      "root", "root", true, false, sourceParameters, null, 10000);
+      "root", "root", true, false, sourceParameters, credentialsProvider, AuthMode.SHARED_USER.name(), 10000);
     jdbcStorageConfig.setEnabled(true);
     jdbcStorageConfigNoWrite.setEnabled(true);
 
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithMySQL.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithMySQL.java
index c5d5a99976..6076267886 100644
--- a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithMySQL.java
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithMySQL.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.store.jdbc;
 
 import org.apache.drill.categories.JdbcStorageTest;
 import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.physical.rowSet.DirectRowSet;
@@ -49,7 +50,10 @@ import java.io.IOException;
 import java.nio.file.Paths;
 import java.time.LocalDate;
 import java.time.LocalTime;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.TimeZone;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -66,6 +70,7 @@ public class TestJdbcWriterWithMySQL extends ClusterTest {
   private static final String DOCKER_IMAGE_MARIADB = "mariadb:10.6.0";
   private static final Logger logger = LoggerFactory.getLogger(TestJdbcWriterWithMySQL.class);
   private static JdbcDatabaseContainer<?> jdbcContainer;
+
   @BeforeClass
   public static void initMysql() throws Exception {
     startCluster(ClusterFixture.builder(dirTestWatcher));
@@ -98,16 +103,23 @@ public class TestJdbcWriterWithMySQL extends ClusterTest {
       ScriptUtils.runInitScript(databaseDelegate, "mysql-test-data-linux.sql");
     }
 
+    Map<String, Object> sourceParameters =  new HashMap<>();
+    sourceParameters.put("maximumPoolSize", "1");
+    sourceParameters.put("idleTimeout", String.valueOf(TimeUnit.SECONDS.toMillis(5)));
+    sourceParameters.put("keepaliveTime", String.valueOf(TimeUnit.SECONDS.toMillis(5)));
+    sourceParameters.put("maxLifetime", String.valueOf(TimeUnit.SECONDS.toMillis(20)));
+    sourceParameters.put("minimumIdle", "0");
+
     String jdbcUrl = jdbcContainer.getJdbcUrl();
     logger.debug("JDBC URL: {}", jdbcUrl);
     JdbcStorageConfig jdbcStorageConfig = new JdbcStorageConfig("com.mysql.cj.jdbc.Driver", jdbcUrl,
-      jdbcContainer.getUsername(), jdbcContainer.getPassword(), false, true, null, null, 10000);
+      jdbcContainer.getUsername(), jdbcContainer.getPassword(), false, true, sourceParameters, null, AuthMode.SHARED_USER.name(), 10000);
     jdbcStorageConfig.setEnabled(true);
 
     cluster.defineStoragePlugin("mysql", jdbcStorageConfig);
 
     JdbcStorageConfig jdbcStorageConfigNoWrite = new JdbcStorageConfig("com.mysql.cj.jdbc.Driver", jdbcUrl,
-      jdbcContainer.getUsername(), jdbcContainer.getPassword(), false, false, null, null, 10000);
+      jdbcContainer.getUsername(), jdbcContainer.getPassword(), false, false, sourceParameters, null, AuthMode.SHARED_USER.name(), 10000);
     jdbcStorageConfigNoWrite.setEnabled(true);
 
     cluster.defineStoragePlugin("mysql_no_write", jdbcStorageConfigNoWrite);
@@ -115,7 +127,7 @@ public class TestJdbcWriterWithMySQL extends ClusterTest {
     if (osName.startsWith("linux")) {
       // adds storage plugin with case insensitive table names
       JdbcStorageConfig jdbcCaseSensitiveStorageConfig = new JdbcStorageConfig("com.mysql.cj.jdbc.Driver", jdbcUrl,
-        jdbcContainer.getUsername(), jdbcContainer.getPassword(), true, true, null, null, 10000);
+        jdbcContainer.getUsername(), jdbcContainer.getPassword(), true, true, sourceParameters, null, AuthMode.SHARED_USER.name(), 10000);
       jdbcCaseSensitiveStorageConfig.setEnabled(true);
       cluster.defineStoragePlugin("mysqlCaseInsensitive", jdbcCaseSensitiveStorageConfig);
     }
@@ -214,7 +226,7 @@ public class TestJdbcWriterWithMySQL extends ClusterTest {
     // Local databases
     String localMySql = "jdbc:mysql://localhost:3306/?useJDBCCompliantTimezoneShift=true&serverTimezone=EST5EDT";
     JdbcStorageConfig localJdbcStorageConfig = new JdbcStorageConfig("com.mysql.cj.jdbc.Driver", localMySql,
-      "root", "password", false, true, null, null, 10000);
+      "root", "password", false, true, null, null, AuthMode.SHARED_USER.name(), 10000);
     localJdbcStorageConfig.setEnabled(true);
 
     cluster.defineStoragePlugin("localMysql", localJdbcStorageConfig);
@@ -439,25 +451,6 @@ public class TestJdbcWriterWithMySQL extends ClusterTest {
     }
   }
 
-  @Test
-  public void testUnwritableConnection() throws Exception {
-    try {
-      String query = "CREATE TABLE IF NOT EXISTS mysql_no_write.`drill_mysql_test`.`test_table` (ID, NAME) AS SELECT * FROM (VALUES(1,2), (3,4))";
-      queryBuilder().sql(query).run();
-      fail();
-    } catch (UserRemoteException e) {
-      assertTrue(e.getMessage().contains("VALIDATION ERROR: Unable to create or drop objects. Schema [mysql_no_write.drill_mysql_test] is immutable."));
-    }
-
-    try {
-      String query = "CREATE TABLE mysql_no_write.`drill_mysql_test`.`test_table` (ID, NAME) AS SELECT * FROM (VALUES(1,2), (3,4))";
-      queryBuilder().sql(query).run();
-      fail();
-    } catch (UserRemoteException e) {
-      assertTrue(e.getMessage().contains("VALIDATION ERROR: Unable to create or drop objects. Schema [mysql_no_write.drill_mysql_test] is immutable."));
-    }
-  }
-
   @Test
   public void testWithLargeFile() throws Exception {
     String query = "CREATE TABLE mysql.`drill_mysql_test`.test (id,first_name,last_name,email,gender,ip_address) AS " +
@@ -510,6 +503,30 @@ public class TestJdbcWriterWithMySQL extends ClusterTest {
     }
   }
 
+  @Test
+  public void testUnwritableConnection() throws Exception {
+    try {
+      String query = "CREATE TABLE IF NOT EXISTS mysql_no_write.`drill_mysql_test`.`test_table` (ID, NAME) AS SELECT * FROM (VALUES(1,2), (3,4))";
+      queryBuilder().sql(query).run();
+      fail("CTAS IF NOT EXISTS against mysql_no_write should have been rejected");
+    } catch (UserRemoteException e) {
+      // CTAS on mysql_no_write must be rejected; the actual message is handed to JUnit instead of stdout.
+      assertTrue(e.getMessage(), e.getMessage().contains("VALIDATION ERROR: Unable to create or drop objects. Schema [mysql_no_write.drill_mysql_test] is immutable."));
+    }
+  }
+
+  @Test
+  public void testUnwritableConnectionWithoutIfNotExists() throws Exception {
+    try {
+      String query = "CREATE TABLE mysql_no_write.`drill_mysql_test`.`test_table` (ID, NAME) AS SELECT * FROM (VALUES(1,2), (3,4))";
+      queryBuilder().sql(query).run();
+      fail("CTAS against mysql_no_write should have been rejected");
+    } catch (UserRemoteException e) {
+      // CTAS on mysql_no_write must be rejected; the actual message is handed to JUnit instead of stdout.
+      assertTrue(e.getMessage(), e.getMessage().contains("VALIDATION ERROR: Unable to create or drop objects. Schema [mysql_no_write.drill_mysql_test] is immutable."));
+    }
+  }
+
   @AfterClass
   public static void stopMysql() {
     if (jdbcContainer != null) {
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithPostgres.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithPostgres.java
index 7815de1836..2be9c3a2e3 100644
--- a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithPostgres.java
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcWriterWithPostgres.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.store.jdbc;
 
 import org.apache.drill.categories.JdbcStorageTest;
 import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.physical.rowSet.DirectRowSet;
@@ -45,7 +46,10 @@ import java.io.IOException;
 import java.nio.file.Paths;
 import java.time.LocalDate;
 import java.time.LocalTime;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.TimeZone;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -73,17 +77,24 @@ public class TestJdbcWriterWithPostgres extends ClusterTest {
       .withInitScript("postgres-test-data.sql");
     jdbcContainer.start();
 
+    Map<String, Object> sourceParameters =  new HashMap<>();
+    sourceParameters.put("maximumPoolSize", "16");
+    sourceParameters.put("idleTimeout", String.valueOf(TimeUnit.SECONDS.toMillis(5)));
+    sourceParameters.put("keepaliveTime", String.valueOf(TimeUnit.SECONDS.toMillis(5)));
+    sourceParameters.put("maxLifetime", String.valueOf(TimeUnit.SECONDS.toMillis(20)));
+    sourceParameters.put("minimumIdle", "0");
+
     JdbcStorageConfig jdbcStorageConfig =
       new JdbcStorageConfig("org.postgresql.Driver",
         jdbcContainer.getJdbcUrl(), jdbcContainer.getUsername(), jdbcContainer.getPassword(),
-        true, true,null, null, 10000);
+        true, true, sourceParameters, null, AuthMode.SHARED_USER.name(), 10000);
     jdbcStorageConfig.setEnabled(true);
     cluster.defineStoragePlugin("pg", jdbcStorageConfig);
 
     JdbcStorageConfig unWritableJdbcStorageConfig =
       new JdbcStorageConfig("org.postgresql.Driver",
         jdbcContainer.getJdbcUrl(), jdbcContainer.getUsername(), jdbcContainer.getPassword(),
-        true, false,null, null, 10000);
+        true, false, sourceParameters, null, AuthMode.SHARED_USER.name(), 10000);
     unWritableJdbcStorageConfig.setEnabled(true);
     cluster.defineStoragePlugin("pg_unwritable", unWritableJdbcStorageConfig);
 
@@ -96,6 +107,30 @@ public class TestJdbcWriterWithPostgres extends ClusterTest {
     }
   }
 
+  @Test
+  public void testUnwritableConnectionWithIfNotExists() throws Exception {
+    try {
+      String query = "CREATE TABLE IF NOT EXISTS pg_unwritable.public.`test_table` (ID, NAME) AS SELECT * FROM (VALUES(1,2), (3,4))";
+      queryBuilder().sql(query).run();
+      fail("CTAS IF NOT EXISTS against pg_unwritable should have been rejected");
+    } catch (UserRemoteException e) {
+      // CTAS on pg_unwritable must be rejected; the actual message is handed to JUnit instead of stdout.
+      assertTrue(e.getMessage(), e.getMessage().contains("VALIDATION ERROR: Unable to create or drop objects. Schema [pg_unwritable.public] is immutable."));
+    }
+  }
+
+  @Test
+  public void testUnwritableConnection() throws Exception {
+    try {
+      String query = "CREATE TABLE pg_unwritable.`public`.`test_table` (ID, NAME) AS SELECT * FROM (VALUES(1,2), (3,4))";
+      queryBuilder().sql(query).run();
+      fail("CTAS against pg_unwritable should have been rejected");
+    } catch (UserRemoteException e) {
+      // CTAS on pg_unwritable must be rejected; the actual message is handed to JUnit instead of stdout.
+      assertTrue(e.getMessage(), e.getMessage().contains("VALIDATION ERROR: Unable to create or drop objects. Schema [pg_unwritable.public] is immutable."));
+    }
+  }
+
   @Test
   public void testBasicCTAS() throws Exception {
     String query = "CREATE TABLE pg.`public`.`test_table` (ID, NAME) AS SELECT * FROM (VALUES(1,2), (3,4))";
@@ -427,23 +462,4 @@ public class TestJdbcWriterWithPostgres extends ClusterTest {
       assertTrue(e.getMessage().contains("DATA_WRITE ERROR: Drill does not yet support writing arrays to JDBC. `repeated_field` is an array."));
     }
   }
-
-  @Test
-  public void testUnwritableConnection() throws Exception {
-    try {
-      String query = "CREATE TABLE IF NOT EXISTS pg_unwritable.public.`test_table` (ID, NAME) AS SELECT * FROM (VALUES(1,2), (3,4))";
-      queryBuilder().sql(query).run();
-      fail();
-    } catch (UserRemoteException e) {
-      assertTrue(e.getMessage().contains("VALIDATION ERROR: Unable to create or drop objects. Schema [pg_unwritable.public] is immutable."));
-    }
-
-    try {
-      String query = "CREATE TABLE pg_unwritable.`public`.`test_table` (ID, NAME) AS SELECT * FROM (VALUES(1,2), (3,4))";
-      queryBuilder().sql(query).run();
-      fail();
-    } catch (UserRemoteException e) {
-      assertTrue(e.getMessage().contains("VALIDATION ERROR: Unable to create or drop objects. Schema [pg_unwritable.public] is immutable."));
-    }
-  }
 }
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
index 5c297d823c..cb0fb276bf 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
@@ -58,6 +58,7 @@ import java.io.IOException;
 import java.net.URLEncoder;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -104,13 +105,13 @@ public class MongoStoragePlugin extends AbstractStoragePlugin {
   private String addCredentialsFromCredentialsProvider(String connection, String name) {
     ConnectionString parsed = new ConnectionString(connection);
     if (parsed.getCredential() == null) {
-      UsernamePasswordCredentials credentials = getUsernamePasswordCredentials(name);
+      Optional<UsernamePasswordCredentials> credentials = getUsernamePasswordCredentials(name);
       try {
         // The default connection has the name "mongo" but multiple connections can be added;
         // each will need their own credentials.
-        if (credentials.getUsername() != null && credentials.getPassword() != null) {
-          String username = URLEncoder.encode(credentials.getUsername(), "UTF-8");
-          String password = URLEncoder.encode(credentials.getPassword(), "UTF-8");
+        if (credentials.isPresent()) {
+          String username = URLEncoder.encode(credentials.get().getUsername(), "UTF-8");
+          String password = URLEncoder.encode(credentials.get().getPassword(), "UTF-8");
           return connection.replaceFirst("://",
               String.format("://%s:%s@", username, password));
         }
@@ -121,7 +122,7 @@ public class MongoStoragePlugin extends AbstractStoragePlugin {
     return connection;
   }
 
-  private UsernamePasswordCredentials getUsernamePasswordCredentials(String name) {
+  private Optional<UsernamePasswordCredentials> getUsernamePasswordCredentials(String name) {
     CredentialsProvider credentialsProvider = mongoConfig.getCredentialsProvider();
     // for the case if empty credentials, tries to obtain credentials using HadoopCredentialsProvider
     if (credentialsProvider == null || credentialsProvider == PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER) {
@@ -132,7 +133,9 @@ public class MongoStoragePlugin extends AbstractStoragePlugin {
               UsernamePasswordCredentials.PASSWORD,
               DrillMongoConstants.STORE_CONFIG_PREFIX + name + DrillMongoConstants.PASSWORD_CONFIG_SUFFIX));
     }
-    return new UsernamePasswordCredentials(credentialsProvider);
+    return new UsernamePasswordCredentials.Builder()
+      .setCredentialsProvider(credentialsProvider).
+      build();
   }
 
   @Override
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java
index e803512083..f80f1beadb 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java
@@ -23,7 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.mongodb.ConnectionString;
 import org.apache.commons.lang3.ObjectUtils;
-import org.apache.drill.common.logical.AbstractSecuredStoragePluginConfig;
+import org.apache.drill.common.logical.CredentialedStoragePluginConfig;
 import org.apache.drill.common.logical.security.CredentialsProvider;
 import org.apache.drill.common.logical.security.PlainCredentialsProvider;
 
@@ -31,7 +31,7 @@ import java.util.List;
 import java.util.Objects;
 
 @JsonTypeName(MongoStoragePluginConfig.NAME)
-public class MongoStoragePluginConfig extends AbstractSecuredStoragePluginConfig {
+public class MongoStoragePluginConfig extends CredentialedStoragePluginConfig {
 
   public static final String NAME = "mongo";
 
@@ -60,6 +60,15 @@ public class MongoStoragePluginConfig extends AbstractSecuredStoragePluginConfig
     this.allowDiskUse = allowDiskUse;
   }
 
+  /**
+   * Copy constructor that duplicates {@code that} while substituting a different
+   * credentials provider.
+   *
+   * @param that config whose connection, client URI, optimizations, batch size,
+   *   disk-use flag and auth mode are carried over
+   * @param credentialsProvider provider for the copy; it is passed through
+   *   {@code getCredentialsProvider} first, and {@code credentialsProvider == null}
+   *   is handed to the superclass as its second argument
+   */
+  private MongoStoragePluginConfig(MongoStoragePluginConfig that, CredentialsProvider credentialsProvider) {
+    super(getCredentialsProvider(credentialsProvider), credentialsProvider == null, that.authMode);
+    this.allowDiskUse = that.allowDiskUse;
+    this.batchSize = that.batchSize;
+    this.pluginOptimizations = that.pluginOptimizations;
+    this.clientURI = that.clientURI;
+    this.connection = that.connection;
+  }
+
   public MongoPluginOptimizations getPluginOptimizations() {
     return pluginOptimizations;
   }
@@ -103,6 +112,11 @@ public class MongoStoragePluginConfig extends AbstractSecuredStoragePluginConfig
     return Objects.hash(connection);
   }
 
+  /**
+   * Builds a new config that copies this one through the private copy constructor
+   * and uses the given credentials provider; this instance is not modified.
+   *
+   * @param credentialsProvider provider to install in the copy
+   * @return the newly created config
+   */
+  @Override
+  public MongoStoragePluginConfig updateCredentialProvider(CredentialsProvider credentialsProvider) {
+    return new MongoStoragePluginConfig(this, credentialsProvider);
+  }
+
   public static class MongoPluginOptimizations {
 
     private boolean supportsProjectPushdown = true;
diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixStoragePluginConfig.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixStoragePluginConfig.java
index b1fe8e2291..6e73dc2b09 100644
--- a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixStoragePluginConfig.java
+++ b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixStoragePluginConfig.java
@@ -20,10 +20,11 @@ package org.apache.drill.exec.store.phoenix;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.drill.common.PlanStringBuilder;
-import org.apache.drill.common.logical.AbstractSecuredStoragePluginConfig;
+import org.apache.drill.common.logical.CredentialedStoragePluginConfig;
 import org.apache.drill.common.logical.security.CredentialsProvider;
 import org.apache.drill.exec.store.security.CredentialProviderUtils;
 import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
@@ -34,7 +35,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
 @JsonTypeName(PhoenixStoragePluginConfig.NAME)
-public class PhoenixStoragePluginConfig extends AbstractSecuredStoragePluginConfig {
+public class PhoenixStoragePluginConfig extends CredentialedStoragePluginConfig {
 
   public static final String NAME = "phoenix";
   public static final String THIN_DRIVER_CLASS = "org.apache.phoenix.queryserver.client.Driver";
@@ -62,8 +63,10 @@ public class PhoenixStoragePluginConfig extends AbstractSecuredStoragePluginConf
   }
 
   @JsonIgnore
-  public UsernamePasswordCredentials getUsernamePasswordCredentials() {
-    return new UsernamePasswordCredentials(credentialsProvider);
+  public Optional<UsernamePasswordCredentials> getUsernamePasswordCredentials() {
+    return new UsernamePasswordCredentials.Builder()
+      .setCredentialsProvider(credentialsProvider)
+      .build();
   }
 
   @JsonProperty("host")
@@ -78,19 +81,23 @@ public class PhoenixStoragePluginConfig extends AbstractSecuredStoragePluginConf
 
   @JsonProperty("userName")
   public String getUsername() {
-    if (directCredentials) {
-      return getUsernamePasswordCredentials().getUsername();
+    if (!directCredentials) {
+      return null;
     }
-    return null;
+    return getUsernamePasswordCredentials()
+      .map(UsernamePasswordCredentials::getUsername)
+      .orElse(null);
   }
 
   @JsonIgnore
   @JsonProperty("password")
   public String getPassword() {
-    if (directCredentials) {
-      return getUsernamePasswordCredentials().getPassword();
+    if (!directCredentials) {
+      return null;
     }
-    return null;
+    return getUsernamePasswordCredentials()
+      .map(UsernamePasswordCredentials::getPassword)
+      .orElse(null);
   }
 
   @JsonProperty("jdbcURL")
@@ -140,4 +147,9 @@ public class PhoenixStoragePluginConfig extends AbstractSecuredStoragePluginConf
         .field("props", props)
         .toString();
   }
+
+  /**
+   * Returns this config unchanged.
+   *
+   * NOTE(review): the {@code credentialsProvider} argument is ignored here, whereas
+   * other plugin configs build a copy that carries the new provider. Confirm that
+   * discarding the provider is intended for Phoenix.
+   *
+   * @param credentialsProvider currently unused
+   * @return this same instance
+   */
+  @Override
+  public PhoenixStoragePluginConfig updateCredentialProvider(CredentialsProvider credentialsProvider) {
+    return this;
+  }
 }
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkConnection.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkConnection.java
index b262d7dc8b..c92738eaf6 100644
--- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkConnection.java
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkConnection.java
@@ -31,6 +31,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.TimeUnit;
+import java.util.Optional;
 
 /**
  * This class wraps the functionality of the Splunk connection for Drill.
@@ -39,7 +40,7 @@ public class SplunkConnection {
 
   private static final Logger logger = LoggerFactory.getLogger(SplunkConnection.class);
 
-  private final UsernamePasswordCredentials credentials;
+  private final Optional<UsernamePasswordCredentials> credentials;
   private final String hostname;
   private final int port;
   private Service service;
@@ -73,8 +74,8 @@ public class SplunkConnection {
     ServiceArgs loginArgs = new ServiceArgs();
     loginArgs.setHost(hostname);
     loginArgs.setPort(port);
-    loginArgs.setPassword(credentials.getPassword());
-    loginArgs.setUsername(credentials.getUsername());
+    loginArgs.setPassword(credentials.map(UsernamePasswordCredentials::getPassword).orElse(null));
+    loginArgs.setUsername(credentials.map(UsernamePasswordCredentials::getUsername).orElse(null));
     try {
       connectionAttempts--;
       service = Service.connect(loginArgs);
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java
index 54c65645e4..0734347b08 100644
--- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java
@@ -23,15 +23,17 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import org.apache.drill.common.PlanStringBuilder;
-import org.apache.drill.common.logical.AbstractSecuredStoragePluginConfig;
+import org.apache.drill.common.logical.CredentialedStoragePluginConfig;
 import org.apache.drill.common.logical.security.CredentialsProvider;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
 import org.apache.drill.exec.store.security.CredentialProviderUtils;
 import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
 
 import java.util.Objects;
+import java.util.Optional;
 
 @JsonTypeName(SplunkPluginConfig.NAME)
-public class SplunkPluginConfig extends AbstractSecuredStoragePluginConfig {
+public class SplunkPluginConfig extends CredentialedStoragePluginConfig {
 
   public static final String NAME = "splunk";
   public static final int DISABLED_RECONNECT_RETRIES = 1;
@@ -51,7 +53,8 @@ public class SplunkPluginConfig extends AbstractSecuredStoragePluginConfig {
                             @JsonProperty("earliestTime") String earliestTime,
                             @JsonProperty("latestTime") String latestTime,
                             @JsonProperty("credentialsProvider") CredentialsProvider credentialsProvider,
-                            @JsonProperty("reconnectRetries") Integer reconnectRetries) {
+                            @JsonProperty("reconnectRetries") Integer reconnectRetries,
+                            @JsonProperty("authMode") String authMode) {
     super(CredentialProviderUtils.getCredentialsProvider(username, password, credentialsProvider),
         credentialsProvider == null);
     this.hostname = hostname;
@@ -61,25 +64,40 @@ public class SplunkPluginConfig extends AbstractSecuredStoragePluginConfig {
     this.reconnectRetries = reconnectRetries;
   }
 
+  /**
+   * Copy constructor that duplicates {@code that} while substituting a different
+   * credentials provider.
+   *
+   * @param that config whose host, port, time bounds, reconnect retries and auth
+   *   mode are carried over
+   * @param credentialsProvider provider for the copy; a {@code null} value is
+   *   replaced by {@link PlainCredentialsProvider#EMPTY_CREDENTIALS_PROVIDER}
+   */
+  private SplunkPluginConfig(SplunkPluginConfig that, CredentialsProvider credentialsProvider) {
+    super(getCredentialsProvider(credentialsProvider), credentialsProvider == null, that.authMode);
+    this.reconnectRetries = that.reconnectRetries;
+    this.latestTime = that.latestTime;
+    this.earliestTime = that.earliestTime;
+    this.port = that.port;
+    this.hostname = that.hostname;
+  }
+
   @JsonIgnore
-  public UsernamePasswordCredentials getUsernamePasswordCredentials() {
-    return new UsernamePasswordCredentials(credentialsProvider);
+  public Optional<UsernamePasswordCredentials> getUsernamePasswordCredentials() {
+    return new UsernamePasswordCredentials.Builder()
+      .setCredentialsProvider(credentialsProvider)
+      .build();
   }
 
   @JsonProperty("username")
   public String getUsername() {
-    if (directCredentials) {
-      return getUsernamePasswordCredentials().getUsername();
+    if (!directCredentials) {
+      return null;
     }
-    return null;
+    return getUsernamePasswordCredentials()
+      .map(UsernamePasswordCredentials::getUsername)
+      .orElse(null);
   }
 
   @JsonProperty("password")
   public String getPassword() {
-    if (directCredentials) {
-      return getUsernamePasswordCredentials().getPassword();
+    if (!directCredentials) {
+      return null;
     }
-    return null;
+    return getUsernamePasswordCredentials()
+      .map(UsernamePasswordCredentials::getPassword)
+      .orElse(null);
   }
 
   @JsonProperty("hostname")
@@ -107,6 +125,10 @@ public class SplunkPluginConfig extends AbstractSecuredStoragePluginConfig {
     return reconnectRetries != null ? reconnectRetries : DISABLED_RECONNECT_RETRIES;
   }
 
+  /**
+   * Substitutes the shared empty provider for a missing one.
+   *
+   * @param credentialsProvider provider to use, may be {@code null}
+   * @return {@code credentialsProvider} itself when it is not {@code null}, otherwise
+   *   {@link PlainCredentialsProvider#EMPTY_CREDENTIALS_PROVIDER}
+   */
+  private static CredentialsProvider getCredentialsProvider(CredentialsProvider credentialsProvider) {
+    if (credentialsProvider == null) {
+      return PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER;
+    }
+    return credentialsProvider;
+  }
+
   @Override
   public boolean equals(Object that) {
     if (this == that) {
@@ -137,4 +159,9 @@ public class SplunkPluginConfig extends AbstractSecuredStoragePluginConfig {
       .field("latestTime", latestTime)
       .toString();
   }
+
+  /**
+   * Builds a new config that copies this one through the private copy constructor
+   * and uses the given credentials provider; this instance is not modified.
+   *
+   * @param credentialsProvider provider to install in the copy
+   * @return the newly created config
+   */
+  @Override
+  public SplunkPluginConfig updateCredentialProvider(CredentialsProvider credentialsProvider) {
+    return new SplunkPluginConfig(this, credentialsProvider);
+  }
 }
diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java
index d497c6e6a2..4e010b2201 100644
--- a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java
+++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java
@@ -21,6 +21,7 @@ package org.apache.drill.exec.store.splunk;
 import com.splunk.EntityCollection;
 import com.splunk.Index;
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.logical.StoragePluginConfig;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -50,7 +51,8 @@ public class SplunkConnectionTest extends SplunkBaseTest {
               SPLUNK_STORAGE_PLUGIN_CONFIG.getEarliestTime(),
               SPLUNK_STORAGE_PLUGIN_CONFIG.getLatestTime(),
               null,
-              SPLUNK_STORAGE_PLUGIN_CONFIG.getReconnectRetries()
+              SPLUNK_STORAGE_PLUGIN_CONFIG.getReconnectRetries(),
+              StoragePluginConfig.AuthMode.SHARED_USER.name()
       );
       SplunkConnection sc = new SplunkConnection(invalidSplunkConfig);
       sc.connect();
diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java
index 23082e3611..0abf92aa66 100644
--- a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java
+++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java
@@ -19,6 +19,7 @@
 package org.apache.drill.exec.store.splunk;
 
 import org.apache.drill.categories.SlowTest;
+import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.ClusterTest;
@@ -72,7 +73,7 @@ public class SplunkTestSuite extends ClusterTest {
         Integer port = splunk.getFirstMappedPort();
         StoragePluginRegistry pluginRegistry = cluster.drillbit().getContext().getStorage();
         SPLUNK_STORAGE_PLUGIN_CONFIG = new SplunkPluginConfig(SPLUNK_LOGIN, SPLUNK_PASS, hostname, port, "1", "now",
-                null, 4);
+                null, 4, StoragePluginConfig.AuthMode.SHARED_USER.name());
         SPLUNK_STORAGE_PLUGIN_CONFIG.setEnabled(true);
         pluginRegistry.put(SplunkPluginConfig.NAME, SPLUNK_STORAGE_PLUGIN_CONFIG);
         runningSuite = true;
@@ -97,4 +98,4 @@ public class SplunkTestSuite extends ClusterTest {
   public static boolean isRunningSuite() {
     return runningSuite;
   }
-}
\ No newline at end of file
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ContextInformation.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ContextInformation.java
index 24fcd10721..6799fa68e8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ContextInformation.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ContextInformation.java
@@ -27,14 +27,14 @@ import org.apache.drill.exec.expr.fn.impl.DateUtility;
  * Provides query context information (such as query start time, query user, default schema etc.) for UDFs.
  */
 public class ContextInformation {
-  private final String queryUser;
+  private final UserCredentials queryUserCredentials;
   private final String currentDefaultSchema;
   private final long queryStartTime;
   private final int rootFragmentTimeZone;
   private final String sessionId;
 
   public ContextInformation(final UserCredentials userCredentials, final QueryContextInformation queryContextInfo) {
-    this.queryUser = userCredentials.getUserName();
+    this.queryUserCredentials = userCredentials;
     this.currentDefaultSchema = queryContextInfo.getDefaultSchemaName();
     this.queryStartTime = queryContextInfo.getQueryStartTime();
     this.rootFragmentTimeZone = queryContextInfo.getTimeZone();
@@ -45,7 +45,14 @@ public class ContextInformation {
    * @return userName of the user who issued the current query.
    */
   public String getQueryUser() {
-    return queryUser;
+    return queryUserCredentials.getUserName();
+  }
+
+  /**
+   * Returns the {@link UserCredentials} object that was supplied to the constructor,
+   * i.e. the full credentials rather than only the user name.
+   *
+   * @return credentials of the user who issued the current query.
+   */
+  public UserCredentials getQueryUserCredentials() {
+    return queryUserCredentials;
+  }
 
   /**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
index 0da319f500..889c251628 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
@@ -55,6 +55,7 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.rpc.RpcException;
@@ -319,7 +320,7 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor
     SchemaConfig schemaConfig = SchemaConfig
         .newBuilder(
             isImpersonationEnabled ? contextInformation.getQueryUser() : ImpersonationUtil.getProcessUserName(),
-            new FragmentSchemaConfigInfoProvider(fragmentOptions, contextInformation.getQueryUser(), context))
+            new FragmentSchemaConfigInfoProvider(fragmentOptions, contextInformation.getQueryUserCredentials(), context))
         .setIgnoreAuthErrors(isImpersonationEnabled)
         .build();
 
@@ -680,16 +681,16 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor
 
     private final OptionManager optionManager;
 
-    private final String queryUser;
+    private final UserCredentials queryUserCredentials;
 
     private final SchemaTreeProvider schemaTreeProvider;
 
     private final ViewExpansionContext viewExpansionContext;
 
     private FragmentSchemaConfigInfoProvider(OptionManager optionManager,
-        String queryUser, DrillbitContext context) {
+        UserCredentials queryUserCredentials, DrillbitContext context) {
       this.optionManager = optionManager;
-      this.queryUser = queryUser;
+      this.queryUserCredentials = queryUserCredentials;
       this.schemaTreeProvider = new SchemaTreeProvider(context);
       viewExpansionContext = new ViewExpansionContext(context.getConfig(), this);
     }
@@ -706,7 +707,12 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor
 
     @Override
     public String getQueryUserName() {
-      return queryUser;
+      return queryUserCredentials.getUserName();
+    }
+
+    /**
+     * @return the query user's credentials that were passed to this provider's constructor
+     */
+    @Override
+    public UserCredentials getQueryUserCredentials() {
+      return queryUserCredentials;
+    }
 
     @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index 76b44ce248..0b37ce1aeb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -37,6 +37,7 @@ import org.apache.drill.exec.planner.sql.DrillOperatorTable;
 import org.apache.drill.exec.proto.BitControl.QueryContextInformation;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.server.DrillbitContext;
@@ -146,12 +147,23 @@ public class QueryContext implements AutoCloseable, OptimizerRulesContext, Schem
     return plannerSettings;
   }
 
-  public UserSession getSession() { return session; }
+  /**
+   * @return the credentials held by this context's {@link UserSession}
+   */
+  @Override
+  public UserCredentials getQueryUserCredentials() {
+    return session.getCredentials();
+  }
 
   @Override
-  public BufferAllocator getAllocator() { return allocator; }
+  public BufferAllocator getAllocator() {
+    return allocator;
+  }
 
-  public QueryId getQueryId( ) { return queryId; }
+  /**
+   * @return the {@link UserSession} held by this query context
+   */
+  public UserSession getSession() {
+    return session;
+  }
+
+  /**
+   * @return the {@link QueryId} held by this query context
+   */
+  public QueryId getQueryId() {
+    return queryId;
+  }
 
   /**
    * Return reference to default schema instance in a schema tree. Each {@link org.apache.calcite.schema.SchemaPlus}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ViewExpansionContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ViewExpansionContext.java
index 07fbae367e..549900309f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ViewExpansionContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ViewExpansionContext.java
@@ -23,6 +23,7 @@ import org.apache.calcite.schema.SchemaPlus;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
 import org.apache.drill.exec.store.SchemaConfig.SchemaConfigInfoProvider;
 
 import com.carrotsearch.hppc.ObjectIntHashMap;
@@ -74,7 +75,7 @@ public class ViewExpansionContext {
 
   private final SchemaConfigInfoProvider schemaConfigInfoProvider;
   private final int maxChainedUserHops;
-  private final String queryUser;
+  private final UserCredentials queryUserCredentials;
   private final ObjectIntHashMap<String> userTokens = new ObjectIntHashMap<>();
   private final boolean impersonationEnabled;
 
@@ -85,7 +86,7 @@ public class ViewExpansionContext {
   public ViewExpansionContext(DrillConfig config, SchemaConfigInfoProvider schemaConfigInfoProvider) {
     this.schemaConfigInfoProvider = schemaConfigInfoProvider;
     this.maxChainedUserHops = config.getInt(IMPERSONATION_MAX_CHAINED_USER_HOPS);
-    this.queryUser = schemaConfigInfoProvider.getQueryUserName();
+    this.queryUserCredentials = schemaConfigInfoProvider.getQueryUserCredentials();
     this.impersonationEnabled = config.getBoolean(ExecConstants.IMPERSONATION_ENABLED);
   }
 
@@ -103,7 +104,7 @@ public class ViewExpansionContext {
    */
   public ViewExpansionToken reserveViewExpansionToken(String viewOwner) {
     int totalTokens = 1;
-    if (!viewOwner.equals(queryUser)) {
+    if (!viewOwner.equals(queryUserCredentials.getUserName())) {
       // We want to track the tokens only if the "viewOwner" is not same as the "queryUser".
       if (userTokens.containsKey(viewOwner)) {
         // If the user already exists, we don't need to validate the limit on maximum user hops in chained impersonation
@@ -131,7 +132,7 @@ public class ViewExpansionContext {
   private void releaseViewExpansionToken(ViewExpansionToken token) {
     final String viewOwner = token.viewOwner;
 
-    if (viewOwner.equals(queryUser)) {
+    if (viewOwner.equals(queryUserCredentials.getUserName())) {
       // If the token owner and queryUser are same, no need to track the token release.
       return;
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/CredentialResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/CredentialResources.java
new file mode 100644
index 0000000000..c63043e7e3
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/CredentialResources.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.server.rest;
+
+import io.swagger.v3.oas.annotations.ExternalDocumentation;
+import io.swagger.v3.oas.annotations.Operation;
+import org.apache.drill.common.logical.CredentialedStoragePluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.logical.security.CredentialsProvider;
+import org.apache.drill.exec.server.rest.DrillRestServer.UserAuthEnabled;
+import org.apache.drill.exec.server.rest.StorageResources.StoragePluginModel;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginFilter;
+import org.glassfish.jersey.server.mvc.Viewable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.security.RolesAllowed;
+import javax.inject.Inject;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.FormParam;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import javax.ws.rs.core.SecurityContext;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.apache.drill.exec.server.rest.auth.DrillUserPrincipal.AUTHENTICATED_ROLE;
+
+/**
+ * REST endpoints through which an authenticated user lists the storage plugins
+ * selected by {@link PluginFilter#TRANSLATES_USERS} and saves their own
+ * credentials for a plugin whose auth mode is {@link AuthMode#USER_TRANSLATION}.
+ */
+@Path("/")
+@RolesAllowed(AUTHENTICATED_ROLE)
+public class CredentialResources {
+  private static final Logger logger = LoggerFactory.getLogger(CredentialResources.class);
+  private static final Comparator<PluginConfigWrapper> PLUGIN_COMPARATOR =
+    Comparator.comparing(PluginConfigWrapper::getName);
+  private static final String ALL_PLUGINS = "all";
+  private static final String ENABLED_PLUGINS = "enabled";
+  private static final String DISABLED_PLUGINS = "disabled";
+  private static final String TRANSLATES_USERS = "translates_users";
+
+  @Inject
+  UserAuthEnabled authEnabled;
+
+  @Inject
+  StoragePluginRegistry storage;
+
+  @Inject
+  SecurityContext sc;
+
+  @Inject
+  HttpServletRequest request;
+
+  /**
+   * Renders the HTML credentials page.
+   *
+   * @return a view of {@code /rest/credentials/list.ftl} backed by one model per
+   *   plugin returned by {@link #getPluginsJSON()}
+   */
+  @GET
+  @Path("/credentials")
+  @Produces(MediaType.TEXT_HTML)
+  @Operation(externalDocs = @ExternalDocumentation(description = "Apache Drill REST API documentation:", url = "https://drill.apache.org/docs/rest-api-introduction/"))
+  public Viewable getPlugins() {
+    List<StoragePluginModel> model = getPluginsJSON().stream()
+      .map(plugin -> new StoragePluginModel(plugin, request, sc))
+      .collect(Collectors.toList());
+    // Creating an empty model with CSRF token, if there are no storage plugins
+    if (model.isEmpty()) {
+      model.add(new StoragePluginModel(null, request, sc));
+    }
+    return ViewableWithPermissions.create(authEnabled.get(), "/rest/credentials/list.ftl", sc, model);
+  }
+
+  /**
+   * Lists the plugin configs in the {@code translates_users} group.
+   *
+   * @return the configs selected by {@link PluginFilter#TRANSLATES_USERS}, sorted by name
+   */
+  @GET
+  @Path("/credentials.json")
+  @Produces(MediaType.APPLICATION_JSON)
+  public List<PluginConfigWrapper> getPluginsJSON() {
+    return getConfigsFor(TRANSLATES_USERS);
+  }
+
+  /**
+   * Lists the stored plugin configs belonging to the requested group, sorted by name.
+   *
+   * @param pluginGroup one of {@code all}, {@code enabled}, {@code disabled} or
+   *   {@code translates_users}; surrounding whitespace is ignored
+   * @return the matching configs wrapped as {@link PluginConfigWrapper}, or an empty
+   *   list when the group is not recognized
+   */
+  @GET
+  @Path("/credentials{group: (/[^/]+?)*}-plugins.json")
+  @Produces(MediaType.APPLICATION_JSON)
+  @Operation(externalDocs = @ExternalDocumentation(description = "Apache Drill REST API documentation:", url = "https://drill.apache.org/docs/rest-api-introduction/"))
+  public List<PluginConfigWrapper> getConfigsFor(@PathParam("group") String pluginGroup) {
+    // NOTE(review): the {group} template's regex contains a leading '/'; confirm that
+    // the value injected here can actually equal the slash-free constants below.
+    PluginFilter filter;
+    switch (pluginGroup.trim()) {
+      case ALL_PLUGINS:
+        filter = PluginFilter.ALL;
+        break;
+      case ENABLED_PLUGINS:
+        filter = PluginFilter.ENABLED;
+        break;
+      case DISABLED_PLUGINS:
+        filter = PluginFilter.DISABLED;
+        break;
+      case TRANSLATES_USERS:
+        filter = PluginFilter.TRANSLATES_USERS;
+        break;
+      default:
+        return Collections.emptyList();
+    }
+    return StreamSupport.stream(
+        Spliterators.spliteratorUnknownSize(storage.storedConfigs(filter).entrySet().iterator(), Spliterator.ORDERED), false)
+      .map(entry -> new PluginConfigWrapper(entry.getKey(), entry.getValue(), sc))
+      .sorted(PLUGIN_COMPARATOR)
+      .collect(Collectors.toList());
+  }
+
+  /**
+   * Saves the calling user's credentials for a plugin, taking the values from a
+   * URL-encoded form.
+   *
+   * @param pluginName name of the stored plugin config; a missing or blank value
+   *   is rejected with {@code 400 Bad Request}
+   * @param username user name to store for the calling user
+   * @param password password to store for the calling user
+   * @return a JSON {@link JsonResult}: {@code Success} with status {@code 200}, or a
+   *   message describing why the credentials were not saved
+   */
+  @POST
+  @Path("/credentials/update_credentials")
+  @Consumes(MediaType.APPLICATION_FORM_URLENCODED)
+  @Produces(MediaType.APPLICATION_JSON)
+  @Operation(externalDocs = @ExternalDocumentation(description = "Apache Drill REST API documentation:", url = "https://drill.apache.org/docs/rest-api-introduction/"))
+  public Response createOrUpdateCredentials(@FormParam("plugin") String pluginName,
+                                       @FormParam("username") String username,
+                                       @FormParam("password") String password) {
+    String queryUser = sc.getUserPrincipal().getName();
+    // A form field that was not submitted is injected as null; treat it as empty.
+    String cleanPluginName = pluginName == null ? "" : pluginName.trim();
+    if (cleanPluginName.isEmpty()) {
+      return Response.status(Status.BAD_REQUEST)
+        .entity(message("A storage config name may not be empty"))
+        .build();
+    }
+
+    // Get the config
+    StoragePluginConfig rawConfig = storage.getStoredConfig(cleanPluginName);
+    if (!(rawConfig instanceof CredentialedStoragePluginConfig)) {
+      // The name is passed as a format argument, never as part of the format string,
+      // so a '%' in a user-supplied name cannot break String.format.
+      return Response.status(Status.INTERNAL_SERVER_ERROR)
+        .entity(message("%s does not support per user credentials.", cleanPluginName))
+        .build();
+    }
+
+    CredentialedStoragePluginConfig config = (CredentialedStoragePluginConfig) rawConfig;
+
+    if (config.getAuthMode() != AuthMode.USER_TRANSLATION) {
+      return Response.status(Status.INTERNAL_SERVER_ERROR)
+        .entity(message("%s does not support per user translation.", cleanPluginName))
+        .build();
+    }
+
+    try {
+      saveUserCredentials(cleanPluginName, config, username, password, queryUser);
+    } catch (PluginException e) {
+      logger.error("Error while saving plugin", e);
+      return Response.status(Status.INTERNAL_SERVER_ERROR)
+        .entity(message("Error while saving plugin: %s", e.getMessage()))
+        .build();
+    }
+
+    return Response.ok().entity(message("Success")).build();
+  }
+
+  /**
+   * Saves the calling user's credentials for a plugin, taking the values from a
+   * JSON request body.
+   *
+   * @param pluginName name of the stored plugin config, taken from the URL path; a
+   *   blank value is rejected with {@code 400 Bad Request}
+   * @param credentials container holding the user name and password to store; a
+   *   missing body is rejected with {@code 400 Bad Request}
+   * @return status {@code 200} with a confirmation text, or an error response whose
+   *   entity is a {@link JsonResult} describing the failure
+   */
+  @POST
+  @Path("/credentials/{pluginName}/update_credentials.json")
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  @Operation(externalDocs = @ExternalDocumentation(description = "Apache Drill REST API documentation:", url = "https://drill.apache.org/docs/rest-api-introduction/"))
+  public Response createOrUpdatePlugin(@PathParam("pluginName") String pluginName, UsernamePasswordContainer credentials) {
+    String queryUser = sc.getUserPrincipal().getName();
+    // Trim before the emptiness check so that a whitespace-only name is rejected too.
+    String cleanPluginName = pluginName == null ? "" : pluginName.trim();
+    if (cleanPluginName.isEmpty()) {
+      return Response.status(Status.BAD_REQUEST)
+        .entity(message("A storage config name may not be empty"))
+        .build();
+    }
+    StoragePluginConfig config = storage.getStoredConfig(cleanPluginName);
+
+    if (!(config instanceof CredentialedStoragePluginConfig)) {
+      return Response.status(Status.INTERNAL_SERVER_ERROR)
+        .entity(message("%s does not support user translation.", cleanPluginName))
+        .build();
+    }
+
+    if (config.getAuthMode() != AuthMode.USER_TRANSLATION) {
+      return Response.status(Status.INTERNAL_SERVER_ERROR)
+        .entity(message("%s does not have user translation enabled.", cleanPluginName))
+        .build();
+    }
+
+    // Without a request body there is nothing to store; report it instead of failing
+    // with a NullPointerException below.
+    if (credentials == null) {
+      return Response.status(Status.BAD_REQUEST)
+        .entity(message("A username and password must be provided"))
+        .build();
+    }
+
+    CredentialedStoragePluginConfig credsConfig = (CredentialedStoragePluginConfig) config;
+    try {
+      saveUserCredentials(cleanPluginName, credsConfig, credentials.getUsername(), credentials.getPassword(), queryUser);
+    } catch (PluginException e) {
+      logger.error("Error while saving plugin", e);
+      return Response.status(Status.INTERNAL_SERVER_ERROR)
+        .entity(message("Error while updating plugin credentials: %s", e.getMessage()))
+        .build();
+    }
+
+    // The success entity is kept as plain text so existing clients see the same body.
+    return Response.status(Status.OK)
+      .entity("Credentials have been updated.")
+      .build();
+  }
+
+  /**
+   * Records the given credentials for {@code queryUser} in the plugin's credentials
+   * provider and writes the resulting config back to the plugin registry.
+   *
+   * @param pluginName trimmed, non-empty name of the stored plugin config
+   * @param config the plugin's current config
+   * @param username user name to store
+   * @param password password to store
+   * @param queryUser name of the user the credentials belong to
+   * @throws PluginException if the registry rejects the updated config
+   */
+  private void saveUserCredentials(String pluginName, CredentialedStoragePluginConfig config,
+                                   String username, String password, String queryUser) throws PluginException {
+    CredentialsProvider credentialProvider = config.getCredentialsProvider();
+    credentialProvider.setUserCredentials(username, password, queryUser);
+
+    // Ask the config for a version that carries the updated provider and keep its
+    // enabled state in line with the config it replaces.
+    CredentialedStoragePluginConfig newConfig = config.updateCredentialProvider(credentialProvider);
+    newConfig.setEnabled(config.isEnabled());
+
+    storage.validatedPut(pluginName, newConfig);
+    // Force re-caching
+    storage.setEnabled(pluginName, newConfig.isEnabled());
+  }
+
+  /**
+   * Wraps a formatted message in a {@link JsonResult}.
+   *
+   * @param message a {@link String#format(String, Object...)} format string; callers
+   *   must pass untrusted text through {@code args}, not inside this string
+   * @param args values substituted into {@code message}
+   * @return the result entity carrying the formatted text
+   */
+  private JsonResult message(String message, Object... args) {
+    return new JsonResult(String.format(message, args)); // lgtm [java/tainted-format-string]
+  }
+
+  /**
+   * Response entity holding a single {@code result} text.
+   */
+  @XmlRootElement
+  public static class JsonResult {
+    private final String result;
+    public JsonResult(String result) {
+      this.result = result;
+    }
+    public String getResult() {
+      return result;
+    }
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
index 1639d58bf7..cbe7128b93 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
@@ -91,6 +91,7 @@ public class DrillRestServer extends ResourceConfig {
     register(StatusResources.class);
     register(StorageResources.class);
     register(ProfileResources.class);
+    register(CredentialResources.class);
     register(QueryResources.class);
     register(MetricsResources.class);
     register(ThreadsResources.class);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/PluginConfigWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/PluginConfigWrapper.java
index 6454a55981..ff5170db64 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/PluginConfigWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/PluginConfigWrapper.java
@@ -17,11 +17,14 @@
  */
 package org.apache.drill.exec.server.rest;
 
+import java.util.Optional;
+
+import javax.ws.rs.core.SecurityContext;
 import javax.xml.bind.annotation.XmlRootElement;
 
+import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonIgnore;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.drill.common.logical.AbstractSecuredStoragePluginConfig;
+import org.apache.drill.common.logical.CredentialedStoragePluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.common.logical.security.CredentialsProvider;
 import org.apache.drill.exec.store.StoragePluginRegistry;
@@ -29,19 +32,22 @@ import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
 import org.apache.drill.exec.store.security.oauth.OAuthTokenCredentials;
 
 @XmlRootElement
 public class PluginConfigWrapper {
-
   private final String name;
   private final StoragePluginConfig config;
+  private final SecurityContext sc;
 
   @JsonCreator
   public PluginConfigWrapper(@JsonProperty("name") String name,
-      @JsonProperty("config") StoragePluginConfig config) {
+                             @JsonProperty("config") StoragePluginConfig config,
+                             @JacksonInject SecurityContext sc) {
     this.name = name;
     this.config = config;
+    this.sc = sc;
   }
 
   public String getName() { return name; }
@@ -52,6 +58,40 @@ public class PluginConfigWrapper {
     return config.isEnabled();
   }
 
+  @JsonIgnore
+  public String getUserName() {
+    if (!(config instanceof CredentialedStoragePluginConfig)) {
+      return null;
+    }
+
+    CredentialedStoragePluginConfig securedStoragePluginConfig = (CredentialedStoragePluginConfig) config;
+    CredentialsProvider credentialsProvider = securedStoragePluginConfig.getCredentialsProvider();
+    String queryUser = sc.getUserPrincipal().getName();
+    Optional<UsernamePasswordCredentials> credentials = new UsernamePasswordCredentials.Builder()
+      .setCredentialsProvider(credentialsProvider)
+      .setQueryUser(queryUser)
+      .build();
+
+    return credentials.map(UsernamePasswordCredentials::getUsername).orElse(null);
+  }
+
+  @JsonIgnore
+  public String getPassword() {
+    if (!(config instanceof CredentialedStoragePluginConfig)) {
+      return null;
+    }
+
+    CredentialedStoragePluginConfig securedStoragePluginConfig = (CredentialedStoragePluginConfig) config;
+    CredentialsProvider credentialsProvider = securedStoragePluginConfig.getCredentialsProvider();
+    String queryUser = sc.getUserPrincipal().getName();
+    Optional<UsernamePasswordCredentials> credentials = new UsernamePasswordCredentials.Builder()
+      .setCredentialsProvider(credentialsProvider)
+      .setQueryUser(queryUser)
+      .build();
+
+    return credentials.map(UsernamePasswordCredentials::getPassword).orElse(null);
+  }
+
   public void createOrUpdateInStorage(StoragePluginRegistry storage) throws PluginException {
     storage.validatedPut(name, config);
   }
@@ -66,17 +106,19 @@ public class PluginConfigWrapper {
    */
   @JsonIgnore
   public boolean isOauth() {
-    if (! (config instanceof AbstractSecuredStoragePluginConfig)) {
+    if (! (config instanceof CredentialedStoragePluginConfig)) {
       return false;
     }
-    AbstractSecuredStoragePluginConfig securedStoragePluginConfig = (AbstractSecuredStoragePluginConfig) config;
+    CredentialedStoragePluginConfig securedStoragePluginConfig = (CredentialedStoragePluginConfig) config;
     CredentialsProvider credentialsProvider = securedStoragePluginConfig.getCredentialsProvider();
     if (credentialsProvider == null) {
       return false;
     }
-    OAuthTokenCredentials tokenCredentials = new OAuthTokenCredentials(credentialsProvider);
 
-    return !StringUtils.isEmpty(tokenCredentials.getClientID()) ||
-      !StringUtils.isEmpty(tokenCredentials.getClientSecret());
+    Optional<OAuthTokenCredentials> tokenCredentials = new OAuthTokenCredentials.Builder()
+      .setCredentialsProvider(credentialsProvider)
+      .build();
+
+    return tokenCredentials.map(OAuthTokenCredentials::getClientID).orElse(null) != null;
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryResources.java
index 5df42111d7..fa1fc54316 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryResources.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryResources.java
@@ -87,7 +87,7 @@ public class QueryResources {
   public Viewable getQuery() {
     List<StorageResources.StoragePluginModel> enabledPlugins = sr.getConfigsFor("enabled")
       .stream()
-      .map(plugin -> new StorageResources.StoragePluginModel(plugin, request))
+      .map(plugin -> new StorageResources.StoragePluginModel(plugin, request, sc))
       .collect(Collectors.toList());
     return ViewableWithPermissions.create(
         authEnabled.get(), "/rest/query/query.ftl",
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
index 800fd11ff5..2d36f77998 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
@@ -56,7 +56,7 @@ import okhttp3.OkHttpClient;
 import okhttp3.OkHttpClient.Builder;
 import okhttp3.Request;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.drill.common.logical.AbstractSecuredStoragePluginConfig;
+import org.apache.drill.common.logical.CredentialedStoragePluginConfig;
 import org.apache.drill.common.logical.security.CredentialsProvider;
 import org.apache.drill.exec.oauth.OAuthTokenProvider;
 import org.apache.drill.exec.oauth.PersistentTokenTable;
@@ -148,11 +148,11 @@ public class StorageResources {
   @Produces(MediaType.TEXT_HTML)
   public Viewable getPlugins() {
     List<StoragePluginModel> model = getPluginsJSON().stream()
-      .map(plugin -> new StoragePluginModel(plugin, request))
+      .map(plugin -> new StoragePluginModel(plugin, request, sc))
       .collect(Collectors.toList());
     // Creating an empty model with CSRF token, if there are no storage plugins
     if (model.isEmpty()) {
-      model.add(new StoragePluginModel(null, request));
+      model.add(new StoragePluginModel(null, request, sc));
     }
     return ViewableWithPermissions.create(authEnabled.get(), "/rest/storage/list.ftl", sc, model);
   }
@@ -163,7 +163,7 @@ public class StorageResources {
   @Operation(externalDocs = @ExternalDocumentation(description = "Apache Drill REST API documentation:", url = "https://drill.apache.org/docs/rest-api-introduction/"))
   public Response getPluginConfig(@PathParam("name") String name) {
     try {
-      return Response.ok(new PluginConfigWrapper(name, storage.getStoredConfig(name)))
+      return Response.ok(new PluginConfigWrapper(name, storage.getStoredConfig(name), sc))
         .build();
     } catch (Exception e) {
       logger.error("Failure while trying to access storage config: {}", name, e);
@@ -180,7 +180,7 @@ public class StorageResources {
   public Viewable getPlugin(@PathParam("name") String name) {
     StoragePluginModel model = new StoragePluginModel(
       (PluginConfigWrapper) getPluginConfig(name).getEntity(),
-      request
+      request, sc
     );
     return ViewableWithPermissions.create(authEnabled.get(), "/rest/storage/update.ftl", sc,
       model);
@@ -213,7 +213,7 @@ public class StorageResources {
   @Produces(MediaType.APPLICATION_JSON)
   public Response updateRefreshToken(@PathParam("name") String name, OAuthTokenContainer tokens) {
     try {
-      if (storage.getPlugin(name).getConfig() instanceof AbstractSecuredStoragePluginConfig) {
+      if (storage.getPlugin(name).getConfig() instanceof CredentialedStoragePluginConfig) {
         DrillbitContext context = ((AbstractStoragePlugin) storage.getPlugin(name)).getContext();
         OAuthTokenProvider tokenProvider = context.getoAuthTokenProvider();
         PersistentTokenTable tokenTable = tokenProvider.getOauthTokenRegistry().getTokenTable(name);
@@ -244,7 +244,7 @@ public class StorageResources {
   @Produces(MediaType.APPLICATION_JSON)
   public Response updateAccessToken(@PathParam("name") String name, OAuthTokenContainer tokens) {
     try {
-      if (storage.getPlugin(name).getConfig() instanceof AbstractSecuredStoragePluginConfig) {
+      if (storage.getPlugin(name).getConfig() instanceof CredentialedStoragePluginConfig) {
         DrillbitContext context = ((AbstractStoragePlugin) storage.getPlugin(name)).getContext();
         OAuthTokenProvider tokenProvider = context.getoAuthTokenProvider();
         PersistentTokenTable tokenTable = tokenProvider.getOauthTokenRegistry().getTokenTable(name);
@@ -276,7 +276,7 @@ public class StorageResources {
   public Response updateOAuthTokens(@PathParam("name") String name,
                                     OAuthTokenContainer tokenContainer) {
     try {
-      if (storage.getPlugin(name).getConfig() instanceof AbstractSecuredStoragePluginConfig) {
+      if (storage.getPlugin(name).getConfig() instanceof CredentialedStoragePluginConfig) {
         DrillbitContext context = ((AbstractStoragePlugin) storage.getPlugin(name)).getContext();
         OAuthTokenProvider tokenProvider = context.getoAuthTokenProvider();
         PersistentTokenTable tokenTable = tokenProvider.getOauthTokenRegistry().getTokenTable(name);
@@ -307,8 +307,8 @@ public class StorageResources {
   @Produces(MediaType.TEXT_HTML)
   public Response updateAuthToken(@PathParam("name") String name, @QueryParam("code") String code) {
     try {
-      if (storage.getPlugin(name).getConfig() instanceof AbstractSecuredStoragePluginConfig) {
-        AbstractSecuredStoragePluginConfig securedStoragePluginConfig = (AbstractSecuredStoragePluginConfig) storage.getPlugin(name).getConfig();
+      if (storage.getPlugin(name).getConfig() instanceof CredentialedStoragePluginConfig) {
+        CredentialedStoragePluginConfig securedStoragePluginConfig = (CredentialedStoragePluginConfig) storage.getPlugin(name).getConfig();
         CredentialsProvider credentialsProvider = securedStoragePluginConfig.getCredentialsProvider();
         String callbackURL = this.request.getRequestURL().toString();
 
@@ -389,7 +389,7 @@ public class StorageResources {
         .build();
     }
 
-    return Response.ok(new PluginConfigWrapper(name, storage.getStoredConfig(name)))
+    return Response.ok(new PluginConfigWrapper(name, storage.getStoredConfig(name), sc))
       .header(HttpHeaders.CONTENT_DISPOSITION, String.format("attachment;filename=\"%s.%s\"", name, format))
       .build();
   }
@@ -468,7 +468,7 @@ public class StorageResources {
   }
 
   private JsonResult message(String message, Object... args) {
-    return new JsonResult(String.format(message, args));
+    return new JsonResult(String.format(message, args));  // lgtm [java/tainted-format-string]
   }
 
   private boolean isSupported(String format) {
@@ -511,7 +511,7 @@ public class StorageResources {
     pluginGroup = StringUtils.isNotEmpty(pluginGroup) ? pluginGroup.replace("/", "") : ALL_PLUGINS;
     return StreamSupport.stream(
       Spliterators.spliteratorUnknownSize(storage.storedConfigs(filter).entrySet().iterator(), Spliterator.ORDERED), false)
-        .map(entry -> new PluginConfigWrapper(entry.getKey(), entry.getValue()))
+        .map(entry -> new PluginConfigWrapper(entry.getKey(), entry.getValue(), sc))
         .sorted(PLUGIN_COMPARATOR)
         .collect(Collectors.toList());
   }
@@ -561,8 +561,9 @@ public class StorageResources {
     private final PluginConfigWrapper plugin;
     private final String type;
     private final String csrfToken;
+    private final SecurityContext securityContext;
 
-    public StoragePluginModel(PluginConfigWrapper plugin, HttpServletRequest request) {
+    public StoragePluginModel(PluginConfigWrapper plugin, HttpServletRequest request, SecurityContext sc) {
       this.plugin = plugin;
 
       if (plugin != null) {
@@ -571,6 +572,11 @@ public class StorageResources {
         this.type = "Unknown";
       }
       csrfToken = WebUtils.getCsrfTokenFromHttpRequest(request);
+      this.securityContext = sc;
+    }
+
+    public String getActiveUser() {
+      return securityContext.getUserPrincipal().getName();
     }
 
     public String getType() {
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcDialectFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/UsernamePasswordContainer.java
similarity index 54%
copy from contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcDialectFactory.java
copy to exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/UsernamePasswordContainer.java
index 9c10cf79db..ddbe9abb3f 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcDialectFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/UsernamePasswordContainer.java
@@ -15,18 +15,31 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.jdbc;
 
-import org.apache.drill.exec.store.jdbc.clickhouse.ClickhouseJdbcDialect;
+package org.apache.drill.exec.server.rest;
 
-public class JdbcDialectFactory {
-  public static final String JDBC_CLICKHOUSE_PREFIX = "jdbc:clickhouse";
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
 
-  public static JdbcDialect getJdbcDialect(JdbcStoragePlugin plugin, String url) {
-    if (url.startsWith(JDBC_CLICKHOUSE_PREFIX)) {
-      return new ClickhouseJdbcDialect(plugin);
-    } else {
-      return new DefaultJdbcDialect(plugin);
-    }
+import javax.xml.bind.annotation.XmlRootElement;
+
+@XmlRootElement
+public class UsernamePasswordContainer {
+  private final String username;
+  private final String password;
+
+  @JsonCreator
+  public UsernamePasswordContainer(@JsonProperty("username") String username,
+                             @JsonProperty("password") String password) {
+    this.username = username;
+    this.password = password;
   }
+
+  public String getUsername() {
+      return username;
+    }
+
+  public String getPassword() {
+      return password;
+    }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ViewableWithPermissions.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ViewableWithPermissions.java
index 98775275dd..e678a7cf3a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ViewableWithPermissions.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ViewableWithPermissions.java
@@ -79,8 +79,11 @@ public class ViewableWithPermissions extends Viewable {
 
     final boolean isUserLoggedIn = AuthDynamicFeature.isUserLoggedIn(sc);
 
+    final boolean showCredentials = (authEnabled && isUserLoggedIn);
+
     final ImmutableMap.Builder<String, Object> mapBuilder = ImmutableMap.<String, Object>builder()
         .put("showStorage", isAdmin)
+        .put("showCredentials", showCredentials)
         .put("showOptions", isAdmin)
         .put("showThreads", isAdmin)
         .put("showLogs", isAdmin)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaConfig.java
index 2e467704c9..19906fad0d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaConfig.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.store;
 
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.drill.exec.ops.ViewExpansionContext;
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
 import org.apache.drill.exec.server.options.OptionValue;
 
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
@@ -79,6 +80,10 @@ public class SchemaConfig {
     return userName;
   }
 
+  public UserCredentials getQueryUserCredentials() {
+    return provider.getQueryUserCredentials();
+  }
+
   /**
    * @return Should ignore if authorization errors are reported while {@link SchemaPlus}
    * instances interact with the underlying storage.
@@ -105,6 +110,8 @@ public class SchemaConfig {
 
     String getQueryUserName();
 
+    UserCredentials getQueryUserCredentials();
+
     OptionValue getOption(String optionKey);
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaTreeProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaTreeProvider.java
index e28a898c5e..3ab26435f8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaTreeProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaTreeProvider.java
@@ -22,6 +22,7 @@ import java.util.List;
 import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.ViewExpansionContext;
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
 import org.apache.calcite.jdbc.DynamicSchema;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.drill.exec.server.DrillbitContext;
@@ -77,6 +78,12 @@ public class SchemaTreeProvider implements AutoCloseable {
       @Override public String getQueryUserName() {
         return ImpersonationUtil.getProcessUserName();
       }
+
+      @Override public UserCredentials getQueryUserCredentials() {
+        return UserCredentials.newBuilder()
+          .setUserName(ImpersonationUtil.getProcessUserName())
+          .build();
+      }
     };
 
     final SchemaConfig schemaConfig = SchemaConfig.newBuilder(
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
index f1a75ff0d8..dd34052114 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
@@ -217,7 +217,7 @@ public interface StoragePluginRegistry extends Iterable<Map.Entry<String, Storag
    */
   Map<String, StoragePluginConfig> storedConfigs();
 
-  enum PluginFilter { ALL, ENABLED, DISABLED };
+  enum PluginFilter { ALL, ENABLED, DISABLED, TRANSLATES_USERS };
 
   /**
    * Return a possibly-filtered set of plugins from the persistent
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
index 2e9b2d096f..64d87f33af 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
@@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.drill.common.collections.ImmutableEntry;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.planner.logical.StoragePlugins;
@@ -107,7 +108,7 @@ import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException;
  * make sure that the cache ends up agreeing with the persistent store
  * as it was at some point in time.
  * <p>
- * The {@link PluginsMap} class provides in-memory synchronization of the
+ * The {@link StoragePluginMap} class provides in-memory synchronization of the
  * name and config maps. Careful coding is needed when handling refresh
  * since another thread could make the same changes.
  * <p>
@@ -141,7 +142,7 @@ import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException;
  * <h4>Caveats</h4>
  *
  * The main problem with synchronization at present is that plugins
- * provide a {@link close()} method that, if used, could render the
+ * provide a {@code close()} method that, if used, could render the
  * plugin unusable. Suppose a Cassandra plugin, say, maintains a connection
  * to a server used across multiple queries and threads. Any change to
  * the config immediately calls {@code close()} on the plugin, even though
@@ -763,6 +764,10 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
       case DISABLED:
         include = !plugin.getValue().isEnabled();
         break;
+      case TRANSLATES_USERS:
+        include = plugin.getValue().getAuthMode() == AuthMode.USER_TRANSLATION
+          && plugin.getValue().isEnabled();
+        break;
       default:
         include = true;
       }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
index 4c11ab08a5..6fbba4196a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
@@ -34,7 +34,7 @@ import org.apache.drill.common.logical.FormatPluginConfig;
 
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
-import org.apache.drill.common.logical.AbstractSecuredStoragePluginConfig;
+import org.apache.drill.common.logical.CredentialedStoragePluginConfig;
 import org.apache.drill.common.map.CaseInsensitiveMap;
 import org.apache.drill.common.logical.security.CredentialsProvider;
 import org.apache.drill.common.logical.security.PlainCredentialsProvider;
@@ -43,7 +43,7 @@ import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap.Buil
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 
 @JsonTypeName(FileSystemConfig.NAME)
-public class FileSystemConfig extends AbstractSecuredStoragePluginConfig {
+public class FileSystemConfig extends CredentialedStoragePluginConfig {
   private static final List<String> FS_CREDENTIAL_KEYS =
       Arrays.asList(
           CommonConfigurationKeysPublic.HADOOP_SECURITY_CREDENTIAL_PROVIDER_PATH,
@@ -182,4 +182,9 @@ public class FileSystemConfig extends AbstractSecuredStoragePluginConfig {
     }
     return PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER;
   }
+
+  @Override
+  public FileSystemConfig updateCredentialProvider(CredentialsProvider credentialsProvider) {
+    return this;
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java
index 210d49e808..2833637007 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java
@@ -73,6 +73,9 @@ public abstract class InfoSchemaRecordGenerator<S> {
    * @param visitedPaths set used to ensure same path won't be visited twice
    */
   private void scanSchemaImpl(String schemaPath, SchemaPlus schema, Set<String> visitedPaths) {
+    if (schema == null) {
+      return;
+    }
     Set<String> subSchemaNames = schema.getParentSchema() == null
       ? schema.unwrap(DynamicRootSchema.class).schema.getSubSchemaNames()
       : schema.getSubSchemaNames();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/security/CredentialProviderUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/security/CredentialProviderUtils.java
index 297c1930fd..33a475762a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/security/CredentialProviderUtils.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/security/CredentialProviderUtils.java
@@ -35,13 +35,13 @@ public class CredentialProviderUtils {
     if (credentialsProvider != null) {
       return credentialsProvider;
     }
-    ImmutableMap.Builder<String, String> mapBuilder = ImmutableMap.builder();
-    if (username != null) {
-      mapBuilder.put(UsernamePasswordCredentials.USERNAME, username);
-    }
-    if (password != null) {
-      mapBuilder.put(UsernamePasswordCredentials.PASSWORD, password);
+    if (username == null) {
+      return PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER;
     }
+
+    ImmutableMap.Builder<String, String> mapBuilder = ImmutableMap.builder();
+    mapBuilder.put(UsernamePasswordCredentials.USERNAME, username);
+    mapBuilder.put(UsernamePasswordCredentials.PASSWORD, password);
     return new PlainCredentialsProvider(mapBuilder.build());
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/security/UsernamePasswordCredentials.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/security/UsernamePasswordCredentials.java
index c7f3d02f54..379930527e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/security/UsernamePasswordCredentials.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/security/UsernamePasswordCredentials.java
@@ -17,27 +17,63 @@
  */
 package org.apache.drill.exec.store.security;
 
+import org.apache.drill.common.PlanStringBuilder;
 import org.apache.drill.common.logical.security.CredentialsProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
 
 public class UsernamePasswordCredentials {
+  private static final Logger logger = LoggerFactory.getLogger(UsernamePasswordCredentials.class);
   public static final String USERNAME = "username";
   public static final String PASSWORD = "password";
 
   private final String username;
   private final String password;
 
-  public UsernamePasswordCredentials(CredentialsProvider credentialsProvider) {
-    if (credentialsProvider == null) {
-      this.username = null;
-      this.password = null;
-    } else {
-      Map<String, String> credentials = credentialsProvider.getCredentials() == null ? new HashMap<>() : credentialsProvider.getCredentials();
-      this.username = credentials.get(USERNAME);
-      this.password = credentials.get(PASSWORD);
+  /**
+   * While a builder may seem like overkill for a class that is little more than a small struct,
+   * it allows us to wrap new instances in an Optional, which using constructors does not.
+   */
+  public static class Builder {
+    private CredentialsProvider credentialsProvider;
+    private String queryUser;
+
+    public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) {
+      this.credentialsProvider = credentialsProvider;
+      return this;
+    }
+
+    public Builder setQueryUser(String queryUser) {
+      this.queryUser = queryUser;
+      return this;
     }
+
+    public Optional<UsernamePasswordCredentials> build() {
+      if (credentialsProvider == null) {
+        return Optional.empty();
+      }
+
+      Map<String, String> credentials = queryUser != null
+        ? credentialsProvider.getCredentials(queryUser)
+        : credentialsProvider.getCredentials();
+
+      if (credentials.size() == 0) {
+        return Optional.empty();
+      }
+
+      return Optional.of(
+        new UsernamePasswordCredentials(credentials.get(USERNAME), credentials.get(PASSWORD))
+      );
+    }
+  }
+
+  public UsernamePasswordCredentials(String username, String password) {
+    this.username = username;
+    this.password = password;
   }
 
   public String getUsername() {
@@ -47,4 +83,30 @@ public class UsernamePasswordCredentials {
   public String getPassword() {
     return password;
   }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(username, password);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    // Compare like with like (username to username) so equals() stays consistent with hashCode().
+    UsernamePasswordCredentials that = (UsernamePasswordCredentials) o;
+    return Objects.equals(username, that.username) && Objects.equals(password, that.password);
+  }
+
+  @Override
+  public String toString() {
+    return new PlanStringBuilder(this)
+      .field("username", username)
+      .maskedField("password", password)
+      .toString();
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/security/UsernamePasswordWithProxyCredentials.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/security/UsernamePasswordWithProxyCredentials.java
index 7ae56cc6a0..d6285a3b57 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/security/UsernamePasswordWithProxyCredentials.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/security/UsernamePasswordWithProxyCredentials.java
@@ -21,25 +21,66 @@ package org.apache.drill.exec.store.security;
 import org.apache.drill.common.logical.security.CredentialsProvider;
 import org.apache.drill.exec.store.security.oauth.OAuthTokenCredentials;
 
-import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 
 public class UsernamePasswordWithProxyCredentials extends UsernamePasswordCredentials {
   private final String proxyUsername;
   private final String proxyPassword;
 
-  public UsernamePasswordWithProxyCredentials(CredentialsProvider credentialsProvider) {
-    super(credentialsProvider);
-    if (credentialsProvider == null || credentialsProvider.getCredentials() == null) {
-      this.proxyUsername = null;
-      this.proxyPassword = null;
-    } else {
-      Map<String, String> credentials = credentialsProvider.getCredentials() == null ? new HashMap<>() : credentialsProvider.getCredentials();
-      this.proxyUsername = credentials.get(OAuthTokenCredentials.PROXY_USERNAME);
-      this.proxyPassword = credentials.get(OAuthTokenCredentials.PROXY_PASSWORD);
+  /**
+   * While a builder may seem like overkill for a class that is little more than a small struct,
+   * it allows us to wrap new instances in an Optional, which using constructors does not.
+   */
+  public static class Builder {
+    private CredentialsProvider credentialsProvider;
+    private String queryUser;
+
+    public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) {
+      this.credentialsProvider = credentialsProvider;
+      return this;
+    }
+
+    public Builder setQueryUser(String queryUser) {
+      this.queryUser = queryUser;
+      return this;
+    }
+
+    public Optional<UsernamePasswordWithProxyCredentials> build() {
+      if (credentialsProvider == null) {
+        return Optional.empty();
+      }
+
+      Map<String, String> credentials = queryUser != null
+        ? credentialsProvider.getCredentials(queryUser)
+        : credentialsProvider.getCredentials();
+
+      if (credentials.size() == 0) {
+        return Optional.empty();
+      }
+
+      return Optional.of(
+        new UsernamePasswordWithProxyCredentials(
+          credentials.get(USERNAME),
+          credentials.get(PASSWORD),
+          credentials.get(OAuthTokenCredentials.PROXY_USERNAME),
+          credentials.get(OAuthTokenCredentials.PROXY_PASSWORD)
+        )
+      );
     }
   }
 
+  public UsernamePasswordWithProxyCredentials(
+    String username,
+    String password,
+    String proxyUsername,
+    String proxyPassword
+  ) {
+    super(username, password);
+    this.proxyUsername = proxyUsername;
+    this.proxyPassword = proxyPassword;
+  }
+
   public String getProxyUsername() {
     return proxyUsername;
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/security/oauth/OAuthTokenCredentials.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/security/oauth/OAuthTokenCredentials.java
index 516d8722f9..66b2279bb2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/security/oauth/OAuthTokenCredentials.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/security/oauth/OAuthTokenCredentials.java
@@ -22,8 +22,8 @@ import org.apache.drill.common.logical.security.CredentialsProvider;
 import org.apache.drill.exec.oauth.PersistentTokenTable;
 import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
 
-import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 
 public class OAuthTokenCredentials extends UsernamePasswordCredentials {
 
@@ -38,27 +38,71 @@ public class OAuthTokenCredentials extends UsernamePasswordCredentials {
   private final String clientID;
   private final String clientSecret;
   private final String tokenURI;
-  private PersistentTokenTable tokenTable;
-
-  public OAuthTokenCredentials(CredentialsProvider credentialsProvider) {
-   super(credentialsProvider);
-   if (credentialsProvider == null) {
-     this.clientID = null;
-     this.clientSecret = null;
-     this.tokenURI = null;
-   } else {
-     Map<String, String> credentials = credentialsProvider.getCredentials() == null
-       ? new HashMap<>() : credentialsProvider.getCredentials();
-
-     this.clientID = credentials.getOrDefault(CLIENT_ID, null);
-     this.clientSecret = credentials.getOrDefault(CLIENT_SECRET, null);
-     this.tokenURI = credentials.getOrDefault(TOKEN_URI, null);
-   }
+  private Optional<PersistentTokenTable> tokenTable;
+
+  /**
+   * While a builder may seem like overkill for a class that is little more than small struct,
+   * it allows us to wrap new instances in an Optional while using constructors does not.
+   */
+  public static class Builder {
+    private CredentialsProvider credentialsProvider;
+    private String queryUser;
+    private PersistentTokenTable tokenTable;
+
+    public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) {
+      this.credentialsProvider = credentialsProvider;
+      return this;
+    }
+
+    public Builder setQueryUser(String queryUser) {
+      this.queryUser = queryUser;
+      return this;
+    }
+
+    public Builder setTokenTable(PersistentTokenTable tokenTable) {
+      this.tokenTable = tokenTable;
+      return this;
+    }
+
+    /** Builds the credentials, or an empty Optional when there is no provider or it yields nothing. */
+    public Optional<OAuthTokenCredentials> build() {
+      if (credentialsProvider == null) {
+        return Optional.empty();
+      }
+      Map<String, String> credentials = queryUser != null
+        ? credentialsProvider.getCredentials(queryUser)
+        : credentialsProvider.getCredentials();
+      // Guard against a provider that returns null rather than an empty map.
+      if (credentials == null || credentials.isEmpty()) {
+        return Optional.empty();
+      }
+
+      return Optional.of(
+        new OAuthTokenCredentials(
+          credentials.get(USERNAME),
+          credentials.get(PASSWORD),
+          credentials.get(CLIENT_ID),
+          credentials.get(CLIENT_SECRET),
+          credentials.get(TOKEN_URI),
+          tokenTable
+        )
+      );
+    }
   }
 
-  public OAuthTokenCredentials(CredentialsProvider credentialsProvider, PersistentTokenTable tokenTable) {
-    this(credentialsProvider);
-    this.tokenTable = tokenTable;
+  public OAuthTokenCredentials(
+    String username,
+    String password,
+    String clientID,
+    String clientSecret,
+    String tokenURI,
+    PersistentTokenTable tokenTable
+  ) {
+    super(username, password);
+    this.clientID = clientID;
+    this.clientSecret = clientSecret;
+    this.tokenURI = tokenURI;
+    this.tokenTable = Optional.ofNullable(tokenTable);
   }
 
   public String getClientID() {
@@ -70,17 +114,11 @@ public class OAuthTokenCredentials extends UsernamePasswordCredentials {
   }
 
   public String getAccessToken() {
-    if (tokenTable == null) {
-      return null;
-    }
-    return tokenTable.getAccessToken();
+    return tokenTable.map(PersistentTokenTable::getAccessToken).orElse(null);
   }
 
   public String getRefreshToken() {
-    if (tokenTable == null) {
-      return null;
-    }
-    return tokenTable.getRefreshToken();
+    return tokenTable.map(PersistentTokenTable::getRefreshToken).orElse(null);
   }
 
   public String getTokenUri() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index c12c713f3d..f9343d3e56 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -730,7 +730,7 @@ public class Foreman implements Runnable {
             new Date(queryContext.getQueryContextInfo().getQueryStartTime()),
             new Date(System.currentTimeMillis()),
             queryStateProcessor.getState(),
-            queryContext.getSession().getCredentials().getUserName(),
+            queryContext.getQueryUserCredentials().getUserName(),
             initiatingClient.getRemoteAddress());
         queryLogger.info(MAPPER.writeValueAsString(q));
       } catch (Exception e) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/metadata/MetadataProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/metadata/MetadataProvider.java
index 08d1baba4e..2d2dd90589 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/metadata/MetadataProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/metadata/MetadataProvider.java
@@ -40,6 +40,7 @@ import org.apache.drill.common.exceptions.ErrorHelper;
 import org.apache.drill.exec.ops.ViewExpansionContext;
 import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
 import org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType;
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
 import org.apache.drill.exec.proto.UserProtos.CatalogMetadata;
 import org.apache.drill.exec.proto.UserProtos.ColumnMetadata;
 import org.apache.drill.exec.proto.UserProtos.GetCatalogsReq;
@@ -602,6 +603,10 @@ public class MetadataProvider {
       public String getQueryUserName() {
         return session.getCredentials().getUserName();
       }
+
+      @Override public UserCredentials getQueryUserCredentials() {
+        return session.getCredentials();
+      }
     };
   }
 
diff --git a/exec/java-exec/src/main/resources/rest/credentials/list.ftl b/exec/java-exec/src/main/resources/rest/credentials/list.ftl
new file mode 100644
index 0000000000..8872594177
--- /dev/null
+++ b/exec/java-exec/src/main/resources/rest/credentials/list.ftl
@@ -0,0 +1,117 @@
+<#--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+
+<#include "*/generic.ftl">
+<#macro page_head>
+  <script src="/static/js/jquery.form.js"></script>
+  <!-- Ace Libraries for Syntax Formatting -->
+  <script src="/static/js/ace-code-editor/ace.js" type="text/javascript" charset="utf-8"></script>
+  <script src="/static/js/ace-code-editor/theme-eclipse.js" type="text/javascript" charset="utf-8"></script>
+  <script src="/static/js/credentialsServerMessage.js"></script>
+</#macro>
+
+<#macro page_body>
+
+<#include "*/confirmationModals.ftl">
+
+<h4 class="col-xs-6 mx-3">User Credential Management</h4>
+
+<div class="pb-2 mt-4 mb-2 border-bottom" style="margin: 5px;"></div>
+
+<div class="container-fluid">
+  <div class="row">
+    <div class="table-responsive col-sm-12 col-md-6 col-lg-5 col-xl-5">
+      <h4>Enabled Storage Plugins</h4>
+      <table class="table table-hover">
+        <tbody>
+        <#list model as pluginModel>
+          <tr>
+            <td style="border:none; max-width: 200px; overflow: hidden; text-overflow: ellipsis;">
+                ${pluginModel.getPlugin().getName()}
+            </td>
+            <td style="border:none;">
+              <button type="button" class="btn btn-primary" data-toggle="modal" data-target="#new-plugin-modal" data-plugin="${pluginModel.getPlugin().getName()}"
+                      data-username="${pluginModel.getPlugin().getUserName()}" data-password="${pluginModel.getPlugin().getPassword()}">
+                Update Credentials
+              </button>
+            </td>
+          </tr>
+        </#list>
+        </tbody>
+      </table>
+    </div>
+
+      <#--onclick="doUpdate('${pluginModel.getPlugin().getName()}')"-->
+      <#-- Modal window for creating plugin -->
+    <div class="modal fade" id="new-plugin-modal" role="dialog" aria-labelledby="configuration">
+      <div class="modal-dialog" role="document">
+        <div class="modal-content">
+          <div class="modal-header">
+            <h4 class="modal-title" id="configuration">Update Credentials</h4>
+            <button type="button" class="close" data-dismiss="modal" aria-hidden="true">&times;</button>
+          </div>
+          <div class="modal-body">
+
+            <form id="createForm" role="form" action="/credentials/update_credentials" method="POST">
+              <input type="text" class="form-control" name="username" id="usernameField" placeholder="Username" />
+              <input type="password" class="form-control" name="password" id="passwordField" placeholder="Password" /> <#-- masked so the secret is not displayed in clear text -->
+              <input type="hidden" name="plugin" id="plugin" />
+              <div style="text-align: right; margin: 10px">
+                <button type="button" class="btn btn-secondary" data-dismiss="modal">Close</button>
+                <button type="submit" class="btn btn-primary" onclick="doCreate()">Update Credentials</button>
+              </div>
+              <input type="hidden" name="csrfToken" value="<#if model?has_content>${model[0].getCsrfToken()}</#if>"> <#-- model may be an empty list -->
+            </form>
+
+            <div id="message" class="d-none alert alert-info">
+            </div>
+          </div>
+        </div>
+      </div>
+    </div>
+      <#-- Modal window for creating plugin -->
+
+    <script>
+      // Populate the modal fields
+      $(function () {
+        $('#new-plugin-modal').on('show.bs.modal', function (event) {
+          var button = $(event.relatedTarget);
+          var username = button.data('username');
+          var password = button.data('password');
+          var plugin = button.data('plugin');
+
+          $('#plugin').val(plugin);
+          $('#usernameField').val(username);
+          $('#passwordField').val(password);
+        });
+      });
+
+      function doCreate() {
+        $("#createForm").ajaxForm({
+          dataType: 'json',
+          success: serverMessage,
+          error: serverMessage
+        });
+      }
+    </script>
+<#include "*/alertModals.ftl">
+</#macro>
+
+<@page_html/>
diff --git a/exec/java-exec/src/main/resources/rest/generic.ftl b/exec/java-exec/src/main/resources/rest/generic.ftl
index d91a7ed0ab..b6c2bf079b 100644
--- a/exec/java-exec/src/main/resources/rest/generic.ftl
+++ b/exec/java-exec/src/main/resources/rest/generic.ftl
@@ -67,6 +67,9 @@
             <ul class="nav navbar-nav mr-auto">
               <li class="nav-item"><a class="nav-link" href="/query">Query</a></li>
               <li class="nav-item"><a class="nav-link" href="/profiles">Profiles</a></li>
+              <#if showCredentials == true>
+              <li class="nav-item"><a class="nav-link" href="/credentials">Credentials</a></li>
+              </#if>
               <#if showStorage == true>
               <li class="nav-item"><a class="nav-link" href="/storage">Storage</a></li>
               </#if>
diff --git a/exec/java-exec/src/main/resources/rest/static/js/credentialsServerMessage.js b/exec/java-exec/src/main/resources/rest/static/js/credentialsServerMessage.js
new file mode 100644
index 0000000000..d013f2ca79
--- /dev/null
+++ b/exec/java-exec/src/main/resources/rest/static/js/credentialsServerMessage.js
@@ -0,0 +1,39 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ *  license agreements. See the NOTICE file distributed with this work for additional
+ *  information regarding copyright ownership. The ASF licenses this file to
+ *  You under the Apache License, Version 2.0 (the "License"); you may not use
+ *  this file except in compliance with the License. You may obtain a copy of
+ *  the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ *  by applicable law or agreed to in writing, software distributed under the
+ *  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ *  OF ANY KIND, either express or implied. See the License for the specific
+ *  language governing permissions and limitations under the License.
+ */
+
+// Shows Json message from the server
+function serverMessage(data) {
+    const messageEl = $("#message");
+    if (data.result === "Success") {
+        messageEl.removeClass("d-none")
+            .removeClass("alert-danger")
+            .addClass("alert-info")
+            .text(data.result).alert();
+        setTimeout(function() { window.location.href = "/credentials"; }, 800);
+        return true;
+    } else {
+        const errorMessage = data.errorMessage || data.responseJSON["result"];
+
+        messageEl.addClass("d-none");
+        // Wait a fraction of a second before showing the message again. This
+        // makes it clear if a second attempt gives the same error as
+        // the first that a "new" message came back from the server
+        setTimeout(function() {
+            messageEl.removeClass("d-none")
+                .removeClass("alert-info")
+                .addClass("alert-danger")
+                .text("Please retry: " + errorMessage).alert();
+        }, 200);
+        return false;
+    }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestClassicLocator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestClassicLocator.java
index acfa60605e..a64e96f083 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestClassicLocator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestClassicLocator.java
@@ -27,7 +27,7 @@ import java.util.Collections;
 import java.util.Set;
 
 import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.common.logical.AbstractSecuredStoragePluginConfig;
+import org.apache.drill.common.logical.CredentialedStoragePluginConfig;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.planner.logical.StoragePlugins;
 import org.apache.drill.exec.store.dfs.FileSystemConfig;
@@ -63,7 +63,7 @@ public class TestClassicLocator extends BasePluginRegistryTest {
 
       // Abstract classes do not appear
       assertFalse(result.contains(StoragePluginConfig.class));
-      assertFalse(result.contains(AbstractSecuredStoragePluginConfig.class));
+      assertFalse(result.contains(CredentialedStoragePluginConfig.class));
 
       // The private plugin class does not appear
       assertFalse(result.contains(StoragePluginFixtureConfig.class));
diff --git a/exec/java-exec/src/test/java/org/apache/drill/storage/CredentialsProviderSerDeTest.java b/exec/java-exec/src/test/java/org/apache/drill/storage/CredentialsProviderSerDeTest.java
index 75adfbb3fa..b2549871f9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/storage/CredentialsProviderSerDeTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/storage/CredentialsProviderSerDeTest.java
@@ -96,12 +96,11 @@ public class CredentialsProviderSerDeTest extends ClusterTest {
     String serialized = mapper.writerFor(CredentialsProvider.class).writeValueAsString(credentialsProvider);
 
     String expected =
-        "{\n" +
+      "{\n" +
         "  \"credentialsProviderType\" : \"PlainCredentialsProvider\",\n" +
-        "  \"credentials\" : {\n" +
-        "    \"username\" : \"myLogin\",\n" +
-        "    \"password\" : \"myPass\"\n" +
-        "  }\n" +
+        "  \"credentials\" : {\n" + "    \"username\" : \"myLogin\",\n" +
+        "    \"password\" : \"myPass\"\n" + "  },\n" +
+        "  \"userCredentials\" : { }\n" +
         "}";
 
     assertEquals(expected, serialized);
@@ -124,7 +123,8 @@ public class CredentialsProviderSerDeTest extends ClusterTest {
         "  \"credentials\" : {\n" +
         "    \"username\" : \"myLogin\",\n" +
         "    \"password\" : \"myPass\"\n" +
-        "  }\n" +
+        "  }, \n" +
+        "  \"userCredentials\" : { }\n" +
         "}";
 
     CredentialsProvider deserialized = mapper.readerFor(CredentialsProvider.class).readValue(serialized);
diff --git a/logical/pom.xml b/logical/pom.xml
index 8713a4b492..e3cbfdf9c1 100644
--- a/logical/pom.xml
+++ b/logical/pom.xml
@@ -85,6 +85,24 @@
         <artifactId>joda-time</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-codec</groupId>
+          <artifactId>commons-codec</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-all</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
   </dependencies>
 
 
diff --git a/logical/src/main/java/org/apache/drill/common/logical/AbstractSecuredStoragePluginConfig.java b/logical/src/main/java/org/apache/drill/common/logical/CredentialedStoragePluginConfig.java
similarity index 59%
rename from logical/src/main/java/org/apache/drill/common/logical/AbstractSecuredStoragePluginConfig.java
rename to logical/src/main/java/org/apache/drill/common/logical/CredentialedStoragePluginConfig.java
index ce8d8dc373..9549f7b355 100644
--- a/logical/src/main/java/org/apache/drill/common/logical/AbstractSecuredStoragePluginConfig.java
+++ b/logical/src/main/java/org/apache/drill/common/logical/CredentialedStoragePluginConfig.java
@@ -19,19 +19,43 @@ package org.apache.drill.common.logical;
 
 import org.apache.drill.common.logical.security.CredentialsProvider;
 import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public abstract class AbstractSecuredStoragePluginConfig extends StoragePluginConfig {
+public abstract class CredentialedStoragePluginConfig extends StoragePluginConfig {
 
-  protected final CredentialsProvider credentialsProvider;
+  private static final Logger logger = LoggerFactory.getLogger(CredentialedStoragePluginConfig.class);
   protected boolean directCredentials;
+  protected final CredentialsProvider credentialsProvider;
 
-  public AbstractSecuredStoragePluginConfig() {
+  public CredentialedStoragePluginConfig() {
     this(PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER,  true);
   }
 
-  public AbstractSecuredStoragePluginConfig(CredentialsProvider credentialsProvider, boolean directCredentials) {
+  public CredentialedStoragePluginConfig(
+    CredentialsProvider credentialsProvider,
+    boolean directCredentials
+  ) {
+    // Default auth mode for credentialed storage plugins is shared user.
+    this(credentialsProvider, directCredentials, AuthMode.SHARED_USER);
+  }
+
+  public CredentialedStoragePluginConfig(
+    CredentialsProvider credentialsProvider,
+    boolean directCredentials,
+    AuthMode authMode
+  ) {
     this.credentialsProvider = credentialsProvider;
     this.directCredentials = directCredentials;
+    this.authMode = authMode;
+  }
+
+  public abstract CredentialedStoragePluginConfig updateCredentialProvider(CredentialsProvider credentialsProvider);
+
+  @Override
+  public boolean isEnabled() {
+    logger.debug("Enabled status");
+    return super.isEnabled();
   }
 
   public CredentialsProvider getCredentialsProvider() {
diff --git a/logical/src/main/java/org/apache/drill/common/logical/StoragePluginConfig.java b/logical/src/main/java/org/apache/drill/common/logical/StoragePluginConfig.java
index 390e2a8491..32154fa394 100644
--- a/logical/src/main/java/org/apache/drill/common/logical/StoragePluginConfig.java
+++ b/logical/src/main/java/org/apache/drill/common/logical/StoragePluginConfig.java
@@ -22,13 +22,17 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 
+import org.apache.drill.shaded.guava.com.google.common.base.Strings;
+
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
 @JsonInclude(JsonInclude.Include.NON_DEFAULT)
 public abstract class StoragePluginConfig {
 
   // DO NOT include enabled status in equality and hash
   // comparisons; doing so will break the plugin registry.
-  private Boolean enabled;
+  protected Boolean enabled;
+
+  protected AuthMode authMode;
 
   /**
    * Check for enabled status of the plugin
@@ -43,6 +47,14 @@ public abstract class StoragePluginConfig {
     this.enabled = enabled;
   }
 
+  public AuthMode getAuthMode() {
+    return authMode;
+  }
+
+  public void setAuthMode(AuthMode authMode) {
+    this.authMode = authMode;
+  }
+
   /**
    * Allows to check whether the enabled status is present in config
    *
@@ -62,4 +74,39 @@ public abstract class StoragePluginConfig {
   public String getValue(String key) {
     return null;
   }
+
+  /**
+   * The standardised authentication modes that storage plugins may offer.
+   */
+  public enum AuthMode {
+    /**
+     * Default. Connects using the identity of the Drill cluster (OS user or
+     * service principal) if the external storage is aware of said identity,
+     * otherwise connects without authentication. Unaffected by the Drill
+     * query user's identity.
+     */
+    DRILL_PROCESS,
+    /**
+     * Connects using a single set of shared credentials stored in some
+     * credential provider.  Unaffected by the Drill query user's identity.
+     */
+    SHARED_USER,
+    /**
+     * Depending on the plugin, connects using one of the two modes above then
+     * instructs the external storage to set the identity on the connection
+     * to that of the Drill query user.  User identity in the external system
+     * will match the Drill query user's identity.
+     */
+    USER_IMPERSONATION,
+    /**
+     * Connects with stored credentials looked up for (translated from)
+     * the Drill query user.  User identity in the external system will be
+     * a function of the Drill query user's identity (1-1 or *-1).
+     */
+    USER_TRANSLATION;
+    /** Parses a mode name ignoring case (locale-independently); null or empty yields DRILL_PROCESS. */
+    public static AuthMode parseOrDefault(String authMode) {
+      return !Strings.isNullOrEmpty(authMode) ? AuthMode.valueOf(authMode.toUpperCase(java.util.Locale.ROOT)) : DRILL_PROCESS;
+    }
+  }
 }
diff --git a/logical/src/main/java/org/apache/drill/common/logical/security/CredentialsProvider.java b/logical/src/main/java/org/apache/drill/common/logical/security/CredentialsProvider.java
index 5c82edf69e..ba38ad0ab7 100644
--- a/logical/src/main/java/org/apache/drill/common/logical/security/CredentialsProvider.java
+++ b/logical/src/main/java/org/apache/drill/common/logical/security/CredentialsProvider.java
@@ -19,6 +19,7 @@ package org.apache.drill.common.logical.security;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.drill.common.exceptions.UserException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -28,15 +29,33 @@ import java.util.Map;
  * Provider of authentication credentials.
  */
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME,
-    property = "credentialsProviderType",
-    defaultImpl = PlainCredentialsProvider.class)
+  property = "credentialsProviderType",
+  defaultImpl = PlainCredentialsProvider.class)
 public interface CredentialsProvider {
+  Logger logger = LoggerFactory.getLogger(CredentialsProvider.class);
   /**
    * Returns map with authentication credentials. Key is the credential name, for example {@code "username"}
    * and map value is corresponding credential value.
    */
-  Logger logger = LoggerFactory.getLogger(CredentialsProvider.class);
-
   @JsonIgnore
   Map<String, String> getCredentials();
+
+  /**
+   * This method returns the credentials associated with a specific user.
+   * @param username The logged in username
+   * @return A Map of the logged in user's credentials.
+   */
+  @JsonIgnore
+  default Map<String, String> getCredentials(String username) {
+    throw UserException.unsupportedError()
+      .message("%s does not support per-user credentials.", getClass())
+      .build(logger);
+  }
+
+  @JsonIgnore
+  default void setUserCredentials(String username, String password, String queryUser) {
+    throw UserException.unsupportedError()
+      .message("%s does not support per-user credentials.", getClass())
+      .build(logger);
+  }
 }
diff --git a/logical/src/main/java/org/apache/drill/common/logical/security/PlainCredentialsProvider.java b/logical/src/main/java/org/apache/drill/common/logical/security/PlainCredentialsProvider.java
index 8427b63fcc..5624b39276 100644
--- a/logical/src/main/java/org/apache/drill/common/logical/security/PlainCredentialsProvider.java
+++ b/logical/src/main/java/org/apache/drill/common/logical/security/PlainCredentialsProvider.java
@@ -19,13 +19,19 @@ package org.apache.drill.common.logical.security;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
 import com.fasterxml.jackson.annotation.JsonProperty;
+
+import org.apache.drill.common.PlanStringBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 
 /**
  * Implementation of {@link CredentialsProvider} that holds credentials provided by user.
@@ -35,21 +41,70 @@ import java.util.Objects;
 public class PlainCredentialsProvider implements CredentialsProvider {
   private static final Logger logger = LoggerFactory.getLogger(PlainCredentialsProvider.class);
   public static final CredentialsProvider EMPTY_CREDENTIALS_PROVIDER =
-      new PlainCredentialsProvider(Collections.emptyMap());
+    new PlainCredentialsProvider(Collections.emptyMap());
 
   private final Map<String, String> credentials;
+  private final Map<String, Map<String, String>> userCredentials;
+
+  public PlainCredentialsProvider(Map<String, String> credentials) {
+   this(credentials, new HashMap<>());
+  }
 
   @JsonCreator
-  public PlainCredentialsProvider(@JsonProperty("credentials") Map<String, String> credentials) {
-    this.credentials = credentials;
+  public PlainCredentialsProvider(
+    @JsonProperty("credentials") Map<String, String> credentials,
+    @JsonProperty("userCredentials") Map<String, Map<String, String>> userCredentials
+  ) {
+    this.credentials = Optional.ofNullable(credentials).orElse(new HashMap<>());
+    this.userCredentials = Optional.ofNullable(userCredentials).orElse(new HashMap<>());
+  }
+
+  @JsonIgnore
+  public PlainCredentialsProvider(String username, Map<String, String> credentials) {
+    this.credentials = new HashMap<>();
+    this.userCredentials = new HashMap<>();
+    userCredentials.put(username,credentials);
   }
 
   @Override
   @JsonIgnore(false)
-  public Map<String, String> getCredentials() {
+  @JsonProperty("credentials") public Map<String, String> getCredentials() {
     return credentials;
   }
 
+  @JsonProperty("userCredentials")
+  @JsonInclude(Include.NON_NULL)
+  public Map<String, Map<String, String>> getUserCredentials() {
+    return userCredentials;
+  }
+
+  /**
+   * Returns the credentials stored for a given query user.  If that user has no stored
+   * credentials, a new empty map is returned and the provider itself is left unchanged.
+   * @param queryUser A String of the currently logged in user
+   * @return A Map of the active user's credentials, empty if none are stored
+   */
+  @Override
+  public Map<String, String> getCredentials(String queryUser) {
+    assert queryUser != null;
+    logger.debug("Getting credentials for query user {}", queryUser);
+
+    return userCredentials.getOrDefault(queryUser, new HashMap<>());
+  }
+
+  @Override
+  public void setUserCredentials(String username, String password, String queryUser) {
+    assert queryUser != null;
+    logger.debug("Setting credentials for query user {}", queryUser);
+
+    Map<String, String> creds = userCredentials.computeIfAbsent(
+      queryUser,
+      c -> new HashMap<String, String>()
+    );
+    creds.put("username", username);
+    creds.put("password", password);
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
@@ -59,11 +114,20 @@ public class PlainCredentialsProvider implements CredentialsProvider {
       return false;
     }
     PlainCredentialsProvider that = (PlainCredentialsProvider) o;
-    return Objects.equals(credentials, that.credentials);
+    return Objects.equals(credentials, that.credentials) &&
+      Objects.equals(userCredentials, that.userCredentials);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(credentials);
+    return Objects.hash(credentials, userCredentials);
+  }
+
+  @Override
+  public String toString() {
+    return new PlanStringBuilder(this)
+      .field("credentials", credentials)
+      .field("userCredentials", userCredentials)
+      .toString();
   }
 }