You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by vo...@apache.org on 2021/02/15 17:36:28 UTC

[drill] branch master updated: DRILL-7855: Provide a secure mechanism for specifying storage plugin credentials

This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new fa5cf38  DRILL-7855: Provide a secure mechanism for specifying storage plugin credentials
fa5cf38 is described below

commit fa5cf38b05de5a240d0a727b3a9df260371412b1
Author: Volodymyr Vysotskyi <vv...@gmail.com>
AuthorDate: Sat Feb 6 19:44:48 2021 +0200

    DRILL-7855: Provide a secure mechanism for specifying storage plugin credentials
---
 common/src/main/resources/drill-module.conf        |   3 +-
 .../store/cassandra/CassandraStorageConfig.java    |  51 ++++--
 .../exec/store/cassandra/BaseCassandraTest.java    |   3 +-
 .../exec/store/druid/DruidStoragePluginConfig.java |   4 +-
 .../elasticsearch/ElasticsearchStorageConfig.java  |  41 +++--
 .../elasticsearch/ElasticComplexTypesTest.java     |   3 +-
 .../store/elasticsearch/ElasticInfoSchemaTest.java |   3 +-
 .../store/elasticsearch/ElasticSearchPlanTest.java |   3 +-
 .../elasticsearch/ElasticSearchQueryTest.java      |   3 +-
 .../exec/store/hbase/HBaseStoragePluginConfig.java |   4 +-
 .../exec/store/hive/HiveStoragePluginConfig.java   |   4 +-
 .../drill/exec/store/http/HttpBatchReader.java     |   6 +-
 .../exec/store/http/HttpStoragePluginConfig.java   |  69 +++++---
 .../drill/exec/store/http/TestHttpPlugin.java      |   7 +-
 .../drill/exec/store/jdbc/JdbcStorageConfig.java   |  38 ++--
 .../drill/exec/store/jdbc/JdbcStoragePlugin.java   |   6 +-
 .../drill/exec/store/jdbc/TestDataSource.java      |  10 +-
 .../exec/store/jdbc/TestJdbcPluginWithH2IT.java    |   3 +-
 .../exec/store/jdbc/TestJdbcPluginWithMySQLIT.java |   4 +-
 .../exec/store/kudu/KuduStoragePluginConfig.java   |   4 +-
 .../drill/exec/store/mongo/MongoStoragePlugin.java |  34 ++--
 .../exec/store/mongo/MongoStoragePluginConfig.java |  15 +-
 .../drill/exec/store/mongo/MongoTestBase.java      |   4 +-
 ...TestMongoStoragePluginUsesCredentialsStore.java |   6 +-
 .../openTSDB/OpenTSDBStoragePluginConfig.java      |   4 +-
 .../drill/exec/store/splunk/SplunkConnection.java  |  16 +-
 .../exec/store/splunk/SplunkPluginConfig.java      |  40 +++--
 .../exec/store/splunk/SplunkConnectionTest.java    |   3 +-
 .../drill/exec/store/splunk/SplunkTestSuite.java   |   2 +-
 docs/dev/PluginCredentialsProvider.md              | 164 ++++++++++++++++++
 exec/java-exec/pom.xml                             |  11 ++
 .../drill/exec/planner/PhysicalPlanReader.java     |   1 +
 .../drill/exec/server/rest/StorageResources.java   |   2 +-
 .../drill/exec/store/dfs/FileSystemConfig.java     |  39 ++++-
 .../drill/exec/store/dfs/FileSystemPlugin.java     |   5 +
 .../exec/store/mock/MockStorageEngineConfig.java   |   4 +-
 .../store/security/CredentialProviderUtils.java    |  46 +++++
 .../EnvCredentialsProvider.java}                   |  50 +++---
 .../store/security/HadoopCredentialsProvider.java  |  88 ++++++++++
 .../security/UsernamePasswordCredentials.java      |  27 ++-
 .../security/vault/VaultCredentialsProvider.java   | 116 +++++++++++++
 .../java-exec/src/main/resources/drill-module.conf |   3 +-
 .../exec/impersonation/BaseTestImpersonation.java  |   3 +-
 .../java/org/apache/drill/exec/sql/TestCTTAS.java  |   4 +-
 .../org/apache/drill/exec/sql/TestInfoSchema.java  |   1 -
 .../drill/exec/store/BasePluginRegistryTest.java   |   4 +-
 .../drill/exec/store/TestClassicLocator.java       |   4 +-
 .../drill/exec/store/TestPluginRegistry.java       |   7 +-
 .../drill/exec/util/StoragePluginTestUtils.java    |   7 +-
 .../CredentialsProviderImplementationsTest.java    | 117 +++++++++++++
 .../storage/CredentialsProviderSerDeTest.java      | 191 +++++++++++++++++++++
 .../java/org/apache/drill/test/ClusterFixture.java |   4 +-
 exec/vector/pom.xml                                |   4 +
 .../apache/drill/exec/util/JsonStringHashMap.java  |  10 +-
 .../common/config/LogicalPlanPersistence.java      |   7 +
 .../AbstractSecuredStoragePluginConfig.java        |  44 +++++
 .../drill/common/logical/StoragePluginConfig.java  |   2 +
 .../CredentialsProvider.java}                      |  21 ++-
 .../logical/security/PlainCredentialsProvider.java |  46 ++---
 metastore/metastore-api/pom.xml                    |   1 -
 pom.xml                                            |   7 +-
 61 files changed, 1208 insertions(+), 225 deletions(-)

diff --git a/common/src/main/resources/drill-module.conf b/common/src/main/resources/drill-module.conf
index bbccf19..4c5e0ef 100644
--- a/common/src/main/resources/drill-module.conf
+++ b/common/src/main/resources/drill-module.conf
@@ -24,7 +24,8 @@ drill {
     base.classes : ${?drill.classpath.scanning.base.classes} [
       org.apache.drill.common.logical.data.LogicalOperator,
       org.apache.drill.common.logical.FormatPluginConfig,
-      org.apache.drill.common.logical.StoragePluginConfig
+      org.apache.drill.common.logical.StoragePluginConfig,
+      org.apache.drill.common.logical.security.CredentialsProvider
     ],
 
     packages : ${?drill.classpath.scanning.packages} [
diff --git a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStorageConfig.java b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStorageConfig.java
index 837bb58..26ed2a0 100644
--- a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStorageConfig.java
+++ b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStorageConfig.java
@@ -21,19 +21,20 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.logical.AbstractSecuredStoragePluginConfig;
+import org.apache.drill.exec.store.security.CredentialProviderUtils;
+import org.apache.drill.common.logical.security.CredentialsProvider;
+import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
 
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 
 @JsonTypeName(CassandraStorageConfig.NAME)
-public class CassandraStorageConfig extends StoragePluginConfig {
+public class CassandraStorageConfig extends AbstractSecuredStoragePluginConfig {
   public static final String NAME = "cassandra";
 
   private final String host;
-  private final String username;
-  private final String password;
   private final int port;
 
   @JsonCreator
@@ -41,10 +42,11 @@ public class CassandraStorageConfig extends StoragePluginConfig {
       @JsonProperty("host") String host,
       @JsonProperty("port") int port,
       @JsonProperty("username") String username,
-      @JsonProperty("password") String password) {
+      @JsonProperty("password") String password,
+      @JsonProperty("credentialsProvider") CredentialsProvider credentialsProvider) {
+    super(CredentialProviderUtils.getCredentialsProvider(username, password, credentialsProvider),
+        credentialsProvider == null);
     this.host = host;
-    this.username = username;
-    this.password = password;
     this.port = port;
   }
 
@@ -52,26 +54,38 @@ public class CassandraStorageConfig extends StoragePluginConfig {
     return host;
   }
 
-  public String getUsername() {
-    return username;
+  public int getPort() {
+    return port;
   }
 
-  public String getPassword() {
-    return password;
+  @JsonIgnore
+  public UsernamePasswordCredentials getUsernamePasswordCredentials() {
+    return new UsernamePasswordCredentials(credentialsProvider);
   }
 
-  public int getPort() {
-    return port;
+  public String getUsername() {
+    if (directCredentials) {
+      return getUsernamePasswordCredentials().getUsername();
+    }
+    return null;
+  }
+
+  public String getPassword() {
+    if (directCredentials) {
+      return getUsernamePasswordCredentials().getPassword();
+    }
+    return null;
   }
 
   @JsonIgnore
   public Map<String, Object> toConfigMap() {
-    Map<String, Object> result = new HashMap<>();
+    UsernamePasswordCredentials credentials = getUsernamePasswordCredentials();
 
+    Map<String, Object> result = new HashMap<>();
     result.put("host", host);
     result.put("port", port);
-    result.put("username", username);
-    result.put("password", password);
+    result.put("username", credentials.getUsername());
+    result.put("password", credentials.getPassword());
     return result;
   }
 
@@ -85,12 +99,11 @@ public class CassandraStorageConfig extends StoragePluginConfig {
     }
     CassandraStorageConfig that = (CassandraStorageConfig) o;
     return Objects.equals(host, that.host)
-        && Objects.equals(username, that.username)
-        && Objects.equals(password, that.password);
+        && Objects.equals(credentialsProvider, that.credentialsProvider);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(host, username, password);
+    return Objects.hash(host, credentialsProvider);
   }
 }
diff --git a/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/BaseCassandraTest.java b/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/BaseCassandraTest.java
index 225873b..a8eb40f 100644
--- a/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/BaseCassandraTest.java
+++ b/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/BaseCassandraTest.java
@@ -38,7 +38,8 @@ public class BaseCassandraTest extends ClusterTest {
         cassandra.getHost(),
         cassandra.getMappedPort(CassandraContainer.CQL_PORT),
         cassandra.getUsername(),
-        cassandra.getPassword());
+        cassandra.getPassword(),
+        null);
     config.setEnabled(true);
     cluster.defineStoragePlugin("cassandra", config);
   }
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfig.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfig.java
old mode 100755
new mode 100644
index 1477117..6bf13a1
--- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfig.java
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfig.java
@@ -20,14 +20,14 @@ package org.apache.drill.exec.store.druid;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.drill.common.logical.StoragePluginConfigBase;
+import org.apache.drill.common.logical.StoragePluginConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Objects;
 
 @JsonTypeName(DruidStoragePluginConfig.NAME)
-public class DruidStoragePluginConfig extends StoragePluginConfigBase {
+public class DruidStoragePluginConfig extends StoragePluginConfig {
 
   private static final Logger logger = LoggerFactory.getLogger(DruidStoragePluginConfig.class);
   public static final String NAME = "druid";
diff --git a/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/ElasticsearchStorageConfig.java b/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/ElasticsearchStorageConfig.java
index a5a5be4..9e84635 100644
--- a/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/ElasticsearchStorageConfig.java
+++ b/contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/ElasticsearchStorageConfig.java
@@ -24,7 +24,10 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectWriter;
-import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.logical.AbstractSecuredStoragePluginConfig;
+import org.apache.drill.exec.store.security.CredentialProviderUtils;
+import org.apache.drill.common.logical.security.CredentialsProvider;
+import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
 
 import java.util.List;
@@ -32,23 +35,21 @@ import java.util.Map;
 import java.util.Objects;
 
 @JsonTypeName(ElasticsearchStorageConfig.NAME)
-public class ElasticsearchStorageConfig extends StoragePluginConfig {
+public class ElasticsearchStorageConfig extends AbstractSecuredStoragePluginConfig {
   public static final String NAME = "elastic";
 
   private static final ObjectWriter OBJECT_WRITER = new ObjectMapper().writerFor(List.class);
 
   private final List<String> hosts;
-  private final String username;
-  private final String password;
 
   @JsonCreator
   public ElasticsearchStorageConfig(
       @JsonProperty("hosts") List<String> hosts,
       @JsonProperty("username") String username,
-      @JsonProperty("password") String password) {
+      @JsonProperty("password") String password,
+      @JsonProperty("credentialsProvider") CredentialsProvider credentialsProvider) {
+    super(CredentialProviderUtils.getCredentialsProvider(username, password, credentialsProvider), credentialsProvider == null);
     this.hosts = hosts;
-    this.username = username;
-    this.password = password;
   }
 
   public List<String> getHosts() {
@@ -56,11 +57,22 @@ public class ElasticsearchStorageConfig extends StoragePluginConfig {
   }
 
   public String getUsername() {
-    return username;
+    if (directCredentials) {
+      return getUsernamePasswordCredentials().getUsername();
+    }
+    return null;
   }
 
   public String getPassword() {
-    return password;
+    if (directCredentials) {
+      return getUsernamePasswordCredentials().getPassword();
+    }
+    return null;
+  }
+
+  @JsonIgnore
+  public UsernamePasswordCredentials getUsernamePasswordCredentials() {
+    return new UsernamePasswordCredentials(credentialsProvider);
   }
 
   @JsonIgnore
@@ -68,10 +80,8 @@ public class ElasticsearchStorageConfig extends StoragePluginConfig {
       throws JsonProcessingException {
     ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder();
     builder.put("hosts", OBJECT_WRITER.writeValueAsString(hosts));
-    if (username != null) {
-      builder.put("username", username)
-          .put("password", password);
-    }
+
+    builder.putAll(credentialsProvider.getCredentials());
     return builder.build();
   }
 
@@ -85,12 +95,11 @@ public class ElasticsearchStorageConfig extends StoragePluginConfig {
     }
     ElasticsearchStorageConfig that = (ElasticsearchStorageConfig) o;
     return Objects.equals(hosts, that.hosts)
-        && Objects.equals(username, that.username)
-        && Objects.equals(password, that.password);
+        && Objects.equals(credentialsProvider, that.credentialsProvider);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(hosts, username, password);
+    return Objects.hash(hosts, credentialsProvider);
   }
 }
diff --git a/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticComplexTypesTest.java b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticComplexTypesTest.java
index 75192e5..43d4681 100644
--- a/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticComplexTypesTest.java
+++ b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticComplexTypesTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.elasticsearch;
 
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
 import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.ClusterTest;
@@ -56,7 +57,7 @@ public class ElasticComplexTypesTest extends ClusterTest {
     startCluster(ClusterFixture.builder(dirTestWatcher));
 
     ElasticsearchStorageConfig config = new ElasticsearchStorageConfig(
-        Collections.singletonList(HOST), null, null);
+        Collections.singletonList(HOST), null, null, PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
     config.setEnabled(true);
     cluster.defineStoragePlugin("elastic", config);
 
diff --git a/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticInfoSchemaTest.java b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticInfoSchemaTest.java
index 2bafc11..cad11bb 100644
--- a/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticInfoSchemaTest.java
+++ b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticInfoSchemaTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.elasticsearch;
 
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
 import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.ClusterTest;
 import org.apache.http.HttpHost;
@@ -51,7 +52,7 @@ public class ElasticInfoSchemaTest extends ClusterTest {
     startCluster(ClusterFixture.builder(dirTestWatcher));
 
     ElasticsearchStorageConfig config = new ElasticsearchStorageConfig(
-        Collections.singletonList(HOST), null, null);
+        Collections.singletonList(HOST), null, null, PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
     config.setEnabled(true);
     cluster.defineStoragePlugin("elastic", config);
 
diff --git a/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchPlanTest.java b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchPlanTest.java
index 89c0410..74773af 100644
--- a/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchPlanTest.java
+++ b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchPlanTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.elasticsearch;
 
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
 import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.ClusterTest;
 import org.apache.http.HttpHost;
@@ -49,7 +50,7 @@ public class ElasticSearchPlanTest extends ClusterTest {
     startCluster(ClusterFixture.builder(dirTestWatcher));
 
     ElasticsearchStorageConfig config = new ElasticsearchStorageConfig(
-        Collections.singletonList(HOST), null, null);
+        Collections.singletonList(HOST), null, null, PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
     config.setEnabled(true);
     cluster.defineStoragePlugin("elastic", config);
 
diff --git a/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchQueryTest.java b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchQueryTest.java
index f7b1738..4fa35a8 100644
--- a/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchQueryTest.java
+++ b/contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchQueryTest.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store.elasticsearch;
 
 import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
 import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.ClusterTest;
 import org.apache.http.HttpHost;
@@ -56,7 +57,7 @@ public class ElasticSearchQueryTest extends ClusterTest {
     startCluster(ClusterFixture.builder(dirTestWatcher));
 
     ElasticsearchStorageConfig config = new ElasticsearchStorageConfig(
-        Collections.singletonList(HOST), null, null);
+        Collections.singletonList(HOST), null, null, PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
     config.setEnabled(true);
     cluster.defineStoragePlugin("elastic", config);
 
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePluginConfig.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePluginConfig.java
index 46e9720..ac7186c 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePluginConfig.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePluginConfig.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.store.hbase;
 
 import java.util.Map;
 
-import org.apache.drill.common.logical.StoragePluginConfigBase;
+import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
@@ -35,7 +35,7 @@ import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
 import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
 
 @JsonTypeName(HBaseStoragePluginConfig.NAME)
-public class HBaseStoragePluginConfig extends StoragePluginConfigBase implements DrillHBaseConstants {
+public class HBaseStoragePluginConfig extends StoragePluginConfig implements DrillHBaseConstants {
   private static final Logger logger = LoggerFactory.getLogger(HBaseStoragePluginConfig.class);
 
   private Map<String, String> config;
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePluginConfig.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePluginConfig.java
index d812468..983304c 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePluginConfig.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePluginConfig.java
@@ -20,14 +20,14 @@ package org.apache.drill.exec.store.hive;
 import java.util.Map;
 
 import com.fasterxml.jackson.annotation.JsonAlias;
-import org.apache.drill.common.logical.StoragePluginConfigBase;
+import org.apache.drill.common.logical.StoragePluginConfig;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
 @JsonTypeName(HiveStoragePluginConfig.NAME)
-public class HiveStoragePluginConfig extends StoragePluginConfigBase {
+public class HiveStoragePluginConfig extends StoragePluginConfig {
 
   public static final String NAME = "hive";
 
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
index 83058e2..4ac41fe 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
@@ -32,6 +32,7 @@ import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBui
 import org.apache.drill.exec.store.http.util.HttpProxyConfig;
 import org.apache.drill.exec.store.http.util.HttpProxyConfig.ProxyBuilder;
 import org.apache.drill.exec.store.http.util.SimpleHttp;
+import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
 
 import java.io.File;
 import java.io.InputStream;
@@ -132,12 +133,13 @@ public class HttpBatchReader implements ManagedReader<SchemaNegotiator> {
         .fromConfigForURL(drillConfig, url.toString());
     final String proxyType = config.proxyType();
     if (proxyType != null && !"direct".equals(proxyType)) {
+      UsernamePasswordCredentials credentials = config.getUsernamePasswordCredentials();
       builder
         .type(config.proxyType())
         .host(config.proxyHost())
         .port(config.proxyPort())
-        .username(config.proxyUsername())
-        .password(config.proxyPassword());
+        .username(credentials.getUsername())
+        .password(credentials.getPassword());
     }
     return builder.build();
   }
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java
index 35e5654..280895d 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java
@@ -20,12 +20,15 @@ package org.apache.drill.exec.store.http;
 import org.apache.drill.common.PlanStringBuilder;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.map.CaseInsensitiveMap;
-import org.apache.drill.common.logical.StoragePluginConfigBase;
+import org.apache.drill.common.logical.AbstractSecuredStoragePluginConfig;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.exec.store.security.CredentialProviderUtils;
+import org.apache.drill.common.logical.security.CredentialsProvider;
+import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,7 +38,7 @@ import java.util.Objects;
 
 
 @JsonTypeName(HttpStoragePluginConfig.NAME)
-public class HttpStoragePluginConfig extends StoragePluginConfigBase {
+public class HttpStoragePluginConfig extends AbstractSecuredStoragePluginConfig {
   private static final Logger logger = LoggerFactory.getLogger(HttpStoragePluginConfig.class);
   public static final String NAME = "http";
   public final Map<String, HttpApiConfig> connections;
@@ -43,8 +46,6 @@ public class HttpStoragePluginConfig extends StoragePluginConfigBase {
   public final String proxyHost;
   public final int proxyPort;
   public final String proxyType;
-  public final String proxyUsername;
-  public final String proxyPassword;
   /**
    * Timeout in seconds.
    */
@@ -58,9 +59,12 @@ public class HttpStoragePluginConfig extends StoragePluginConfigBase {
                                  @JsonProperty("proxyPort") Integer proxyPort,
                                  @JsonProperty("proxyType") String proxyType,
                                  @JsonProperty("proxyUsername") String proxyUsername,
-                                 @JsonProperty("proxyPassword") String proxyPassword
+                                 @JsonProperty("proxyPassword") String proxyPassword,
+                                 @JsonProperty("credentialsProvider") CredentialsProvider credentialsProvider
                                  ) {
-    this.cacheResults = cacheResults == null ? false : cacheResults;
+    super(CredentialProviderUtils.getCredentialsProvider(normalize(proxyUsername), normalize(proxyPassword), credentialsProvider),
+        credentialsProvider == null);
+    this.cacheResults = cacheResults != null && cacheResults;
 
     this.connections = CaseInsensitiveMap.newHashMap();
     if (connections != null) {
@@ -70,25 +74,21 @@ public class HttpStoragePluginConfig extends StoragePluginConfigBase {
     this.timeout = timeout == null ? 0 : timeout;
     this.proxyHost = normalize(proxyHost);
     this.proxyPort = proxyPort == null ? 0 : proxyPort;
-    this.proxyUsername = normalize(proxyUsername);
-    this.proxyPassword = normalize(proxyPassword);
     proxyType = normalize(proxyType);
     this.proxyType = proxyType == null
         ? "direct" : proxyType.trim().toLowerCase();
 
     // Validate Proxy Type
-    if (this.proxyType != null) {
-      switch (this.proxyType) {
-        case "direct":
-        case "http":
-        case "socks":
-          break;
-        default:
-          throw UserException
-            .validationError()
-            .message("Invalid Proxy Type: %s.  Drill supports 'direct', 'http' and 'socks' proxies.", proxyType)
-            .build(logger);
-      }
+    switch (this.proxyType) {
+      case "direct":
+      case "http":
+      case "socks":
+        break;
+      default:
+        throw UserException
+          .validationError()
+          .message("Invalid Proxy Type: %s.  Drill supports 'direct', 'http' and 'socks' proxies.", proxyType)
+          .build(logger);
     }
   }
 
@@ -107,7 +107,7 @@ public class HttpStoragePluginConfig extends StoragePluginConfigBase {
   public HttpStoragePluginConfig copyForPlan(String connectionName) {
     return new HttpStoragePluginConfig(
         cacheResults, configFor(connectionName), timeout,
-        proxyHost, proxyPort, proxyType, proxyUsername, proxyPassword);
+        proxyHost, proxyPort, proxyType, null, null, credentialsProvider);
   }
 
   private Map<String, HttpApiConfig> configFor(String connectionName) {
@@ -129,8 +129,7 @@ public class HttpStoragePluginConfig extends StoragePluginConfigBase {
            Objects.equals(proxyHost, thatConfig.proxyHost) &&
            Objects.equals(proxyPort, thatConfig.proxyPort) &&
            Objects.equals(proxyType, thatConfig.proxyType) &&
-           Objects.equals(proxyUsername, thatConfig.proxyUsername) &&
-           Objects.equals(proxyPassword, thatConfig.proxyPassword);
+           Objects.equals(credentialsProvider, thatConfig.credentialsProvider);
   }
 
   @Override
@@ -141,8 +140,7 @@ public class HttpStoragePluginConfig extends StoragePluginConfigBase {
       .field("timeout", timeout)
       .field("proxyHost", proxyHost)
       .field("proxyPort", proxyPort)
-      .field("proxyUsername", proxyUsername)
-      .maskedField("proxyPassword", proxyPassword)
+      .field("credentialsProvider", credentialsProvider)
       .field("proxyType", proxyType)
       .toString();
   }
@@ -150,7 +148,7 @@ public class HttpStoragePluginConfig extends StoragePluginConfigBase {
   @Override
   public int hashCode() {
     return Objects.hash(connections, cacheResults, timeout,
-        proxyHost, proxyPort, proxyType, proxyUsername, proxyPassword);
+        proxyHost, proxyPort, proxyType, credentialsProvider);
   }
 
   @JsonProperty("cacheResults")
@@ -169,10 +167,20 @@ public class HttpStoragePluginConfig extends StoragePluginConfigBase {
   public int proxyPort() { return proxyPort; }
 
   @JsonProperty("proxyUsername")
-  public String proxyUsername() { return proxyUsername; }
+  public String proxyUsername() {
+    if (directCredentials) {
+      return getUsernamePasswordCredentials().getUsername();
+    }
+    return null;
+  }
 
   @JsonProperty("proxyPassword")
-  public String proxyPassword() { return proxyPassword; }
+  public String proxyPassword() {
+    if (directCredentials) {
+      return getUsernamePasswordCredentials().getPassword();
+    }
+    return null;
+  }
 
   @JsonProperty("proxyType")
   public String proxyType() { return proxyType; }
@@ -181,4 +189,9 @@ public class HttpStoragePluginConfig extends StoragePluginConfigBase {
   public HttpApiConfig getConnection(String connectionName) {
     return connections.get(connectionName);
   }
+
+  @JsonIgnore
+  public UsernamePasswordCredentials getUsernamePasswordCredentials() {
+    return new UsernamePasswordCredentials(credentialsProvider);
+  }
 }
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
index fe45c41..934d12a 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.physical.rowSet.RowSet;
 import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
 import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
 import org.apache.drill.shaded.guava.com.google.common.io.Files;
 import org.apache.drill.test.ClusterFixture;
@@ -100,7 +101,8 @@ public class TestHttpPlugin extends ClusterTest {
     configs.put("sunrise", sunriseConfig);
     configs.put("sunrise2", sunriseWithParamsConfig);
 
-    HttpStoragePluginConfig mockStorageConfigWithWorkspace = new HttpStoragePluginConfig(false, configs, 10, "", 80, "", "", "");
+    HttpStoragePluginConfig mockStorageConfigWithWorkspace =
+        new HttpStoragePluginConfig(false, configs, 10, "", 80, "", "", "", PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
     mockStorageConfigWithWorkspace.setEnabled(true);
     cluster.defineStoragePlugin("live", mockStorageConfigWithWorkspace);
   }
@@ -145,7 +147,8 @@ public class TestHttpPlugin extends ClusterTest {
     configs.put("mockcsv", mockCsvConfig);
     configs.put("mockxml", mockXmlConfig);
 
-    HttpStoragePluginConfig mockStorageConfigWithWorkspace = new HttpStoragePluginConfig(false, configs, 2, "", 80, "", "", "");
+    HttpStoragePluginConfig mockStorageConfigWithWorkspace =
+        new HttpStoragePluginConfig(false, configs, 2, "", 80, "", "", "", PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
     mockStorageConfigWithWorkspace.setEnabled(true);
     cluster.defineStoragePlugin("local", mockStorageConfigWithWorkspace);
   }
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStorageConfig.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStorageConfig.java
old mode 100755
new mode 100644
index 9124c40..edb3bd0
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStorageConfig.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStorageConfig.java
@@ -22,22 +22,24 @@ import java.util.Map;
 import java.util.Objects;
 
 import com.fasterxml.jackson.annotation.JsonFilter;
-import org.apache.drill.common.logical.StoragePluginConfig;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.logical.AbstractSecuredStoragePluginConfig;
+import org.apache.drill.exec.store.security.CredentialProviderUtils;
+import org.apache.drill.common.logical.security.CredentialsProvider;
+import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
 
 @JsonTypeName(JdbcStorageConfig.NAME)
 @JsonFilter("passwordFilter")
-public class JdbcStorageConfig extends StoragePluginConfig {
+public class JdbcStorageConfig extends AbstractSecuredStoragePluginConfig {
 
   public static final String NAME = "jdbc";
 
   private final String driver;
   private final String url;
-  private final String username;
-  private final String password;
   private final boolean caseInsensitiveTableNames;
   private final Map<String, Object> sourceParameters;
 
@@ -48,11 +50,11 @@ public class JdbcStorageConfig extends StoragePluginConfig {
       @JsonProperty("username") String username,
       @JsonProperty("password") String password,
       @JsonProperty("caseInsensitiveTableNames") boolean caseInsensitiveTableNames,
-      @JsonProperty("sourceParameters") Map<String, Object> sourceParameters) {
+      @JsonProperty("sourceParameters") Map<String, Object> sourceParameters,
+      @JsonProperty("credentialsProvider") CredentialsProvider credentialsProvider) {
+    super(CredentialProviderUtils.getCredentialsProvider(username, password, credentialsProvider), credentialsProvider == null);
     this.driver = driver;
     this.url = url;
-    this.username = username;
-    this.password = password;
     this.caseInsensitiveTableNames = caseInsensitiveTableNames;
     this.sourceParameters = sourceParameters == null ? Collections.emptyMap() : sourceParameters;
   }
@@ -66,11 +68,17 @@ public class JdbcStorageConfig extends StoragePluginConfig {
   }
 
   public String getUsername() {
-    return username;
+    if (directCredentials) {
+      return getUsernamePasswordCredentials().getUsername();
+    }
+    return null;
   }
 
   public String getPassword() {
-    return password;
+    if (directCredentials) {
+      return getUsernamePasswordCredentials().getPassword();
+    }
+    return null;
   }
 
   @JsonProperty("caseInsensitiveTableNames")
@@ -82,9 +90,14 @@ public class JdbcStorageConfig extends StoragePluginConfig {
     return sourceParameters;
   }
 
+  @JsonIgnore
+  public UsernamePasswordCredentials getUsernamePasswordCredentials() {
+    return new UsernamePasswordCredentials(credentialsProvider);
+  }
+
   @Override
   public int hashCode() {
-    return Objects.hash(driver, url, username, password, caseInsensitiveTableNames, sourceParameters);
+    return Objects.hash(driver, url, caseInsensitiveTableNames, sourceParameters, credentialsProvider);
   }
 
   @Override
@@ -99,8 +112,7 @@ public class JdbcStorageConfig extends StoragePluginConfig {
     return caseInsensitiveTableNames == that.caseInsensitiveTableNames &&
         Objects.equals(driver, that.driver) &&
         Objects.equals(url, that.url) &&
-        Objects.equals(username, that.username) &&
-        Objects.equals(password, that.password) &&
-        Objects.equals(sourceParameters, that.sourceParameters);
+        Objects.equals(sourceParameters, that.sourceParameters) &&
+        Objects.equals(credentialsProvider, that.credentialsProvider);
   }
 }
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java
old mode 100755
new mode 100644
index e2945cc..f707f30
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java
@@ -34,6 +34,7 @@ import org.apache.drill.exec.ops.OptimizerRulesContext;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
 import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
 import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -112,8 +113,9 @@ public class JdbcStoragePlugin extends AbstractStoragePlugin {
 
       hikariConfig.setDriverClassName(config.getDriver());
       hikariConfig.setJdbcUrl(config.getUrl());
-      hikariConfig.setUsername(config.getUsername());
-      hikariConfig.setPassword(config.getPassword());
+      UsernamePasswordCredentials credentials = config.getUsernamePasswordCredentials();
+      hikariConfig.setUsername(credentials.getUsername());
+      hikariConfig.setPassword(credentials.getPassword());
 
       return new HikariDataSource(hikariConfig);
     } catch (RuntimeException e) {
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestDataSource.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestDataSource.java
index f4fa9f7..196111b 100644
--- a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestDataSource.java
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestDataSource.java
@@ -53,7 +53,7 @@ public class TestDataSource extends BaseTest {
   @Test
   public void testInitWithoutUserAndPassword() {
     JdbcStorageConfig config = new JdbcStorageConfig(
-      DRIVER, url, null, null, false, null);
+      DRIVER, url, null, null, false, null, null);
     try (HikariDataSource dataSource = JdbcStoragePlugin.initDataSource(config)) {
       assertEquals(DRIVER, dataSource.getDriverClassName());
       assertEquals(url, dataSource.getJdbcUrl());
@@ -65,7 +65,7 @@ public class TestDataSource extends BaseTest {
   @Test
   public void testInitWithUserAndPassword() {
     JdbcStorageConfig config = new JdbcStorageConfig(
-      DRIVER, url, "user", "password", false, null);
+      DRIVER, url, "user", "password", false, null, null);
     try (HikariDataSource dataSource = JdbcStoragePlugin.initDataSource(config)) {
       assertEquals("user", dataSource.getUsername());
       assertEquals("password", dataSource.getPassword());
@@ -81,7 +81,7 @@ public class TestDataSource extends BaseTest {
     sourceParameters.put("dataSource.cachePrepStmts", true);
     sourceParameters.put("dataSource.prepStmtCacheSize", 250);
     JdbcStorageConfig config = new JdbcStorageConfig(
-      DRIVER, url, "user", "password", false, sourceParameters);
+      DRIVER, url, "user", "password", false, sourceParameters, null);
     try (HikariDataSource dataSource = JdbcStoragePlugin.initDataSource(config)) {
       assertEquals(5, dataSource.getMinimumIdle());
       assertFalse(dataSource.isAutoCommit());
@@ -96,7 +96,7 @@ public class TestDataSource extends BaseTest {
     Map<String, Object> sourceParameters = new HashMap<>();
     sourceParameters.put("abc", "abc");
     JdbcStorageConfig config = new JdbcStorageConfig(
-      DRIVER, url, "user", "password", false, sourceParameters);
+      DRIVER, url, "user", "password", false, sourceParameters, null);
 
     thrown.expect(UserException.class);
     thrown.expectMessage(UserBitShared.DrillPBError.ErrorType.CONNECTION.name());
@@ -109,7 +109,7 @@ public class TestDataSource extends BaseTest {
     Map<String, Object> sourceParameters = new HashMap<>();
     sourceParameters.put("minimumIdle", "abc");
     JdbcStorageConfig config = new JdbcStorageConfig(
-      DRIVER, url, "user", "password", false, sourceParameters);
+      DRIVER, url, "user", "password", false, sourceParameters, null);
 
     thrown.expect(UserException.class);
     thrown.expectMessage(UserBitShared.DrillPBError.ErrorType.CONNECTION.name());
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithH2IT.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithH2IT.java
index 1e7dc91..07b142c 100644
--- a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithH2IT.java
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithH2IT.java
@@ -64,7 +64,8 @@ public class TestJdbcPluginWithH2IT extends ClusterTest {
     }
     Map<String, Object> sourceParameters =  new HashMap<>();
     sourceParameters.put("minimumIdle", 1);
-    JdbcStorageConfig jdbcStorageConfig = new JdbcStorageConfig("org.h2.Driver", connString, "root", "root", true, sourceParameters);
+    JdbcStorageConfig jdbcStorageConfig = new JdbcStorageConfig("org.h2.Driver", connString,
+        "root", "root", true, sourceParameters, null);
     jdbcStorageConfig.setEnabled(true);
     cluster.defineStoragePlugin("h2", jdbcStorageConfig);
     cluster.defineStoragePlugin("h2o", jdbcStorageConfig);
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java
index 6b7ae90..543f3ad 100644
--- a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java
@@ -71,7 +71,7 @@ public class TestJdbcPluginWithMySQLIT extends ClusterTest {
 
     JdbcStorageConfig jdbcStorageConfig = new JdbcStorageConfig("com.mysql.cj.jdbc.Driver",
         String.format("jdbc:mysql://localhost:%s/%s?useJDBCCompliantTimezoneShift=true", mysqlPort, mysqlDBName),
-        "mysqlUser", "mysqlPass", false, null);
+        "mysqlUser", "mysqlPass", false, null, null);
     jdbcStorageConfig.setEnabled(true);
 
     cluster.defineStoragePlugin("mysql", jdbcStorageConfig);
@@ -80,7 +80,7 @@ public class TestJdbcPluginWithMySQLIT extends ClusterTest {
       // adds storage plugin with case insensitive table names
       JdbcStorageConfig jdbcCaseSensitiveStorageConfig = new JdbcStorageConfig("com.mysql.cj.jdbc.Driver",
           String.format("jdbc:mysql://localhost:%s/%s?useJDBCCompliantTimezoneShift=true", mysqlPort, mysqlDBName),
-          "mysqlUser", "mysqlPass", true, null);
+          "mysqlUser", "mysqlPass", true, null, null);
       jdbcCaseSensitiveStorageConfig.setEnabled(true);
       cluster.defineStoragePlugin("mysqlCaseInsensitive", jdbcCaseSensitiveStorageConfig);
     }
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePluginConfig.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePluginConfig.java
index 586d38a..dc3a3f9 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePluginConfig.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePluginConfig.java
@@ -17,14 +17,14 @@
  */
 package org.apache.drill.exec.store.kudu;
 
-import org.apache.drill.common.logical.StoragePluginConfigBase;
+import org.apache.drill.common.logical.StoragePluginConfig;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
 @JsonTypeName(KuduStoragePluginConfig.NAME)
-public class KuduStoragePluginConfig extends StoragePluginConfigBase {
+public class KuduStoragePluginConfig extends StoragePluginConfig {
 
   public static final String NAME = "kudu";
 
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
index 1ed2026..e64632d 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
@@ -33,13 +33,17 @@ import org.apache.drill.exec.store.AbstractStoragePlugin;
 import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
 import org.apache.drill.exec.store.mongo.schema.MongoSchemaFactory;
+import org.apache.drill.common.logical.security.CredentialsProvider;
+import org.apache.drill.exec.store.security.HadoopCredentialsProvider;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
 import org.apache.drill.shaded.guava.com.google.common.cache.Cache;
 import org.apache.drill.shaded.guava.com.google.common.cache.CacheBuilder;
 import org.apache.drill.shaded.guava.com.google.common.cache.RemovalListener;
 import org.apache.drill.shaded.guava.com.google.common.cache.RemovalNotification;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import org.apache.hadoop.conf.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,20 +76,16 @@ public class MongoStoragePlugin extends AbstractStoragePlugin {
     this.schemaFactory = new MongoSchemaFactory(this, name);
   }
 
-  private static String addCredentialsFromCredentialsProvider(String connection, String name) {
+  private String addCredentialsFromCredentialsProvider(String connection, String name) {
     MongoClientURI parsed = new MongoClientURI(connection);
     if (parsed.getCredentials() == null) {
-      Configuration configuration = new Configuration();
+      UsernamePasswordCredentials credentials = getUsernamePasswordCredentials(name);
       try {
         // The default connection has the name "mongo" but multiple connections can be added;
         // each will need their own credentials.
-        char[] usernameChars = configuration.getPassword(
-            DrillMongoConstants.STORE_CONFIG_PREFIX + name + DrillMongoConstants.USERNAME_CONFIG_SUFFIX);
-        char[] passwordChars = configuration.getPassword(
-            DrillMongoConstants.STORE_CONFIG_PREFIX + name + DrillMongoConstants.PASSWORD_CONFIG_SUFFIX);
-        if (usernameChars != null && passwordChars != null) {
-          String username = URLEncoder.encode(new String(usernameChars), "UTF-8");
-          String password = URLEncoder.encode(new String(passwordChars), "UTF-8");
+        if (credentials.getUsername() != null && credentials.getPassword() != null) {
+          String username = URLEncoder.encode(credentials.getUsername(), "UTF-8");
+          String password = URLEncoder.encode(credentials.getPassword(), "UTF-8");
           return connection.replaceFirst("://",
               String.format("://%s:%s@", username, password));
         }
@@ -96,6 +96,20 @@ public class MongoStoragePlugin extends AbstractStoragePlugin {
     return connection;
   }
 
+  private UsernamePasswordCredentials getUsernamePasswordCredentials(String name) {
+    CredentialsProvider credentialsProvider = mongoConfig.getCredentialsProvider();
+    // if no credentials provider is configured (null or the shared empty provider), fall back to HadoopCredentialsProvider
+    if (credentialsProvider == null || credentialsProvider == PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER) {
+      credentialsProvider = new HadoopCredentialsProvider(
+          ImmutableMap.of(
+              UsernamePasswordCredentials.USERNAME,
+              DrillMongoConstants.STORE_CONFIG_PREFIX + name + DrillMongoConstants.USERNAME_CONFIG_SUFFIX,
+              UsernamePasswordCredentials.PASSWORD,
+              DrillMongoConstants.STORE_CONFIG_PREFIX + name + DrillMongoConstants.PASSWORD_CONFIG_SUFFIX));
+    }
+    return new UsernamePasswordCredentials(credentialsProvider);
+  }
+
   @Override
   public MongoStoragePluginConfig getConfig() {
     return mongoConfig;
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java
index 03acd05..77bca4c 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java
@@ -19,8 +19,6 @@ package org.apache.drill.exec.store.mongo;
 
 import java.util.List;
 
-import org.apache.drill.common.logical.StoragePluginConfig;
-
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -28,9 +26,12 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.mongodb.MongoClientOptions;
 import com.mongodb.MongoClientURI;
 import com.mongodb.MongoCredential;
+import org.apache.drill.common.logical.AbstractSecuredStoragePluginConfig;
+import org.apache.drill.common.logical.security.CredentialsProvider;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
 
 @JsonTypeName(MongoStoragePluginConfig.NAME)
-public class MongoStoragePluginConfig extends StoragePluginConfig {
+public class MongoStoragePluginConfig extends AbstractSecuredStoragePluginConfig {
 
   public static final String NAME = "mongo";
 
@@ -40,7 +41,9 @@ public class MongoStoragePluginConfig extends StoragePluginConfig {
   private final MongoClientURI clientURI;
 
   @JsonCreator
-  public MongoStoragePluginConfig(@JsonProperty("connection") String connection) {
+  public MongoStoragePluginConfig(@JsonProperty("connection") String connection,
+      @JsonProperty("credentialsProvider") CredentialsProvider credentialsProvider) {
+    super(getCredentialsProvider(credentialsProvider), credentialsProvider == null);
     this.connection = connection;
     this.clientURI = new MongoClientURI(connection);
   }
@@ -80,4 +83,8 @@ public class MongoStoragePluginConfig extends StoragePluginConfig {
   public String getConnection() {
     return connection;
   }
+
+  private static CredentialsProvider getCredentialsProvider(CredentialsProvider credentialsProvider) {
+    return credentialsProvider != null ? credentialsProvider : PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER;
+  }
 }
diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestBase.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestBase.java
index 158e48b..d9cff73 100644
--- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestBase.java
+++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestBase.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.store.mongo;
 
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
 import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.ClusterTest;
 import org.junit.AfterClass;
@@ -37,7 +38,8 @@ public class MongoTestBase extends ClusterTest implements MongoTestConstants {
   }
 
   private static void initMongoStoragePlugin(String connectionURI) throws Exception {
-    MongoStoragePluginConfig storagePluginConfig = new MongoStoragePluginConfig(connectionURI);
+    MongoStoragePluginConfig storagePluginConfig = new MongoStoragePluginConfig(connectionURI,
+        PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
     storagePluginConfig.setEnabled(true);
     pluginRegistry.put(MongoStoragePluginConfig.NAME, storagePluginConfig);
 
diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoStoragePluginUsesCredentialsStore.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoStoragePluginUsesCredentialsStore.java
index f5da553..673a36d 100644
--- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoStoragePluginUsesCredentialsStore.java
+++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoStoragePluginUsesCredentialsStore.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.store.mongo;
 import com.mongodb.MongoCredential;
 import org.apache.drill.categories.MongoStorageTest;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
 import org.apache.drill.test.BaseTest;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -32,8 +33,9 @@ import static org.junit.Assert.assertEquals;
 public class TestMongoStoragePluginUsesCredentialsStore extends BaseTest {
 
   private void test(String expectedUserName, String expectedPassword, String connection, String name) throws ExecutionSetupException {
-    MongoStoragePlugin plugin = new MongoStoragePlugin(new MongoStoragePluginConfig(
-      connection), null, name);
+    MongoStoragePlugin plugin = new MongoStoragePlugin(
+        new MongoStoragePluginConfig(connection, PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER),
+        null, name);
     List<MongoCredential> creds = plugin.getClient().getCredentialsList();
     if (expectedUserName == null) {
       assertEquals(0, creds.size());
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBStoragePluginConfig.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBStoragePluginConfig.java
index e207886..f7cbc1c 100644
--- a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBStoragePluginConfig.java
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBStoragePluginConfig.java
@@ -21,14 +21,14 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.logical.StoragePluginConfigBase;
+import org.apache.drill.common.logical.StoragePluginConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Objects;
 
 @JsonTypeName(OpenTSDBStoragePluginConfig.NAME)
-public class OpenTSDBStoragePluginConfig extends StoragePluginConfigBase {
+public class OpenTSDBStoragePluginConfig extends StoragePluginConfig {
 
   private static final Logger logger = LoggerFactory.getLogger(OpenTSDBStoragePluginConfig.class);
 
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkConnection.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkConnection.java
index d2b82d8..05988c4 100644
--- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkConnection.java
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkConnection.java
@@ -26,6 +26,7 @@ import com.splunk.SSLSecurityProtocol;
 import com.splunk.Service;
 import com.splunk.ServiceArgs;
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,15 +37,13 @@ public class SplunkConnection {
 
   private static final Logger logger = LoggerFactory.getLogger(SplunkConnection.class);
 
-  private final String username;
-  private final String password;
+  private final UsernamePasswordCredentials credentials;
   private final String hostname;
   private final int port;
   private Service service;
 
   public SplunkConnection(SplunkPluginConfig config) {
-    this.username = config.getUsername();
-    this.password = config.getPassword();
+    this.credentials = config.getUsernamePasswordCredentials();
     this.hostname = config.getHostname();
     this.port = config.getPort();
     service = connect();
@@ -53,12 +52,9 @@ public class SplunkConnection {
 
   /**
    * This constructor is used for testing only
-   * @param config
-   * @param service
    */
   public SplunkConnection(SplunkPluginConfig config, Service service) {
-    this.username = config.getUsername();
-    this.password = config.getPassword();
+    this.credentials = config.getUsernamePasswordCredentials();
     this.hostname = config.getHostname();
     this.port = config.getPort();
     this.service = service;
@@ -73,8 +69,8 @@ public class SplunkConnection {
     ServiceArgs loginArgs = new ServiceArgs();
     loginArgs.setHost(hostname);
     loginArgs.setPort(port);
-    loginArgs.setPassword(password);
-    loginArgs.setUsername(username);
+    loginArgs.setPassword(credentials.getPassword());
+    loginArgs.setUsername(credentials.getUsername());
 
     try {
       service = Service.connect(loginArgs);
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java
index d889c01..c122a98 100644
--- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java
@@ -19,20 +19,22 @@
 package org.apache.drill.exec.store.splunk;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import org.apache.drill.common.PlanStringBuilder;
-import org.apache.drill.common.logical.StoragePluginConfigBase;
+import org.apache.drill.common.logical.AbstractSecuredStoragePluginConfig;
+import org.apache.drill.common.logical.security.CredentialsProvider;
+import org.apache.drill.exec.store.security.CredentialProviderUtils;
+import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
 
 import java.util.Objects;
 
 @JsonTypeName(SplunkPluginConfig.NAME)
-public class SplunkPluginConfig extends StoragePluginConfigBase {
+public class SplunkPluginConfig extends AbstractSecuredStoragePluginConfig {
 
   public static final String NAME = "splunk";
 
-  private final String username;
-  private final String password;
   private final String hostname;
   private final String earliestTime;
   private final String latestTime;
@@ -45,23 +47,35 @@ public class SplunkPluginConfig extends StoragePluginConfigBase {
                             @JsonProperty("hostname") String hostname,
                             @JsonProperty("port") int port,
                             @JsonProperty("earliestTime") String earliestTime,
-                            @JsonProperty("latestTime") String latestTime) {
-    this.username = username;
-    this.password = password;
+                            @JsonProperty("latestTime") String latestTime,
+                            @JsonProperty("credentialsProvider") CredentialsProvider credentialsProvider) {
+    super(CredentialProviderUtils.getCredentialsProvider(username, password, credentialsProvider),
+        credentialsProvider == null);
     this.hostname = hostname;
     this.port = port;
     this.earliestTime = earliestTime;
     this.latestTime = latestTime == null ? "now" : latestTime;
   }
 
+  @JsonIgnore
+  public UsernamePasswordCredentials getUsernamePasswordCredentials() {
+    return new UsernamePasswordCredentials(credentialsProvider);
+  }
+
   @JsonProperty("username")
   public String getUsername() {
-    return username;
+    if (directCredentials) {
+      return getUsernamePasswordCredentials().getUsername();
+    }
+    return null;
   }
 
   @JsonProperty("password")
   public String getPassword() {
-    return password;
+    if (directCredentials) {
+      return getUsernamePasswordCredentials().getPassword();
+    }
+    return null;
   }
 
   @JsonProperty("hostname")
@@ -93,8 +107,7 @@ public class SplunkPluginConfig extends StoragePluginConfigBase {
       return false;
     }
     SplunkPluginConfig thatConfig = (SplunkPluginConfig) that;
-    return Objects.equals(username, thatConfig.username) &&
-      Objects.equals(password, thatConfig.password) &&
+    return Objects.equals(credentialsProvider, thatConfig.credentialsProvider) &&
       Objects.equals(hostname, thatConfig.hostname) &&
       Objects.equals(port, thatConfig.port) &&
       Objects.equals(earliestTime, thatConfig.earliestTime) &&
@@ -103,14 +116,13 @@ public class SplunkPluginConfig extends StoragePluginConfigBase {
 
   @Override
   public int hashCode() {
-    return Objects.hash(username, password, hostname, port, earliestTime, latestTime);
+    return Objects.hash(credentialsProvider, hostname, port, earliestTime, latestTime);
   }
 
   @Override
   public String toString() {
     return new PlanStringBuilder(this)
-      .field("username", username)
-      .maskedField("password", password)
+      .field("credentialsProvider", credentialsProvider)
       .field("hostname", hostname)
       .field("port", port)
       .field("earliestTime", earliestTime)
diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java
index a538809..67006d5 100644
--- a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java
+++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java
@@ -48,7 +48,8 @@ public class SplunkConnectionTest extends SplunkBaseTest {
               SPLUNK_STORAGE_PLUGIN_CONFIG.getHostname(),
               SPLUNK_STORAGE_PLUGIN_CONFIG.getPort(),
               SPLUNK_STORAGE_PLUGIN_CONFIG.getEarliestTime(),
-              SPLUNK_STORAGE_PLUGIN_CONFIG.getLatestTime()
+              SPLUNK_STORAGE_PLUGIN_CONFIG.getLatestTime(),
+              null
       );
       SplunkConnection sc = new SplunkConnection(invalidSplunkConfig);
       sc.connect();
diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java
index 5dcbda2..274ea7d 100644
--- a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java
+++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java
@@ -71,7 +71,7 @@ public class SplunkTestSuite extends ClusterTest {
         String hostname = splunk.getHost();
         Integer port = splunk.getFirstMappedPort();
         StoragePluginRegistry pluginRegistry = cluster.drillbit().getContext().getStorage();
-        SPLUNK_STORAGE_PLUGIN_CONFIG = new SplunkPluginConfig(SPLUNK_LOGIN, SPLUNK_PASS, hostname, port, "1", "now");
+        SPLUNK_STORAGE_PLUGIN_CONFIG = new SplunkPluginConfig(SPLUNK_LOGIN, SPLUNK_PASS, hostname, port, "1", "now", null);
         SPLUNK_STORAGE_PLUGIN_CONFIG.setEnabled(true);
         pluginRegistry.put(SplunkPluginConfig.NAME, SPLUNK_STORAGE_PLUGIN_CONFIG);
         runningSuite = true;
diff --git a/docs/dev/PluginCredentialsProvider.md b/docs/dev/PluginCredentialsProvider.md
new file mode 100644
index 0000000..7702d1e
--- /dev/null
+++ b/docs/dev/PluginCredentialsProvider.md
@@ -0,0 +1,164 @@
+# Plugin credentials provider
+
+Drill provides a variety of ways for specifying credentials for storage plugins.
+Though all storage plugin credentials may be stored in Zookeeper, it may be unsafe to specify them directly in the plugin configs.
+
+Here is an example of specifying storage plugin credentials directly:
+```json
+{
+  "type": "jdbc",
+  "driver": "xxx.Driver",
+  "url": "jdbc:xxx:xxx",
+  "username": "xxx",
+  "password": "xxx"
+}
+```
+
+Drill provides `credentialsProvider` property for specifying desired credential provider type and its configs
+instead of holding raw credentials in storage plugin configs.
+
+## Using credentials from Hadoop Configuration
+
+One of the implementations for credentials provider is `HadoopCredentialsProvider` that allows using Hadoop 
+Configuration property values as credentials.
+To use it, `credentialsProviderType` property should be set to `HadoopCredentialsProvider`:
+```json
+{
+  "type": "jdbc",
+  "driver": "xxx.Driver",
+  "url": "jdbc:xxx:xxx",
+  "credentialsProvider": {
+    "credentialsProviderType": "HadoopCredentialsProvider",
+    "propertyNames": {
+      "username": "hadoop.user.property",
+      "password": "hadoop.password.property"
+    }
+  }
+}
+```
+
+Each key in the `propertyNames` map is a credential name, and the corresponding value is the name of the
+Hadoop Configuration property from which that credential is read.
+
+For example, user may create in drill config directory `core-site.xml` config file with the following content:
+```xml
+<configuration>
+
+    <property>
+        <name>hadoop.user.property</name>
+        <value>user1</value>
+    </property>
+
+    <property>
+        <name>hadoop.password.property</name>
+        <value>user1Pass</value>
+    </property>
+
+</configuration>
+```
+
+In this case, the storage `jdbc` plugin will use `user1` value as the `username` and `user1Pass` value as its `password`.
+
+## Using credentials from Environment Variables
+
+`EnvCredentialsProvider` credentials provider implementation allows using Environment Variable values as plugin credentials.
+This way of specifying credentials is consistent with the Kubernetes world, where different user secrets and configmaps may be exposed as environment variables to be used by a container.
+
+To use it, `credentialsProviderType` property should be set to `EnvCredentialsProvider`:
+```json
+{
+  "type": "jdbc",
+  "driver": "xxx.Driver",
+  "url": "jdbc:xxx:xxx",
+  "credentialsProvider": {
+    "credentialsProviderType": "EnvCredentialsProvider",
+    "envVariableNames": {
+      "username": "USER_NAME",
+      "password": "USER_PASSWORD"
+    }
+  }
+}
+```
+
+Each key in the `envVariableNames` map is a credential name, and the corresponding value is the name of the
+Environment Variable from which that credential is read.
+
+For example, user may export the following variables:
+```shell
+export USER_NAME='user1'
+export USER_PASSWORD='user1Pass'
+```
+
+In this case, the storage `jdbc` plugin will use `user1` value as the `username` and `user1Pass` value as its `password`.
+
+## Using credentials managed by Vault
+
+`VaultCredentialsProvider` credentials provider implementation allows using Vault secrets as plugin credentials.
+
+Before using this credential provider, the following Drill properties should be configured in `drill-override.conf`:
+```
+"drill.exec.storage.vault.address" - address of the Vault server
+"drill.exec.storage.vault.token" - token used to access Vault
+```
+
+Once it is set, we can configure storage plugin to use this way of obtaining credentials:
+```json
+{
+  "type": "jdbc",
+  "driver": "xxx.Driver",
+  "url": "jdbc:xxx:xxx",
+  "credentialsProvider": {
+    "credentialsProviderType": "VaultCredentialsProvider",
+    "secretPath": "secret/jdbc",
+    "propertyNames": {
+      "username": "usernameSecret",
+      "password": "passwordSecret"
+    }
+  }
+}
+```
+
+`secretPath` property specifies the path of the Vault secret to read.
+Each key in the `propertyNames` map is a credential name, and the corresponding value is the name of the key within that Vault secret from which the credential is read.
+
+For example, user may store the following secrets in the Vault:
+```shell
+vault kv put secret/jdbc usernameSecret=muser passwordSecret=mpassword
+```
+
+In this case, the storage `jdbc` plugin will use `muser` value as the `username` and `mpassword` value as its `password`.
+
+## Specifying credentials inlined using credentials provider
+
+To be consistent with credentials providers implementations, Drill provides a new way of specifying credentials directly in storage plugin config:
+```json
+{
+  "type": "jdbc",
+  "driver": "xxx.Driver",
+  "url": "jdbc:xxx:xxx",
+  "credentialsProvider": {
+    "credentialsProviderType": "PlainCredentialsProvider",
+    "credentials": {
+      "username": "user1",
+      "password": "user1Pass"
+    }
+  }
+}
+```
+
+This way of specifying credentials directly should be used instead of the old one since it groups credentials and
+makes it easier to replace `PlainCredentialsProvider` with a more secured alternative.
+
+# Developer notes
+
+`CredentialsProvider` is a base interface for all credential provider implementations.
+Users may create and use their own credential provider implementations without changing Drill code.
+To achieve that, just add dependency on the `drill-logical` jar (where `CredentialsProvider` interface is placed),
+create own implementation of this interface, and create `drill-module.conf` file that will add implementation class 
+to Drill classpath scanning, for example if the class will have the following full name: `foo.bar.package.ExampleCredentialsProvider`,
+`drill-module.conf` content should be the following:
+```
+drill.classpath.scanning: {
+  packages += "foo.bar.package"
+}
+```
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index 0242044..f30700c 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -312,6 +312,11 @@
       <version>1.30</version>
     </dependency>
     <dependency>
+      <groupId>com.bettercloud</groupId>
+      <artifactId>vault-java-driver</artifactId>
+      <version>5.1.0</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.curator</groupId>
       <artifactId>curator-client</artifactId>
     </dependency>
@@ -622,6 +627,12 @@
       <version>${okhttp.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>vault</artifactId>
+      <version>1.15.1</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <profiles>
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
index 45975f2..22813e8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
@@ -76,6 +76,7 @@ public class PhysicalPlanReader {
     lpMapper.registerSubtypes(DynamicPojoRecordReader.class);
     InjectableValues injectables = new InjectableValues.Std()
         .addValue(StoragePluginRegistry.class, pluginRegistry)
+        .addValue(DrillConfig.class, config)
         .addValue(DrillbitEndpoint.class, endpoint);
 
     this.mapper = lpMapper;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
index 1e3b640..85709b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
@@ -245,7 +245,7 @@ public class StorageResources {
       storage.putJson(name, storagePluginConfig);
       return message("Success");
     } catch (PluginEncodingException e) {
-      logger.debug("Error in JSON mapping: {}", storagePluginConfig, e);
+      logger.warn("Error in JSON mapping: {}", storagePluginConfig, e);
       return message("Invalid JSON");
     } catch (PluginException e) {
       return message(e.getMessage());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
index bc78d5d..4c11ab0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
@@ -17,27 +17,38 @@
  */
 package org.apache.drill.exec.store.dfs;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
 import org.apache.drill.common.PlanStringBuilder;
 import org.apache.drill.common.logical.FormatPluginConfig;
-import org.apache.drill.common.logical.StoragePluginConfig;
 
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
+import org.apache.drill.common.logical.AbstractSecuredStoragePluginConfig;
 import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.common.logical.security.CredentialsProvider;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap.Builder;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 
 @JsonTypeName(FileSystemConfig.NAME)
-public class FileSystemConfig extends StoragePluginConfig {
+public class FileSystemConfig extends AbstractSecuredStoragePluginConfig {
+  private static final List<String> FS_CREDENTIAL_KEYS =
+      Arrays.asList(
+          CommonConfigurationKeysPublic.HADOOP_SECURITY_CREDENTIAL_PROVIDER_PATH,
+          "fs.s3a.access.key",
+          "fs.s3a.secret.key");
 
   public static final String NAME = "file";
 
@@ -50,7 +61,9 @@ public class FileSystemConfig extends StoragePluginConfig {
   public FileSystemConfig(@JsonProperty("connection") String connection,
                           @JsonProperty("config") Map<String, String> config,
                           @JsonProperty("workspaces") Map<String, WorkspaceConfig> workspaces,
-                          @JsonProperty("formats") Map<String, FormatPluginConfig> formats) {
+                          @JsonProperty("formats") Map<String, FormatPluginConfig> formats,
+                          @JsonProperty("credentialsProvider") CredentialsProvider credentialsProvider) {
+    super(getCredentialsProvider(config, credentialsProvider), credentialsProvider == null);
     this.connection = connection;
 
     // Force creation of an empty map so that configs compare equal
@@ -147,8 +160,26 @@ public class FileSystemConfig extends StoragePluginConfig {
       formatsCopy = formatsCopy == null ? new LinkedHashMap<>() : formatsCopy;
       formatsCopy.putAll(newFormats);
     }
-    FileSystemConfig newConfig = new FileSystemConfig(connection, configCopy, workspaces, formatsCopy);
+    FileSystemConfig newConfig =
+        new FileSystemConfig(connection, configCopy, workspaces, formatsCopy, credentialsProvider);
     newConfig.setEnabled(isEnabled());
     return newConfig;
   }
+
+  /**
+   * Chooses the credentials provider for this config.
+   * <p>
+   * An explicitly supplied provider always wins. Otherwise the known file system
+   * credential keys ({@code FS_CREDENTIAL_KEYS}) found in {@code config} are wrapped
+   * into a {@link PlainCredentialsProvider}.
+   *
+   * @param config file system properties of the plugin, may be {@code null}
+   * @param credentialsProvider provider given in the plugin config, may be {@code null}
+   * @return the provider to use, never {@code null}
+   */
+  private static CredentialsProvider getCredentialsProvider(
+      Map<String, String> config,
+      CredentialsProvider credentialsProvider) {
+    if (credentialsProvider != null) {
+      return credentialsProvider;
+    }
+
+    if (config != null) {
+      // Collectors.toMap rejects null values with a NullPointerException, so keys
+      // that are present but mapped to null are skipped instead of failing here.
+      Map<String, String> credentials = FS_CREDENTIAL_KEYS.stream()
+          .filter(fsCredentialKey -> config.get(fsCredentialKey) != null)
+          .collect(Collectors.toMap(fsCredentialKey -> fsCredentialKey, config::get));
+
+      return new PlainCredentialsProvider(credentials);
+    }
+    return PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER;
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
index 6ed02f4..6cc733b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
@@ -33,6 +33,7 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.logical.security.CredentialsProvider;
 import org.apache.drill.exec.metastore.MetadataProviderManager;
 import org.apache.drill.exec.ops.OptimizerRulesContext;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
@@ -89,6 +90,10 @@ public class FileSystemPlugin extends AbstractStoragePlugin {
       fsConf.set(FileSystem.FS_DEFAULT_NAME_KEY, config.getConnection());
       fsConf.set("fs.classpath.impl", ClassPathFileSystem.class.getName());
       fsConf.set("fs.drill-local.impl", LocalSyncableFileSystem.class.getName());
+      CredentialsProvider credentialsProvider = config.getCredentialsProvider();
+      if (credentialsProvider != null) {
+        credentialsProvider.getCredentials().forEach(fsConf::set);
+      }
 
       addCodecs(fsConf);
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngineConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngineConfig.java
index b58d9ab..a888d00 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngineConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngineConfig.java
@@ -17,14 +17,14 @@
  */
 package org.apache.drill.exec.store.mock;
 
-import org.apache.drill.common.logical.StoragePluginConfigBase;
+import org.apache.drill.common.logical.StoragePluginConfig;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
 @JsonTypeName(MockStorageEngineConfig.NAME)
-public class MockStorageEngineConfig extends StoragePluginConfigBase {
+public class MockStorageEngineConfig extends StoragePluginConfig {
   public static final String NAME = "mock";
   public static final MockStorageEngineConfig INSTANCE = new MockStorageEngineConfig("mock:///");
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/security/CredentialProviderUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/security/CredentialProviderUtils.java
new file mode 100644
index 0000000..e8ecf7c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/security/CredentialProviderUtils.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.security;
+
+import org.apache.drill.common.logical.security.CredentialsProvider;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+
+/**
+ * Static helpers for obtaining {@link CredentialsProvider} instances.
+ */
+public class CredentialProviderUtils {
+
+  // Utility class with static helpers only; instantiation is not intended.
+  private CredentialProviderUtils() {
+  }
+
+  /**
+   * Returns the given {@code credentialsProvider} if it is not null; otherwise builds
+   * and returns a {@link PlainCredentialsProvider} holding the given values under the
+   * {@link UsernamePasswordCredentials#USERNAME} and
+   * {@link UsernamePasswordCredentials#PASSWORD} keys.
+   * <p>
+   * A null {@code username} or {@code password} is left out of the resulting map.
+   *
+   * @param username username to use when no provider is given, may be {@code null}
+   * @param password password to use when no provider is given, may be {@code null}
+   * @param credentialsProvider explicitly configured provider, may be {@code null}
+   * @return {@code credentialsProvider} when it is not null, a new
+   *         {@link PlainCredentialsProvider} otherwise
+   */
+  public static CredentialsProvider getCredentialsProvider(
+      String username, String password,
+      CredentialsProvider credentialsProvider) {
+    if (credentialsProvider != null) {
+      return credentialsProvider;
+    }
+    // ImmutableMap does not accept null values, hence the explicit null checks.
+    ImmutableMap.Builder<String, String> mapBuilder = ImmutableMap.builder();
+    if (username != null) {
+      mapBuilder.put(UsernamePasswordCredentials.USERNAME, username);
+    }
+    if (password != null) {
+      mapBuilder.put(UsernamePasswordCredentials.PASSWORD, password);
+    }
+    return new PlainCredentialsProvider(mapBuilder.build());
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngineConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/security/EnvCredentialsProvider.java
similarity index 50%
copy from exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngineConfig.java
copy to exec/java-exec/src/main/java/org/apache/drill/exec/store/security/EnvCredentialsProvider.java
index b58d9ab..2325018 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngineConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/security/EnvCredentialsProvider.java
@@ -15,28 +15,40 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.mock;
-
-import org.apache.drill.common.logical.StoragePluginConfigBase;
+package org.apache.drill.exec.store.security;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.logical.security.CredentialsProvider;
 
-@JsonTypeName(MockStorageEngineConfig.NAME)
-public class MockStorageEngineConfig extends StoragePluginConfigBase {
-  public static final String NAME = "mock";
-  public static final MockStorageEngineConfig INSTANCE = new MockStorageEngineConfig("mock:///");
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
 
-  private final String url;
+/**
+ * Implementation of {@link CredentialsProvider} that obtains credential values from
+ * environment variables.
+ * <p>
+ * Its constructor accepts a map with credential names as keys and env variable names as values.
+ */
+public class EnvCredentialsProvider implements CredentialsProvider {
+  private final Map<String, String> envVariables;
 
   @JsonCreator
-  public MockStorageEngineConfig(@JsonProperty("url") String url) {
-    this.url = url;
+  public EnvCredentialsProvider(@JsonProperty("envVariableNames") Map<String, String> envVariableNames) {
+    this.envVariables = envVariableNames;
+  }
+
+  /**
+   * Resolves every configured credential from its environment variable.
+   * <p>
+   * Environment variables that are not set are left out of the result, so the
+   * returned map never contains null values.
+   *
+   * @return new mutable map of credential names to environment variable values
+   */
+  @Override
+  public Map<String, String> getCredentials() {
+    Map<String, String> credentials = new HashMap<>();
+    envVariables.forEach((key, value) -> {
+      String credValue = System.getenv(value);
+      // Skip unset variables, the same way HadoopCredentialsProvider skips missing
+      // properties: a null entry breaks consumers that pass every entry on to an
+      // API rejecting nulls (e.g. Hadoop's Configuration.set).
+      if (credValue != null) {
+        credentials.put(key, credValue);
+      }
+    });
+
+    return credentials;
   }
 
-  public String getUrl() {
-    return url;
+  public Map<String, String> getEnvVariables() {
+    return envVariables;
   }
 
   @Override
@@ -47,18 +59,12 @@ public class MockStorageEngineConfig extends StoragePluginConfigBase {
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
-
-    MockStorageEngineConfig that = (MockStorageEngineConfig) o;
-
-    if (url != null ? !url.equals(that.url) : that.url != null) {
-      return false;
-    }
-
-    return true;
+    EnvCredentialsProvider that = (EnvCredentialsProvider) o;
+    return Objects.equals(envVariables, that.envVariables);
   }
 
   @Override
   public int hashCode() {
-    return url != null ? url.hashCode() : 0;
+    return Objects.hash(envVariables);
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/security/HadoopCredentialsProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/security/HadoopCredentialsProvider.java
new file mode 100644
index 0000000..e678e6f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/security/HadoopCredentialsProvider.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.security;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.drill.common.logical.security.CredentialsProvider;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Implementation of {@link CredentialsProvider} that obtains credential values from
+ * {@link Configuration} properties.
+ * <p>
+ * Its constructor accepts a map with credential names as keys and property names as values.
+ */
+public class HadoopCredentialsProvider implements CredentialsProvider {
+  // Hadoop configuration in which the credential values are looked up.
+  private final Configuration configuration;
+  // Credential name -> name of the configuration property holding its value.
+  private final Map<String, String> propertyNames;
+
+  /**
+   * Creates a provider that reads credentials from the given configuration.
+   *
+   * @param configuration configuration to read property values from
+   * @param propertyNames map with credential names as keys and property names as values
+   */
+  public HadoopCredentialsProvider(Configuration configuration, Map<String, String> propertyNames) {
+    this.configuration = configuration;
+    this.propertyNames = propertyNames;
+  }
+
+  /**
+   * Creates a provider that reads credentials from a newly constructed
+   * {@link Configuration}. This is the constructor used for JSON deserialization.
+   *
+   * @param propertyNames map with credential names as keys and property names as values
+   */
+  @JsonCreator
+  public HadoopCredentialsProvider(@JsonProperty("propertyNames") Map<String, String> propertyNames) {
+    this(new Configuration(), propertyNames);
+  }
+
+  /**
+   * Looks up every configured property through {@link Configuration#getPassword(String)}.
+   * Properties for which no value is found are left out of the result.
+   *
+   * @return new mutable map of credential names to resolved values
+   * @throws RuntimeException if the configuration lookup fails with an {@link IOException}
+   */
+  @Override
+  public Map<String, String> getCredentials() {
+    Map<String, String> resolved = new HashMap<>();
+    for (Map.Entry<String, String> entry : propertyNames.entrySet()) {
+      char[] secret;
+      try {
+        secret = configuration.getPassword(entry.getValue());
+      } catch (IOException e) {
+        throw new RuntimeException("Error while fetching credentials from configuration", e);
+      }
+      if (secret != null) {
+        resolved.put(entry.getKey(), new String(secret));
+      }
+    }
+
+    return resolved;
+  }
+
+  public Map<String, String> getPropertyNames() {
+    return propertyNames;
+  }
+
+  // Equality and hash code are based on the property names only;
+  // the backing configuration is not compared.
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    if (o == null || o.getClass() != getClass()) {
+      return false;
+    }
+    HadoopCredentialsProvider other = (HadoopCredentialsProvider) o;
+    return Objects.equals(propertyNames, other.propertyNames);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(propertyNames);
+  }
+}
diff --git a/logical/src/main/java/org/apache/drill/common/logical/StoragePluginConfigBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/security/UsernamePasswordCredentials.java
similarity index 52%
copy from logical/src/main/java/org/apache/drill/common/logical/StoragePluginConfigBase.java
copy to exec/java-exec/src/main/java/org/apache/drill/exec/store/security/UsernamePasswordCredentials.java
index d8c76d7..2dbc8b1 100644
--- a/logical/src/main/java/org/apache/drill/common/logical/StoragePluginConfigBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/security/UsernamePasswordCredentials.java
@@ -15,7 +15,30 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.common.logical;
+package org.apache.drill.exec.store.security;
 
-public abstract class StoragePluginConfigBase extends StoragePluginConfig {
+import org.apache.drill.common.logical.security.CredentialsProvider;
+
+import java.util.Map;
+
+/**
+ * Username and password pair extracted from the credentials supplied by a
+ * {@link CredentialsProvider}.
+ */
+public class UsernamePasswordCredentials {
+  /** Credential name under which the username is looked up. */
+  public static final String USERNAME = "username";
+  /** Credential name under which the password is looked up. */
+  public static final String PASSWORD = "password";
+
+  private final String username;
+  private final String password;
+
+  /**
+   * Fetches the credentials from the provider once and keeps the values stored under
+   * {@link #USERNAME} and {@link #PASSWORD}; a credential the provider does not
+   * supply is kept as {@code null}.
+   *
+   * @param credentialsProvider provider to obtain the credentials from
+   */
+  public UsernamePasswordCredentials(CredentialsProvider credentialsProvider) {
+    Map<String, String> resolved = credentialsProvider.getCredentials();
+    username = resolved.get(USERNAME);
+    password = resolved.get(PASSWORD);
+  }
+
+  /** @return the username, or {@code null} if the provider did not supply one */
+  public String getUsername() {
+    return username;
+  }
+
+  /** @return the password, or {@code null} if the provider did not supply one */
+  public String getPassword() {
+    return password;
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/security/vault/VaultCredentialsProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/security/vault/VaultCredentialsProvider.java
new file mode 100644
index 0000000..aff2acc
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/security/vault/VaultCredentialsProvider.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.security.vault;
+
+import com.bettercloud.vault.Vault;
+import com.bettercloud.vault.VaultConfig;
+import com.bettercloud.vault.VaultException;
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.OptBoolean;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.logical.security.CredentialsProvider;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Implementation of {@link CredentialsProvider} that obtains credential values from
+ * {@link Vault}.
+ */
+public class VaultCredentialsProvider implements CredentialsProvider {
+
+  /** Name of the Drill config property holding the address of the Vault server. */
+  public static final String VAULT_ADDRESS = "drill.exec.storage.vault.address";
+  /** Name of the Drill config property holding the token used to access Vault. */
+  public static final String VAULT_TOKEN = "drill.exec.storage.vault.token";
+
+  // Path of the Vault secret that is read in getCredentials().
+  private final String secretPath;
+
+  // Credential name -> name of the key within the Vault secret.
+  private final Map<String, String> propertyNames;
+
+  // Vault client built from the address and token taken from the Drill config.
+  private final Vault vault;
+
+  /**
+   * @param secretPath The Vault key value from which to read
+   * @param propertyNames map with credential names as keys and vault keys as values.
+   * @param config drill config
+   * @throws VaultException if exception happens when connecting to Vault.
+   */
+  @JsonCreator
+  public VaultCredentialsProvider(
+      @JsonProperty("secretPath") String secretPath,
+      @JsonProperty("propertyNames") Map<String, String> propertyNames,
+      @JacksonInject(useInput = OptBoolean.FALSE) DrillConfig config) throws VaultException {
+    this.propertyNames = propertyNames;
+    this.secretPath = secretPath;
+    String vaultAddress = Objects.requireNonNull(config.getString(VAULT_ADDRESS),
+        String.format("Vault address is not specified. Please set [%s] config property.", VAULT_ADDRESS));
+    String token = Objects.requireNonNull(config.getString(VAULT_TOKEN),
+        String.format("Vault token is not specified. Please set [%s] config property.", VAULT_TOKEN));
+
+    VaultConfig vaultConfig = new VaultConfig()
+        .address(vaultAddress)
+        .token(token)
+        .build();
+    this.vault = new Vault(vaultConfig);
+  }
+
+  /**
+   * Reads the secret stored at {@code secretPath} and maps its values to credential names.
+   * <p>
+   * The secret is read from Vault once per call, so all returned credentials come from
+   * the same response. Keys that are absent from the secret are left out of the result,
+   * so the returned map never contains null values.
+   *
+   * @return new mutable map of credential names to secret values
+   * @throws RuntimeException if reading the secret from Vault fails
+   */
+  @Override
+  public Map<String, String> getCredentials() {
+    Map<String, String> credentials = new HashMap<>();
+    if (propertyNames.isEmpty()) {
+      // Nothing to resolve, so Vault is not contacted at all.
+      return credentials;
+    }
+
+    Map<String, String> secretData;
+    try {
+      // A single read serves every property: all of them live under the same path.
+      secretData = vault.logical()
+          .read(secretPath)
+          .getData();
+    } catch (VaultException e) {
+      throw new RuntimeException("Error while fetching credentials from vault", e);
+    }
+
+    for (Map.Entry<String, String> property : propertyNames.entrySet()) {
+      String credValue = secretData.get(property.getValue());
+      if (credValue != null) {
+        credentials.put(property.getKey(), credValue);
+      }
+    }
+
+    return credentials;
+  }
+
+  public String getSecretPath() {
+    return secretPath;
+  }
+
+  public Map<String, String> getPropertyNames() {
+    return propertyNames;
+  }
+
+  // Equality and hash code are based on the secret path and property names only;
+  // the Vault client is not compared.
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    VaultCredentialsProvider that = (VaultCredentialsProvider) o;
+    return Objects.equals(secretPath, that.secretPath) && Objects.equals(propertyNames, that.propertyNames);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(secretPath, propertyNames);
+  }
+}
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 01f454d..2a58194 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -48,7 +48,8 @@ drill {
       org.apache.drill.exec.rpc.user.security,
       org.apache.drill.exec.rpc.security,
       org.apache.drill.exec.server.rest.auth,
-      org.apache.drill.exec.coord.zk
+      org.apache.drill.exec.coord.zk,
+      org.apache.drill.exec.store.security
     ],
 
     # caches scanned result during build time
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/BaseTestImpersonation.java b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/BaseTestImpersonation.java
index d211485..6fd881f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/BaseTestImpersonation.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/BaseTestImpersonation.java
@@ -130,7 +130,8 @@ public class BaseTestImpersonation extends PlanTestBase {
     String connection = dfsConf.get(FileSystem.FS_DEFAULT_NAME_KEY);
     createAndAddWorkspace("tmp", "/tmp", (short) 0777, processUser, processUser, workspaces);
 
-    FileSystemConfig miniDfsPluginConfig = new FileSystemConfig(connection, null, workspaces, lfsPluginConfig.getFormats());
+    FileSystemConfig miniDfsPluginConfig = new FileSystemConfig(connection, null,
+        workspaces, lfsPluginConfig.getFormats(), null);
     miniDfsPluginConfig.setEnabled(true);
     pluginRegistry.put(MINI_DFS_STORAGE_PLUGIN_NAME, miniDfsPluginConfig);
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTTAS.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTTAS.java
index f160569..80d2fd7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTTAS.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTTAS.java
@@ -33,6 +33,7 @@ import org.apache.drill.common.exceptions.UserRemoteException;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.dfs.FileSystemConfig;
 import org.apache.drill.exec.store.dfs.WorkspaceConfig;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.test.BaseTestQuery;
 import org.junit.BeforeClass;
@@ -65,7 +66,8 @@ public class TestCTTAS extends BaseTestQuery {
         pluginConfig.getConnection(),
         pluginConfig.getConfig(),
         newWorkspaces,
-        pluginConfig.getFormats());
+        pluginConfig.getFormats(),
+        PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
     newPluginConfig.setEnabled(pluginConfig.isEnabled());
     pluginRegistry.put(DFS_PLUGIN_NAME, newPluginConfig);
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchema.java
index 509809c..11692d7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchema.java
@@ -458,7 +458,6 @@ public class TestInfoSchema extends BaseTestQuery {
 
     // check some stable properties existence
     assertTrue(configMap.containsKey("connection"));
-    assertTrue(configMap.containsKey("config"));
     assertTrue(configMap.containsKey("formats"));
     assertFalse(configMap.containsKey("workspaces"));
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/BasePluginRegistryTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/BasePluginRegistryTest.java
index f4d6981..55f58ab 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/BasePluginRegistryTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/BasePluginRegistryTest.java
@@ -51,7 +51,7 @@ public class BasePluginRegistryTest extends BaseTest {
 
   protected static final String RESOURCE_BASE = "plugins/";
 
-  protected class PluginRegistryContextFixture implements PluginRegistryContext {
+  protected static class PluginRegistryContextFixture implements PluginRegistryContext {
 
     private final DrillConfig drillConfig;
     private final ScanResult classpathScan;
@@ -171,7 +171,7 @@ public class BasePluginRegistryTest extends BaseTest {
     public void init() { }
 
     @Override
-    public StoragePlugins bootstrapPlugins() throws IOException {
+    public StoragePlugins bootstrapPlugins() {
       return null;
     }
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestClassicLocator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestClassicLocator.java
index ea095ab..acfa606 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestClassicLocator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestClassicLocator.java
@@ -27,7 +27,7 @@ import java.util.Collections;
 import java.util.Set;
 
 import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.common.logical.StoragePluginConfigBase;
+import org.apache.drill.common.logical.AbstractSecuredStoragePluginConfig;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.planner.logical.StoragePlugins;
 import org.apache.drill.exec.store.dfs.FileSystemConfig;
@@ -63,7 +63,7 @@ public class TestClassicLocator extends BasePluginRegistryTest {
 
       // Abstract classes do not appear
       assertFalse(result.contains(StoragePluginConfig.class));
-      assertFalse(result.contains(StoragePluginConfigBase.class));
+      assertFalse(result.contains(AbstractSecuredStoragePluginConfig.class));
 
       // The private plugin class does not appear
       assertFalse(result.contains(StoragePluginFixtureConfig.class));
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestPluginRegistry.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestPluginRegistry.java
index bfff3f9..16553e4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestPluginRegistry.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestPluginRegistry.java
@@ -43,6 +43,7 @@ import org.apache.drill.exec.store.StoragePluginRegistry.PluginFilter;
 import org.apache.drill.exec.store.dfs.FileSystemConfig;
 import org.apache.drill.exec.store.dfs.FileSystemPlugin;
 import org.apache.drill.exec.store.easy.text.TextFormatPlugin.TextFormatConfig;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
 import org.apache.drill.test.BaseDirTestWatcher;
 import org.apache.drill.test.BaseTest;
 import org.apache.drill.test.ClusterFixture;
@@ -83,7 +84,8 @@ public class TestPluginRegistry extends BaseTest {
 
   private FileSystemConfig myConfig1() {
     FileSystemConfig config = new FileSystemConfig("myConn",
-        new HashMap<>(), new HashMap<>(), new HashMap<>());
+        new HashMap<>(), new HashMap<>(), new HashMap<>(),
+        PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
     config.setEnabled(true);
     return config;
   }
@@ -92,7 +94,8 @@ public class TestPluginRegistry extends BaseTest {
     Map<String, String> props = new HashMap<>();
     props.put("foo", "bar");
     FileSystemConfig config = new FileSystemConfig("myConn",
-        props, new HashMap<>(), new HashMap<>());
+        props, new HashMap<>(), new HashMap<>(),
+        PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
     config.setEnabled(true);
     return config;
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/util/StoragePluginTestUtils.java b/exec/java-exec/src/test/java/org/apache/drill/exec/util/StoragePluginTestUtils.java
index 35b7b31..2cd1a0c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/util/StoragePluginTestUtils.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/util/StoragePluginTestUtils.java
@@ -22,6 +22,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.exec.store.StoragePluginRegistry;
@@ -76,7 +77,8 @@ public class StoragePluginTestUtils {
         pluginConfig.getConnection(),
         pluginConfig.getConfig(),
         newWorkspaces,
-        pluginConfig.getFormats());
+        pluginConfig.getFormats(),
+        PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
     newPluginConfig.setEnabled(pluginConfig.isEnabled());
     pluginRegistry.put(pluginName, newPluginConfig);
   }
@@ -112,7 +114,8 @@ public class StoragePluginTestUtils {
         fileSystemConfig.getConnection(),
         fileSystemConfig.getConfig(),
         fileSystemConfig.getWorkspaces(),
-        newFormats);
+        newFormats,
+        PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
     newFileSystemConfig.setEnabled(fileSystemConfig.isEnabled());
 
     pluginRegistry.put(storagePlugin, newFileSystemConfig);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/storage/CredentialsProviderImplementationsTest.java b/exec/java-exec/src/test/java/org/apache/drill/storage/CredentialsProviderImplementationsTest.java
new file mode 100644
index 0000000..b8df40a
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/storage/CredentialsProviderImplementationsTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.storage;
+
+import com.bettercloud.vault.VaultException;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.logical.security.CredentialsProvider;
+import org.apache.drill.exec.store.security.EnvCredentialsProvider;
+import org.apache.drill.exec.store.security.HadoopCredentialsProvider;
+import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
+import org.apache.drill.exec.store.security.vault.VaultCredentialsProvider;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.vault.VaultContainer;
+
+import java.util.Collections;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class CredentialsProviderImplementationsTest extends ClusterTest {
+
+  private static final String VAULT_TOKEN_VALUE = "vault-token";
+  private static final String SECRET_PATH = "secret/testing";
+
+  // Vault container seeded with the secret that testVaultCredentialsProvider() reads back.
+  @ClassRule
+  public static final VaultContainer<?> vaultContainer =
+      new VaultContainer<>(DockerImageName.parse("vault").withTag("1.1.3"))
+          .withVaultToken(VAULT_TOKEN_VALUE)
+          .withVaultPort(8200)
+          .withSecretInVault(SECRET_PATH,
+              "top_secret=password1",
+              "db_password=dbpassword1");
+
+  @BeforeClass
+  public static void init() throws Exception {
+    startCluster(ClusterFixture.builder(dirTestWatcher)
+        .configProperty(VaultCredentialsProvider.VAULT_ADDRESS, "http://" + vaultContainer.getHost() + ":" + vaultContainer.getMappedPort(8200))
+        .configProperty(VaultCredentialsProvider.VAULT_TOKEN, VAULT_TOKEN_VALUE));
+  }
+
+  @Test
+  public void testEnvCredentialsProvider() {
+    String variableName = "USER";
+    String expectedValue = System.getenv(variableName);
+
+    CredentialsProvider envCredentialsProvider = new EnvCredentialsProvider(ImmutableMap.of(
+        UsernamePasswordCredentials.USERNAME, variableName));
+
+    Map<String, String> actualCredentials = envCredentialsProvider.getCredentials();
+
+    assertEquals(Collections.singletonMap(UsernamePasswordCredentials.USERNAME, expectedValue),
+        actualCredentials);
+  }
+
+  @Test
+  public void testHadoopCredentialsProvider() {
+    Configuration configuration = new Configuration();
+    String expectedUsernameValue = "user1";
+    String expectedPassValue = "pass123!@#";
+    String usernamePropertyName = "username_key";
+    String passwordPropertyName = "password_key";
+    configuration.set(usernamePropertyName, expectedUsernameValue);
+    configuration.set(passwordPropertyName, expectedPassValue);
+
+    CredentialsProvider hadoopCredentialsProvider = new HadoopCredentialsProvider(configuration,
+        ImmutableMap.of(
+            UsernamePasswordCredentials.USERNAME, usernamePropertyName,
+            UsernamePasswordCredentials.PASSWORD, passwordPropertyName));
+
+    Map<String, String> actualCredentials = hadoopCredentialsProvider.getCredentials();
+
+    assertEquals(ImmutableMap.of(
+        UsernamePasswordCredentials.USERNAME, expectedUsernameValue,
+        UsernamePasswordCredentials.PASSWORD, expectedPassValue),
+        actualCredentials);
+  }
+
+  @Test
+  public void testVaultCredentialsProvider() throws VaultException {
+    DrillConfig config = cluster.drillbit().getContext().getConfig();
+
+    CredentialsProvider vaultCredentialsProvider = new VaultCredentialsProvider(
+        SECRET_PATH,
+        ImmutableMap.of(UsernamePasswordCredentials.USERNAME, "top_secret",
+            UsernamePasswordCredentials.PASSWORD, "db_password"),
+        config);
+
+    Map<String, String> actualCredentials = vaultCredentialsProvider.getCredentials();
+
+    assertEquals(ImmutableMap.of(UsernamePasswordCredentials.USERNAME, "password1",
+        UsernamePasswordCredentials.PASSWORD, "dbpassword1"),
+        actualCredentials);
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/storage/CredentialsProviderSerDeTest.java b/exec/java-exec/src/test/java/org/apache/drill/storage/CredentialsProviderSerDeTest.java
new file mode 100644
index 0000000..fc2e3d8
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/storage/CredentialsProviderSerDeTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.storage;
+
+import com.bettercloud.vault.VaultException;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.logical.security.CredentialsProvider;
+import org.apache.drill.exec.store.security.EnvCredentialsProvider;
+import org.apache.drill.exec.store.security.HadoopCredentialsProvider;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
+import org.apache.drill.exec.store.security.vault.VaultCredentialsProvider;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.vault.VaultContainer;
+
+import static org.junit.Assert.assertEquals;
+
+public class CredentialsProviderSerDeTest extends ClusterTest {
+
+  private static final String VAULT_TOKEN_VALUE = "vault-token";
+  private static final String SECRET_PATH = "secret/testing";
+
+  // Vault container started for the whole class; init() points the cluster config at it.
+  @ClassRule
+  public static final VaultContainer<?> vaultContainer =
+      new VaultContainer<>(DockerImageName.parse("vault").withTag("1.1.3"))
+          .withVaultToken(VAULT_TOKEN_VALUE)
+          .withVaultPort(8200)
+          .withSecretInVault(SECRET_PATH,
+              "top_secret=password1",
+              "db_password=dbpassword1");
+
+  @BeforeClass
+  public static void init() throws Exception {
+    startCluster(ClusterFixture.builder(dirTestWatcher)
+        .configProperty(VaultCredentialsProvider.VAULT_ADDRESS, "http://" + vaultContainer.getHost() + ":" + vaultContainer.getMappedPort(8200))
+        .configProperty(VaultCredentialsProvider.VAULT_TOKEN, VAULT_TOKEN_VALUE));
+  }
+
+  @Test
+  public void testEnvCredentialsProviderSerDe() throws JsonProcessingException {
+    ObjectMapper mapper = cluster.drillbit().getContext().getLpPersistence().getMapper();
+
+    CredentialsProvider envCredentialsProvider = new EnvCredentialsProvider(ImmutableMap.of(
+        UsernamePasswordCredentials.USERNAME, "myLoginVar",
+        UsernamePasswordCredentials.PASSWORD, "myPassVar"));
+    // Serialize through the CredentialsProvider base type.
+    String serialized = mapper.writerFor(CredentialsProvider.class).writeValueAsString(envCredentialsProvider);
+
+    String expected =
+        "{\n" +
+        "  \"credentialsProviderType\" : \"EnvCredentialsProvider\",\n" +
+        "  \"envVariables\" : {\n" +
+        "    \"username\" : \"myLoginVar\",\n" +
+        "    \"password\" : \"myPassVar\"\n" +
+        "  }\n" +
+        "}";
+
+    assertEquals(expected, serialized);
+    // Round trip: reading the JSON back must yield an equal provider.
+    CredentialsProvider deserialized = mapper.readerFor(CredentialsProvider.class).readValue(serialized);
+
+    assertEquals(envCredentialsProvider, deserialized);
+  }
+
+  @Test
+  public void testPlainCredentialsProviderSerDe() throws JsonProcessingException {
+    ObjectMapper mapper = cluster.drillbit().getContext().getLpPersistence().getMapper();
+
+    CredentialsProvider credentialsProvider = new PlainCredentialsProvider(ImmutableMap.of(
+        UsernamePasswordCredentials.USERNAME, "myLogin",
+        UsernamePasswordCredentials.PASSWORD, "myPass"));
+    // Serialize through the CredentialsProvider base type.
+    String serialized = mapper.writerFor(CredentialsProvider.class).writeValueAsString(credentialsProvider);
+
+    String expected =
+        "{\n" +
+        "  \"credentialsProviderType\" : \"PlainCredentialsProvider\",\n" +
+        "  \"credentials\" : {\n" +
+        "    \"username\" : \"myLogin\",\n" +
+        "    \"password\" : \"myPass\"\n" +
+        "  }\n" +
+        "}";
+
+    assertEquals(expected, serialized);
+    // Round trip: reading the JSON back must yield an equal provider.
+    CredentialsProvider deserialized = mapper.readerFor(CredentialsProvider.class).readValue(serialized);
+
+    assertEquals(credentialsProvider, deserialized);
+  }
+
+  @Test
+  public void testPlainCredentialsProviderWithNoType() throws JsonProcessingException {
+    ObjectMapper mapper = cluster.drillbit().getContext().getLpPersistence().getMapper();
+
+    CredentialsProvider expected = new PlainCredentialsProvider(ImmutableMap.of(
+        UsernamePasswordCredentials.USERNAME, "myLogin",
+        UsernamePasswordCredentials.PASSWORD, "myPass"));
+    // The JSON below carries no "credentialsProviderType" field, yet must still read as a plain provider.
+    String serialized =
+        "{\n" +
+        "  \"credentials\" : {\n" +
+        "    \"username\" : \"myLogin\",\n" +
+        "    \"password\" : \"myPass\"\n" +
+        "  }\n" +
+        "}";
+
+    CredentialsProvider deserialized = mapper.readerFor(CredentialsProvider.class).readValue(serialized);
+
+    assertEquals(expected, deserialized);
+  }
+
+  @Test
+  public void testHadoopCredentialsProviderSerDe() throws JsonProcessingException {
+    ObjectMapper mapper = cluster.drillbit().getContext().getLpPersistence().getMapper();
+
+    CredentialsProvider credentialsProvider = new HadoopCredentialsProvider(ImmutableMap.of(
+        UsernamePasswordCredentials.USERNAME, "myLoginProp",
+        UsernamePasswordCredentials.PASSWORD, "myPassProp"));
+    // Serialize through the CredentialsProvider base type.
+    String serialized = mapper.writerFor(CredentialsProvider.class).writeValueAsString(credentialsProvider);
+
+    String expected =
+        "{\n" +
+        "  \"credentialsProviderType\" : \"HadoopCredentialsProvider\",\n" +
+        "  \"propertyNames\" : {\n" +
+        "    \"username\" : \"myLoginProp\",\n" +
+        "    \"password\" : \"myPassProp\"\n" +
+        "  }\n" +
+        "}";
+
+    assertEquals(expected, serialized);
+    // Round trip: reading the JSON back must yield an equal provider.
+    CredentialsProvider deserialized = mapper.readerFor(CredentialsProvider.class).readValue(serialized);
+
+    assertEquals(credentialsProvider, deserialized);
+  }
+
+  @Test
+  public void testVaultCredentialsProviderSerDe() throws JsonProcessingException, VaultException {
+    ObjectMapper mapper = cluster.drillbit().getContext().getLpPersistence().getMapper();
+
+    DrillConfig config = cluster.drillbit().getContext().getConfig();
+
+    CredentialsProvider credentialsProvider = new VaultCredentialsProvider(SECRET_PATH, ImmutableMap.of(
+        UsernamePasswordCredentials.USERNAME, "myLoginProp",
+        UsernamePasswordCredentials.PASSWORD, "myPassProp"),
+        config);
+    // Serialize through the CredentialsProvider base type.
+    String serialized = mapper.writerFor(CredentialsProvider.class).writeValueAsString(credentialsProvider);
+
+    String expected =
+        "{\n" +
+        "  \"credentialsProviderType\" : \"VaultCredentialsProvider\",\n" +
+        "  \"secretPath\" : \"secret/testing\",\n" +
+        "  \"propertyNames\" : {\n" +
+        "    \"username\" : \"myLoginProp\",\n" +
+        "    \"password\" : \"myPassProp\"\n" +
+        "  }\n" +
+        "}";
+
+    assertEquals(expected, serialized);
+    // Round trip: reading the JSON back must yield an equal provider.
+    CredentialsProvider deserialized = mapper.readerFor(CredentialsProvider.class).readValue(serialized);
+
+    assertEquals(credentialsProvider, deserialized);
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
index f6ba1df..3a027ed 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
@@ -57,6 +57,7 @@ import org.apache.drill.exec.store.StoragePluginRegistryImpl;
 import org.apache.drill.exec.store.dfs.FileSystemConfig;
 import org.apache.drill.exec.store.dfs.WorkspaceConfig;
 import org.apache.drill.exec.store.mock.MockStorageEngineConfig;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
 import org.apache.drill.exec.store.sys.store.provider.ZookeeperPersistentStoreProvider;
 import org.apache.drill.exec.util.StoragePluginTestUtils;
 import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
@@ -558,7 +559,8 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable {
       pluginConfig.getConnection(),
       pluginConfig.getConfig(),
       newWorkspaces == null ? pluginConfig.getWorkspaces() : newWorkspaces,
-      newFormats == null ? pluginConfig.getFormats() : newFormats);
+      newFormats == null ? pluginConfig.getFormats() : newFormats,
+      PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
     newPluginConfig.setEnabled(pluginConfig.isEnabled());
 
     pluginRegistry.put(pluginName, newPluginConfig);
diff --git a/exec/vector/pom.xml b/exec/vector/pom.xml
index e034ed2..62f21dd 100644
--- a/exec/vector/pom.xml
+++ b/exec/vector/pom.xml
@@ -58,6 +58,10 @@
       <artifactId>jackson-databind</artifactId>
     </dependency>
     <dependency>
+      <groupId>com.fasterxml.jackson.datatype</groupId>
+      <artifactId>jackson-datatype-joda</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
       <scope>provided</scope>
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/util/JsonStringHashMap.java b/exec/vector/src/main/java/org/apache/drill/exec/util/JsonStringHashMap.java
index f4352d11..337858f 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/util/JsonStringHashMap.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/util/JsonStringHashMap.java
@@ -23,6 +23,7 @@ import java.util.Map;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.datatype.joda.JodaModule;
 
 /*
  * Simple class that extends the regular java.util.HashMap but overrides the
@@ -30,12 +31,9 @@ import com.fasterxml.jackson.databind.ObjectMapper;
  */
 public class JsonStringHashMap<K, V> extends LinkedHashMap<K, V> {
 
-  private static ObjectMapper mapper;
-
-  static {
-    mapper = new ObjectMapper();
-    mapper.registerModule(SerializationModule.getModule());
-  }
+  private static final ObjectMapper mapper = new ObjectMapper()
+      .registerModule(SerializationModule.getModule())
+      .registerModule(new JodaModule());
 
   @Override
   public boolean equals(Object obj) {
diff --git a/logical/src/main/java/org/apache/drill/common/config/LogicalPlanPersistence.java b/logical/src/main/java/org/apache/drill/common/config/LogicalPlanPersistence.java
index 91afaed..719e838 100644
--- a/logical/src/main/java/org/apache/drill/common/config/LogicalPlanPersistence.java
+++ b/logical/src/main/java/org/apache/drill/common/config/LogicalPlanPersistence.java
@@ -19,6 +19,7 @@ package org.apache.drill.common.config;
 
 import java.util.Set;
 
+import com.fasterxml.jackson.databind.InjectableValues;
 import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.SchemaPath;
@@ -26,6 +27,7 @@ import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.common.logical.data.LogicalOperator;
 import org.apache.drill.common.scanner.persistence.ScanResult;
+import org.apache.drill.common.logical.security.CredentialsProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,6 +58,10 @@ public class LogicalPlanPersistence {
         .addDeserializer(LogicalExpression.class, new LogicalExpression.De(conf))
         .addDeserializer(SchemaPath.class, new SchemaPath.De());
 
+    InjectableValues injectables = new InjectableValues.Std()
+        .addValue(DrillConfig.class, conf);
+
+    mapper.setInjectableValues(injectables);
     mapper.registerModule(deserModule);
     mapper.enable(SerializationFeature.INDENT_OUTPUT);
     mapper.configure(Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
@@ -68,6 +74,7 @@ public class LogicalPlanPersistence {
     registerSubtypes(getSubTypes(scanResult, StoragePluginConfig.class));
     // For FormatPluginConfigBase
     registerSubtypes(getSubTypes(scanResult, FormatPluginConfig.class));
+    registerSubtypes(getSubTypes(scanResult, CredentialsProvider.class));
   }
 
   public ObjectMapper getMapper() {
diff --git a/logical/src/main/java/org/apache/drill/common/logical/AbstractSecuredStoragePluginConfig.java b/logical/src/main/java/org/apache/drill/common/logical/AbstractSecuredStoragePluginConfig.java
new file mode 100644
index 0000000..441ce73
--- /dev/null
+++ b/logical/src/main/java/org/apache/drill/common/logical/AbstractSecuredStoragePluginConfig.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common.logical;
+
+import org.apache.drill.common.logical.security.CredentialsProvider;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+
+/** Base for storage plugin configs that carry a {@link CredentialsProvider}. */
+public abstract class AbstractSecuredStoragePluginConfig extends StoragePluginConfig {
+
+  protected final CredentialsProvider credentialsProvider;
+  // When set, getCredentialsProvider() reports null instead of the stored provider.
+  protected boolean directCredentials;
+
+  public AbstractSecuredStoragePluginConfig(CredentialsProvider credentialsProvider, boolean directCredentials) {
+    this.credentialsProvider = credentialsProvider;
+    this.directCredentials = directCredentials;
+  }
+
+  /** Uses the empty plain provider and sets {@code directCredentials}. */
+  public AbstractSecuredStoragePluginConfig() {
+    this(PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER, true);
+  }
+
+  /** Returns {@code null} while {@code directCredentials} is set, otherwise the stored provider. */
+  public CredentialsProvider getCredentialsProvider() {
+    return directCredentials ? null : credentialsProvider;
+  }
+}
diff --git a/logical/src/main/java/org/apache/drill/common/logical/StoragePluginConfig.java b/logical/src/main/java/org/apache/drill/common/logical/StoragePluginConfig.java
index 2548c1c..390e2a8 100644
--- a/logical/src/main/java/org/apache/drill/common/logical/StoragePluginConfig.java
+++ b/logical/src/main/java/org/apache/drill/common/logical/StoragePluginConfig.java
@@ -19,9 +19,11 @@ package org.apache.drill.common.logical;
 
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonInclude(JsonInclude.Include.NON_DEFAULT)
 public abstract class StoragePluginConfig {
 
   // DO NOT include enabled status in equality and hash
diff --git a/logical/src/main/java/org/apache/drill/common/logical/StoragePluginConfigBase.java b/logical/src/main/java/org/apache/drill/common/logical/security/CredentialsProvider.java
similarity index 55%
rename from logical/src/main/java/org/apache/drill/common/logical/StoragePluginConfigBase.java
rename to logical/src/main/java/org/apache/drill/common/logical/security/CredentialsProvider.java
index d8c76d7..59b6ce0 100644
--- a/logical/src/main/java/org/apache/drill/common/logical/StoragePluginConfigBase.java
+++ b/logical/src/main/java/org/apache/drill/common/logical/security/CredentialsProvider.java
@@ -15,7 +15,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.common.logical;
+package org.apache.drill.common.logical.security;
 
-public abstract class StoragePluginConfigBase extends StoragePluginConfig {
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+import java.util.Map;
+
+/**
+ * Provider of authentication credentials; {@link PlainCredentialsProvider} is the declared default implementation.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME,
+    property = "credentialsProviderType",
+    defaultImpl = PlainCredentialsProvider.class)
+public interface CredentialsProvider {
+  /**
+   * Returns the authentication credentials as a map from credential name, for example {@code "username"},
+   * to the corresponding credential value.
+   */
+  @JsonIgnore
+  Map<String, String> getCredentials();
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngineConfig.java b/logical/src/main/java/org/apache/drill/common/logical/security/PlainCredentialsProvider.java
similarity index 52%
copy from exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngineConfig.java
copy to logical/src/main/java/org/apache/drill/common/logical/security/PlainCredentialsProvider.java
index b58d9ab..d157d66 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngineConfig.java
+++ b/logical/src/main/java/org/apache/drill/common/logical/security/PlainCredentialsProvider.java
@@ -15,28 +15,36 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.mock;
-
-import org.apache.drill.common.logical.StoragePluginConfigBase;
+package org.apache.drill.common.logical.security;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
 
-@JsonTypeName(MockStorageEngineConfig.NAME)
-public class MockStorageEngineConfig extends StoragePluginConfigBase {
-  public static final String NAME = "mock";
-  public static final MockStorageEngineConfig INSTANCE = new MockStorageEngineConfig("mock:///");
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Implementation of {@link CredentialsProvider} that holds credentials provided by the user.
+ * <p>
+ * Its constructor accepts a map of credential names to the corresponding credential values.
+ */
+public class PlainCredentialsProvider implements CredentialsProvider {
+  public static final CredentialsProvider EMPTY_CREDENTIALS_PROVIDER =
+      new PlainCredentialsProvider(Collections.emptyMap());
 
-  private final String url;
+  private final Map<String, String> credentials;
 
   @JsonCreator
-  public MockStorageEngineConfig(@JsonProperty("url") String url) {
-    this.url = url;
+  public PlainCredentialsProvider(@JsonProperty("credentials") Map<String, String> credentials) {
+    this.credentials = credentials;
   }
 
-  public String getUrl() {
-    return url;
+  @Override
+  @JsonIgnore(false)
+  public Map<String, String> getCredentials() {
+    return credentials;
   }
 
   @Override
@@ -47,18 +55,12 @@ public class MockStorageEngineConfig extends StoragePluginConfigBase {
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
-
-    MockStorageEngineConfig that = (MockStorageEngineConfig) o;
-
-    if (url != null ? !url.equals(that.url) : that.url != null) {
-      return false;
-    }
-
-    return true;
+    PlainCredentialsProvider that = (PlainCredentialsProvider) o;
+    return Objects.equals(credentials, that.credentials);
   }
 
   @Override
   public int hashCode() {
-    return url != null ? url.hashCode() : 0;
+    return Objects.hash(credentials);
   }
 }
diff --git a/metastore/metastore-api/pom.xml b/metastore/metastore-api/pom.xml
index b184f26..0906a2c 100644
--- a/metastore/metastore-api/pom.xml
+++ b/metastore/metastore-api/pom.xml
@@ -49,7 +49,6 @@
     <dependency>
       <groupId>com.fasterxml.jackson.datatype</groupId>
       <artifactId>jackson-datatype-joda</artifactId>
-      <version>${jackson.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
diff --git a/pom.xml b/pom.xml
index e4973ff..e16bfe1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -62,7 +62,7 @@
     <avatica.version>1.15.0</avatica.version>
     <janino.version>3.0.11</janino.version>
     <sqlline.version>1.9.0</sqlline.version>
-    <jackson.version>2.10.3</jackson.version>
+    <jackson.version>2.12.1</jackson.version>
     <zookeeper.version>3.5.7</zookeeper.version>
     <mapr.release.version>6.1.0-mapr</mapr.release.version>
     <ojai.version>3.0-mapr-1808</ojai.version>
@@ -1599,6 +1599,11 @@
         <version>${jackson.version}</version>
       </dependency>
       <dependency>
+        <groupId>com.fasterxml.jackson.datatype</groupId>
+        <artifactId>jackson-datatype-joda</artifactId>
+        <version>${jackson.version}</version>
+      </dependency>
+      <dependency>
         <groupId>org.honton.chas.hocon</groupId>
         <artifactId>jackson-dataformat-hocon</artifactId>
         <version>1.1.1</version>