You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by dz...@apache.org on 2022/05/16 14:09:16 UTC

[drill] branch master updated: DRILL-8220: Add User Translation Support for OAuth Enabled Plugins (#2544)

This is an automated email from the ASF dual-hosted git repository.

dzamo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 58391fecd1 DRILL-8220: Add User Translation Support for OAuth Enabled Plugins (#2544)
58391fecd1 is described below

commit 58391fecd165665141402ccedf8d2789a1014751
Author: Charles S. Givre <cg...@apache.org>
AuthorDate: Mon May 16 10:09:10 2022 -0400

    DRILL-8220: Add User Translation Support for OAuth Enabled Plugins (#2544)
---
 contrib/storage-http/OAuth.md                      |  21 +-
 .../drill/exec/store/http/HttpStoragePlugin.java   |  37 +++-
 .../exec/store/http/HttpStoragePluginConfig.java   |  19 +-
 .../store/http/oauth/AccessTokenRepository.java    |   4 +-
 .../drill/exec/store/http/util/SimpleHttp.java     |   6 +-
 .../drill/exec/store/http/TestOAuthProcess.java    |   5 +-
 .../exec/store/http/TestOAuthTokenUpdate.java      |   3 +-
 .../http/TestUserTranslationInHttpPlugin.java      | 150 +++++++++-----
 .../drill/exec/oauth/OAuthTokenProvider.java       |  17 +-
 .../exec/server/rest/CredentialResources.java      |  38 +++-
 .../drill/exec/server/rest/OAuthRequests.java      | 230 +++++++++++++++++++++
 .../exec/server/rest/PluginConfigWrapper.java      |  82 ++++++++
 .../drill/exec/server/rest/StorageResources.java   | 176 +++-------------
 .../src/main/resources/rest/credentials/list.ftl   |  19 +-
 .../logical/CredentialedStoragePluginConfig.java   |  25 ++-
 .../apache/drill/common/logical/OAuthConfig.java   |  50 ++---
 16 files changed, 625 insertions(+), 257 deletions(-)

diff --git a/contrib/storage-http/OAuth.md b/contrib/storage-http/OAuth.md
index 6c95c00dc8..d1a3bda9ef 100644
--- a/contrib/storage-http/OAuth.md
+++ b/contrib/storage-http/OAuth.md
@@ -59,7 +59,7 @@ credentialProvider.
 To use OAuth2.0, you will have to create an `oAuthConfig` in the plugin configuration.  Within the `oAuthConfig`, define the `callbackURL` and `authorizationURL` parameters:
 * The `authorizationURL` is provided by the API and is the URL where the authorization code is obtained. 
 * The `callbackURL` parameter is the URL where the API will send the access token.  You must provide this when you register and obtain your client ID and client secret.  This 
-  will be in the format: `http(s)://<your drill host>/storage/<storage plugin name>update_oauth2_authtoken`
+  will be in the format: `http(s)://<your drill host>/credentials/<storage plugin name>/update_oauth2_authtoken`
 * (Optional)`scope`: The scope parameter limits the scope of your access.  This is something which can be found in the remote API documentation.
 
 ### The Credential Provider
@@ -104,7 +104,7 @@ The example configuration below demonstrates how to connect Drill to the API ava
   },
   "proxyType": "direct",
   "oAuthConfig": {
-    "callbackURL": "http://localhost:8047/storage/clickup/update_oath2_authtoken",
+    "callbackURL": "http://localhost:8047/credentials/clickup/update_oauth2_authtoken",
     "authorizationURL": "https://app.clickup.com/api"
   },
   "credentialsProvider": {
@@ -125,3 +125,20 @@ There are a few optional parameters in the OAuth config which you may need to se
 
 * `tokenType`:  Some OAuth enabled APIs provide a `Bearer` token.  If that is the case, this should be set to `Bearer`.
 * `authorizationParams`:  A key value parameters which are sent during the authentication process.
+
+## Enabling Individual User Credentials with OAuth 2.0
+Drill recently introduced the `USER_TRANSLATION` authorization mode, which is useful for plugins that do not have the concept of user impersonation.  This is very much the
+case for OAuth enabled APIs.  When you configure an OAuth enabled API, the client secret keys belong to the application.  Following this design pattern, each individual user
+should authorize (or not) the application.  Thus the `clientID` and `client_secret` tokens really belong to the application and the `access_token` and `refresh_token` belong 
+to the individual user.
+
+Enabling user translation is quite simple.  Add the key below to the storage plugin's 
+configuration.  Note that for user translation to work, user impersonation and authentication must both be enabled globally.
+
+```json
+"authMode":"USER_TRANSLATION"
+```
+
+Once you've done this, when a user logs in, they will see a new menu option in the top bar called `Credentials`.  This will contain a listing 
+of storage plugin instances that require credentials.  For OAuth enabled plugins, there will be an `Authorize` button next to the plugin name.
+Each user will have to authorize Drill to access the plugin.
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePlugin.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePlugin.java
index 6787c600be..bb086f609c 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePlugin.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePlugin.java
@@ -21,6 +21,7 @@ import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
 import org.apache.drill.exec.metastore.MetadataProviderManager;
 import org.apache.drill.exec.oauth.OAuthTokenProvider;
 import org.apache.drill.exec.oauth.PersistentTokenTable;
@@ -34,6 +35,7 @@ import org.apache.drill.exec.store.AbstractStoragePlugin;
 import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.base.filter.FilterPushDownUtils;
+import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
 
 import com.fasterxml.jackson.core.type.TypeReference;
@@ -46,7 +48,7 @@ public class HttpStoragePlugin extends AbstractStoragePlugin {
   private final HttpStoragePluginConfig config;
   private final HttpSchemaFactory schemaFactory;
   private final StoragePluginRegistry registry;
-  private final TokenRegistry tokenRegistry;
+  private TokenRegistry tokenRegistry;
 
   public HttpStoragePlugin(HttpStoragePluginConfig configuration, DrillbitContext context, String name) {
     super(context, name);
@@ -54,17 +56,30 @@ public class HttpStoragePlugin extends AbstractStoragePlugin {
     this.registry = context.getStorage();
     this.schemaFactory = new HttpSchemaFactory(this);
 
-    // Get OAuth Token Provider if needed
-    OAuthTokenProvider tokenProvider = context.getoAuthTokenProvider();
-    tokenRegistry = tokenProvider.getOauthTokenRegistry();
-    tokenRegistry.createTokenTable(getName());
+    if (config.getAuthMode() != AuthMode.USER_TRANSLATION) {
+      initializeOauthTokenTable(null);
+    }
   }
 
   @Override
   public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) {
+
+    // For user translation mode, this is moved here because we don't have the
+    // active username in the constructor.  Removing it from the constructor makes
+    // it difficult to test, so we do the check and leave it in both places.
+    if (config.getAuthMode() == AuthMode.USER_TRANSLATION) {
+      initializeOauthTokenTable(schemaConfig.getUserName());
+    }
     schemaFactory.registerSchemas(schemaConfig, parent);
   }
 
+  @VisibleForTesting
+  public void initializeOauthTokenTable(String username) {
+    OAuthTokenProvider tokenProvider = context.getoAuthTokenProvider();
+    tokenRegistry = tokenProvider.getOauthTokenRegistry(username);
+    tokenRegistry.createTokenTable(getName());
+  }
+
   @Override
   public HttpStoragePluginConfig getConfig() {
     return config;
@@ -78,6 +93,18 @@ public class HttpStoragePlugin extends AbstractStoragePlugin {
     return tokenRegistry;
   }
 
+  /**
+   * This method returns the {@link TokenRegistry} for a given user.  It is only used for testing user translation
+   * with OAuth 2.0.
+   * @param username A {@link String} of the current active user.
+   * @return A {@link TokenRegistry} for the given user.
+   */
+  @VisibleForTesting
+  public TokenRegistry getTokenRegistry(String username) {
+    initializeOauthTokenTable(username);
+    return tokenRegistry;
+  }
+
   public PersistentTokenTable getTokenTable() { return tokenRegistry.getTokenTable(getName()); }
 
   @Override
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java
index 6bd9475199..75cb1937e1 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.store.http;
 
 import org.apache.drill.common.PlanStringBuilder;
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.logical.OAuthConfig;
 import org.apache.drill.common.map.CaseInsensitiveMap;
 import org.apache.drill.common.logical.CredentialedStoragePluginConfig;
 
@@ -50,8 +51,6 @@ public class HttpStoragePluginConfig extends CredentialedStoragePluginConfig {
   public final String proxyHost;
   public final int proxyPort;
   public final String proxyType;
-  public final HttpOAuthConfig oAuthConfig;
-
   /**
    * Timeout in {@link TimeUnit#SECONDS}.
    */
@@ -68,7 +67,7 @@ public class HttpStoragePluginConfig extends CredentialedStoragePluginConfig {
                                  @JsonProperty("proxyType") String proxyType,
                                  @JsonProperty("proxyUsername") String proxyUsername,
                                  @JsonProperty("proxyPassword") String proxyPassword,
-                                 @JsonProperty("oAuthConfig") HttpOAuthConfig oAuthConfig,
+                                 @JsonProperty("oAuthConfig") OAuthConfig oAuthConfig,
                                  @JsonProperty("credentialsProvider") CredentialsProvider credentialsProvider,
                                  @JsonProperty("authMode") String authMode
                                  ) {
@@ -82,8 +81,8 @@ public class HttpStoragePluginConfig extends CredentialedStoragePluginConfig {
         normalize(proxyPassword),
         credentialsProvider),
         credentialsProvider == null,
-        AuthMode.parseOrDefault(authMode, AuthMode.SHARED_USER)
-    );
+        AuthMode.parseOrDefault(authMode, AuthMode.SHARED_USER),
+      oAuthConfig);
     this.cacheResults = cacheResults != null && cacheResults;
 
     this.connections = CaseInsensitiveMap.newHashMap();
@@ -94,7 +93,6 @@ public class HttpStoragePluginConfig extends CredentialedStoragePluginConfig {
     this.timeout = timeout == null ? 0 : timeout;
     this.proxyHost = normalize(proxyHost);
     this.proxyPort = proxyPort == null ? 0 : proxyPort;
-    this.oAuthConfig = oAuthConfig;
 
     proxyType = normalize(proxyType);
     this.proxyType = proxyType == null
@@ -130,7 +128,7 @@ public class HttpStoragePluginConfig extends CredentialedStoragePluginConfig {
    * @param that The current HTTP Plugin Config
    * @param oAuthConfig The updated OAuth config
    */
-  public HttpStoragePluginConfig(HttpStoragePluginConfig that, HttpOAuthConfig oAuthConfig) {
+  public HttpStoragePluginConfig(HttpStoragePluginConfig that, OAuthConfig oAuthConfig) {
     super(CredentialProviderUtils.getCredentialsProvider(that.proxyUsername(), that.proxyPassword(), that.credentialsProvider),
       that.credentialsProvider == null);
 
@@ -140,7 +138,7 @@ public class HttpStoragePluginConfig extends CredentialedStoragePluginConfig {
     this.proxyHost = that.proxyHost;
     this.proxyPort = that.proxyPort;
     this.proxyType = that.proxyType;
-    this.oAuthConfig = oAuthConfig;
+    this.oAuthConfig = that.oAuthConfig;
   }
 
   private static String normalize(String value) {
@@ -234,11 +232,6 @@ public class HttpStoragePluginConfig extends CredentialedStoragePluginConfig {
   @JsonProperty("proxyPort")
   public int proxyPort() { return proxyPort; }
 
-  @JsonProperty("oAuthConfig")
-  public HttpOAuthConfig oAuthConfig() {
-    return oAuthConfig;
-  }
-
   @JsonProperty("username")
   public String username() {
     if (!directCredentials) {
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/oauth/AccessTokenRepository.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/oauth/AccessTokenRepository.java
index 48ce501967..4886f8e327 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/oauth/AccessTokenRepository.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/oauth/AccessTokenRepository.java
@@ -23,9 +23,9 @@ import okhttp3.OkHttpClient;
 import okhttp3.Request;
 
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.logical.OAuthConfig;
 import org.apache.drill.common.logical.security.CredentialsProvider;
 import org.apache.drill.exec.oauth.PersistentTokenTable;
-import org.apache.drill.exec.store.http.HttpOAuthConfig;
 import org.apache.drill.exec.store.http.HttpStoragePluginConfig;
 import org.apache.drill.exec.store.http.util.HttpProxyConfig;
 import org.apache.drill.exec.store.http.util.SimpleHttp;
@@ -69,7 +69,7 @@ public class AccessTokenRepository {
     client = builder.build();
   }
 
-  public HttpOAuthConfig getOAuthConfig() {
+  public OAuthConfig getOAuthConfig() {
     return pluginConfig.oAuthConfig();
   }
 
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
index 936008a96e..0ec18de858 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
@@ -32,6 +32,7 @@ import okhttp3.Response;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.drill.common.exceptions.EmptyErrorContext;
+import org.apache.drill.common.logical.OAuthConfig;
 import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
 import org.apache.drill.common.map.CaseInsensitiveMap;
 import org.apache.drill.common.exceptions.CustomErrorContext;
@@ -47,7 +48,6 @@ import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
 import org.apache.drill.exec.store.http.HttpApiConfig;
 import org.apache.drill.exec.store.http.HttpApiConfig.HttpMethod;
 import org.apache.drill.exec.store.http.HttpApiConfig.PostLocation;
-import org.apache.drill.exec.store.http.HttpOAuthConfig;
 import org.apache.drill.exec.store.http.HttpStoragePlugin;
 import org.apache.drill.exec.store.http.HttpStoragePluginConfig;
 import org.apache.drill.exec.store.http.HttpSubScan;
@@ -114,7 +114,7 @@ public class SimpleHttp {
   private final String connection;
   private final HttpStoragePluginConfig pluginConfig;
   private final HttpApiConfig apiConfig;
-  private final HttpOAuthConfig oAuthConfig;
+  private final OAuthConfig oAuthConfig;
   private String responseMessage;
   private int responseCode;
   private String responseProtocol;
@@ -967,7 +967,7 @@ public class SimpleHttp {
     private PersistentTokenTable tokenTable;
     private HttpStoragePluginConfig pluginConfig;
     private HttpApiConfig endpointConfig;
-    private HttpOAuthConfig oAuthConfig;
+    private OAuthConfig oAuthConfig;
     private Map<String,String> filters;
     private String connection;
     private String username;
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestOAuthProcess.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestOAuthProcess.java
index daccab7519..09b1c4470a 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestOAuthProcess.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestOAuthProcess.java
@@ -23,6 +23,7 @@ import okhttp3.Request;
 import okhttp3.Response;
 import okhttp3.mockwebserver.MockResponse;
 import okhttp3.mockwebserver.MockWebServer;
+import org.apache.drill.common.logical.OAuthConfig;
 import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
 import org.apache.drill.common.logical.security.CredentialsProvider;
 import org.apache.drill.common.logical.security.PlainCredentialsProvider;
@@ -58,7 +59,7 @@ import static org.junit.Assert.fail;
 public class TestOAuthProcess extends ClusterTest {
 
   private static final Logger logger = LoggerFactory.getLogger(TestOAuthProcess.class);
-  private static final int MOCK_SERVER_PORT = 47770;
+  private static final int MOCK_SERVER_PORT = 47775;
 
   private static final int TIMEOUT = 30;
   private static final String CONNECTION_NAME = "localOauth";
@@ -101,7 +102,7 @@ public class TestOAuthProcess extends ClusterTest {
       .inputType("json")
       .build();
 
-    HttpOAuthConfig oAuthConfig = HttpOAuthConfig.builder()
+    OAuthConfig oAuthConfig = OAuthConfig.builder()
       .callbackURL(hostname + "/update_oauth2_authtoken")
       .build();
 
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestOAuthTokenUpdate.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestOAuthTokenUpdate.java
index 81739d5c3f..0490005d5b 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestOAuthTokenUpdate.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestOAuthTokenUpdate.java
@@ -24,6 +24,7 @@ import okhttp3.OkHttpClient;
 import okhttp3.Request;
 import okhttp3.RequestBody;
 import okhttp3.Response;
+import org.apache.drill.common.logical.OAuthConfig;
 import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
 import org.apache.drill.common.logical.security.CredentialsProvider;
 import org.apache.drill.common.logical.security.PlainCredentialsProvider;
@@ -81,7 +82,7 @@ public class TestOAuthTokenUpdate extends ClusterTest {
       .inputType("json")
       .build();
 
-    HttpOAuthConfig oAuthConfig = HttpOAuthConfig.builder()
+    OAuthConfig oAuthConfig = OAuthConfig.builder()
       .callbackURL(hostname + "/update_ouath2_authtoken")
       .build();
 
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestUserTranslationInHttpPlugin.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestUserTranslationInHttpPlugin.java
index 2256393563..3b9af438a9 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestUserTranslationInHttpPlugin.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestUserTranslationInHttpPlugin.java
@@ -20,13 +20,9 @@ package org.apache.drill.exec.store.http;
 
 import okhttp3.Cookie;
 import okhttp3.CookieJar;
-import okhttp3.FormBody;
 import okhttp3.Headers;
 import okhttp3.HttpUrl;
 import okhttp3.OkHttpClient;
-import okhttp3.Request;
-import okhttp3.RequestBody;
-import okhttp3.Response;
 import okhttp3.mockwebserver.MockResponse;
 import okhttp3.mockwebserver.MockWebServer;
 import okhttp3.mockwebserver.RecordedRequest;
@@ -35,14 +31,22 @@ import org.apache.commons.net.util.Base64;
 import org.apache.drill.common.config.DrillProperties;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.logical.CredentialedStoragePluginConfig;
+import org.apache.drill.common.logical.OAuthConfig;
 import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
+import org.apache.drill.common.logical.security.CredentialsProvider;
 import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.util.DrillFileUtils;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.oauth.PersistentTokenTable;
 import org.apache.drill.exec.physical.rowSet.RowSet;
-import org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.store.StoragePlugin;
 import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.security.oauth.OAuthTokenCredentials;
 import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
 import org.apache.drill.shaded.guava.com.google.common.io.Files;
 import org.apache.drill.test.BaseDirTestWatcher;
@@ -50,10 +54,14 @@ import org.apache.drill.test.ClientFixture;
 import org.apache.drill.test.ClusterFixtureBuilder;
 import org.apache.drill.test.ClusterTest;
 import org.apache.drill.test.QueryBuilder.QuerySummary;
+import org.apache.drill.test.rowSet.RowSetUtilities;
+import org.jetbrains.annotations.NotNull;
 import org.junit.After;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -72,20 +80,20 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-
 public class TestUserTranslationInHttpPlugin extends ClusterTest {
+  private static final Logger logger = LoggerFactory.getLogger(TestUserTranslationInHttpPlugin.class);
 
   private static final int MOCK_SERVER_PORT = 47775;
-
   private static final int TIMEOUT = 30;
-  private final OkHttpClient httpClient = new OkHttpClient.Builder()
+  private final OkHttpClient httpClient = new OkHttpClient
+    .Builder()
     .connectTimeout(TIMEOUT, TimeUnit.SECONDS)
     .writeTimeout(TIMEOUT, TimeUnit.SECONDS)
     .readTimeout(TIMEOUT, TimeUnit.SECONDS)
     .cookieJar(new TestCookieJar())
     .build();
-
   private static String TEST_JSON_RESPONSE_WITH_DATATYPES;
+  private static String ACCESS_TOKEN_RESPONSE;
   private static int portNumber;
 
 
@@ -100,11 +108,11 @@ public class TestUserTranslationInHttpPlugin extends ClusterTest {
   @BeforeClass
   public static void setup() throws Exception {
     TEST_JSON_RESPONSE_WITH_DATATYPES = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/response2.json"), Charsets.UTF_8).read();
+    ACCESS_TOKEN_RESPONSE = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/oauth_access_token_response.json"), Charsets.UTF_8).read();
 
     ClusterFixtureBuilder builder = new ClusterFixtureBuilder(dirTestWatcher)
       .configProperty(ExecConstants.HTTP_ENABLE, true)
       .configProperty(ExecConstants.HTTP_PORT_HUNT, true)
-      .configProperty(ExecConstants.USER_AUTHENTICATOR_IMPL, UserAuthenticatorTestImpl.TYPE)
       .configProperty(ExecConstants.IMPERSONATION_ENABLED, true);
 
     startCluster(builder);
@@ -119,27 +127,44 @@ public class TestUserTranslationInHttpPlugin extends ClusterTest {
       .errorOn400(true)
       .build();
 
+    OAuthConfig oAuthConfig = OAuthConfig.builder()
+      .callbackURL(makeUrl("http://localhost:%d") + "/update_oauth2_authtoken")
+      .build();
+
+    Map<String, String> oauthCreds = new HashMap<>();
+    oauthCreds.put("clientID", "12345");
+    oauthCreds.put("clientSecret", "54321");
+    oauthCreds.put(OAuthTokenCredentials.TOKEN_URI, "http://localhost:" + MOCK_SERVER_PORT + "/get_access_token");
+
+    CredentialsProvider oauthCredentialProvider = new PlainCredentialsProvider(oauthCreds);
+
+
+
     Map<String, HttpApiConfig> configs = new HashMap<>();
     configs.put("sharedEndpoint", testEndpoint);
 
-
     Map<String, String> credentials = new HashMap<>();
     credentials.put("username", "user2user");
     credentials.put("password", "user2pass");
 
     PlainCredentialsProvider credentialsProvider = new PlainCredentialsProvider(TEST_USER_2, credentials);
 
-
-    HttpStoragePluginConfig mockStorageConfigWithWorkspace =
-      new HttpStoragePluginConfig(false, configs, 2, null, null, "",
-        80, "", "", "", null, credentialsProvider, AuthMode.USER_TRANSLATION.name());
+    HttpStoragePluginConfig mockStorageConfigWithWorkspace = new HttpStoragePluginConfig(false, configs, 2, null, null, "", 80, "", "", "", null, credentialsProvider,
+      AuthMode.USER_TRANSLATION.name());
     mockStorageConfigWithWorkspace.setEnabled(true);
+
+    HttpStoragePluginConfig mockOAuthPlugin = new HttpStoragePluginConfig(false, configs, 2, null, null, "", 80, "", "", "", oAuthConfig, oauthCredentialProvider,
+      AuthMode.USER_TRANSLATION.name());
+    mockOAuthPlugin.setEnabled(true);
+
     cluster.defineStoragePlugin("local", mockStorageConfigWithWorkspace);
+    cluster.defineStoragePlugin("oauth", mockOAuthPlugin);
   }
 
   @Test
   public void testEmptyUserCredentials() throws Exception {
-    ClientFixture client = cluster.clientBuilder()
+    ClientFixture client = cluster
+      .clientBuilder()
       .property(DrillProperties.USER, TEST_USER_1)
       .property(DrillProperties.PASSWORD, TEST_USER_1_PASSWORD)
       .build();
@@ -147,7 +172,7 @@ public class TestUserTranslationInHttpPlugin extends ClusterTest {
     // First verify that the user has no credentials
     StoragePluginRegistry registry = cluster.storageRegistry();
     StoragePlugin plugin = registry.getPlugin("local");
-    PlainCredentialsProvider credentialsProvider = (PlainCredentialsProvider)((CredentialedStoragePluginConfig)plugin.getConfig()).getCredentialsProvider();
+    PlainCredentialsProvider credentialsProvider = (PlainCredentialsProvider) ((CredentialedStoragePluginConfig) plugin.getConfig()).getCredentialsProvider();
     Map<String, String> credentials = credentialsProvider.getCredentials(TEST_USER_1);
     assertNotNull(credentials);
     assertNull(credentials.get("username"));
@@ -157,15 +182,14 @@ public class TestUserTranslationInHttpPlugin extends ClusterTest {
   @Test
   public void testQueryWithValidCredentials() throws Exception {
     // This test validates that the correct credentials are sent down to the HTTP API.
-    ClientFixture client = cluster.clientBuilder()
+    ClientFixture client = cluster
+      .clientBuilder()
       .property(DrillProperties.USER, TEST_USER_2)
       .property(DrillProperties.PASSWORD, TEST_USER_2_PASSWORD)
       .build();
 
     try (MockWebServer server = startServer()) {
-      server.enqueue(new MockResponse()
-        .setResponseCode(200)
-        .setBody(TEST_JSON_RESPONSE_WITH_DATATYPES));
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_RESPONSE_WITH_DATATYPES));
 
       String sql = "SELECT * FROM local.sharedEndpoint";
       RowSet results = client.queryBuilder().sql(sql).rowSet();
@@ -175,22 +199,21 @@ public class TestUserTranslationInHttpPlugin extends ClusterTest {
       // Verify correct username/password from endpoint configuration
       RecordedRequest recordedRequest = server.takeRequest();
       Headers headers = recordedRequest.getHeaders();
-      assertEquals(headers.get("Authorization"), createEncodedText("user2user", "user2pass") );
+      assertEquals(headers.get("Authorization"), createEncodedText("user2user", "user2pass"));
     }
   }
 
   @Test
   public void testQueryWithMissingCredentials() throws Exception {
     // This test validates that the correct credentials are sent down to the HTTP API.
-    ClientFixture client = cluster.clientBuilder()
+    ClientFixture client = cluster
+      .clientBuilder()
       .property(DrillProperties.USER, TEST_USER_1)
       .property(DrillProperties.PASSWORD, TEST_USER_1_PASSWORD)
       .build();
 
     try (MockWebServer server = startServer()) {
-      server.enqueue(new MockResponse()
-        .setResponseCode(200)
-        .setBody(TEST_JSON_RESPONSE_WITH_DATATYPES));
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_RESPONSE_WITH_DATATYPES));
 
       String sql = "SELECT * FROM local.sharedEndpoint";
       try {
@@ -202,30 +225,65 @@ public class TestUserTranslationInHttpPlugin extends ClusterTest {
     }
   }
 
-  private boolean makeLoginRequest(String username, String password) throws IOException {
-    String loginURL =  "http://localhost:" + portNumber + "/j_security_check";
-
-    RequestBody formBody = new FormBody.Builder()
-      .add("j_username", username)
-      .add("j_password", password)
+  @Test
+  public void testQueryWithOAuth() throws Exception {
+    ClientFixture client = cluster
+      .clientBuilder()
+      .property(DrillProperties.USER, TEST_USER_2)
+      .property(DrillProperties.PASSWORD, TEST_USER_2_PASSWORD)
       .build();
 
-    Request request = new Request.Builder()
-      .url(loginURL)
-      .post(formBody)
-      .addHeader("Content-Type", "application/x-www-form-urlencoded")
-      .addHeader("Accept", "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8")
-      .build();
+    try (MockWebServer server = startServer()) {
+      // Get the token table for test user 2, which should be empty
+      PersistentTokenTable tokenTable = ((HttpStoragePlugin) cluster.storageRegistry()
+        .getPlugin("oauth"))
+        .getTokenRegistry(TEST_USER_2)
+        .getTokenTable("oauth");
+
+      // Add the access tokens for user 2
+      tokenTable.setAccessToken("you_have_access_2");
+      tokenTable.setRefreshToken("refresh_me_2");
+
+      assertEquals("you_have_access_2", tokenTable.getAccessToken());
+      assertEquals("refresh_me_2", tokenTable.getRefreshToken());
+
+      // Now execute a query and get query results.
+      server.enqueue(new MockResponse()
+        .setResponseCode(200)
+        .setBody(TEST_JSON_RESPONSE_WITH_DATATYPES));
+
+      String sql = "SELECT * FROM oauth.sharedEndpoint";
+      RowSet results = queryBuilder().sql(sql).rowSet();
+
+      TupleMetadata expectedSchema = new SchemaBuilder()
+        .add("col_1", MinorType.FLOAT8, DataMode.OPTIONAL)
+        .add("col_2", MinorType.BIGINT, DataMode.OPTIONAL)
+        .add("col_3", MinorType.VARCHAR, DataMode.OPTIONAL)
+        .build();
+
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+        .addRow(1.0, 2, "3.0")
+        .addRow(4.0, 5, "6.0")
+        .build();
+
+      RowSetUtilities.verify(expected, results);
 
-    Response response = httpClient.newCall(request).execute();
-    return response.code() == 200;
+      // Verify the correct tokens were passed
+      RecordedRequest recordedRequest = server.takeRequest();
+      String authToken = recordedRequest.getHeader("Authorization");
+      assertEquals("you_have_access_2", authToken);
+    } catch (Exception e) {
+      logger.debug(e.getMessage());
+      fail();
+    }
   }
 
   @Test
   public void testUnrelatedQueryWithUser() throws Exception {
     // This test verifies that a query with a user that does NOT have credentials
     // for a plugin using user translation will still execute.
-    ClientFixture client = cluster.clientBuilder()
+    ClientFixture client = cluster
+      .clientBuilder()
       .property(DrillProperties.USER, TEST_USER_1)
       .property(DrillProperties.PASSWORD, TEST_USER_1_PASSWORD)
       .build();
@@ -237,10 +295,11 @@ public class TestUserTranslationInHttpPlugin extends ClusterTest {
 
   /**
    * Helper function to start the MockHTTPServer
+   *
    * @return Started Mock server
    * @throws IOException If the server cannot start, throws IOException
    */
-  public static MockWebServer startServer () throws IOException {
+  public static MockWebServer startServer() throws IOException {
     MockWebServer server = new MockWebServer();
     server.start(MOCK_SERVER_PORT);
     return server;
@@ -257,22 +316,19 @@ public class TestUserTranslationInHttpPlugin extends ClusterTest {
   }
 
+  /**
+   * Minimal in-memory {@link CookieJar} used by the tests in this class.  It remembers only the
+   * cookie list from the most recent response and hands that same list back for every request;
+   * the request and response URLs are ignored.
+   */
   public static class TestCookieJar implements CookieJar {
-
     private List<Cookie> cookies;
 
     @Override
     public void saveFromResponse(HttpUrl url, List<Cookie> cookies) {
-      this.cookies =  cookies;
+      this.cookies = cookies;
     }
 
     @Override
-    public List<Cookie> loadForRequest(HttpUrl url) {
+    public List<Cookie> loadForRequest(@NotNull HttpUrl url) {
+      // Nothing has been saved yet: answer with a fresh empty list rather than null.
       if (cookies != null) {
         return cookies;
       }
       return new ArrayList<>();
     }
   }
-
-
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/oauth/OAuthTokenProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/oauth/OAuthTokenProvider.java
index c6a29175cc..8fba531b21 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/oauth/OAuthTokenProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/oauth/OAuthTokenProvider.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.oauth;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.exec.server.DrillbitContext;
 
@@ -35,16 +36,24 @@ public class OAuthTokenProvider implements AutoCloseable {
     this.context = context;
   }
 
+  /**
+   * Returns the OAuth token registry, creating it on first use.
+   * <p>
+   * NOTE(review): the registry is cached in the single {@code oauthTokenRegistry} field, so
+   * {@code username} only influences the storage path on the call that creates the registry.
+   * Every later call returns that same instance, whatever user name is passed.  Confirm this
+   * is the intended behaviour for plugins that use per-user tokens.
+   *
+   * @param username user name appended to the registry path when the registry is first
+   *                 created; may be null or empty
+   * @return the cached token registry
+   */
-  public TokenRegistry getOauthTokenRegistry() {
+  public TokenRegistry getOauthTokenRegistry(String username) {
     if (oauthTokenRegistry == null) {
-      initRemoteRegistries();
+      initRemoteRegistries(username);
     }
     return oauthTokenRegistry;
   }
 
+  /**
+   * Creates the persistent token registry if it has not been created yet.
+   * <p>
+   * NOTE(review): the path computed from {@code username} is used only when the registry field
+   * is still null; once a registry exists the computed path is discarded.
+   *
+   * @param username user name appended to {@code STORAGE_REGISTRY_PATH} as a sub-path; when null
+   *                 or empty the base path is used unchanged
+   */
-  private synchronized void initRemoteRegistries() {
+  private synchronized void initRemoteRegistries(String username) {
+    // Add the username to the path if present
+    String finalpath;
+    if (StringUtils.isNotEmpty(username)) {
+      finalpath = STORAGE_REGISTRY_PATH + "/" + username;
+    } else {
+      finalpath = STORAGE_REGISTRY_PATH;
+    }
+
+    // Checked again here because this method is synchronized while the caller's check is not.
     if (oauthTokenRegistry == null) {
-      oauthTokenRegistry = new PersistentTokenRegistry(context, STORAGE_REGISTRY_PATH);
+      oauthTokenRegistry = new PersistentTokenRegistry(context, finalpath);
     }
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/CredentialResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/CredentialResources.java
index 02c784ea0d..f50c07bd7d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/CredentialResources.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/CredentialResources.java
@@ -43,6 +43,7 @@ import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
@@ -136,7 +137,6 @@ public class CredentialResources {
     } else {
       return results;
     }
-
   }
 
   @POST
@@ -247,6 +247,42 @@ public class CredentialResources {
       .build();
   }
 
+  /**
+   * Stores the refresh token from the request body for the named plugin.  All of the work is
+   * delegated to {@code OAuthRequests.updateRefreshToken}.
+   *
+   * @param name name of the storage plugin, taken from the request path
+   * @param tokens request body carrying the refresh token
+   * @return the response produced by {@code OAuthRequests.updateRefreshToken}
+   */
+  @POST
+  @Path("/credentials/{name}/update_refresh_token")
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  @Operation(externalDocs = @ExternalDocumentation(description = "Apache Drill REST API documentation:", url = "https://drill.apache.org/docs/rest-api-introduction/"))
+  public Response updateRefreshToken(@PathParam("name") String name, OAuthTokenContainer tokens) {
+    return OAuthRequests.updateRefreshToken(name, tokens, storage, authEnabled, sc);
+  }
+
+  /**
+   * Stores the access token from the request body for the named plugin.  All of the work is
+   * delegated to {@code OAuthRequests.updateAccessToken}.
+   *
+   * @param name name of the storage plugin, taken from the request path
+   * @param tokens request body carrying the access token
+   * @return the response produced by {@code OAuthRequests.updateAccessToken}
+   */
+  @POST
+  @Path("/credentials/{name}/update_access_token")
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  @Operation(externalDocs = @ExternalDocumentation(description = "Apache Drill REST API documentation:", url = "https://drill.apache.org/docs/rest-api-introduction/"))
+  public Response updateAccessToken(@PathParam("name") String name, OAuthTokenContainer tokens) {
+    return OAuthRequests.updateAccessToken(name, tokens, storage, authEnabled, sc);
+  }
+
+  /**
+   * Stores both the access token and the refresh token from the request body for the named
+   * plugin.  All of the work is delegated to {@code OAuthRequests.updateOAuthTokens}.
+   *
+   * @param name name of the storage plugin, taken from the request path
+   * @param tokenContainer request body carrying the access and refresh tokens
+   * @return the response produced by {@code OAuthRequests.updateOAuthTokens}
+   */
+  @POST
+  @Path("/credentials/{name}/update_oauth_tokens")
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  @Operation(externalDocs = @ExternalDocumentation(description = "Apache Drill REST API documentation:", url = "https://drill.apache.org/docs/rest-api-introduction/"))
+  public Response updateOAuthTokens(@PathParam("name") String name,
+                                    OAuthTokenContainer tokenContainer) {
+    return OAuthRequests.updateOAuthTokens(name, tokenContainer, storage, authEnabled, sc);
+  }
+
+  /**
+   * Receives the OAuth authorization code as the {@code code} query parameter and hands it,
+   * together with the current servlet request, to {@code OAuthRequests.updateAuthToken}, which
+   * exchanges it for tokens and stores them for the named plugin.
+   *
+   * @param name name of the storage plugin, taken from the request path
+   * @param code authorization code supplied in the query string
+   * @return the HTML response produced by {@code OAuthRequests.updateAuthToken}
+   */
+  @GET
+  @Path("/credentials/{name}/update_oauth2_authtoken")
+  @Produces(MediaType.TEXT_HTML)
+  @Operation(externalDocs = @ExternalDocumentation(description = "Apache Drill REST API documentation:", url = "https://drill.apache.org/docs/rest-api-introduction/"))
+  public Response updateAuthToken(@PathParam("name") String name, @QueryParam("code") String code) {
+    return OAuthRequests.updateAuthToken(name, code, request, storage, authEnabled, sc);
+  }
+
+  /**
+   * Formats {@code message} with {@code args} using {@link String#format(String, Object...)}
+   * and wraps the resulting text in a {@code JsonResult}.
+   *
+   * @param message format string
+   * @param args values substituted into the format string
+   * @return a {@code JsonResult} holding the formatted text
+   */
   private JsonResult message(String message, Object... args) {
     return new JsonResult(String.format(message, args)); // lgtm [java/tainted-format-string]
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/OAuthRequests.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/OAuthRequests.java
new file mode 100644
index 0000000000..6c817c8d28
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/OAuthRequests.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.server.rest;
+
+import okhttp3.OkHttpClient;
+import okhttp3.OkHttpClient.Builder;
+import okhttp3.Request;
+import org.apache.drill.common.logical.CredentialedStoragePluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
+import org.apache.drill.common.logical.security.CredentialsProvider;
+import org.apache.drill.exec.oauth.OAuthTokenProvider;
+import org.apache.drill.exec.oauth.PersistentTokenTable;
+import org.apache.drill.exec.oauth.TokenRegistry;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.rest.DrillRestServer.UserAuthEnabled;
+import org.apache.drill.exec.server.rest.StorageResources.JsonResult;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
+import org.apache.drill.exec.store.http.oauth.OAuthUtils;
+import org.apache.drill.exec.store.security.oauth.OAuthTokenCredentials;
+import org.eclipse.jetty.util.resource.Resource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import javax.ws.rs.core.SecurityContext;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Shared implementation of the OAuth token REST operations.  The storage and credentials REST
+ * resources delegate to these static methods so that both sets of endpoints behave identically.
+ * <p>
+ * Tokens are written to the token table named after the plugin.  The registry holding that
+ * table is requested for the user returned by
+ * {@link #getQueryUser(StoragePluginConfig, UserAuthEnabled, SecurityContext)}.
+ */
+public class OAuthRequests {
+
+  private static final Logger logger = LoggerFactory.getLogger(OAuthRequests.class);
+  private static final String OAUTH_SUCCESS_PAGE = "/rest/storage/success.html";
+  private static final String OAUTH_FALLBACK_MESSAGE = "You may close this window.";
+
+  /**
+   * Replaces the stored access token of the named plugin.
+   *
+   * @param name name of the storage plugin
+   * @param tokens container holding the new access token
+   * @param storage plugin registry used to look the plugin up
+   * @param authEnabled whether user authentication is enabled for the REST server
+   * @param sc security context of the current request
+   * @return 200 on success, 500 if the plugin cannot be loaded or its config is not credentialed
+   */
+  public static Response updateAccessToken(String name,
+                                           OAuthTokenContainer tokens,
+                                           StoragePluginRegistry storage,
+                                           UserAuthEnabled authEnabled,
+                                           SecurityContext sc) {
+    try {
+      StoragePluginConfig config = storage.getPlugin(name).getConfig();
+      if (!(config instanceof CredentialedStoragePluginConfig)) {
+        logger.error("{} does not support OAuth2.0.  You can only add tokens to OAuth enabled plugins.", name);
+        return serverError("Unable to add tokens: %s", name);
+      }
+
+      // Set the access token
+      PersistentTokenTable tokenTable = findTokenRegistry(name, config, storage, authEnabled, sc)
+        .getTokenTable(name);
+      tokenTable.setAccessToken(tokens.getAccessToken());
+
+      return Response.status(Status.OK)
+        .entity("Access tokens have been updated.")
+        .build();
+    } catch (PluginException e) {
+      // Pass the exception itself so the stack trace is not lost.
+      logger.error("Error when adding tokens to {}", name, e);
+      return serverError("Unable to add tokens: %s", e.getMessage());
+    }
+  }
+
+  /**
+   * Replaces the stored refresh token of the named plugin.
+   *
+   * @param name name of the storage plugin
+   * @param tokens container holding the new refresh token
+   * @param storage plugin registry used to look the plugin up
+   * @param authEnabled whether user authentication is enabled for the REST server
+   * @param sc security context of the current request
+   * @return 200 on success, 500 if the plugin cannot be loaded or its config is not credentialed
+   */
+  public static Response updateRefreshToken(String name, OAuthTokenContainer tokens,
+                                            StoragePluginRegistry storage, UserAuthEnabled authEnabled,
+                                            SecurityContext sc) {
+    try {
+      StoragePluginConfig config = storage.getPlugin(name).getConfig();
+      if (!(config instanceof CredentialedStoragePluginConfig)) {
+        logger.error("{} does not support OAuth2.0.  You can only add tokens to OAuth enabled plugins.", name);
+        return serverError("Unable to add tokens: %s", name);
+      }
+
+      // Set the refresh token
+      PersistentTokenTable tokenTable = findTokenRegistry(name, config, storage, authEnabled, sc)
+        .getTokenTable(name);
+      tokenTable.setRefreshToken(tokens.getRefreshToken());
+
+      return Response.status(Status.OK)
+        .entity("Refresh token have been updated.")
+        .build();
+    } catch (PluginException e) {
+      logger.error("Error when adding tokens to {}", name, e);
+      return serverError("Unable to add tokens: %s", e.getMessage());
+    }
+  }
+
+  /**
+   * Replaces both the stored access token and the stored refresh token of the named plugin.
+   *
+   * @param name name of the storage plugin
+   * @param tokenContainer container holding the new access and refresh tokens
+   * @param storage plugin registry used to look the plugin up
+   * @param authEnabled whether user authentication is enabled for the REST server
+   * @param sc security context of the current request
+   * @return 200 on success, 500 if the plugin cannot be loaded or its config is not credentialed
+   */
+  public static Response updateOAuthTokens(String name, OAuthTokenContainer tokenContainer, StoragePluginRegistry storage,
+                                           UserAuthEnabled authEnabled, SecurityContext sc) {
+    try {
+      StoragePluginConfig config = storage.getPlugin(name).getConfig();
+      if (!(config instanceof CredentialedStoragePluginConfig)) {
+        logger.error("{} does not support OAuth2.0.  You can only add tokens to OAuth enabled plugins.", name);
+        return serverError("Unable to add tokens: %s", name);
+      }
+
+      // Set the access and refresh token
+      PersistentTokenTable tokenTable = findTokenRegistry(name, config, storage, authEnabled, sc)
+        .getTokenTable(name);
+      tokenTable.setAccessToken(tokenContainer.getAccessToken());
+      tokenTable.setRefreshToken(tokenContainer.getRefreshToken());
+
+      return Response.status(Status.OK)
+        .entity("Access tokens have been updated.")
+        .build();
+    } catch (PluginException e) {
+      logger.error("Error when adding tokens to {}", name, e);
+      return serverError("Unable to add tokens: %s", e.getMessage());
+    }
+  }
+
+  /**
+   * Exchanges an OAuth authorization code for access and refresh tokens and stores them in the
+   * token table of the named plugin, creating the table first.
+   *
+   * @param name name of the storage plugin
+   * @param code authorization code received from the OAuth provider
+   * @param request current servlet request; its URL is sent as the callback URL of the exchange
+   * @param storage plugin registry used to look the plugin up
+   * @param authEnabled whether user authentication is enabled for the REST server
+   * @param sc security context of the current request
+   * @return 200 with the success page (or a short fallback text if the page cannot be read),
+   *         500 if the plugin cannot be loaded or its config is not credentialed
+   */
+  public static Response updateAuthToken(String name, String code, HttpServletRequest request,
+                                         StoragePluginRegistry storage, UserAuthEnabled authEnabled,
+                                         SecurityContext sc) {
+    try {
+      StoragePluginConfig config = storage.getPlugin(name).getConfig();
+      if (!(config instanceof CredentialedStoragePluginConfig)) {
+        logger.error("{} does not support OAuth2.0.  You can only add an authorization code to OAuth enabled plugins.", name);
+        return serverError("Unable to add authorization code: %s", name);
+      }
+
+      CredentialsProvider credentialsProvider = ((CredentialedStoragePluginConfig) config).getCredentialsProvider();
+      String callbackURL = request.getRequestURL().toString();
+
+      // Now exchange the authorization token for an access token
+      Builder builder = new OkHttpClient.Builder();
+      OkHttpClient client = builder.build();
+
+      Request accessTokenRequest = OAuthUtils.getAccessTokenRequest(credentialsProvider, code, callbackURL);
+      Map<String, String> updatedTokens = OAuthUtils.getOAuthTokens(client, accessTokenRequest);
+
+      // Add to token registry
+      // If USER_TRANSLATION is enabled, Drill will create a token table for each user.
+      TokenRegistry tokenRegistry = findTokenRegistry(name, config, storage, authEnabled, sc);
+
+      // Add a token registry table if none exists
+      tokenRegistry.createTokenTable(name);
+      PersistentTokenTable tokenTable = tokenRegistry.getTokenTable(name);
+
+      // Add tokens to persistent storage
+      tokenTable.setAccessToken(updatedTokens.get(OAuthTokenCredentials.ACCESS_TOKEN));
+      tokenTable.setRefreshToken(updatedTokens.get(OAuthTokenCredentials.REFRESH_TOKEN));
+
+      return Response.status(Status.OK).entity(loadSuccessPage()).build();
+    } catch (PluginException e) {
+      logger.error("Error when adding auth token to {}", name, e);
+      return serverError("Unable to add authorization code: %s", e.getMessage());
+    }
+  }
+
+  /**
+   * Looks up the token registry that belongs to the named plugin and the current query user.
+   *
+   * @param name name of the storage plugin
+   * @param config configuration of that plugin, already fetched by the caller
+   * @param storage plugin registry used to look the plugin up
+   * @param authEnabled whether user authentication is enabled for the REST server
+   * @param sc security context of the current request
+   * @return the token registry supplied by the plugin's Drillbit context
+   * @throws PluginException if the plugin cannot be obtained from the registry
+   */
+  private static TokenRegistry findTokenRegistry(String name, StoragePluginConfig config,
+                                                 StoragePluginRegistry storage, UserAuthEnabled authEnabled,
+                                                 SecurityContext sc) throws PluginException {
+    DrillbitContext context = ((AbstractStoragePlugin) storage.getPlugin(name)).getContext();
+    OAuthTokenProvider tokenProvider = context.getoAuthTokenProvider();
+    return tokenProvider.getOauthTokenRegistry(getQueryUser(config, authEnabled, sc));
+  }
+
+  /**
+   * Reads the OAuth success page from the classpath.
+   *
+   * @return the page content, or a short fallback text if the page is missing or unreadable
+   */
+  private static String loadSuccessPage() {
+    // newClassPathResource answers null when the resource is not on the classpath.
+    Resource resource = Resource.newClassPathResource(OAUTH_SUCCESS_PAGE);
+    if (resource == null) {
+      logger.warn("OAuth success page {} was not found on the classpath", OAUTH_SUCCESS_PAGE);
+      return OAUTH_FALLBACK_MESSAGE;
+    }
+
+    // All three resources are closed even if reading fails part way through.
+    try (InputStream inputStream = resource.getInputStream();
+         InputStreamReader reader = new InputStreamReader(inputStream, StandardCharsets.UTF_8);
+         BufferedReader bufferedReader = new BufferedReader(reader)) {
+      return bufferedReader.lines()
+        .collect(Collectors.joining("\n"));
+    } catch (IOException e) {
+      logger.warn("Unable to read OAuth success page {}", OAUTH_SUCCESS_PAGE, e);
+      return OAUTH_FALLBACK_MESSAGE;
+    }
+  }
+
+  /**
+   * Builds a 500 response whose entity is the formatted message wrapped in a {@link JsonResult}.
+   *
+   * @param format format string passed to {@link #message(String, Object...)}
+   * @param detail value substituted into the format string
+   * @return the error response
+   */
+  private static Response serverError(String format, String detail) {
+    return Response.status(Status.INTERNAL_SERVER_ERROR)
+      .entity(message(format, detail))
+      .build();
+  }
+
+  private static JsonResult message(String message, Object... args) {
+    return new JsonResult(String.format(message, args));  // lgtm [java/tainted-format-string]
+  }
+
+  /**
+   * This function checks to see if a given storage plugin is using USER_TRANSLATION mode and if user
+   * authentication is enabled.  If so, it will return the active user name.  If not it will return null.
+   * <p>
+   * NOTE(review): assumes the security context carries a user principal whenever authentication
+   * is enabled; confirm against the REST server's authentication setup.
+   *
+   * @param config {@link StoragePluginConfig} The current plugin configuration
+   * @param authEnabled whether user authentication is enabled for the REST server
+   * @param sc security context of the current request
+   * @return If USER_TRANSLATION is enabled, returns the active user.  If not, returns null.
+   */
+  private static String getQueryUser(StoragePluginConfig config,
+                                     UserAuthEnabled authEnabled,
+                                     SecurityContext sc) {
+    if (config.getAuthMode() == AuthMode.USER_TRANSLATION && authEnabled.get()) {
+      return sc.getUserPrincipal().getName();
+    } else {
+      return null;
+    }
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/PluginConfigWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/PluginConfigWrapper.java
index 69d621178e..bbc87cda69 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/PluginConfigWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/PluginConfigWrapper.java
@@ -17,12 +17,20 @@
  */
 package org.apache.drill.exec.server.rest;
 
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Optional;
 
 import javax.xml.bind.annotation.XmlRootElement;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.logical.CredentialedStoragePluginConfig;
+import org.apache.drill.common.logical.OAuthConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.common.logical.security.CredentialsProvider;
 import org.apache.drill.exec.store.StoragePluginRegistry;
@@ -32,9 +40,12 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
 import org.apache.drill.exec.store.security.oauth.OAuthTokenCredentials;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @XmlRootElement
 public class PluginConfigWrapper {
+  private static final Logger logger = LoggerFactory.getLogger(PluginConfigWrapper.class);
   private final String name;
   private final StoragePluginConfig config;
 
@@ -114,4 +125,75 @@ public class PluginConfigWrapper {
 
     return tokenCredentials.map(OAuthTokenCredentials::getClientID).orElse(null) != null;
   }
+
+  /**
+   * Returns the OAuth client id stored under the {@code clientID} key of the plugin's
+   * credentials.
+   *
+   * @return the client id, or an empty string if the plugin config is not a
+   *         {@link CredentialedStoragePluginConfig}, has no credentials provider, or has no
+   *         {@code clientID} entry
+   */
+  @JsonIgnore
+  public String getClientID() {
+    // Plugins without credentials have no client id; answer "" instead of failing on the cast.
+    if (!(config instanceof CredentialedStoragePluginConfig)) {
+      return "";
+    }
+
+    CredentialsProvider credentialsProvider = ((CredentialedStoragePluginConfig) config).getCredentialsProvider();
+    if (credentialsProvider == null) {
+      return "";
+    }
+
+    return credentialsProvider.getCredentials().getOrDefault("clientID", "");
+  }
+
+  /**
+   * This function generates the authorization URI for use when a non-admin user is authorizing
+   * OAuth2.0 access for a storage plugin.  This function is necessary as we do not wish to expose
+   * any plugin configuration information to the user.
+   * <p>
+   * Every query parameter name and value is URL encoded.  The {@code redirect_uri} parameter is
+   * left out when no callback URL is configured, and the {@code scope} parameter when no scope is
+   * configured.
+   *
+   * If the plugin is not OAuth, or is missing its OAuth configuration or authorization URL, the
+   * function will return an empty string.
+   * @return The authorization URI for an OAuth enabled plugin.
+   */
+  @JsonIgnore
+  public String getAuthorizationURIWithParams() {
+    if (!isOauth()) {
+      logger.warn("{} is not an OAuth enabled storage plugin", name);
+      return "";
+    }
+
+    OAuthConfig oAuthConfig = ((CredentialedStoragePluginConfig) config).oAuthConfig();
+    if (oAuthConfig == null || StringUtils.isEmpty(oAuthConfig.getAuthorizationURL())) {
+      logger.warn("{} does not define an OAuth authorization URL", name);
+      return "";
+    }
+    String authorizationURI = oAuthConfig.getAuthorizationURL();
+
+    StringBuilder finalUrlBuilder = new StringBuilder(authorizationURI);
+
+    // Add the client id; extend an existing query string rather than starting a second one
+    finalUrlBuilder.append(authorizationURI.contains("?") ? "&" : "?")
+      .append("client_id=")
+      .append(URLEncodeValue(getClientID()));
+
+    // Add the redirect URI if populated
+    if (StringUtils.isNotEmpty(oAuthConfig.getCallbackURL())) {
+      finalUrlBuilder.append("&redirect_uri=")
+        .append(URLEncodeValue(oAuthConfig.getCallbackURL()));
+    }
+
+    // Add scope if populated
+    if (StringUtils.isNotEmpty(oAuthConfig.getScope())) {
+      finalUrlBuilder.append("&scope=")
+        .append(URLEncodeValue(oAuthConfig.getScope()));
+    }
+
+    // Add additional params if present
+    Map<String,String> params = oAuthConfig.getAuthorizationParams();
+    if (params != null) {
+      for (Entry<String, String> param: params.entrySet()) {
+        finalUrlBuilder.append("&")
+          .append(URLEncodeValue(param.getKey()))
+          .append("=")
+          .append(URLEncodeValue(param.getValue()));
+      }
+    }
+    return finalUrlBuilder.toString();
+  }
+
+  /**
+   * Encodes a value for use in a URL query string, using UTF-8.
+   *
+   * @param value The text to encode
+   * @return The encoded text
+   * @throws UserException if the UTF-8 encoding is reported as unsupported
+   */
+  private String URLEncodeValue(String value) {
+    final String charsetName = StandardCharsets.UTF_8.name();
+    try {
+      return URLEncoder.encode(value, charsetName);
+    } catch (UnsupportedEncodingException e) {
+      throw UserException
+        .internalError(e)
+        .message("Error encoding value: " + value)
+        .build(logger);
+    }
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
index 0dff0818b1..23030ccc27 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
@@ -17,15 +17,9 @@
  */
 package org.apache.drill.exec.server.rest;
 
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
-import java.util.Map;
 import java.util.Spliterator;
 import java.util.Spliterators;
 import java.util.stream.Collectors;
@@ -46,22 +40,15 @@ import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
-import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.SecurityContext;
 import javax.xml.bind.annotation.XmlRootElement;
 
 import io.swagger.v3.oas.annotations.ExternalDocumentation;
 import io.swagger.v3.oas.annotations.Operation;
-import okhttp3.OkHttpClient;
-import okhttp3.OkHttpClient.Builder;
-import okhttp3.Request;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.drill.common.logical.CredentialedStoragePluginConfig;
-import org.apache.drill.common.logical.security.CredentialsProvider;
-import org.apache.drill.exec.oauth.OAuthTokenProvider;
-import org.apache.drill.exec.oauth.PersistentTokenTable;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
 import org.apache.drill.exec.oauth.TokenRegistry;
-import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.rest.DrillRestServer.UserAuthEnabled;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
 import org.apache.drill.exec.store.StoragePluginRegistry;
@@ -69,9 +56,6 @@ import org.apache.drill.exec.store.StoragePluginRegistry.PluginEncodingException
 import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
 import org.apache.drill.exec.store.StoragePluginRegistry.PluginFilter;
 import org.apache.drill.exec.store.StoragePluginRegistry.PluginNotFoundException;
-import org.apache.drill.exec.store.http.oauth.OAuthUtils;
-import org.apache.drill.exec.store.security.oauth.OAuthTokenCredentials;
-import org.eclipse.jetty.util.resource.Resource;
 import org.glassfish.jersey.server.mvc.Viewable;
 
 import org.slf4j.Logger;
@@ -103,7 +87,6 @@ public class StorageResources {
   private static final String ALL_PLUGINS = "all";
   private static final String ENABLED_PLUGINS = "enabled";
   private static final String DISABLED_PLUGINS = "disabled";
-  private static final String OAUTH_SUCCESS_PAGE = "/rest/storage/success.html";
 
   private static final Comparator<PluginConfigWrapper> PLUGIN_COMPARATOR =
     Comparator.comparing(PluginConfigWrapper::getName);
@@ -211,153 +194,40 @@ public class StorageResources {
   @Path("/storage/{name}/update_refresh_token")
   @Consumes(MediaType.APPLICATION_JSON)
   @Produces(MediaType.APPLICATION_JSON)
+  @Deprecated
   public Response updateRefreshToken(@PathParam("name") String name, OAuthTokenContainer tokens) {
-    try {
-      if (storage.getPlugin(name).getConfig() instanceof CredentialedStoragePluginConfig) {
-        DrillbitContext context = ((AbstractStoragePlugin) storage.getPlugin(name)).getContext();
-        OAuthTokenProvider tokenProvider = context.getoAuthTokenProvider();
-        PersistentTokenTable tokenTable = tokenProvider.getOauthTokenRegistry().getTokenTable(name);
-
-        // Set the access token
-        tokenTable.setRefreshToken(tokens.getRefreshToken());
-
-        return Response.status(Status.OK)
-          .entity("Refresh token have been updated.")
-          .build();
-      } else {
-        logger.error("{} is not a HTTP plugin. You can only add access tokens to HTTP plugins.", name);
-        return Response.status(Status.INTERNAL_SERVER_ERROR)
-          .entity(message("Unable to add tokens: %s", name))
-          .build();
-      }
-    } catch (PluginException e) {
-      logger.error("Error when adding tokens to {}", name);
-      return Response.status(Status.INTERNAL_SERVER_ERROR)
-        .entity(message("Unable to add tokens: %s", e.getMessage()))
-        .build();
-    }
+    // This endpoint is deprecated.  Use the same path in credentials resources instead.
+    return OAuthRequests.updateRefreshToken(name, tokens, storage, authEnabled, sc);
   }
 
+  /**
+   * Stores the access token from the request body for the named plugin by delegating to
+   * {@code OAuthRequests.updateAccessToken}.
+   *
+   * @param name name of the storage plugin, taken from the request path
+   * @param tokens request body carrying the access token
+   * @return the response produced by {@code OAuthRequests.updateAccessToken}
+   * @deprecated use {@code /credentials/{name}/update_access_token} instead
+   */
   @POST
   @Path("/storage/{name}/update_access_token")
   @Consumes(MediaType.APPLICATION_JSON)
   @Produces(MediaType.APPLICATION_JSON)
+  @Deprecated
   public Response updateAccessToken(@PathParam("name") String name, OAuthTokenContainer tokens) {
-    try {
-      if (storage.getPlugin(name).getConfig() instanceof CredentialedStoragePluginConfig) {
-        DrillbitContext context = ((AbstractStoragePlugin) storage.getPlugin(name)).getContext();
-        OAuthTokenProvider tokenProvider = context.getoAuthTokenProvider();
-        PersistentTokenTable tokenTable = tokenProvider.getOauthTokenRegistry().getTokenTable(name);
-
-        // Set the access token
-        tokenTable.setAccessToken(tokens.getAccessToken());
-
-        return Response.status(Status.OK)
-          .entity("Access tokens have been updated.")
-          .build();
-      } else {
-        logger.error("{} is not a HTTP plugin. You can only add access tokens to HTTP plugins.", name);
-        return Response.status(Status.INTERNAL_SERVER_ERROR)
-          .entity(message("Unable to add tokens: %s", name))
-          .build();
-      }
-    } catch (PluginException e) {
-      logger.error("Error when adding tokens to {}", name);
-      return Response.status(Status.INTERNAL_SERVER_ERROR)
-        .entity(message("Unable to add tokens: %s", e.getMessage()))
-        .build();
-    }
+    // This endpoint is deprecated.  Use the same path in credentials resources instead.
+    return OAuthRequests.updateAccessToken(name, tokens, storage, authEnabled, sc);
   }
 
+  /**
+   * Stores the access and refresh tokens from the request body for the named plugin by
+   * delegating to {@code OAuthRequests.updateOAuthTokens}.
+   *
+   * @param name name of the storage plugin, taken from the request path
+   * @param tokenContainer request body carrying the access and refresh tokens
+   * @return the response produced by {@code OAuthRequests.updateOAuthTokens}
+   * @deprecated use {@code /credentials/{name}/update_oauth_tokens} instead
+   */
   @POST
   @Path("/storage/{name}/update_oauth_tokens")
   @Consumes(MediaType.APPLICATION_JSON)
   @Produces(MediaType.APPLICATION_JSON)
+  @Deprecated
   public Response updateOAuthTokens(@PathParam("name") String name,
                                     OAuthTokenContainer tokenContainer) {
-    try {
-      if (storage.getPlugin(name).getConfig() instanceof CredentialedStoragePluginConfig) {
-        DrillbitContext context = ((AbstractStoragePlugin) storage.getPlugin(name)).getContext();
-        OAuthTokenProvider tokenProvider = context.getoAuthTokenProvider();
-        PersistentTokenTable tokenTable = tokenProvider.getOauthTokenRegistry().getTokenTable(name);
-
-        // Set the access and refresh token
-        tokenTable.setAccessToken(tokenContainer.getAccessToken());
-        tokenTable.setRefreshToken(tokenContainer.getRefreshToken());
-
-        return Response.status(Status.OK)
-          .entity("Access tokens have been updated.")
-          .build();
-      } else {
-        logger.error("{} is not a HTTP plugin. You can only add access tokens to HTTP plugins.", name);
-        return Response.status(Status.INTERNAL_SERVER_ERROR)
-          .entity(message("Unable to add tokens: %s", name))
-          .build();
-      }
-    } catch (PluginException e) {
-      logger.error("Error when adding tokens to {}", name);
-      return Response.status(Status.INTERNAL_SERVER_ERROR)
-        .entity(message("Unable to add tokens: %s", e.getMessage()))
-        .build();
-    }
+    // This endpoint is deprecated.  Use the same path in credentials resources instead.
+    return OAuthRequests.updateOAuthTokens(name, tokenContainer, storage, authEnabled, sc);
   }
 
+  /**
+   * Receives the OAuth authorization code as the {@code code} query parameter and hands it,
+   * together with the current servlet request, to {@code OAuthRequests.updateAuthToken}.
+   *
+   * @param name name of the storage plugin, taken from the request path
+   * @param code authorization code supplied in the query string
+   * @return the HTML response produced by {@code OAuthRequests.updateAuthToken}
+   * @deprecated use {@code /credentials/{name}/update_oauth2_authtoken} instead
+   */
   @GET
   @Path("/storage/{name}/update_oauth2_authtoken")
   @Produces(MediaType.TEXT_HTML)
+  @Deprecated
   public Response updateAuthToken(@PathParam("name") String name, @QueryParam("code") String code) {
-    try {
-      if (storage.getPlugin(name).getConfig() instanceof CredentialedStoragePluginConfig) {
-        CredentialedStoragePluginConfig securedStoragePluginConfig = (CredentialedStoragePluginConfig) storage.getPlugin(name).getConfig();
-        CredentialsProvider credentialsProvider = securedStoragePluginConfig.getCredentialsProvider();
-        String callbackURL = this.request.getRequestURL().toString();
-
-        // Now exchange the authorization token for an access token
-        Builder builder = new OkHttpClient.Builder();
-        OkHttpClient client = builder.build();
-        Request accessTokenRequest = OAuthUtils.getAccessTokenRequest(credentialsProvider, code, callbackURL);
-        Map<String, String> updatedTokens = OAuthUtils.getOAuthTokens(client, accessTokenRequest);
-
-        // Add to token registry
-        TokenRegistry tokenRegistry = ((AbstractStoragePlugin) storage.getPlugin(name))
-          .getContext()
-          .getoAuthTokenProvider()
-          .getOauthTokenRegistry();
-
-        // Add a token registry table if none exists
-        tokenRegistry.createTokenTable(name);
-        PersistentTokenTable tokenTable = tokenRegistry.getTokenTable(name);
-
-        // Add tokens to persistent storage
-        tokenTable.setAccessToken(updatedTokens.get(OAuthTokenCredentials.ACCESS_TOKEN));
-        tokenTable.setRefreshToken(updatedTokens.get(OAuthTokenCredentials.REFRESH_TOKEN));
-
-        // Get success page
-        String successPage = null;
-        try (InputStream inputStream = Resource.newClassPathResource(OAUTH_SUCCESS_PAGE).getInputStream()) {
-          InputStreamReader reader = new InputStreamReader(inputStream, StandardCharsets.UTF_8);
-          BufferedReader bufferedReader = new BufferedReader(reader);
-          successPage = bufferedReader.lines()
-            .collect(Collectors.joining("\n"));
-          bufferedReader.close();
-          reader.close();
-        } catch (IOException e) {
-          Response.status(Status.OK).entity("You may close this window.").build();
-        }
-
-        return Response.status(Status.OK).entity(successPage).build();
-      } else {
-        logger.error("{} is not a HTTP plugin. You can only add auth code to HTTP plugins.", name);
-        return Response.status(Status.INTERNAL_SERVER_ERROR)
-          .entity(message("Unable to add authorization code: %s", name))
-          .build();
-      }
-    } catch (PluginException e) {
-      logger.error("Error when adding auth token to {}", name);
-      return Response.status(Status.INTERNAL_SERVER_ERROR)
-        .entity(message("Unable to add authorization code: %s", e.getMessage()))
-        .build();
-    }
+    // This endpoint is deprecated.  Use the same path in credentials resources instead.
+    return OAuthRequests.updateAuthToken(name, code, request, storage, authEnabled, sc);
   }
 
   /**
@@ -403,7 +273,7 @@ public class StorageResources {
       TokenRegistry tokenRegistry = ((AbstractStoragePlugin) storage.getPlugin(name))
         .getContext()
         .getoAuthTokenProvider()
-        .getOauthTokenRegistry();
+        .getOauthTokenRegistry(getActiveUser(storage.getPlugin(name).getConfig()));
 
       // Delete a token registry table if it exists
       tokenRegistry.deleteTokenTable(name);
@@ -539,8 +409,22 @@ public class StorageResources {
     return deletePlugin(name);
   }
 
+  /**
+   * Resolves the user whose token registry should be used for the given storage plugin.
+   * A user name is produced only when the plugin's auth mode is USER_TRANSLATION and authEnabled is set.
+   * @param config {@link StoragePluginConfig} The configuration of the plugin being accessed
+   * @return The name of the current user principal, or null when user translation does not apply.
+   */
+  private String getActiveUser(StoragePluginConfig config) {
+    boolean userTranslation = config.getAuthMode() == AuthMode.USER_TRANSLATION;
+    if (!(userTranslation && authEnabled.get())) {
+      return null;
+    }
+    return sc.getUserPrincipal().getName();
+  }
+
   @XmlRootElement
-  public class JsonResult {
+  public static class JsonResult {
 
     private final String result;
 
diff --git a/exec/java-exec/src/main/resources/rest/credentials/list.ftl b/exec/java-exec/src/main/resources/rest/credentials/list.ftl
index b7f53ae1af..ef1601790e 100644
--- a/exec/java-exec/src/main/resources/rest/credentials/list.ftl
+++ b/exec/java-exec/src/main/resources/rest/credentials/list.ftl
@@ -48,10 +48,16 @@
                 ${pluginModel.getPlugin().getName()}
             </td>
             <td style="border:none;">
+                <#if pluginModel.getPlugin().isOauth()>
+                  <#-- OAuth plugins get an Authorize button that opens the authorization page instead of the credentials modal. -->
+                  <button type="button" class="btn btn-primary" id="getOauth"
+                          onclick="authorize('${pluginModel.getPlugin().getAuthorizationURIWithParams()!}')">Authorize</button>
+                <#else>
               <button type="button" class="btn btn-primary" data-toggle="modal" data-target="#new-plugin-modal" data-plugin="${pluginModel.getPlugin().getName()}"
                       data-username="${pluginModel.getUserName()}" data-password="${pluginModel.getPassword()}">
                 Update Credentials
               </button>
+                </#if>
             </td>
           </tr>
           </#if>
@@ -60,7 +66,6 @@
       </table>
     </div>
 
-      <#--onclick="doUpdate('${pluginModel.getPlugin().getName()}')"-->
       <#-- Modal window for creating plugin -->
     <div class="modal fade" id="new-plugin-modal" role="dialog" aria-labelledby="configuration">
       <div class="modal-dialog" role="document">
@@ -70,7 +75,6 @@
             <button type="button" class="close" data-dismiss="modal" aria-hidden="true">&times;</button>
           </div>
           <div class="modal-body">
-
             <form id="createForm" role="form" action="/credentials/update_credentials" method="POST">
               <input type="text" class="form-control" name="username" id="usernameField" placeholder="Username" />
               <input type="text" class="form-control" name="password" id="passwordField" placeholder="Password" />
@@ -105,6 +109,17 @@
         });
       });
 
+      function authorize(finalURL) {
+        console.log(finalURL);
+        var tokenGetterWindow = window.open(finalURL, 'Authorize Drill', "toolbar=no,menubar=no,scrollbars=yes,resizable=yes,top=500,left=500,width=450,height=600");
+        var timer = setInterval(function () {
+          if (tokenGetterWindow.closed) {
+            clearInterval(timer);
+            window.location.reload(); // Refresh the parent page
+          }
+        }, 1000);
+      }
+
       function doCreate() {
         $("#createForm").ajaxForm({
           dataType: 'json',
diff --git a/logical/src/main/java/org/apache/drill/common/logical/CredentialedStoragePluginConfig.java b/logical/src/main/java/org/apache/drill/common/logical/CredentialedStoragePluginConfig.java
index 9549f7b355..587413220f 100644
--- a/logical/src/main/java/org/apache/drill/common/logical/CredentialedStoragePluginConfig.java
+++ b/logical/src/main/java/org/apache/drill/common/logical/CredentialedStoragePluginConfig.java
@@ -17,6 +17,9 @@
  */
 package org.apache.drill.common.logical;
 
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.drill.common.logical.security.CredentialsProvider;
 import org.apache.drill.common.logical.security.PlainCredentialsProvider;
 import org.slf4j.Logger;
@@ -27,6 +30,7 @@ public abstract class CredentialedStoragePluginConfig extends StoragePluginConfi
   private static final Logger logger = LoggerFactory.getLogger(CredentialedStoragePluginConfig.class);
   protected boolean directCredentials;
   protected final CredentialsProvider credentialsProvider;
+  protected OAuthConfig oAuthConfig;
 
   public CredentialedStoragePluginConfig() {
     this(PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER,  true);
@@ -48,14 +52,27 @@ public abstract class CredentialedStoragePluginConfig extends StoragePluginConfi
     this.credentialsProvider = credentialsProvider;
     this.directCredentials = directCredentials;
     this.authMode = authMode;
+    this.oAuthConfig = null;
+  }
+
+  /**
+   * Creates a credentialed plugin config that also keeps the supplied OAuth configuration.
+   * All four arguments are stored as given, without validation.
+   */
+  public CredentialedStoragePluginConfig(CredentialsProvider credentialsProvider, boolean directCredentials,
+                                         AuthMode authMode, OAuthConfig oAuthConfig) {
+    this.oAuthConfig = oAuthConfig;
+    this.authMode = authMode;
+    this.directCredentials = directCredentials;
+    this.credentialsProvider = credentialsProvider;
   }
 
   public abstract CredentialedStoragePluginConfig updateCredentialProvider(CredentialsProvider credentialsProvider);
 
-  @Override
-  public boolean isEnabled() {
-    logger.debug("Enabled status");
-    return super.isEnabled();
+  @JsonInclude(Include.NON_NULL)
+  @JsonProperty("oAuthConfig")
+  public OAuthConfig oAuthConfig() {
+    return this.oAuthConfig; // null for configs created without OAuth settings
   }
 
   public CredentialsProvider getCredentialsProvider() {
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpOAuthConfig.java b/logical/src/main/java/org/apache/drill/common/logical/OAuthConfig.java
similarity index 74%
rename from contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpOAuthConfig.java
rename to logical/src/main/java/org/apache/drill/common/logical/OAuthConfig.java
index cdac404aac..db4ad41f74 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpOAuthConfig.java
+++ b/logical/src/main/java/org/apache/drill/common/logical/OAuthConfig.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.drill.exec.store.http;
+package org.apache.drill.common.logical;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonInclude;
@@ -29,8 +29,8 @@ import java.util.Map;
 import java.util.Objects;
 
 @JsonInclude(JsonInclude.Include.NON_DEFAULT)
-@JsonDeserialize(builder = HttpOAuthConfig.HttpOAuthConfigBuilder.class)
-public class HttpOAuthConfig {
+@JsonDeserialize(builder = OAuthConfig.OAuthConfigBuilder.class)
+public class OAuthConfig {
 
   private final String callbackURL;
   private final String authorizationURL;
@@ -41,13 +41,13 @@ public class HttpOAuthConfig {
   private final boolean accessTokenInHeader;
 
   @JsonCreator
-  public HttpOAuthConfig(@JsonProperty("callbackURL") String callbackURL,
-                         @JsonProperty("authorizationURL") String authorizationURL,
-                         @JsonProperty("authorizationParams") Map<String, String> authorizationParams,
-                         @JsonProperty("tokenType") String tokenType,
-                         @JsonProperty("generateCSRFToken") boolean generateCSRFToken,
-                         @JsonProperty("scope") String scope,
-                         @JsonProperty("accessTokenInHeader") boolean accessTokenInHeader) {
+  public OAuthConfig(@JsonProperty("callbackURL") String callbackURL,
+                     @JsonProperty("authorizationURL") String authorizationURL,
+                     @JsonProperty("authorizationParams") Map<String, String> authorizationParams,
+                     @JsonProperty("tokenType") String tokenType,
+                     @JsonProperty("generateCSRFToken") boolean generateCSRFToken,
+                     @JsonProperty("scope") String scope,
+                     @JsonProperty("accessTokenInHeader") boolean accessTokenInHeader) {
     this.callbackURL = callbackURL;
     this.authorizationURL = authorizationURL;
     this.authorizationParams = authorizationParams;
@@ -57,7 +57,7 @@ public class HttpOAuthConfig {
     this.scope = scope;
   }
 
-  public HttpOAuthConfig(HttpOAuthConfig.HttpOAuthConfigBuilder builder) {
+  public OAuthConfig(OAuthConfig.OAuthConfigBuilder builder) {
     this.callbackURL = builder.callbackURL;
     this.authorizationURL = builder.authorizationURL;
     this.authorizationParams = builder.authorizationParams;
@@ -67,8 +67,8 @@ public class HttpOAuthConfig {
     this.scope = builder.scope;
   }
 
-  public static HttpOAuthConfigBuilder builder() {
-    return new HttpOAuthConfigBuilder();
+  public static OAuthConfigBuilder builder() {
+    return new OAuthConfigBuilder();
   }
 
   public String getCallbackURL() {
@@ -125,7 +125,7 @@ public class HttpOAuthConfig {
     } else if (that == null || getClass() != that.getClass()) {
       return false;
     }
-    HttpOAuthConfig thatConfig = (HttpOAuthConfig) that;
+    OAuthConfig thatConfig = (OAuthConfig) that;
     return Objects.equals(callbackURL, thatConfig.callbackURL) &&
       Objects.equals(authorizationURL, thatConfig.authorizationURL) &&
       Objects.equals(authorizationParams, thatConfig.authorizationParams) &&
@@ -136,7 +136,7 @@ public class HttpOAuthConfig {
   }
 
   @JsonPOJOBuilder(withPrefix = "")
-  public static class HttpOAuthConfigBuilder {
+  public static class OAuthConfigBuilder {
     private String callbackURL;
 
     private String authorizationURL;
@@ -153,44 +153,44 @@ public class HttpOAuthConfig {
 
     private Map<String, String> tokens;
 
-    HttpOAuthConfigBuilder() {
+    OAuthConfigBuilder() {
     }
 
-    public HttpOAuthConfig build() {
-      return new HttpOAuthConfig(this);
+    public OAuthConfig build() {
+      return new OAuthConfig(this);
     }
 
-    public HttpOAuthConfigBuilder callbackURL(String callbackURL) {
+    public OAuthConfigBuilder callbackURL(String callbackURL) {
       this.callbackURL = callbackURL;
       return this;
     }
 
-    public HttpOAuthConfigBuilder authorizationURL(String authorizationURL) {
+    public OAuthConfigBuilder authorizationURL(String authorizationURL) {
       this.authorizationURL = authorizationURL;
       return this;
     }
 
-    public HttpOAuthConfigBuilder authorizationParams(Map<String, String> authorizationParams) {
+    public OAuthConfigBuilder authorizationParams(Map<String, String> authorizationParams) {
       this.authorizationParams = authorizationParams;
       return this;
     }
 
-    public HttpOAuthConfigBuilder tokenType(String tokenType) {
+    public OAuthConfigBuilder tokenType(String tokenType) {
       this.tokenType = tokenType;
       return this;
     }
 
-    public HttpOAuthConfigBuilder generateCSRFToken(boolean generateCSRFToken) {
+    public OAuthConfigBuilder generateCSRFToken(boolean generateCSRFToken) {
       this.generateCSRFToken = generateCSRFToken;
       return this;
     }
 
-    public HttpOAuthConfigBuilder scope(String scope) {
+    public OAuthConfigBuilder scope(String scope) {
       this.scope = scope;
       return this;
     }
 
-    public HttpOAuthConfigBuilder accessTokenInHeader(boolean accessTokenInHeader) {
+    public OAuthConfigBuilder accessTokenInHeader(boolean accessTokenInHeader) {
       this.accessTokenInHeader = accessTokenInHeader;
       return this;
     }