You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pr...@apache.org on 2020/04/14 02:33:00 UTC

[drill] branch master updated: DRILL-7437: Storage Plugin for Generic HTTP REST API

This is an automated email from the ASF dual-hosted git repository.

progers pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d47a61  DRILL-7437: Storage Plugin for Generic HTTP REST API
4d47a61 is described below

commit 4d47a61aefa77004f7541e59820c68a8ed8bad80
Author: Charles Givre <cg...@apache.org>
AuthorDate: Mon Apr 13 10:16:36 2020 -0400

    DRILL-7437: Storage Plugin for Generic HTTP REST API
---
 common/pom.xml                                     |   2 -
 .../native/client/src/protobuf/UserBitShared.pb.cc |  15 +-
 .../native/client/src/protobuf/UserBitShared.pb.h  |   5 +-
 contrib/pom.xml                                    |   1 +
 contrib/storage-http/README.md                     | 301 +++++++++++++
 contrib/storage-http/images/issue_count.png        | Bin 0 -> 50256 bytes
 contrib/storage-http/pom.xml                       | 101 +++++
 .../drill/exec/store/http/HttpAPIConfig.java       | 165 +++++++
 .../exec/store/http/HttpAPIConnectionSchema.java   |  83 ++++
 .../drill/exec/store/http/HttpBatchReader.java     |  97 +++++
 .../drill/exec/store/http/HttpGroupScan.java       | 168 ++++++++
 .../exec/store/http/HttpScanBatchCreator.java      | 104 +++++
 .../apache/drill/exec/store/http/HttpScanSpec.java |  72 ++++
 .../drill/exec/store/http/HttpSchemaFactory.java   | 101 +++++
 .../drill/exec/store/http/HttpStoragePlugin.java}  |  54 +--
 .../exec/store/http/HttpStoragePluginConfig.java   | 170 ++++++++
 .../apache/drill/exec/store/http/HttpSubScan.java  | 130 ++++++
 .../exec/store/http/util/HttpProxyConfig.java      | 220 ++++++++++
 .../drill/exec/store/http/util/SimpleHttp.java     | 267 ++++++++++++
 .../main/resources/bootstrap-storage-plugins.json  |   9 +
 .../src/main/resources/drill-module.conf           |  27 ++
 .../drill/exec/store/http/TestHttpPlugin.java      | 479 +++++++++++++++++++++
 .../drill/exec/store/http/TestHttpProxy.java       | 233 ++++++++++
 .../src/test/resources/data/response.json          |  14 +
 .../drill/exec/store/kudu/KuduStoragePlugin.java   |   2 +-
 distribution/pom.xml                               |   5 +
 distribution/src/assemble/component.xml            |   1 +
 .../src/main/resources/drill-override-example.conf |  31 ++
 .../java/org/apache/drill/exec/ExecConstants.java  |  17 +
 .../exec/store/easy/json/JSONRecordReader.java     |  36 +-
 .../drill/exec/vector/complex/fn/JsonReader.java   |   2 +-
 .../java-exec/src/main/resources/drill-module.conf |  39 ++
 .../easy/json/parser/TestJsonParserBasics.java     |  32 +-
 .../apache/drill/common/expression/SchemaPath.java |  21 +
 .../org/apache/drill/exec/proto/UserBitShared.java |  21 +-
 protocol/src/main/protobuf/UserBitShared.proto     |   1 +
 36 files changed, 2948 insertions(+), 78 deletions(-)

diff --git a/common/pom.xml b/common/pom.xml
index 889275e..8198be6 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -37,14 +37,12 @@
       <artifactId>drill-protocol</artifactId>
       <version>${project.version}</version>
     </dependency>
-
     <dependency>
       <!-- add as provided scope so that we can compile TestTools.  Should only be ever used in a test scenario where someone else is bringing JUnit in. -->
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <version>${junit.version}</version>
     </dependency>
-
     <dependency>
       <groupId>com.typesafe</groupId>
       <artifactId>config</artifactId>
diff --git a/contrib/native/client/src/protobuf/UserBitShared.pb.cc b/contrib/native/client/src/protobuf/UserBitShared.pb.cc
index 5f2186f..9ce65c2 100644
--- a/contrib/native/client/src/protobuf/UserBitShared.pb.cc
+++ b/contrib/native/client/src/protobuf/UserBitShared.pb.cc
@@ -1034,7 +1034,7 @@ void AddDescriptorsImpl() {
       "ATEMENT\020\005*\207\001\n\rFragmentState\022\013\n\007SENDING\020\000"
       "\022\027\n\023AWAITING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022\014"
       "\n\010FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005\022"
-      "\032\n\026CANCELLATION_REQUESTED\020\006*\344\n\n\020CoreOper"
+      "\032\n\026CANCELLATION_REQUESTED\020\006*\367\n\n\020CoreOper"
       "atorType\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAST"
       "_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE\020"
       "\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HASH"
@@ -1069,14 +1069,14 @@ void AddDescriptorsImpl() {
       "MERGE\020=\022\021\n\rLTSV_SUB_SCAN\020>\022\021\n\rHDF5_SUB_S"
       "CAN\020\?\022\022\n\016EXCEL_SUB_SCAN\020@\022\020\n\014SHP_SUB_SCA"
       "N\020A\022\024\n\020METADATA_HANDLER\020B\022\027\n\023METADATA_CO"
-      "NTROLLER\020C*g\n\nSaslStatus\022\020\n\014SASL_UNKNOWN"
-      "\020\000\022\016\n\nSASL_START\020\001\022\024\n\020SASL_IN_PROGRESS\020\002"
-      "\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B.\n\033o"
-      "rg.apache.drill.exec.protoB\rUserBitShare"
-      "dH\001"
+      "NTROLLER\020C\022\021\n\rHTTP_SUB_SCAN\020F*g\n\nSaslSta"
+      "tus\022\020\n\014SASL_UNKNOWN\020\000\022\016\n\nSASL_START\020\001\022\024\n"
+      "\020SASL_IN_PROGRESS\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n"
+      "\013SASL_FAILED\020\004B.\n\033org.apache.drill.exec."
+      "protoB\rUserBitSharedH\001"
   };
   ::google::protobuf::DescriptorPool::InternalAddGeneratedFile(
-      descriptor, 5763);
+      descriptor, 5782);
   ::google::protobuf::MessageFactory::InternalRegisterGeneratedFile(
     "UserBitShared.proto", &protobuf_RegisterTypes);
   ::protobuf_Types_2eproto::AddDescriptors();
@@ -1323,6 +1323,7 @@ bool CoreOperatorType_IsValid(int value) {
     case 65:
     case 66:
     case 67:
+    case 70:
       return true;
     default:
       return false;
diff --git a/contrib/native/client/src/protobuf/UserBitShared.pb.h b/contrib/native/client/src/protobuf/UserBitShared.pb.h
index 0aa41cf..d5483bc 100644
--- a/contrib/native/client/src/protobuf/UserBitShared.pb.h
+++ b/contrib/native/client/src/protobuf/UserBitShared.pb.h
@@ -358,11 +358,12 @@ enum CoreOperatorType {
   EXCEL_SUB_SCAN = 64,
   SHP_SUB_SCAN = 65,
   METADATA_HANDLER = 66,
-  METADATA_CONTROLLER = 67
+  METADATA_CONTROLLER = 67,
+  HTTP_SUB_SCAN = 70
 };
 bool CoreOperatorType_IsValid(int value);
 const CoreOperatorType CoreOperatorType_MIN = SINGLE_SENDER;
-const CoreOperatorType CoreOperatorType_MAX = METADATA_CONTROLLER;
+const CoreOperatorType CoreOperatorType_MAX = HTTP_SUB_SCAN;
 const int CoreOperatorType_ARRAYSIZE = CoreOperatorType_MAX + 1;
 
 const ::google::protobuf::EnumDescriptor* CoreOperatorType_descriptor();
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 8f81d48..15f3dd5 100644
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -54,6 +54,7 @@
     <module>storage-kafka</module>
     <module>storage-kudu</module>
     <module>storage-opentsdb</module>
+    <module>storage-http</module>
   </modules>
 
 </project>
diff --git a/contrib/storage-http/README.md b/contrib/storage-http/README.md
new file mode 100644
index 0000000..3b9965c
--- /dev/null
+++ b/contrib/storage-http/README.md
@@ -0,0 +1,301 @@
+# Generic REST API Storage Plugin
+
+This plugin is intended to enable you to query APIs over HTTP/REST. At this point, the API reader will only accept JSON as input however in the future, it may be possible to
+ add additional format readers to allow for APIs which return XML, CSV or other formats.
+
+Note:  This plugin should **NOT** be used for interacting with tools which have REST APIs such as Splunk or Solr. It will not be performant for those use cases.
+
+## Configuration
+
+To configure the plugin, create a new storage plugin, and add the following configuration options which apply to ALL connections defined in this plugin:
+
+```json
+{
+  "type": "http",
+  "cacheResults": true,
+  "connections": {},
+  "timeout": 0,
+  "proxyHost": null, 
+  "proxyPort": 0,
+  "proxyType": null,
+  "proxyUsername": null,
+  "proxyPassword": null,
+  "enabled": true
+}
+```
+The required options are:
+* `type`:  This should be `http`
+* `cacheResults`:  Enable caching of the HTTP responses.  Defaults to `false`
+* `timeout`:  Sets the response timeout in seconds. Defaults to `0` which is no timeout.
+* `connections`:  This field contains the details for individual connections. See the section *Configuring API Connections for Details*.
+
+You can configure Drill to work behind a corporate proxy. Details are listed below. 
+
+### Configuring the API Connections
+
+The HTTP Storage plugin allows you to configure multiple APIS which you can query directly from this plugin. To do so, first add a `connections` parameter to the configuration
+. Next give the connection a name, which will be used in queries.  For instance `stockAPI` or `jira`.
+
+The `connection` can accept the following options:
+* `url`: The base URL which Drill will query. You should include the ending slash if there are additional arguments which you are passing.
+* `method`: The request method. Must be `get` or `post`. Other methods are not allowed and will default to `GET`.
+* `headers`: Often APIs will require custom headers as part of the authentication. This field allows you to define key/value pairs which are submitted with the http request
+.  The format is:
+```json
+headers: {
+   "key1": "Value1",
+   "key2": "Value2"
+}
+```
+* `authType`: If your API requires authentication, specify the authentication type. At the time of implementation, the plugin only supports basic authentication, however, the
+ plugin will likely support OAUTH2 in the future. Defaults to `none`. If the `authType` is set to `basic`, `username` and `password` must be set in the configuration as well.
+ * `username`: The username for basic authentication.
+ * `password`: The password for basic authentication.
+ * `postBody`: Contains data, in the form of key value pairs, which are sent during a `POST` request. Post body should be in the form:
+ ```
+key1=value1
+key2=value2
+```
+
+## Usage
+
+This plugin is different from other plugins in that it the table component of the `FROM` clause is different. In normal Drill queries, the `FROM` clause is constructed as follows:
+```sql
+FROM <storage plugin>.<schema>.<table>
+```
+For example, you might have:
+```sql
+FROM dfs.test.`somefile.csv`
+
+-- or
+
+FROM mongo.stats.sales_data
+```
+
+The HTTP/REST plugin the `FROM` clause enables you to pass arguments to your REST call. The structure is:
+```sql
+FROM <plugin>.<connection>.<arguments>
+--Actual example:
+ FROM http.sunrise.`/json?lat=36.7201600&lng=-4.4203400&date=today`
+```
+
+## Proxy Setup
+
+Some users access HTTP services from behind a proxy firewall. Drill provides three ways specify proxy
+configuration.
+
+### Proxy Environment Variables
+
+Drill recognizes the usual Linux proxy environment variables:
+
+* `http_proxy`, `HTTP_PROXY`
+* `https_proxy`, `HTTP_PROXY`
+* `all_proxy`, `ALL_PROXY`
+
+This technique works well if your system is already configured to
+handle proxies.
+
+### Boot Configuration
+
+You can also specify proxy configuration in the `drill-override.conf` file.
+See `drill-override-example.conf` for a template.
+
+First, you can use the same form of URL you would use with the environment
+variables:
+
+```
+drill.exec.net_proxy.http_url: "http://foo.com/1234"
+```
+
+There is one setting for HTTP, another for HTTPS.
+
+Alternatively, you can specify each field separately:
+
+```
+drill.exec.net_proxy.http: {
+      type: "none", # none, http, socks. Blank same as none.
+      host: "",
+      port: 80,
+      user_name: "",
+      password: ""
+    },
+```
+
+The valid proxy types are `none`, `http` and `socks`. Blank is the same
+as `none`.
+
+Again, there is a parallel section for HTTPS.
+
+Either of these approaches is preferred if the proxy is an attribute of your
+network environment and is the same for all external HTTP/HTTPS requests.
+
+### In the HTTP Storage Plugin Config
+
+The final way to configure proxy is in the HTTP storage plugin itself. The proxy
+applies to all connections defined in that plugin. Use this approach if the proxy
+applies only to some external services, or if each service has a different proxy
+(defined by creating a separate plugin config for each service.)
+
+```json
+      proxy_type: "direct",
+      proxy_host: "",
+      proxy_port: 80,
+      proxy_user_name: "",
+      proxy_password: ""
+```
+
+The valid proxy types are `direct`, `http` or `socks`. Blank is the same
+as `direct`.
+
+## Examples
+
+### Example 1: Reference Data, A Sunrise/Sunset API
+
+The API sunrise-sunset.org returns data in the following format:
+
+ ```json
+ "results":
+ {
+   "sunrise":"7:27:02 AM",
+   "sunset":"5:05:55 PM",
+   "solar_noon":"12:16:28 PM",
+   "day_length":"9:38:53",
+   "civil_twilight_begin":"6:58:14 AM",
+   "civil_twilight_end":"5:34:43 PM",
+   "nautical_twilight_begin":"6:25:47 AM",
+   "nautical_twilight_end":"6:07:10 PM",
+   "astronomical_twilight_begin":"5:54:14 AM",
+   "astronomical_twilight_end":"6:38:43 PM"
+ },
+  "status":"OK"
+}
+```
+To query this API, set the configuration as follows:
+
+```json
+
+ {
+   "type": "http",
+   "cacheResults": false,
+   "enabled": true,
+   "timeout": 5,
+   "connections": {
+     "sunrise": {
+       "url": "https://api.sunrise-sunset.org/",
+       "method": "GET",
+       "headers": null,
+       "authType": "none",
+       "userName": null,
+       "password": null,
+       "postBody": null
+     }
+   }
+}
+
+```
+Then, to execute a query:
+```sql
+    SELECT api_results.results.sunrise AS sunrise,
+    api_results.results.sunset AS sunset
+    FROM http.sunrise.`/json?lat=36.7201600&lng=-4.4203400&date=today` AS api_results;
+```
+Which yields the following results:
+```
++------------+------------+
+|  sunrise   |   sunset   |
++------------+------------+
+| 7:17:46 AM | 5:01:33 PM |
++------------+------------+
+1 row selected (0.632 seconds)
+```
+
+### Example 2: JIRA
+
+JIRA Cloud has a REST API which is [documented here](https://developer.atlassian.com/cloud/jira/platform/rest/v3/?utm_source=%2Fcloud%2Fjira%2Fplatform%2Frest%2F&utm_medium=302).
+
+To connect Drill to JIRA Cloud, use the following configuration:
+```json
+{
+  "type": "http",
+  "cacheResults": false,
+  "timeout": 5,
+  "connections": {
+    "sunrise": {
+      "url": "https://api.sunrise-sunset.org/",
+      "method": "GET",
+      "headers": null,
+      "authType": "none",
+      "userName": null,
+      "password": null,
+      "postBody": null
+    },
+    "jira": {
+      "url": "https://<project>.atlassian.net/rest/api/3/",
+      "method": "GET",
+      "headers": {
+        "Accept": "application/json"
+      },
+      "authType": "basic",
+      "userName": "<username>",
+      "password": "<API Key>",
+      "postBody": null
+    }
+  },
+  "enabled": true
+}
+```
+
+Once you've configured Drill to query the API, you can now easily access any of your data in JIRA. The JIRA API returns highly nested data, however with a little preparation, it
+ is pretty straightforward to transform it into a more useful table. For instance, the
+ query below:
+```sql
+SELECT jira_data.issues.key AS key,
+jira_data.issues.fields.issueType.name AS issueType,
+SUBSTR(jira_data.issues.fields.created, 1, 10) AS created,
+SUBSTR(jira_data.issues.fields.updated, 1, 10) AS updated,
+jira_data.issues.fields.assignee.displayName as assignee,
+jira_data.issues.fields.creator.displayName as creator,
+jira_data.issues.fields.summary AS summary,
+jira_data.issues.fields.status.name AS currentStatus,
+jira_data.issues.fields.priority.name AS priority,
+jira_data.issues.fields.labels AS labels,
+jira_data.issues.fields.subtasks AS subtasks
+FROM (
+SELECT flatten(t1.issues) as issues
+FROM http.jira.`search?jql=project=<project>&&maxResults=100` AS t1
+) AS jira_data
+```
+The query below counts the number of issues by priority:
+
+```sql
+SELECT
+jira_data.issues.fields.priority.name AS priority,
+COUNT(*) AS issue_count
+FROM (
+SELECT flatten(t1.issues) as issues
+FROM http.jira.`search?jql=project=<project>&maxResults=100` AS t1
+) AS jira_data
+GROUP BY priority
+ORDER BY issue_count DESC
+```
+
+<img src="images/issue_count.png"  alt="Issue Count by Priority"/>
+
+
+## Limitations
+
+1. The plugin is supposed to follow redirects, however if you are using Authentication, you may encounter errors or empty responses if you are counting on the endpoint for
+   redirection.
+
+2. At this time, the plugin does not support any authentication other than basic authentication. Future functionality may include OAUTH2 authentication and/or PKI
+  authentication for REST APIs.
+
+3. This plugin does not implement filter pushdowns. Filter pushdown has the potential to improve performance.
+
+4. This plugin only reads JSON responses. Future functionality may include the ability to parse XML, CSV or other common rest responses.
+
+5. At this time `POST` bodies can only be in the format of key/value pairs. Some APIs accept JSON based `POST` bodies and this is not currently supported.
+
+6. The returned message should contain only records, as a JSON array of objects (or as a series of JSON objects as in a JSON file). The
+   present version does not yet have the ability to ignore message "overhead" such as status codes, etc.  You can of course, select individual fields in your query to ignore
+    "overhead" fields. 
diff --git a/contrib/storage-http/images/issue_count.png b/contrib/storage-http/images/issue_count.png
new file mode 100644
index 0000000..49bc04f
Binary files /dev/null and b/contrib/storage-http/images/issue_count.png differ
diff --git a/contrib/storage-http/pom.xml b/contrib/storage-http/pom.xml
new file mode 100644
index 0000000..0c2c875
--- /dev/null
+++ b/contrib/storage-http/pom.xml
@@ -0,0 +1,101 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>drill-contrib-parent</artifactId>
+    <groupId>org.apache.drill.contrib</groupId>
+    <version>1.18.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>drill-storage-http</artifactId>
+  <name>contrib/http-storage-plugin</name>
+
+  <properties>
+    <okhttp.version>4.5.0</okhttp.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>drill-java-exec</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.squareup.okhttp3</groupId>
+      <artifactId>okhttp</artifactId>
+      <version>${okhttp.version}</version>
+    </dependency>
+
+    <!-- Test dependencies -->
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>drill-java-exec</artifactId>
+      <classifier>tests</classifier>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.drill</groupId>
+      <artifactId>drill-common</artifactId>
+      <classifier>tests</classifier>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.squareup.okhttp3</groupId>
+      <artifactId>mockwebserver</artifactId>
+      <version>${okhttp.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-resources-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>copy-java-sources</id>
+            <phase>process-sources</phase>
+            <goals>
+              <goal>copy-resources</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${basedir}/target/classes/org/apache/drill/exec/store/http
+              </outputDirectory>
+              <resources>
+                <resource>
+                  <directory>src/main/java/org/apache/drill/exec/store/http</directory>
+                  <filtering>true</filtering>
+                </resource>
+              </resources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpAPIConfig.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpAPIConfig.java
new file mode 100644
index 0000000..a850d1d
--- /dev/null
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpAPIConfig.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.http;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.parquet.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Objects;
+
+public class HttpAPIConfig {
+  private static final Logger logger = LoggerFactory.getLogger(HttpAPIConfig.class);
+
+  private final String url;
+
+  private final HttpMethods method;
+
+  private final Map<String, String> headers;
+
+  private final String authType;
+
+  private final String userName;
+
+  private final String password;
+
+  private final String postBody;
+
+  public enum HttpMethods {
+    /**
+     * Value for HTTP GET method
+     */
+    GET,
+    /**
+     * Value for HTTP POST method
+     */
+    POST;
+  }
+
+  public HttpAPIConfig(@JsonProperty("url") String url,
+                       @JsonProperty("method") String method,
+                       @JsonProperty("headers") Map<String, String> headers,
+                       @JsonProperty("authType") String authType,
+                       @JsonProperty("userName") String userName,
+                       @JsonProperty("password") String password,
+                       @JsonProperty("postBody") String postBody) {
+
+    this.headers = headers;
+    this.method = Strings.isNullOrEmpty(method)
+        ? HttpMethods.GET : HttpMethods.valueOf(method.trim().toUpperCase());
+
+    // Get the request method.  Only accept GET and POST requests.  Anything else will default to GET.
+    switch (this.method) {
+      case GET:
+      case POST:
+        break;
+      default:
+        throw UserException
+          .validationError()
+          .message("Invalid HTTP method: %s.  Drill supports 'GET' and , 'POST'.", method)
+          .build(logger);
+    }
+    if (Strings.isNullOrEmpty(url)) {
+      throw UserException
+        .validationError()
+        .message("URL is required for the HTTP storage plugin.")
+        .build(logger);
+    }
+
+    // Put a trailing slash on the URL if it is missing
+    if (url.charAt(url.length() - 1) != '/') {
+      this.url = url + "/";
+    } else {
+      this.url = url;
+    }
+
+    // Get the authentication method. Future functionality will include OAUTH2 authentication but for now
+    // Accept either basic or none.  The default is none.
+    this.authType = Strings.isNullOrEmpty(authType) ? "none" : authType;
+    this.userName = userName;
+    this.password = password;
+    this.postBody = postBody;
+  }
+
+  @JsonProperty("url")
+  public String url() { return url; }
+
+  @JsonProperty("method")
+  public String method() { return method.toString(); }
+
+  @JsonProperty("headers")
+  public Map<String, String> headers() { return headers; }
+
+  @JsonProperty("authType")
+  public String authType() { return authType; }
+
+  @JsonProperty("userName")
+  public String userName() { return userName; }
+
+  @JsonProperty("password")
+  public String password() { return password; }
+
+  @JsonProperty("postBody")
+  public String postBody() { return postBody; }
+
+  @JsonIgnore
+  public HttpMethods getMethodType() {
+    return HttpMethods.valueOf(this.method());
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(url, method, headers, authType, userName, password, postBody);
+  }
+
+  @Override
+  public String toString() {
+    return new PlanStringBuilder(this)
+      .field("url", url)
+      .field("method", method)
+      .field("headers", headers)
+      .field("authType", authType)
+      .field("username", userName)
+      .field("password", password)
+      .field("postBody", postBody)
+      .toString();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    HttpAPIConfig other = (HttpAPIConfig) obj;
+    return Objects.equals(url, other.url)
+      && Objects.equals(method, other.method)
+      && Objects.equals(headers, other.headers)
+      && Objects.equals(authType, other.authType)
+      && Objects.equals(userName, other.userName)
+      && Objects.equals(password, other.password)
+      && Objects.equals(postBody, other.postBody);
+  }
+}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpAPIConnectionSchema.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpAPIConnectionSchema.java
new file mode 100644
index 0000000..d3fa51f
--- /dev/null
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpAPIConnectionSchema.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.http;
+
+import org.apache.calcite.schema.Table;
+import org.apache.drill.exec.planner.logical.DynamicDrillTable;
+import org.apache.drill.exec.store.AbstractSchema;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * In the HTTP storage plugin, users can define specific connections or APIs.
+ * This class represents the database component of other storage plugins.
+ */
+public class HttpAPIConnectionSchema extends AbstractSchema {
+
+  private final Map<String, DynamicDrillTable> activeTables = new HashMap<>();
+
+  private final HttpStoragePlugin plugin;
+
+  private final String pluginName;
+
+  public HttpAPIConnectionSchema(HttpSchemaFactory.HttpSchema httpSchema,
+                                 String name,
+                                 HttpStoragePlugin plugin) {
+    super(httpSchema.getSchemaPath(), name);
+    this.plugin = plugin;
+    pluginName = plugin.getName();
+  }
+
+  @Override
+  public String getTypeName() {
+    return HttpStoragePluginConfig.NAME;
+  }
+
+  /**
+   * Gets the table that is received from the query. In this case, the table actually are arguments which are passed
+   * in the URL string.
+   *
+   * @param tableName
+   *          The "tableName" actually will contain the URL arguments passed to
+   *          the record reader
+   * @return the selected table
+   */
+  @Override
+  public Table getTable(String tableName) {
+    DynamicDrillTable table = activeTables.get(name);
+    if (table != null) {
+      // Return the found table
+      return table;
+    } else {
+      // Register a new table
+      return registerTable(name, new DynamicDrillTable(plugin, pluginName, new HttpScanSpec(pluginName, name, tableName, plugin.getConfig())));
+    }
+  }
+
+  @Override
+  public Set<String> getTableNames() {
+    return activeTables.keySet();
+  }
+
+  private DynamicDrillTable registerTable(String name, DynamicDrillTable table) {
+    activeTables.put(name, table);
+    return table;
+  }
+}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
new file mode 100644
index 0000000..7aaa1e8
--- /dev/null
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.http;
+
+import java.io.File;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.store.easy.json.loader.JsonLoader;
+import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder;
+import org.apache.drill.exec.store.http.util.HttpProxyConfig;
+import org.apache.drill.exec.store.http.util.HttpProxyConfig.ProxyBuilder;
+import org.apache.drill.exec.store.http.util.SimpleHttp;
+
+import com.typesafe.config.Config;
+
+public class HttpBatchReader implements ManagedReader<SchemaNegotiator> {
+  private final HttpStoragePluginConfig config;
+  private final HttpSubScan subScan;
+  private JsonLoader jsonLoader;
+
+  public HttpBatchReader(HttpStoragePluginConfig config, HttpSubScan subScan) {
+    this.config = config;
+    this.subScan = subScan;
+  }
+
+  @Override
+  public boolean open(SchemaNegotiator negotiator) {
+    CustomErrorContext errorContext = negotiator.parentErrorContext();
+
+    // Result set loader setup
+    String tempDirPath = negotiator
+        .drillConfig()
+        .getString(ExecConstants.DRILL_TMP_DIR);
+    ResultSetLoader loader = negotiator.build();
+
+    // Http client setup
+    SimpleHttp http = new SimpleHttp(config, new File(tempDirPath), subScan.tableSpec().database(), proxySettings(negotiator.drillConfig()), errorContext);
+
+    // JSON loader setup
+    jsonLoader = new JsonLoaderBuilder()
+        .resultSetLoader(loader)
+        .standardOptions(negotiator.queryOptions())
+        .errorContext(errorContext)
+        .fromStream(http.getInputStream(subScan.getFullURL()))
+        .build();
+
+    // Please read the first batch
+    return true;
+  }
+
+ private HttpProxyConfig proxySettings(Config drillConfig) {
+    ProxyBuilder builder = HttpProxyConfig.builder()
+        .fromConfigForURL(drillConfig, subScan.getFullURL());
+    String proxyType = config.proxyType();
+    if (proxyType != null && !"direct".equals(proxyType)) {
+      builder
+        .type(config.proxyType())
+        .host(config.proxyHost())
+        .port(config.proxyPort())
+        .username(config.proxyUsername())
+        .password(config.proxyPassword());
+    }
+    return builder.build();
+  }
+
+ @Override
+  public boolean next() {
+    return jsonLoader.readBatch();
+  }
+
+  @Override
+  public void close() {
+    if (jsonLoader != null) {
+      jsonLoader.close();
+      jsonLoader = null;
+    }
+  }
+}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java
new file mode 100644
index 0000000..9e4e86d
--- /dev/null
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.http;
+
+import java.util.List;
+import java.util.Objects;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.expression.SchemaPath;
+
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.planner.cost.DrillCostBase;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+
+@JsonTypeName("http-scan")
+public class HttpGroupScan extends AbstractGroupScan {
+
+  private final List<SchemaPath> columns;
+  private final HttpScanSpec httpScanSpec;
+  private final HttpStoragePluginConfig config;
+
+  public HttpGroupScan (
+    HttpStoragePluginConfig config,
+    HttpScanSpec scanSpec,
+    List<SchemaPath> columns) {
+    super("no-user");
+    this.config = config;
+    this.httpScanSpec = scanSpec;
+    this.columns = columns;
+  }
+
+  public HttpGroupScan(HttpGroupScan that) {
+    super(that);
+    config = that.config();
+    httpScanSpec = that.httpScanSpec();
+    columns = that.getColumns();
+  }
+
+  public HttpGroupScan(HttpGroupScan that, List<SchemaPath> columns) {
+    super("no-user");
+    this.columns = columns;
+    this.config = that.config;
+    this.httpScanSpec = that.httpScanSpec;
+  }
+
+  @JsonCreator
+  public HttpGroupScan(
+    @JsonProperty("config") HttpStoragePluginConfig config,
+    @JsonProperty("columns") List<SchemaPath> columns,
+    @JsonProperty("httpScanSpec") HttpScanSpec httpScanSpec
+  ) {
+    super("no-user");
+    this.config = config;
+    this.columns = columns;
+    this.httpScanSpec = httpScanSpec;
+  }
+
+  @JsonProperty("config")
+  public HttpStoragePluginConfig config() { return config; }
+
+  @JsonProperty("columns")
+  public List<SchemaPath> columns() { return columns; }
+
+  @JsonProperty("httpScanSpec")
+  public HttpScanSpec httpScanSpec() { return httpScanSpec; }
+
+  @Override
+  public void applyAssignments(List<DrillbitEndpoint> endpoints) {
+    // No filter pushdowns yet, so this method does nothing
+    return;
+  }
+
+  @Override
+  @JsonIgnore
+  public int getMaxParallelizationWidth() {
+    return 1;
+  }
+
+  @Override
+  public boolean canPushdownProjects(List<SchemaPath> columns) {
+    return true;
+  }
+
+  @Override
+  public SubScan getSpecificScan(int minorFragmentId) {
+    return new HttpSubScan(config, httpScanSpec, columns);
+  }
+
+  @Override
+  public GroupScan clone(List<SchemaPath> columns) {
+    return new HttpGroupScan(this, columns);
+  }
+
+  @Override
+  public String getDigest() {
+    return toString();
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    Preconditions.checkArgument(children.isEmpty());
+    return new HttpGroupScan(this);
+  }
+
+  @Override
+  public ScanStats getScanStats() {
+    int estRowCount = 10_000;
+    int rowWidth = columns == null ? 200 : 100;
+    int estDataSize = estRowCount * 200 * rowWidth;
+    int estCpuCost = DrillCostBase.PROJECT_CPU_COST;
+    return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT,
+        estRowCount, estCpuCost, estDataSize);
+  }
+
+  @Override
+  public String toString() {
+    return new PlanStringBuilder(this)
+      .field("httpScanSpec", httpScanSpec)
+      .field("columns", columns)
+      .field("httpStoragePluginConfig", config)
+      .toString();
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(httpScanSpec, columns, config);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    HttpGroupScan other = (HttpGroupScan) obj;
+    return Objects.equals(httpScanSpec, other.httpScanSpec())
+      && Objects.equals(columns, other.columns())
+      && Objects.equals(config, other.config());
+  }
+}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java
new file mode 100644
index 0000000..46d9838
--- /dev/null
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.http;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ChildErrorContext;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework.ReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework.ScanFrameworkBuilder;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+public class HttpScanBatchCreator implements BatchCreator<HttpSubScan> {
+
+  @Override
+  public CloseableRecordBatch getBatch(ExecutorFragmentContext context, HttpSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException {
+    Preconditions.checkArgument(children.isEmpty());
+
+    try {
+      ScanFrameworkBuilder builder = createBuilder(context.getOptions(), subScan);
+      return builder.buildScanOperator(context, subScan);
+    } catch (UserException e) {
+      // Rethrow user exceptions directly
+      throw e;
+    } catch (Throwable e) {
+      // Wrap all others
+      throw new ExecutionSetupException(e);
+    }
+  }
+
+  private ScanFrameworkBuilder createBuilder(OptionManager options,
+      HttpSubScan subScan) {
+    HttpStoragePluginConfig config = subScan.config();
+    ScanFrameworkBuilder builder = new ScanFrameworkBuilder();
+    builder.projection(subScan.columns());
+    builder.setUserName(subScan.getUserName());
+
+    // Provide custom error context
+    builder.errorContext(
+        new ChildErrorContext(builder.errorContext()) {
+          @Override
+          public void addContext(UserException.Builder builder) {
+            builder.addContext("URL", subScan.getFullURL());
+          }
+        });
+
+    // Reader
+    ReaderFactory readerFactory = new HttpReaderFactory(config, subScan);
+    builder.setReaderFactory(readerFactory);
+    builder.nullType(Types.optional(MinorType.VARCHAR));
+    return builder;
+  }
+
+  private static class HttpReaderFactory implements ReaderFactory {
+
+    private final HttpStoragePluginConfig config;
+    private final HttpSubScan subScan;
+    private int count;
+
+    public HttpReaderFactory(HttpStoragePluginConfig config, HttpSubScan subScan) {
+      this.config = config;
+      this.subScan = subScan;
+    }
+
+    @Override
+    public void bind(ManagedScanFramework framework) { }
+
+    @Override
+    public ManagedReader<SchemaNegotiator> next() {
+
+      // Only a single scan (in a single thread)
+      if (count++ == 0) {
+        return new HttpBatchReader(config, subScan);
+      } else {
+        return null;
+      }
+    }
+  }}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanSpec.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanSpec.java
new file mode 100644
index 0000000..19921bc
--- /dev/null
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanSpec.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.http;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.PlanStringBuilder;
+
+@JsonTypeName("http-scan-spec")
+public class HttpScanSpec {
+
+  protected final String schemaName;
+
+  protected final String database;
+
+  protected final String tableName;
+
+  protected final HttpStoragePluginConfig config;
+
+  @JsonCreator
+  public HttpScanSpec(@JsonProperty("schemaName") String schemaName,
+                      @JsonProperty("database") String database,
+                      @JsonProperty("tableName") String tableName,
+                      @JsonProperty("config") HttpStoragePluginConfig config) {
+    this.schemaName = schemaName;
+    this.database = database;
+    this.tableName = tableName;
+    this.config = config;
+  }
+
+  @JsonProperty("database")
+  public String database() {
+    return database;
+  }
+
+  @JsonProperty("tableName")
+  public String tableName() {
+    return tableName;
+  }
+
+  @JsonIgnore
+  public String getURL() {
+    return database;
+  }
+
+  @Override
+  public String toString() {
+    return new PlanStringBuilder(this)
+      .field("schemaName", schemaName)
+      .field("database", database)
+      .field("tableName", tableName)
+      .field("config", config)
+      .toString();
+  }
+}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSchemaFactory.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSchemaFactory.java
new file mode 100644
index 0000000..c111d3d
--- /dev/null
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSchemaFactory.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.http;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.AbstractSchemaFactory;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HttpSchemaFactory extends AbstractSchemaFactory {
+  private static final Logger logger = LoggerFactory.getLogger(HttpSchemaFactory.class);
+
+  private final HttpStoragePlugin plugin;
+
+  public HttpSchemaFactory(HttpStoragePlugin plugin, String schemaName) {
+    super(schemaName);
+    this.plugin = plugin;
+  }
+
+  @Override
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) {
+    HttpSchema schema = new HttpSchema(getName());
+    logger.debug("Registering {} {}", schema.getName(), schema.toString());
+
+    SchemaPlus schemaPlus = parent.add(getName(), schema);
+    schema.setHolder(schemaPlus);
+  }
+
+  class HttpSchema extends AbstractSchema {
+
+    public HttpSchema(String name) {
+      super(Collections.emptyList(), name);
+    }
+
+    void setHolder(SchemaPlus plusOfThis) {
+      for (String s : getSubSchemaNames()) {
+        plusOfThis.add(s, getSubSchemaKnownExists(s));
+      }
+    }
+
+    @Override
+    public Set<String> getSubSchemaNames() {
+      HttpStoragePluginConfig config = plugin.getConfig();
+      Map<String, HttpAPIConfig> connections = config.connections();
+      Set<String> subSchemaNames = new HashSet<>();
+
+      // Get the possible subschemas.
+      for (Map.Entry<String, HttpAPIConfig> entry : connections.entrySet()) {
+        subSchemaNames.add(entry.getKey());
+      }
+      return subSchemaNames;
+    }
+
+    @Override
+    public AbstractSchema getSubSchema(String name) {
+      if (plugin.getConfig().connections().containsKey(name)) {
+        return getSubSchemaKnownExists(name);
+      } else {
+        throw UserException
+          .connectionError()
+          .message("API '{}' does not exist in HTTP Storage plugin '{}'", name, getName())
+          .build(logger);
+      }
+    }
+
+    /**
+     * Helper method to get subschema when we know it exists (already checked the existence)
+     */
+    private HttpAPIConnectionSchema getSubSchemaKnownExists(String name) {
+      return new HttpAPIConnectionSchema(this, name, plugin);
+    }
+
+    @Override
+    public String getTypeName() {
+      return HttpStoragePluginConfig.NAME;
+    }
+  }
+}
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePlugin.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePlugin.java
similarity index 53%
copy from contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePlugin.java
copy to contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePlugin.java
index 35c974d..c660a2e 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePlugin.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePlugin.java
@@ -15,67 +15,47 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.kudu;
-
-import java.io.IOException;
+package org.apache.drill.exec.store.http;
 
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.drill.common.JSONOptions;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
 import org.apache.drill.exec.store.SchemaConfig;
-import org.apache.kudu.client.KuduClient;
-
 import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
 
-public class KuduStoragePlugin extends AbstractStoragePlugin {
+public class HttpStoragePlugin extends AbstractStoragePlugin {
 
-  private final KuduStoragePluginConfig engineConfig;
-  private final KuduSchemaFactory schemaFactory;
+  private final HttpStoragePluginConfig config;
 
-  private final KuduClient client;
+  private final HttpSchemaFactory schemaFactory;
 
-  public KuduStoragePlugin(KuduStoragePluginConfig configuration, DrillbitContext context, String name)
-      throws IOException {
+  public HttpStoragePlugin(HttpStoragePluginConfig configuration, DrillbitContext context, String name) {
     super(context, name);
-    this.schemaFactory = new KuduSchemaFactory(this, name);
-    this.engineConfig = configuration;
-    this.client = new KuduClient.KuduClientBuilder(configuration.getMasterAddresses()).build();
-  }
-
-  public KuduClient getClient() {
-    return client;
-  }
-
-  @Override
-  public void close() throws Exception {
-    client.close();
+    this.config = configuration;
+    this.schemaFactory = new HttpSchemaFactory(this, name);
   }
 
   @Override
-  public boolean supportsRead() {
-    return true;
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) {
+    schemaFactory.registerSchemas(schemaConfig, parent);
   }
 
   @Override
-  public KuduGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
-    KuduScanSpec scanSpec = selection.getListWith(new ObjectMapper(), new TypeReference<KuduScanSpec>() {});
-    return new KuduGroupScan(this, scanSpec, null);
+  public HttpStoragePluginConfig getConfig() {
+    return config;
   }
 
   @Override
-  public boolean supportsWrite() {
+  public boolean supportsRead() {
     return true;
   }
 
   @Override
-  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
-    schemaFactory.registerSchemas(schemaConfig, parent);
-  }
-
-  @Override
-  public KuduStoragePluginConfig getConfig() {
-    return engineConfig;
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
+    HttpScanSpec scanSpec = selection.getListWith(context.getLpPersistence().getMapper(), new TypeReference<HttpScanSpec>() {});
+    return new HttpGroupScan(config, scanSpec, null);
   }
 }
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java
new file mode 100644
index 0000000..2b65bb9
--- /dev/null
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.http;
+
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.common.logical.StoragePluginConfigBase;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Objects;
+
+
+@JsonTypeName(HttpStoragePluginConfig.NAME)
+public class HttpStoragePluginConfig extends StoragePluginConfigBase {
+  private static final Logger logger = LoggerFactory.getLogger(HttpStoragePluginConfig.class);
+
+  public static final String NAME = "http";
+
+  public final Map<String, HttpAPIConfig> connections;
+
+  public final boolean cacheResults;
+
+  public final String proxyHost;
+
+  public final int proxyPort;
+
+  public final String proxyType;
+
+  public final String proxyUsername;
+
+  public final String proxyPassword;
+
+  /**
+   * Timeout in seconds.
+   */
+  public int timeout;
+
+  @JsonCreator
+  public HttpStoragePluginConfig(@JsonProperty("cacheResults") Boolean cacheResults,
+                                 @JsonProperty("connections") Map<String, HttpAPIConfig> connections,
+                                 @JsonProperty("timeout") Integer timeout,
+                                 @JsonProperty("proxyHost") String proxyHost,
+                                 @JsonProperty("proxyPort") Integer proxyPort,
+                                 @JsonProperty("proxyType") String proxyType,
+                                 @JsonProperty("proxyUsername") String proxyUsername,
+                                 @JsonProperty("proxyPassword") String proxyPassword
+                                 ) {
+    this.cacheResults = cacheResults == null ? false : cacheResults;
+
+    this.connections = CaseInsensitiveMap.newHashMap();
+    if (connections != null) {
+      this.connections.putAll(connections);
+    }
+
+    this.timeout = timeout == null ? 0 : timeout;
+    this.proxyHost = normalize(proxyHost);
+    this.proxyPort = proxyPort == null ? 0 : proxyPort;
+    this.proxyUsername = normalize(proxyUsername);
+    this.proxyPassword = normalize(proxyPassword);
+    proxyType = normalize(proxyType);
+    this.proxyType = proxyType == null
+        ? "direct" : proxyType.trim().toLowerCase();
+
+    // Validate Proxy Type
+    if (this.proxyType != null) {
+      switch (this.proxyType) {
+        case "direct":
+        case "http":
+        case "socks":
+          break;
+        default:
+          throw UserException
+            .validationError()
+            .message("Invalid Proxy Type: %s.  Drill supports 'direct', 'http' and 'socks' proxies.", proxyType)
+            .build(logger);
+      }
+    }
+  }
+
+  private static String normalize(String value) {
+    if (value == null) {
+      return value;
+    }
+    value = value.trim();
+    return value.isEmpty() ? null : value;
+  }
+
+  @Override
+  public boolean equals(Object that) {
+    if (this == that) {
+      return true;
+    } else if (that == null || getClass() != that.getClass()) {
+      return false;
+    }
+    HttpStoragePluginConfig thatConfig = (HttpStoragePluginConfig) that;
+    return Objects.equals(connections, thatConfig.connections) &&
+           Objects.equals(cacheResults, thatConfig.cacheResults) &&
+           Objects.equals(proxyHost, thatConfig.proxyHost) &&
+           Objects.equals(proxyPort, thatConfig.proxyPort) &&
+           Objects.equals(proxyType, thatConfig.proxyType) &&
+           Objects.equals(proxyUsername, thatConfig.proxyUsername) &&
+           Objects.equals(proxyPassword, thatConfig.proxyPassword);
+  }
+
+  @Override
+  public String toString() {
+    return new PlanStringBuilder(this)
+      .field("connections", connections)
+      .field("cacheResults", cacheResults)
+      .field("timeout", timeout)
+      .field("proxyHost", proxyHost)
+      .field("proxyPort", proxyPort)
+      .field("proxyUsername", proxyUsername)
+      .field("proxyPassword", proxyPassword)
+      .field("proxyType", proxyType)
+      .toString();
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(connections, cacheResults, timeout,
+        proxyHost, proxyPort, proxyType, proxyUsername, proxyPassword);
+  }
+
+  @JsonProperty("cacheResults")
+  public boolean cacheResults() { return cacheResults; }
+
+  @JsonProperty("connections")
+  public Map<String, HttpAPIConfig> connections() { return connections; }
+
+  @JsonProperty("timeout")
+  public int timeout() { return timeout;}
+
+  @JsonProperty("proxyHost")
+  public String proxyHost() { return proxyHost; }
+
+  @JsonProperty("proxyPort")
+  public int proxyPort() { return proxyPort; }
+
+  @JsonProperty("proxyUsername")
+  public String proxyUsername() { return proxyUsername; }
+
+  @JsonProperty("proxyPassword")
+  public String proxyPassword() { return proxyPassword; }
+
+  @JsonProperty("proxyType")
+  public String proxyType() { return proxyType; }
+}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSubScan.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSubScan.java
new file mode 100644
index 0000000..700ad70
--- /dev/null
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSubScan.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.http;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+
+@JsonTypeName("http-sub-scan")
+public class HttpSubScan extends AbstractBase implements SubScan {
+
+  private final HttpScanSpec tableSpec;
+  private final HttpStoragePluginConfig config;
+  private final List<SchemaPath> columns;
+
+  @JsonCreator
+  public HttpSubScan(
+    @JsonProperty("config") HttpStoragePluginConfig config,
+    @JsonProperty("tableSpec") HttpScanSpec tableSpec,
+    @JsonProperty("columns") List<SchemaPath> columns) {
+    super("user-if-needed");
+    this.config = config;
+    this.tableSpec = tableSpec;
+    this.columns = columns;
+  }
+  @JsonProperty("tableSpec")
+  public HttpScanSpec tableSpec() {
+    return tableSpec;
+  }
+
+  @JsonProperty("columns")
+  public List<SchemaPath> columns() {
+    return columns;
+  }
+
+  @JsonProperty("config")
+  public HttpStoragePluginConfig config() {
+    return config;
+  }
+
+  @JsonIgnore
+  public String getURL() {
+    return tableSpec.getURL();
+  }
+
+  @JsonIgnore
+  public String getFullURL() {
+    String selectedConnection = tableSpec.database();
+    String url = config.connections().get(selectedConnection).url();
+    return url + tableSpec.tableName();
+  }
+
+ @Override
+  public <T, X, E extends Throwable> T accept(
+   PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+    return physicalVisitor.visitSubScan(this, value);
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    return new HttpSubScan(config, tableSpec, columns);
+  }
+
+  @Override
+  @JsonIgnore
+  public int getOperatorType() {
+    return CoreOperatorType.HTTP_SUB_SCAN_VALUE;
+  }
+
+  @Override
+  public Iterator<PhysicalOperator> iterator() {
+    return ImmutableSet.<PhysicalOperator>of().iterator();
+  }
+
+  @Override
+  public String toString() {
+    return new PlanStringBuilder(this)
+      .field("tableSpec", tableSpec)
+      .field("columns", columns)
+      .field("config", config)
+      .toString();
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(tableSpec,columns,config);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    HttpSubScan other = (HttpSubScan) obj;
+    return Objects.equals(tableSpec, other.tableSpec)
+      && Objects.equals(columns, other.columns)
+      && Objects.equals(config, other.config);
+  }
+}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/HttpProxyConfig.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/HttpProxyConfig.java
new file mode 100644
index 0000000..41f6d35
--- /dev/null
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/HttpProxyConfig.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.http.util;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import org.apache.drill.exec.ExecConstants;
+import org.apache.parquet.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+
+/**
+ * HTTP proxy settings. Provides a builder to create settings
+ * from the Drill config or from code. Allows combining the two.
+ * The Drill config allows integrating with Linux env. vars. Allows
+ * combinations: take values from config, but selectively replace bits,
+ * such as the user name/password.
+ * <p>
+ * This class provides values passed to the HTTP client, whatever it
+ * might be.
+ *
+ * @see <a href="https://www.shellhacks.com/linux-proxy-server-settings-set-proxy-command-line/">
+ * Proxy Server Settings</a>
+ *
+ */
+public class HttpProxyConfig {
+  private static final Logger logger = LoggerFactory.getLogger(HttpProxyConfig.class);
+
+  public enum ProxyType {
+    NONE, HTTP, SOCKS
+  }
+
+  public static class ProxyBuilder {
+    private String url;
+    private String typeStr;
+    private ProxyType type = ProxyType.NONE;
+    private String host;
+    private int port = 80;
+    private String username;
+    private String password;
+
+    public ProxyBuilder fromHttpConfig(Config config) {
+      url(config.getString(ExecConstants.HTTP_PROXY_URL));
+      type(config.getString(ExecConstants.HTTP_PROXY_TYPE));
+      host(config.getString(ExecConstants.HTTP_PROXY_HOST));
+      port(config.getInt(ExecConstants.HTTP_PROXY_PORT));
+      username(config.getString(ExecConstants.HTTP_PROXY_USER_NAME));
+      password(config.getString(ExecConstants.HTTP_PROXY_PASSWORD));
+      return this;
+    }
+
+    public ProxyBuilder fromHttpsConfig(Config config) {
+      url(config.getString(ExecConstants.HTTPS_PROXY_URL));
+      type(config.getString(ExecConstants.HTTPS_PROXY_TYPE));
+      host(config.getString(ExecConstants.HTTPS_PROXY_HOST));
+      port(config.getInt(ExecConstants.HTTPS_PROXY_PORT));
+      username(config.getString(ExecConstants.HTTPS_PROXY_USER_NAME));
+      password(config.getString(ExecConstants.HTTPS_PROXY_PASSWORD));
+      return this;
+    }
+
+    public ProxyBuilder fromConfigForURL(Config config, String url) {
+      try {
+        URL parsed = new URL(url);
+        if (parsed.getProtocol().equals("https")) {
+          return fromHttpsConfig(config);
+        }
+      } catch (Exception e) {
+        // This is not the place to warn about a bad URL.
+        // Just assume HTTP, something later will fail.
+      }
+      return fromHttpConfig(config);
+    }
+
+    public ProxyBuilder url(String url) {
+      this.url = url;
+      return this;
+    }
+
+    public ProxyBuilder type(ProxyType type) {
+      this.type = type;
+      this.typeStr = null;
+      return this;
+    }
+
+    public ProxyBuilder type(String type) {
+      this.typeStr = type;
+      this.type = null;
+      return this;
+    }
+
+    public ProxyBuilder host(String host) {
+      this.host = host;
+      return this;
+    }
+
+    public ProxyBuilder port(int port) {
+      this.port = port;
+      return this;
+    }
+
+    public ProxyBuilder username(String username) {
+      this.username = username;
+      return this;
+    }
+
+    public ProxyBuilder password(String password) {
+      this.password = password;
+      return this;
+    }
+
+    public HttpProxyConfig build() {
+      buildFromUrl();
+      buildType();
+
+      // Info can come from the config file. Ignore extra spaces.
+      host = host == null ? null : host.trim();
+      username = username == null ? null : username.trim();
+      password = password == null ? null : password.trim();
+      return new HttpProxyConfig(this);
+    }
+
+    private void buildFromUrl() {
+      url = url == null ? null : url.trim();
+      if (Strings.isNullOrEmpty(url)) {
+        return;
+      }
+      typeStr = null;
+      URL parsed;
+      try {
+        parsed = new URL(url);
+      } catch (MalformedURLException e) {
+        logger.warn("Invalid proxy url: {}, assuming NONE", typeStr);
+        type = ProxyType.NONE;
+        return;
+      }
+      type = ProxyType.HTTP;
+      host = parsed.getHost();
+      port = parsed.getPort();
+      String userInfo = parsed.getUserInfo();
+      if (userInfo != null) {
+        String[] parts = userInfo.split(":");
+        username = parts[0];
+        password = parts.length > 1 ? parts[1] : null;
+      }
+    }
+
+    private void buildType() {
+
+      // If type string, validate to a type
+      if (typeStr != null) {
+        typeStr = typeStr.trim().toUpperCase();
+        if (!Strings.isNullOrEmpty(typeStr)) {
+          try {
+            type = ProxyType.valueOf(typeStr);
+          } catch (IllegalArgumentException e) {
+            logger.warn("Invalid proxy type: {}, assuming NONE", typeStr);
+            this.type = ProxyType.NONE;
+          }
+        }
+      }
+
+      // If not type, assume NONE
+      if (type == null) {
+        type = ProxyType.NONE;
+      }
+
+      // Validate host based on type
+      if (type != ProxyType.NONE) {
+        host = host == null ? null : host.trim();
+        if (Strings.isNullOrEmpty(host)) {
+          logger.warn("{} proxy type specified, but host is null. Reverting to NONE",
+              type.name());
+          type = ProxyType.NONE;
+        }
+      }
+
+      // If no proxy, ignore other settings
+      if (type == ProxyType.NONE) {
+        host = null;
+        username = null;
+        password = null;
+      }
+    }
+  }
+
+  public final ProxyType type;
+  public final String host;
+  public final int port;
+  public final String username;
+  public final String password;
+
+  private HttpProxyConfig(ProxyBuilder builder) {
+    this.type = builder.type;
+    this.host = builder.host;
+    this.port = builder.port;
+    this.username = builder.username;
+    this.password = builder.password;
+  }
+
+  public static ProxyBuilder builder() { return new ProxyBuilder(); }
+}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
new file mode 100644
index 0000000..9bc234f
--- /dev/null
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.http.util;
+
+import okhttp3.Authenticator;
+import okhttp3.Cache;
+import okhttp3.Credentials;
+import okhttp3.FormBody;
+import okhttp3.Interceptor;
+import okhttp3.OkHttpClient;
+import okhttp3.OkHttpClient.Builder;
+import okhttp3.Request;
+import okhttp3.Response;
+
+import okhttp3.Route;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.store.http.HttpAPIConfig;
+import org.apache.drill.exec.store.http.HttpAPIConfig.HttpMethods;
+import org.apache.drill.exec.store.http.HttpStoragePluginConfig;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.net.Proxy;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+
+/**
+ * Performs the actual HTTP requests for the HTTP Storage Plugin. The core
+ * method is the getInputStream() method which accepts a url and opens an
+ * InputStream with that URL's contents.
+ */
+public class SimpleHttp {
+  private static final Logger logger = LoggerFactory.getLogger(SimpleHttp.class);
+
+  private final OkHttpClient client;
+  private final HttpStoragePluginConfig config;
+  private final HttpAPIConfig apiConfig;
+  private final File tempDir;
+  private final HttpProxyConfig proxyConfig;
+  private final CustomErrorContext errorContext;
+
+  public SimpleHttp(HttpStoragePluginConfig config, File tempDir,
+      String connectionName, HttpProxyConfig proxyConfig,
+      CustomErrorContext errorContext) {
+    this.config = config;
+    this.tempDir = tempDir;
+    this.apiConfig = config.connections().get(connectionName);
+    this.proxyConfig = proxyConfig;
+    this.errorContext = errorContext;
+    this.client = setupHttpClient();
+  }
+
+  public InputStream getInputStream(String urlStr) {
+    Request.Builder requestBuilder;
+
+    requestBuilder = new Request.Builder()
+        .url(urlStr);
+
+    // The configuration does not allow for any other request types other than POST and GET.
+    if (apiConfig.getMethodType() == HttpMethods.POST) {
+      // Handle POST requests
+      FormBody.Builder formBodyBuilder = buildPostBody();
+      requestBuilder.post(formBodyBuilder.build());
+    }
+
+    // Add headers to request
+    if (apiConfig.headers() != null) {
+      for (Map.Entry<String, String> entry : apiConfig.headers().entrySet()) {
+        requestBuilder.addHeader(entry.getKey(), entry.getValue());
+      }
+    }
+
+    // Build the request object
+    Request request = requestBuilder.build();
+
+    try {
+      // Execute the request
+      Response response = client
+        .newCall(request)
+        .execute();
+
+      // If the request is unsuccessful, throw a UserException
+      if (!response.isSuccessful()) {
+        throw UserException
+          .dataReadError()
+          .message("Error retrieving data from HTTP Storage Plugin: %d %s",
+              response.code(), response.message())
+          .addContext(errorContext)
+          .build(logger);
+      }
+      logger.debug("HTTP Request for {} successful.", urlStr);
+      logger.debug("Response Headers: {} ", response.headers().toString());
+
+      // Return the InputStream of the response
+      return Objects.requireNonNull(response.body()).byteStream();
+    } catch (IOException e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Error retrieving data from HTTP Storage Plugin: %s", e.getMessage())
+        .addContext(errorContext)
+        .build(logger);
+    }
+  }
+
+  /**
+   * Configures the OkHTTP3 server object with configuration info from the user.
+   *
+   * @return OkHttpClient configured server
+   */
+  private OkHttpClient setupHttpClient() {
+    Builder builder = new OkHttpClient.Builder();
+
+    // Set up the HTTP Cache.   Future possibilities include making the cache size and retention configurable but
+    // right now it is on or off.  The writer will write to the Drill temp directory if it is accessible and
+    // output a warning if not.
+    if (config.cacheResults()) {
+      setupCache(builder);
+    }
+
+    // If the API uses basic authentication add the authentication code.
+    if (apiConfig.authType().toLowerCase().equals("basic")) {
+      logger.debug("Adding Interceptor");
+      builder.addInterceptor(new BasicAuthInterceptor(apiConfig.userName(), apiConfig.password()));
+    }
+
+    // Set timeouts
+    builder.connectTimeout(config.timeout(), TimeUnit.SECONDS);
+    builder.writeTimeout(config.timeout(), TimeUnit.SECONDS);
+    builder.readTimeout(config.timeout(), TimeUnit.SECONDS);
+
+    // Set the proxy configuration
+
+    Proxy.Type proxyType;
+    switch (proxyConfig.type) {
+      case SOCKS:
+        proxyType = Proxy.Type.SOCKS;
+        break;
+      case HTTP:
+        proxyType = Proxy.Type.HTTP;
+        break;
+      default:
+        proxyType = Proxy.Type.DIRECT;
+    }
+    if (proxyType != Proxy.Type.DIRECT) {
+      builder.proxy(new Proxy(proxyType,
+          new InetSocketAddress(proxyConfig.host, proxyConfig.port)));
+      if (proxyConfig.username != null) {
+        builder.proxyAuthenticator(new Authenticator() {
+          @Override public Request authenticate(Route route, Response response) {
+            String credential = Credentials.basic(proxyConfig.username, proxyConfig.password);
+            return response.request().newBuilder()
+              .header("Proxy-Authorization", credential)
+              .build();
+          }
+        });
+      }
+    }
+
+    return builder.build();
+  }
+
+  /**
+   * Configures response caching using a provided temp directory.
+   *
+   * @param builder
+   *          Builder the Builder object to which the caching is to be
+   *          configured
+   */
+  private void setupCache(Builder builder) {
+    int cacheSize = 10 * 1024 * 1024;   // TODO Add cache size in MB to config
+    File cacheDirectory = new File(tempDir, "http-cache");
+    if (!cacheDirectory.mkdirs()) {
+      throw UserException.dataWriteError()
+        .message("Could not create the HTTP cache directory")
+        .addContext("Path", cacheDirectory.getAbsolutePath())
+        .addContext("Please check the temp directory or disable HTTP caching.")
+        .addContext(errorContext)
+        .build(logger);
+    }
+    try {
+      Cache cache = new Cache(cacheDirectory, cacheSize);
+      logger.debug("Caching HTTP Query Results at: {}", cacheDirectory);
+      builder.cache(cache);
+    } catch (Exception e) {
+      throw UserException.dataWriteError(e)
+        .message("Could not create the HTTP cache")
+        .addContext("Path", cacheDirectory.getAbsolutePath())
+        .addContext("Please check the temp directory or disable HTTP caching.")
+        .addContext(errorContext)
+        .build(logger);
+    }
+  }
+
+  /**
+   * Accepts text from a post body in the format:<br>
+   * {@code key1=value1}<br>
+   * {@code key2=value2}
+   * <p>
+   * and creates the appropriate headers.
+   *
+   * @return FormBody.Builder The populated formbody builder
+   */
+  private FormBody.Builder buildPostBody() {
+    final Pattern postBodyPattern = Pattern.compile("^.+=.+$");
+
+    FormBody.Builder formBodyBuilder = new FormBody.Builder();
+    String[] lines = apiConfig.postBody().split("\\r?\\n");
+    for(String line : lines) {
+
+      // If the string is in the format key=value split it,
+      // Otherwise ignore
+      if (postBodyPattern.matcher(line).find()) {
+        //Split into key/value
+        String[] parts = line.split("=");
+        formBodyBuilder.add(parts[0], parts[1]);
+      }
+    }
+    return formBodyBuilder;
+  }
+
+  /**
+   * Intercepts requests and adds authentication headers to the request
+   */
+  public static class BasicAuthInterceptor implements Interceptor {
+    private final String credentials;
+
+    public BasicAuthInterceptor(String user, String password) {
+      credentials = Credentials.basic(user, password);
+    }
+
+    @NotNull
+    @Override
+    public Response intercept(Chain chain) throws IOException {
+      // Get the existing request
+      Request request = chain.request();
+
+      // Replace with new request containing the authorization headers and previous headers
+      Request authenticatedRequest = request.newBuilder().header("Authorization", credentials).build();
+      return chain.proceed(authenticatedRequest);
+    }
+  }
+}
diff --git a/contrib/storage-http/src/main/resources/bootstrap-storage-plugins.json b/contrib/storage-http/src/main/resources/bootstrap-storage-plugins.json
new file mode 100644
index 0000000..9afcaee
--- /dev/null
+++ b/contrib/storage-http/src/main/resources/bootstrap-storage-plugins.json
@@ -0,0 +1,9 @@
+{
+  "storage":{
+    "http" : {
+      "type":"http",
+      "connections": {},
+      "enabled": false
+    }
+  }
+}
diff --git a/contrib/storage-http/src/main/resources/drill-module.conf b/contrib/storage-http/src/main/resources/drill-module.conf
new file mode 100644
index 0000000..c0f2a93
--- /dev/null
+++ b/contrib/storage-http/src/main/resources/drill-module.conf
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#  This file tells Drill to consider this module when class path scanning.
+#  This file can also include any supplementary configuration information.
+#  This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
+
+drill: {
+  classpath.scanning: {
+    packages += "org.apache.drill.exec.store.http"
+  }
+}
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
new file mode 100644
index 0000000..f99df7c
--- /dev/null
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
@@ -0,0 +1,479 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.http;
+
+import static org.apache.drill.test.rowSet.RowSetUtilities.mapValue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.util.DrillFileUtils;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.drill.shaded.guava.com.google.common.io.Files;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import okhttp3.mockwebserver.RecordedRequest;
+import okio.Buffer;
+import okio.Okio;
+
+/**
+ * Tests the HTTP Storage plugin. Since the plugin makes use of REST requests,
+ * this test class makes use of the okhttp3 MockWebServer to simulate a remote
+ * web server. There are two unit tests that make remote REST calls, however
+ * these tests are ignored by default.
+ * <p>
+ * The HTTP reader uses Drill's existing JSON reader class, so the unit tests
+ * focus on testing the plugin configurations rather than how well it parses the
+ * JSON as this is tested elsewhere.
+ */
+public class TestHttpPlugin extends ClusterTest {
+
+  private static final int MOCK_SERVER_PORT = 8091;
+  private static String TEST_JSON_RESPONSE;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    startCluster(ClusterFixture.builder(dirTestWatcher));
+
+    TEST_JSON_RESPONSE = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/response.json"), Charsets.UTF_8).read();
+
+    dirTestWatcher.copyResourceToRoot(Paths.get("data/"));
+
+    Map<String, String> headers = new HashMap<>();
+    headers.put("header1", "value1");
+    headers.put("header2", "value2");
+
+    HttpAPIConfig mockConfig = new HttpAPIConfig("http://localhost:8091/", "GET", headers, "basic", "user", "pass",null);
+
+    HttpAPIConfig sunriseConfig = new HttpAPIConfig("https://api.sunrise-sunset.org/", "GET", null, null, null, null, null);
+
+    HttpAPIConfig stockConfig = new HttpAPIConfig("https://api.worldtradingdata.com/api/v1/stock?symbol=SNAP,TWTR,VOD" +
+      ".L&api_token=zuHlu2vZaehdZN6GmJdTiVlp7xgZn6gl6sfgmI4G6TY4ej0NLOzvy0TUl4D4", "get", null, null, null, null, null);
+
+    HttpAPIConfig mockPostConfig = new HttpAPIConfig("http://localhost:8091/", "POST", headers, null, null, null,"key1=value1\nkey2=value2");
+
+    Map<String, HttpAPIConfig> configs = new HashMap<>();
+    configs.put("stock", stockConfig);
+    configs.put("sunrise", sunriseConfig);
+    configs.put("mock", mockConfig);
+    configs.put("mockpost", mockPostConfig);
+
+    HttpStoragePluginConfig mockStorageConfigWithWorkspace = new HttpStoragePluginConfig(false, configs, 2, "", 80, "", "", "");
+    mockStorageConfigWithWorkspace.setEnabled(true);
+    cluster.defineStoragePlugin("api", mockStorageConfigWithWorkspace);
+  }
+
+  @Test
+  public void verifyPluginConfig() throws Exception {
+    String sql = "SELECT SCHEMA_NAME, TYPE FROM INFORMATION_SCHEMA.`SCHEMATA` WHERE TYPE='http'";
+
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("SCHEMA_NAME", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("TYPE", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow("api.mock", "http")
+      .addRow("api.mockpost", "http")
+      .addRow("api.stock", "http")
+      .addRow("api.sunrise", "http")
+      .addRow("api", "http")
+      .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  /**
+   * Evaluates the HTTP plugin with the results from an API that returns the
+   * sunrise/sunset times for a given lat/long and date. API documentation is
+   * available here: https://sunrise-sunset.org/api
+   *
+   * The API returns results in the following format:
+   * <pre><code>
+   * {
+   *       "results":
+   *       {
+   *         "sunrise":"7:27:02 AM",
+   *         "sunset":"5:05:55 PM",
+   *         "solar_noon":"12:16:28 PM",
+   *         "day_length":"9:38:53",
+   *         "civil_twilight_begin":"6:58:14 AM",
+   *         "civil_twilight_end":"5:34:43 PM",
+   *         "nautical_twilight_begin":"6:25:47 AM",
+   *         "nautical_twilight_end":"6:07:10 PM",
+   *         "astronomical_twilight_begin":"5:54:14 AM",
+   *         "astronomical_twilight_end":"6:38:43 PM"
+   *       },
+   *        "status":"OK"
+   *     }
+   * }</code></pre>
+   *
+   * @throws Exception
+   *           Throws exception if something goes awry
+   */
+  @Test
+  @Ignore("Requires Remote Server")
+  public void simpleStarQuery() throws Exception {
+    String sql = "SELECT * FROM api.sunrise.`/json?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
+
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .addMap("results")
+      .add("sunrise", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("sunset", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("solar_noon", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("day_length", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("civil_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("civil_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("nautical_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("nautical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("astronomical_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("astronomical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .resumeSchema()
+      .add("status", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .build();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow( mapValue("6:13:58 AM", "5:59:55 PM", "12:06:56 PM", "11:45:57", "5:48:14 AM", "6:25:38 PM", "5:18:16 AM", "6:55:36 PM", "4:48:07 AM", "7:25:45 PM"), "OK")
+      .build();
+
+    int resultCount =  results.rowCount();
+    new RowSetComparison(expected).verifyAndClearAll(results);
+
+    assertEquals(1,  resultCount);
+  }
+
+  @Test
+  @Ignore("Requires Remote Server")
+  public void simpleSpecificQuery() throws Exception {
+    String sql = "SELECT t1.results.sunrise AS sunrise, t1.results.sunset AS sunset FROM api.sunrise.`/json?lat=36.7201600&lng=-4.4203400&date=2019-10-02` AS t1";
+
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("sunrise", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("sunset", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow("6:13:58 AM", "5:59:55 PM")
+      .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+   public void testSerDe() throws Exception {
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(
+        new MockResponse().setResponseCode(200)
+          .setBody(TEST_JSON_RESPONSE)
+      );
+
+      String sql = "SELECT COUNT(*) FROM api.mock.`/json?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
+      String plan = queryBuilder().sql(sql).explainJson();
+      long cnt = queryBuilder().physical(plan).singletonLong();
+      assertEquals("Counts should match",1L, cnt);
+    }
+  }
+
+  @Test
+  public void simpleTestWithMockServer() throws Exception {
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(
+        new MockResponse().setResponseCode(200)
+          .setBody(TEST_JSON_RESPONSE)
+      );
+
+      String sql = "SELECT * FROM api.mock.`json?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+      TupleMetadata expectedSchema = new SchemaBuilder()
+        .addMap("results")
+        .add("sunrise", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("sunset", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("solar_noon", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("day_length", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("civil_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("civil_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("nautical_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("nautical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("astronomical_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("astronomical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .resumeSchema()
+        .add("status", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .build();
+
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+        .addRow(mapValue("6:13:58 AM", "5:59:55 PM", "12:06:56 PM", "11:45:57", "5:48:14 AM", "6:25:38 PM", "5:18:16 AM", "6:55:36 PM", "4:48:07 AM", "7:25:45 PM"), "OK")
+        .build();
+
+      int resultCount =  results.rowCount();
+      new RowSetComparison(expected).verifyAndClearAll(results);
+
+      assertEquals(1,  resultCount);
+    }
+  }
+
+  @Test
+  public void testPostWithMockServer() throws Exception {
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(
+        new MockResponse()
+          .setResponseCode(200)
+          .setBody(TEST_JSON_RESPONSE)
+      );
+
+      String sql = "SELECT * FROM api.mockPost.`json?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+      TupleMetadata expectedSchema = new SchemaBuilder()
+        .addMap("results")
+        .add("sunrise", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("sunset", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("solar_noon", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("day_length", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("civil_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("civil_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("nautical_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("nautical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("astronomical_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("astronomical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .resumeSchema()
+        .add("status", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .build();
+
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+        .addRow(mapValue("6:13:58 AM", "5:59:55 PM", "12:06:56 PM", "11:45:57", "5:48:14 AM", "6:25:38 PM", "5:18:16 AM", "6:55:36 PM", "4:48:07 AM", "7:25:45 PM"), "OK")
+        .build();
+
+      int resultCount =  results.rowCount();
+      new RowSetComparison(expected).verifyAndClearAll(results);
+
+      RecordedRequest recordedRequest = server.takeRequest();
+      assertEquals("POST", recordedRequest.getMethod());
+      assertEquals(recordedRequest.getHeader("header1"), "value1");
+      assertEquals(recordedRequest.getHeader("header2"), "value2");
+      assertEquals(1,  resultCount);
+    }
+  }
+
+  @Test
+  public void specificTestWithMockServer() throws Exception {
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(
+        new MockResponse().setResponseCode(200)
+          .setBody(TEST_JSON_RESPONSE)
+      );
+
+      String sql = "SELECT t1.results.sunrise AS sunrise, t1.results.sunset AS sunset FROM api.mock.`/json?lat=36.7201600&lng=-4.4203400&date=2019-10-02` AS t1";
+
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+      TupleMetadata expectedSchema = new SchemaBuilder()
+        .add("sunrise", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("sunset", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .buildSchema();
+
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+        .addRow("6:13:58 AM", "5:59:55 PM")
+        .build();
+
+      new RowSetComparison(expected).verifyAndClearAll(results);
+    }
+  }
+
+  @Test
+  public void testSlowResponse() throws Exception {
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(
+        new MockResponse().setResponseCode(200)
+          .setBody(TEST_JSON_RESPONSE)
+          .throttleBody(64, 4, TimeUnit.SECONDS)
+      );
+
+      String sql = "SELECT t1.results.sunrise AS sunrise, t1.results.sunset AS sunset FROM api.mock.`/json?lat=36.7201600&lng=-4.4203400&date=2019-10-02` AS t1";
+
+      try {
+        client.queryBuilder().sql(sql).rowSet();
+        fail();
+      } catch (Exception e) {
+        assertTrue(e.getMessage().contains("DATA_READ ERROR: timeout"));
+      }
+    }
+  }
+
+  @Test
+  public void testZeroByteResponse() throws Exception {
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(
+        new MockResponse().setResponseCode(200)
+          .setBody("")
+      );
+
+      String sql = "SELECT * FROM api.mock.`/json?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
+
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+      assertNull(results);
+    }
+  }
+
+  // Note that, in this test, the response is not empty. Instead, the
+  // response has a single row with no columns.
+  @Test
+  public void testEmptyJSONObjectResponse() throws Exception {
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(
+        new MockResponse().setResponseCode(200)
+          .setBody("{}")
+      );
+
+      String sql = "SELECT * FROM api.mock.`/json?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
+
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+      TupleMetadata expectedSchema = new SchemaBuilder()
+        .buildSchema();
+
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+        .addRow()
+        .build();
+
+      new RowSetComparison(expected).verifyAndClearAll(results);
+    }
+  }
+
+  @Test
+  public void testErrorResponse() throws Exception {
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(
+        new MockResponse().setResponseCode(404)
+          .setBody("{}")
+      );
+
+      String sql = "SELECT * FROM api.mock.`/json?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
+
+      try {
+        client.queryBuilder().sql(sql).rowSet();
+        fail();
+      } catch (Exception e) {
+        assertTrue(e.getMessage().contains("DATA_READ ERROR: Error retrieving data from HTTP Storage Plugin: 404 Client Error"));
+      }
+    }
+  }
+
+  @Test
+  public void testHeaders() throws Exception {
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(
+        new MockResponse().setResponseCode(200)
+          .setBody(TEST_JSON_RESPONSE)
+      );
+
+      String sql = "SELECT * FROM api.mock.`json?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+
+      TupleMetadata expectedSchema = new SchemaBuilder()
+        .addMap("results")
+        .add("sunrise", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("sunset", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("solar_noon", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("day_length", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("civil_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("civil_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("nautical_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("nautical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("astronomical_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("astronomical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .resumeSchema()
+        .add("status", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .build();
+
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+        .addRow( mapValue("6:13:58 AM", "5:59:55 PM", "12:06:56 PM", "11:45:57", "5:48:14 AM", "6:25:38 PM", "5:18:16 AM", "6:55:36 PM", "4:48:07 AM", "7:25:45 PM"), "OK")
+        .build();
+
+      int resultCount =  results.rowCount();
+      new RowSetComparison(expected).verifyAndClearAll(results);
+
+      assertEquals(1,  resultCount);
+
+      RecordedRequest request = server.takeRequest();
+      assertEquals("value1", request.getHeader("header1"));
+      assertEquals("value2", request.getHeader("header2"));
+      assertEquals("Basic dXNlcjpwYXNz", request.getHeader("Authorization"));
+    }
+  }
+
+  /**
+   * Helper function to convert files to a readable input steam.
+   * @param file The input file to be read
+   * @return A buffer to the file
+   * @throws IOException If the file is unreadable, throws an IOException
+   */
+  private Buffer fileToBytes(File file) throws IOException {
+    Buffer result = new Buffer();
+    result.writeAll(Okio.source(file));
+    return result;
+  }
+
+  /**
+   * Helper function to start the MockHTTPServer
+   * @return Started Mock server
+   * @throws IOException If the server cannot start, throws IOException
+   */
+  private MockWebServer startServer() throws IOException {
+    MockWebServer server = new MockWebServer();
+    server.start(MOCK_SERVER_PORT);
+    return server;
+  }
+}
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpProxy.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpProxy.java
new file mode 100644
index 0000000..f3eca88
--- /dev/null
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpProxy.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.http;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.store.http.util.HttpProxyConfig;
+import org.apache.drill.exec.store.http.util.HttpProxyConfig.ProxyType;
+import org.apache.drill.test.BaseTest;
+import org.apache.drill.test.ConfigBuilder;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.typesafe.config.Config;
+
+public class TestHttpProxy extends BaseTest {
+
+  @Test
+  public void testBasics() {
+    HttpProxyConfig proxy = HttpProxyConfig.builder()
+        .type("socks")
+        .host(" foo.com ")
+        .port(1234)
+        .username(" bob ")
+        .password(" secret ")
+        .build();
+    assertEquals(ProxyType.SOCKS, proxy.type);
+    assertEquals("foo.com", proxy.host);
+    assertEquals(1234, proxy.port);
+    assertEquals("bob", proxy.username);
+    assertEquals("secret", proxy.password);
+  }
+
+  @Test
+  public void testURL() {
+    // See https://www.shellhacks.com/linux-proxy-server-settings-set-proxy-command-line/
+    HttpProxyConfig proxy = HttpProxyConfig.builder()
+        .url("http://bob:secret@foo.com:1234")
+        .build();
+    assertEquals(ProxyType.HTTP, proxy.type);
+    assertEquals("foo.com", proxy.host);
+    assertEquals(1234, proxy.port);
+    assertEquals("bob", proxy.username);
+    assertEquals("secret", proxy.password);
+  }
+
+  @Test
+  public void testURLAndConfig() {
+    HttpProxyConfig proxy = HttpProxyConfig.builder()
+        .url("http://foo.com:1234")
+        .username("bob")
+        .password("secret")
+        .build();
+    assertEquals(ProxyType.HTTP, proxy.type);
+    assertEquals("foo.com", proxy.host);
+    assertEquals(1234, proxy.port);
+    assertEquals("bob", proxy.username);
+    assertEquals("secret", proxy.password);
+  }
+
+  @Test
+  public void testNone() {
+    HttpProxyConfig proxy = HttpProxyConfig.builder()
+        .type("")
+        .host("foo.com")
+        .port(1234)
+        .username("bob")
+        .password("secret")
+        .build();
+    assertEquals(ProxyType.NONE, proxy.type);
+    assertNull(proxy.host);
+    assertNull(proxy.username);
+    assertNull(proxy.password);
+  }
+
+  @Test
+  public void testBlankType() {
+    HttpProxyConfig proxy = HttpProxyConfig.builder()
+        .type("  ")
+        .host("foo.com")
+        .port(1234)
+        .username("bob")
+        .password("secret")
+        .build();
+    assertEquals(ProxyType.NONE, proxy.type);
+    assertNull(proxy.host);
+    assertNull(proxy.username);
+    assertNull(proxy.password);
+  }
+
+  @Test
+  public void testBadType() {
+    HttpProxyConfig proxy = HttpProxyConfig.builder()
+        .type("bogus")
+        .host("foo.com")
+        .port(1234)
+        .username("bob")
+        .password("secret")
+        .build();
+    assertEquals(ProxyType.NONE, proxy.type);
+    assertNull(proxy.host);
+    assertNull(proxy.username);
+    assertNull(proxy.password);
+  }
+
+  @Test
+  public void testHttpConfig() {
+    Config config = new ConfigBuilder()
+        .put(ExecConstants.HTTP_PROXY_URL, "http://bob:secret@foo.com:1234")
+        .build();
+    HttpProxyConfig proxy = HttpProxyConfig.builder()
+        .fromHttpConfig(config)
+        .build();
+    assertEquals(ProxyType.HTTP, proxy.type);
+    assertEquals("foo.com", proxy.host);
+    assertEquals(1234, proxy.port);
+    assertEquals("bob", proxy.username);
+    assertEquals("secret", proxy.password);
+  }
+
+  @Test
+  public void testHttpUrlConfig() {
+    Config config = new ConfigBuilder()
+        .put(ExecConstants.HTTP_PROXY_URL, "")
+        .put(ExecConstants.HTTP_PROXY_TYPE, "socks")
+        .put(ExecConstants.HTTP_PROXY_HOST, "foo.com")
+        .put(ExecConstants.HTTP_PROXY_PORT, 1234)
+        .put(ExecConstants.HTTP_PROXY_USER_NAME, "bob")
+        .put(ExecConstants.HTTP_PROXY_PASSWORD, "secret")
+        .build();
+    HttpProxyConfig proxy = HttpProxyConfig.builder()
+        .fromHttpConfig(config)
+        .build();
+    assertEquals(ProxyType.SOCKS, proxy.type);
+    assertEquals("foo.com", proxy.host);
+    assertEquals(1234, proxy.port);
+    assertEquals("bob", proxy.username);
+    assertEquals("secret", proxy.password);
+  }
+
+  @Test
+  public void testHttpsUrlConfig() {
+    Config config = new ConfigBuilder()
+        .put(ExecConstants.HTTPS_PROXY_URL, "https://bob:secret@foo.com:1234")
+        .build();
+    HttpProxyConfig proxy = HttpProxyConfig.builder()
+        .fromHttpsConfig(config)
+        .build();
+    assertEquals(ProxyType.HTTP, proxy.type);
+    assertEquals("foo.com", proxy.host);
+    assertEquals(1234, proxy.port);
+    assertEquals("bob", proxy.username);
+    assertEquals("secret", proxy.password);
+  }
+
+  @Test
+  public void testHttpsConfig() {
+    Config config = new ConfigBuilder()
+        .put(ExecConstants.HTTPS_PROXY_URL, "")
+        .put(ExecConstants.HTTPS_PROXY_TYPE, "socks")
+        .put(ExecConstants.HTTPS_PROXY_HOST, "foo.com")
+        .put(ExecConstants.HTTPS_PROXY_PORT, 1234)
+        .put(ExecConstants.HTTPS_PROXY_USER_NAME, "bob")
+        .put(ExecConstants.HTTPS_PROXY_PASSWORD, "secret")
+        .build();
+    HttpProxyConfig proxy = HttpProxyConfig.builder()
+        .fromHttpsConfig(config)
+        .build();
+    assertEquals(ProxyType.SOCKS, proxy.type);
+    assertEquals("foo.com", proxy.host);
+    assertEquals(1234, proxy.port);
+    assertEquals("bob", proxy.username);
+    assertEquals("secret", proxy.password);
+  }
+
+  @Test
+  public void testConfigForUrl() {
+    Config config = new ConfigBuilder()
+        .put(ExecConstants.HTTP_PROXY_URL, "http://bob:secret@foo.com:1234")
+        .put(ExecConstants.HTTPS_PROXY_URL, "http://alice:s3cr3t@bar.com:2345")
+        .build();
+    doTestConfigForUrl(config);
+  }
+
+  private void doTestConfigForUrl(Config config) {
+    HttpProxyConfig proxy = HttpProxyConfig.builder()
+        .fromConfigForURL(config, "http://google.com")
+        .build();
+    assertEquals(ProxyType.HTTP, proxy.type);
+    assertEquals("foo.com", proxy.host);
+    assertEquals(1234, proxy.port);
+    assertEquals("bob", proxy.username);
+    assertEquals("secret", proxy.password);
+
+    proxy = HttpProxyConfig.builder()
+        .fromConfigForURL(config, "https://google.com")
+        .build();
+    assertEquals(ProxyType.HTTP, proxy.type);
+    assertEquals("bar.com", proxy.host);
+    assertEquals(2345, proxy.port);
+    assertEquals("alice", proxy.username);
+    assertEquals("s3cr3t", proxy.password);
+  }
+
+  // To run this test, set two env vars in your run/debug
+  // configuration, then comment out the @Ignore:
+  // http_proxy=http://bob:secret@foo.com:1234
+  // https_proxy=http://alice:s3cr3t@bar.com:2345
+  @Test
+  @Ignore("Requires manual setup")
+  public void testEnvVar() {
+    Config config = new ConfigBuilder()
+        .build();
+    doTestConfigForUrl(config);
+  }
+}
diff --git a/contrib/storage-http/src/test/resources/data/response.json b/contrib/storage-http/src/test/resources/data/response.json
new file mode 100644
index 0000000..bde2d5b
--- /dev/null
+++ b/contrib/storage-http/src/test/resources/data/response.json
@@ -0,0 +1,14 @@
+{"results":
+  {"sunrise":"6:13:58 AM",
+    "sunset":"5:59:55 PM",
+    "solar_noon":"12:06:56 PM",
+    "day_length":"11:45:57",
+    "civil_twilight_begin":"5:48:14 AM",
+    "civil_twilight_end":"6:25:38 PM",
+    "nautical_twilight_begin":"5:18:16 AM",
+    "nautical_twilight_end":"6:55:36 PM",
+    "astronomical_twilight_begin":"4:48:07 AM",
+    "astronomical_twilight_end":"7:25:45 PM"
+  },
+  "status":"OK"
+}
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePlugin.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePlugin.java
index 35c974d..fea8197 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePlugin.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePlugin.java
@@ -37,7 +37,7 @@ public class KuduStoragePlugin extends AbstractStoragePlugin {
   private final KuduClient client;
 
   public KuduStoragePlugin(KuduStoragePluginConfig configuration, DrillbitContext context, String name)
-      throws IOException {
+    throws IOException {
     super(context, name);
     this.schemaFactory = new KuduSchemaFactory(this, name);
     this.engineConfig = configuration;
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 420e14e..24c14a3 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -288,6 +288,11 @@
         </dependency>
         <dependency>
           <groupId>org.apache.drill.contrib</groupId>
+          <artifactId>drill-storage-http</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.drill.contrib</groupId>
           <artifactId>drill-opentsdb-storage</artifactId>
           <version>${project.version}</version>
         </dependency>
diff --git a/distribution/src/assemble/component.xml b/distribution/src/assemble/component.xml
index 2eb50fd..fefbac4 100644
--- a/distribution/src/assemble/component.xml
+++ b/distribution/src/assemble/component.xml
@@ -49,6 +49,7 @@
         <include>org.apache.drill.contrib:drill-jdbc-storage:jar</include>
         <include>org.apache.drill.contrib:drill-kudu-storage:jar</include>
         <include>org.apache.drill.contrib:drill-storage-kafka:jar</include>
+        <include>org.apache.drill.contrib:drill-storage-http:jar</include>
         <include>org.apache.drill.contrib:drill-opentsdb-storage:jar</include>
         <include>org.apache.drill.contrib:drill-udfs:jar</include>
       </includes>
diff --git a/distribution/src/main/resources/drill-override-example.conf b/distribution/src/main/resources/drill-override-example.conf
index 6fcca37..034307b 100644
--- a/distribution/src/main/resources/drill-override-example.conf
+++ b/distribution/src/main/resources/drill-override-example.conf
@@ -342,6 +342,37 @@ drill.exec: {
     #ssl provider. May be "JDK" or "OPENSSL". Default is "JDK"
     provider: "JDK"
   }
+
+  # HTTP client proxy configuration
+  net_proxy: {
+
+    # HTTP URL. Omit if from a Linux env var
+    # See https://www.shellhacks.com/linux-proxy-server-settings-set-proxy-command-line/
+    http_url: "",
+
+    # Explicit HTTP setup, used if URL is not set
+    http: {
+      type: "none", # none, http, socks. Blank same as none.
+      host: "",
+      port: 80,
+      user_name: "",
+      password: ""
+    },
+
+    # HTTPS URL. Omit if from a Linux env var
+    https_url: "",
+
+    # Explicit HTTPS setup, used if URL is not set
+    https: {
+      type: "none", # none, http, socks. Blank same as none.
+      host: "",
+      port: 80,
+      user_name: "",
+      password: ""
+    }
+  }
+}
+
 },
 
 drill.metrics : {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 3a16353..027ebb5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -1211,4 +1211,21 @@ public final class ExecConstants {
           ENABLE_DYNAMIC_CREDIT_BASED_FC, new OptionDescription("Enable dynamic credit based flow control.This feature allows " +
           "the sender to send out its data more rapidly, but you should know that it has a risk to OOM when the system is solving parallel " +
           "large queries until we have a more accurate resource manager."));
+
+  // HTTP proxy configuration (Drill config)
+  public static final String NET_PROXY_BASE = "drill.exec.net_proxy";
+  // HTTP proxy config
+  public static final String HTTP_PROXY_URL = NET_PROXY_BASE + ".http_url";
+  public static final String HTTP_PROXY_TYPE = NET_PROXY_BASE + ".http.type";
+  public static final String HTTP_PROXY_HOST = NET_PROXY_BASE + ".http.host";
+  public static final String HTTP_PROXY_PORT = NET_PROXY_BASE + ".http.port";
+  public static final String HTTP_PROXY_USER_NAME = NET_PROXY_BASE + ".http.user_name";
+  public static final String HTTP_PROXY_PASSWORD = NET_PROXY_BASE + ".http.password";
+  // HTTPS proxy config
+  public static final String HTTPS_PROXY_URL = NET_PROXY_BASE + ".https_url";
+  public static final String HTTPS_PROXY_TYPE = NET_PROXY_BASE + ".https.type";
+  public static final String HTTPS_PROXY_HOST = NET_PROXY_BASE + ".https.host";
+  public static final String HTTPS_PROXY_PORT = NET_PROXY_BASE + ".https.port";
+  public static final String HTTPS_PROXY_USER_NAME = NET_PROXY_BASE + ".https.user_name";
+  public static final String HTTPS_PROXY_PASSWORD = NET_PROXY_BASE + ".https.password";
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
index 0ab4181..731bf2d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
@@ -70,6 +70,7 @@ public class JSONRecordReader extends AbstractRecordReader {
   private final boolean skipMalformedJSONRecords;
   private final boolean printSkippedMalformedJSONRecordLineNumber;
   private ReadState write;
+  private InputStream inputStream;
 
   /**
    * Create a JSON Record Reader that uses a file based input stream.
@@ -81,7 +82,7 @@ public class JSONRecordReader extends AbstractRecordReader {
    */
   public JSONRecordReader(FragmentContext fragmentContext, Path inputPath, DrillFileSystem fileSystem,
       List<SchemaPath> columns) throws OutOfMemoryException {
-    this(fragmentContext, inputPath, null, fileSystem, columns);
+    this(fragmentContext, inputPath, null, fileSystem, columns, false);
   }
 
   /**
@@ -94,16 +95,28 @@ public class JSONRecordReader extends AbstractRecordReader {
    */
   public JSONRecordReader(FragmentContext fragmentContext, JsonNode embeddedContent, DrillFileSystem fileSystem,
       List<SchemaPath> columns) throws OutOfMemoryException {
-    this(fragmentContext, null, embeddedContent, fileSystem, columns);
+    this(fragmentContext, null, embeddedContent, fileSystem, columns, false);
+  }
+
+  /**
+   * Create a JSON Record Reader that uses an InputStream directly
+   * @param fragmentContext The Drill Fragmement
+   * @param inputStream The inputStream from which data will be received
+   * @param columns  pathnames of columns/subfields to read
+   * @throws OutOfMemoryException
+   */
+  public JSONRecordReader(FragmentContext fragmentContext, List<SchemaPath> columns) throws OutOfMemoryException {
+    this(fragmentContext, null, null, null, columns, true);
   }
 
   private JSONRecordReader(FragmentContext fragmentContext, Path inputPath, JsonNode embeddedContent,
-      DrillFileSystem fileSystem, List<SchemaPath> columns) {
+      DrillFileSystem fileSystem, List<SchemaPath> columns, boolean hasInputStream) {
 
     Preconditions.checkArgument(
-        (inputPath == null && embeddedContent != null) ||
-        (inputPath != null && embeddedContent == null),
-        "One of inputPath or embeddedContent must be set but not both."
+        (inputPath == null && embeddedContent != null && !hasInputStream) ||
+        (inputPath != null && embeddedContent == null && !hasInputStream) ||
+          (inputPath == null && embeddedContent == null && hasInputStream),
+      "One of inputPath, inputStream or embeddedContent must be set but not all."
         );
 
     if (inputPath != null) {
@@ -170,6 +183,8 @@ public class JSONRecordReader extends AbstractRecordReader {
   private void setupParser() throws IOException {
     if (hadoopPath != null) {
       jsonReader.setSource(stream);
+    } else if (inputStream!= null) {
+      jsonReader.setSource(inputStream);
     } else {
       jsonReader.setSource(embeddedContent);
     }
@@ -253,11 +268,20 @@ public class JSONRecordReader extends AbstractRecordReader {
     runningRecordCount += recordCount;
   }
 
+  public void setInputStream(InputStream in) {
+    this.inputStream = in;
+  }
+
   @Override
   public void close() throws Exception {
     if (stream != null) {
       stream.close();
       stream = null;
     }
+
+    if (inputStream != null) {
+      inputStream.close();
+      inputStream = null;
+    }
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
index 4dd146e..1ce89fc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
@@ -63,7 +63,7 @@ public class JsonReader extends BaseJsonReader {
 
   private final FieldSelection selection;
 
-  private JsonReader(Builder builder) {
+  public JsonReader(Builder builder) {
     super(builder.managedBuf, builder.enableNanInf, builder.enableEscapeAnyChar, builder.skipOuterList);
     selection = FieldSelection.getFieldSelection(builder.columns);
     workingBuffer = builder.workingBuffer;
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 46ae53a..01f454d 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -458,10 +458,49 @@ drill.exec: {
   # to have a grace period that is atleast twice the amount of zookeeper
   # refresh time.
   grace_period_ms : 0,
+
   # port hunting for drillbits. Enabled only for testing purposes.
   port_hunt : false,
+
   # Allow drillbit to bind to loopback address in distributed mode. Enabled only for testing purposes.
   allow_loopback_address_binding : false
+
+  # HTTP client proxy configuration
+  net_proxy: {
+
+    # HTTP URL, usually from a Linux env var
+    # See https://www.shellhacks.com/linux-proxy-server-settings-set-proxy-command-line/
+    http_url: "",
+    http_url: ${?HTTP_PROXY},
+    http_url: ${?http_proxy},
+    http_url: ${?all_proxy},
+    http_url: ${?ALL_PROXY},
+
+    # Explicit HTTP setup, used if URL is not set
+    http: {
+      type: "none", # none, http, socks. Blank same as none.
+      host: "",
+      port: 80,
+      user_name: "",
+      password: ""
+    },
+
+    # HTTPS URL, usually from a Linux env var
+    https_url: "",
+    https_url: ${?HTTPS_PROXY},
+    https_url: ${?https_proxy},
+    https_url: ${?all_proxy},
+    https_url: ${?ALL_PROXY},
+
+    # Explicit HTTPS setup, used if URL is not set
+    https: {
+      type: "none", # none, http, socks. Blank same as none.
+      host: "",
+      port: 80,
+      user_name: "",
+      password: ""
+    }
+  }
 }
 
 drill.jdbc: {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/parser/TestJsonParserBasics.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/parser/TestJsonParserBasics.java
index d51e1a8..f8006cc 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/parser/TestJsonParserBasics.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/parser/TestJsonParserBasics.java
@@ -109,7 +109,7 @@ public class TestJsonParserBasics extends BaseTestJsonParser {
   @Test
   public void testExtendedFloat() {
     final String json =
-        "{a: NaN} {a: Infinity} {a: -Infinity}";
+      "{a: NaN} {a: Infinity} {a: -Infinity}";
     JsonParserFixture fixture = new JsonParserFixture();
     fixture.options.allowNanInf = true;
     fixture.open(json);
@@ -158,8 +158,8 @@ public class TestJsonParserBasics extends BaseTestJsonParser {
   public void testRootTuple() {
     final String json =
       "{id: 1, name: \"Fred\", balance: 100.0}\n" +
-      "{id: 2, name: \"Barney\"}\n" +
-      "{id: 3, name: \"Wilma\", balance: 500.00}";
+        "{id: 2, name: \"Barney\"}\n" +
+        "{id: 3, name: \"Wilma\", balance: 500.00}";
     JsonParserFixture fixture = new JsonParserFixture();
     fixture.open(json);
     assertEquals(3, fixture.read());
@@ -235,7 +235,7 @@ public class TestJsonParserBasics extends BaseTestJsonParser {
   @Test
   public void testProjection() {
     final String json =
-        "{a: 1, b: [[{x: [[{y: []}]]}]]}\n" +
+      "{a: 1, b: [[{x: [[{y: []}]]}]]}\n" +
         "{a: 2}\n" +
         "{b: \"bar\"}";
     JsonParserFixture fixture = new JsonParserFixture();
@@ -254,13 +254,13 @@ public class TestJsonParserBasics extends BaseTestJsonParser {
   @Test
   public void testAllTextMode() {
     final String json =
-        "{a: 1} {a: \"foo\"} {a: true} {a: 20.5} {a: null}";
+      "{a: 1} {a: \"foo\"} {a: true} {a: 20.5} {a: null}";
     JsonParserFixture fixture = new JsonParserFixture();
     fixture.options.allTextMode = true;
     fixture.open(json);
 
     fixture.expect("a",
-        new Object[] {"1", "foo", "true", "20.5", null});
+      new Object[] {"1", "foo", "true", "20.5", null});
     assertFalse(fixture.next());
     fixture.close();
   }
@@ -268,13 +268,13 @@ public class TestJsonParserBasics extends BaseTestJsonParser {
   @Test
   public void testColumnTextMode() {
     final String json =
-        "{a: 1} {a: \"foo\"} {a: true} {a: 20.5} {a: null}";
+      "{a: 1} {a: \"foo\"} {a: true} {a: 20.5} {a: null}";
     JsonParserFixture fixture = new JsonParserFixture();
     fixture.rootObject.fieldType = FieldType.TEXT;
     fixture.open(json);
 
     fixture.expect("a",
-        new Object[] {"1", "foo", "true", "20.5", null});
+      new Object[] {"1", "foo", "true", "20.5", null});
     assertFalse(fixture.next());
     fixture.close();
   }
@@ -282,13 +282,13 @@ public class TestJsonParserBasics extends BaseTestJsonParser {
   @Test
   public void testJsonModeScalars() {
     final String json =
-        "{a: 1} {a: \"foo\"} {a: true} {a: 20.5} {a: null}";
+      "{a: 1} {a: \"foo\"} {a: true} {a: 20.5} {a: null}";
     JsonParserFixture fixture = new JsonParserFixture();
     fixture.rootObject.fieldType = FieldType.JSON;
     fixture.open(json);
 
     fixture.expect("a",
-        new Object[] {"1", "\"foo\"", "true", "20.5", "null"});
+      new Object[] {"1", "\"foo\"", "true", "20.5", "null"});
     assertFalse(fixture.next());
     fixture.close();
   }
@@ -296,15 +296,15 @@ public class TestJsonParserBasics extends BaseTestJsonParser {
   @Test
   public void testJsonModeArrays() {
     final String json =
-        "{a: []} {a: [null]} {a: [null, null]} {a: [[]]}\n" +
+      "{a: []} {a: [null]} {a: [null, null]} {a: [[]]}\n" +
         "{a: [1, \"foo\", true]} {a: [[1, 2], [3, 4]]}\n";
     JsonParserFixture fixture = new JsonParserFixture();
     fixture.rootObject.fieldType = FieldType.JSON;
     fixture.open(json);
 
     fixture.expect("a",
-        new Object[] {"[]", "[null]", "[null, null]", "[[]]",
-            "[1, \"foo\", true]", "[[1, 2], [3, 4]]"});
+      new Object[] {"[]", "[null]", "[null, null]", "[[]]",
+        "[1, \"foo\", true]", "[[1, 2], [3, 4]]"});
     assertFalse(fixture.next());
     fixture.close();
   }
@@ -312,15 +312,15 @@ public class TestJsonParserBasics extends BaseTestJsonParser {
   @Test
   public void testJsonModeObjects() {
     final String json =
-        "{a: {}} {a: {b: null}} {a: {b: null, b: null}}\n" +
+      "{a: {}} {a: {b: null}} {a: {b: null, b: null}}\n" +
         "{a: {b: {c: {d: [{e: 10}, null, 20], f: \"foo\"}, g:30}, h: 40}}\n";
     JsonParserFixture fixture = new JsonParserFixture();
     fixture.rootObject.fieldType = FieldType.JSON;
     fixture.open(json);
 
     fixture.expect("a",
-        new Object[] {"{}", "{\"b\": null}", "{\"b\": null, \"b\": null}",
-            "{\"b\": {\"c\": {\"d\": [{\"e\": 10}, null, 20], \"f\": \"foo\"}, \"g\": 30}, \"h\": 40}"});
+      new Object[] {"{}", "{\"b\": null}", "{\"b\": null, \"b\": null}",
+        "{\"b\": {\"c\": {\"d\": [{\"e\": 10}, null, 20], \"f\": \"foo\"}, \"g\": 30}, \"h\": 40}"});
     assertFalse(fixture.next());
     fixture.close();
   }
diff --git a/logical/src/main/java/org/apache/drill/common/expression/SchemaPath.java b/logical/src/main/java/org/apache/drill/common/expression/SchemaPath.java
index 0b5292d..1f4cfee 100644
--- a/logical/src/main/java/org/apache/drill/common/expression/SchemaPath.java
+++ b/logical/src/main/java/org/apache/drill/common/expression/SchemaPath.java
@@ -322,6 +322,27 @@ public class SchemaPath extends LogicalExpressionBase {
     return rootSegment;
   }
 
+  public String getAsUnescapedPath() {
+    StringBuilder sb = new StringBuilder();
+    PathSegment seg = getRootSegment();
+    if (seg.isArray()) {
+      throw new IllegalStateException("Drill doesn't currently support top level arrays");
+    }
+    sb.append(seg.getNameSegment().getPath());
+
+    while ( (seg = seg.getChild()) != null) {
+      if (seg.isNamed()) {
+        sb.append('.');
+        sb.append(seg.getNameSegment().getPath());
+      } else {
+        sb.append('[');
+        sb.append(seg.getArraySegment().getIndex());
+        sb.append(']');
+      }
+    }
+    return sb.toString();
+  }
+
   @Override
   public MajorType getMajorType() {
     return Types.LATE_BIND_TYPE;
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
index 2583595..af148ef 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
@@ -667,6 +667,10 @@ public final class UserBitShared {
      * <code>METADATA_CONTROLLER = 67;</code>
      */
     METADATA_CONTROLLER(67),
+    /**
+     * <code>HTTP_SUB_SCAN = 70;</code>
+     */
+    HTTP_SUB_SCAN(70),
     ;
 
     /**
@@ -941,6 +945,10 @@ public final class UserBitShared {
      * <code>METADATA_CONTROLLER = 67;</code>
      */
     public static final int METADATA_CONTROLLER_VALUE = 67;
+    /**
+     * <code>HTTP_SUB_SCAN = 70;</code>
+     */
+    public static final int HTTP_SUB_SCAN_VALUE = 70;
 
 
     public final int getNumber() {
@@ -1025,6 +1033,7 @@ public final class UserBitShared {
         case 65: return SHP_SUB_SCAN;
         case 66: return METADATA_HANDLER;
         case 67: return METADATA_CONTROLLER;
+        case 70: return HTTP_SUB_SCAN;
         default: return null;
       }
     }
@@ -27924,7 +27933,7 @@ public final class UserBitShared {
       "ATEMENT\020\005*\207\001\n\rFragmentState\022\013\n\007SENDING\020\000" +
       "\022\027\n\023AWAITING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022\014" +
       "\n\010FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005\022" +
-      "\032\n\026CANCELLATION_REQUESTED\020\006*\344\n\n\020CoreOper" +
+      "\032\n\026CANCELLATION_REQUESTED\020\006*\367\n\n\020CoreOper" +
       "atorType\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAST" +
       "_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE\020" +
       "\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HASH" +
@@ -27959,11 +27968,11 @@ public final class UserBitShared {
       "MERGE\020=\022\021\n\rLTSV_SUB_SCAN\020>\022\021\n\rHDF5_SUB_S" +
       "CAN\020?\022\022\n\016EXCEL_SUB_SCAN\020@\022\020\n\014SHP_SUB_SCA" +
       "N\020A\022\024\n\020METADATA_HANDLER\020B\022\027\n\023METADATA_CO" +
-      "NTROLLER\020C*g\n\nSaslStatus\022\020\n\014SASL_UNKNOWN" +
-      "\020\000\022\016\n\nSASL_START\020\001\022\024\n\020SASL_IN_PROGRESS\020\002" +
-      "\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B.\n\033o" +
-      "rg.apache.drill.exec.protoB\rUserBitShare" +
-      "dH\001"
+      "NTROLLER\020C\022\021\n\rHTTP_SUB_SCAN\020F*g\n\nSaslSta" +
+      "tus\022\020\n\014SASL_UNKNOWN\020\000\022\016\n\nSASL_START\020\001\022\024\n" +
+      "\020SASL_IN_PROGRESS\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n" +
+      "\013SASL_FAILED\020\004B.\n\033org.apache.drill.exec." +
+      "protoB\rUserBitSharedH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
         new com.google.protobuf.Descriptors.FileDescriptor.    InternalDescriptorAssigner() {
diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto
index 55cfde5..c51cc66 100644
--- a/protocol/src/main/protobuf/UserBitShared.proto
+++ b/protocol/src/main/protobuf/UserBitShared.proto
@@ -379,6 +379,7 @@ enum CoreOperatorType {
   SHP_SUB_SCAN = 65;
   METADATA_HANDLER = 66;
   METADATA_CONTROLLER = 67;
+  HTTP_SUB_SCAN = 70;
 }
 
 /* Registry that contains list of jars, each jar contains its name and list of function signatures.