Posted to commits@drill.apache.org by cg...@apache.org on 2022/03/18 17:12:34 UTC

[drill] branch master updated: DRILL-8169: Add UDFs to HTTP Plugin to Facilitate Joins (#2496)

This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ef203b  DRILL-8169: Add UDFs to HTTP Plugin to Facilitate Joins (#2496)
7ef203b is described below

commit 7ef203b840f873fca0625d2fe77d9d4fab6ad99f
Author: Charles S. Givre <cg...@apache.org>
AuthorDate: Fri Mar 18 13:12:28 2022 -0400

    DRILL-8169: Add UDFs to HTTP Plugin to Facilitate Joins (#2496)
    
    * Initial commit
    
    * Unit tests working
    
    * Added configs from constants
    
    * Working with Storage plugins
    
    * Removed cache files
    
    * Fixed cache directory
    
    * Updated docs
    
    * Removed unneeded pom.xml modifications
    
    * Addressed Review comments
    
    * Converted holder to VarCharHolder[]
---
 contrib/storage-http/README.md                     |  34 ++
 .../exec/store/http/udfs/HttpHelperFunctions.java  | 166 ++++++++++
 .../drill/exec/store/http/util/SimpleHttp.java     | 367 +++++++++++++++++++--
 .../exec/store/http/TestHttpUDFFunctions.java      | 159 +++++++++
 .../src/test/resources/data/simple.json            |  14 +
 .../src/test/resources/data/weather.json           |  94 ++++++
 .../apache/drill/exec/ops/FragmentContextImpl.java |   5 +
 .../org/apache/drill/exec/ops/QueryContext.java    |   5 +
 .../org/apache/drill/exec/ops/UdfUtilities.java    |   9 +
 .../org/apache/drill/test/OperatorFixture.java     |   6 +
 10 files changed, 839 insertions(+), 20 deletions(-)

diff --git a/contrib/storage-http/README.md b/contrib/storage-http/README.md
index 8154f0a..6c19ae6 100644
--- a/contrib/storage-http/README.md
+++ b/contrib/storage-http/README.md
@@ -659,3 +659,37 @@ The HTTP plugin includes four implicit fields which can be used for debugging.
 * `_response_message`:  The response message.
 * `_response_protocol`:  The response protocol.
 * `_response_url`:  The actual URL sent to the API. 
+
+## Joining Data
+There are some situations where a user might want to join data with an API result, but pushdowns prevent that from happening.  The main case is when 
+an API has parameters which are part of the URL and these parameters must be populated dynamically via a join. 
+
+For these situations there are two functions, `http_request` and `http_get`, which you can use to facilitate such joins. 
+
+* `http_request('<storage_plugin_name>.<endpoint>', <params>)`:  This function accepts the name of a storage plugin and endpoint as input, along with an optional list of parameters 
+  to substitute into the endpoint's URL.
+* `http_get(<url>, <params>)`:  This function works in the same way except that it does not pull any configuration information from existing storage plugins.  The input URL for 
+  the `http_get` function must be a valid URL. 
+
+### Example Queries
+Let's say that you have a storage plugin called `github` with an endpoint called `repos` which points to the URL https://github.com/orgs/{org}/repos.  It is easy enough to 
+write a query like this:
+
+```sql
+SELECT * 
+FROM github.repos
+WHERE org='apache'
+```
+However, if you had a file of organizations and wanted to join it with the API, the query would fail.  Using the functions listed above, you could retrieve this data as follows:
+
+```sql
+SELECT http_request('github.repos', `org`)
+FROM dfs.`some_data.csvh`
+```
+or
+```sql
+SELECT http_get('https://github.com/orgs/{org}/repos', `org`)
+FROM dfs.`some_data.csvh`
+```
+
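+To keep the join key visible next to the API result, one possible pattern (a sketch reusing the example plugin and file above; the `repo_data` alias is just illustrative) is:
+
+```sql
+SELECT `org`, http_request('github.repos', `org`) AS repo_data
+FROM dfs.`some_data.csvh`
+```
+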
+**WARNING:  This functionality will execute an HTTP request FOR EVERY ROW IN YOUR DATA.  Use with caution.**
+
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java
new file mode 100644
index 0000000..256fbee
--- /dev/null
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.http.udfs;
+
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.expr.DrillSimpleFunc;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate;
+import org.apache.drill.exec.expr.annotations.Output;
+import org.apache.drill.exec.expr.annotations.Param;
+import org.apache.drill.exec.expr.annotations.Workspace;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
+
+import javax.inject.Inject;
+
+public class HttpHelperFunctions {
+
+  @FunctionTemplate(names = {"http_get", "httpGet"},
+    scope = FunctionTemplate.FunctionScope.SIMPLE,
+    isVarArg = true)
+  public static class HttpGetFunction implements DrillSimpleFunc {
+
+    @Param
+    VarCharHolder rawInput;
+
+    @Param
+    VarCharHolder[] inputReaders;
+
+    @Output
+    ComplexWriter writer;
+
+    @Inject
+    OptionManager options;
+
+    @Inject
+    DrillBuf buffer;
+
+    @Workspace
+    org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader;
+
+    @Override
+    public void setup() {
+      jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader.Builder(buffer)
+        .defaultSchemaPathColumns()
+        .readNumbersAsDouble(options.getOption(org.apache.drill.exec.ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE).bool_val)
+        .allTextMode(options.getOption(org.apache.drill.exec.ExecConstants.JSON_ALL_TEXT_MODE).bool_val)
+        .enableNanInf(options.getOption(org.apache.drill.exec.ExecConstants.JSON_READER_NAN_INF_NUMBERS).bool_val)
+        .build();
+    }
+
+    @Override
+    public void eval() {
+      // Get the URL
+      String url = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(rawInput.start, rawInput.end, rawInput.buffer);
+
+      // Process Positional Arguments
+      java.util.List args = org.apache.drill.exec.store.http.util.SimpleHttp.buildParameterList(inputReaders);
+      String finalUrl = org.apache.drill.exec.store.http.util.SimpleHttp.mapPositionalParameters(url, args);
+
+      // Make the API call
+      String results = org.apache.drill.exec.store.http.util.SimpleHttp.makeSimpleGetRequest(finalUrl);
+
+      // If the result string is null or empty, return an empty map
+      if (results == null || results.length() == 0) {
+        // Return empty map
+        org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter mapWriter = writer.rootAsMap();
+        mapWriter.start();
+        mapWriter.end();
+        return;
+      }
+
+      try {
+        jsonReader.setSource(results);
+        jsonReader.setIgnoreJSONParseErrors(true);  // Reduce number of errors
+        jsonReader.write(writer);
+        buffer = jsonReader.getWorkBuf();
+      } catch (Exception e) {
+        throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e);
+      }
+    }
+  }
+
+
+  @FunctionTemplate(names = {"http_request", "httpRequest"},
+    scope = FunctionTemplate.FunctionScope.SIMPLE,
+    isVarArg = true)
+  public static class HttpGetFromStoragePluginFunction implements DrillSimpleFunc {
+
+    @Param(constant = true)
+    VarCharHolder rawInput;
+
+    @Param
+    VarCharHolder[] inputReaders;
+
+    @Output
+    ComplexWriter writer;
+
+    @Inject
+    OptionManager options;
+
+    @Inject
+    DrillbitContext drillbitContext;
+
+    @Inject
+    DrillBuf buffer;
+
+    @Workspace
+    org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader;
+
+    @Override
+    public void setup() {
+      jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader.Builder(buffer)
+        .defaultSchemaPathColumns()
+        .readNumbersAsDouble(options.getOption(org.apache.drill.exec.ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE).bool_val)
+        .allTextMode(options.getOption(org.apache.drill.exec.ExecConstants.JSON_ALL_TEXT_MODE).bool_val)
+        .enableNanInf(options.getOption(org.apache.drill.exec.ExecConstants.JSON_READER_NAN_INF_NUMBERS).bool_val)
+        .build();
+    }
+
+    @Override
+    public void eval() {
+      // Get the plugin name
+      String pluginName = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(rawInput.start, rawInput.end, rawInput.buffer);
+
+      // Process Positional Arguments
+      java.util.List args = org.apache.drill.exec.store.http.util.SimpleHttp.buildParameterList(inputReaders);
+      String results = org.apache.drill.exec.store.http.util.SimpleHttp.makeAPICall(pluginName, drillbitContext, args);
+
+      // If the result string is null or empty, return an empty map
+      if (results == null || results.length() == 0) {
+        // Return empty map
+        org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter mapWriter = writer.rootAsMap();
+        mapWriter.start();
+        mapWriter.end();
+        return;
+      }
+
+      try {
+        jsonReader.setSource(results);
+        jsonReader.setIgnoreJSONParseErrors(true);  // Reduce number of errors
+        jsonReader.write(writer);
+        buffer = jsonReader.getWorkBuf();
+      } catch (Exception e) {
+        throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e);
+      }
+    }
+  }
+}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
index 971e039..b5812c1 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.http.util;
 
+import com.typesafe.config.Config;
 import okhttp3.Cache;
 import okhttp3.Credentials;
 import okhttp3.FormBody;
@@ -30,20 +31,30 @@ import okhttp3.RequestBody;
 import okhttp3.Response;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.common.exceptions.EmptyErrorContext;
 import org.apache.drill.common.map.CaseInsensitiveMap;
 import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
 import org.apache.drill.exec.oauth.PersistentTokenTable;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.StoragePlugin;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
 import org.apache.drill.exec.store.http.HttpApiConfig;
 import org.apache.drill.exec.store.http.HttpApiConfig.HttpMethod;
 import org.apache.drill.exec.store.http.HttpApiConfig.PostLocation;
 import org.apache.drill.exec.store.http.HttpOAuthConfig;
+import org.apache.drill.exec.store.http.HttpStoragePlugin;
 import org.apache.drill.exec.store.http.HttpStoragePluginConfig;
 import org.apache.drill.exec.store.http.HttpSubScan;
 import org.apache.drill.exec.store.http.paginator.Paginator;
 import org.apache.drill.exec.store.http.oauth.AccessTokenAuthenticator;
 import org.apache.drill.exec.store.http.oauth.AccessTokenInterceptor;
 import org.apache.drill.exec.store.http.oauth.AccessTokenRepository;
+import org.apache.drill.exec.store.http.util.HttpProxyConfig.ProxyBuilder;
 import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
 import org.jetbrains.annotations.NotNull;
 import org.json.simple.JSONObject;
@@ -55,23 +66,28 @@ import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLSocketFactory;
 import javax.net.ssl.TrustManager;
 import javax.net.ssl.X509TrustManager;
+import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.io.UnsupportedEncodingException;
 import java.net.InetSocketAddress;
 import java.net.Proxy;
 import java.net.URLDecoder;
+import java.nio.charset.StandardCharsets;
 import java.security.KeyManagementException;
 import java.security.NoSuchAlgorithmException;
 import java.security.cert.X509Certificate;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 
 /**
@@ -81,12 +97,11 @@ import java.util.regex.Pattern;
  */
 public class SimpleHttp {
   private static final Logger logger = LoggerFactory.getLogger(SimpleHttp.class);
-
+  private static final int DEFAULT_TIMEOUT = 1;
   private static final Pattern URL_PARAM_REGEX = Pattern.compile("\\{(\\w+)(?:=(\\w*))?}");
   public static final MediaType JSON_MEDIA_TYPE = MediaType.get("application/json; charset=utf-8");
 
   private final OkHttpClient client;
-  private final HttpSubScan scanDefn;
   private final File tempDir;
   private final HttpProxyConfig proxyConfig;
   private final CustomErrorContext errorContext;
@@ -94,7 +109,10 @@ public class SimpleHttp {
   private final HttpUrl url;
   private final PersistentTokenTable tokenTable;
   private final Map<String, String> filters;
+  private final String connection;
+  private final HttpStoragePluginConfig pluginConfig;
   private final HttpApiConfig apiConfig;
+  private final HttpOAuthConfig oAuthConfig;
   private String responseMessage;
   private int responseCode;
   private String responseProtocol;
@@ -103,8 +121,10 @@ public class SimpleHttp {
 
   public SimpleHttp(HttpSubScan scanDefn, HttpUrl url, File tempDir,
     HttpProxyConfig proxyConfig, CustomErrorContext errorContext, Paginator paginator) {
-    this.scanDefn = scanDefn;
     this.apiConfig = scanDefn.tableSpec().connectionConfig();
+    this.pluginConfig = scanDefn.tableSpec().config();
+    this.connection = scanDefn.tableSpec().connection();
+    this.oAuthConfig = scanDefn.tableSpec().config().oAuthConfig();
     this.filters = scanDefn.filters();
     this.url = url;
     this.tempDir = tempDir;
@@ -115,6 +135,48 @@ public class SimpleHttp {
     this.client = setupHttpClient();
   }
 
+  /**
+   * This constructor does not have an HttpSubScan and can be used outside the context of the HttpStoragePlugin.
+   * @param url The URL for an HTTP request
+   * @param tempDir Temp directory for caching
+   * @param proxyConfig Proxy configuration for making API calls
+   * @param errorContext The error context for error messages
+   * @param paginator The {@link Paginator} object for pagination.
+   * @param tokenTable The OAuth token table
+   * @param pluginConfig HttpStoragePlugin configuration.  The plugin obtains OAuth and timeout info from this config.
+   * @param endpointConfig The {@link HttpApiConfig} endpoint configuration
+   * @param connection The name of the connection
+   * @param filters A key/value map of filters and values
+   */
+  public SimpleHttp(HttpUrl url, File tempDir, HttpProxyConfig proxyConfig, CustomErrorContext errorContext,
+                    Paginator paginator, PersistentTokenTable tokenTable, HttpStoragePluginConfig pluginConfig,
+                    HttpApiConfig endpointConfig, String connection, Map<String, String> filters) {
+    this.url = url;
+    this.tempDir = tempDir;
+    this.proxyConfig = proxyConfig;
+
+    if (errorContext == null) {
+      this.errorContext = new EmptyErrorContext() {
+        @Override
+        public void addContext(UserException.Builder builder) {
+          super.addContext(builder);
+          builder.addContext("URL", url.toString());
+        }
+      };
+    } else {
+      this.errorContext = errorContext;
+    }
+
+    this.paginator = paginator;
+    this.tokenTable = tokenTable;
+    this.pluginConfig = pluginConfig;
+    this.apiConfig = endpointConfig;
+    this.connection = connection;
+    this.filters = filters;
+    this.oAuthConfig = pluginConfig.oAuthConfig();
+    this.client = setupHttpClient();
+  }
+
   public static SimpleHttpBuilder builder() {
     return new SimpleHttpBuilder();
   }
@@ -124,24 +186,21 @@ public class SimpleHttp {
    *
    * @return OkHttpClient configured server
    */
-  private OkHttpClient setupHttpClient() {
+  protected OkHttpClient setupHttpClient() {
     Builder builder = new OkHttpClient.Builder();
 
     // Set up the HTTP Cache.   Future possibilities include making the cache size and retention configurable but
     // right now it is on or off.  The writer will write to the Drill temp directory if it is accessible and
     // output a warning if not.
-    HttpStoragePluginConfig config = scanDefn.tableSpec().config();
-    if (config.cacheResults()) {
+    if (pluginConfig.cacheResults()) {
       setupCache(builder);
     }
-    HttpApiConfig apiConfig = scanDefn.tableSpec().connectionConfig();
     // If OAuth information is provided, we will assume that the user does not want to use
     // basic authentication
-    HttpOAuthConfig oAuthConfig = scanDefn.tableSpec().config().oAuthConfig();
     if (oAuthConfig != null) {
       // Add interceptors for OAuth2
       logger.debug("Adding OAuth2 Interceptor");
-      AccessTokenRepository repository = new AccessTokenRepository(proxyConfig, config, tokenTable);
+      AccessTokenRepository repository = new AccessTokenRepository(proxyConfig, pluginConfig, tokenTable);
 
       builder.authenticator(new AccessTokenAuthenticator(repository));
       builder.addInterceptor(new AccessTokenInterceptor(repository));
@@ -154,7 +213,7 @@ public class SimpleHttp {
     }
 
     // Set timeouts
-    int timeout = Math.max(1, config.timeout());
+    int timeout = Math.max(1, pluginConfig.timeout());
     builder.connectTimeout(timeout, TimeUnit.SECONDS);
     builder.writeTimeout(timeout, TimeUnit.SECONDS);
     builder.readTimeout(timeout, TimeUnit.SECONDS);
@@ -255,7 +314,6 @@ public class SimpleHttp {
       .url(url);
 
     // The configuration does not allow for any other request types other than POST and GET.
-    HttpApiConfig apiConfig = scanDefn.tableSpec().connectionConfig();
     if (apiConfig.getMethodType() == HttpMethod.POST) {
       // Handle POST requests
       FormBody.Builder formBodyBuilder;
@@ -284,7 +342,7 @@ public class SimpleHttp {
 
     // Log the URL and method to aid in debugging user issues.
     logger.info("Connection: {}, Method {}, URL: {}",
-      scanDefn.tableSpec().connection(),
+      connection,
       apiConfig.getMethodType().name(), url());
 
     // Add headers to request
@@ -344,6 +402,30 @@ public class SimpleHttp {
     }
   }
 
+  public String getResultsFromApiCall() {
+    InputStream inputStream = getInputStream();
+    return new BufferedReader(
+      new InputStreamReader(inputStream, StandardCharsets.UTF_8))
+      .lines()
+      .collect(Collectors.joining("\n"));
+  }
+
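+  /**
+   * Builds an {@link HttpProxyConfig} for the given URL from the plugin configuration and the
+   * Drill configuration.  Proxy credentials are only applied when the proxy type is not "direct".
+   * @param config The HTTP storage plugin configuration
+   * @param drillConfig The Drill configuration
+   * @param url The URL being requested
+   * @return The proxy configuration for this request
+   */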
+  public static HttpProxyConfig getProxySettings(HttpStoragePluginConfig config, Config drillConfig, HttpUrl url) {
+    final ProxyBuilder builder = HttpProxyConfig.builder()
+      .fromConfigForURL(drillConfig, url.toString());
+    final String proxyType = config.proxyType();
+    if (proxyType != null && !"direct".equals(proxyType)) {
+      UsernamePasswordCredentials credentials = config.getUsernamePasswordCredentials();
+      builder
+        .type(config.proxyType())
+        .host(config.proxyHost())
+        .port(config.proxyPort())
+        .username(credentials.getUsername())
+        .password(credentials.getPassword());
+    }
+    return builder.build();
+  }
+
   /**
    * This function is a replacement for the isSuccessful() function which comes
    * with okhttp3.  The issue is that in some cases, a user may not want Drill to throw
@@ -354,7 +436,7 @@ public class SimpleHttp {
    * @return True if the response code is 200-299 and possibly 400-499, false if other
    */
   private boolean isSuccessful(int responseCode) {
-    if (scanDefn.tableSpec().connectionConfig().errorOn400()) {
+    if (apiConfig.errorOn400()) {
       return responseCode >= 200 && responseCode <= 299;
     } else {
       return ((responseCode >= 200 && responseCode <= 299) ||
@@ -386,7 +468,7 @@ public class SimpleHttp {
     if (hasEndpointCredentials(apiConfig)) {
       return apiConfig.getUsernamePasswordCredentials();
     } else {
-      return scanDefn.tableSpec().config().getUsernamePasswordCredentials();
+      return pluginConfig.getUsernamePasswordCredentials();
     }
   }
 
@@ -660,6 +742,214 @@ public class SimpleHttp {
     return tempUrl;
   }
 
+
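+  /**
+   * Replaces positional parameters in a URL (placeholders of the form <code>{param}</code>) with
+   * the supplied values, in the order in which the values appear in the list.  For example, given
+   * the URL http://somesite.com/{p1}/{p2} and the values "foo" and "bar", the result is
+   * http://somesite.com/foo/bar.
+   * @param rawUrl The raw URL containing zero or more placeholders
+   * @param params The list of values to substitute, in positional order
+   * @return The URL with placeholders replaced by the supplied values
+   */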
+  public static String mapPositionalParameters(String rawUrl, List<String> params) {
+    HttpUrl url = HttpUrl.parse(rawUrl);
+
+    // Validate URL
+    if (url == null) {
+      throw UserException.functionError()
+        .message("URL provided must be a valid URL. " + rawUrl + " is not valid.")
+        .build(logger);
+    }
+
+    if (!hasURLParameters(url)) {
+      return url.toString();
+    }
+
+    if (params == null) {
+      throw UserException
+        .parseError()
+        .message("API Query with URL Parameters must be populated.")
+        .build(logger);
+    }
+
+    String tempUrl = decodedURL(url);
+    int startIndex;
+    int endIndex;
+    int counter = 0;
+    while (counter < params.size()) {
+      startIndex = tempUrl.indexOf("{");
+      endIndex = tempUrl.indexOf("}");
+
+      if (startIndex == -1 || endIndex == -1) {
+        break;
+      }
+
+      StringBuffer tempUrlBuffer = new StringBuffer(tempUrl);
+      tempUrlBuffer.replace(startIndex, endIndex + 1, params.get(counter));
+      tempUrl = tempUrlBuffer.toString();
+      counter++;
+    }
+    return tempUrl;
+  }
+
+  /**
+   * Validates a URL.
+   * @param url The input URL.  Should be a string.
+   * @return True if the URL is valid, false if not.
+   */
+  public static boolean validateUrl(String url) {
+    return HttpUrl.parse(url) != null;
+  }
+
+  /**
+   * Accepts an array of VarCharHolders and converts it into a List of Strings.
+   * @param inputReaders The array of VarCharHolders
+   * @return A List of Strings containing the values from the VarCharHolders.
+   */
+  public static List<String> buildParameterList(VarCharHolder[] inputReaders) {
+    if (inputReaders == null || inputReaders.length == 0) {
+      return Collections.emptyList();
+    }
+
+    List<String> inputArguments = new ArrayList<>();
+    for (int i = 0; i < inputReaders.length; i++) {
+      inputArguments.add(StringFunctionHelpers.getStringFromVarCharHolder(inputReaders[i]));
+    }
+
+    return inputArguments;
+  }
+
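+  /**
+   * Looks up the {@link HttpStoragePluginConfig} for the named HTTP storage plugin from the
+   * storage plugin registry in the given {@link DrillbitContext}.
+   * @param name The storage plugin name
+   * @param context The current DrillbitContext
+   * @return The plugin's HttpStoragePluginConfig
+   * @throws PluginException If the plugin cannot be accessed
+   */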
+  public static HttpStoragePluginConfig getPluginConfig(String name, DrillbitContext context) throws PluginException {
+    HttpStoragePlugin httpStoragePlugin = getStoragePlugin(context, name);
+    return httpStoragePlugin.getConfig();
+  }
+
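+  /**
+   * Resolves the endpoint ({@link HttpApiConfig}) from a schema path of the form
+   * plugin.endpoint.  The endpoint must exist in the plugin configuration and must return JSON.
+   * @param name The schema path in the form plugin.endpoint
+   * @param pluginConfig The storage plugin configuration containing the endpoint
+   * @return The endpoint configuration
+   */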
+  public static HttpApiConfig getEndpointConfig(String name, HttpStoragePluginConfig pluginConfig) {
+    // Get the plugin name and endpoint name
+    String[] parts = name.split("\\.");
+    if (parts.length < 2) {
+      throw UserException.functionError()
+        .message("You must call this function with a connection name and endpoint.")
+        .build(logger);
+    }
+
+    String endpoint = parts[1];
+    HttpApiConfig endpointConfig = pluginConfig.getConnection(endpoint);
+    if (endpointConfig == null) {
+      throw UserException.functionError()
+        .message("You must call this function with a valid endpoint name.")
+        .build(logger);
+    } else if (!"json".equalsIgnoreCase(endpointConfig.inputType())) {
+      throw UserException.functionError()
+        .message("Http_get only supports API endpoints which return json.")
+        .build(logger);
+    }
+
+    return endpointConfig;
+  }
+
+  private static HttpStoragePlugin getStoragePlugin(DrillbitContext context, String pluginName) {
+    StoragePluginRegistry storage = context.getStorage();
+    try {
+      StoragePlugin pluginInstance = storage.getPlugin(pluginName);
+      if (pluginInstance == null) {
+        throw UserException.functionError()
+          .message(pluginName + " is not a valid plugin.")
+          .build(logger);
+      }
+
+      if (!(pluginInstance instanceof HttpStoragePlugin)) {
+        throw UserException.functionError()
+          .message("You can only include HTTP plugins in this function.")
+          .build(logger);
+      }
+      return (HttpStoragePlugin) pluginInstance;
+    } catch (PluginException e) {
+      throw UserException.functionError()
+        .message("Could not access plugin " + pluginName)
+        .build(logger);
+    }
+  }
+
+
+  /**
+   * This function makes an API call and returns the result as a String. It is used by the http_request() UDF
+   * and retrieves all the configuration parameters contained in the storage plugin and endpoint configuration,
+   * with the exception of pagination, which is not supported.
+   * @param schemaPath The path of storage_plugin.endpoint from which the data will be retrieved
+   * @param context {@link DrillbitContext} The context from the current query
+   * @param args An optional list of parameter arguments which will be included in the URL
+   * @return A String of the results.
+   */
+  public static String makeAPICall(String schemaPath, DrillbitContext context, List<String> args) {
+    HttpStoragePluginConfig pluginConfig;
+    HttpApiConfig endpointConfig;
+
+    // Get the plugin name and endpoint name
+    String[] parts = schemaPath.split("\\.");
+    if (parts.length < 2) {
+      throw UserException.functionError()
+        .message("You must call this function with a connection name and endpoint.")
+        .build(logger);
+    }
+    String pluginName = parts[0];
+
+    HttpStoragePlugin plugin = getStoragePlugin(context, pluginName);
+
+    try {
+      pluginConfig = getPluginConfig(pluginName, context);
+      endpointConfig = getEndpointConfig(schemaPath, pluginConfig);
+    } catch (PluginException e) {
+      throw UserException.functionError()
+        .message("Could not access plugin " + pluginName)
+        .build(logger);
+    }
+
+    // Get proxy settings
+    HttpProxyConfig proxyConfig = SimpleHttp.getProxySettings(pluginConfig, context.getConfig(), endpointConfig.getHttpUrl());
+
+    // For this use case, we will replace the URL parameters here, rather than doing it in the SimpleHttp client
+    // because we are using positional mapping rather than k/v pairs for this.
+    String finalUrl;
+    if (SimpleHttp.hasURLParameters(endpointConfig.getHttpUrl())) {
+      finalUrl = SimpleHttp.mapPositionalParameters(endpointConfig.url(), args);
+    } else {
+      finalUrl = endpointConfig.url();
+    }
+
+    // Now get the client
+    SimpleHttp client = new SimpleHttpBuilder()
+      .pluginConfig(pluginConfig)
+      .endpointConfig(endpointConfig)
+      .tempDir(new File(context.getConfig().getString(ExecConstants.DRILL_TMP_DIR)))
+      .url(HttpUrl.parse(finalUrl))
+      .proxyConfig(proxyConfig)
+      .tokenTable(plugin.getTokenTable())
+      .build();
+
+    return client.getResultsFromApiCall();
+  }
+
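+  /**
+   * Returns a bare-bones {@link OkHttpClient} with default timeouts.  This client is used by
+   * makeSimpleGetRequest() when no storage plugin configuration is available.
+   * @return A simple OkHttpClient
+   */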
+  public static OkHttpClient getSimpleHttpClient() {
+    return new OkHttpClient.Builder()
+      .connectTimeout(DEFAULT_TIMEOUT, TimeUnit.SECONDS)
+      .writeTimeout(DEFAULT_TIMEOUT, TimeUnit.SECONDS)
+      .readTimeout(DEFAULT_TIMEOUT, TimeUnit.SECONDS)
+      .build();
+  }
+
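+  /**
+   * Executes a GET request against the given URL and returns the response body as a String.
+   * This is used by the http_get() UDF, which does not have access to any storage plugin configuration.
+   * @param url The URL to request
+   * @return The response body as a String
+   */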
+  public static String makeSimpleGetRequest(String url) {
+    OkHttpClient client = getSimpleHttpClient();
+    Request.Builder requestBuilder = new Request.Builder()
+      .url(url);
+
+    // Build the request object
+    Request request = requestBuilder.build();
+
+    // Execute the request
+    try {
+      Response response = client.newCall(request).execute();
+      return response.body().string();
+    } catch (IOException e) {
+      throw UserException
+        .dataReadError(e)
+        .message("HTTP request failed")
+        .build(logger);
+    }
+  }
+
   /**
    * Intercepts requests and adds authentication headers to the request
    */
@@ -684,19 +974,25 @@ public class SimpleHttp {
 
   public static class SimpleHttpBuilder {
     private HttpSubScan scanDefn;
-
     private HttpUrl url;
-
     private File tempDir;
-
     private HttpProxyConfig proxyConfig;
-
     private CustomErrorContext errorContext;
-
     private Paginator paginator;
+    private PersistentTokenTable tokenTable;
+    private HttpStoragePluginConfig pluginConfig;
+    private HttpApiConfig endpointConfig;
+    private HttpOAuthConfig oAuthConfig;
+    private Map<String,String> filters;
+    private String connection;
 
     public SimpleHttpBuilder scanDefn(HttpSubScan scanDefn) {
       this.scanDefn = scanDefn;
+      this.pluginConfig = scanDefn.tableSpec().config();
+      this.endpointConfig = scanDefn.tableSpec().connectionConfig();
+      this.oAuthConfig = scanDefn.tableSpec().config().oAuthConfig();
+      this.tokenTable = scanDefn.tableSpec().getTokenTable();
+      this.filters = scanDefn.filters();
       return this;
     }
 
@@ -725,8 +1021,39 @@ public class SimpleHttp {
       return this;
     }
 
+    public SimpleHttpBuilder tokenTable(PersistentTokenTable tokenTable) {
+      this.tokenTable = tokenTable;
+      return this;
+    }
+
+    public SimpleHttpBuilder pluginConfig(HttpStoragePluginConfig config) {
+      this.pluginConfig = config;
+      this.oAuthConfig = config.oAuthConfig();
+      return this;
+    }
+
+    public SimpleHttpBuilder endpointConfig(HttpApiConfig endpointConfig) {
+      this.endpointConfig = endpointConfig;
+      return this;
+    }
+
+    public SimpleHttpBuilder connection(String connection) {
+      this.connection = connection;
+      return this;
+    }
+
+    public SimpleHttpBuilder filters(Map<String,String> filters) {
+      this.filters = filters;
+      return this;
+    }
+
+
     public SimpleHttp build() {
-      return new SimpleHttp(scanDefn, url, tempDir, proxyConfig, errorContext, paginator);
+      if (this.scanDefn != null) {
+        return new SimpleHttp(scanDefn, url, tempDir, proxyConfig, errorContext, paginator);
+      } else {
+        return new SimpleHttp(url, tempDir, proxyConfig, errorContext, paginator, tokenTable, pluginConfig, endpointConfig, connection, filters);
+      }
     }
   }
 }
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java
new file mode 100644
index 0000000..0439979
--- /dev/null
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.http;
+
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import okhttp3.mockwebserver.RecordedRequest;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.common.util.DrillFileUtils;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.store.http.util.SimpleHttp;
+import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.apache.drill.shaded.guava.com.google.common.io.Files;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestHttpUDFFunctions extends ClusterTest {
+
+  private static final int MOCK_SERVER_PORT = 47770;
+  private static String TEST_JSON_RESPONSE;
+  private static String DUMMY_URL = "http://localhost:" + MOCK_SERVER_PORT + "/";
+
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    startCluster(ClusterFixture.builder(dirTestWatcher));
+    TEST_JSON_RESPONSE = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/simple.json"), Charsets.UTF_8).read();
+
+    HttpApiConfig mockGithubWithDuplicateParam = HttpApiConfig.builder()
+      .url("http://localhost:47770/orgs/{org}/repos")
+      .method("GET")
+      .params(Arrays.asList("org", "lng", "date"))
+      .dataPath("results")
+      .requireTail(false)
+      .build();
+
+    Map<String, HttpApiConfig> configs = new HashMap<>();
+    configs.put("github", mockGithubWithDuplicateParam);
+
+    HttpStoragePluginConfig mockStorageConfigWithWorkspace =
+      new HttpStoragePluginConfig(false, configs, 2, "globaluser", "globalpass", "",
+        80, "", "", "", null, new PlainCredentialsProvider(ImmutableMap.of(
+        UsernamePasswordCredentials.USERNAME, "globaluser",
+        UsernamePasswordCredentials.PASSWORD, "globalpass")));
+    mockStorageConfigWithWorkspace.setEnabled(true);
+    cluster.defineStoragePlugin("local", mockStorageConfigWithWorkspace);
+  }
+
+  @Test
+  public void testHttpGetWithNoParams() throws Exception {
+    try (MockWebServer server = startServer()) {
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_RESPONSE));
+      String sql = "SELECT http_get('" + DUMMY_URL + "') AS result FROM (values(1))";
+
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+      assertEquals(1, results.rowCount());
+      results.clear();
+
+      RecordedRequest recordedRequest = server.takeRequest();
+      assertEquals("GET", recordedRequest.getMethod());
+      assertEquals("http://localhost:47770/", recordedRequest.getRequestUrl().toString());
+    }
+  }
+
+  @Test
+  public void testHttpGetWithParams() throws Exception {
+    try (MockWebServer server = startServer()) {
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_RESPONSE));
+      String sql = "SELECT http_get('" + DUMMY_URL + "{p1}/{p2}', 'param1', 'param2') AS result FROM (values(1))";
+
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+      assertEquals(1, results.rowCount());
+      results.clear();
+
+      RecordedRequest recordedRequest = server.takeRequest();
+      assertEquals("GET", recordedRequest.getMethod());
+      assertEquals("http://localhost:47770/param1/param2", recordedRequest.getRequestUrl().toString());
+    }
+  }
+
+  @Test
+  public void testHttpGetFromPlugin() throws Exception {
+    try (MockWebServer server = startServer()) {
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_RESPONSE));
+      String sql = "SELECT http_request('local.github', 'apache') AS result FROM (values(1))";
+
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+      assertEquals(1, results.rowCount());
+      results.clear();
+
+      RecordedRequest recordedRequest = server.takeRequest();
+      assertEquals("GET", recordedRequest.getMethod());
+      assertEquals("http://localhost:47770/orgs/apache/repos", recordedRequest.getRequestUrl().toString());
+    }
+  }
+
+  @Test
+  public void testHttpGetWithInvalidPlugin() {
+    try {
+      String sql = "SELECT http_request('nope.nothere', 'apache') AS result FROM (values(1))";
+      client.queryBuilder().sql(sql).run();
+      fail();
+    } catch (Exception e) {
+      assertTrue(e.getMessage().contains("FUNCTION ERROR: nope is not a valid plugin."));
+    }
+  }
+
+  @Test
+  public void testPositionalReplacement() {
+    String url = "http://somesite.com/{p1}/{p2}/path/{}";
+    List<String> params = new ArrayList<>();
+    params.add("foo");
+    params.add("bar");
+    params.add("baz");
+    assertEquals("http://somesite.com/foo/bar/path/baz", SimpleHttp.mapPositionalParameters(url, params));
+  }
+
+  /**
+   * Helper function to start the MockHTTPServer
+   * @return Started Mock server
+   * @throws IOException If the server cannot start, throws IOException
+   */
+  public static MockWebServer startServer() throws IOException {
+    MockWebServer server = new MockWebServer();
+    server.start(MOCK_SERVER_PORT);
+    return server;
+  }
+}
diff --git a/contrib/storage-http/src/test/resources/data/simple.json b/contrib/storage-http/src/test/resources/data/simple.json
new file mode 100644
index 0000000..bde2d5b
--- /dev/null
+++ b/contrib/storage-http/src/test/resources/data/simple.json
@@ -0,0 +1,14 @@
+{"results":
+  {"sunrise":"6:13:58 AM",
+    "sunset":"5:59:55 PM",
+    "solar_noon":"12:06:56 PM",
+    "day_length":"11:45:57",
+    "civil_twilight_begin":"5:48:14 AM",
+    "civil_twilight_end":"6:25:38 PM",
+    "nautical_twilight_begin":"5:18:16 AM",
+    "nautical_twilight_end":"6:55:36 PM",
+    "astronomical_twilight_begin":"4:48:07 AM",
+    "astronomical_twilight_end":"7:25:45 PM"
+  },
+  "status":"OK"
+}
diff --git a/contrib/storage-http/src/test/resources/data/weather.json b/contrib/storage-http/src/test/resources/data/weather.json
new file mode 100644
index 0000000..75df005
--- /dev/null
+++ b/contrib/storage-http/src/test/resources/data/weather.json
@@ -0,0 +1,94 @@
+{
+  "@context": [
+    "https://geojson.org/geojson-ld/geojson-context.jsonld",
+    {
+      "@version": "1.1",
+      "wx": "https://api.weather.gov/ontology#",
+      "s": "https://schema.org/",
+      "geo": "http://www.opengis.net/ont/geosparql#",
+      "unit": "http://codes.wmo.int/common/unit/",
+      "@vocab": "https://api.weather.gov/ontology#",
+      "geometry": {
+        "@id": "s:GeoCoordinates",
+        "@type": "geo:wktLiteral"
+      },
+      "city": "s:addressLocality",
+      "state": "s:addressRegion",
+      "distance": {
+        "@id": "s:Distance",
+        "@type": "s:QuantitativeValue"
+      },
+      "bearing": {
+        "@type": "s:QuantitativeValue"
+      },
+      "value": {
+        "@id": "s:value"
+      },
+      "unitCode": {
+        "@id": "s:unitCode",
+        "@type": "@id"
+      },
+      "forecastOffice": {
+        "@type": "@id"
+      },
+      "forecastGridData": {
+        "@type": "@id"
+      },
+      "publicZone": {
+        "@type": "@id"
+      },
+      "county": {
+        "@type": "@id"
+      }
+    }
+  ],
+  "id": "https://api.weather.gov/points/39.7456,-97.0892",
+  "type": "Feature",
+  "geometry": {
+    "type": "Point",
+    "coordinates": [
+      -97.089200000000005,
+      39.745600000000003
+    ]
+  },
+  "properties": {
+    "@id": "https://api.weather.gov/points/39.7456,-97.0892",
+    "@type": "wx:Point",
+    "cwa": "TOP",
+    "forecastOffice": "https://api.weather.gov/offices/TOP",
+    "gridId": "TOP",
+    "gridX": 31,
+    "gridY": 80,
+    "forecast": "https://api.weather.gov/gridpoints/TOP/31,80/forecast",
+    "forecastHourly": "https://api.weather.gov/gridpoints/TOP/31,80/forecast/hourly",
+    "forecastGridData": "https://api.weather.gov/gridpoints/TOP/31,80",
+    "observationStations": "https://api.weather.gov/gridpoints/TOP/31,80/stations",
+    "relativeLocation": {
+      "type": "Feature",
+      "geometry": {
+        "type": "Point",
+        "coordinates": [
+          -97.086661000000007,
+          39.679375999999998
+        ]
+      },
+      "properties": {
+        "city": "Linn",
+        "state": "KS",
+        "distance": {
+          "unitCode": "wmoUnit:m",
+          "value": 7366.9851976443997
+        },
+        "bearing": {
+          "unitCode": "wmoUnit:degree_(angle)",
+          "value": 358
+        }
+      }
+    },
+    "forecastZone": "https://api.weather.gov/zones/forecast/KSZ009",
+    "county": "https://api.weather.gov/zones/county/KSC201",
+    "fireWeatherZone": "https://api.weather.gov/zones/fire/KSZ009",
+    "timeZone": "America/Chicago",
+    "radarStation": "KTWX"
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
index 7ce944d..0da319f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
@@ -342,6 +342,11 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor
   }
 
   @Override
+  public DrillbitContext getDrillbitContext() {
+    return context;
+  }
+
+  @Override
   public DrillbitEndpoint getForemanEndpoint() {
     return fragment.getForeman();
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index 82e9994..76b44ce 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -305,6 +305,11 @@ public class QueryContext implements AutoCloseable, OptimizerRulesContext, Schem
   }
 
   @Override
+  public DrillbitContext getDrillbitContext() {
+    return drillbitContext;
+  }
+
+  @Override
   public PartitionExplorer getPartitionExplorer() {
     return new PartitionExplorerImpl(getRootSchema());
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/UdfUtilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/UdfUtilities.java
index 6617b25..cee74ff 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/UdfUtilities.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/UdfUtilities.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.ops;
 import io.netty.buffer.DrillBuf;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.expr.holders.ValueHolder;
+import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.PartitionExplorer;
 import org.apache.drill.shaded.guava.com.google.common.base.Function;
@@ -42,6 +43,7 @@ public interface UdfUtilities {
           .put(ContextInformation.class, "getContextInformation")
           .put(OptionManager.class, "getOptions")
           .put(BufferManager.class, "getManagedBufferManager")
+          .put(DrillbitContext.class, "getDrillbitContext")
           .build();
 
 
@@ -90,6 +92,13 @@ public interface UdfUtilities {
   PartitionExplorer getPartitionExplorer();
 
   /**
+   * The Drillbit context allows UDFs to view storage plugin configurations and other
+   * information about the running system.  See the HTTP UDF implementations for details.
+   * @return An object for accessing Drillbit information such as storage configs.
+   */
+  DrillbitContext getDrillbitContext();
+
+  /**
    * Works with value holders cache which holds constant value and its wrapper by type.
    * If value is absent uses holderInitializer to create holder and adds it to cache.
    *
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
index 30c60c2..d73d252 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
@@ -18,6 +18,7 @@
 package org.apache.drill.test;
 
 import org.apache.drill.exec.alias.AliasRegistryProvider;
+import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.metastore.MetastoreRegistry;
 import org.apache.drill.shaded.guava.com.google.common.base.Function;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
@@ -239,6 +240,11 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable {
     }
 
     @Override
+    public DrillbitContext getDrillbitContext() {
+      return null;
+    }
+
+    @Override
     public ExecutorState getExecutorState() {
       return executorState;
     }