You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by cg...@apache.org on 2022/12/11 05:29:09 UTC

[drill] branch master updated: DRILL-8364: Add Support for OAuth Enabled File Systems (#2714)

This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b38a8ca7e DRILL-8364: Add Support for OAuth Enabled File Systems (#2714)
2b38a8ca7e is described below

commit 2b38a8ca7ebcacf2dad589be21f80a9260323103
Author: Charles S. Givre <cg...@apache.org>
AuthorDate: Sun Dec 11 00:29:02 2022 -0500

    DRILL-8364: Add Support for OAuth Enabled File Systems (#2714)
    
    DRILL-8364: Add Support for OAuth Enabled File Systems
---
 .../exec/store/iceberg/IcebergQueriesTest.java     |   2 +-
 .../store/http/oauth/AccessTokenRepository.java    |  12 +
 .../drill/exec/store/http/TestOAuthProcess.java    |   5 +
 .../src/test/resources/data/token_refresh.json     |   2 +-
 docs/dev/Box.md                                    |  84 ++++
 docs/dev/Dropbox.md                                |  67 ++-
 exec/java-exec/pom.xml                             |   7 +-
 .../drill/exec/server/rest/OAuthRequests.java      |   4 +
 .../exec/server/rest/OAuthTokenContainer.java      |   9 +-
 .../apache/drill/exec/store/dfs/BoxFileSystem.java | 459 +++++++++++++++++++++
 .../drill/exec/store/dfs/DrillFileSystem.java      |   8 +
 .../drill/exec/store/dfs/DropboxFileSystem.java    | 148 +++----
 .../drill/exec/store/dfs/FileSystemConfig.java     |  29 +-
 .../drill/exec/store/dfs/FileSystemPlugin.java     |  50 +++
 .../exec/store/dfs/FileSystemSchemaFactory.java    |  11 +-
 .../exec/store/dfs/OAuthEnabledFileSystem.java     | 115 ++++++
 .../dfs/SeekableByteArrayInputStream.java}         |  67 +--
 .../drill/exec/store/http/oauth/OAuthUtils.java    |   8 +
 .../drill/exec/vector/complex/fn/SeekableBAIS.java |  69 +++-
 .../main/resources/bootstrap-storage-plugins.json  |  56 +++
 .../exec/impersonation/BaseTestImpersonation.java  |   2 +-
 .../java/org/apache/drill/exec/sql/TestCTTAS.java  |   2 +-
 .../apache/drill/exec/store/BoxFileSystemTest.java | 149 +++++++
 .../drill/exec/store/DropboxFileSystemTest.java    |   3 +-
 .../drill/exec/store/TestPluginRegistry.java       |   4 +-
 .../drill/exec/util/StoragePluginTestUtils.java    |   4 +-
 .../java/org/apache/drill/test/ClusterFixture.java |   1 +
 exec/jdbc-all/pom.xml                              |   4 +
 28 files changed, 1259 insertions(+), 122 deletions(-)

diff --git a/contrib/format-iceberg/src/test/java/org/apache/drill/exec/store/iceberg/IcebergQueriesTest.java b/contrib/format-iceberg/src/test/java/org/apache/drill/exec/store/iceberg/IcebergQueriesTest.java
index 93c8ea5a54..afe17650fd 100644
--- a/contrib/format-iceberg/src/test/java/org/apache/drill/exec/store/iceberg/IcebergQueriesTest.java
+++ b/contrib/format-iceberg/src/test/java/org/apache/drill/exec/store/iceberg/IcebergQueriesTest.java
@@ -92,7 +92,7 @@ public class IcebergQueriesTest extends ClusterTest {
       pluginConfig.getConnection(),
       pluginConfig.getConfig(),
       pluginConfig.getWorkspaces(),
-      formats,
+      formats, null,
       PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
     newPluginConfig.setEnabled(pluginConfig.isEnabled());
     pluginRegistry.put(DFS_PLUGIN_NAME, newPluginConfig);
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/oauth/AccessTokenRepository.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/oauth/AccessTokenRepository.java
index 4886f8e327..2832a8d28e 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/oauth/AccessTokenRepository.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/oauth/AccessTokenRepository.java
@@ -48,6 +48,7 @@ public class AccessTokenRepository {
   private PersistentTokenTable tokenTable;
   private String accessToken;
   private String refreshToken;
+  private String expiresIn;
 
   public AccessTokenRepository(HttpProxyConfig proxyConfig,
                                HttpStoragePluginConfig pluginConfig,
@@ -58,6 +59,7 @@ public class AccessTokenRepository {
     this.credentialsProvider = pluginConfig.getCredentialsProvider();
     accessToken = tokenTable.getAccessToken();
     refreshToken = tokenTable.getRefreshToken();
+    expiresIn = tokenTable.getExpiresIn();
 
     this.credentials = new OAuthTokenCredentials.Builder()
       .setCredentialsProvider(credentialsProvider)
@@ -89,6 +91,10 @@ public class AccessTokenRepository {
     return accessToken;
   }
 
+  public String getExpiresIn() {
+    return this.expiresIn;
+  }
+
   /**
    * Refreshes the access token using the code and other information from the HTTP OAuthConfig.
    * This executes a POST request.  This method will throw exceptions if any of the required fields
@@ -124,6 +130,12 @@ public class AccessTokenRepository {
       refreshToken = updatedTokens.get(OAuthTokenCredentials.REFRESH_TOKEN);
     }
 
+    // If we get a new expires in value, update that also
+    if (updatedTokens.containsKey(OAuthTokenCredentials.EXPIRES_IN)) {
+      tokenTable.setExpiresIn(updatedTokens.get(OAuthTokenCredentials.EXPIRES_IN));
+      expiresIn = updatedTokens.get(OAuthTokenCredentials.EXPIRES_IN);
+    }
+
     if (updatedTokens.containsKey("accessToken")) {
       accessToken = updatedTokens.get("accessToken");
     }
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestOAuthProcess.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestOAuthProcess.java
index 695c60f5cb..57f2e8125b 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestOAuthProcess.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestOAuthProcess.java
@@ -137,6 +137,7 @@ public class TestOAuthProcess extends ClusterTest {
 
       assertEquals("you_have_access", tokenTable.getAccessToken());
       assertEquals("refresh_me", tokenTable.getRefreshToken());
+      assertEquals("3600", tokenTable.getExpiresIn());
 
     } catch (Exception e) {
       logger.error(e.getMessage());
@@ -163,6 +164,8 @@ public class TestOAuthProcess extends ClusterTest {
 
       assertEquals("you_have_access", tokenTable.getAccessToken());
       assertEquals("refresh_me", tokenTable.getRefreshToken());
+      assertEquals("3600", tokenTable.getExpiresIn());
+
       // Now execute a query and get query results.
       server.enqueue(new MockResponse()
         .setResponseCode(200)
@@ -206,6 +209,7 @@ public class TestOAuthProcess extends ClusterTest {
 
       assertEquals("you_have_access", tokenTable.getAccessToken());
       assertEquals("refresh_me", tokenTable.getRefreshToken());
+      assertEquals("3600", tokenTable.getExpiresIn());
 
       // Now execute a query and get a refresh token
       // The API should return a 401 error.  This should trigger Drill to automatically
@@ -223,6 +227,7 @@ public class TestOAuthProcess extends ClusterTest {
       // Verify that the access and refresh tokens were saved
       assertEquals("token 2.0", tokenTable.getAccessToken());
       assertEquals("refresh 2.0", tokenTable.getRefreshToken());
+      assertEquals("3800", tokenTable.getExpiresIn());
 
       TupleMetadata expectedSchema = new SchemaBuilder()
         .add("col_1", MinorType.FLOAT8, DataMode.OPTIONAL)
diff --git a/contrib/storage-http/src/test/resources/data/token_refresh.json b/contrib/storage-http/src/test/resources/data/token_refresh.json
index 78bb84d48f..f6467a4b5a 100644
--- a/contrib/storage-http/src/test/resources/data/token_refresh.json
+++ b/contrib/storage-http/src/test/resources/data/token_refresh.json
@@ -2,5 +2,5 @@
   "access_token": "token 2.0",
   "refresh_token": "refresh 2.0",
   "token_type": "Bearer",
-  "expires": 3600
+  "expires_in": 3800
 }
diff --git a/docs/dev/Box.md b/docs/dev/Box.md
new file mode 100644
index 0000000000..5c1023d2ae
--- /dev/null
+++ b/docs/dev/Box.md
@@ -0,0 +1,84 @@
+# Box and Drill
+As of Drill 2.0 it is possible to connect Drill to a Box account and query files stored there.  Clearly, the performance will be much better if the files are stored locally; however, if your data is located in Box, Drill makes it easy to explore that data.
+
+## Setting up your Box Account
+Box uses OAuth 2.0 for authorization and authentication. In order to connect Drill to Box.com, you will first need to obtain API Keys from Box.
+
+You can follow the instructions here: https://developer.box.com/guides/authentication/oauth2/oauth2-setup/ to obtain the necessary tokens.
+
+Once you have obtained the client tokens, the next step is to configure Drill to connect to Box.
+
+## Configuring Drill to Connect to Box
+Connecting Drill with Box is basically the same as any other file system.  The default configuration below doesn't list the formats, but you can use this as a template.
+
+The only fields that will need to be populated are the `clientID`, `clientSecret` and `callbackURL`.
+
+You may also specify the connection and read timeouts as shown below.  The times are in milliseconds and will default to 5 seconds.
+
+
+```json
+{
+  "type": "file",
+  "connection": "box:///",
+  "config": {
+    "boxConnectionTimeout": 5000,
+    "boxReadTimeout": 5000
+  },
+  "workspaces": {
+    "root": {
+      "location": "/",
+      "writable": false,
+      "defaultInputFormat": null,
+      "allowAccessOutsideWorkspace": false
+    }
+  },
+  "formats": {
+    ...
+  },
+  "oAuthConfig": {
+    "callbackURL": "http://localhost:8047/credentials/<your plugin name>/update_oauth2_authtoken",
+    "authorizationURL": "https://account.box.com/api/oauth2/authorize",
+    "authorizationParams": {
+      "response_type": "code"
+    }
+  },
+  "authMode": "SHARED_USER",
+  "credentialsProvider": {
+    "credentialsProviderType": "PlainCredentialsProvider",
+    "credentials": {
+      "clientID": "<Your client ID here>",
+      "clientSecret": "<Your client secret here>",
+      "tokenURI": "https://api.box.com/oauth2/token"
+    },
+    "userCredentials": {}
+  },
+  "enabled": true
+}
+
+```
+
+## User Impersonation / User Translation Support
+When using OAuth 2.0 Box supports user translation.  Simply set the `authMode` to `USER_TRANSLATION`.
+
+## Testing
+Box's OAuth tokens are very short-lived and make testing much more difficult. The unit tests therefore use a Box developer token.  These tokens are only valid for one hour.  They should only be used for testing.  You can obtain a developer token on the same page as the `clientID` and `clientSecret`.
+
+If you wish to run unit tests using a developer token, use the following configuration:
+
+```json
+"type": "file",
+  "connection": "box:///",
+  "config": {
+    "boxAccessToken": "<your access token here>"
+  },
+  "workspaces": {
+    "root": {
+      "location": "/",
+      "writable": false,
+      "defaultInputFormat": null,
+      "allowAccessOutsideWorkspace": false
+    }
+  }
+}
+```
+
diff --git a/docs/dev/Dropbox.md b/docs/dev/Dropbox.md
index 20f2c950a9..c95d8cd603 100644
--- a/docs/dev/Dropbox.md
+++ b/docs/dev/Dropbox.md
@@ -1,19 +1,19 @@
 #Dropbox and Drill
-As of Drill 1.20.0 it is possible to connect Drill to a Dropbox account and query files stored there.  Clearly, the performance will be much better if the files are stored 
+As of Drill 1.20.0 it is possible to connect Drill to a Dropbox account and query files stored there.  Clearly, the performance will be much better if the files are stored
 locally, however, if your data is located in dropbox Drill makes it easy to explore that data.
 
 ## Creating an API Token
 The first step to enabling Drill to query Dropbox is creating an API token.
 1. Navigate to https://www.dropbox.com/developers/apps/create
-2. Choose `Scoped Access` under Choose an API. 
+2. Choose `Scoped Access` under Choose an API.
 3. Depending on the access limitations you are looking for select either full or limited to a particular folder.
-4. In the permissions tab, make sure all the permissions associated with reading data are enabled.  
+4. In the permissions tab, make sure all the permissions associated with reading data are enabled.
 
-Once you've done that, and hit submit, you'll see a section in your newly created Dropbox App called `Generated Access Token`.  Copy the value here and that is what you will 
+Once you've done that, and hit submit, you'll see a section in your newly created Dropbox App called `Generated Access Token`.  Copy the value here and that is what you will
 use in your Drill configuration.
 
 ## Configuring Drill
-Once you've created a Dropbox access token, you are now ready to configure Drill to query Dropbox.  To create a dropbox connection, in Drill's UI, navigate to the Storage tab, 
+Once you've created a Dropbox access token, you are now ready to configure Drill to query Dropbox.  To create a dropbox connection, in Drill's UI, navigate to the Storage tab,
 click on `Create New Storage Plugin` and add the items below:
 
 ```json
@@ -32,13 +32,60 @@ click on `Create New Storage Plugin` and add the items below:
   }
 }
 ```
-Paste your access token in the appropriate field and at that point you should be able to query Dropbox.  Drill treats Dropbox as any other file system, so all the instructions 
+Paste your access token in the appropriate field and at that point you should be able to query Dropbox.  Drill treats Dropbox as any other file system, so all the instructions
 here (https://drill.apache.org/docs/file-system-storage-plugin/) and here (https://drill.apache.org/docs/workspaces/)
 about configuring a workspace, and adding format plugins are exactly the same as any other on Drill.
 
+## OAuth 2.0 Support
+The connection to Dropbox also supports using OAuth 2.0 for authorization.  See below for a sample configuration using OAuth.  OAuth should be used for production systems.
+
+```json
+{
+  "type": "file",
+  "connection": "dropbox:///",
+  "workspaces": {
+    "csv": {
+      "location": "/csv",
+      "writable": false,
+      "defaultInputFormat": null,
+      "allowAccessOutsideWorkspace": false
+    },
+    "root": {
+      "location": "/",
+      "writable": false,
+      "defaultInputFormat": null,
+      "allowAccessOutsideWorkspace": false
+    }
+  },
+  "formats": {},
+  "oAuthConfig": {
+    "callbackURL": "http://localhost:8047/credentials/<plugin name>/update_oauth2_authtoken",
+    "authorizationURL": "https://www.dropbox.com/oauth2/authorize",
+    "authorizationParams": {
+      "response_type": "code",
+      "token_access_type": "offline"
+    }
+  },
+  "authMode": "SHARED_USER",
+  "credentialsProvider": {
+    "credentialsProviderType": "PlainCredentialsProvider",
+    "credentials": {
+      "clientID": "<your client id>",
+      "clientSecret": "<your client secret>",
+      "tokenURI": "https://www.dropbox.com/oauth2/token"
+    },
+    "userCredentials": {}
+  },
+  "enabled": true
+}
+```
+
+## User Impersonation / User Translation Support
+When using OAuth 2.0 Dropbox supports user translation.  Simply set the `authMode` to `USER_TRANSLATION`.
+
 ### Securing Dropbox Credentials
-As with any other storage plugin, you have a few options as to how to store the credentials. See [Drill Credentials Provider](./PluginCredentialsProvider.md) for more 
-information about how you can store your credentials securely in Drill. 
+As with any other storage plugin, you have a few options as to how to store the credentials. See [Drill Credentials Provider](./PluginCredentialsProvider.md) for more
+information about how you can store your credentials securely in Drill.
 
 ## Running the Unit Tests
 Unfortunately, in order to run the unit tests, it is necessary to have an external API token.  Therefore, the unit tests have to be run manually.  To run the unit tests:
@@ -55,5 +102,5 @@ folder.  Simply copy these files in the structure there into your dropbox accoun
 
 ## Limitations
 1. It is not possible to save files to Dropbox from Drill, thus CTAS queries will fail.
-2. Dropbox does not expose directory metadata, so it is not possible to obtain the directory size, modification date or access dates. 
-3. Dropbox does not maintain the last access date as distinct from the modification date of files. 
+2. Dropbox does not expose directory metadata, so it is not possible to obtain the directory size, modification date or access dates.
+3. Dropbox does not maintain the last access date as distinct from the modification date of files.
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index dd5299a937..443bb07c5a 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -101,7 +101,12 @@
     <dependency>
       <groupId>com.dropbox.core</groupId>
       <artifactId>dropbox-core-sdk</artifactId>
-      <version>4.0.1</version>
+      <version>5.4.4</version>
+    </dependency>
+    <dependency>
+      <groupId>com.box</groupId>
+      <artifactId>box-java-sdk</artifactId>
+      <version>3.7.1</version>
     </dependency>
     <dependency>
       <groupId>org.apache.drill.contrib.data</groupId>
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/OAuthRequests.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/OAuthRequests.java
index b4aaf7b021..0ffddea293 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/OAuthRequests.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/OAuthRequests.java
@@ -115,6 +115,7 @@ public class OAuthRequests {
       // Set the access and refresh token
       tokenTable.setAccessToken(tokenContainer.getAccessToken());
       tokenTable.setRefreshToken(tokenContainer.getRefreshToken());
+      tokenTable.setExpiresIn(tokenContainer.getExpiresIn());
 
       return Response.status(Status.OK)
         .entity("Access tokens have been updated.")
@@ -155,6 +156,9 @@ public class OAuthRequests {
       // Add tokens to persistent storage
       tokenTable.setAccessToken(updatedTokens.get(OAuthTokenCredentials.ACCESS_TOKEN));
       tokenTable.setRefreshToken(updatedTokens.get(OAuthTokenCredentials.REFRESH_TOKEN));
+      if (updatedTokens.containsKey(OAuthTokenCredentials.EXPIRES_IN)) {
+        tokenTable.setExpiresIn(updatedTokens.get(OAuthTokenCredentials.EXPIRES_IN));
+      }
 
       // Get success page
       String successPage = null;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/OAuthTokenContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/OAuthTokenContainer.java
index 1b7523086f..678cb79fd8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/OAuthTokenContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/OAuthTokenContainer.java
@@ -27,12 +27,15 @@ import javax.xml.bind.annotation.XmlRootElement;
 public class OAuthTokenContainer {
   private final String accessToken;
   private final String refreshToken;
+  private final String expiresIn;
 
   @JsonCreator
   public OAuthTokenContainer(@JsonProperty("accessToken") String accessToken,
-                             @JsonProperty("refreshToken") String refreshToken) {
+                             @JsonProperty("refreshToken") String refreshToken,
+                             @JsonProperty("expiresIn") String expiresIn) {
     this.accessToken = accessToken;
     this.refreshToken = refreshToken;
+    this.expiresIn = expiresIn;
   }
 
   public String getAccessToken() {
@@ -42,4 +45,8 @@ public class OAuthTokenContainer {
   public String getRefreshToken() {
     return refreshToken;
   }
+
+  public String getExpiresIn() {
+    return expiresIn;
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BoxFileSystem.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BoxFileSystem.java
new file mode 100644
index 0000000000..8970cf5d8d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BoxFileSystem.java
@@ -0,0 +1,459 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.dfs;
+
+import com.box.sdk.BoxAPIConnection;
+import com.box.sdk.BoxFile;
+import com.box.sdk.BoxFolder;
+import com.box.sdk.BoxFolder.Info;
+import com.box.sdk.BoxItem;
+import com.box.sdk.BoxSearch;
+import com.box.sdk.BoxSearchParameters;
+import com.box.sdk.PartialCollection;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.logical.security.CredentialsProvider;
+import org.apache.drill.exec.oauth.PersistentTokenTable;
+import org.apache.drill.exec.store.security.oauth.OAuthTokenCredentials;
+import org.apache.drill.exec.store.security.oauth.OAuthTokenCredentials.Builder;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+public class BoxFileSystem extends OAuthEnabledFileSystem {
+
+  private static final Logger logger = LoggerFactory.getLogger(BoxFileSystem.class);
+  private static final String TIMEOUT_DEFAULT = "5000";
+  private static final List<String> SEARCH_CONTENT_TYPES = new ArrayList<>(Collections.singletonList("name"));
+  private Path workingDirectory;
+  private BoxAPIConnection client;
+  private String workingDirectoryID;
+  private BoxFolder rootFolder;
+  private boolean usesDeveloperToken;
+  private final List<String> ancestorFolderIDs = new ArrayList<>();
+  private final Map<Path, BoxItem> itemCache = new HashMap<>();
+
+  /**
+   * Returns a URI which identifies this FileSystem.
+   *
+   * @return the URI of this filesystem.
+   */
+  @Override
+  public URI getUri() {
+    try {
+      return new URI("box:///");
+    } catch (URISyntaxException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Opens an FSDataInputStream at the indicated Path.
+   *
+   * @param inputPath the file name to open
+   * @param bufferSize the size of the buffer to be used.
+   * @throws IOException IO failure
+   */
+  @Override
+  public FSDataInputStream open(Path inputPath, int bufferSize) throws IOException {
+    client = getClient();
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+
+    BoxItem item = getItem(inputPath);
+    if (item instanceof BoxFile) {
+      BoxFile file = (BoxFile) item;
+      updateTokens();
+
+      file.download(out);
+      updateTokens();
+
+      FSDataInputStream fsDataInputStream = new FSDataInputStream(new SeekableByteArrayInputStream(out.toByteArray()));
+      out.close();
+      return fsDataInputStream;
+    } else {
+      throw new IOException("Attempted to read " + inputPath + " which is not a file.  Only files can be read by Box.");
+    }
+  }
+
+  /**
+   * Create an FSDataOutputStream at the indicated Path with write-progress
+   * reporting.
+   *
+   * @param f           the file name to open
+   * @param permission  file permission
+   * @param overwrite   if a file with this name already exists, then if true,
+   *                    the file will be overwritten, and if false an error will be thrown.
+   * @param bufferSize  the size of the buffer to be used.
+   * @param replication required block replication for the file.
+   * @param blockSize   block size
+   * @param progress    the progress reporter
+   * @throws IOException IO failure
+   * @see #setPermission(Path, FsPermission)
+   */
+  @Override
+  public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite,
+    int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
+    throw new UnsupportedOperationException("Box is read only.");
+  }
+
+  @Override
+  public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException {
+    throw new UnsupportedOperationException("Box does not support append.");
+  }
+
+  @Override
+  public boolean rename(Path src, Path dst) throws IOException {
+    throw new UnsupportedOperationException("Box does not support rename.");
+  }
+
+  @Override
+  public boolean delete(Path f, boolean recursive) throws IOException {
+    return false;
+  }
+
+  /**
+   * List the statuses of the files/directories in the given path if the path is
+   * a directory.
+   * <p>
+   * Does not guarantee to return the List of files/directories status in a
+   * sorted order.
+   * <p>
+   * Will not return null. Expect IOException upon access error.
+   *
+   * @param f given path
+   * @return the statuses of the files/directories in the given path
+   * @throws FileNotFoundException when the path does not exist
+   * @throws IOException           see specific implementation
+   */
+  @Override
+  public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
+    BoxItem incoming = getItem(f);
+
+    // If the incoming item is a folder, find all the items in it,
+    // and iterate over them.  This is not recursive, so it will only
+    // iterate over the current folder.
+    if (incoming instanceof BoxFolder) {
+      BoxFolder folder = (BoxFolder) incoming;
+
+      // Iterate over the children
+      int itemCount = 0;
+      FileStatus status;
+      List<FileStatus> fileStatusList = new ArrayList<>();
+
+      for (BoxItem.Info childInfo : folder.getChildren()) {
+        Path newPath = new Path(f + childInfo.getName());
+        if (childInfo instanceof BoxFolder.Info) {
+          status = new FileStatus(childInfo.getSize(), true, 1,0,
+            getModifiedMillis(childInfo), newPath);
+          fileStatusList.add(status);
+          itemCount++;
+        } else if (childInfo instanceof BoxFile.Info) {
+          status = new FileStatus(childInfo.getSize(), false, 1,0,
+            getModifiedMillis(childInfo), newPath);
+          fileStatusList.add(status);
+          itemCount++;
+        }
+      }
+      return fileStatusList.toArray(new FileStatus[itemCount]);
+    } else if (incoming instanceof BoxFile) {
+      // If the incoming object is a file, return the file info
+      BoxFile infile = (BoxFile)incoming;
+
+      FileStatus[] results = new FileStatus[1];
+      results[0] = new FileStatus(infile.getInfo().getSize(), false, 1, 0,
+        getModifiedMillis(infile), f);
+      return results;
+    }
+    // Ignore web links
+    return new FileStatus[0];
+  }
+
+  /**
+   * Set the current working directory for the given FileSystem. All relative
+   * paths will be resolved relative to it.
+   *
+   * @param new_dir Path of new working directory
+   */
+  @Override
+  public void setWorkingDirectory(Path new_dir) {
+    logger.debug("Setting working directory to: " + new_dir.getName());
+    workingDirectory = new_dir;
+
+    ancestorFolderIDs.clear();
+
+    // Set the working directory id.
+    if (StringUtils.isEmpty(workingDirectoryID) || new_dir.toString().contentEquals("/")) {
+      workingDirectoryID = "0";
+      ancestorFolderIDs.add("0");
+    } else {
+      // Split the path by the slash
+      List<String> pathParts = new ArrayList<>(Arrays.asList(new_dir.toString().split("/")));
+      for (String pathPart : pathParts) {
+        BoxSearch search = new BoxSearch(client);
+        updateTokens();
+
+        BoxSearchParameters searchParams = new BoxSearchParameters();
+        searchParams.setQuery(pathPart);
+        searchParams.setContentTypes(SEARCH_CONTENT_TYPES);
+        searchParams.setType("folder");
+        searchParams.setAncestorFolderIds(ancestorFolderIDs);
+
+        PartialCollection<BoxItem.Info> searchResults = search.searchRange(1, 3, searchParams);
+        updateTokens();
+
+        // Iterate over search results
+        for (BoxItem.Info result : searchResults) {
+          // Get the ID of the folder and add it to the ancestorFolderId list
+          String id = result.getID();
+          ancestorFolderIDs.add(id);
+          break;
+        }
+      }
+    }
+  }
+
+  @Override
+  public Path getWorkingDirectory() {
+    // If the working directory is empty, assume it is the root directory and set both
+    // the id and working directory to root.
+    if (StringUtils.isEmpty(workingDirectoryID)) {
+      workingDirectory = new Path("/");
+      workingDirectoryID = "0";
+    }
+    return workingDirectory;
+  }
+
+
+  @Override
+  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+    return false;
+  }
+
+  /**
+   * Return a file status object that represents the path.
+   *
+   * @param f The path we want information from
+   * @return a FileStatus object
+   * @throws FileNotFoundException when the path does not exist
+   * @throws IOException           see specific implementation
+   */
+  @Override
+  public FileStatus getFileStatus(Path f) throws IOException {
+    this.client = getClient();
+
+    BoxItem pathItem = getItem(f);
+
+    if (pathItem instanceof BoxFolder) {
+      BoxFolder folder = (BoxFolder) pathItem;
+      updateTokens();
+
+      // Folders may have a modified date of zero.
+      Info folderInfo = folder.getInfo();
+      long size = folderInfo.getSize();
+
+      return new FileStatus(size, true, 1,0, getModifiedMillis(pathItem), f);
+    } else if (pathItem instanceof BoxFile) {
+      BoxFile boxFile = (BoxFile) pathItem;
+      BoxFile.Info fileInfo = boxFile.getInfo();
+      long fileSize = fileInfo.getSize();
+
+      return new FileStatus(fileSize, false, 1,0,
+        getModifiedMillis(pathItem), f);
+    } else {
+      // The only other option here would be a BoxWebLink and Drill can't do anything with that.
+      return new FileStatus();
+    }
+  }
+
+  private BoxItem getItem(Path path) {
+    // Reuse a previously resolved item when one is cached for this path.
+    BoxItem cachedItem = itemCache.get(path);
+    if (cachedItem != null) {
+      return cachedItem;
+    }
+
+    // Make sure the client is initialized
+    this.client = getClient();
+
+    // The root folder is resolved directly and remembered for later calls.
+    if (path.isRoot()) {
+      if (rootFolder == null) {
+        rootFolder = BoxFolder.getRootFolder(client);
+        updateTokens();
+        itemCache.put(path, rootFolder);
+      }
+      return rootFolder;
+    }
+
+    // In Box, you can only access items via their ID, so rather than making numerous API
+    // calls to traverse a directory path, the item is looked up by name with Box's search API.
+    String itemName = path.getName();
+    BoxSearch boxSearch = new BoxSearch(client);
+    updateTokens();
+
+    BoxSearchParameters parameters = new BoxSearchParameters();
+    parameters.setQuery(itemName);
+    parameters.setContentTypes(SEARCH_CONTENT_TYPES);
+
+    // A name without an extension is assumed to be a folder; otherwise the search is
+    // restricted to files carrying that extension.
+    String extension = FilenameUtils.getExtension(itemName);
+    boolean looksLikeFolder = StringUtils.isEmpty(extension);
+    parameters.setType(looksLikeFolder ? "folder" : "file");
+    if (!looksLikeFolder) {
+      parameters.setFileExtensions(Collections.singletonList(extension));
+    }
+
+    if (!ancestorFolderIDs.isEmpty()) {
+      parameters.setAncestorFolderIds(ancestorFolderIDs);
+    }
+
+    // Only the first range of results (offset 0, limit 100) is requested.
+    PartialCollection<BoxItem.Info> candidates = boxSearch.searchRange(0L, 100L, parameters);
+    updateTokens();
+
+    // The first candidate that is a file or a folder is taken as the match; candidates of
+    // any other type are skipped.
+    for (BoxItem.Info candidate : candidates) {
+      String candidateId = candidate.getID();
+      String candidateType = candidate.getType();
+
+      BoxItem resolved;
+      if (candidateType.contentEquals("file")) {
+        resolved = new BoxFile(client, candidateId);
+      } else if (candidateType.contentEquals("folder")) {
+        resolved = new BoxFolder(client, candidateId);
+      } else {
+        continue;
+      }
+
+      updateTokens();
+      itemCache.put(path, resolved);
+      return resolved;
+    }
+
+    // Nothing usable was found.
+    return null;
+  }
+
+  /**
+   * Hands the client's current access token, refresh token and expiry value to the parent
+   * class so they can be persisted. Box API tokens seem to be very short-lived, so call this
+   * method after any call that actually calls the Box API.
+   * <p>
+   * The method simply returns when no client has been created yet or when the client was built
+   * from a developer token. Otherwise the tokens are only passed on when the client reports
+   * both that it can refresh and that it needs a refresh.
+   */
+  private void updateTokens() {
+    if (client == null || usesDeveloperToken) {
+      return;
+    }
+
+    // NOTE(review): an earlier version nested a "missing refresh token" error inside this
+    // condition, but it could never fire because the condition already requires
+    // canRefresh(). If a hard failure is wanted for a client without a refresh token, it
+    // has to be checked separately from this condition -- confirm the intended behavior.
+    if (client.canRefresh() && client.needsRefresh()) {
+      super.updateTokens(client.getAccessToken(), client.getRefreshToken(), String.valueOf(client.getExpires()));
+    }
+  }
+
+
+  /**
+   * Returns the Box client, creating one when none has been stored on this instance yet.
+   * The client is built either from a static developer token (configuration key
+   * {@code boxAccessToken}, intended for testing only) or from the OAuth 2.0 credentials and
+   * token table. Connect and read timeouts are taken from the configuration keys
+   * {@code boxConnectionTimeout} and {@code boxReadTimeout}.
+   * <p>
+   * This method does not store the new client in the {@code client} field; callers assign
+   * the returned value themselves.
+   * @return An authenticated {@link BoxAPIConnection} Box client
+   */
+  private BoxAPIConnection getClient() {
+    if (this.client != null) {
+      return client;
+    }
+
+    // Get timeout values from configuration.
+    int connectTimeout = Integer.parseInt(this.getConf().get("boxConnectionTimeout", TIMEOUT_DEFAULT));
+    int readTimeout = Integer.parseInt(this.getConf().get("boxReadTimeout", TIMEOUT_DEFAULT));
+
+    BoxAPIConnection connection;
+    String developerToken = this.getConf().get("boxAccessToken", "");
+    if (StringUtils.isEmpty(developerToken)) {
+      // No developer token configured: build the client from the OAuth credentials and tokens.
+      CredentialsProvider credentialsProvider = getCredentialsProvider();
+      PersistentTokenTable tokenTable = getTokenTable();
+      OAuthTokenCredentials credentials = new Builder()
+        .setCredentialsProvider(credentialsProvider)
+        .setTokenTable(tokenTable)
+        .build()
+        .get();
+
+      connection = new BoxAPIConnection(credentials.getClientID(), credentials.getClientSecret(),
+        tokenTable.getAccessToken(), tokenTable.getRefreshToken());
+    } else {
+      // A populated developer token takes precedence over the OAuth tokens.
+      // This should only be used for testing.
+      connection = new BoxAPIConnection(developerToken);
+      this.usesDeveloperToken = true;
+    }
+
+    connection.setConnectTimeout(connectTimeout);
+    connection.setReadTimeout(readTimeout);
+    return connection;
+  }
+
+  /**
+   * Returns the modified time of a Box item in milliseconds. The item's info is obtained
+   * through {@code getInfo()} and passed to the {@link BoxItem.Info} overload, which maps a
+   * missing modification date to zero.
+   * @param item {@link BoxItem} of the input
+   * @return The milliseconds of the modified time, or zero when no modified date is set.
+   */
+  private long getModifiedMillis(BoxItem item) {
+    BoxItem.Info itemInfo = item.getInfo();
+    return getModifiedMillis(itemInfo);
+  }
+
+  /**
+   * Returns the modified time in milliseconds. The modified date can be null, so this
+   * function keeps the null check in one place.
+   * @param info {@link BoxItem.Info} of the input
+   * @return The milliseconds of the modified time, or zero when no modified date is set.
+   */
+  private long getModifiedMillis(BoxItem.Info info ) {
+    Date lastModified = info.getModifiedAt();
+    return lastModified != null ? lastModified.getTime() : 0;
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
index 0da534b4b7..d0c049d787 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
@@ -125,6 +125,14 @@ public class DrillFileSystem extends FileSystem implements OpenFileTracker {
     return new Configuration(this.underlyingFs.getConf());
   }
 
+  /**
+   * Returns the underlying file system wrapped by this class. The instance itself is returned, not a copy.
+   * @return The underlying {@link FileSystem}
+   */
+  public FileSystem getUnderlyingFs() {
+    return underlyingFs;
+  }
+
+
   /**
    * If OperatorStats are provided return a instrumented {@link org.apache.hadoop.fs.FSDataInputStream}.
    */
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DropboxFileSystem.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DropboxFileSystem.java
index 9e3fdb9244..6d4a39df03 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DropboxFileSystem.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DropboxFileSystem.java
@@ -18,26 +18,30 @@
 
 package org.apache.drill.exec.store.dfs;
 
+import com.dropbox.core.DbxDownloader;
 import com.dropbox.core.DbxException;
 import com.dropbox.core.DbxRequestConfig;
+import com.dropbox.core.oauth.DbxCredential;
 import com.dropbox.core.v2.DbxClientV2;
 import com.dropbox.core.v2.files.FileMetadata;
 import com.dropbox.core.v2.files.FolderMetadata;
 import com.dropbox.core.v2.files.ListFolderResult;
 import com.dropbox.core.v2.files.Metadata;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.logical.security.CredentialsProvider;
+import org.apache.drill.exec.oauth.PersistentTokenTable;
+import org.apache.drill.exec.store.security.oauth.OAuthTokenCredentials;
+import org.apache.drill.exec.vector.complex.fn.SeekableBAIS;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PositionedReadable;
-import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.util.Progressable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.net.URI;
@@ -47,14 +51,18 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class DropboxFileSystem extends FileSystem {
+public class DropboxFileSystem extends OAuthEnabledFileSystem {
   private static final Logger logger = LoggerFactory.getLogger(DropboxFileSystem.class);
 
   private static final String ERROR_MSG = "Dropbox is read only.";
+  private static final String APP_IDENTIFIER = "Apache/Drill";
   private Path workingDirectory;
   private DbxClientV2 client;
+  private DbxCredential dbxCredential;
+  private DbxRequestConfig config;
   private FileStatus[] fileStatuses;
   private final Map<String,FileStatus> fileStatusCache = new HashMap<>();
+  private boolean usesDeveloperToken;
 
   @Override
   public URI getUri() {
@@ -71,9 +79,10 @@ public class DropboxFileSystem extends FileSystem {
     String filename = getFileName(path);
     client = getClient();
     ByteArrayOutputStream out = new ByteArrayOutputStream();
-    try {
-      client.files().download(filename).download(out);
-      fsDataInputStream = new FSDataInputStream(new SeekableByteArrayInputStream(out.toByteArray()));
+    try (DbxDownloader<FileMetadata> downloader = client.files().download(filename)) {
+      downloader.download(out);
+      updateTokens();
+      fsDataInputStream = new FSDataInputStream(new SeekableBAIS(out.toByteArray()));
     } catch (DbxException e) {
       throw new IOException(e.getMessage());
     }
@@ -113,7 +122,15 @@ public class DropboxFileSystem extends FileSystem {
 
     // Get files and folder metadata from Dropbox root directory
     try {
-      ListFolderResult result = client.files().listFolder("");
+      String pathString;
+      if (path.isRoot()) {
+        pathString = "";
+      } else {
+        pathString = path.toString().replace("dropbox:", "");
+      }
+
+      ListFolderResult result = client.files().listFolder(pathString);
+      updateTokens();
       while (true) {
         for (Metadata metadata : result.getEntries()) {
           fileStatusList.add(getFileInformation(metadata));
@@ -122,6 +139,7 @@ public class DropboxFileSystem extends FileSystem {
           break;
         }
         result = client.files().listFolderContinue(result.getCursor());
+        updateTokens();
       }
     } catch (DbxException e) {
       throw new IOException(e.getMessage());
@@ -164,6 +182,7 @@ public class DropboxFileSystem extends FileSystem {
     client = getClient();
     try {
       Metadata metadata = client.files().getMetadata(filePath);
+      updateTokens();
       return getFileInformation(metadata);
     } catch (Exception e) {
       throw new IOException("Error accessing file " + filePath + "\n" + e.getMessage());
@@ -195,86 +214,67 @@ public class DropboxFileSystem extends FileSystem {
     }
 
     // read preferred client identifier from config or use "Apache/Drill"
-    String clientIdentifier = this.getConf().get("clientIdentifier", "Apache/Drill");
+    String clientIdentifier = this.getConf().get("clientIdentifier", APP_IDENTIFIER);
     logger.info("Creating dropbox client with client identifier: {}", clientIdentifier);
-    DbxRequestConfig config = DbxRequestConfig.newBuilder(clientIdentifier).build();
+
+    config = DbxRequestConfig.newBuilder(clientIdentifier)
+      .withAutoRetryEnabled(5)
+      .build();
 
     // read access token from config or credentials provider
     logger.info("Reading dropbox access token from configuration or credentials provider");
     String accessToken = this.getConf().get("dropboxAccessToken", "");
 
-    this.client = new DbxClientV2(config, accessToken);
-    return this.client;
-  }
-
-  private boolean isDirectory(Metadata metadata) {
-    return metadata instanceof FolderMetadata;
-  }
-
-  private boolean isFile(Metadata metadata) {
-    return metadata instanceof FileMetadata;
-  }
-
-  private String getFileName(Path path){
-    return path.toUri().getPath();
-  }
-
-  static class SeekableByteArrayInputStream extends ByteArrayInputStream implements Seekable, PositionedReadable {
-
-    public SeekableByteArrayInputStream(byte[] buf)
-    {
-      super(buf);
-    }
-    @Override
-    public long getPos() throws IOException{
-      return pos;
-    }
-
-    @Override
-    public void seek(long pos) throws IOException {
-      if (mark != 0) {
-        throw new IllegalStateException();
+    // If the user is using a static developer token, return a client with that.
+    if (StringUtils.isNotEmpty(accessToken)) {
+      client = new DbxClientV2(config, accessToken);
+      usesDeveloperToken = true;
+    } else {
+      // Otherwise, use OAuth tokens
+      CredentialsProvider credentialsProvider = getCredentialsProvider();
+      PersistentTokenTable tokenTable = getTokenTable();
+      OAuthTokenCredentials credentials = new OAuthTokenCredentials.Builder()
+        .setCredentialsProvider(credentialsProvider)
+        .setTokenTable(tokenTable)
+        .build()
+        .get();
+
+      long expiresIn = 0;
+      if (StringUtils.isNotEmpty(credentials.getExpiresIn())) {
+        expiresIn = Long.parseLong(credentials.getExpiresIn());
       }
 
-      reset();
-      long skipped = skip(pos);
-
-      if (skipped != pos) {
-        throw new IOException();
-      }
-    }
+      dbxCredential = new DbxCredential(credentials.getAccessToken(), expiresIn, credentials.getRefreshToken(),
+        credentials.getClientID(), credentials.getClientSecret());
 
-    @Override
-    public boolean seekToNewSource(long targetPos) throws IOException {
-      return false;
+      client = new DbxClientV2(config, dbxCredential);
+      usesDeveloperToken = false;
     }
 
-    @Override
-    public int read(long position, byte[] buffer, int offset, int length) throws IOException {
+    return client;
+  }
 
-      if (position >= buf.length) {
-        throw new IllegalArgumentException();
-      }
-      if (position + length > buf.length) {
-        throw new IllegalArgumentException();
+  private void updateTokens() {
+    if (client == null || usesDeveloperToken) {
+      return;
+    } else if (dbxCredential.aboutToExpire()) {
+      try {
+        dbxCredential.refresh(config);
+      } catch (DbxException e) {
+        throw UserException.connectionError(e)
+          .message("Error refreshing Dropbox OAuth tokens: " + e.getMessage())
+          .build(logger);
       }
-      if (length > buffer.length) {
-        throw new IllegalArgumentException();
-      }
-
-      System.arraycopy(buf, (int) position, buffer, offset, length);
-      return length;
+      // Update the tokens in Drill
+      updateTokens(dbxCredential.getAccessToken(), dbxCredential.getRefreshToken(), String.valueOf(dbxCredential.getExpiresAt()));
     }
+  }
 
-    @Override
-    public void readFully(long position, byte[] buffer) throws IOException {
-      read(position, buffer, 0, buffer.length);
-
-    }
+  private boolean isDirectory(Metadata metadata) {
+    return metadata instanceof FolderMetadata;
+  }
 
-    @Override
-    public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
-      read(position, buffer, offset, length);
-    }
+  private String getFileName(Path path){
+    return path.toUri().getPath();
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
index b8b784c859..d9cc1200b5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
@@ -34,6 +34,7 @@ import org.apache.drill.common.logical.FormatPluginConfig;
 
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
+import org.apache.drill.common.logical.OAuthConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.common.map.CaseInsensitiveMap;
 import org.apache.drill.common.logical.security.CredentialsProvider;
@@ -56,14 +57,36 @@ public class FileSystemConfig extends StoragePluginConfig {
   private final Map<String, String> config;
   private final Map<String, WorkspaceConfig> workspaces;
   private final Map<String, FormatPluginConfig> formats;
+  private final OAuthConfig oAuthConfig;
+
+  /**
+   * Convenience constructor for an OAuth-enabled configuration. Delegates to the full
+   * constructor with a {@code null} auth mode.
+   *
+   * @param connection the file system connection string
+   * @param config additional configuration properties
+   * @param workspaces workspace definitions
+   * @param formats format plugin configurations
+   * @param oAuthConfig the OAuth configuration
+   * @param credentialsProvider the credentials provider
+   */
+  public FileSystemConfig(String connection,
+    Map<String, String> config,
+    Map<String, WorkspaceConfig> workspaces,
+    Map<String, FormatPluginConfig> formats,
+    OAuthConfig oAuthConfig,
+    CredentialsProvider credentialsProvider) {
+    this(connection, config, workspaces, formats, oAuthConfig, null,
+      credentialsProvider);
+  }
+
+  /**
+   * Convenience constructor for a configuration without OAuth. Delegates to the full
+   * constructor with a {@code null} OAuth configuration and a {@code null} auth mode.
+   *
+   * @param connection the file system connection string
+   * @param config additional configuration properties
+   * @param workspaces workspace definitions
+   * @param formats format plugin configurations
+   * @param credentialsProvider the credentials provider
+   */
+  public FileSystemConfig(String connection,
+    Map<String, String> config,
+    Map<String, WorkspaceConfig> workspaces,
+    Map<String, FormatPluginConfig> formats,
+    CredentialsProvider credentialsProvider) {
+    this(connection, config, workspaces, formats, null, null,
+      credentialsProvider);
+  }
 
   @JsonCreator
   public FileSystemConfig(@JsonProperty("connection") String connection,
                           @JsonProperty("config") Map<String, String> config,
                           @JsonProperty("workspaces") Map<String, WorkspaceConfig> workspaces,
                           @JsonProperty("formats") Map<String, FormatPluginConfig> formats,
+                          @JsonProperty("oAuthConfig") OAuthConfig oAuthConfig,
+                          @JsonProperty("authMode") String authMode,
                           @JsonProperty("credentialsProvider") CredentialsProvider credentialsProvider) {
-    super(getCredentialsProvider(config, credentialsProvider), credentialsProvider == null);
+    super(credentialsProvider, credentialsProvider == null, AuthMode.parseOrDefault(authMode, AuthMode.SHARED_USER), oAuthConfig);
     this.connection = connection;
 
     // Force creation of an empty map so that configs compare equal
@@ -75,6 +98,7 @@ public class FileSystemConfig extends StoragePluginConfig {
     Map<String, WorkspaceConfig> caseInsensitiveWorkspaces = CaseInsensitiveMap.newHashMap();
     Optional.ofNullable(workspaces).ifPresent(caseInsensitiveWorkspaces::putAll);
     this.workspaces = caseInsensitiveWorkspaces;
+    this.oAuthConfig = oAuthConfig;
     this.formats = formats != null ? formats : new LinkedHashMap<>();
   }
 
@@ -161,7 +185,8 @@ public class FileSystemConfig extends StoragePluginConfig {
       formatsCopy.putAll(newFormats);
     }
     FileSystemConfig newConfig =
-        new FileSystemConfig(connection, configCopy, workspaces, formatsCopy, credentialsProvider);
+        new FileSystemConfig(connection, configCopy, workspaces, formatsCopy, oAuthConfig,
+          authMode.name(), credentialsProvider);
     newConfig.setEnabled(isEnabled());
     return newConfig;
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
index 4c45a42c8c..4daf30dd7a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
@@ -38,8 +38,12 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
 import org.apache.drill.common.logical.security.CredentialsProvider;
 import org.apache.drill.exec.metastore.MetadataProviderManager;
+import org.apache.drill.exec.oauth.OAuthTokenProvider;
+import org.apache.drill.exec.oauth.PersistentTokenTable;
+import org.apache.drill.exec.oauth.TokenRegistry;
 import org.apache.drill.exec.ops.OptimizerRulesContext;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.planner.PlannerPhase;
@@ -49,6 +53,7 @@ import org.apache.drill.exec.store.AbstractStoragePlugin;
 import org.apache.drill.exec.store.ClassPathFileSystem;
 import org.apache.drill.exec.store.LocalSyncableFileSystem;
 import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
 import org.apache.drill.shaded.guava.com.google.common.base.Strings;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
@@ -80,6 +85,7 @@ public class FileSystemPlugin extends AbstractStoragePlugin {
   private final Map<FormatPluginConfig, FormatPlugin> formatPluginsByConfig;
   private final FileSystemConfig config;
   private final Configuration fsConf;
+  private TokenRegistry tokenRegistry;
 
   public FileSystemPlugin(FileSystemConfig config, DrillbitContext context, String name) throws ExecutionSetupException {
     super(context, name);
@@ -93,6 +99,7 @@ public class FileSystemPlugin extends AbstractStoragePlugin {
       fsConf.set(FileSystem.FS_DEFAULT_NAME_KEY, config.getConnection());
       fsConf.set("fs.classpath.impl", ClassPathFileSystem.class.getName());
       fsConf.set("fs.dropbox.impl", DropboxFileSystem.class.getName());
+      fsConf.set("fs.box.impl", BoxFileSystem.class.getName());
       fsConf.set("fs.drill-local.impl", LocalSyncableFileSystem.class.getName());
       CredentialsProvider credentialsProvider = config.getCredentialsProvider();
       if (credentialsProvider != null) {
@@ -103,6 +110,8 @@ public class FileSystemPlugin extends AbstractStoragePlugin {
 
       if (isS3Connection(fsConf)) {
         handleS3Credentials(fsConf);
+      } else if (config.oAuthConfig() != null && config.getAuthMode() == AuthMode.SHARED_USER) {
+        initializeOauthTokenTable(null);
       }
 
       formatCreator = newFormatCreator(config, context, fsConf);
@@ -183,6 +192,31 @@ public class FileSystemPlugin extends AbstractStoragePlugin {
     }
   }
 
+  /**
+   * Obtains the OAuth token registry for the given user from the drillbit context, stores it
+   * on this plugin, and creates the token table named after this plugin.
+   * @param username The user whose token registry is requested; the constructor passes
+   *                 {@code null} in shared-user mode.
+   */
+  @VisibleForTesting
+  public void initializeOauthTokenTable(String username) {
+    tokenRegistry = context.getOauthTokenProvider().getOauthTokenRegistry(username);
+    tokenRegistry.createTokenTable(getName());
+  }
+
+  /**
+   * Returns the token registry currently held by this plugin.
+   * @return The {@link TokenRegistry}, or {@code null} if none has been initialized yet.
+   */
+  public TokenRegistry getTokenRegistry() {
+    return tokenRegistry;
+  }
+
+  /**
+   * This method returns the {@link TokenRegistry} for a given user.  It is only used for testing user translation
+   * with OAuth 2.0.
+   * <p>
+   * Note that this is not a plain getter: it calls {@link #initializeOauthTokenTable(String)}, which replaces
+   * the registry held by this plugin with the one for the given user and creates this plugin's token table in it.
+   * @param username A {@link String} of the current active user.
+   * @return A {@link TokenRegistry} for the given user.
+   */
+  @VisibleForTesting
+  public TokenRegistry getTokenRegistry(String username) {
+    initializeOauthTokenTable(username);
+    return tokenRegistry;
+  }
+
+  public PersistentTokenTable getTokenTable() { return tokenRegistry.getTokenTable(getName()); }
+
   /**
    * Creates a new FormatCreator instance.
    *
@@ -240,6 +274,12 @@ public class FileSystemPlugin extends AbstractStoragePlugin {
 
   @Override
   public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
+    // For user translation mode, this is moved here because we don't have the
+    // active username in the constructor.  Removing it from the constructor makes
+    // it difficult to test, so we do the check and leave it in both places.
+    if (config.getAuthMode() == AuthMode.USER_TRANSLATION) {
+      initializeOauthTokenTable(schemaConfig.getUserName());
+    }
     schemaFactory.registerSchemas(schemaConfig, parent);
   }
 
@@ -282,4 +322,14 @@ public class FileSystemPlugin extends AbstractStoragePlugin {
   public Configuration getFsConf() {
     return new Configuration(fsConf);
   }
+
+  /**
+   * Test-only helper that fetches the token registry for a {@code null} username and stores it
+   * on this plugin. Unlike {@link #initializeOauthTokenTable(String)}, no token table is created
+   * here, and any token tables still need to be populated by the caller.
+   */
+  @VisibleForTesting
+  public void initializeTokenTableForTesting() {
+    tokenRegistry = context.getOauthTokenProvider().getOauthTokenRegistry(null);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
index 98d46a1e00..f7b35aa3b5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
@@ -29,6 +29,7 @@ import org.apache.calcite.schema.Function;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Table;
 
+import org.apache.drill.common.logical.OAuthConfig;
 import org.apache.drill.exec.store.AbstractSchemaFactory;
 import org.apache.drill.exec.store.StorageStrategy;
 import org.apache.drill.exec.planner.logical.CreateTableEntry;
@@ -82,13 +83,21 @@ public class FileSystemSchemaFactory extends AbstractSchemaFactory {
     public FileSystemSchema(String name, SchemaConfig schemaConfig) throws IOException {
       super(Collections.emptyList(), name);
       final DrillFileSystem fs = ImpersonationUtil.createFileSystem(schemaConfig.getUserName(), plugin.getFsConf());
+      // Set OAuth Information
+      OAuthConfig oAuthConfig = plugin.getConfig().oAuthConfig();
+      if (oAuthConfig != null) {
+        OAuthEnabledFileSystem underlyingFileSystem = (OAuthEnabledFileSystem) fs.getUnderlyingFs();
+        underlyingFileSystem.setPluginConfig(plugin.getConfig());
+        underlyingFileSystem.setTokenTable(plugin.getTokenTable());
+        underlyingFileSystem.setoAuthConfig(plugin.getConfig().oAuthConfig());
+      }
+
       for(WorkspaceSchemaFactory f :  factories){
         WorkspaceSchema s = f.createSchema(getSchemaPath(), schemaConfig, fs);
         if (s != null) {
           schemaMap.put(s.getName(), s);
         }
       }
-
       defaultSchema = schemaMap.get(DEFAULT_WS_NAME);
     }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/OAuthEnabledFileSystem.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/OAuthEnabledFileSystem.java
new file mode 100644
index 0000000000..502204e5c6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/OAuthEnabledFileSystem.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.dfs;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.common.logical.OAuthConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
+import org.apache.drill.common.logical.security.CredentialsProvider;
+import org.apache.drill.exec.oauth.PersistentTokenTable;
+import org.apache.hadoop.fs.FileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class enables Drill to access file systems which use OAuth 2.0 for
+ * authorization. The class contains methods to interact with Drill's token management
+ * which makes use of a persistent store for the access and refresh tokens.
+ */
+public abstract class OAuthEnabledFileSystem extends FileSystem {
+  private static final Logger logger = LoggerFactory.getLogger(OAuthEnabledFileSystem.class);
+
+  // Plugin configuration this file system was created for; set after construction.
+  private StoragePluginConfig pluginConfig;
+  // Persistent store holding the access token, refresh token and expiry value.
+  private PersistentTokenTable tokenTable;
+  // Taken from the plugin configuration in setPluginConfig().
+  private CredentialsProvider credentialsProvider;
+  private OAuthConfig oAuthConfig;
+
+  /**
+   * @return The plugin configuration, or {@code null} if none has been set.
+   */
+  public StoragePluginConfig getPluginConfig() {
+    return pluginConfig;
+  }
+
+  /**
+   * Stores the plugin configuration and picks up its credentials provider.
+   * @param pluginConfig The plugin configuration; a {@code null} value clears both the
+   *                     configuration and the credentials provider.
+   */
+  public void setPluginConfig(StoragePluginConfig pluginConfig) {
+    this.pluginConfig = pluginConfig;
+    // Tolerate a null configuration here, as getAuthMode() already does.
+    this.credentialsProvider = pluginConfig == null ? null : pluginConfig.getCredentialsProvider();
+  }
+
+  /**
+   * @return The auth mode of the plugin configuration, or {@code null} if no configuration
+   *         has been set.
+   */
+  public AuthMode getAuthMode() {
+    if (pluginConfig != null) {
+      return this.pluginConfig.getAuthMode();
+    } else {
+      return null;
+    }
+  }
+
+  public void setTokenTable(PersistentTokenTable tokenTable) {
+    this.tokenTable = tokenTable;
+  }
+
+  public PersistentTokenTable getTokenTable() {
+    return this.tokenTable;
+  }
+
+  public void setoAuthConfig(OAuthConfig oAuthConfig) {
+    this.oAuthConfig = oAuthConfig;
+  }
+
+  public OAuthConfig getoAuthConfig() {
+    return this.oAuthConfig;
+  }
+
+  public CredentialsProvider getCredentialsProvider() {
+    return this.credentialsProvider;
+  }
+
+  /**
+   * This function must be called by the inheritor class after every operation to make sure
+   * that the tokens stay current.  This method compares each incoming token with the one from
+   * the persistent store.  If an incoming token is non-empty and different, it will update the
+   * persistent store.  A token table must have been set before calling this method.
+   * @param accessToken The new access token
+   * @param refreshToken The new refresh token
+   */
+  public void updateTokens(String accessToken, String refreshToken) {
+    // equals() is used instead of contentEquals() because contentEquals() throws a
+    // NullPointerException when the token table returns null for a token it does not hold.
+    if (StringUtils.isNotEmpty(accessToken) && !accessToken.equals(tokenTable.getAccessToken())) {
+      logger.debug("Updating access token for OAuth File System");
+      tokenTable.setAccessToken(accessToken);
+    }
+
+    if (StringUtils.isNotEmpty(refreshToken) && !refreshToken.equals(tokenTable.getRefreshToken())) {
+      logger.debug("Updating refresh token for OAuth File System");
+      tokenTable.setRefreshToken(refreshToken);
+    }
+  }
+
+  /**
+   * This function must be called by the inheritor class after every operation to make sure
+   * that the tokens stay current.  The access and refresh tokens are handled as in
+   * {@link #updateTokens(String, String)}.  The expiry value is written to the persistent store
+   * whenever it is non-empty, without comparing it to the stored value.
+   * @param accessToken The new access token
+   * @param refreshToken The new refresh token
+   * @param expiresAt  The new expires at value; it is stored through the token table's
+   *                   {@code setExpiresIn} setter.
+   */
+  public void updateTokens(String accessToken, String refreshToken, String expiresAt) {
+    updateTokens(accessToken, refreshToken);
+    if (StringUtils.isNotEmpty(expiresAt)) {
+      logger.debug("Updating expires at for OAuth File System.");
+      tokenTable.setExpiresIn(expiresAt);
+    }
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/SeekableBAIS.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/SeekableByteArrayInputStream.java
similarity index 50%
copy from exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/SeekableBAIS.java
copy to exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/SeekableByteArrayInputStream.java
index ddc54919d3..1ed44a43c7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/SeekableBAIS.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/SeekableByteArrayInputStream.java
@@ -15,42 +15,38 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.vector.complex.fn;
 
-import java.io.ByteArrayInputStream;
-import java.io.EOFException;
-import java.io.IOException;
+package org.apache.drill.exec.store.dfs;
 
+import org.apache.hadoop.fs.PositionedReadable;
 import org.apache.hadoop.fs.Seekable;
 
-/**
- * A ByteArrayInputStream that supports the HDFS Seekable API.
- */
-public class SeekableBAIS extends ByteArrayInputStream implements Seekable {
-
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SeekableBAIS.class);
-
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
 
-  public SeekableBAIS(byte[] buf, int offset, int length) {
-    super(buf, offset, length);
-  }
+public class SeekableByteArrayInputStream extends ByteArrayInputStream implements Seekable, PositionedReadable {
 
-  public SeekableBAIS(byte[] buf) {
+  public SeekableByteArrayInputStream(byte[] buf)
+  {
     super(buf);
   }
+  @Override
+  public long getPos() throws IOException {
+    return pos;
+  }
 
   @Override
   public void seek(long pos) throws IOException {
-    if(pos > buf.length){
-      throw new EOFException();
+    if (mark != 0) {
+      throw new IllegalStateException();
     }
-    this.pos = (int) pos;
-    this.count = buf.length - (int) pos;
-  }
 
-  @Override
-  public long getPos() throws IOException {
-    return pos;
+    reset();
+    long skipped = skip(pos);
+
+    if (skipped != pos) {
+      throw new IOException();
+    }
   }
 
   @Override
@@ -58,6 +54,73 @@ public class SeekableBAIS extends ByteArrayInputStream implements Seekable {
     return false;
   }
 
+  /**
+   * Reads up to {@code length} bytes starting at the absolute {@code position}
+   * of the backing array without moving the stream's own read position.
+   * Fewer bytes than requested are returned when the data ends first.
+   *
+   * @param position position within the backing array
+   * @param buffer   destination buffer
+   * @param offset   offset in the destination buffer
+   * @param length   maximum number of bytes to read
+   * @return number of bytes copied, 0 if {@code length} is 0, or -1 if
+   *         {@code position} is at or past the end of the data
+   * @throws java.io.EOFException if {@code position} is negative
+   * @throws IllegalArgumentException if the range does not fit in {@code buffer}
+   */
+  @Override
+  public int read(long position, byte[] buffer, int offset, int length) throws IOException {
 
+    if (position < 0) {
+      throw new java.io.EOFException("Cannot read from negative position " + position);
+    }
+    // Compare against the remaining room so offset + length cannot overflow.
+    if (offset < 0 || length < 0 || length > buffer.length - offset) {
+      throw new IllegalArgumentException();
+    }
+    if (length == 0) {
+      return 0;
+    }
+    if (position >= buf.length) {
+      return -1;
+    }
 
+    int toCopy = (int) Math.min(length, buf.length - position);
+    System.arraycopy(buf, (int) position, buffer, offset, toCopy);
+    return toCopy;
+  }
+
+  /**
+   * Fills the whole {@code buffer} with bytes starting at the absolute
+   * {@code position} of the backing array without moving the stream's own
+   * read position.
+   *
+   * @param position position within the backing array
+   * @param buffer   destination buffer
+   * @throws java.io.EOFException if the data ends before the buffer is
+   *                              filled; the buffer may be partially filled
+   */
+  @Override
+  public void readFully(long position, byte[] buffer) throws IOException {
+    readFully(position, buffer, 0, buffer.length);
+  }
+
+  /**
+   * Reads exactly {@code length} bytes starting at the absolute {@code position}
+   * of the backing array without moving the stream's own read position.
+   *
+   * @param position position within the backing array
+   * @param buffer   destination buffer
+   * @param offset   offset in the destination buffer
+   * @param length   number of bytes to read
+   * @throws java.io.EOFException if the data ends before {@code length} bytes
+   *                              are read; the range may be partially filled
+   * @throws IllegalArgumentException if the range does not fit in {@code buffer}
+   */
+  @Override
+  public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
+    if (read(position, buffer, offset, length) < length) {
+      throw new java.io.EOFException("Reached the end of the data before reading " + length + " bytes at position " + position);
+    }
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/http/oauth/OAuthUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/http/oauth/OAuthUtils.java
index 8b80499bb6..b7c09f628f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/http/oauth/OAuthUtils.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/http/oauth/OAuthUtils.java
@@ -126,6 +126,7 @@ public class OAuthUtils {
   public static Map<String, String> getOAuthTokens(OkHttpClient client, Request request) {
     String accessToken;
     String refreshToken;
+    String expiresIn;
     Map<String, String> tokens = new HashMap<>();
     Response response = null;
 
@@ -165,6 +166,13 @@ public class OAuthUtils {
         refreshToken = (String) parsedJson.get("refresh_token");
         tokens.put(OAuthTokenCredentials.REFRESH_TOKEN, refreshToken);
       }
+
+      // If we get an updated expires in time, update that as well.
+      if (parsedJson.containsKey("expires_in")) {
+        expiresIn = String.valueOf(parsedJson.get("expires_in"));
+        tokens.put(OAuthTokenCredentials.EXPIRES_IN, expiresIn);
+      }
+
       return tokens;
 
     } catch (NullPointerException | IOException e) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/SeekableBAIS.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/SeekableBAIS.java
index ddc54919d3..bdfccbe8ce 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/SeekableBAIS.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/SeekableBAIS.java
@@ -21,12 +21,13 @@ import java.io.ByteArrayInputStream;
 import java.io.EOFException;
 import java.io.IOException;
 
+import org.apache.hadoop.fs.PositionedReadable;
 import org.apache.hadoop.fs.Seekable;
 
 /**
  * A ByteArrayInputStream that supports the HDFS Seekable API.
  */
-public class SeekableBAIS extends ByteArrayInputStream implements Seekable {
+public class SeekableBAIS extends ByteArrayInputStream implements Seekable, PositionedReadable {
 
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SeekableBAIS.class);
 
@@ -59,5 +60,71 @@ public class SeekableBAIS extends ByteArrayInputStream implements Seekable {
   }
 
 
+  /**
+   * Reads up to {@code length} bytes starting at the absolute {@code position}
+   * of the backing array without moving the stream's own read position.
+   * Fewer bytes than requested are returned when the data ends first.
+   *
+   * @param position position within the backing array
+   * @param buffer   destination buffer
+   * @param offset   offset in the destination buffer
+   * @param length   maximum number of bytes to read
+   * @return number of bytes copied, 0 if {@code length} is 0, or -1 if
+   *         {@code position} is at or past the end of the data
+   * @throws EOFException if {@code position} is negative
+   * @throws IllegalArgumentException if the range does not fit in {@code buffer}
+   */
+  @Override
+  public int read(long position, byte[] buffer, int offset, int length) throws IOException {
+    if (position < 0) {
+      throw new EOFException("Cannot read from negative position " + position);
+    }
+    // Compare against the remaining room so offset + length cannot overflow.
+    if (offset < 0 || length < 0 || length > buffer.length - offset) {
+      throw new IllegalArgumentException();
+    }
+    if (length == 0) {
+      return 0;
+    }
+    if (position >= buf.length) {
+      return -1;
+    }
+    int toCopy = (int) Math.min(length, buf.length - position);
+    System.arraycopy(buf, (int) position, buffer, offset, toCopy);
+    return toCopy;
+  }
 
+  /**
+   * Reads exactly {@code length} bytes starting at the absolute {@code position}
+   * of the backing array without moving the stream's own read position.
+   *
+   * @param position position within the backing array
+   * @param buffer   destination buffer
+   * @param offset   offset in the destination buffer
+   * @param length   number of bytes to read
+   * @throws EOFException if the data ends before {@code length} bytes are read;
+   *                      the destination range may be partially filled
+   * @throws IllegalArgumentException if the range does not fit in {@code buffer}
+   */
+  @Override
+  public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
+    if (read(position, buffer, offset, length) < length) {
+      throw new EOFException("Reached the end of the data before reading " + length + " bytes at position " + position);
+    }
+  }
+
+  /**
+   * Fills the whole {@code buffer} with bytes starting at the absolute
+   * {@code position} of the backing array without moving the stream's own
+   * read position.
+   *
+   * @param position position within the backing array
+   * @param buffer   destination buffer
+   * @throws EOFException if the data ends before the buffer is filled; the
+   *                      buffer may be partially filled
+   */
+  @Override
+  public void readFully(long position, byte[] buffer) throws IOException {
+    readFully(position, buffer, 0, buffer.length);
+  }
 }
diff --git a/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json b/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
index eb6d5ceb43..1a928a0c05 100644
--- a/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
+++ b/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
@@ -244,6 +244,62 @@
         }
       },
       "enabled" : false
+    },
+    "box" : {
+      "type" : "file",
+      "connection" : "box:///",
+      "workspaces" : {
+        "root" : {
+          "location" : "/",
+          "writable" : false
+        }
+      },
+      "formats" : {
+        "csv" : {
+          "type" : "text",
+          "extensions" : [ "csv" ],
+          "fieldDelimiter" : ","
+        },
+        "tsv" : {
+          "type" : "text",
+          "extensions" : [ "tsv" ],
+          "fieldDelimiter" : "\t"
+        },
+        "json" : {
+          "type" : "json",
+          "extensions" : [ "json" ]
+        },
+        "parquet" : {
+          "type" : "parquet"
+        },
+        "avro" : {
+          "type" : "avro"
+        },
+        "csvh" : {
+          "type" : "text",
+          "extensions" : [ "csvh" ],
+          "fieldDelimiter" : ",",
+          "extractHeader" : true
+        }
+      },
+      "oAuthConfig": {
+        "callbackURL": "http://localhost:8047/credentials/box/update_oauth2_authtoken",
+        "authorizationURL": "https://account.box.com/api/oauth2/authorize",
+        "authorizationParams": {
+          "response_type": "code"
+        }
+      },
+      "credentialsProvider": {
+        "credentialsProviderType": "PlainCredentialsProvider",
+        "credentials": {
+          "clientID": "<YOUR CLIENT ID>",
+          "clientSecret": "<YOUR CLIENT SECRET>",
+          "tokenURI": "https://api.box.com/oauth2/token"
+        },
+        "userCredentials": {}
+      },
+      "enabled" : false,
+      "authMode": "SHARED_USER"
     }
   }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/BaseTestImpersonation.java b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/BaseTestImpersonation.java
index 6fd881f87d..6ae2869b71 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/BaseTestImpersonation.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/BaseTestImpersonation.java
@@ -131,7 +131,7 @@ public class BaseTestImpersonation extends PlanTestBase {
     createAndAddWorkspace("tmp", "/tmp", (short) 0777, processUser, processUser, workspaces);
 
     FileSystemConfig miniDfsPluginConfig = new FileSystemConfig(connection, null,
-        workspaces, lfsPluginConfig.getFormats(), null);
+        workspaces, lfsPluginConfig.getFormats(), null, null);
     miniDfsPluginConfig.setEnabled(true);
     pluginRegistry.put(MINI_DFS_STORAGE_PLUGIN_NAME, miniDfsPluginConfig);
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTTAS.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTTAS.java
index 80d2fd75ca..187e794551 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTTAS.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTTAS.java
@@ -66,7 +66,7 @@ public class TestCTTAS extends BaseTestQuery {
         pluginConfig.getConnection(),
         pluginConfig.getConfig(),
         newWorkspaces,
-        pluginConfig.getFormats(),
+        pluginConfig.getFormats(), null,
         PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
     newPluginConfig.setEnabled(pluginConfig.isEnabled());
     pluginRegistry.put(DFS_PLUGIN_NAME, newPluginConfig);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/BoxFileSystemTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/BoxFileSystemTest.java
new file mode 100644
index 0000000000..f0100dc8f0
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/BoxFileSystemTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store;
+
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.OAuthConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
+import org.apache.drill.common.logical.security.CredentialsProvider;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.store.dfs.WorkspaceConfig;
+import org.apache.drill.exec.store.easy.json.JSONFormatConfig;
+import org.apache.drill.exec.store.easy.text.TextFormatConfig;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+
+/**
+ * Manual integration tests for Box / Drill connectivity.  A live Box connection is required, so
+ * the class is ignored by default. Box access and refresh tokens also seem to be reset after
+ * every API call, so, unlike GoogleSheets, tokens cannot be stored for repeated test runs.
+ *<p>
+ * Setup steps are in the developer documentation in the docs folder.  In short: obtain a Box
+ * developer token and paste it into ACCESS_TOKEN.
+ */
+@Ignore("Please create a Box API key and run these tests manually")
+public class BoxFileSystemTest extends ClusterTest {
+
+  private static final String ACCESS_TOKEN = "<Your Box Access Token Here>";
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    // Fail fast while the placeholder token above has not been replaced.
+    assertFalse(ACCESS_TOKEN.equalsIgnoreCase("<Your Box Access Token Here>"));
+
+    startCluster(ClusterFixture.builder(dirTestWatcher));
+
+    Map<String, String> fsProperties = new HashMap<>();
+    fsProperties.put("boxAccessToken", ACCESS_TOKEN);
+
+    // Two workspaces: the root folder and the /csv folder.
+    Map<String, WorkspaceConfig> workspaceMap = new HashMap<>();
+    workspaceMap.put("root", new WorkspaceConfig("/", false, null, false));
+    workspaceMap.put("csv", new WorkspaceConfig("/csv", false, null, false));
+
+    // JSON format, bound to the "json" extension.
+    List<String> jsonSuffixes = new ArrayList<>();
+    jsonSuffixes.add("json");
+    FormatPluginConfig jsonFormat = new JSONFormatConfig(jsonSuffixes, null, null, null, null, null);
+
+    // Text format, bound to the "csv" and "csvh" extensions.
+    List<String> textSuffixes = new ArrayList<>();
+    textSuffixes.add("csv");
+    textSuffixes.add("csvh");
+    FormatPluginConfig textFormat = new TextFormatConfig(textSuffixes, "\n", ",", "\"", null, null, false, true);
+
+    // The plugin config is created with an empty format map; the two formats above are
+    // registered on the plugin once it has been defined.
+    Map<String, FormatPluginConfig> formats = new HashMap<>();
+
+    Map<String, String> authorizationParams = new HashMap<>();
+    authorizationParams.put("response_type", "code");
+
+    OAuthConfig oAuth = OAuthConfig.builder()
+      .authorizationURL("https://account.box.com/api/oauth2/authorize")
+      .callbackURL("http://localhost:8047/credentials/box_test/update_oauth2_authtoken")
+      .authorizationParams(authorizationParams)
+      .build();
+
+    // Placeholder client credentials; replace them when running the tests.
+    Map<String, String> clientCredentials = new HashMap<>();
+    clientCredentials.put("clientID", "<your client ID>");
+    clientCredentials.put("clientSecret", "<your client secret>");
+    clientCredentials.put("tokenURI", "https://api.box.com/oauth2/token");
+
+    CredentialsProvider provider = new PlainCredentialsProvider(clientCredentials);
+    StoragePluginConfig pluginConfig = new FileSystemConfig("box:///", fsProperties,
+      workspaceMap, formats, oAuth, AuthMode.SHARED_USER.name(), provider);
+    pluginConfig.setEnabled(true);
+
+    // Define the plugin first, then attach the two formats to it.
+    cluster.defineStoragePlugin("box_test", pluginConfig);
+    cluster.defineFormat("box_test", "json", jsonFormat);
+    cluster.defineFormat("box_test", "csv", textFormat);
+  }
+
+  /** SHOW FILES on the root workspace is expected to return six rows. */
+  @Test
+  public void testListFiles() throws Exception {
+    RowSet listing = client.queryBuilder().sql("SHOW FILES IN box_test.root").rowSet();
+    assertEquals(6, listing.rowCount());
+    listing.clear();
+  }
+
+  /** A LIMIT 10 star query on a CSV file in the root workspace is expected to return ten rows. */
+  @Test
+  public void testStarQuery() throws Exception {
+    RowSet rows = client.queryBuilder().sql("SELECT * FROM box_test.root.`hdf-test.csv` LIMIT 10").rowSet();
+    assertEquals(10, rows.rowCount());
+    rows.clear();
+  }
+
+  /** The same CSV file queried through the csv workspace with LIMIT 5 should return five rows. */
+  @Test
+  public void testCSVQueryWithWorkspace() throws Exception {
+    RowSet rows = client.queryBuilder().sql("select * from `box_test`.`csv`.`hdf-test.csv` LIMIT 5").rowSet();
+    assertEquals(5, rows.rowCount());
+    rows.clear();
+  }
+
+  /** A LIMIT 5 star query on a JSON file should return five rows with seven fields. */
+  @Test
+  public void testJSONQuery() throws Exception {
+    RowSet rows = client.queryBuilder().sql("SELECT * FROM `box_test`.root.`http-pcap.json` LIMIT 5").rowSet();
+
+    assertEquals(5, rows.rowCount());
+    assertEquals(7, rows.batchSchema().getFieldCount());
+    rows.clear();
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/DropboxFileSystemTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/DropboxFileSystemTest.java
index ae198ad731..23318f2cf3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/DropboxFileSystemTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/DropboxFileSystemTest.java
@@ -88,7 +88,8 @@ public class DropboxFileSystemTest extends ClusterTest {
     FormatPluginConfig csvFormatConfig = new TextFormatConfig(csvExtensions, "\n", ",", "\"", null, null, false, true);
 
 
-    StoragePluginConfig dropboxConfig = new FileSystemConfig("dropbox:///", dropboxConfigVars, workspaces, formats, null);
+    StoragePluginConfig dropboxConfig = new FileSystemConfig("dropbox:///", dropboxConfigVars,
+      workspaces, formats, null, null);
     dropboxConfig.setEnabled(true);
 
     cluster.defineStoragePlugin("dropbox_test", dropboxConfig);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestPluginRegistry.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestPluginRegistry.java
index ab315c6571..c5535bc476 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestPluginRegistry.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestPluginRegistry.java
@@ -84,7 +84,7 @@ public class TestPluginRegistry extends BaseTest {
 
   private FileSystemConfig myConfig1() {
     FileSystemConfig config = new FileSystemConfig("myConn",
-        new HashMap<>(), new HashMap<>(), new HashMap<>(),
+        new HashMap<>(), new HashMap<>(), new HashMap<>(), null,
         PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
     config.setEnabled(true);
     return config;
@@ -94,7 +94,7 @@ public class TestPluginRegistry extends BaseTest {
     Map<String, String> props = new HashMap<>();
     props.put("foo", "bar");
     FileSystemConfig config = new FileSystemConfig("myConn",
-        props, new HashMap<>(), new HashMap<>(),
+        props, new HashMap<>(), new HashMap<>(), null,
         PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
     config.setEnabled(true);
     return config;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/util/StoragePluginTestUtils.java b/exec/java-exec/src/test/java/org/apache/drill/exec/util/StoragePluginTestUtils.java
index 9ca931ab84..2d48dbfbb1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/util/StoragePluginTestUtils.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/util/StoragePluginTestUtils.java
@@ -77,7 +77,7 @@ public class StoragePluginTestUtils {
         pluginConfig.getConnection(),
         pluginConfig.getConfig(),
         newWorkspaces,
-        pluginConfig.getFormats(),
+        pluginConfig.getFormats(), null,
         PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
     newPluginConfig.setEnabled(pluginConfig.isEnabled());
     pluginRegistry.put(pluginName, newPluginConfig);
@@ -114,7 +114,7 @@ public class StoragePluginTestUtils {
         fileSystemConfig.getConnection(),
         fileSystemConfig.getConfig(),
         fileSystemConfig.getWorkspaces(),
-        newFormats,
+        newFormats, null,
         PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
     newFileSystemConfig.setEnabled(fileSystemConfig.isEnabled());
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
index 3c94f4f4ea..c1f17b1084 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
@@ -583,6 +583,7 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable {
       pluginConfig.getConfig(),
       newWorkspaces == null ? pluginConfig.getWorkspaces() : newWorkspaces,
       newFormats == null ? pluginConfig.getFormats() : newFormats,
+      null,
       PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
     newPluginConfig.setEnabled(pluginConfig.isEnabled());
 
diff --git a/exec/jdbc-all/pom.xml b/exec/jdbc-all/pom.xml
index 6831b69e9c..0d93fce150 100644
--- a/exec/jdbc-all/pom.xml
+++ b/exec/jdbc-all/pom.xml
@@ -215,6 +215,10 @@
           <groupId>com.dropbox.core</groupId>
           <artifactId>dropbox-core-sdk</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>com.box</groupId>
+          <artifactId>box-java-sdk</artifactId>
+        </exclusion>
         <exclusion>
           <groupId>org.apache.commons</groupId>
           <artifactId>commons-compress</artifactId>