You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by cg...@apache.org on 2022/03/18 17:12:34 UTC
[drill] branch master updated: DRILL-8169: Add UDFs to HTTP Plugin to Facilitate Joins (#2496)
This is an automated email from the ASF dual-hosted git repository.
cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 7ef203b DRILL-8169: Add UDFs to HTTP Plugin to Facilitate Joins (#2496)
7ef203b is described below
commit 7ef203b840f873fca0625d2fe77d9d4fab6ad99f
Author: Charles S. Givre <cg...@apache.org>
AuthorDate: Fri Mar 18 13:12:28 2022 -0400
DRILL-8169: Add UDFs to HTTP Plugin to Facilitate Joins (#2496)
* Initial commit
* Unit tests wroking
* Added configs from constants
* Working with Storage plugins
* Removed cache files
* Fixed cache directory
* Updated docs
* Removed unneeded pom.xml modifications
* Addressed Review comments
* Converted holder to VarCharHolder[]
---
contrib/storage-http/README.md | 34 ++
.../exec/store/http/udfs/HttpHelperFunctions.java | 166 ++++++++++
.../drill/exec/store/http/util/SimpleHttp.java | 367 +++++++++++++++++++--
.../exec/store/http/TestHttpUDFFunctions.java | 159 +++++++++
.../src/test/resources/data/simple.json | 14 +
.../src/test/resources/data/weather.json | 94 ++++++
.../apache/drill/exec/ops/FragmentContextImpl.java | 5 +
.../org/apache/drill/exec/ops/QueryContext.java | 5 +
.../org/apache/drill/exec/ops/UdfUtilities.java | 9 +
.../org/apache/drill/test/OperatorFixture.java | 6 +
10 files changed, 839 insertions(+), 20 deletions(-)
diff --git a/contrib/storage-http/README.md b/contrib/storage-http/README.md
index 8154f0a..6c19ae6 100644
--- a/contrib/storage-http/README.md
+++ b/contrib/storage-http/README.md
@@ -659,3 +659,37 @@ The HTTP plugin includes four implicit fields which can be used for debugging.
* `_response_message`: The response message.
* `_response_protocol`: The response protocol.
* `_response_url`: The actual URL sent to the API.
+
+## Joining Data
+There are some situations where a user might want to join data with an API result and the pushdowns prevent that from happening. The main situation where this happens is when
+an API has parameters which are part of the URL AND these parameters are dynamically populated via a join.
+
+In this case, there are two functions, `http_request` and `http_get`, which you can use to facilitate these joins.
+
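+* `http_request('<storage_plugin_name>.<endpoint>', <params>)`: This function accepts a storage plugin and endpoint name as input, and an optional list of parameters which are substituted, in order, into the endpoint's URL. It uses the configuration of the named storage plugin and endpoint, but only supports endpoints which return JSON and does not support pagination.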
+* `http_get(<url>, <params>)`: This function works in the same way except that it does not pull any configuration information from existing storage plugins. The input url for
+ the `http_get` function must be a valid URL, and the response is parsed as JSON.
+
+### Example Queries
+Let's say that you have a storage plugin called `github` with an endpoint called `repos` which points to the url: https://github.com/orgs/{org}/repos. It is easy enough to
+write a query like this:
+
+```sql
+SELECT *
+FROM github.repos
+WHERE org='apache'
+```
+However, if you had a file with organizations and wanted to join this with the API, the query would fail. Using the functions listed above you could get this data as follows:
+
+```sql
+SELECT http_request('github.repos', `org`)
+FROM dfs.`some_data.csvh`
+```
+or
+```sql
+SELECT http_get('https://github.com/orgs/{org}/repos', `org`)
+FROM dfs.`some_data.csvh`
+```
+
+**WARNING: This functionality will execute an HTTP request FOR EVERY ROW IN YOUR DATA. Use with caution.**
+
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java
new file mode 100644
index 0000000..256fbee
--- /dev/null
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.http.udfs;
+
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.expr.DrillSimpleFunc;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate;
+import org.apache.drill.exec.expr.annotations.Output;
+import org.apache.drill.exec.expr.annotations.Param;
+import org.apache.drill.exec.expr.annotations.Workspace;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
+
+import javax.inject.Inject;
+
+/**
+ * SQL functions which make an HTTP request and write the JSON response as a map. Each evaluation
+ * makes one HTTP request, so these functions make one request for every row they are applied to.
+ *
+ * NOTE(review): the function bodies use fully-qualified class names, presumably because Drill copies
+ * UDF method bodies into generated code where this file's imports are not available. Confirm before
+ * refactoring them to use imports or shared helper code.
+ */
+public class HttpHelperFunctions {
+
+ /**
+ * {@code http_get(url, params...)}: substitutes the optional VARCHAR arguments, in order, into the
+ * {@code {...}} placeholders of the URL, makes a GET request without using any storage plugin
+ * configuration, and parses the response as JSON. A null or empty response yields an empty map.
+ */
+ @FunctionTemplate(names = {"http_get", "httpGet"},
+ scope = FunctionTemplate.FunctionScope.SIMPLE,
+ isVarArg = true)
+ public static class HttpGetFunction implements DrillSimpleFunc {
+
+ // The URL, which may contain {...} placeholders.
+ @Param
+ VarCharHolder rawInput;
+
+ // Optional positional values for the URL placeholders.
+ @Param
+ VarCharHolder[] inputReaders;
+
+ // Receives the parsed JSON response.
+ @Output
+ ComplexWriter writer;
+
+ @Inject
+ OptionManager options;
+
+ @Inject
+ DrillBuf buffer;
+
+ // Created once in setup() and reused by every eval() call.
+ @Workspace
+ org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader;
+
+ @Override
+ public void setup() {
+ // Configure the JSON reader from the JSON options in the injected OptionManager.
+ jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader.Builder(buffer)
+ .defaultSchemaPathColumns()
+ .readNumbersAsDouble(options.getOption(org.apache.drill.exec.ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE).bool_val)
+ .allTextMode(options.getOption(org.apache.drill.exec.ExecConstants.JSON_ALL_TEXT_MODE).bool_val)
+ .enableNanInf(options.getOption(org.apache.drill.exec.ExecConstants.JSON_READER_NAN_INF_NUMBERS).bool_val)
+ .build();
+ }
+
+ @Override
+ public void eval() {
+ // Get the URL
+ String url = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(rawInput.start, rawInput.end, rawInput.buffer);
+
+ // Process Positional Arguments
+ java.util.List args = org.apache.drill.exec.store.http.util.SimpleHttp.buildParameterList(inputReaders);
+ String finalUrl = org.apache.drill.exec.store.http.util.SimpleHttp.mapPositionalParameters(url, args);
+
+ // Make the API call
+ String results = org.apache.drill.exec.store.http.util.SimpleHttp.makeSimpleGetRequest(finalUrl);
+
+ // If the result string is null or empty, return an empty map
+ if (results == null || results.length() == 0) {
+ // Return empty map
+ org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter mapWriter = writer.rootAsMap();
+ mapWriter.start();
+ mapWriter.end();
+ return;
+ }
+
+ try {
+ jsonReader.setSource(results);
+ jsonReader.setIgnoreJSONParseErrors(true); // Reduce number of errors
+ jsonReader.write(writer);
+ // NOTE(review): presumably keeps the reader's current work buffer in case it was reallocated while writing. Confirm.
+ buffer = jsonReader.getWorkBuf();
+ } catch (Exception e) {
+ throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e);
+ }
+ }
+ }
+
+
+ /**
+ * {@code http_request('plugin.endpoint', params...)}: looks up the named HTTP storage plugin and
+ * endpoint, substitutes the optional VARCHAR arguments, in order, into the endpoint URL's
+ * placeholders, makes the request using that plugin and endpoint configuration, and parses the
+ * response as JSON. A null or empty response yields an empty map.
+ */
+ @FunctionTemplate(names = {"http_request", "httpRequest"},
+ scope = FunctionTemplate.FunctionScope.SIMPLE,
+ isVarArg = true)
+ public static class HttpGetFromStoragePluginFunction implements DrillSimpleFunc {
+
+ // The constant "plugin.endpoint" path of the API to call.
+ @Param(constant = true)
+ VarCharHolder rawInput;
+
+ // Optional positional values for the endpoint URL placeholders.
+ @Param
+ VarCharHolder[] inputReaders;
+
+ // Receives the parsed JSON response.
+ @Output
+ ComplexWriter writer;
+
+ @Inject
+ OptionManager options;
+
+ // Used to look up the storage plugin and endpoint configuration.
+ @Inject
+ DrillbitContext drillbitContext;
+
+ @Inject
+ DrillBuf buffer;
+
+ // Created once in setup() and reused by every eval() call.
+ @Workspace
+ org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader;
+
+ @Override
+ public void setup() {
+ // Configure the JSON reader from the JSON options in the injected OptionManager.
+ jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader.Builder(buffer)
+ .defaultSchemaPathColumns()
+ .readNumbersAsDouble(options.getOption(org.apache.drill.exec.ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE).bool_val)
+ .allTextMode(options.getOption(org.apache.drill.exec.ExecConstants.JSON_ALL_TEXT_MODE).bool_val)
+ .enableNanInf(options.getOption(org.apache.drill.exec.ExecConstants.JSON_READER_NAN_INF_NUMBERS).bool_val)
+ .build();
+ }
+
+ @Override
+ public void eval() {
+ // Get the plugin name (the full "plugin.endpoint" path)
+ String pluginName = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(rawInput.start, rawInput.end, rawInput.buffer);
+
+ // Process Positional Arguments
+ java.util.List args = org.apache.drill.exec.store.http.util.SimpleHttp.buildParameterList(inputReaders);
+ String results = org.apache.drill.exec.store.http.util.SimpleHttp.makeAPICall(pluginName, drillbitContext, args);
+
+ // If the result string is null or empty, return an empty map
+ if (results == null || results.length() == 0) {
+ // Return empty map
+ org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter mapWriter = writer.rootAsMap();
+ mapWriter.start();
+ mapWriter.end();
+ return;
+ }
+
+ try {
+ jsonReader.setSource(results);
+ jsonReader.setIgnoreJSONParseErrors(true); // Reduce number of errors
+ jsonReader.write(writer);
+ // NOTE(review): presumably keeps the reader's current work buffer in case it was reallocated while writing. Confirm.
+ buffer = jsonReader.getWorkBuf();
+ } catch (Exception e) {
+ throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e);
+ }
+ }
+ }
+}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
index 971e039..b5812c1 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.store.http.util;
+import com.typesafe.config.Config;
import okhttp3.Cache;
import okhttp3.Credentials;
import okhttp3.FormBody;
@@ -30,20 +31,30 @@ import okhttp3.RequestBody;
import okhttp3.Response;
import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.common.exceptions.EmptyErrorContext;
import org.apache.drill.common.map.CaseInsensitiveMap;
import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
import org.apache.drill.exec.oauth.PersistentTokenTable;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.StoragePlugin;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
import org.apache.drill.exec.store.http.HttpApiConfig;
import org.apache.drill.exec.store.http.HttpApiConfig.HttpMethod;
import org.apache.drill.exec.store.http.HttpApiConfig.PostLocation;
import org.apache.drill.exec.store.http.HttpOAuthConfig;
+import org.apache.drill.exec.store.http.HttpStoragePlugin;
import org.apache.drill.exec.store.http.HttpStoragePluginConfig;
import org.apache.drill.exec.store.http.HttpSubScan;
import org.apache.drill.exec.store.http.paginator.Paginator;
import org.apache.drill.exec.store.http.oauth.AccessTokenAuthenticator;
import org.apache.drill.exec.store.http.oauth.AccessTokenInterceptor;
import org.apache.drill.exec.store.http.oauth.AccessTokenRepository;
+import org.apache.drill.exec.store.http.util.HttpProxyConfig.ProxyBuilder;
import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
import org.jetbrains.annotations.NotNull;
import org.json.simple.JSONObject;
@@ -55,23 +66,28 @@ import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
+import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
+import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.net.URLDecoder;
+import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
/**
@@ -81,12 +97,11 @@ import java.util.regex.Pattern;
*/
public class SimpleHttp {
private static final Logger logger = LoggerFactory.getLogger(SimpleHttp.class);
-
+ private static final int DEFAULT_TIMEOUT = 1;
private static final Pattern URL_PARAM_REGEX = Pattern.compile("\\{(\\w+)(?:=(\\w*))?}");
public static final MediaType JSON_MEDIA_TYPE = MediaType.get("application/json; charset=utf-8");
private final OkHttpClient client;
- private final HttpSubScan scanDefn;
private final File tempDir;
private final HttpProxyConfig proxyConfig;
private final CustomErrorContext errorContext;
@@ -94,7 +109,10 @@ public class SimpleHttp {
private final HttpUrl url;
private final PersistentTokenTable tokenTable;
private final Map<String, String> filters;
+ private final String connection;
+ private final HttpStoragePluginConfig pluginConfig;
private final HttpApiConfig apiConfig;
+ private final HttpOAuthConfig oAuthConfig;
private String responseMessage;
private int responseCode;
private String responseProtocol;
@@ -103,8 +121,10 @@ public class SimpleHttp {
public SimpleHttp(HttpSubScan scanDefn, HttpUrl url, File tempDir,
HttpProxyConfig proxyConfig, CustomErrorContext errorContext, Paginator paginator) {
- this.scanDefn = scanDefn;
this.apiConfig = scanDefn.tableSpec().connectionConfig();
+ this.pluginConfig = scanDefn.tableSpec().config();
+ this.connection = scanDefn.tableSpec().connection();
+ this.oAuthConfig = scanDefn.tableSpec().config().oAuthConfig();
this.filters = scanDefn.filters();
this.url = url;
this.tempDir = tempDir;
@@ -115,6 +135,48 @@ public class SimpleHttp {
this.client = setupHttpClient();
}
+ /**
+ * This constructor does not have an HttpSubScan and can be used outside the context of the HttpStoragePlugin.
+ * @param url The URL for an HTTP request
+ * @param tempDir Temp directory for caching
+ * @param proxyConfig Proxy configuration for making API calls
+ * @param errorContext The error context for error messages
+ * @param paginator The {@link Paginator} object for pagination.
+ * @param tokenTable The OAuth token table
+ * @param pluginConfig HttpStoragePlugin configuration. The plugin obtains OAuth and timeout info from this config.
+ * @param endpointConfig The
+ * @param connection The name of the connection
+ * @param filters A Key/value set of filters and values
+ */
+ public SimpleHttp(HttpUrl url, File tempDir, HttpProxyConfig proxyConfig, CustomErrorContext errorContext,
+ Paginator paginator, PersistentTokenTable tokenTable, HttpStoragePluginConfig pluginConfig,
+ HttpApiConfig endpointConfig, String connection, Map<String, String> filters) {
+ this.url = url;
+ this.tempDir = tempDir;
+ this.proxyConfig = proxyConfig;
+
+ if (errorContext == null) {
+ this.errorContext = new EmptyErrorContext() {
+ @Override
+ public void addContext(UserException.Builder builder) {
+ super.addContext(builder);
+ builder.addContext("URL", url.toString());
+ }
+ };
+ } else {
+ this.errorContext = errorContext;
+ }
+
+ this.paginator = paginator;
+ this.tokenTable = tokenTable;
+ this.pluginConfig = pluginConfig;
+ this.apiConfig = endpointConfig;
+ this.connection = connection;
+ this.filters = filters;
+ this.oAuthConfig = pluginConfig.oAuthConfig();
+ this.client = setupHttpClient();
+ }
+
public static SimpleHttpBuilder builder() {
return new SimpleHttpBuilder();
}
@@ -124,24 +186,21 @@ public class SimpleHttp {
*
* @return OkHttpClient configured server
*/
- private OkHttpClient setupHttpClient() {
+ protected OkHttpClient setupHttpClient() {
Builder builder = new OkHttpClient.Builder();
// Set up the HTTP Cache. Future possibilities include making the cache size and retention configurable but
// right now it is on or off. The writer will write to the Drill temp directory if it is accessible and
// output a warning if not.
- HttpStoragePluginConfig config = scanDefn.tableSpec().config();
- if (config.cacheResults()) {
+ if (pluginConfig.cacheResults()) {
setupCache(builder);
}
- HttpApiConfig apiConfig = scanDefn.tableSpec().connectionConfig();
// If OAuth information is provided, we will assume that the user does not want to use
// basic authentication
- HttpOAuthConfig oAuthConfig = scanDefn.tableSpec().config().oAuthConfig();
if (oAuthConfig != null) {
// Add interceptors for OAuth2
logger.debug("Adding OAuth2 Interceptor");
- AccessTokenRepository repository = new AccessTokenRepository(proxyConfig, config, tokenTable);
+ AccessTokenRepository repository = new AccessTokenRepository(proxyConfig, pluginConfig, tokenTable);
builder.authenticator(new AccessTokenAuthenticator(repository));
builder.addInterceptor(new AccessTokenInterceptor(repository));
@@ -154,7 +213,7 @@ public class SimpleHttp {
}
// Set timeouts
- int timeout = Math.max(1, config.timeout());
+ int timeout = Math.max(1, pluginConfig.timeout());
builder.connectTimeout(timeout, TimeUnit.SECONDS);
builder.writeTimeout(timeout, TimeUnit.SECONDS);
builder.readTimeout(timeout, TimeUnit.SECONDS);
@@ -255,7 +314,6 @@ public class SimpleHttp {
.url(url);
// The configuration does not allow for any other request types other than POST and GET.
- HttpApiConfig apiConfig = scanDefn.tableSpec().connectionConfig();
if (apiConfig.getMethodType() == HttpMethod.POST) {
// Handle POST requests
FormBody.Builder formBodyBuilder;
@@ -284,7 +342,7 @@ public class SimpleHttp {
// Log the URL and method to aid in debugging user issues.
logger.info("Connection: {}, Method {}, URL: {}",
- scanDefn.tableSpec().connection(),
+ connection,
apiConfig.getMethodType().name(), url());
// Add headers to request
@@ -344,6 +402,30 @@ public class SimpleHttp {
}
}
+  /**
+   * Executes the request and returns the entire response body decoded as UTF-8. Line terminators
+   * are normalized to {@code \n} and a trailing terminator is dropped.
+   *
+   * @return The response body
+   * @throws UserException if the response cannot be read
+   */
+  public String getResultsFromApiCall() {
+    InputStream inputStream = getInputStream();
+    // Closing the reader closes the underlying response stream once the body has been consumed.
+    try (BufferedReader reader = new BufferedReader(
+        new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
+      return reader.lines().collect(Collectors.joining("\n"));
+    } catch (IOException | java.io.UncheckedIOException e) {
+      // BufferedReader.lines() reports read failures as UncheckedIOException; close() as IOException.
+      throw UserException
+        .dataReadError(e)
+        .message("Failed to read the response from the API")
+        .build(logger);
+    }
+  }
+
+  /**
+   * Builds the proxy configuration for a request to {@code url}. The proxy settings derived from
+   * Drill's configuration for that URL are the starting point. When the plugin config names a proxy
+   * type other than {@code direct}, the plugin's proxy type, host, port and credentials are applied.
+   *
+   * @param config The HTTP storage plugin configuration
+   * @param drillConfig Drill's configuration
+   * @param url The URL which will be requested
+   * @return The proxy configuration to use for the request
+   */
+  public static HttpProxyConfig getProxySettings(HttpStoragePluginConfig config, Config drillConfig, HttpUrl url) {
+    final ProxyBuilder proxyBuilder = HttpProxyConfig.builder().fromConfigForURL(drillConfig, url.toString());
+    final String pluginProxyType = config.proxyType();
+    final boolean usesPluginProxy = pluginProxyType != null && !"direct".equals(pluginProxyType);
+    if (!usesPluginProxy) {
+      return proxyBuilder.build();
+    }
+
+    final UsernamePasswordCredentials proxyCredentials = config.getUsernamePasswordCredentials();
+    proxyBuilder.type(config.proxyType());
+    proxyBuilder.host(config.proxyHost());
+    proxyBuilder.port(config.proxyPort());
+    proxyBuilder.username(proxyCredentials.getUsername());
+    proxyBuilder.password(proxyCredentials.getPassword());
+    return proxyBuilder.build();
+  }
+
/**
* This function is a replacement for the isSuccessful() function which comes
* with okhttp3. The issue is that in some cases, a user may not want Drill to throw
@@ -354,7 +436,7 @@ public class SimpleHttp {
* @return True if the response code is 200-299 and possibly 400-499, false if other
*/
private boolean isSuccessful(int responseCode) {
- if (scanDefn.tableSpec().connectionConfig().errorOn400()) {
+ if (apiConfig.errorOn400()) {
return responseCode >= 200 && responseCode <= 299;
} else {
return ((responseCode >= 200 && responseCode <= 299) ||
@@ -386,7 +468,7 @@ public class SimpleHttp {
if (hasEndpointCredentials(apiConfig)) {
return apiConfig.getUsernamePasswordCredentials();
} else {
- return scanDefn.tableSpec().config().getUsernamePasswordCredentials();
+ return pluginConfig.getUsernamePasswordCredentials();
}
}
@@ -660,6 +742,214 @@ public class SimpleHttp {
return tempUrl;
}
+
+  /**
+   * Replaces the {@code {...}} placeholders in a URL with the given values, in order. The first
+   * placeholder receives the first value, the second placeholder the second value, and so on.
+   * Surplus values are ignored, and placeholders without a matching value are left in place.
+   * Substituted values are never rescanned, so a value which itself contains braces is inserted verbatim.
+   *
+   * @param rawUrl The URL, which must be a valid URL
+   * @param params The positional values to substitute into the URL
+   * @return The URL with its placeholders replaced. A URL without placeholders is returned as
+   *         rendered by {@link HttpUrl#toString()}.
+   * @throws UserException if the URL is not valid, or if it has placeholders and {@code params} is null
+   */
+  public static String mapPositionalParameters(String rawUrl, List<String> params) {
+    HttpUrl url = HttpUrl.parse(rawUrl);
+
+    // Validate URL
+    if (url == null) {
+      throw UserException.functionError()
+        .message("URL provided must be a valid URL. " + rawUrl + " is not valid.")
+        .build(logger);
+    }
+
+    if (!hasURLParameters(url)) {
+      return url.toString();
+    }
+
+    if (params == null) {
+      throw UserException
+        .parseError()
+        .message("API Query with URL Parameters must be populated.")
+        .build(logger);
+    }
+
+    StringBuilder result = new StringBuilder(decodedURL(url));
+    // Position from which to look for the next placeholder. It is advanced past each substituted
+    // value so that braces inside a value are never mistaken for a placeholder.
+    int searchFrom = 0;
+    for (String param : params) {
+      int startIndex = result.indexOf("{", searchFrom);
+      if (startIndex == -1) {
+        break;
+      }
+      // Look for the closing brace after the opening one, not from the start of the URL.
+      int endIndex = result.indexOf("}", startIndex);
+      if (endIndex == -1) {
+        break;
+      }
+      result.replace(startIndex, endIndex + 1, param);
+      searchFrom = startIndex + param.length();
+    }
+    return result.toString();
+  }
+
+ /**
+ * Validates a URL.
+ * @param url The input URL. Should be a string.
+ * @return True of the URL is valid, false if not.
+ */
+ public static boolean validateUrl(String url) {
+ return HttpUrl.parse(url) != null;
+ }
+
+ /**
+ * Accepts a list of input readers and converts that into an ArrayList of Strings
+ * @param inputReaders The array of FieldReaders
+ * @return A List of Strings containing the values from the FieldReaders.
+ */
+ public static List<String> buildParameterList(VarCharHolder[] inputReaders) {
+ if (inputReaders == null || inputReaders.length == 0) {
+ return Collections.emptyList();
+ }
+
+ List<String> inputArguments = new ArrayList<>();
+ for (int i = 0; i < inputReaders.length; i++) {
+ inputArguments.add(StringFunctionHelpers.getStringFromVarCharHolder(inputReaders[i]));
+ }
+
+ return inputArguments;
+ }
+
+  /**
+   * Looks up an HTTP storage plugin by name and returns its configuration.
+   *
+   * @param name The name of the storage plugin
+   * @param context The Drillbit context used to reach the storage plugin registry
+   * @return The configuration of the named HTTP storage plugin
+   * @throws PluginException declared for callers; lookup failures are reported as {@link UserException}
+   */
+  public static HttpStoragePluginConfig getPluginConfig(String name, DrillbitContext context) throws PluginException {
+    return getStoragePlugin(context, name).getConfig();
+  }
+
+  /**
+   * Returns the configuration of the endpoint named in a {@code plugin.endpoint} path.
+   *
+   * @param name The path, in the form {@code storage_plugin.endpoint}
+   * @param pluginConfig The configuration of the storage plugin which owns the endpoint
+   * @return The endpoint configuration
+   * @throws UserException if the path has no endpoint part, the endpoint does not exist, or the
+   *         endpoint does not return JSON
+   */
+  public static HttpApiConfig getEndpointConfig(String name, HttpStoragePluginConfig pluginConfig) {
+    // Get the plugin name and endpoint name
+    String[] parts = name.split("\\.");
+    if (parts.length < 2) {
+      throw UserException.functionError()
+        .message("You must call this function with a connection name and endpoint.")
+        .build(logger);
+    }
+
+    String endpoint = parts[1];
+    HttpApiConfig endpointConfig = pluginConfig.getConnection(endpoint);
+    if (endpointConfig == null) {
+      throw UserException.functionError()
+        .message("You must call this function with a valid endpoint name.")
+        .build(logger);
+    } else if (!"json".equalsIgnoreCase(endpointConfig.inputType())) {
+      // Compare by value: != on strings compares references and rejects JSON endpoints whose
+      // input type string (e.g. one deserialized from the plugin config) is not the interned literal.
+      throw UserException.functionError()
+        .message("http_request only supports API endpoints which return json.")
+        .build(logger);
+    }
+
+    return endpointConfig;
+  }
+
+  /**
+   * Looks up a storage plugin by name and verifies that it is an HTTP storage plugin.
+   *
+   * @param context The Drillbit context used to reach the storage plugin registry
+   * @param pluginName The name of the storage plugin
+   * @return The HTTP storage plugin
+   * @throws UserException if the plugin cannot be accessed, does not exist, or is not an HTTP plugin
+   */
+  private static HttpStoragePlugin getStoragePlugin(DrillbitContext context, String pluginName) {
+    StoragePluginRegistry storage = context.getStorage();
+    StoragePlugin pluginInstance;
+    try {
+      pluginInstance = storage.getPlugin(pluginName);
+    } catch (PluginException e) {
+      // Keep the cause so the underlying registry error is not lost.
+      throw UserException.functionError(e)
+        .message("Could not access plugin " + pluginName)
+        .build(logger);
+    }
+
+    if (pluginInstance == null) {
+      throw UserException.functionError()
+        .message(pluginName + " is not a valid plugin.")
+        .build(logger);
+    }
+
+    if (!(pluginInstance instanceof HttpStoragePlugin)) {
+      throw UserException.functionError()
+        .message("You can only include HTTP plugins in this function.")
+        .build(logger);
+    }
+    return (HttpStoragePlugin) pluginInstance;
+  }
+
+
+  /**
+   * Makes an API call and returns the response body as a string. It is used by the
+   * {@code http_request()} UDF and uses all the configuration parameters contained in the storage
+   * plugin and endpoint configuration, with the exception of pagination, which is not supported.
+   *
+   * @param schemaPath The path of storage_plugin.endpoint from which the data will be retrieved
+   * @param context {@link DrillbitContext} The context from the current query
+   * @param args An optional list of arguments which are substituted, in order, into the placeholders
+   *             of the endpoint URL
+   * @return A String of the results.
+   * @throws UserException if the plugin or endpoint cannot be resolved, or the final URL is invalid
+   */
+  public static String makeAPICall(String schemaPath, DrillbitContext context, List<String> args) {
+    // Get the plugin name and endpoint name
+    String[] parts = schemaPath.split("\\.");
+    if (parts.length < 2) {
+      throw UserException.functionError()
+        .message("You must call this function with a connection name and endpoint.")
+        .build(logger);
+    }
+    String pluginName = parts[0];
+    String endpointName = parts[1];
+
+    // Look the plugin up once and reuse it for both its config and its token table.
+    HttpStoragePlugin plugin = getStoragePlugin(context, pluginName);
+    HttpStoragePluginConfig pluginConfig = plugin.getConfig();
+    HttpApiConfig endpointConfig = getEndpointConfig(schemaPath, pluginConfig);
+
+    // Get proxy settings
+    HttpProxyConfig proxyConfig = SimpleHttp.getProxySettings(pluginConfig, context.getConfig(), endpointConfig.getHttpUrl());
+
+    // For this use case, we will replace the URL parameters here, rather than doing it in the SimpleHttp client
+    // because we are using positional mapping rather than k/v pairs for this.
+    String finalUrl;
+    if (SimpleHttp.hasURLParameters(endpointConfig.getHttpUrl())) {
+      finalUrl = SimpleHttp.mapPositionalParameters(endpointConfig.url(), args);
+    } else {
+      finalUrl = endpointConfig.url();
+    }
+
+    HttpUrl requestUrl = HttpUrl.parse(finalUrl);
+    if (requestUrl == null) {
+      throw UserException.functionError()
+        .message("URL " + finalUrl + " is not a valid URL.")
+        .build(logger);
+    }
+
+    // Now get the client. The endpoint name is passed as the connection so request logs identify it.
+    SimpleHttp client = new SimpleHttpBuilder()
+      .pluginConfig(pluginConfig)
+      .endpointConfig(endpointConfig)
+      .connection(endpointName)
+      .tempDir(new File(context.getConfig().getString(ExecConstants.DRILL_TMP_DIR)))
+      .url(requestUrl)
+      .proxyConfig(proxyConfig)
+      .tokenTable(plugin.getTokenTable())
+      .build();
+
+    return client.getResultsFromApiCall();
+  }
+
+ public static OkHttpClient getSimpleHttpClient() {
+ return new OkHttpClient.Builder()
+ .connectTimeout(DEFAULT_TIMEOUT, TimeUnit.SECONDS)
+ .writeTimeout(DEFAULT_TIMEOUT, TimeUnit.SECONDS)
+ .readTimeout(DEFAULT_TIMEOUT, TimeUnit.SECONDS)
+ .build();
+ }
+
+  /**
+   * Makes a GET request to the given URL with the client from {@link #getSimpleHttpClient()} and
+   * returns the response body. The body is returned regardless of the HTTP status code.
+   *
+   * @param url The URL to request
+   * @return The response body, or {@code null} if the response has no body
+   * @throws UserException if the request or reading the response fails
+   */
+  public static String makeSimpleGetRequest(String url) {
+    OkHttpClient client = getSimpleHttpClient();
+
+    // Build the request object
+    Request request = new Request.Builder()
+      .url(url)
+      .build();
+
+    // Execute the request. try-with-resources releases the connection even if reading the body fails.
+    try (Response response = client.newCall(request).execute()) {
+      if (response.body() == null) {
+        return null;
+      }
+      return response.body().string();
+    } catch (IOException e) {
+      throw UserException
+        .dataReadError(e)
+        .message("HTTP request failed")
+        .addContext("URL", url)
+        .build(logger);
+    }
+  }
+
/**
* Intercepts requests and adds authentication headers to the request
*/
@@ -684,19 +974,25 @@ public class SimpleHttp {
public static class SimpleHttpBuilder {
private HttpSubScan scanDefn;
-
private HttpUrl url;
-
private File tempDir;
-
private HttpProxyConfig proxyConfig;
-
private CustomErrorContext errorContext;
-
private Paginator paginator;
+ private PersistentTokenTable tokenTable;
+ private HttpStoragePluginConfig pluginConfig;
+ private HttpApiConfig endpointConfig;
+ private HttpOAuthConfig oAuthConfig;
+ private Map<String,String> filters;
+ private String connection;
+ /**
+ * Sets the scan definition and copies the plugin, endpoint and OAuth configs, the token table and
+ * the filters from it. When a scan definition is set, build() uses the scan-based constructor.
+ */
public SimpleHttpBuilder scanDefn(HttpSubScan scanDefn) {
this.scanDefn = scanDefn;
+ this.pluginConfig = scanDefn.tableSpec().config();
+ this.endpointConfig = scanDefn.tableSpec().connectionConfig();
+ this.oAuthConfig = scanDefn.tableSpec().config().oAuthConfig();
+ this.tokenTable = scanDefn.tableSpec().getTokenTable();
+ this.filters = scanDefn.filters();
return this;
}
@@ -725,8 +1021,39 @@ public class SimpleHttp {
return this;
}
+    /**
+     * Sets the OAuth token table, used when the plugin configuration contains OAuth settings.
+     */
+    public SimpleHttpBuilder tokenTable(PersistentTokenTable tokenTable) {
+      this.tokenTable = tokenTable;
+      return this;
+    }
+
+    /**
+     * Sets the storage plugin configuration, together with the OAuth configuration it carries.
+     */
+    public SimpleHttpBuilder pluginConfig(HttpStoragePluginConfig config) {
+      this.pluginConfig = config;
+      this.oAuthConfig = this.pluginConfig.oAuthConfig();
+      return this;
+    }
+
+    /**
+     * Sets the endpoint (API) configuration.
+     */
+    public SimpleHttpBuilder endpointConfig(HttpApiConfig endpointConfig) {
+      this.endpointConfig = endpointConfig;
+      return this;
+    }
+
+    /**
+     * Sets the connection name.
+     */
+    public SimpleHttpBuilder connection(String connection) {
+      this.connection = connection;
+      return this;
+    }
+
+    /**
+     * Sets the key/value filters.
+     */
+    public SimpleHttpBuilder filters(Map<String,String> filters) {
+      this.filters = filters;
+      return this;
+    }
+
+
+ /**
+ * Builds the client. If a scan definition was supplied, the scan-based constructor is used.
+ * Otherwise the client is built from the individually supplied settings.
+ */
public SimpleHttp build() {
- return new SimpleHttp(scanDefn, url, tempDir, proxyConfig, errorContext, paginator);
+ if (this.scanDefn != null) {
+ return new SimpleHttp(scanDefn, url, tempDir, proxyConfig, errorContext, paginator);
+ } else {
+ return new SimpleHttp(url, tempDir, proxyConfig, errorContext, paginator, tokenTable, pluginConfig, endpointConfig, connection, filters);
+ }
}
}
}
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java
new file mode 100644
index 0000000..0439979
--- /dev/null
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.http;
+
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import okhttp3.mockwebserver.RecordedRequest;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.common.util.DrillFileUtils;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.store.http.util.SimpleHttp;
+import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.apache.drill.shaded.guava.com.google.common.io.Files;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the HTTP helper UDFs {@code http_get} and {@code http_request}.
+ * Each HTTP test starts a {@link MockWebServer} on {@link #MOCK_SERVER_PORT}, runs a query that
+ * calls a UDF, and then checks the request the mock server received.
+ */
+public class TestHttpUDFFunctions extends ClusterTest {
+
+  private static final int MOCK_SERVER_PORT = 47770;
+  private static final String DUMMY_URL = "http://localhost:" + MOCK_SERVER_PORT + "/";
+
+  /** Upper bound on waiting for a request that the finished query should already have sent. */
+  private static final long REQUEST_TIMEOUT_SECONDS = 10;
+
+  /** Canned response body, loaded from {@code /data/simple.json} in {@link #setup()}. */
+  private static String TEST_JSON_RESPONSE;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    startCluster(ClusterFixture.builder(dirTestWatcher));
+    TEST_JSON_RESPONSE = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/simple.json"), Charsets.UTF_8).read();
+
+    // GET endpoint with an {org} path placeholder; "lng" and "date" are additional declared params.
+    // The URL is derived from DUMMY_URL so it always matches the mock server's port.
+    HttpApiConfig mockGithubConfig = HttpApiConfig.builder()
+      .url(DUMMY_URL + "orgs/{org}/repos")
+      .method("GET")
+      .params(Arrays.asList("org", "lng", "date"))
+      .dataPath("results")
+      .requireTail(false)
+      .build();
+
+    Map<String, HttpApiConfig> configs = new HashMap<>();
+    configs.put("github", mockGithubConfig);
+
+    HttpStoragePluginConfig mockStorageConfig =
+      new HttpStoragePluginConfig(false, configs, 2, "globaluser", "globalpass", "",
+        80, "", "", "", null, new PlainCredentialsProvider(ImmutableMap.of(
+          UsernamePasswordCredentials.USERNAME, "globaluser",
+          UsernamePasswordCredentials.PASSWORD, "globalpass")));
+    mockStorageConfig.setEnabled(true);
+    // Registered as "local", so http_request('local.github', ...) targets the endpoint above.
+    cluster.defineStoragePlugin("local", mockStorageConfig);
+  }
+
+  @Test
+  public void testHttpGetWithNoParams() throws Exception {
+    try (MockWebServer server = startServer()) {
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_RESPONSE));
+      String sql = "SELECT http_get('" + DUMMY_URL + "') AS result FROM (values(1))";
+
+      assertSingleRow(sql);
+      assertGetRequest(server, DUMMY_URL);
+    }
+  }
+
+  @Test
+  public void testHttpGetWithParams() throws Exception {
+    try (MockWebServer server = startServer()) {
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_RESPONSE));
+      String sql = "SELECT http_get('" + DUMMY_URL + "{p1}/{p2}', 'param1', 'param2') AS result FROM (values(1))";
+
+      assertSingleRow(sql);
+      assertGetRequest(server, DUMMY_URL + "param1/param2");
+    }
+  }
+
+  @Test
+  public void testHttpGetFromPlugin() throws Exception {
+    try (MockWebServer server = startServer()) {
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_RESPONSE));
+      String sql = "SELECT http_request('local.github', 'apache') AS result FROM (values(1))";
+
+      assertSingleRow(sql);
+      assertGetRequest(server, DUMMY_URL + "orgs/apache/repos");
+    }
+  }
+
+  @Test
+  public void testHttpGetWithInvalidPlugin() {
+    String sql = "SELECT http_request('nope.nothere', 'apache') AS result FROM (values(1))";
+    try {
+      client.queryBuilder().sql(sql).run();
+      fail("Expected the query to fail for an unknown plugin");
+    } catch (Exception e) {
+      // String.valueOf guards against exceptions without a message.
+      String message = String.valueOf(e.getMessage());
+      assertTrue("Unexpected error message: " + message,
+        message.contains("FUNCTION ERROR: nope is not a valid plugin."));
+    }
+  }
+
+  @Test
+  public void testPositionalReplacement() {
+    String url = "http://somesite.com/{p1}/{p2}/path/{}";
+    List<String> params = new ArrayList<>();
+    params.add("foo");
+    params.add("bar");
+    params.add("baz");
+    assertEquals("http://somesite.com/foo/bar/path/baz", SimpleHttp.mapPositionalParameters(url, params));
+  }
+
+  /**
+   * Runs the query and asserts that it returns exactly one row. The row set is always
+   * cleared, even when the assertion fails.
+   *
+   * @param sql query to run
+   * @throws Exception if the query cannot be executed
+   */
+  private void assertSingleRow(String sql) throws Exception {
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    assertNotNull("Query returned no results: " + sql, results);
+    try {
+      assertEquals(1, results.rowCount());
+    } finally {
+      results.clear();
+    }
+  }
+
+  /**
+   * Asserts that the mock server received a GET request for {@code expectedUrl}.
+   * The wait is bounded, so a request that was never sent fails the test instead of hanging it.
+   *
+   * @param server mock server that should have received the request
+   * @param expectedUrl full URL the request should have targeted
+   * @throws InterruptedException if interrupted while waiting for the request
+   */
+  private static void assertGetRequest(MockWebServer server, String expectedUrl) throws InterruptedException {
+    RecordedRequest recordedRequest = server.takeRequest(REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+    assertNotNull("Mock server did not receive a request for " + expectedUrl, recordedRequest);
+    assertEquals("GET", recordedRequest.getMethod());
+    assertEquals(expectedUrl, recordedRequest.getRequestUrl().toString());
+  }
+
+  /**
+   * Helper function to start the MockHTTPServer
+   * @return Started Mock server
+   * @throws IOException If the server cannot start, throws IOException
+   */
+  public static MockWebServer startServer() throws IOException {
+    MockWebServer server = new MockWebServer();
+    server.start(MOCK_SERVER_PORT);
+    return server;
+  }
+}
diff --git a/contrib/storage-http/src/test/resources/data/simple.json b/contrib/storage-http/src/test/resources/data/simple.json
new file mode 100644
index 0000000..bde2d5b
--- /dev/null
+++ b/contrib/storage-http/src/test/resources/data/simple.json
@@ -0,0 +1,14 @@
+{"results":
+ {"sunrise":"6:13:58 AM",
+ "sunset":"5:59:55 PM",
+ "solar_noon":"12:06:56 PM",
+ "day_length":"11:45:57",
+ "civil_twilight_begin":"5:48:14 AM",
+ "civil_twilight_end":"6:25:38 PM",
+ "nautical_twilight_begin":"5:18:16 AM",
+ "nautical_twilight_end":"6:55:36 PM",
+ "astronomical_twilight_begin":"4:48:07 AM",
+ "astronomical_twilight_end":"7:25:45 PM"
+ },
+ "status":"OK"
+}
diff --git a/contrib/storage-http/src/test/resources/data/weather.json b/contrib/storage-http/src/test/resources/data/weather.json
new file mode 100644
index 0000000..75df005
--- /dev/null
+++ b/contrib/storage-http/src/test/resources/data/weather.json
@@ -0,0 +1,94 @@
+{
+ "@context": [
+ "https://geojson.org/geojson-ld/geojson-context.jsonld",
+ {
+ "@version": "1.1",
+ "wx": "https://api.weather.gov/ontology#",
+ "s": "https://schema.org/",
+ "geo": "http://www.opengis.net/ont/geosparql#",
+ "unit": "http://codes.wmo.int/common/unit/",
+ "@vocab": "https://api.weather.gov/ontology#",
+ "geometry": {
+ "@id": "s:GeoCoordinates",
+ "@type": "geo:wktLiteral"
+ },
+ "city": "s:addressLocality",
+ "state": "s:addressRegion",
+ "distance": {
+ "@id": "s:Distance",
+ "@type": "s:QuantitativeValue"
+ },
+ "bearing": {
+ "@type": "s:QuantitativeValue"
+ },
+ "value": {
+ "@id": "s:value"
+ },
+ "unitCode": {
+ "@id": "s:unitCode",
+ "@type": "@id"
+ },
+ "forecastOffice": {
+ "@type": "@id"
+ },
+ "forecastGridData": {
+ "@type": "@id"
+ },
+ "publicZone": {
+ "@type": "@id"
+ },
+ "county": {
+ "@type": "@id"
+ }
+ }
+ ],
+ "id": "https://api.weather.gov/points/39.7456,-97.0892",
+ "type": "Feature",
+ "geometry": {
+ "type": "Point",
+ "coordinates": [
+ -97.089200000000005,
+ 39.745600000000003
+ ]
+ },
+ "properties": {
+ "@id": "https://api.weather.gov/points/39.7456,-97.0892",
+ "@type": "wx:Point",
+ "cwa": "TOP",
+ "forecastOffice": "https://api.weather.gov/offices/TOP",
+ "gridId": "TOP",
+ "gridX": 31,
+ "gridY": 80,
+ "forecast": "https://api.weather.gov/gridpoints/TOP/31,80/forecast",
+ "forecastHourly": "https://api.weather.gov/gridpoints/TOP/31,80/forecast/hourly",
+ "forecastGridData": "https://api.weather.gov/gridpoints/TOP/31,80",
+ "observationStations": "https://api.weather.gov/gridpoints/TOP/31,80/stations",
+ "relativeLocation": {
+ "type": "Feature",
+ "geometry": {
+ "type": "Point",
+ "coordinates": [
+ -97.086661000000007,
+ 39.679375999999998
+ ]
+ },
+ "properties": {
+ "city": "Linn",
+ "state": "KS",
+ "distance": {
+ "unitCode": "wmoUnit:m",
+ "value": 7366.9851976443997
+ },
+ "bearing": {
+ "unitCode": "wmoUnit:degree_(angle)",
+ "value": 358
+ }
+ }
+ },
+ "forecastZone": "https://api.weather.gov/zones/forecast/KSZ009",
+ "county": "https://api.weather.gov/zones/county/KSC201",
+ "fireWeatherZone": "https://api.weather.gov/zones/fire/KSZ009",
+ "timeZone": "America/Chicago",
+ "radarStation": "KTWX"
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
index 7ce944d..0da319f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
@@ -342,6 +342,11 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor
}
@Override
+  public DrillbitContext getDrillbitContext() {
+    // Hand out the Drillbit-level context held by this fragment context.
+    return this.context;
+  }
+
+ @Override
public DrillbitEndpoint getForemanEndpoint() {
return fragment.getForeman();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index 82e9994..76b44ce 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -305,6 +305,11 @@ public class QueryContext implements AutoCloseable, OptimizerRulesContext, Schem
}
@Override
+  public DrillbitContext getDrillbitContext() {
+    // Hand out the Drillbit-level context held by this query context.
+    return this.drillbitContext;
+  }
+
+ @Override
public PartitionExplorer getPartitionExplorer() {
return new PartitionExplorerImpl(getRootSchema());
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/UdfUtilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/UdfUtilities.java
index 6617b25..cee74ff 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/UdfUtilities.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/UdfUtilities.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.ops;
import io.netty.buffer.DrillBuf;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.expr.holders.ValueHolder;
+import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.PartitionExplorer;
import org.apache.drill.shaded.guava.com.google.common.base.Function;
@@ -42,6 +43,7 @@ public interface UdfUtilities {
.put(ContextInformation.class, "getContextInformation")
.put(OptionManager.class, "getOptions")
.put(BufferManager.class, "getManagedBufferManager")
+ .put(DrillbitContext.class, "getDrillbitContext")
.build();
@@ -90,6 +92,13 @@ public interface UdfUtilities {
PartitionExplorer getPartitionExplorer();
/**
+ * Returns the context of the Drillbit running the query. UDFs can use it to look up storage
+ * plugin configurations and other information about the running system. It is registered in
+ * the class-to-getter map at the top of this interface, so presumably UDFs can obtain it the
+ * same way as the other types listed there. See the HTTP storage plugin's UDFs
+ * (http_get / http_request) for an example. Test fixtures may return null (e.g. OperatorFixture).
+ *
+ * @return the Drillbit context, e.g. for accessing storage plugin configurations
+ */
+ DrillbitContext getDrillbitContext();
+
+ /**
* Works with value holders cache which holds constant value and its wrapper by type.
* If value is absent uses holderInitializer to create holder and adds it to cache.
*
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
index 30c60c2..d73d252 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
@@ -18,6 +18,7 @@
package org.apache.drill.test;
import org.apache.drill.exec.alias.AliasRegistryProvider;
+import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.metastore.MetastoreRegistry;
import org.apache.drill.shaded.guava.com.google.common.base.Function;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
@@ -239,6 +240,11 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable {
}
@Override
+ public DrillbitContext getDrillbitContext() {
+ // This operator-test fixture has no Drillbit context to offer, so it returns null.
+ // NOTE(review): UDFs that obtain DrillbitContext through UdfUtilities would receive null
+ // here; consider throwing UnsupportedOperationException for a clearer failure.
+ return null;
+ }
+
+ @Override
public ExecutorState getExecutorState() {
return executorState;
}