You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pr...@apache.org on 2020/04/14 02:33:00 UTC
[drill] branch master updated: DRILL-7437: Storage Plugin for
Generic HTTP REST API
This is an automated email from the ASF dual-hosted git repository.
progers pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 4d47a61 DRILL-7437: Storage Plugin for Generic HTTP REST API
4d47a61 is described below
commit 4d47a61aefa77004f7541e59820c68a8ed8bad80
Author: Charles Givre <cg...@apache.org>
AuthorDate: Mon Apr 13 10:16:36 2020 -0400
DRILL-7437: Storage Plugin for Generic HTTP REST API
---
common/pom.xml | 2 -
.../native/client/src/protobuf/UserBitShared.pb.cc | 15 +-
.../native/client/src/protobuf/UserBitShared.pb.h | 5 +-
contrib/pom.xml | 1 +
contrib/storage-http/README.md | 301 +++++++++++++
contrib/storage-http/images/issue_count.png | Bin 0 -> 50256 bytes
contrib/storage-http/pom.xml | 101 +++++
.../drill/exec/store/http/HttpAPIConfig.java | 165 +++++++
.../exec/store/http/HttpAPIConnectionSchema.java | 83 ++++
.../drill/exec/store/http/HttpBatchReader.java | 97 +++++
.../drill/exec/store/http/HttpGroupScan.java | 168 ++++++++
.../exec/store/http/HttpScanBatchCreator.java | 104 +++++
.../apache/drill/exec/store/http/HttpScanSpec.java | 72 ++++
.../drill/exec/store/http/HttpSchemaFactory.java | 101 +++++
.../drill/exec/store/http/HttpStoragePlugin.java} | 54 +--
.../exec/store/http/HttpStoragePluginConfig.java | 170 ++++++++
.../apache/drill/exec/store/http/HttpSubScan.java | 130 ++++++
.../exec/store/http/util/HttpProxyConfig.java | 220 ++++++++++
.../drill/exec/store/http/util/SimpleHttp.java | 267 ++++++++++++
.../main/resources/bootstrap-storage-plugins.json | 9 +
.../src/main/resources/drill-module.conf | 27 ++
.../drill/exec/store/http/TestHttpPlugin.java | 479 +++++++++++++++++++++
.../drill/exec/store/http/TestHttpProxy.java | 233 ++++++++++
.../src/test/resources/data/response.json | 14 +
.../drill/exec/store/kudu/KuduStoragePlugin.java | 2 +-
distribution/pom.xml | 5 +
distribution/src/assemble/component.xml | 1 +
.../src/main/resources/drill-override-example.conf | 31 ++
.../java/org/apache/drill/exec/ExecConstants.java | 17 +
.../exec/store/easy/json/JSONRecordReader.java | 36 +-
.../drill/exec/vector/complex/fn/JsonReader.java | 2 +-
.../java-exec/src/main/resources/drill-module.conf | 39 ++
.../easy/json/parser/TestJsonParserBasics.java | 32 +-
.../apache/drill/common/expression/SchemaPath.java | 21 +
.../org/apache/drill/exec/proto/UserBitShared.java | 21 +-
protocol/src/main/protobuf/UserBitShared.proto | 1 +
36 files changed, 2948 insertions(+), 78 deletions(-)
diff --git a/common/pom.xml b/common/pom.xml
index 889275e..8198be6 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -37,14 +37,12 @@
<artifactId>drill-protocol</artifactId>
<version>${project.version}</version>
</dependency>
-
<dependency>
<!-- add as provided scope so that we can compile TestTools. Should only be ever used in a test scenario where someone else is bringing JUnit in. -->
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
</dependency>
-
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
diff --git a/contrib/native/client/src/protobuf/UserBitShared.pb.cc b/contrib/native/client/src/protobuf/UserBitShared.pb.cc
index 5f2186f..9ce65c2 100644
--- a/contrib/native/client/src/protobuf/UserBitShared.pb.cc
+++ b/contrib/native/client/src/protobuf/UserBitShared.pb.cc
@@ -1034,7 +1034,7 @@ void AddDescriptorsImpl() {
"ATEMENT\020\005*\207\001\n\rFragmentState\022\013\n\007SENDING\020\000"
"\022\027\n\023AWAITING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022\014"
"\n\010FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005\022"
- "\032\n\026CANCELLATION_REQUESTED\020\006*\344\n\n\020CoreOper"
+ "\032\n\026CANCELLATION_REQUESTED\020\006*\367\n\n\020CoreOper"
"atorType\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAST"
"_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE\020"
"\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HASH"
@@ -1069,14 +1069,14 @@ void AddDescriptorsImpl() {
"MERGE\020=\022\021\n\rLTSV_SUB_SCAN\020>\022\021\n\rHDF5_SUB_S"
"CAN\020\?\022\022\n\016EXCEL_SUB_SCAN\020@\022\020\n\014SHP_SUB_SCA"
"N\020A\022\024\n\020METADATA_HANDLER\020B\022\027\n\023METADATA_CO"
- "NTROLLER\020C*g\n\nSaslStatus\022\020\n\014SASL_UNKNOWN"
- "\020\000\022\016\n\nSASL_START\020\001\022\024\n\020SASL_IN_PROGRESS\020\002"
- "\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B.\n\033o"
- "rg.apache.drill.exec.protoB\rUserBitShare"
- "dH\001"
+ "NTROLLER\020C\022\021\n\rHTTP_SUB_SCAN\020F*g\n\nSaslSta"
+ "tus\022\020\n\014SASL_UNKNOWN\020\000\022\016\n\nSASL_START\020\001\022\024\n"
+ "\020SASL_IN_PROGRESS\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n"
+ "\013SASL_FAILED\020\004B.\n\033org.apache.drill.exec."
+ "protoB\rUserBitSharedH\001"
};
::google::protobuf::DescriptorPool::InternalAddGeneratedFile(
- descriptor, 5763);
+ descriptor, 5782);
::google::protobuf::MessageFactory::InternalRegisterGeneratedFile(
"UserBitShared.proto", &protobuf_RegisterTypes);
::protobuf_Types_2eproto::AddDescriptors();
@@ -1323,6 +1323,7 @@ bool CoreOperatorType_IsValid(int value) {
case 65:
case 66:
case 67:
+ case 70:
return true;
default:
return false;
diff --git a/contrib/native/client/src/protobuf/UserBitShared.pb.h b/contrib/native/client/src/protobuf/UserBitShared.pb.h
index 0aa41cf..d5483bc 100644
--- a/contrib/native/client/src/protobuf/UserBitShared.pb.h
+++ b/contrib/native/client/src/protobuf/UserBitShared.pb.h
@@ -358,11 +358,12 @@ enum CoreOperatorType {
EXCEL_SUB_SCAN = 64,
SHP_SUB_SCAN = 65,
METADATA_HANDLER = 66,
- METADATA_CONTROLLER = 67
+ METADATA_CONTROLLER = 67,
+ HTTP_SUB_SCAN = 70
};
bool CoreOperatorType_IsValid(int value);
const CoreOperatorType CoreOperatorType_MIN = SINGLE_SENDER;
-const CoreOperatorType CoreOperatorType_MAX = METADATA_CONTROLLER;
+const CoreOperatorType CoreOperatorType_MAX = HTTP_SUB_SCAN;
const int CoreOperatorType_ARRAYSIZE = CoreOperatorType_MAX + 1;
const ::google::protobuf::EnumDescriptor* CoreOperatorType_descriptor();
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 8f81d48..15f3dd5 100644
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -54,6 +54,7 @@
<module>storage-kafka</module>
<module>storage-kudu</module>
<module>storage-opentsdb</module>
+ <module>storage-http</module>
</modules>
</project>
diff --git a/contrib/storage-http/README.md b/contrib/storage-http/README.md
new file mode 100644
index 0000000..3b9965c
--- /dev/null
+++ b/contrib/storage-http/README.md
@@ -0,0 +1,301 @@
+# Generic REST API Storage Plugin
+
+This plugin is intended to enable you to query APIs over HTTP/REST. At this point, the API reader will only accept JSON as input however in the future, it may be possible to
+ add additional format readers to allow for APIs which return XML, CSV or other formats.
+
+Note: This plugin should **NOT** be used for interacting with tools which have REST APIs such as Splunk or Solr. It will not be performant for those use cases.
+
+## Configuration
+
+To configure the plugin, create a new storage plugin, and add the following configuration options which apply to ALL connections defined in this plugin:
+
+```json
+{
+ "type": "http",
+ "cacheResults": true,
+ "connections": {},
+ "timeout": 0,
+ "proxyHost": null,
+ "proxyPort": 0,
+ "proxyType": null,
+ "proxyUsername": null,
+ "proxyPassword": null,
+ "enabled": true
+}
+```
+The required options are:
+* `type`: This should be `http`
+* `cacheResults`: Enable caching of the HTTP responses. Defaults to `false`
+* `timeout`: Sets the response timeout in seconds. Defaults to `0` which is no timeout.
+* `connections`: This field contains the details for individual connections. See the section *Configuring the API Connections* for details.
+
+You can configure Drill to work behind a corporate proxy. Details are listed below.
+
+### Configuring the API Connections
+
+The HTTP Storage plugin allows you to configure multiple APIs which you can query directly from this plugin. To do so, first add a `connections` parameter to the configuration
+. Next give the connection a name, which will be used in queries. For instance `stockAPI` or `jira`.
+
+The `connection` can accept the following options:
+* `url`: The base URL which Drill will query. You should include the ending slash if there are additional arguments which you are passing.
+* `method`: The request method. Must be `get` or `post`. Other methods are not allowed and will default to `GET`.
+* `headers`: Often APIs will require custom headers as part of the authentication. This field allows you to define key/value pairs which are submitted with the http request
+. The format is:
+```json
+headers: {
+ "key1": "Value1",
+ "key2": "Value2"
+}
+```
+* `authType`: If your API requires authentication, specify the authentication type. At the time of implementation, the plugin only supports basic authentication, however, the
+ plugin will likely support OAUTH2 in the future. Defaults to `none`. If the `authType` is set to `basic`, `username` and `password` must be set in the configuration as well.
+ * `username`: The username for basic authentication.
+ * `password`: The password for basic authentication.
+ * `postBody`: Contains data, in the form of key value pairs, which are sent during a `POST` request. Post body should be in the form:
+ ```
+key1=value1
+key2=value2
+```
+
+## Usage
+
+This plugin is different from other plugins in that the table component of the `FROM` clause is different. In normal Drill queries, the `FROM` clause is constructed as follows:
+```sql
+FROM <storage plugin>.<schema>.<table>
+```
+For example, you might have:
+```sql
+FROM dfs.test.`somefile.csv`
+
+-- or
+
+FROM mongo.stats.sales_data
+```
+
+The HTTP/REST plugin the `FROM` clause enables you to pass arguments to your REST call. The structure is:
+```sql
+FROM <plugin>.<connection>.<arguments>
+--Actual example:
+ FROM http.sunrise.`/json?lat=36.7201600&lng=-4.4203400&date=today`
+```
+
+## Proxy Setup
+
+Some users access HTTP services from behind a proxy firewall. Drill provides three ways to specify proxy
+configuration.
+
+### Proxy Environment Variables
+
+Drill recognizes the usual Linux proxy environment variables:
+
+* `http_proxy`, `HTTP_PROXY`
+* `https_proxy`, `HTTPS_PROXY`
+* `all_proxy`, `ALL_PROXY`
+
+This technique works well if your system is already configured to
+handle proxies.
+
+### Boot Configuration
+
+You can also specify proxy configuration in the `drill-override.conf` file.
+See `drill-override-example.conf` for a template.
+
+First, you can use the same form of URL you would use with the environment
+variables:
+
+```
+drill.exec.net_proxy.http_url: "http://foo.com/1234"
+```
+
+There is one setting for HTTP, another for HTTPS.
+
+Alternatively, you can specify each field separately:
+
+```
+drill.exec.net_proxy.http: {
+ type: "none", # none, http, socks. Blank same as none.
+ host: "",
+ port: 80,
+ user_name: "",
+ password: ""
+ },
+```
+
+The valid proxy types are `none`, `http` and `socks`. Blank is the same
+as `none`.
+
+Again, there is a parallel section for HTTPS.
+
+Either of these approaches is preferred if the proxy is an attribute of your
+network environment and is the same for all external HTTP/HTTPS requests.
+
+### In the HTTP Storage Plugin Config
+
+The final way to configure proxy is in the HTTP storage plugin itself. The proxy
+applies to all connections defined in that plugin. Use this approach if the proxy
+applies only to some external services, or if each service has a different proxy
+(defined by creating a separate plugin config for each service.)
+
+```json
+ proxy_type: "direct",
+ proxy_host: "",
+ proxy_port: 80,
+ proxy_user_name: "",
+ proxy_password: ""
+```
+
+The valid proxy types are `direct`, `http` or `socks`. Blank is the same
+as `direct`.
+
+## Examples
+
+### Example 1: Reference Data, A Sunrise/Sunset API
+
+The API sunrise-sunset.org returns data in the following format:
+
+ ```json
+ "results":
+ {
+ "sunrise":"7:27:02 AM",
+ "sunset":"5:05:55 PM",
+ "solar_noon":"12:16:28 PM",
+ "day_length":"9:38:53",
+ "civil_twilight_begin":"6:58:14 AM",
+ "civil_twilight_end":"5:34:43 PM",
+ "nautical_twilight_begin":"6:25:47 AM",
+ "nautical_twilight_end":"6:07:10 PM",
+ "astronomical_twilight_begin":"5:54:14 AM",
+ "astronomical_twilight_end":"6:38:43 PM"
+ },
+ "status":"OK"
+}
+```
+To query this API, set the configuration as follows:
+
+```json
+
+ {
+ "type": "http",
+ "cacheResults": false,
+ "enabled": true,
+ "timeout": 5,
+ "connections": {
+ "sunrise": {
+ "url": "https://api.sunrise-sunset.org/",
+ "method": "GET",
+ "headers": null,
+ "authType": "none",
+ "userName": null,
+ "password": null,
+ "postBody": null
+ }
+ }
+}
+
+```
+Then, to execute a query:
+```sql
+ SELECT api_results.results.sunrise AS sunrise,
+ api_results.results.sunset AS sunset
+ FROM http.sunrise.`/json?lat=36.7201600&lng=-4.4203400&date=today` AS api_results;
+```
+Which yields the following results:
+```
++------------+------------+
+| sunrise | sunset |
++------------+------------+
+| 7:17:46 AM | 5:01:33 PM |
++------------+------------+
+1 row selected (0.632 seconds)
+```
+
+### Example 2: JIRA
+
+JIRA Cloud has a REST API which is [documented here](https://developer.atlassian.com/cloud/jira/platform/rest/v3/?utm_source=%2Fcloud%2Fjira%2Fplatform%2Frest%2F&utm_medium=302).
+
+To connect Drill to JIRA Cloud, use the following configuration:
+```json
+{
+ "type": "http",
+ "cacheResults": false,
+ "timeout": 5,
+ "connections": {
+ "sunrise": {
+ "url": "https://api.sunrise-sunset.org/",
+ "method": "GET",
+ "headers": null,
+ "authType": "none",
+ "userName": null,
+ "password": null,
+ "postBody": null
+ },
+ "jira": {
+ "url": "https://<project>.atlassian.net/rest/api/3/",
+ "method": "GET",
+ "headers": {
+ "Accept": "application/json"
+ },
+ "authType": "basic",
+ "userName": "<username>",
+ "password": "<API Key>",
+ "postBody": null
+ }
+ },
+ "enabled": true
+}
+```
+
+Once you've configured Drill to query the API, you can now easily access any of your data in JIRA. The JIRA API returns highly nested data, however with a little preparation, it
+ is pretty straightforward to transform it into a more useful table. For instance, the
+ query below:
+```sql
+SELECT jira_data.issues.key AS key,
+jira_data.issues.fields.issueType.name AS issueType,
+SUBSTR(jira_data.issues.fields.created, 1, 10) AS created,
+SUBSTR(jira_data.issues.fields.updated, 1, 10) AS updated,
+jira_data.issues.fields.assignee.displayName as assignee,
+jira_data.issues.fields.creator.displayName as creator,
+jira_data.issues.fields.summary AS summary,
+jira_data.issues.fields.status.name AS currentStatus,
+jira_data.issues.fields.priority.name AS priority,
+jira_data.issues.fields.labels AS labels,
+jira_data.issues.fields.subtasks AS subtasks
+FROM (
+SELECT flatten(t1.issues) as issues
+FROM http.jira.`search?jql=project=<project>&&maxResults=100` AS t1
+) AS jira_data
+```
+The query below counts the number of issues by priority:
+
+```sql
+SELECT
+jira_data.issues.fields.priority.name AS priority,
+COUNT(*) AS issue_count
+FROM (
+SELECT flatten(t1.issues) as issues
+FROM http.jira.`search?jql=project=<project>&maxResults=100` AS t1
+) AS jira_data
+GROUP BY priority
+ORDER BY issue_count DESC
+```
+
+<img src="images/issue_count.png" alt="Issue Count by Priority"/>
+
+
+## Limitations
+
+1. The plugin is supposed to follow redirects, however if you are using Authentication, you may encounter errors or empty responses if you are counting on the endpoint for
+ redirection.
+
+2. At this time, the plugin does not support any authentication other than basic authentication. Future functionality may include OAUTH2 authentication and/or PKI
+ authentication for REST APIs.
+
+3. This plugin does not implement filter pushdowns. Filter pushdown has the potential to improve performance.
+
+4. This plugin only reads JSON responses. Future functionality may include the ability to parse XML, CSV or other common rest responses.
+
+5. At this time `POST` bodies can only be in the format of key/value pairs. Some APIs accept JSON based `POST` bodies and this is not currently supported.
+
+6. The returned message should contain only records, as a JSON array of objects (or as a series of JSON objects as in a JSON file). The
+ present version does not yet have the ability to ignore message "overhead" such as status codes, etc. You can of course, select individual fields in your query to ignore
+ "overhead" fields.
diff --git a/contrib/storage-http/images/issue_count.png b/contrib/storage-http/images/issue_count.png
new file mode 100644
index 0000000..49bc04f
Binary files /dev/null and b/contrib/storage-http/images/issue_count.png differ
diff --git a/contrib/storage-http/pom.xml b/contrib/storage-http/pom.xml
new file mode 100644
index 0000000..0c2c875
--- /dev/null
+++ b/contrib/storage-http/pom.xml
@@ -0,0 +1,101 @@
+<?xml version="1.0"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>drill-contrib-parent</artifactId>
+ <groupId>org.apache.drill.contrib</groupId>
+ <version>1.18.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>drill-storage-http</artifactId>
+ <name>contrib/http-storage-plugin</name>
+
+ <properties>
+ <okhttp.version>4.5.0</okhttp.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.drill.exec</groupId>
+ <artifactId>drill-java-exec</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.squareup.okhttp3</groupId>
+ <artifactId>okhttp</artifactId>
+ <version>${okhttp.version}</version>
+ </dependency>
+
+ <!-- Test dependencies -->
+ <dependency>
+ <groupId>org.apache.drill.exec</groupId>
+ <artifactId>drill-java-exec</artifactId>
+ <classifier>tests</classifier>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.drill</groupId>
+ <artifactId>drill-common</artifactId>
+ <classifier>tests</classifier>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.squareup.okhttp3</groupId>
+ <artifactId>mockwebserver</artifactId>
+ <version>${okhttp.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>copy-java-sources</id>
+ <phase>process-sources</phase>
+ <goals>
+ <goal>copy-resources</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>${basedir}/target/classes/org/apache/drill/exec/store/http
+ </outputDirectory>
+ <resources>
+ <resource>
+ <directory>src/main/java/org/apache/drill/exec/store/http</directory>
+ <filtering>true</filtering>
+ </resource>
+ </resources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpAPIConfig.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpAPIConfig.java
new file mode 100644
index 0000000..a850d1d
--- /dev/null
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpAPIConfig.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.http;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.parquet.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Objects;
+
+public class HttpAPIConfig {
+ private static final Logger logger = LoggerFactory.getLogger(HttpAPIConfig.class);
+
+ private final String url;
+
+ private final HttpMethods method;
+
+ private final Map<String, String> headers;
+
+ private final String authType;
+
+ private final String userName;
+
+ private final String password;
+
+ private final String postBody;
+
+ public enum HttpMethods {
+ /**
+ * Value for HTTP GET method
+ */
+ GET,
+ /**
+ * Value for HTTP POST method
+ */
+ POST;
+ }
+
+ public HttpAPIConfig(@JsonProperty("url") String url,
+ @JsonProperty("method") String method,
+ @JsonProperty("headers") Map<String, String> headers,
+ @JsonProperty("authType") String authType,
+ @JsonProperty("userName") String userName,
+ @JsonProperty("password") String password,
+ @JsonProperty("postBody") String postBody) {
+
+ this.headers = headers;
+ this.method = Strings.isNullOrEmpty(method)
+ ? HttpMethods.GET : HttpMethods.valueOf(method.trim().toUpperCase());
+
+ // Get the request method. Only accept GET and POST requests. Anything else will default to GET.
+ switch (this.method) {
+ case GET:
+ case POST:
+ break;
+ default:
+ throw UserException
+ .validationError()
+ .message("Invalid HTTP method: %s. Drill supports 'GET' and , 'POST'.", method)
+ .build(logger);
+ }
+ if (Strings.isNullOrEmpty(url)) {
+ throw UserException
+ .validationError()
+ .message("URL is required for the HTTP storage plugin.")
+ .build(logger);
+ }
+
+ // Put a trailing slash on the URL if it is missing
+ if (url.charAt(url.length() - 1) != '/') {
+ this.url = url + "/";
+ } else {
+ this.url = url;
+ }
+
+ // Get the authentication method. Future functionality will include OAUTH2 authentication but for now
+ // Accept either basic or none. The default is none.
+ this.authType = Strings.isNullOrEmpty(authType) ? "none" : authType;
+ this.userName = userName;
+ this.password = password;
+ this.postBody = postBody;
+ }
+
+ @JsonProperty("url")
+ public String url() { return url; }
+
+ @JsonProperty("method")
+ public String method() { return method.toString(); }
+
+ @JsonProperty("headers")
+ public Map<String, String> headers() { return headers; }
+
+ @JsonProperty("authType")
+ public String authType() { return authType; }
+
+ @JsonProperty("userName")
+ public String userName() { return userName; }
+
+ @JsonProperty("password")
+ public String password() { return password; }
+
+ @JsonProperty("postBody")
+ public String postBody() { return postBody; }
+
+ @JsonIgnore
+ public HttpMethods getMethodType() {
+ return HttpMethods.valueOf(this.method());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(url, method, headers, authType, userName, password, postBody);
+ }
+
+ @Override
+ public String toString() {
+ return new PlanStringBuilder(this)
+ .field("url", url)
+ .field("method", method)
+ .field("headers", headers)
+ .field("authType", authType)
+ .field("username", userName)
+ .field("password", password)
+ .field("postBody", postBody)
+ .toString();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ HttpAPIConfig other = (HttpAPIConfig) obj;
+ return Objects.equals(url, other.url)
+ && Objects.equals(method, other.method)
+ && Objects.equals(headers, other.headers)
+ && Objects.equals(authType, other.authType)
+ && Objects.equals(userName, other.userName)
+ && Objects.equals(password, other.password)
+ && Objects.equals(postBody, other.postBody);
+ }
+}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpAPIConnectionSchema.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpAPIConnectionSchema.java
new file mode 100644
index 0000000..d3fa51f
--- /dev/null
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpAPIConnectionSchema.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.http;
+
+import org.apache.calcite.schema.Table;
+import org.apache.drill.exec.planner.logical.DynamicDrillTable;
+import org.apache.drill.exec.store.AbstractSchema;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * In the HTTP storage plugin, users can define specific connections or APIs.
+ * This class represents the database component of other storage plugins.
+ */
+public class HttpAPIConnectionSchema extends AbstractSchema {
+
+ private final Map<String, DynamicDrillTable> activeTables = new HashMap<>();
+
+ private final HttpStoragePlugin plugin;
+
+ private final String pluginName;
+
+ public HttpAPIConnectionSchema(HttpSchemaFactory.HttpSchema httpSchema,
+ String name,
+ HttpStoragePlugin plugin) {
+ super(httpSchema.getSchemaPath(), name);
+ this.plugin = plugin;
+ pluginName = plugin.getName();
+ }
+
+ @Override
+ public String getTypeName() {
+ return HttpStoragePluginConfig.NAME;
+ }
+
+ /**
+ * Gets the table that is received from the query. In this case, the table actually are arguments which are passed
+ * in the URL string.
+ *
+ * @param tableName
+ * The "tableName" actually will contain the URL arguments passed to
+ * the record reader
+ * @return the selected table
+ */
+ @Override
+ public Table getTable(String tableName) {
+ DynamicDrillTable table = activeTables.get(name);
+ if (table != null) {
+ // Return the found table
+ return table;
+ } else {
+ // Register a new table
+ return registerTable(name, new DynamicDrillTable(plugin, pluginName, new HttpScanSpec(pluginName, name, tableName, plugin.getConfig())));
+ }
+ }
+
+ @Override
+ public Set<String> getTableNames() {
+ return activeTables.keySet();
+ }
+
+ private DynamicDrillTable registerTable(String name, DynamicDrillTable table) {
+ activeTables.put(name, table);
+ return table;
+ }
+}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
new file mode 100644
index 0000000..7aaa1e8
--- /dev/null
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.http;
+
+import java.io.File;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.store.easy.json.loader.JsonLoader;
+import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder;
+import org.apache.drill.exec.store.http.util.HttpProxyConfig;
+import org.apache.drill.exec.store.http.util.HttpProxyConfig.ProxyBuilder;
+import org.apache.drill.exec.store.http.util.SimpleHttp;
+
+import com.typesafe.config.Config;
+
+public class HttpBatchReader implements ManagedReader<SchemaNegotiator> {
+ private final HttpStoragePluginConfig config;
+ private final HttpSubScan subScan;
+ private JsonLoader jsonLoader;
+
+ public HttpBatchReader(HttpStoragePluginConfig config, HttpSubScan subScan) {
+ this.config = config;
+ this.subScan = subScan;
+ }
+
+ @Override
+ public boolean open(SchemaNegotiator negotiator) {
+ CustomErrorContext errorContext = negotiator.parentErrorContext();
+
+ // Result set loader setup
+ String tempDirPath = negotiator
+ .drillConfig()
+ .getString(ExecConstants.DRILL_TMP_DIR);
+ ResultSetLoader loader = negotiator.build();
+
+ // Http client setup
+ SimpleHttp http = new SimpleHttp(config, new File(tempDirPath), subScan.tableSpec().database(), proxySettings(negotiator.drillConfig()), errorContext);
+
+ // JSON loader setup
+ jsonLoader = new JsonLoaderBuilder()
+ .resultSetLoader(loader)
+ .standardOptions(negotiator.queryOptions())
+ .errorContext(errorContext)
+ .fromStream(http.getInputStream(subScan.getFullURL()))
+ .build();
+
+ // Please read the first batch
+ return true;
+ }
+
+ private HttpProxyConfig proxySettings(Config drillConfig) {
+ ProxyBuilder builder = HttpProxyConfig.builder()
+ .fromConfigForURL(drillConfig, subScan.getFullURL());
+ String proxyType = config.proxyType();
+ if (proxyType != null && !"direct".equals(proxyType)) {
+ builder
+ .type(config.proxyType())
+ .host(config.proxyHost())
+ .port(config.proxyPort())
+ .username(config.proxyUsername())
+ .password(config.proxyPassword());
+ }
+ return builder.build();
+ }
+
+ @Override
+ public boolean next() {
+ return jsonLoader.readBatch();
+ }
+
+ @Override
+ public void close() {
+ if (jsonLoader != null) {
+ jsonLoader.close();
+ jsonLoader = null;
+ }
+ }
+}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java
new file mode 100644
index 0000000..9e4e86d
--- /dev/null
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.http;
+
+import java.util.List;
+import java.util.Objects;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.expression.SchemaPath;
+
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.planner.cost.DrillCostBase;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+
+/**
+ * Logical group scan for the HTTP storage plugin. An HTTP request cannot be
+ * split, so this scan always produces a single {@link HttpSubScan} and runs
+ * with parallelization width 1.
+ */
+@JsonTypeName("http-scan")
+public class HttpGroupScan extends AbstractGroupScan {
+
+ // Projected columns; null means "project all columns".
+ private final List<SchemaPath> columns;
+ // Scan spec built during planning: API connection ("database") and table.
+ private final HttpScanSpec httpScanSpec;
+ // Plugin-level configuration (connections, proxy, timeout).
+ private final HttpStoragePluginConfig config;
+
+ /**
+ * Creates a group scan during logical planning.
+ *
+ * @param config the HTTP storage plugin configuration
+ * @param scanSpec the scan spec identifying the API connection and table
+ * @param columns the projected columns, or null for all columns
+ */
+ public HttpGroupScan (
+ HttpStoragePluginConfig config,
+ HttpScanSpec scanSpec,
+ List<SchemaPath> columns) {
+ super("no-user");
+ this.config = config;
+ this.httpScanSpec = scanSpec;
+ this.columns = columns;
+ }
+
+ /**
+ * Copy constructor used by {@link #getNewWithChildren(List)}.
+ */
+ public HttpGroupScan(HttpGroupScan that) {
+ super(that);
+ config = that.config();
+ httpScanSpec = that.httpScanSpec();
+ columns = that.getColumns();
+ }
+
+ /**
+ * Copy-with-projection constructor used by {@link #clone(List)} when the
+ * planner pushes a column projection into this scan.
+ */
+ public HttpGroupScan(HttpGroupScan that, List<SchemaPath> columns) {
+ super("no-user");
+ this.columns = columns;
+ this.config = that.config;
+ this.httpScanSpec = that.httpScanSpec;
+ }
+
+ /**
+ * Deserialization constructor used when a serialized plan is read back.
+ */
+ @JsonCreator
+ public HttpGroupScan(
+ @JsonProperty("config") HttpStoragePluginConfig config,
+ @JsonProperty("columns") List<SchemaPath> columns,
+ @JsonProperty("httpScanSpec") HttpScanSpec httpScanSpec
+ ) {
+ super("no-user");
+ this.config = config;
+ this.columns = columns;
+ this.httpScanSpec = httpScanSpec;
+ }
+
+ @JsonProperty("config")
+ public HttpStoragePluginConfig config() { return config; }
+
+ @JsonProperty("columns")
+ public List<SchemaPath> columns() { return columns; }
+
+ @JsonProperty("httpScanSpec")
+ public HttpScanSpec httpScanSpec() { return httpScanSpec; }
+
+ @Override
+ public void applyAssignments(List<DrillbitEndpoint> endpoints) {
+ // Single-endpoint scan with no filter push-down: nothing to assign.
+ }
+
+ @Override
+ @JsonIgnore
+ public int getMaxParallelizationWidth() {
+ // An HTTP request cannot be split across fragments.
+ return 1;
+ }
+
+ @Override
+ public boolean canPushdownProjects(List<SchemaPath> columns) {
+ return true;
+ }
+
+ @Override
+ public SubScan getSpecificScan(int minorFragmentId) {
+ return new HttpSubScan(config, httpScanSpec, columns);
+ }
+
+ @Override
+ public GroupScan clone(List<SchemaPath> columns) {
+ return new HttpGroupScan(this, columns);
+ }
+
+ @Override
+ public String getDigest() {
+ return toString();
+ }
+
+ @Override
+ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+ Preconditions.checkArgument(children.isEmpty());
+ return new HttpGroupScan(this);
+ }
+
+ @Override
+ public ScanStats getScanStats() {
+ // A REST endpoint provides no statistics, so supply rough estimates
+ // that let the planner cost this scan consistently.
+ int estRowCount = 10_000;
+ // Assume a wider row when all columns are projected (columns == null).
+ int rowWidth = columns == null ? 200 : 100;
+ // Fix: data size is rows * row width. The previous expression included
+ // a stray extra factor of 200, inflating the estimate 200x.
+ int estDataSize = estRowCount * rowWidth;
+ int estCpuCost = DrillCostBase.PROJECT_CPU_COST;
+ return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT,
+ estRowCount, estCpuCost, estDataSize);
+ }
+
+ @Override
+ public String toString() {
+ return new PlanStringBuilder(this)
+ .field("httpScanSpec", httpScanSpec)
+ .field("columns", columns)
+ .field("httpStoragePluginConfig", config)
+ .toString();
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(httpScanSpec, columns, config);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ HttpGroupScan other = (HttpGroupScan) obj;
+ return Objects.equals(httpScanSpec, other.httpScanSpec())
+ && Objects.equals(columns, other.columns())
+ && Objects.equals(config, other.config());
+ }
+}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java
new file mode 100644
index 0000000..46d9838
--- /dev/null
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.http;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ChildErrorContext;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework.ReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework.ScanFrameworkBuilder;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+/**
+ * Creates the run-time scan operator for an HTTP sub-scan. Configures
+ * Drill's managed scan framework with a single {@link HttpBatchReader}
+ * per scan.
+ */
+public class HttpScanBatchCreator implements BatchCreator<HttpSubScan> {
+
+ @Override
+ public CloseableRecordBatch getBatch(ExecutorFragmentContext context, HttpSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException {
+ // Scans are leaf operators: no incoming batches expected.
+ Preconditions.checkArgument(children.isEmpty());
+
+ try {
+ ScanFrameworkBuilder builder = createBuilder(context.getOptions(), subScan);
+ return builder.buildScanOperator(context, subScan);
+ } catch (UserException e) {
+ // Rethrow user exceptions directly
+ throw e;
+ } catch (Throwable e) {
+ // Wrap all others
+ throw new ExecutionSetupException(e);
+ }
+ }
+
+ /**
+ * Configures the scan framework: column projection, user name, an error
+ * context that adds the target URL to any error message, the reader
+ * factory, and the type used for projected-but-missing columns.
+ */
+ private ScanFrameworkBuilder createBuilder(OptionManager options,
+ HttpSubScan subScan) {
+ HttpStoragePluginConfig config = subScan.config();
+ ScanFrameworkBuilder builder = new ScanFrameworkBuilder();
+ builder.projection(subScan.columns());
+ builder.setUserName(subScan.getUserName());
+
+ // Provide custom error context
+ builder.errorContext(
+ new ChildErrorContext(builder.errorContext()) {
+ @Override
+ public void addContext(UserException.Builder builder) {
+ builder.addContext("URL", subScan.getFullURL());
+ }
+ });
+
+ // Reader
+ ReaderFactory readerFactory = new HttpReaderFactory(config, subScan);
+ builder.setReaderFactory(readerFactory);
+ // Columns that are projected but absent in the response resolve to
+ // nullable VARCHAR.
+ builder.nullType(Types.optional(MinorType.VARCHAR));
+ return builder;
+ }
+
+ /**
+ * Hands exactly one {@link HttpBatchReader} to the framework: an HTTP
+ * scan is a single request in a single thread.
+ */
+ private static class HttpReaderFactory implements ReaderFactory {
+
+ private final HttpStoragePluginConfig config;
+ private final HttpSubScan subScan;
+ // Number of readers created so far; the factory stops after one.
+ private int count;
+
+ public HttpReaderFactory(HttpStoragePluginConfig config, HttpSubScan subScan) {
+ this.config = config;
+ this.subScan = subScan;
+ }
+
+ @Override
+ public void bind(ManagedScanFramework framework) { }
+
+ @Override
+ public ManagedReader<SchemaNegotiator> next() {
+
+ // Only a single scan (in a single thread)
+ if (count++ == 0) {
+ return new HttpBatchReader(config, subScan);
+ } else {
+ return null;
+ }
+ }
+ }}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanSpec.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanSpec.java
new file mode 100644
index 0000000..19921bc
--- /dev/null
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanSpec.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.http;
+
+import java.util.Objects;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.PlanStringBuilder;
+
+/**
+ * Identifies one HTTP "table": the plugin schema, the API connection name
+ * (called "database" here), and the table name appended to the connection's
+ * base URL. Serialized into physical plans, so all fields must round-trip
+ * through Jackson.
+ */
+@JsonTypeName("http-scan-spec")
+public class HttpScanSpec {
+
+ // The storage plugin (schema) name under which this scan was resolved.
+ protected final String schemaName;
+
+ // The API connection name: a key into the plugin's "connections" map.
+ protected final String database;
+
+ // Appended to the connection's base URL to form the full request URL.
+ protected final String tableName;
+
+ protected final HttpStoragePluginConfig config;
+
+ @JsonCreator
+ public HttpScanSpec(@JsonProperty("schemaName") String schemaName,
+ @JsonProperty("database") String database,
+ @JsonProperty("tableName") String tableName,
+ @JsonProperty("config") HttpStoragePluginConfig config) {
+ this.schemaName = schemaName;
+ this.database = database;
+ this.tableName = tableName;
+ this.config = config;
+ }
+
+ // Fix: expose schemaName and config to Jackson. Without getters these
+ // fields were dropped on serialization and came back null after a plan
+ // round trip, even though the @JsonCreator accepts them.
+ @JsonProperty("schemaName")
+ public String schemaName() {
+ return schemaName;
+ }
+
+ @JsonProperty("database")
+ public String database() {
+ return database;
+ }
+
+ @JsonProperty("tableName")
+ public String tableName() {
+ return tableName;
+ }
+
+ @JsonProperty("config")
+ public HttpStoragePluginConfig config() {
+ return config;
+ }
+
+ @JsonIgnore
+ public String getURL() {
+ // The "database" is the connection name; callers resolve it to a URL.
+ return database;
+ }
+
+ // Fix: value-based equality. HttpGroupScan and HttpSubScan compare their
+ // scan specs with Objects.equals(); without this override those
+ // comparisons fell back to reference identity.
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ HttpScanSpec other = (HttpScanSpec) obj;
+ return Objects.equals(schemaName, other.schemaName)
+ && Objects.equals(database, other.database)
+ && Objects.equals(tableName, other.tableName)
+ && Objects.equals(config, other.config);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(schemaName, database, tableName, config);
+ }
+
+ @Override
+ public String toString() {
+ return new PlanStringBuilder(this)
+ .field("schemaName", schemaName)
+ .field("database", database)
+ .field("tableName", tableName)
+ .field("config", config)
+ .toString();
+ }
+}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSchemaFactory.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSchemaFactory.java
new file mode 100644
index 0000000..c111d3d
--- /dev/null
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSchemaFactory.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.http;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.AbstractSchemaFactory;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Registers the plugin's schema tree with Calcite: the plugin name is the
+ * top-level schema, and each configured API connection becomes a
+ * sub-schema.
+ */
+public class HttpSchemaFactory extends AbstractSchemaFactory {
+ private static final Logger logger = LoggerFactory.getLogger(HttpSchemaFactory.class);
+
+ private final HttpStoragePlugin plugin;
+
+ public HttpSchemaFactory(HttpStoragePlugin plugin, String schemaName) {
+ super(schemaName);
+ this.plugin = plugin;
+ }
+
+ @Override
+ public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) {
+ HttpSchema schema = new HttpSchema(getName());
+ logger.debug("Registering {} {}", schema.getName(), schema.toString());
+
+ // Attach the plugin schema under the root, then attach its sub-schemas.
+ SchemaPlus schemaPlus = parent.add(getName(), schema);
+ schema.setHolder(schemaPlus);
+ }
+
+ /**
+ * Top-level schema for the plugin; its sub-schemas are the API connection
+ * names from the plugin config.
+ */
+ class HttpSchema extends AbstractSchema {
+
+ public HttpSchema(String name) {
+ super(Collections.emptyList(), name);
+ }
+
+ // Registers each connection name as a child of this schema in Calcite.
+ void setHolder(SchemaPlus plusOfThis) {
+ for (String s : getSubSchemaNames()) {
+ plusOfThis.add(s, getSubSchemaKnownExists(s));
+ }
+ }
+
+ @Override
+ public Set<String> getSubSchemaNames() {
+ HttpStoragePluginConfig config = plugin.getConfig();
+ Map<String, HttpAPIConfig> connections = config.connections();
+ Set<String> subSchemaNames = new HashSet<>();
+
+ // Get the possible subschemas.
+ for (Map.Entry<String, HttpAPIConfig> entry : connections.entrySet()) {
+ subSchemaNames.add(entry.getKey());
+ }
+ return subSchemaNames;
+ }
+
+ @Override
+ public AbstractSchema getSubSchema(String name) {
+ if (plugin.getConfig().connections().containsKey(name)) {
+ return getSubSchemaKnownExists(name);
+ } else {
+ // Unknown connection name: fail with a clear, user-facing error.
+ throw UserException
+ .connectionError()
+ .message("API '{}' does not exist in HTTP Storage plugin '{}'", name, getName())
+ .build(logger);
+ }
+ }
+
+ /**
+ * Helper method to get subschema when we know it exists (already checked the existence)
+ */
+ private HttpAPIConnectionSchema getSubSchemaKnownExists(String name) {
+ return new HttpAPIConnectionSchema(this, name, plugin);
+ }
+
+ @Override
+ public String getTypeName() {
+ return HttpStoragePluginConfig.NAME;
+ }
+ }
+}
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePlugin.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePlugin.java
similarity index 53%
copy from contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePlugin.java
copy to contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePlugin.java
index 35c974d..c660a2e 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePlugin.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePlugin.java
@@ -15,67 +15,47 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.drill.exec.store.kudu;
-
-import java.io.IOException;
+package org.apache.drill.exec.store.http;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.drill.common.JSONOptions;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.AbstractStoragePlugin;
import org.apache.drill.exec.store.SchemaConfig;
-import org.apache.kudu.client.KuduClient;
-
import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
-public class KuduStoragePlugin extends AbstractStoragePlugin {
+public class HttpStoragePlugin extends AbstractStoragePlugin {
- private final KuduStoragePluginConfig engineConfig;
- private final KuduSchemaFactory schemaFactory;
+ private final HttpStoragePluginConfig config;
- private final KuduClient client;
+ private final HttpSchemaFactory schemaFactory;
- public KuduStoragePlugin(KuduStoragePluginConfig configuration, DrillbitContext context, String name)
- throws IOException {
+ public HttpStoragePlugin(HttpStoragePluginConfig configuration, DrillbitContext context, String name) {
super(context, name);
- this.schemaFactory = new KuduSchemaFactory(this, name);
- this.engineConfig = configuration;
- this.client = new KuduClient.KuduClientBuilder(configuration.getMasterAddresses()).build();
- }
-
- public KuduClient getClient() {
- return client;
- }
-
- @Override
- public void close() throws Exception {
- client.close();
+ this.config = configuration;
+ this.schemaFactory = new HttpSchemaFactory(this, name);
}
@Override
- public boolean supportsRead() {
- return true;
+ public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) {
+ schemaFactory.registerSchemas(schemaConfig, parent);
}
@Override
- public KuduGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
- KuduScanSpec scanSpec = selection.getListWith(new ObjectMapper(), new TypeReference<KuduScanSpec>() {});
- return new KuduGroupScan(this, scanSpec, null);
+ public HttpStoragePluginConfig getConfig() {
+ return config;
}
@Override
- public boolean supportsWrite() {
+ public boolean supportsRead() {
return true;
}
@Override
- public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
- schemaFactory.registerSchemas(schemaConfig, parent);
- }
-
- @Override
- public KuduStoragePluginConfig getConfig() {
- return engineConfig;
+ public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
+ HttpScanSpec scanSpec = selection.getListWith(context.getLpPersistence().getMapper(), new TypeReference<HttpScanSpec>() {});
+ return new HttpGroupScan(config, scanSpec, null);
}
}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java
new file mode 100644
index 0000000..2b65bb9
--- /dev/null
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.http;
+
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.common.logical.StoragePluginConfigBase;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Objects;
+
+
+/**
+ * Configuration for the HTTP storage plugin: a map of named API
+ * connections plus optional result caching, timeout, and proxy settings
+ * shared by all connections.
+ */
+@JsonTypeName(HttpStoragePluginConfig.NAME)
+public class HttpStoragePluginConfig extends StoragePluginConfigBase {
+ private static final Logger logger = LoggerFactory.getLogger(HttpStoragePluginConfig.class);
+
+ public static final String NAME = "http";
+
+ // Case-insensitive map of connection (sub-schema) name to API config.
+ public final Map<String, HttpAPIConfig> connections;
+
+ public final boolean cacheResults;
+
+ public final String proxyHost;
+
+ public final int proxyPort;
+
+ // One of "direct", "http" or "socks"; defaults to "direct".
+ public final String proxyType;
+
+ public final String proxyUsername;
+
+ public final String proxyPassword;
+
+ /**
+ * Timeout in seconds; 0 means no timeout configured.
+ * Fix: made final, like every other field of this immutable config.
+ */
+ public final int timeout;
+
+ @JsonCreator
+ public HttpStoragePluginConfig(@JsonProperty("cacheResults") Boolean cacheResults,
+ @JsonProperty("connections") Map<String, HttpAPIConfig> connections,
+ @JsonProperty("timeout") Integer timeout,
+ @JsonProperty("proxyHost") String proxyHost,
+ @JsonProperty("proxyPort") Integer proxyPort,
+ @JsonProperty("proxyType") String proxyType,
+ @JsonProperty("proxyUsername") String proxyUsername,
+ @JsonProperty("proxyPassword") String proxyPassword
+ ) {
+ this.cacheResults = cacheResults == null ? false : cacheResults;
+
+ // Copy into a case-insensitive map so connection lookup ignores case.
+ this.connections = CaseInsensitiveMap.newHashMap();
+ if (connections != null) {
+ this.connections.putAll(connections);
+ }
+
+ this.timeout = timeout == null ? 0 : timeout;
+ this.proxyHost = normalize(proxyHost);
+ this.proxyPort = proxyPort == null ? 0 : proxyPort;
+ this.proxyUsername = normalize(proxyUsername);
+ this.proxyPassword = normalize(proxyPassword);
+ proxyType = normalize(proxyType);
+ this.proxyType = proxyType == null
+ ? "direct" : proxyType.trim().toLowerCase();
+
+ // Validate Proxy Type. this.proxyType is never null here (defaulted to
+ // "direct" above), so the switch can run unconditionally.
+ switch (this.proxyType) {
+ case "direct":
+ case "http":
+ case "socks":
+ break;
+ default:
+ throw UserException
+ .validationError()
+ .message("Invalid Proxy Type: %s. Drill supports 'direct', 'http' and 'socks' proxies.", proxyType)
+ .build(logger);
+ }
+ }
+
+ /**
+ * Trims a config value and maps blank strings to null so "unset" and
+ * "empty" behave identically.
+ */
+ private static String normalize(String value) {
+ if (value == null) {
+ return value;
+ }
+ value = value.trim();
+ return value.isEmpty() ? null : value;
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (this == that) {
+ return true;
+ } else if (that == null || getClass() != that.getClass()) {
+ return false;
+ }
+ HttpStoragePluginConfig thatConfig = (HttpStoragePluginConfig) that;
+ // Fix: compare timeout, which hashCode() already includes; omitting it
+ // broke the equals/hashCode contract (equal objects could hash
+ // differently). Primitives are compared directly to avoid boxing.
+ return Objects.equals(connections, thatConfig.connections) &&
+ cacheResults == thatConfig.cacheResults &&
+ timeout == thatConfig.timeout &&
+ Objects.equals(proxyHost, thatConfig.proxyHost) &&
+ proxyPort == thatConfig.proxyPort &&
+ Objects.equals(proxyType, thatConfig.proxyType) &&
+ Objects.equals(proxyUsername, thatConfig.proxyUsername) &&
+ Objects.equals(proxyPassword, thatConfig.proxyPassword);
+ }
+
+ @Override
+ public String toString() {
+ return new PlanStringBuilder(this)
+ .field("connections", connections)
+ .field("cacheResults", cacheResults)
+ .field("timeout", timeout)
+ .field("proxyHost", proxyHost)
+ .field("proxyPort", proxyPort)
+ .field("proxyUsername", proxyUsername)
+ .field("proxyPassword", proxyPassword)
+ .field("proxyType", proxyType)
+ .toString();
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(connections, cacheResults, timeout,
+ proxyHost, proxyPort, proxyType, proxyUsername, proxyPassword);
+ }
+
+ @JsonProperty("cacheResults")
+ public boolean cacheResults() { return cacheResults; }
+
+ @JsonProperty("connections")
+ public Map<String, HttpAPIConfig> connections() { return connections; }
+
+ @JsonProperty("timeout")
+ public int timeout() { return timeout;}
+
+ @JsonProperty("proxyHost")
+ public String proxyHost() { return proxyHost; }
+
+ @JsonProperty("proxyPort")
+ public int proxyPort() { return proxyPort; }
+
+ @JsonProperty("proxyUsername")
+ public String proxyUsername() { return proxyUsername; }
+
+ @JsonProperty("proxyPassword")
+ public String proxyPassword() { return proxyPassword; }
+
+ @JsonProperty("proxyType")
+ public String proxyType() { return proxyType; }
+}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSubScan.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSubScan.java
new file mode 100644
index 0000000..700ad70
--- /dev/null
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSubScan.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.http;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+
+/**
+ * Execution-time scan descriptor for one HTTP request. Serialized to the
+ * executing fragment, so config, scan spec and columns are all Jackson
+ * properties.
+ */
+@JsonTypeName("http-sub-scan")
+public class HttpSubScan extends AbstractBase implements SubScan {
+
+ private final HttpScanSpec tableSpec;
+ private final HttpStoragePluginConfig config;
+ private final List<SchemaPath> columns;
+
+ @JsonCreator
+ public HttpSubScan(
+ @JsonProperty("config") HttpStoragePluginConfig config,
+ @JsonProperty("tableSpec") HttpScanSpec tableSpec,
+ @JsonProperty("columns") List<SchemaPath> columns) {
+ super("user-if-needed");
+ this.config = config;
+ this.tableSpec = tableSpec;
+ this.columns = columns;
+ }
+ @JsonProperty("tableSpec")
+ public HttpScanSpec tableSpec() {
+ return tableSpec;
+ }
+
+ @JsonProperty("columns")
+ public List<SchemaPath> columns() {
+ return columns;
+ }
+
+ @JsonProperty("config")
+ public HttpStoragePluginConfig config() {
+ return config;
+ }
+
+ // Returns the connection name (the scan spec's "database"), not a full URL.
+ @JsonIgnore
+ public String getURL() {
+ return tableSpec.getURL();
+ }
+
+ /**
+ * Builds the full request URL: the connection's base URL with the table
+ * name appended.
+ */
+ @JsonIgnore
+ public String getFullURL() {
+ String selectedConnection = tableSpec.database();
+ // NOTE(review): assumes the connection name exists in the config; a
+ // missing key would NPE on .url() — confirm upstream schema resolution
+ // guarantees this.
+ String url = config.connections().get(selectedConnection).url();
+ return url + tableSpec.tableName();
+ }
+
+ @Override
+ public <T, X, E extends Throwable> T accept(
+ PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+ return physicalVisitor.visitSubScan(this, value);
+ }
+
+ @Override
+ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+ return new HttpSubScan(config, tableSpec, columns);
+ }
+
+ @Override
+ @JsonIgnore
+ public int getOperatorType() {
+ return CoreOperatorType.HTTP_SUB_SCAN_VALUE;
+ }
+
+ // Leaf operator: no children to iterate.
+ @Override
+ public Iterator<PhysicalOperator> iterator() {
+ return ImmutableSet.<PhysicalOperator>of().iterator();
+ }
+
+ @Override
+ public String toString() {
+ return new PlanStringBuilder(this)
+ .field("tableSpec", tableSpec)
+ .field("columns", columns)
+ .field("config", config)
+ .toString();
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(tableSpec,columns,config);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ HttpSubScan other = (HttpSubScan) obj;
+ return Objects.equals(tableSpec, other.tableSpec)
+ && Objects.equals(columns, other.columns)
+ && Objects.equals(config, other.config);
+ }
+}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/HttpProxyConfig.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/HttpProxyConfig.java
new file mode 100644
index 0000000..41f6d35
--- /dev/null
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/HttpProxyConfig.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.http.util;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import org.apache.drill.exec.ExecConstants;
+import org.apache.parquet.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+
+/**
+ * HTTP proxy settings. Provides a builder to create settings
+ * from the Drill config or from code. Allows combining the two.
+ * The Drill config allows integrating with Linux env. vars. Allows
+ * combinations: take values from config, but selectively replace bits,
+ * such as the user name/password.
+ * <p>
+ * This class provides values passed to the HTTP client, whatever it
+ * might be.
+ *
+ * @see <a href="https://www.shellhacks.com/linux-proxy-server-settings-set-proxy-command-line/">
+ * Proxy Server Settings</a>
+ *
+ */
+public class HttpProxyConfig {
+ private static final Logger logger = LoggerFactory.getLogger(HttpProxyConfig.class);
+
+ public enum ProxyType {
+ NONE, HTTP, SOCKS
+ }
+
+ public static class ProxyBuilder {
+ private String url;
+ private String typeStr;
+ private ProxyType type = ProxyType.NONE;
+ private String host;
+ private int port = 80;
+ private String username;
+ private String password;
+
+ public ProxyBuilder fromHttpConfig(Config config) {
+ url(config.getString(ExecConstants.HTTP_PROXY_URL));
+ type(config.getString(ExecConstants.HTTP_PROXY_TYPE));
+ host(config.getString(ExecConstants.HTTP_PROXY_HOST));
+ port(config.getInt(ExecConstants.HTTP_PROXY_PORT));
+ username(config.getString(ExecConstants.HTTP_PROXY_USER_NAME));
+ password(config.getString(ExecConstants.HTTP_PROXY_PASSWORD));
+ return this;
+ }
+
+ public ProxyBuilder fromHttpsConfig(Config config) {
+ url(config.getString(ExecConstants.HTTPS_PROXY_URL));
+ type(config.getString(ExecConstants.HTTPS_PROXY_TYPE));
+ host(config.getString(ExecConstants.HTTPS_PROXY_HOST));
+ port(config.getInt(ExecConstants.HTTPS_PROXY_PORT));
+ username(config.getString(ExecConstants.HTTPS_PROXY_USER_NAME));
+ password(config.getString(ExecConstants.HTTPS_PROXY_PASSWORD));
+ return this;
+ }
+
+ public ProxyBuilder fromConfigForURL(Config config, String url) {
+ try {
+ URL parsed = new URL(url);
+ if (parsed.getProtocol().equals("https")) {
+ return fromHttpsConfig(config);
+ }
+ } catch (Exception e) {
+ // This is not the place to warn about a bad URL.
+ // Just assume HTTP, something later will fail.
+ }
+ return fromHttpConfig(config);
+ }
+
+ public ProxyBuilder url(String url) {
+ this.url = url;
+ return this;
+ }
+
+ public ProxyBuilder type(ProxyType type) {
+ this.type = type;
+ this.typeStr = null;
+ return this;
+ }
+
+ public ProxyBuilder type(String type) {
+ this.typeStr = type;
+ this.type = null;
+ return this;
+ }
+
+ public ProxyBuilder host(String host) {
+ this.host = host;
+ return this;
+ }
+
+ public ProxyBuilder port(int port) {
+ this.port = port;
+ return this;
+ }
+
+ public ProxyBuilder username(String username) {
+ this.username = username;
+ return this;
+ }
+
+ public ProxyBuilder password(String password) {
+ this.password = password;
+ return this;
+ }
+
+ public HttpProxyConfig build() {
+ buildFromUrl();
+ buildType();
+
+ // Info can come from the config file. Ignore extra spaces.
+ host = host == null ? null : host.trim();
+ username = username == null ? null : username.trim();
+ password = password == null ? null : password.trim();
+ return new HttpProxyConfig(this);
+ }
+
+ private void buildFromUrl() {
+ url = url == null ? null : url.trim();
+ if (Strings.isNullOrEmpty(url)) {
+ return;
+ }
+ typeStr = null;
+ URL parsed;
+ try {
+ parsed = new URL(url);
+ } catch (MalformedURLException e) {
+ logger.warn("Invalid proxy url: {}, assuming NONE", typeStr);
+ type = ProxyType.NONE;
+ return;
+ }
+ type = ProxyType.HTTP;
+ host = parsed.getHost();
+ port = parsed.getPort();
+ String userInfo = parsed.getUserInfo();
+ if (userInfo != null) {
+ String[] parts = userInfo.split(":");
+ username = parts[0];
+ password = parts.length > 1 ? parts[1] : null;
+ }
+ }
+
+ private void buildType() {
+
+ // If type string, validate to a type
+ if (typeStr != null) {
+ typeStr = typeStr.trim().toUpperCase();
+ if (!Strings.isNullOrEmpty(typeStr)) {
+ try {
+ type = ProxyType.valueOf(typeStr);
+ } catch (IllegalArgumentException e) {
+ logger.warn("Invalid proxy type: {}, assuming NONE", typeStr);
+ this.type = ProxyType.NONE;
+ }
+ }
+ }
+
+ // If not type, assume NONE
+ if (type == null) {
+ type = ProxyType.NONE;
+ }
+
+ // Validate host based on type
+ if (type != ProxyType.NONE) {
+ host = host == null ? null : host.trim();
+ if (Strings.isNullOrEmpty(host)) {
+ logger.warn("{} proxy type specified, but host is null. Reverting to NONE",
+ type.name());
+ type = ProxyType.NONE;
+ }
+ }
+
+ // If no proxy, ignore other settings
+ if (type == ProxyType.NONE) {
+ host = null;
+ username = null;
+ password = null;
+ }
+ }
+ }
+
+ public final ProxyType type;
+ public final String host;
+ public final int port;
+ public final String username;
+ public final String password;
+
+ private HttpProxyConfig(ProxyBuilder builder) {
+ this.type = builder.type;
+ this.host = builder.host;
+ this.port = builder.port;
+ this.username = builder.username;
+ this.password = builder.password;
+ }
+
+ public static ProxyBuilder builder() { return new ProxyBuilder(); }
+}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
new file mode 100644
index 0000000..9bc234f
--- /dev/null
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.http.util;
+
+import okhttp3.Authenticator;
+import okhttp3.Cache;
+import okhttp3.Credentials;
+import okhttp3.FormBody;
+import okhttp3.Interceptor;
+import okhttp3.OkHttpClient;
+import okhttp3.OkHttpClient.Builder;
+import okhttp3.Request;
+import okhttp3.Response;
+
+import okhttp3.Route;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.store.http.HttpAPIConfig;
+import org.apache.drill.exec.store.http.HttpAPIConfig.HttpMethods;
+import org.apache.drill.exec.store.http.HttpStoragePluginConfig;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.net.Proxy;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+
+/**
+ * Performs the actual HTTP requests for the HTTP Storage Plugin. The core
+ * method is the getInputStream() method which accepts a url and opens an
+ * InputStream with that URL's contents.
+ */
+public class SimpleHttp {
+  private static final Logger logger = LoggerFactory.getLogger(SimpleHttp.class);
+
+  private final OkHttpClient client;
+  private final HttpStoragePluginConfig config;
+  private final HttpAPIConfig apiConfig;
+  private final File tempDir;
+  private final HttpProxyConfig proxyConfig;
+  private final CustomErrorContext errorContext;
+
+  /**
+   * @param config the storage plugin configuration
+   * @param tempDir Drill temp directory used for the optional response cache
+   * @param connectionName name of the API connection within the plugin config
+   * @param proxyConfig resolved proxy settings
+   * @param errorContext error context attached to any UserException thrown
+   */
+  public SimpleHttp(HttpStoragePluginConfig config, File tempDir,
+      String connectionName, HttpProxyConfig proxyConfig,
+      CustomErrorContext errorContext) {
+    this.config = config;
+    this.tempDir = tempDir;
+    this.apiConfig = config.connections().get(connectionName);
+    this.proxyConfig = proxyConfig;
+    this.errorContext = errorContext;
+    this.client = setupHttpClient();
+  }
+
+  /**
+   * Executes the HTTP request (GET, or POST when the connection is so
+   * configured) and returns the raw response body.
+   *
+   * @param urlStr the fully-resolved URL to fetch
+   * @return the response body stream; never null
+   * @throws UserException (DATA_READ) on a non-2xx response or I/O failure
+   */
+  public InputStream getInputStream(String urlStr) {
+    Request.Builder requestBuilder = new Request.Builder()
+        .url(urlStr);
+
+    // The configuration does not allow for any other request types other than POST and GET.
+    if (apiConfig.getMethodType() == HttpMethods.POST) {
+      // Handle POST requests
+      FormBody.Builder formBodyBuilder = buildPostBody();
+      requestBuilder.post(formBodyBuilder.build());
+    }
+
+    // Add any user-configured headers to the request
+    if (apiConfig.headers() != null) {
+      for (Map.Entry<String, String> entry : apiConfig.headers().entrySet()) {
+        requestBuilder.addHeader(entry.getKey(), entry.getValue());
+      }
+    }
+
+    // Build the request object
+    Request request = requestBuilder.build();
+
+    try {
+      // Execute the request
+      Response response = client
+          .newCall(request)
+          .execute();
+
+      // If the request is unsuccessful, throw a UserException
+      if (!response.isSuccessful()) {
+        throw UserException
+          .dataReadError()
+          .message("Error retrieving data from HTTP Storage Plugin: %d %s",
+              response.code(), response.message())
+          .addContext(errorContext)
+          .build(logger);
+      }
+      logger.debug("HTTP Request for {} successful.", urlStr);
+      logger.debug("Response Headers: {} ", response.headers().toString());
+
+      // Return the InputStream of the response
+      return Objects.requireNonNull(response.body()).byteStream();
+    } catch (IOException e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Error retrieving data from HTTP Storage Plugin: %s", e.getMessage())
+        .addContext(errorContext)
+        .build(logger);
+    }
+  }
+
+  /**
+   * Configures the OkHTTP3 client: cache, basic authentication, timeouts
+   * and proxy settings, all from the plugin configuration.
+   *
+   * @return OkHttpClient configured client
+   */
+  private OkHttpClient setupHttpClient() {
+    Builder builder = new OkHttpClient.Builder();
+
+    // Set up the HTTP Cache. Future possibilities include making the cache size and retention configurable but
+    // right now it is on or off. The writer will write to the Drill temp directory if it is accessible and
+    // output a warning if not.
+    if (config.cacheResults()) {
+      setupCache(builder);
+    }
+
+    // If the API uses basic authentication add the authentication code.
+    // Fix: use a null-safe, case-insensitive comparison; authType may be
+    // absent (null) in a connection's configuration.
+    if ("basic".equalsIgnoreCase(apiConfig.authType())) {
+      logger.debug("Adding Interceptor");
+      builder.addInterceptor(new BasicAuthInterceptor(apiConfig.userName(), apiConfig.password()));
+    }
+
+    // Apply the configured timeout to connect, write and read.
+    builder.connectTimeout(config.timeout(), TimeUnit.SECONDS);
+    builder.writeTimeout(config.timeout(), TimeUnit.SECONDS);
+    builder.readTimeout(config.timeout(), TimeUnit.SECONDS);
+
+    // Set the proxy configuration
+    Proxy.Type proxyType;
+    switch (proxyConfig.type) {
+      case SOCKS:
+        proxyType = Proxy.Type.SOCKS;
+        break;
+      case HTTP:
+        proxyType = Proxy.Type.HTTP;
+        break;
+      default:
+        proxyType = Proxy.Type.DIRECT;
+    }
+    if (proxyType != Proxy.Type.DIRECT) {
+      builder.proxy(new Proxy(proxyType,
+          new InetSocketAddress(proxyConfig.host, proxyConfig.port)));
+      if (proxyConfig.username != null) {
+        builder.proxyAuthenticator(new Authenticator() {
+          @Override public Request authenticate(Route route, Response response) {
+            String credential = Credentials.basic(proxyConfig.username, proxyConfig.password);
+            return response.request().newBuilder()
+              .header("Proxy-Authorization", credential)
+              .build();
+          }
+        });
+      }
+    }
+
+    return builder.build();
+  }
+
+  /**
+   * Configures response caching using a provided temp directory.
+   *
+   * @param builder
+   *          Builder the Builder object to which the caching is to be
+   *          configured
+   * @throws UserException (DATA_WRITE) if the cache directory cannot be
+   *          created or the cache cannot be initialized
+   */
+  private void setupCache(Builder builder) {
+    int cacheSize = 10 * 1024 * 1024; // TODO Add cache size in MB to config
+    File cacheDirectory = new File(tempDir, "http-cache");
+    // Fix: mkdirs() returns false when the directory already exists, which
+    // would wrongly fail every cached query after the first. Only fail when
+    // the directory is genuinely absent and could not be created.
+    if (!cacheDirectory.exists() && !cacheDirectory.mkdirs()) {
+      throw UserException.dataWriteError()
+        .message("Could not create the HTTP cache directory")
+        .addContext("Path", cacheDirectory.getAbsolutePath())
+        .addContext("Please check the temp directory or disable HTTP caching.")
+        .addContext(errorContext)
+        .build(logger);
+    }
+    try {
+      Cache cache = new Cache(cacheDirectory, cacheSize);
+      logger.debug("Caching HTTP Query Results at: {}", cacheDirectory);
+      builder.cache(cache);
+    } catch (Exception e) {
+      throw UserException.dataWriteError(e)
+        .message("Could not create the HTTP cache")
+        .addContext("Path", cacheDirectory.getAbsolutePath())
+        .addContext("Please check the temp directory or disable HTTP caching.")
+        .addContext(errorContext)
+        .build(logger);
+    }
+  }
+
+  /**
+   * Accepts text from a post body in the format:<br>
+   * {@code key1=value1}<br>
+   * {@code key2=value2}
+   * <p>
+   * and creates the appropriate form body.
+   *
+   * @return FormBody.Builder The populated formbody builder
+   */
+  private FormBody.Builder buildPostBody() {
+    final Pattern postBodyPattern = Pattern.compile("^.+=.+$");
+
+    FormBody.Builder formBodyBuilder = new FormBody.Builder();
+    String[] lines = apiConfig.postBody().split("\\r?\\n");
+    for (String line : lines) {
+
+      // If the string is in the format key=value split it,
+      // Otherwise ignore
+      if (postBodyPattern.matcher(line).find()) {
+        // Fix: split on the first '=' only so values may themselves contain
+        // '=' characters (e.g. base64 payloads); the old split("=") would
+        // truncate such values.
+        String[] parts = line.split("=", 2);
+        formBodyBuilder.add(parts[0], parts[1]);
+      }
+    }
+    return formBodyBuilder;
+  }
+
+  /**
+   * Intercepts requests and adds a Basic Authorization header.
+   */
+  public static class BasicAuthInterceptor implements Interceptor {
+    private final String credentials;
+
+    public BasicAuthInterceptor(String user, String password) {
+      credentials = Credentials.basic(user, password);
+    }
+
+    @NotNull
+    @Override
+    public Response intercept(Chain chain) throws IOException {
+      // Get the existing request
+      Request request = chain.request();
+
+      // Replace with new request containing the authorization headers and previous headers
+      Request authenticatedRequest = request.newBuilder().header("Authorization", credentials).build();
+      return chain.proceed(authenticatedRequest);
+    }
+  }
+}
diff --git a/contrib/storage-http/src/main/resources/bootstrap-storage-plugins.json b/contrib/storage-http/src/main/resources/bootstrap-storage-plugins.json
new file mode 100644
index 0000000..9afcaee
--- /dev/null
+++ b/contrib/storage-http/src/main/resources/bootstrap-storage-plugins.json
@@ -0,0 +1,9 @@
+{
+ "storage":{
+ "http" : {
+ "type":"http",
+ "connections": {},
+ "enabled": false
+ }
+ }
+}
diff --git a/contrib/storage-http/src/main/resources/drill-module.conf b/contrib/storage-http/src/main/resources/drill-module.conf
new file mode 100644
index 0000000..c0f2a93
--- /dev/null
+++ b/contrib/storage-http/src/main/resources/drill-module.conf
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This file tells Drill to consider this module when class path scanning.
+# This file can also include any supplementary configuration information.
+# This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
+
+drill: {
+ classpath.scanning: {
+ packages += "org.apache.drill.exec.store.http"
+ }
+}
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
new file mode 100644
index 0000000..f99df7c
--- /dev/null
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
@@ -0,0 +1,479 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.http;
+
+import static org.apache.drill.test.rowSet.RowSetUtilities.mapValue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.util.DrillFileUtils;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.drill.shaded.guava.com.google.common.io.Files;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import okhttp3.mockwebserver.RecordedRequest;
+import okio.Buffer;
+import okio.Okio;
+
+/**
+ * Tests the HTTP Storage plugin. Since the plugin makes use of REST requests,
+ * this test class makes use of the okhttp3 MockWebServer to simulate a remote
+ * web server. There are two unit tests that make remote REST calls, however
+ * these tests are ignored by default.
+ * <p>
+ * The HTTP reader uses Drill's existing JSON reader class, so the unit tests
+ * focus on testing the plugin configurations rather than how well it parses the
+ * JSON as this is tested elsewhere.
+ */
+public class TestHttpPlugin extends ClusterTest {
+
+ private static final int MOCK_SERVER_PORT = 8091;
+ private static String TEST_JSON_RESPONSE;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    startCluster(ClusterFixture.builder(dirTestWatcher));
+
+    // Canned JSON body served by the mock web server in the tests below.
+    TEST_JSON_RESPONSE = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/response.json"), Charsets.UTF_8).read();
+
+    dirTestWatcher.copyResourceToRoot(Paths.get("data/"));
+
+    Map<String, String> headers = new HashMap<>();
+    headers.put("header1", "value1");
+    headers.put("header2", "value2");
+
+    // Fix: build the mock URLs from MOCK_SERVER_PORT rather than repeating
+    // the port literal, so the connections cannot drift out of sync with
+    // the mock web server configuration.
+    HttpAPIConfig mockConfig = new HttpAPIConfig("http://localhost:" + MOCK_SERVER_PORT + "/", "GET", headers, "basic", "user", "pass", null);
+
+    HttpAPIConfig sunriseConfig = new HttpAPIConfig("https://api.sunrise-sunset.org/", "GET", null, null, null, null, null);
+
+    // NOTE(review): this embeds a real-looking API token in source control;
+    // consider loading it from an environment variable. The test using it
+    // is @Ignore'd by default.
+    HttpAPIConfig stockConfig = new HttpAPIConfig("https://api.worldtradingdata.com/api/v1/stock?symbol=SNAP,TWTR,VOD" +
+      ".L&api_token=zuHlu2vZaehdZN6GmJdTiVlp7xgZn6gl6sfgmI4G6TY4ej0NLOzvy0TUl4D4", "get", null, null, null, null, null);
+
+    HttpAPIConfig mockPostConfig = new HttpAPIConfig("http://localhost:" + MOCK_SERVER_PORT + "/", "POST", headers, null, null, null, "key1=value1\nkey2=value2");
+
+    Map<String, HttpAPIConfig> configs = new HashMap<>();
+    configs.put("stock", stockConfig);
+    configs.put("sunrise", sunriseConfig);
+    configs.put("mock", mockConfig);
+    configs.put("mockpost", mockPostConfig);
+
+    HttpStoragePluginConfig mockStorageConfigWithWorkspace = new HttpStoragePluginConfig(false, configs, 2, "", 80, "", "", "");
+    mockStorageConfigWithWorkspace.setEnabled(true);
+    cluster.defineStoragePlugin("api", mockStorageConfigWithWorkspace);
+  }
+
+ // Verifies that all four connections, plus the plugin root itself,
+ // surface as INFORMATION_SCHEMA schemata of type 'http'.
+ @Test
+ public void verifyPluginConfig() throws Exception {
+ String sql = "SELECT SCHEMA_NAME, TYPE FROM INFORMATION_SCHEMA.`SCHEMATA` WHERE TYPE='http'";
+
+ RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("SCHEMA_NAME", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+ .add("TYPE", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+ .buildSchema();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow("api.mock", "http")
+ .addRow("api.mockpost", "http")
+ .addRow("api.stock", "http")
+ .addRow("api.sunrise", "http")
+ .addRow("api", "http")
+ .build();
+
+ new RowSetComparison(expected).verifyAndClearAll(results);
+ }
+
+ /**
+ * Evaluates the HTTP plugin with the results from an API that returns the
+ * sunrise/sunset times for a given lat/long and date. API documentation is
+ * available here: https://sunrise-sunset.org/api
+ *
+ * The API returns results in the following format:
+ * <pre><code>
+ * {
+ * "results":
+ * {
+ * "sunrise":"7:27:02 AM",
+ * "sunset":"5:05:55 PM",
+ * "solar_noon":"12:16:28 PM",
+ * "day_length":"9:38:53",
+ * "civil_twilight_begin":"6:58:14 AM",
+ * "civil_twilight_end":"5:34:43 PM",
+ * "nautical_twilight_begin":"6:25:47 AM",
+ * "nautical_twilight_end":"6:07:10 PM",
+ * "astronomical_twilight_begin":"5:54:14 AM",
+ * "astronomical_twilight_end":"6:38:43 PM"
+ * },
+ * "status":"OK"
+ * }
+ * }</code></pre>
+ *
+ * @throws Exception
+ * Throws exception if something goes awry
+ */
+ // Live wildcard query against the real sunrise-sunset API; ignored by
+ // default because it requires network access.
+ @Test
+ @Ignore("Requires Remote Server")
+ public void simpleStarQuery() throws Exception {
+ String sql = "SELECT * FROM api.sunrise.`/json?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
+
+ RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addMap("results")
+ .add("sunrise", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+ .add("sunset", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+ .add("solar_noon", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+ .add("day_length", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+ .add("civil_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+ .add("civil_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+ .add("nautical_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+ .add("nautical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+ .add("astronomical_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+ .add("astronomical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+ .resumeSchema()
+ .add("status", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+ .build();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow( mapValue("6:13:58 AM", "5:59:55 PM", "12:06:56 PM", "11:45:57", "5:48:14 AM", "6:25:38 PM", "5:18:16 AM", "6:55:36 PM", "4:48:07 AM", "7:25:45 PM"), "OK")
+ .build();
+
+ int resultCount = results.rowCount();
+ new RowSetComparison(expected).verifyAndClearAll(results);
+
+ assertEquals(1, resultCount);
+ }
+
+ // Live query projecting specific nested fields from the sunrise API;
+ // ignored by default because it requires network access.
+ @Test
+ @Ignore("Requires Remote Server")
+ public void simpleSpecificQuery() throws Exception {
+ String sql = "SELECT t1.results.sunrise AS sunrise, t1.results.sunset AS sunset FROM api.sunrise.`/json?lat=36.7201600&lng=-4.4203400&date=2019-10-02` AS t1";
+
+ RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("sunrise", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+ .add("sunset", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+ .buildSchema();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow("6:13:58 AM", "5:59:55 PM")
+ .build();
+
+ new RowSetComparison(expected).verifyAndClearAll(results);
+ }
+
+ // Round-trips the physical plan through its JSON form to verify that
+ // HttpGroupScan/HttpSubScan serialize and deserialize correctly.
+ @Test
+ public void testSerDe() throws Exception {
+ try (MockWebServer server = startServer()) {
+
+ server.enqueue(
+ new MockResponse().setResponseCode(200)
+ .setBody(TEST_JSON_RESPONSE)
+ );
+
+ String sql = "SELECT COUNT(*) FROM api.mock.`/json?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
+ String plan = queryBuilder().sql(sql).explainJson();
+ long cnt = queryBuilder().physical(plan).singletonLong();
+ assertEquals("Counts should match",1L, cnt);
+ }
+ }
+
+ // Wildcard query against the mock server: checks both the inferred
+ // schema (nested "results" map plus "status") and the row values.
+ @Test
+ public void simpleTestWithMockServer() throws Exception {
+ try (MockWebServer server = startServer()) {
+
+ server.enqueue(
+ new MockResponse().setResponseCode(200)
+ .setBody(TEST_JSON_RESPONSE)
+ );
+
+ String sql = "SELECT * FROM api.mock.`json?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
+ RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addMap("results")
+ .add("sunrise", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+ .add("sunset", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+ .add("solar_noon", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+ .add("day_length", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+ .add("civil_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+ .add("civil_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+ .add("nautical_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+ .add("nautical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+ .add("astronomical_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+ .add("astronomical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+ .resumeSchema()
+ .add("status", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+ .build();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(mapValue("6:13:58 AM", "5:59:55 PM", "12:06:56 PM", "11:45:57", "5:48:14 AM", "6:25:38 PM", "5:18:16 AM", "6:55:36 PM", "4:48:07 AM", "7:25:45 PM"), "OK")
+ .build();
+
+ int resultCount = results.rowCount();
+ new RowSetComparison(expected).verifyAndClearAll(results);
+
+ assertEquals(1, resultCount);
+ }
+ }
+
+  // Verifies that a POST-configured connection issues a POST, forwards the
+  // configured headers, and still parses the JSON response.
+  @Test
+  public void testPostWithMockServer() throws Exception {
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(
+        new MockResponse()
+          .setResponseCode(200)
+          .setBody(TEST_JSON_RESPONSE)
+      );
+
+      String sql = "SELECT * FROM api.mockPost.`json?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+      TupleMetadata expectedSchema = new SchemaBuilder()
+        .addMap("results")
+        .add("sunrise", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("sunset", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("solar_noon", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("day_length", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("civil_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("civil_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("nautical_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("nautical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("astronomical_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("astronomical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .resumeSchema()
+        .add("status", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .build();
+
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+        .addRow(mapValue("6:13:58 AM", "5:59:55 PM", "12:06:56 PM", "11:45:57", "5:48:14 AM", "6:25:38 PM", "5:18:16 AM", "6:55:36 PM", "4:48:07 AM", "7:25:45 PM"), "OK")
+        .build();
+
+      int resultCount = results.rowCount();
+      new RowSetComparison(expected).verifyAndClearAll(results);
+
+      RecordedRequest recordedRequest = server.takeRequest();
+      assertEquals("POST", recordedRequest.getMethod());
+      // Fix: JUnit's assertEquals takes (expected, actual); the header
+      // assertions had the arguments reversed, which produces misleading
+      // failure messages.
+      assertEquals("value1", recordedRequest.getHeader("header1"));
+      assertEquals("value2", recordedRequest.getHeader("header2"));
+      assertEquals(1, resultCount);
+    }
+  }
+
+ // Projects specific nested fields from the mocked JSON response.
+ @Test
+ public void specificTestWithMockServer() throws Exception {
+ try (MockWebServer server = startServer()) {
+
+ server.enqueue(
+ new MockResponse().setResponseCode(200)
+ .setBody(TEST_JSON_RESPONSE)
+ );
+
+ String sql = "SELECT t1.results.sunrise AS sunrise, t1.results.sunset AS sunset FROM api.mock.`/json?lat=36.7201600&lng=-4.4203400&date=2019-10-02` AS t1";
+
+ RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("sunrise", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+ .add("sunset", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+ .buildSchema();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow("6:13:58 AM", "5:59:55 PM")
+ .build();
+
+ new RowSetComparison(expected).verifyAndClearAll(results);
+ }
+ }
+
+ // Throttles the mock response below the plugin's 2-second timeout
+ // (set in setup()) and expects a DATA_READ timeout error.
+ @Test
+ public void testSlowResponse() throws Exception {
+ try (MockWebServer server = startServer()) {
+
+ server.enqueue(
+ new MockResponse().setResponseCode(200)
+ .setBody(TEST_JSON_RESPONSE)
+ .throttleBody(64, 4, TimeUnit.SECONDS)
+ );
+
+ String sql = "SELECT t1.results.sunrise AS sunrise, t1.results.sunset AS sunset FROM api.mock.`/json?lat=36.7201600&lng=-4.4203400&date=2019-10-02` AS t1";
+
+ try {
+ client.queryBuilder().sql(sql).rowSet();
+ fail();
+ } catch (Exception e) {
+ assertTrue(e.getMessage().contains("DATA_READ ERROR: timeout"));
+ }
+ }
+ }
+
+ // An empty (zero-byte) 200 response should yield no row set at all.
+ @Test
+ public void testZeroByteResponse() throws Exception {
+ try (MockWebServer server = startServer()) {
+
+ server.enqueue(
+ new MockResponse().setResponseCode(200)
+ .setBody("")
+ );
+
+ String sql = "SELECT * FROM api.mock.`/json?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
+
+ RowSet results = client.queryBuilder().sql(sql).rowSet();
+ assertNull(results);
+ }
+ }
+
+ // Note that, in this test, the response is not empty. Instead, the
+ // response has a single row with no columns: the body "{}" is one JSON
+ // object, so the reader produces one row against an empty schema.
+ @Test
+ public void testEmptyJSONObjectResponse() throws Exception {
+ try (MockWebServer server = startServer()) {
+
+ server.enqueue(
+ new MockResponse().setResponseCode(200)
+ .setBody("{}")
+ );
+
+ String sql = "SELECT * FROM api.mock.`/json?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
+
+ RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .buildSchema();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow()
+ .build();
+
+ new RowSetComparison(expected).verifyAndClearAll(results);
+ }
+ }
+
+ // A non-2xx status (here 404) must surface as a DATA_READ error that
+ // names the HTTP storage plugin and the status, not as an empty result.
+ @Test
+ public void testErrorResponse() throws Exception {
+ try (MockWebServer server = startServer()) {
+
+ server.enqueue(
+ new MockResponse().setResponseCode(404)
+ .setBody("{}")
+ );
+
+ String sql = "SELECT * FROM api.mock.`/json?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
+
+ try {
+ client.queryBuilder().sql(sql).rowSet();
+ fail();
+ } catch (Exception e) {
+ assertTrue(e.getMessage().contains("DATA_READ ERROR: Error retrieving data from HTTP Storage Plugin: 404 Client Error"));
+ }
+ }
+ }
+
+ // End-to-end check of the outbound request: after running a query, the
+ // recorded request must carry the configured custom headers and a
+ // Basic-auth Authorization header ("dXNlcjpwYXNz" = base64 "user:pass"),
+ // and the full nested result schema/row must match the fixture.
+ @Test
+ public void testHeaders() throws Exception {
+ try (MockWebServer server = startServer()) {
+
+ server.enqueue(
+ new MockResponse().setResponseCode(200)
+ .setBody(TEST_JSON_RESPONSE)
+ );
+
+ String sql = "SELECT * FROM api.mock.`json?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
+ RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addMap("results")
+ .add("sunrise", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+ .add("sunset", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+ .add("solar_noon", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+ .add("day_length", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+ .add("civil_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+ .add("civil_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+ .add("nautical_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+ .add("nautical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+ .add("astronomical_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+ .add("astronomical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+ .resumeSchema()
+ .add("status", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+ .build();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow( mapValue("6:13:58 AM", "5:59:55 PM", "12:06:56 PM", "11:45:57", "5:48:14 AM", "6:25:38 PM", "5:18:16 AM", "6:55:36 PM", "4:48:07 AM", "7:25:45 PM"), "OK")
+ .build();
+
+ // Capture the row count before verifyAndClearAll() releases the row set.
+ int resultCount = results.rowCount();
+ new RowSetComparison(expected).verifyAndClearAll(results);
+
+ assertEquals(1, resultCount);
+
+ RecordedRequest request = server.takeRequest();
+ assertEquals("value1", request.getHeader("header1"));
+ assertEquals("value2", request.getHeader("header2"));
+ assertEquals("Basic dXNlcjpwYXNz", request.getHeader("Authorization"));
+ }
+ }
+
+ /**
+ * Helper function to convert files to a readable input stream.
+ * @param file The input file to be read
+ * @return A buffer holding the full contents of the file
+ * @throws IOException If the file is unreadable, throws an IOException
+ */
+ private Buffer fileToBytes(File file) throws IOException {
+ Buffer result = new Buffer();
+ result.writeAll(Okio.source(file));
+ return result;
+ }
+
+ /**
+ * Helper function to start the MockWebServer on the fixed test port
+ * (MOCK_SERVER_PORT) so the plugin config can point at it.
+ * @return Started mock server
+ * @throws IOException If the server cannot start, throws IOException
+ */
+ private MockWebServer startServer() throws IOException {
+ MockWebServer server = new MockWebServer();
+ server.start(MOCK_SERVER_PORT);
+ return server;
+ }
+}
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpProxy.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpProxy.java
new file mode 100644
index 0000000..f3eca88
--- /dev/null
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpProxy.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.http;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.store.http.util.HttpProxyConfig;
+import org.apache.drill.exec.store.http.util.HttpProxyConfig.ProxyType;
+import org.apache.drill.test.BaseTest;
+import org.apache.drill.test.ConfigBuilder;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.typesafe.config.Config;
+
+/**
+ * Unit tests for {@link HttpProxyConfig}: builder input normalization
+ * (trimming, proxy-type parsing, proxy-URL parsing) and loading proxy
+ * settings from the Drill config via the {@link ExecConstants} proxy keys.
+ */
+public class TestHttpProxy extends BaseTest {
+
+ // Builder trims whitespace from host/username/password and parses
+ // the type string case-insensitively into a ProxyType.
+ @Test
+ public void testBasics() {
+ HttpProxyConfig proxy = HttpProxyConfig.builder()
+ .type("socks")
+ .host(" foo.com ")
+ .port(1234)
+ .username(" bob ")
+ .password(" secret ")
+ .build();
+ assertEquals(ProxyType.SOCKS, proxy.type);
+ assertEquals("foo.com", proxy.host);
+ assertEquals(1234, proxy.port);
+ assertEquals("bob", proxy.username);
+ assertEquals("secret", proxy.password);
+ }
+
+ // A full proxy URL supplies type, host, port and credentials in one go.
+ @Test
+ public void testURL() {
+ // See https://www.shellhacks.com/linux-proxy-server-settings-set-proxy-command-line/
+ HttpProxyConfig proxy = HttpProxyConfig.builder()
+ .url("http://bob:secret@foo.com:1234")
+ .build();
+ assertEquals(ProxyType.HTTP, proxy.type);
+ assertEquals("foo.com", proxy.host);
+ assertEquals(1234, proxy.port);
+ assertEquals("bob", proxy.username);
+ assertEquals("secret", proxy.password);
+ }
+
+ // Credentials set explicitly on the builder fill in what a
+ // credential-less URL leaves out.
+ @Test
+ public void testURLAndConfig() {
+ HttpProxyConfig proxy = HttpProxyConfig.builder()
+ .url("http://foo.com:1234")
+ .username("bob")
+ .password("secret")
+ .build();
+ assertEquals(ProxyType.HTTP, proxy.type);
+ assertEquals("foo.com", proxy.host);
+ assertEquals(1234, proxy.port);
+ assertEquals("bob", proxy.username);
+ assertEquals("secret", proxy.password);
+ }
+
+ // An empty type string means "no proxy": host and credentials are
+ // discarded even though they were supplied.
+ @Test
+ public void testNone() {
+ HttpProxyConfig proxy = HttpProxyConfig.builder()
+ .type("")
+ .host("foo.com")
+ .port(1234)
+ .username("bob")
+ .password("secret")
+ .build();
+ assertEquals(ProxyType.NONE, proxy.type);
+ assertNull(proxy.host);
+ assertNull(proxy.username);
+ assertNull(proxy.password);
+ }
+
+ // A blank (whitespace-only) type behaves the same as an empty one.
+ @Test
+ public void testBlankType() {
+ HttpProxyConfig proxy = HttpProxyConfig.builder()
+ .type(" ")
+ .host("foo.com")
+ .port(1234)
+ .username("bob")
+ .password("secret")
+ .build();
+ assertEquals(ProxyType.NONE, proxy.type);
+ assertNull(proxy.host);
+ assertNull(proxy.username);
+ assertNull(proxy.password);
+ }
+
+ // An unrecognized type falls back to NONE rather than failing.
+ @Test
+ public void testBadType() {
+ HttpProxyConfig proxy = HttpProxyConfig.builder()
+ .type("bogus")
+ .host("foo.com")
+ .port(1234)
+ .username("bob")
+ .password("secret")
+ .build();
+ assertEquals(ProxyType.NONE, proxy.type);
+ assertNull(proxy.host);
+ assertNull(proxy.username);
+ assertNull(proxy.password);
+ }
+
+ // Drill config: the HTTP proxy URL key alone is enough.
+ @Test
+ public void testHttpConfig() {
+ Config config = new ConfigBuilder()
+ .put(ExecConstants.HTTP_PROXY_URL, "http://bob:secret@foo.com:1234")
+ .build();
+ HttpProxyConfig proxy = HttpProxyConfig.builder()
+ .fromHttpConfig(config)
+ .build();
+ assertEquals(ProxyType.HTTP, proxy.type);
+ assertEquals("foo.com", proxy.host);
+ assertEquals(1234, proxy.port);
+ assertEquals("bob", proxy.username);
+ assertEquals("secret", proxy.password);
+ }
+
+ // Drill config: when the URL key is empty, the explicit
+ // type/host/port/user/password keys are used instead.
+ @Test
+ public void testHttpUrlConfig() {
+ Config config = new ConfigBuilder()
+ .put(ExecConstants.HTTP_PROXY_URL, "")
+ .put(ExecConstants.HTTP_PROXY_TYPE, "socks")
+ .put(ExecConstants.HTTP_PROXY_HOST, "foo.com")
+ .put(ExecConstants.HTTP_PROXY_PORT, 1234)
+ .put(ExecConstants.HTTP_PROXY_USER_NAME, "bob")
+ .put(ExecConstants.HTTP_PROXY_PASSWORD, "secret")
+ .build();
+ HttpProxyConfig proxy = HttpProxyConfig.builder()
+ .fromHttpConfig(config)
+ .build();
+ assertEquals(ProxyType.SOCKS, proxy.type);
+ assertEquals("foo.com", proxy.host);
+ assertEquals(1234, proxy.port);
+ assertEquals("bob", proxy.username);
+ assertEquals("secret", proxy.password);
+ }
+
+ // Same as testHttpConfig but through the HTTPS proxy URL key.
+ @Test
+ public void testHttpsUrlConfig() {
+ Config config = new ConfigBuilder()
+ .put(ExecConstants.HTTPS_PROXY_URL, "https://bob:secret@foo.com:1234")
+ .build();
+ HttpProxyConfig proxy = HttpProxyConfig.builder()
+ .fromHttpsConfig(config)
+ .build();
+ assertEquals(ProxyType.HTTP, proxy.type);
+ assertEquals("foo.com", proxy.host);
+ assertEquals(1234, proxy.port);
+ assertEquals("bob", proxy.username);
+ assertEquals("secret", proxy.password);
+ }
+
+ // Same as testHttpUrlConfig but through the HTTPS explicit keys.
+ @Test
+ public void testHttpsConfig() {
+ Config config = new ConfigBuilder()
+ .put(ExecConstants.HTTPS_PROXY_URL, "")
+ .put(ExecConstants.HTTPS_PROXY_TYPE, "socks")
+ .put(ExecConstants.HTTPS_PROXY_HOST, "foo.com")
+ .put(ExecConstants.HTTPS_PROXY_PORT, 1234)
+ .put(ExecConstants.HTTPS_PROXY_USER_NAME, "bob")
+ .put(ExecConstants.HTTPS_PROXY_PASSWORD, "secret")
+ .build();
+ HttpProxyConfig proxy = HttpProxyConfig.builder()
+ .fromHttpsConfig(config)
+ .build();
+ assertEquals(ProxyType.SOCKS, proxy.type);
+ assertEquals("foo.com", proxy.host);
+ assertEquals(1234, proxy.port);
+ assertEquals("bob", proxy.username);
+ assertEquals("secret", proxy.password);
+ }
+
+ // fromConfigForURL() picks the HTTP or HTTPS proxy settings based on
+ // the scheme of the target URL.
+ @Test
+ public void testConfigForUrl() {
+ Config config = new ConfigBuilder()
+ .put(ExecConstants.HTTP_PROXY_URL, "http://bob:secret@foo.com:1234")
+ .put(ExecConstants.HTTPS_PROXY_URL, "http://alice:s3cr3t@bar.com:2345")
+ .build();
+ doTestConfigForUrl(config);
+ }
+
+ // Shared assertions: an http:// URL selects the HTTP proxy, an
+ // https:// URL selects the HTTPS proxy.
+ private void doTestConfigForUrl(Config config) {
+ HttpProxyConfig proxy = HttpProxyConfig.builder()
+ .fromConfigForURL(config, "http://google.com")
+ .build();
+ assertEquals(ProxyType.HTTP, proxy.type);
+ assertEquals("foo.com", proxy.host);
+ assertEquals(1234, proxy.port);
+ assertEquals("bob", proxy.username);
+ assertEquals("secret", proxy.password);
+
+ proxy = HttpProxyConfig.builder()
+ .fromConfigForURL(config, "https://google.com")
+ .build();
+ assertEquals(ProxyType.HTTP, proxy.type);
+ assertEquals("bar.com", proxy.host);
+ assertEquals(2345, proxy.port);
+ assertEquals("alice", proxy.username);
+ assertEquals("s3cr3t", proxy.password);
+ }
+
+ // To run this test, set two env vars in your run/debug
+ // configuration, then comment out the @Ignore:
+ // http_proxy=http://bob:secret@foo.com:1234
+ // https_proxy=http://alice:s3cr3t@bar.com:2345
+ @Test
+ @Ignore("Requires manual setup")
+ public void testEnvVar() {
+ Config config = new ConfigBuilder()
+ .build();
+ doTestConfigForUrl(config);
+ }
+}
diff --git a/contrib/storage-http/src/test/resources/data/response.json b/contrib/storage-http/src/test/resources/data/response.json
new file mode 100644
index 0000000..bde2d5b
--- /dev/null
+++ b/contrib/storage-http/src/test/resources/data/response.json
@@ -0,0 +1,14 @@
+{"results":
+ {"sunrise":"6:13:58 AM",
+ "sunset":"5:59:55 PM",
+ "solar_noon":"12:06:56 PM",
+ "day_length":"11:45:57",
+ "civil_twilight_begin":"5:48:14 AM",
+ "civil_twilight_end":"6:25:38 PM",
+ "nautical_twilight_begin":"5:18:16 AM",
+ "nautical_twilight_end":"6:55:36 PM",
+ "astronomical_twilight_begin":"4:48:07 AM",
+ "astronomical_twilight_end":"7:25:45 PM"
+ },
+ "status":"OK"
+}
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePlugin.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePlugin.java
index 35c974d..fea8197 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePlugin.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePlugin.java
@@ -37,7 +37,7 @@ public class KuduStoragePlugin extends AbstractStoragePlugin {
private final KuduClient client;
public KuduStoragePlugin(KuduStoragePluginConfig configuration, DrillbitContext context, String name)
- throws IOException {
+ throws IOException {
super(context, name);
this.schemaFactory = new KuduSchemaFactory(this, name);
this.engineConfig = configuration;
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 420e14e..24c14a3 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -288,6 +288,11 @@
</dependency>
<dependency>
<groupId>org.apache.drill.contrib</groupId>
+ <artifactId>drill-storage-http</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.drill.contrib</groupId>
<artifactId>drill-opentsdb-storage</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/distribution/src/assemble/component.xml b/distribution/src/assemble/component.xml
index 2eb50fd..fefbac4 100644
--- a/distribution/src/assemble/component.xml
+++ b/distribution/src/assemble/component.xml
@@ -49,6 +49,7 @@
<include>org.apache.drill.contrib:drill-jdbc-storage:jar</include>
<include>org.apache.drill.contrib:drill-kudu-storage:jar</include>
<include>org.apache.drill.contrib:drill-storage-kafka:jar</include>
+ <include>org.apache.drill.contrib:drill-storage-http:jar</include>
<include>org.apache.drill.contrib:drill-opentsdb-storage:jar</include>
<include>org.apache.drill.contrib:drill-udfs:jar</include>
</includes>
diff --git a/distribution/src/main/resources/drill-override-example.conf b/distribution/src/main/resources/drill-override-example.conf
index 6fcca37..034307b 100644
--- a/distribution/src/main/resources/drill-override-example.conf
+++ b/distribution/src/main/resources/drill-override-example.conf
@@ -342,6 +342,37 @@ drill.exec: {
#ssl provider. May be "JDK" or "OPENSSL". Default is "JDK"
provider: "JDK"
}
+
+ # HTTP client proxy configuration
+ net_proxy: {
+
+ # HTTP URL. Omit if from a Linux env var
+ # See https://www.shellhacks.com/linux-proxy-server-settings-set-proxy-command-line/
+ http_url: "",
+
+ # Explicit HTTP setup, used if URL is not set
+ http: {
+ type: "none", # none, http, socks. Blank same as none.
+ host: "",
+ port: 80,
+ user_name: "",
+ password: ""
+ },
+
+ # HTTPS URL. Omit if from a Linux env var
+ https_url: "",
+
+ # Explicit HTTPS setup, used if URL is not set
+ https: {
+ type: "none", # none, http, socks. Blank same as none.
+ host: "",
+ port: 80,
+ user_name: "",
+ password: ""
+ }
+ }
+}
+
},
drill.metrics : {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 3a16353..027ebb5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -1211,4 +1211,21 @@ public final class ExecConstants {
ENABLE_DYNAMIC_CREDIT_BASED_FC, new OptionDescription("Enable dynamic credit based flow control.This feature allows " +
"the sender to send out its data more rapidly, but you should know that it has a risk to OOM when the system is solving parallel " +
"large queries until we have a more accurate resource manager."));
+
+ // HTTP proxy configuration (Drill config)
+ public static final String NET_PROXY_BASE = "drill.exec.net_proxy";
+ // HTTP proxy config
+ public static final String HTTP_PROXY_URL = NET_PROXY_BASE + ".http_url";
+ public static final String HTTP_PROXY_TYPE = NET_PROXY_BASE + ".http.type";
+ public static final String HTTP_PROXY_HOST = NET_PROXY_BASE + ".http.host";
+ public static final String HTTP_PROXY_PORT = NET_PROXY_BASE + ".http.port";
+ public static final String HTTP_PROXY_USER_NAME = NET_PROXY_BASE + ".http.user_name";
+ public static final String HTTP_PROXY_PASSWORD = NET_PROXY_BASE + ".http.password";
+ // HTTPS proxy config
+ public static final String HTTPS_PROXY_URL = NET_PROXY_BASE + ".https_url";
+ public static final String HTTPS_PROXY_TYPE = NET_PROXY_BASE + ".https.type";
+ public static final String HTTPS_PROXY_HOST = NET_PROXY_BASE + ".https.host";
+ public static final String HTTPS_PROXY_PORT = NET_PROXY_BASE + ".https.port";
+ public static final String HTTPS_PROXY_USER_NAME = NET_PROXY_BASE + ".https.user_name";
+ public static final String HTTPS_PROXY_PASSWORD = NET_PROXY_BASE + ".https.password";
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
index 0ab4181..731bf2d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
@@ -70,6 +70,7 @@ public class JSONRecordReader extends AbstractRecordReader {
private final boolean skipMalformedJSONRecords;
private final boolean printSkippedMalformedJSONRecordLineNumber;
private ReadState write;
+ private InputStream inputStream;
/**
* Create a JSON Record Reader that uses a file based input stream.
@@ -81,7 +82,7 @@ public class JSONRecordReader extends AbstractRecordReader {
*/
public JSONRecordReader(FragmentContext fragmentContext, Path inputPath, DrillFileSystem fileSystem,
List<SchemaPath> columns) throws OutOfMemoryException {
- this(fragmentContext, inputPath, null, fileSystem, columns);
+ this(fragmentContext, inputPath, null, fileSystem, columns, false);
}
/**
@@ -94,16 +95,28 @@ public class JSONRecordReader extends AbstractRecordReader {
*/
public JSONRecordReader(FragmentContext fragmentContext, JsonNode embeddedContent, DrillFileSystem fileSystem,
List<SchemaPath> columns) throws OutOfMemoryException {
- this(fragmentContext, null, embeddedContent, fileSystem, columns);
+ this(fragmentContext, null, embeddedContent, fileSystem, columns, false);
+ }
+
+ /**
+ * Create a JSON Record Reader that reads from an InputStream supplied
+ * later via setInputStream()
+ * @param fragmentContext The Drill Fragment
+ * @param columns pathnames of columns/subfields to read
+ * @throws OutOfMemoryException
+ */
+ public JSONRecordReader(FragmentContext fragmentContext, List<SchemaPath> columns) throws OutOfMemoryException {
+ this(fragmentContext, null, null, null, columns, true);
}
private JSONRecordReader(FragmentContext fragmentContext, Path inputPath, JsonNode embeddedContent,
- DrillFileSystem fileSystem, List<SchemaPath> columns) {
+ DrillFileSystem fileSystem, List<SchemaPath> columns, boolean hasInputStream) {
Preconditions.checkArgument(
- (inputPath == null && embeddedContent != null) ||
- (inputPath != null && embeddedContent == null),
- "One of inputPath or embeddedContent must be set but not both."
+ (inputPath == null && embeddedContent != null && !hasInputStream) ||
+ (inputPath != null && embeddedContent == null && !hasInputStream) ||
+ (inputPath == null && embeddedContent == null && hasInputStream),
+ "One of inputPath, inputStream or embeddedContent must be set but not all."
);
if (inputPath != null) {
@@ -170,6 +183,8 @@ public class JSONRecordReader extends AbstractRecordReader {
private void setupParser() throws IOException {
if (hadoopPath != null) {
jsonReader.setSource(stream);
+ } else if (inputStream!= null) {
+ jsonReader.setSource(inputStream);
} else {
jsonReader.setSource(embeddedContent);
}
@@ -253,11 +268,20 @@ public class JSONRecordReader extends AbstractRecordReader {
runningRecordCount += recordCount;
}
+ public void setInputStream(InputStream in) {
+ this.inputStream = in;
+ }
+
@Override
public void close() throws Exception {
if (stream != null) {
stream.close();
stream = null;
}
+
+ if (inputStream != null) {
+ inputStream.close();
+ inputStream = null;
+ }
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
index 4dd146e..1ce89fc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
@@ -63,7 +63,7 @@ public class JsonReader extends BaseJsonReader {
private final FieldSelection selection;
- private JsonReader(Builder builder) {
+ public JsonReader(Builder builder) {
super(builder.managedBuf, builder.enableNanInf, builder.enableEscapeAnyChar, builder.skipOuterList);
selection = FieldSelection.getFieldSelection(builder.columns);
workingBuffer = builder.workingBuffer;
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 46ae53a..01f454d 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -458,10 +458,49 @@ drill.exec: {
# to have a grace period that is atleast twice the amount of zookeeper
# refresh time.
grace_period_ms : 0,
+
# port hunting for drillbits. Enabled only for testing purposes.
port_hunt : false,
+
# Allow drillbit to bind to loopback address in distributed mode. Enabled only for testing purposes.
allow_loopback_address_binding : false
+
+ # HTTP client proxy configuration
+ net_proxy: {
+
+ # HTTP URL, usually from a Linux env var
+ # See https://www.shellhacks.com/linux-proxy-server-settings-set-proxy-command-line/
+ http_url: "",
+ http_url: ${?HTTP_PROXY},
+ http_url: ${?http_proxy},
+ http_url: ${?all_proxy},
+ http_url: ${?ALL_PROXY},
+
+ # Explicit HTTP setup, used if URL is not set
+ http: {
+ type: "none", # none, http, socks. Blank same as none.
+ host: "",
+ port: 80,
+ user_name: "",
+ password: ""
+ },
+
+ # HTTPS URL, usually from a Linux env var
+ https_url: "",
+ https_url: ${?HTTPS_PROXY},
+ https_url: ${?https_proxy},
+ https_url: ${?all_proxy},
+ https_url: ${?ALL_PROXY},
+
+ # Explicit HTTPS setup, used if URL is not set
+ https: {
+ type: "none", # none, http, socks. Blank same as none.
+ host: "",
+ port: 80,
+ user_name: "",
+ password: ""
+ }
+ }
}
drill.jdbc: {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/parser/TestJsonParserBasics.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/parser/TestJsonParserBasics.java
index d51e1a8..f8006cc 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/parser/TestJsonParserBasics.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/parser/TestJsonParserBasics.java
@@ -109,7 +109,7 @@ public class TestJsonParserBasics extends BaseTestJsonParser {
@Test
public void testExtendedFloat() {
final String json =
- "{a: NaN} {a: Infinity} {a: -Infinity}";
+ "{a: NaN} {a: Infinity} {a: -Infinity}";
JsonParserFixture fixture = new JsonParserFixture();
fixture.options.allowNanInf = true;
fixture.open(json);
@@ -158,8 +158,8 @@ public class TestJsonParserBasics extends BaseTestJsonParser {
public void testRootTuple() {
final String json =
"{id: 1, name: \"Fred\", balance: 100.0}\n" +
- "{id: 2, name: \"Barney\"}\n" +
- "{id: 3, name: \"Wilma\", balance: 500.00}";
+ "{id: 2, name: \"Barney\"}\n" +
+ "{id: 3, name: \"Wilma\", balance: 500.00}";
JsonParserFixture fixture = new JsonParserFixture();
fixture.open(json);
assertEquals(3, fixture.read());
@@ -235,7 +235,7 @@ public class TestJsonParserBasics extends BaseTestJsonParser {
@Test
public void testProjection() {
final String json =
- "{a: 1, b: [[{x: [[{y: []}]]}]]}\n" +
+ "{a: 1, b: [[{x: [[{y: []}]]}]]}\n" +
"{a: 2}\n" +
"{b: \"bar\"}";
JsonParserFixture fixture = new JsonParserFixture();
@@ -254,13 +254,13 @@ public class TestJsonParserBasics extends BaseTestJsonParser {
@Test
public void testAllTextMode() {
final String json =
- "{a: 1} {a: \"foo\"} {a: true} {a: 20.5} {a: null}";
+ "{a: 1} {a: \"foo\"} {a: true} {a: 20.5} {a: null}";
JsonParserFixture fixture = new JsonParserFixture();
fixture.options.allTextMode = true;
fixture.open(json);
fixture.expect("a",
- new Object[] {"1", "foo", "true", "20.5", null});
+ new Object[] {"1", "foo", "true", "20.5", null});
assertFalse(fixture.next());
fixture.close();
}
@@ -268,13 +268,13 @@ public class TestJsonParserBasics extends BaseTestJsonParser {
@Test
public void testColumnTextMode() {
final String json =
- "{a: 1} {a: \"foo\"} {a: true} {a: 20.5} {a: null}";
+ "{a: 1} {a: \"foo\"} {a: true} {a: 20.5} {a: null}";
JsonParserFixture fixture = new JsonParserFixture();
fixture.rootObject.fieldType = FieldType.TEXT;
fixture.open(json);
fixture.expect("a",
- new Object[] {"1", "foo", "true", "20.5", null});
+ new Object[] {"1", "foo", "true", "20.5", null});
assertFalse(fixture.next());
fixture.close();
}
@@ -282,13 +282,13 @@ public class TestJsonParserBasics extends BaseTestJsonParser {
@Test
public void testJsonModeScalars() {
final String json =
- "{a: 1} {a: \"foo\"} {a: true} {a: 20.5} {a: null}";
+ "{a: 1} {a: \"foo\"} {a: true} {a: 20.5} {a: null}";
JsonParserFixture fixture = new JsonParserFixture();
fixture.rootObject.fieldType = FieldType.JSON;
fixture.open(json);
fixture.expect("a",
- new Object[] {"1", "\"foo\"", "true", "20.5", "null"});
+ new Object[] {"1", "\"foo\"", "true", "20.5", "null"});
assertFalse(fixture.next());
fixture.close();
}
@@ -296,15 +296,15 @@ public class TestJsonParserBasics extends BaseTestJsonParser {
@Test
public void testJsonModeArrays() {
final String json =
- "{a: []} {a: [null]} {a: [null, null]} {a: [[]]}\n" +
+ "{a: []} {a: [null]} {a: [null, null]} {a: [[]]}\n" +
"{a: [1, \"foo\", true]} {a: [[1, 2], [3, 4]]}\n";
JsonParserFixture fixture = new JsonParserFixture();
fixture.rootObject.fieldType = FieldType.JSON;
fixture.open(json);
fixture.expect("a",
- new Object[] {"[]", "[null]", "[null, null]", "[[]]",
- "[1, \"foo\", true]", "[[1, 2], [3, 4]]"});
+ new Object[] {"[]", "[null]", "[null, null]", "[[]]",
+ "[1, \"foo\", true]", "[[1, 2], [3, 4]]"});
assertFalse(fixture.next());
fixture.close();
}
@@ -312,15 +312,15 @@ public class TestJsonParserBasics extends BaseTestJsonParser {
@Test
public void testJsonModeObjects() {
final String json =
- "{a: {}} {a: {b: null}} {a: {b: null, b: null}}\n" +
+ "{a: {}} {a: {b: null}} {a: {b: null, b: null}}\n" +
"{a: {b: {c: {d: [{e: 10}, null, 20], f: \"foo\"}, g:30}, h: 40}}\n";
JsonParserFixture fixture = new JsonParserFixture();
fixture.rootObject.fieldType = FieldType.JSON;
fixture.open(json);
fixture.expect("a",
- new Object[] {"{}", "{\"b\": null}", "{\"b\": null, \"b\": null}",
- "{\"b\": {\"c\": {\"d\": [{\"e\": 10}, null, 20], \"f\": \"foo\"}, \"g\": 30}, \"h\": 40}"});
+ new Object[] {"{}", "{\"b\": null}", "{\"b\": null, \"b\": null}",
+ "{\"b\": {\"c\": {\"d\": [{\"e\": 10}, null, 20], \"f\": \"foo\"}, \"g\": 30}, \"h\": 40}"});
assertFalse(fixture.next());
fixture.close();
}
diff --git a/logical/src/main/java/org/apache/drill/common/expression/SchemaPath.java b/logical/src/main/java/org/apache/drill/common/expression/SchemaPath.java
index 0b5292d..1f4cfee 100644
--- a/logical/src/main/java/org/apache/drill/common/expression/SchemaPath.java
+++ b/logical/src/main/java/org/apache/drill/common/expression/SchemaPath.java
@@ -322,6 +322,27 @@ public class SchemaPath extends LogicalExpressionBase {
return rootSegment;
}
+ /**
+ * Renders this path as a dotted string with array indexes in brackets,
+ * e.g. {@code a.b[2].c}. Segment names are appended verbatim, with no
+ * back-tick escaping applied.
+ *
+ * @return the unescaped dotted-path representation
+ * @throws IllegalStateException if the root segment is an array, since
+ * top-level arrays are not supported
+ */
+ public String getAsUnescapedPath() {
+ StringBuilder sb = new StringBuilder();
+ PathSegment seg = getRootSegment();
+ if (seg.isArray()) {
+ throw new IllegalStateException("Drill doesn't currently support top level arrays");
+ }
+ sb.append(seg.getNameSegment().getPath());
+
+ // Walk child segments: named segments add ".name", array segments
+ // add "[index]".
+ while ( (seg = seg.getChild()) != null) {
+ if (seg.isNamed()) {
+ sb.append('.');
+ sb.append(seg.getNameSegment().getPath());
+ } else {
+ sb.append('[');
+ sb.append(seg.getArraySegment().getIndex());
+ sb.append(']');
+ }
+ }
+ return sb.toString();
+ }
+
@Override
public MajorType getMajorType() {
return Types.LATE_BIND_TYPE;
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
index 2583595..af148ef 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
@@ -667,6 +667,10 @@ public final class UserBitShared {
* <code>METADATA_CONTROLLER = 67;</code>
*/
METADATA_CONTROLLER(67),
+ /**
+ * <code>HTTP_SUB_SCAN = 70;</code>
+ */
+ HTTP_SUB_SCAN(70),
;
/**
@@ -941,6 +945,10 @@ public final class UserBitShared {
* <code>METADATA_CONTROLLER = 67;</code>
*/
public static final int METADATA_CONTROLLER_VALUE = 67;
+ /**
+ * <code>HTTP_SUB_SCAN = 70;</code>
+ */
+ public static final int HTTP_SUB_SCAN_VALUE = 70;
public final int getNumber() {
@@ -1025,6 +1033,7 @@ public final class UserBitShared {
case 65: return SHP_SUB_SCAN;
case 66: return METADATA_HANDLER;
case 67: return METADATA_CONTROLLER;
+ case 70: return HTTP_SUB_SCAN;
default: return null;
}
}
@@ -27924,7 +27933,7 @@ public final class UserBitShared {
"ATEMENT\020\005*\207\001\n\rFragmentState\022\013\n\007SENDING\020\000" +
"\022\027\n\023AWAITING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022\014" +
"\n\010FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005\022" +
- "\032\n\026CANCELLATION_REQUESTED\020\006*\344\n\n\020CoreOper" +
+ "\032\n\026CANCELLATION_REQUESTED\020\006*\367\n\n\020CoreOper" +
"atorType\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAST" +
"_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE\020" +
"\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HASH" +
@@ -27959,11 +27968,11 @@ public final class UserBitShared {
"MERGE\020=\022\021\n\rLTSV_SUB_SCAN\020>\022\021\n\rHDF5_SUB_S" +
"CAN\020?\022\022\n\016EXCEL_SUB_SCAN\020@\022\020\n\014SHP_SUB_SCA" +
"N\020A\022\024\n\020METADATA_HANDLER\020B\022\027\n\023METADATA_CO" +
- "NTROLLER\020C*g\n\nSaslStatus\022\020\n\014SASL_UNKNOWN" +
- "\020\000\022\016\n\nSASL_START\020\001\022\024\n\020SASL_IN_PROGRESS\020\002" +
- "\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B.\n\033o" +
- "rg.apache.drill.exec.protoB\rUserBitShare" +
- "dH\001"
+ "NTROLLER\020C\022\021\n\rHTTP_SUB_SCAN\020F*g\n\nSaslSta" +
+ "tus\022\020\n\014SASL_UNKNOWN\020\000\022\016\n\nSASL_START\020\001\022\024\n" +
+ "\020SASL_IN_PROGRESS\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n" +
+ "\013SASL_FAILED\020\004B.\n\033org.apache.drill.exec." +
+ "protoB\rUserBitSharedH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor. InternalDescriptorAssigner() {
diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto
index 55cfde5..c51cc66 100644
--- a/protocol/src/main/protobuf/UserBitShared.proto
+++ b/protocol/src/main/protobuf/UserBitShared.proto
@@ -379,6 +379,7 @@ enum CoreOperatorType {
SHP_SUB_SCAN = 65;
METADATA_HANDLER = 66;
METADATA_CONTROLLER = 67;
+ HTTP_SUB_SCAN = 70;
}
/* Registry that contains list of jars, each jar contains its name and list of function signatures.