You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by cg...@apache.org on 2021/08/25 20:28:25 UTC
[drill] branch master updated: DRILL-7970: Add URL Parameters to
HTTP Plugin (#2270)
This is an automated email from the ASF dual-hosted git repository.
cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new bf2b0d7 DRILL-7970: Add URL Parameters to HTTP Plugin (#2270)
bf2b0d7 is described below
commit bf2b0d79e43bf65448557510a7b39f17c428df78
Author: Charles S. Givre <cg...@apache.org>
AuthorDate: Wed Aug 25 16:28:20 2021 -0400
DRILL-7970: Add URL Parameters to HTTP Plugin (#2270)
* Initial Commit
* URL modification methods working
* Working, Need tests
* Code working
* Code cleanup
* WIP: Found Param Bug
* WIP: Found Param Bug
* Working, Adding Unit Tests
* Unit test working
* Initial Commit
* URL modification methods working
* Working, Need tests
* Code working
* Code cleanup
* WIP: Found Param Bug
* Working, Adding Unit Tests
* Unit test working
* Removed unused imports
* WIP
* Converted String manipulation to regex
* Added tests for hasLimit
* Addressed review comments
* Updated README
* Fixed regex
* Added Exception for Blank Default Param
* Updated docs
* Fixed Merge Conflicts
---
contrib/storage-http/README.md | 24 ++
.../drill/exec/store/http/HttpApiConfig.java | 6 +
.../drill/exec/store/http/HttpBatchReader.java | 32 ++-
.../drill/exec/store/http/HttpCSVBatchReader.java | 14 +-
.../drill/exec/store/http/HttpGroupScan.java | 4 +-
.../exec/store/http/HttpPushDownListener.java | 25 +-
.../drill/exec/store/http/HttpXMLBatchReader.java | 8 +-
.../drill/exec/store/http/util/SimpleHttp.java | 158 +++++++++++-
.../drill/exec/store/http/TestHttpPlugin.java | 270 ++++++++++++++++++++-
.../drill/exec/store/http/TestURLParameters.java | 135 +++++++++++
.../store/easy/json/loader/JsonLoaderImpl.java | 5 +
11 files changed, 651 insertions(+), 30 deletions(-)
diff --git a/contrib/storage-http/README.md b/contrib/storage-http/README.md
index 9a5e048..427ed83 100644
--- a/contrib/storage-http/README.md
+++ b/contrib/storage-http/README.md
@@ -45,6 +45,30 @@ The `connection` property can accept the following options.
`url`: The base URL which Drill will query.
+##### Parameters in the URL
+Many APIs require parameters to be passed directly in the URL path instead of as query arguments. For example, GitHub's API allows you to query an organization's repositories with the following
+URL: `https://api.github.com/orgs/{org}/repos`
+
+As of Drill 1.20.0, you can declare such parameters by enclosing their names in curly braces in the connection URL. If your API includes URL parameters, you must either include them in the `WHERE` clause of your
+query or specify a default value in the configuration.
+
+For example, to query the API above, you would write:
+
+```sql
+SELECT *
+FROM api.github
+WHERE org = 'apache'
+```
+
+This query would replace `{org}` in the URL with the value from the `WHERE` clause, in this case `apache`. You can specify a default value as follows:
+`https://someapi.com/{param1}/{param2=default}`. In this case, the default is used if and only if the query does not supply a value for that parameter.
+
+#### Limitations on URL Parameters
+* Drill does not support `OR` expressions on URL parameters in queries. For instance, in the above example, if you were to include `WHERE org='apache' OR org='linux'`,
+ these parameters could not be pushed down.
+* All URL parameter clauses must be equality comparisons (for example, `org = 'apache'`).
+
+### Passing Parameters in the Query
`requireTail`: Set to `true` if the query must contain an additional part of the service
URL as a table name, `false` if the URL needs no additional suffix other than that
provided by `WHERE` clause filters. (See below.)
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpApiConfig.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpApiConfig.java
index 325088a..11b96c9 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpApiConfig.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpApiConfig.java
@@ -30,6 +30,7 @@ import lombok.Setter;
import lombok.ToString;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
+import okhttp3.HttpUrl;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.drill.common.exceptions.UserException;
@@ -178,6 +179,11 @@ public class HttpApiConfig {
}
@JsonIgnore
+  public HttpUrl getHttpUrl() {
+    // HttpUrl.parse() returns null instead of throwing when the configured URL is not a
+    // well-formed HTTP(S) URL, so callers must be prepared for a null result.
+    String configuredUrl = this.url;
+    return HttpUrl.parse(configuredUrl);
+  }
+
+ @JsonIgnore
 public HttpMethod getMethodType() {
+    // NOTE(review): assuming HttpMethod is an enum, valueOf() throws IllegalArgumentException
+    // when method is not the exact name of a constant. Confirm that method is normalized upstream.
 return HttpMethod.valueOf(this.method);
 }
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
index 1cc70f6..921210c 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
@@ -24,6 +24,7 @@ import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.exceptions.ChildErrorContext;
import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
@@ -88,8 +89,12 @@ public class HttpBatchReader implements ManagedReader<SchemaNegotiator> {
// JSON loader setup
ResultSetLoader loader = negotiator.build();
- implicitColumns = new ImplicitColumns(loader.writer());
- buildImplicitColumns();
+
+
+ if (implicitColumnsAreProjected()) {
+ implicitColumns = new ImplicitColumns(loader.writer());
+ buildImplicitColumns();
+ }
InputStream inStream = http.getInputStream();
populateImplicitFieldMap(http);
@@ -123,12 +128,30 @@ public class HttpBatchReader implements ManagedReader<SchemaNegotiator> {
}
 protected void populateImplicitFieldMap(SimpleHttp http) {
+ // If the implicit fields are not projected, skip this step. The readers only create
+ // implicitColumns when implicitColumnsAreProjected() is true, so it may not be initialized otherwise.
+ if (! implicitColumnsAreProjected()) {
+ return;
+ }
+
+ // Copy the response metadata (message, protocol, URL and status code) into the implicit columns.
 implicitColumns.getColumn(STRING_METADATA_FIELDS[0]).setValue(http.getResponseMessage());
 implicitColumns.getColumn(STRING_METADATA_FIELDS[1]).setValue(http.getResponseProtocol());
 implicitColumns.getColumn(STRING_METADATA_FIELDS[2]).setValue(http.getResponseURL());
 implicitColumns.getColumn(RESPONSE_CODE_FIELD).setValue(http.getResponseCode());
 }
+  /**
+   * Reports whether the query projects any of the implicit metadata columns
+   * (response message, protocol, URL or response code).
+   *
+   * @return true as soon as one implicit column is found in the projection list, false otherwise
+   */
+  protected boolean implicitColumnsAreProjected() {
+    for (SchemaPath column : subScan.columns()) {
+      boolean isImplicit = column.nameEquals(STRING_METADATA_FIELDS[0])
+        || column.nameEquals(STRING_METADATA_FIELDS[1])
+        || column.nameEquals(STRING_METADATA_FIELDS[2])
+        || column.nameEquals(RESPONSE_CODE_FIELD);
+      if (isImplicit) {
+        return true;
+      }
+    }
+    return false;
+  }
+
protected HttpUrl buildUrl() {
HttpApiConfig apiConfig = subScan.tableSpec().connectionConfig();
String baseUrl = apiConfig.url();
@@ -137,6 +160,11 @@ public class HttpBatchReader implements ManagedReader<SchemaNegotiator> {
if (subScan.tableSpec().tableName() != null) {
baseUrl += subScan.tableSpec().tableName();
}
+
+ // Add URL Parameters to baseURL
+ HttpUrl parsedURL = HttpUrl.parse(baseUrl);
+ baseUrl = SimpleHttp.mapURLParameters(parsedURL, subScan.filters());
+
HttpUrl.Builder urlBuilder = HttpUrl.parse(baseUrl).newBuilder();
if (apiConfig.params() != null && !apiConfig.params().isEmpty() &&
subScan.filters() != null) {
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java
index cffa7af..ae78252 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java
@@ -102,9 +102,11 @@ public class HttpCSVBatchReader extends HttpBatchReader {
ResultSetLoader resultLoader = negotiator.build();
// Add implicit columns
- implicitColumns = new ImplicitColumns(resultLoader.writer());
- buildImplicitColumns();
- populateImplicitFieldMap(http);
+ if (implicitColumnsAreProjected()) {
+ implicitColumns = new ImplicitColumns(resultLoader.writer());
+ buildImplicitColumns();
+ populateImplicitFieldMap(http);
+ }
// Create ScalarWriters
rowWriter = resultLoader.writer();
@@ -166,7 +168,11 @@ public class HttpCSVBatchReader extends HttpBatchReader {
for (StringColumnWriter columnWriter : columnWriters) {
columnWriter.load(nextRow);
}
- implicitColumns.writeImplicitColumns();
+
+ if (implicitColumnsAreProjected()) {
+ implicitColumns.writeImplicitColumns();
+ }
+
rowWriter.save();
return true;
}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java
index 4f2d9e1..d91d618 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java
@@ -35,6 +35,7 @@ import org.apache.drill.exec.physical.base.ScanStats;
import org.apache.drill.exec.physical.base.SubScan;
import org.apache.drill.exec.planner.logical.DrillScanRel;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.http.util.SimpleHttp;
import org.apache.drill.exec.util.Utilities;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
@@ -288,7 +289,8 @@ public class HttpGroupScan extends AbstractGroupScan {
@JsonIgnore
public boolean allowsFilters() {
- return getHttpConfig().params() != null;
+ // Return true if the query has either parameters specified in the URL or URL params.
+ return (getHttpConfig().params() != null) || SimpleHttp.hasURLParameters(getHttpConfig().getHttpUrl());
}
@Override
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpPushDownListener.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpPushDownListener.java
index 49c1154..790c42b 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpPushDownListener.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpPushDownListener.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.store.http;
+import okhttp3.HttpUrl;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.util.Pair;
import org.apache.drill.common.map.CaseInsensitiveMap;
@@ -30,9 +31,9 @@ import org.apache.drill.exec.store.base.filter.ExprNode.ColRelOpConstNode;
import org.apache.drill.exec.store.base.filter.ExprNode.OrNode;
import org.apache.drill.exec.store.base.filter.FilterPushDownListener;
import org.apache.drill.exec.store.base.filter.FilterPushDownStrategy;
+import org.apache.drill.exec.store.http.util.SimpleHttp;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -45,6 +46,10 @@ import java.util.Set;
* <li>An AND'ed set of such expressions,</li>
* <li>If the value is one with an unambiguous conversion to
* a string. (That is, not dates, binary, maps, etc.)</li>
+ * <li> A field from the URL. For instance in some APIs you have parameters that are part of the path.
+ * For example, https://github.com/orgs/{org}/repos. In this instance, the query must contain
+ * a parameter called org.
+ * </li>
* </ul>
*/
public class HttpPushDownListener implements FilterPushDownListener {
@@ -82,8 +87,20 @@ public class HttpPushDownListener implements FilterPushDownListener {
 HttpScanPushDownListener(HttpGroupScan groupScan) {
 this.groupScan = groupScan;
- for (String field : groupScan.getHttpConfig().params()) {
- filterParams.put(field, field);
+ // Add the query parameters listed in the connection config, if any (params() may be null)
+ if (groupScan.getHttpConfig().params() != null) {
+ for (String field : groupScan.getHttpConfig().params()) {
+ filterParams.put(field, field);
+ }
+ }
+
+ // Add parameters embedded in the URL as {name} or {name=default}. HttpUrl.parse() returns
+ // null for a malformed URL, in which case no URL parameters are registered.
+ HttpUrl url = HttpUrl.parse(groupScan.getHttpConfig().url());
+ if (url != null) {
+ List<String> urlParams = SimpleHttp.getURLParameters(url);
+ for (String urlField : urlParams) {
+ filterParams.put(urlField, urlField);
+ }
 }
 }
@@ -144,7 +161,7 @@ public class HttpPushDownListener implements FilterPushDownListener {
*/
@Override
public Pair<GroupScan, List<RexNode>> transform(AndNode andNode) {
- Map<String, String> filters = new HashMap<>();
+ Map<String, String> filters = CaseInsensitiveMap.newHashMap();
double selectivity = 1;
for (ExprNode expr : andNode.children) {
ColRelOpConstNode relOp = (ColRelOpConstNode) expr;
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpXMLBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpXMLBatchReader.java
index b995eca..e62f997 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpXMLBatchReader.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpXMLBatchReader.java
@@ -86,9 +86,11 @@ public class HttpXMLBatchReader extends HttpBatchReader {
xmlReader = new XMLReader(inStream, dataLevel, maxRecords);
ResultSetLoader resultLoader = negotiator.build();
- implicitColumns = new ImplicitColumns(resultLoader.writer());
- buildImplicitColumns();
- populateImplicitFieldMap(http);
+ if (implicitColumnsAreProjected()) {
+ implicitColumns = new ImplicitColumns(resultLoader.writer());
+ buildImplicitColumns();
+ populateImplicitFieldMap(http);
+ }
RowSetLoader rootRowWriter = resultLoader.writer();
xmlReader.open(rootRowWriter, errorContext);
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
index db2e829..486117a 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
@@ -28,6 +28,8 @@ import okhttp3.OkHttpClient.Builder;
import okhttp3.Request;
import okhttp3.Response;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.common.map.CaseInsensitiveMap;
import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.store.http.HttpApiConfig;
@@ -40,11 +42,16 @@ import org.jetbrains.annotations.NotNull;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.Proxy;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -55,6 +62,7 @@ import java.util.regex.Pattern;
*/
@Slf4j
public class SimpleHttp {
+ private static final Pattern URL_PARAM_REGEX = Pattern.compile("\\{(\\w+)(?:=(\\w*))?\\}");
private final OkHttpClient client;
private final HttpSubScan scanDefn;
@@ -137,7 +145,9 @@ public class SimpleHttp {
return builder.build();
}
- public String url() { return url.toString(); }
+  /**
+   * Returns the URL held by this instance, as a string.
+   *
+   * @return the string form of the url field
+   */
+  public String url() {
+    return this.url.toString();
+  }
public InputStream getInputStream() {
@@ -180,7 +190,7 @@ public class SimpleHttp {
responseURL = response.request().url().toString();
// If the request is unsuccessful, throw a UserException
- if (! isSuccessful(responseCode)) {
+ if (!isSuccessful(responseCode)) {
throw UserException
.dataReadError()
.message("HTTP request failed")
@@ -209,21 +219,23 @@ public class SimpleHttp {
* with okhttp3. The issue is that in some cases, a user may not want Drill to throw
* errors on 400 response codes. This function will return true/false depending on the
* configuration for the specific connection.
+ *
* @param responseCode An int of the connection code
* @return True if the response code is 200-299 and possibly 400-499, false if other
*/
 private boolean isSuccessful(int responseCode) {
 if (scanDefn.tableSpec().connectionConfig().errorOn400()) {
+      // errorOn400 is set: only 2xx responses are successful, so 4xx responses raise an error.
- return ((responseCode >= 200 && responseCode <=299) ||
- (responseCode >= 400 && responseCode <=499));
+ return responseCode >= 200 && responseCode <= 299;
 } else {
+      // errorOn400 is not set: 4xx responses are tolerated in addition to 2xx.
- return responseCode >= 200 && responseCode <=299;
+ return ((responseCode >= 200 && responseCode <= 299) ||
+ (responseCode >= 400 && responseCode <= 499));
 }
 }
/**
* Gets the HTTP response code from the HTTP call. Note that this value
* is only available after the getInputStream() method has been called.
+ *
* @return int value of the HTTP response code
*/
public int getResponseCode() {
@@ -233,6 +245,7 @@ public class SimpleHttp {
/**
* Gets the HTTP response code from the HTTP call. Note that this value
* is only available after the getInputStream() method has been called.
+ *
* @return int of HTTP response code
*/
public String getResponseMessage() {
@@ -242,6 +255,7 @@ public class SimpleHttp {
/**
* Gets the HTTP response code from the HTTP call. Note that this value
* is only available after the getInputStream() method has been called.
+ *
* @return The HTTP response protocol
*/
public String getResponseProtocol() {
@@ -251,6 +265,7 @@ public class SimpleHttp {
/**
* Gets the HTTP response code from the HTTP call. Note that this value
* is only available after the getInputStream() method has been called.
+ *
* @return The HTTP response URL
*/
public String getResponseURL() {
@@ -260,9 +275,8 @@ public class SimpleHttp {
/**
* Configures response caching using a provided temp directory.
*
- * @param builder
- * Builder the Builder object to which the caching is to be
- * configured
+ * @param builder Builder the Builder object to which the caching is to be
+ * configured
*/
private void setupCache(Builder builder) {
int cacheSize = 10 * 1024 * 1024; // TODO Add cache size in MB to config
@@ -307,7 +321,7 @@ public class SimpleHttp {
FormBody.Builder formBodyBuilder = new FormBody.Builder();
String[] lines = postBody.split("\\r?\\n");
- for(String line : lines) {
+ for (String line : lines) {
// If the string is in the format key=value split it,
// Otherwise ignore
@@ -321,6 +335,132 @@ public class SimpleHttp {
}
/**
+   * Returns the URL-decoded form of the URL, so that {param} placeholders can be matched even
+   * though HttpUrl may percent-encode the braces (e.g. as %7Bparam%7D). If the URL cannot be
+   * decoded, the original (encoded) URL string is returned instead.
+   *
+   * Note that URLDecoder follows form-encoding rules, so a literal '+' is decoded to a space.
+   *
+   * @param url The URL to decode. Must not be null.
+   * @return The URL-decoded URL, or url.toString() if decoding fails
+   */
+  public static String decodedURL(HttpUrl url) {
+    String rawUrl = url.toString();
+    try {
+      return URLDecoder.decode(rawUrl, "UTF-8");
+    } catch (UnsupportedEncodingException | IllegalArgumentException e) {
+      // URLDecoder throws IllegalArgumentException for malformed %-escapes. Fall back to the
+      // raw URL instead of failing the query, as this method's contract promises.
+      return rawUrl;
+    }
+  }
+
+  /**
+   * Returns true if the url has url parameters, as indicated by the presence of
+   * {param} or {param=default} in the url.
+   *
+   * @param url The URL to inspect. May be null, because HttpUrl.parse() returns null for a
+   *            malformed URL and callers pass its result straight through.
+   * @return True if there are URL params, false if not (including when url is null)
+   */
+  public static boolean hasURLParameters(HttpUrl url) {
+    if (url == null) {
+      return false;
+    }
+    String decodedUrl = SimpleHttp.decodedURL(url);
+    Matcher matcher = URL_PARAM_REGEX.matcher(decodedUrl);
+    return matcher.find();
+  }
+
+  /**
+   * APIs are sometimes structured with parameters in the URL itself. For instance, to request a list of
+   * an organization's repositories in GitHub, the URL is: https://api.github.com/orgs/{org}/repos, where
+   * you replace {org} with the actual organization name.
+   *
+   * @param url The URL to scan for {param} or {param=default} placeholders
+   * @return The placeholder names in order of appearance, duplicates included. Any default value
+   *         is not part of the returned name.
+   */
+  public static List<String> getURLParameters(HttpUrl url) {
+    List<String> names = new ArrayList<>();
+    Matcher placeholderMatcher = URL_PARAM_REGEX.matcher(decodedURL(url));
+    while (placeholderMatcher.find()) {
+      names.add(placeholderMatcher.group(1));
+    }
+    return names;
+  }
+
+  /**
+   * Extracts the default value declared for a URL parameter. For instance, if the supplied URL is
+   * http://someapi.com/path/{p1=foo}, calling this method for p1 returns foo.
+   *
+   * @param url The URL containing a default parameter
+   * @param parameter The parameter for which you need the value
+   * @return The non-empty default value for the supplied parameter
+   * @throws UserException if the URL does not declare a non-empty default for the parameter
+   */
+  public static String getDefaultParameterValue(HttpUrl url, String parameter) {
+    String decodedURL = decodedURL(url);
+    // Quote the parameter name so it is always matched literally, never as regex syntax.
+    Pattern paramRegex = Pattern.compile("\\{" + Pattern.quote(parameter) + "=(\\w+?)\\}");
+    Matcher paramMatcher = paramRegex.matcher(decodedURL);
+    if (paramMatcher.find()) {
+      return paramMatcher.group(1);
+    } else {
+      throw UserException
+        .validationError()
+        .message("Default URL parameters must have a value. The parameter " + parameter + " is not defined in the configuration.")
+        .build(logger);
+    }
+  }
+
+  /**
+   * Used for APIs which have parameters in the URL. This function maps the filters pushed down
+   * from the query into the URL. For example, the API github.com/orgs/{org}/repos requires a user to
+   * replace {org} with an actual organization. The value is taken from the filters pushed down
+   * from the query.
+   *
+   * A placeholder may declare a default, e.g. {org=apache}. The default is used only when the
+   * filters do not supply a value for that parameter; a supplied value always replaces the whole
+   * placeholder. Placeholders are replaced wherever they appear (path or query string), and
+   * duplicate placeholders all receive the same value.
+   *
+   * Note that if a URL contains URL parameters and no filters are provided at all (null), or a
+   * parameter has neither a filter value nor a default, Drill will throw a UserException.
+   *
+   * @param url The HttpUrl containing URL Parameters
+   * @param filters The filters pushed down from the query. Names are matched case-insensitively.
+   * @return A string of the URL with the URL parameters replaced by filter (or default) values
+   */
+  public static String mapURLParameters(HttpUrl url, Map<String, String> filters) {
+    if (!hasURLParameters(url)) {
+      return url.toString();
+    }
+
+    if (filters == null) {
+      throw UserException
+        .parseError()
+        .message("API Query with URL Parameters must be populated.")
+        .build(logger);
+    }
+
+    // The push-down listener builds a CaseInsensitiveMap, but any other Map implementation may
+    // arrive here (e.g. a rebuilt map). Copy it instead of casting blindly, which would throw
+    // ClassCastException.
+    Map<String, String> caseInsensitiveFilterMap;
+    if (filters instanceof CaseInsensitiveMap) {
+      caseInsensitiveFilterMap = filters;
+    } else {
+      caseInsensitiveFilterMap = CaseInsensitiveMap.newHashMap();
+      for (Map.Entry<String, String> entry : filters.entrySet()) {
+        caseInsensitiveFilterMap.put(entry.getKey(), entry.getValue());
+      }
+    }
+
+    List<String> params = SimpleHttp.getURLParameters(url);
+    String tempUrl = SimpleHttp.decodedURL(url);
+    for (String param : params) {
+      String value = caseInsensitiveFilterMap.get(param);
+
+      // No usable value in the WHERE clause: fall back to the default from the configuration.
+      // getDefaultParameterValue throws a UserException if the placeholder has no default.
+      if (StringUtils.isEmpty(value)) {
+        value = getDefaultParameterValue(url, param);
+        if (StringUtils.isEmpty(value)) {
+          throw UserException
+            .parseError()
+            .message("API Query with URL Parameters must be populated. Parameter " + param + " must be included in WHERE clause.")
+            .build(logger);
+        }
+      }
+
+      // Replace every {param} and {param=default} placeholder, wherever it occurs. The value
+      // is inserted literally, so characters such as '$' or '\' are not treated as regex syntax.
+      Pattern placeholder = Pattern.compile("\\{" + Pattern.quote(param) + "(?:=\\w*)?\\}");
+      tempUrl = placeholder.matcher(tempUrl).replaceAll(Matcher.quoteReplacement(value));
+    }
+    return tempUrl;
+  }
+
+ /**
* Intercepts requests and adds authentication headers to the request
*/
public static class BasicAuthInterceptor implements Interceptor {
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
index 4f73db8..57510f3 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
@@ -21,6 +21,7 @@ package org.apache.drill.exec.store.http;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
+import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.util.DrillFileUtils;
@@ -110,10 +111,18 @@ public class TestHttpPlugin extends ClusterTest {
.method("get")
.build();
+ HttpApiConfig pokemonConfig = HttpApiConfig.builder()
+ .url("https://pokeapi.co/api/v2/pokemon/{pokemon_name}")
+ .method("get")
+ .inputType("json")
+ .requireTail(false)
+ .build();
+
Map<String, HttpApiConfig> configs = new HashMap<>();
configs.put("stock", stockConfig);
configs.put("sunrise", sunriseConfig);
configs.put("sunrise2", sunriseWithParamsConfig);
+ configs.put("pokemon", pokemonConfig);
HttpStoragePluginConfig mockStorageConfigWithWorkspace =
new HttpStoragePluginConfig(false, configs, 10, "", 80, "", "", "", PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
@@ -144,6 +153,7 @@ public class TestHttpPlugin extends ClusterTest {
UsernamePasswordCredentials.USERNAME, "user",
UsernamePasswordCredentials.PASSWORD, "pass")))
.dataPath("results")
+ .errorOn400(true)
.build();
// Use the mock server with the HTTP parameters passed as WHERE
@@ -193,12 +203,43 @@ public class TestHttpPlugin extends ClusterTest {
.xmlDataLevel(2)
.build();
+ HttpApiConfig mockGithubWithParam = HttpApiConfig.builder()
+ .url("http://localhost:8091/orgs/{org}/repos")
+ .method("GET")
+ .headers(headers)
+ .params(Arrays.asList("lat", "lng", "date"))
+ .dataPath("results")
+ .requireTail(false)
+ .build();
+
+ HttpApiConfig mockGithubWithDuplicateParam = HttpApiConfig.builder()
+ .url("http://localhost:8091/orgs/{org}/repos")
+ .method("GET")
+ .headers(headers)
+ .params(Arrays.asList("org", "lng", "date"))
+ .dataPath("results")
+ .requireTail(false)
+ .build();
+
+ HttpApiConfig mockGithubWithParamInQuery = HttpApiConfig.builder()
+ .url("http://localhost:8091/orgs/{org}/repos?p1={p1}")
+ .method("GET")
+ .headers(headers)
+ .params(Arrays.asList("p2", "p3"))
+ .dataPath("results")
+ .requireTail(false)
+ .build();
+
+
Map<String, HttpApiConfig> configs = new HashMap<>();
configs.put("sunrise", mockSchema);
configs.put("mocktable", mockTable);
configs.put("mockpost", mockPostConfig);
configs.put("mockcsv", mockCsvConfig);
configs.put("mockxml", mockXmlConfig);
+ configs.put("github", mockGithubWithParam);
+ configs.put("github2", mockGithubWithDuplicateParam);
+ configs.put("github3", mockGithubWithParamInQuery);
HttpStoragePluginConfig mockStorageConfigWithWorkspace =
new HttpStoragePluginConfig(false, configs, 2, "", 80, "", "", "", PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
@@ -285,9 +326,8 @@ public class TestHttpPlugin extends ClusterTest {
.build();
RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .addRow(mapValue("6:13:58 AM", "5:59:55 PM", "12:06:56 PM", "11:45:57",
- "5:48:14 AM", "6:25:38 PM", "5:18:16 AM", "6:55:36 PM",
- "4:48:07 AM", "7:25:45 PM"), "OK")
+ .addRow(mapValue("6:12:17 AM", "6:01:54 PM", "12:07:06 PM", "11:49:37",
+ "5:47:49 AM", "6:26:22 PM", "5:17:51 AM", "6:56:21 PM", "4:47:41 AM", "7:26:31 PM"), "OK")
.build();
RowSetUtilities.verify(expected, results);
@@ -297,7 +337,7 @@ public class TestHttpPlugin extends ClusterTest {
* As above, but we return only the contents of {@code results}, and use
* filter push-down for the arguments.
*
- * @throws Exception
+ * @throws Exception if anything goes wrong
*/
@Test
@Ignore("Requires Remote Server")
@@ -322,8 +362,8 @@ public class TestHttpPlugin extends ClusterTest {
.build();
RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .addRow("6:13:58 AM", "5:59:55 PM", "12:06:56 PM", "11:45:57", "5:48:14 AM",
- "6:25:38 PM", "5:18:16 AM", "6:55:36 PM", "4:48:07 AM", "7:25:45 PM")
+ .addRow("6:12:17 AM", "6:01:54 PM", "12:07:06 PM", "11:49:37", "5:47:49 AM",
+ "6:26:22 PM", "5:17:51 AM", "6:56:21 PM", "4:47:41 AM", "7:26:31 PM")
.build();
RowSetUtilities.verify(expected, results);
@@ -357,13 +397,201 @@ public class TestHttpPlugin extends ClusterTest {
.buildSchema();
RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .addRow("6:13:58 AM", "5:59:55 PM")
+ .addRow("6:12:17 AM", "6:01:54 PM")
.build();
RowSetUtilities.verify(expected, results);
}
@Test
+  @Ignore("Requires Remote Server")
+  public void liveTestWithURLParameters() throws Exception {
+    String sql = "SELECT * FROM live.pokemon WHERE pokemon_name = 'ditto'";
+    client.testBuilder()
+      .sqlQuery(sql)
+      .expectsNumRecords(1)
+      .go();
+  }
+
+  @Test
+  public void simpleTestWithMockServerWithURLParams() throws Exception {
+    String sql = "SELECT _response_url FROM local.github\n" +
+      "WHERE `org` = 'apache'";
+    verifyResponseUrl(sql, "http://localhost:8091/orgs/apache/repos");
+  }
+
+  @Test
+  public void simpleTestWithMockServerWithURLParamsOfBooleanType() throws Exception {
+    String sql = "SELECT _response_url FROM local.github\n" +
+      "WHERE `org` = true";
+    verifyResponseUrl(sql, "http://localhost:8091/orgs/true/repos");
+  }
+
+  @Test
+  public void simpleTestWithMockServerWithURLParamsOfIntType() throws Exception {
+    String sql = "SELECT _response_url FROM local.github\n" +
+      "WHERE `org` = 1234";
+    verifyResponseUrl(sql, "http://localhost:8091/orgs/1234/repos");
+  }
+
+  @Test
+  @Ignore("Requires Remote Server")
+  public void simpleTestWithUrlParamsInSubquery() throws Exception {
+    String sql = "select pokemon_data.data.game_index AS game_index, pokemon_data.data.version.name AS name " +
+      "from (select flatten(game_indices) as data " +
+      "from live.pokemon " +
+      "where pokemon_name='ditto' " +
+      ") as pokemon_data WHERE pokemon_data.data.game_index=76";
+
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("game_index", MinorType.BIGINT, TypeProtos.DataMode.OPTIONAL)
+      .add("name", MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .build();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(76, "red")
+      .addRow(76, "blue")
+      .addRow(76, "yellow")
+      .build();
+
+    RowSetUtilities.verify(expected, results);
+  }
+
+  @Test
+  public void simpleTestWithMockServerWithDuplicateURLParams() throws Exception {
+    // github2 lists "org" both as a URL placeholder and as a query parameter, so the value
+    // appears in the path and in the query string.
+    String sql = "SELECT _response_url FROM local.github2\n" +
+      "WHERE `org` = 'apache'";
+    verifyResponseUrl(sql, "http://localhost:8091/orgs/apache/repos?org=apache");
+  }
+
+  @Test
+  public void testUrlParamsInQueryString() throws Exception {
+    // github3 has {org} in the path and {p1} in the query string; p2 is an ordinary query parameter.
+    String sql = "SELECT _response_url FROM local.github3\n" +
+      "WHERE `org` = 'apache' AND p1='param1' AND p2='param2'";
+    verifyResponseUrl(sql, "http://localhost:8091/orgs/apache/repos?p1=param1&p2=param2");
+  }
+
+  /**
+   * When the user has configured an API connection with URL parameters that have no default
+   * value, those parameters must be included in the WHERE clause. Drill throws a UserException
+   * if such a parameter is not present.
+   * @throws Exception if anything goes wrong
+   */
+  @Test
+  public void testUrlParamError() throws Exception {
+    String sql = "SELECT _response_url FROM local.github\n";
+
+    try (MockWebServer server = startServer()) {
+      server.enqueue(
+        new MockResponse()
+          .setResponseCode(200)
+          .setBody(TEST_JSON_RESPONSE)
+      );
+      run(sql);
+      fail();
+    } catch (UserException e) {
+      assertTrue(e.getMessage().contains("API Query with URL Parameters must be populated."));
+    }
+  }
+
+  /**
+   * Runs a query against the mock server, which answers with TEST_JSON_RESPONSE, and verifies
+   * that the query returns exactly one row whose _response_url equals the expected URL.
+   *
+   * @param sql A query that selects only _response_url
+   * @param expectedUrl The URL Drill is expected to have requested
+   * @throws Exception if anything goes wrong
+   */
+  private void verifyResponseUrl(String sql, String expectedUrl) throws Exception {
+    try (MockWebServer server = startServer()) {
+      server.enqueue(
+        new MockResponse()
+          .setResponseCode(200)
+          .setBody(TEST_JSON_RESPONSE)
+      );
+
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+      TupleMetadata expectedSchema = new SchemaBuilder()
+        .add("_response_url", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .build();
+
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+        .addRow(expectedUrl)
+        .build();
+
+      RowSetUtilities.verify(expected, results);
+    }
+  }
+
+ @Test
public void testSerDe() throws Exception {
try (MockWebServer server = startServer()) {
@@ -766,6 +994,34 @@ public class TestHttpPlugin extends ClusterTest {
}
@Test
+  public void testNoErrorOn404() throws Exception {
+    try (MockWebServer server = startServer()) {
+      // The mock server answers with a 404; the query is expected to complete and
+      // report that status through the implicit _response_* columns rather than fail.
+      // NOTE(review): presumably local.mocktable is configured not to error on 404 — confirm in setup.
+      MockResponse notFound = new MockResponse()
+        .setResponseCode(404)
+        .setBody("{}");
+      server.enqueue(notFound);
+
+      String query = "SELECT _response_code, _response_message, _response_protocol, _response_url FROM local.mocktable";
+      RowSet actual = client.queryBuilder().sql(query).rowSet();
+
+      TupleMetadata responseSchema = new SchemaBuilder()
+        .add("_response_code", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL)
+        .add("_response_message", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("_response_protocol", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("_response_url", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .build();
+
+      RowSet expected = new RowSetBuilder(client.allocator(), responseSchema)
+        .addRow(404, "Client Error", "http/1.1", "http://localhost:8091/json")
+        .build();
+
+      RowSetUtilities.verify(expected, actual);
+    }
+  }
+
+ @Test
public void testHeaders() throws Exception {
try (MockWebServer server = startServer()) {
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestURLParameters.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestURLParameters.java
new file mode 100644
index 0000000..9e5d364
--- /dev/null
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestURLParameters.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.http;
+
+import okhttp3.HttpUrl;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.exec.store.http.util.SimpleHttp;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Unit tests for {@link SimpleHttp#mapURLParameters} and
+ * {@link SimpleHttp#getDefaultParameterValue}.
+ *
+ * <p>These helpers replace {@code {name}} placeholders in an API URL with the
+ * matching filter value, or with the default given as {@code {name=default}}.
+ * Placeholder names are matched case-insensitively. A blank default such as
+ * {@code {name=}} is rejected with a {@link UserException}.
+ */
+public class TestURLParameters {
+
+  @Test
+  public void testUrlParameters() {
+    // A single placeholder is replaced; filters with no matching placeholder are ignored.
+    HttpUrl githubSingleParam = HttpUrl.parse("https://github.com/orgs/{org}/repos");
+    CaseInsensitiveMap<String> filters = CaseInsensitiveMap.newHashMap();
+    filters.put("org", "apache");
+    filters.put("param1", "value1");
+    filters.put("param2", "value2");
+    assertEquals("https://github.com/orgs/apache/repos", SimpleHttp.mapURLParameters(githubSingleParam, filters));
+
+    // Every placeholder in the path is replaced by the filter with the same name.
+    HttpUrl githubMultiParam = HttpUrl.parse("https://github.com/orgs/{org}/{repos}");
+    CaseInsensitiveMap<String> filters2 = CaseInsensitiveMap.newHashMap();
+    filters2.put("org", "apache");
+    filters2.put("param1", "value1");
+    filters2.put("repos", "drill");
+    assertEquals("https://github.com/orgs/apache/drill", SimpleHttp.mapURLParameters(githubMultiParam, filters2));
+
+    // A URL without placeholders comes back unchanged, even when filter names
+    // happen to match literal path segments.
+    HttpUrl githubNoParam = HttpUrl.parse("https://github.com/orgs/org/repos");
+    CaseInsensitiveMap<String> filters3 = CaseInsensitiveMap.newHashMap();
+    filters3.put("org", "apache");
+    filters3.put("param1", "value1");
+    filters3.put("repos", "drill");
+    assertEquals("https://github.com/orgs/org/repos", SimpleHttp.mapURLParameters(githubNoParam, filters3));
+  }
+
+  @Test
+  public void testParamAtEnd() {
+    // A placeholder as the final path segment is replaced like any other.
+    HttpUrl pokemonUrl = HttpUrl.parse("https://pokeapi.co/api/v2/pokemon/{pokemon_name}");
+    CaseInsensitiveMap<String> filters = CaseInsensitiveMap.newHashMap();
+    filters.put("pokemon_name", "Misty");
+    filters.put("param1", "value1");
+    filters.put("repos", "drill");
+    assertEquals("https://pokeapi.co/api/v2/pokemon/Misty", SimpleHttp.mapURLParameters(pokemonUrl, filters));
+  }
+
+  @Test
+  public void testUpperCase() {
+    // An upper-case placeholder {ORG} is filled from the lower-case filter "org".
+    HttpUrl githubSingleParam = HttpUrl.parse("https://github.com/orgs/{ORG}/repos");
+    CaseInsensitiveMap<String> filters = CaseInsensitiveMap.newHashMap();
+    filters.put("org", "apache");
+    filters.put("param1", "value1");
+    filters.put("param2", "value2");
+    assertEquals("https://github.com/orgs/apache/repos", SimpleHttp.mapURLParameters(githubSingleParam, filters));
+  }
+
+  @Test
+  public void testURLDefaultParameters() {
+    // With no "org" filter supplied, the {org=apache} default is used.
+    HttpUrl githubSingleParam = HttpUrl.parse("https://github.com/orgs/{org=apache}/repos");
+    CaseInsensitiveMap<String> filters = CaseInsensitiveMap.newHashMap();
+    filters.put("param1", "value1");
+    filters.put("param2", "value2");
+    assertEquals("https://github.com/orgs/apache/repos", SimpleHttp.mapURLParameters(githubSingleParam, filters));
+  }
+
+  @Test
+  public void testMixedCase() {
+    // Since SQL is case-insensitive, placeholder names are matched without regard to case.
+    // The filters live in a CaseInsensitiveMap, so putting "ORG" after "org" overwrites the
+    // earlier value, and both {ORG} and {org} therefore resolve to "linux".
+    HttpUrl githubSingleParam = HttpUrl.parse("https://github.com/orgs/{ORG}/{org}/repos");
+    CaseInsensitiveMap<String> filters = CaseInsensitiveMap.newHashMap();
+    filters.put("org", "apache");
+    filters.put("ORG", "linux");
+    filters.put("param1", "value1");
+    filters.put("param2", "value2");
+    assertEquals("https://github.com/orgs/linux/linux/repos", SimpleHttp.mapURLParameters(githubSingleParam, filters));
+  }
+
+  @Test
+  public void testDuplicateParameters() {
+    // The same placeholder appearing twice is replaced at every occurrence.
+    HttpUrl pokemonUrl = HttpUrl.parse("https://pokeapi.co/api/{pokemon_name}/pokemon/{pokemon_name}");
+    CaseInsensitiveMap<String> filters = CaseInsensitiveMap.newHashMap();
+    filters.put("pokemon_name", "Misty");
+    filters.put("param1", "value1");
+    filters.put("repos", "drill");
+    assertEquals("https://pokeapi.co/api/Misty/pokemon/Misty", SimpleHttp.mapURLParameters(pokemonUrl, filters));
+  }
+
+  @Test
+  public void testDefaultParametersWithDifferentDatatypes() {
+    // Defaults that look like booleans or integers are substituted verbatim.
+    HttpUrl pokemonUrl = HttpUrl.parse("https://pokeapi.co/api/{boolean=true}/{int=1234}");
+    CaseInsensitiveMap<String> filters = CaseInsensitiveMap.newHashMap();
+    assertEquals("https://pokeapi.co/api/true/1234", SimpleHttp.mapURLParameters(pokemonUrl, filters));
+  }
+
+  @Test
+  public void testDefaultParameterExtractor() {
+    HttpUrl pokemonUrl = HttpUrl.parse("https://pokeapi.co/api/{pokemon_name=Misty}");
+    String defaultValue = SimpleHttp.getDefaultParameterValue(pokemonUrl, "pokemon_name");
+    assertEquals("Misty", defaultValue);
+  }
+
+  @Test
+  public void testDefaultParameterExtractorWithBlankDefault() {
+    HttpUrl pokemonUrl = HttpUrl.parse("https://pokeapi.co/api/{pokemon_name=}");
+    try {
+      SimpleHttp.getDefaultParameterValue(pokemonUrl, "pokemon_name");
+      // AssertionError from fail() is not a UserException, so it escapes the catch below.
+      fail("Expected a UserException for a blank default URL parameter");
+    } catch (UserException e) {
+      // Include the actual message so a wording mismatch is easy to diagnose.
+      assertTrue("Unexpected error message: " + e.getMessage(),
+        e.getMessage().contains("Default URL parameters must have a value."));
+    }
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
index 0e7fd4d..586e7bb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
@@ -299,6 +299,11 @@ public class JsonLoaderImpl implements JsonLoader, ErrorFactory {
}
rowWriter.save();
} else {
+ // Special case for empty data sets to still record implicit columns
+ if (implicitFields != null) {
+ implicitFields.writeImplicitColumns();
+ rowWriter.save();
+ }
eof = true;
break;
}