You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by cg...@apache.org on 2020/06/03 20:28:40 UTC

[drill] branch master updated: DRILL-7746: Add REST API Ability to Accept CSV Input

This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 7eafac6  DRILL-7746: Add REST API Ability to Accept CSV Input
7eafac6 is described below

commit 7eafac6629cd50b8e329d74b69fd78379c7d6666
Author: Charles Givre <cg...@apache.org>
AuthorDate: Tue Jun 2 15:23:21 2020 -0400

    DRILL-7746: Add REST API Ability to Accept CSV Input
---
 contrib/storage-http/README.md                     |   2 +-
 contrib/storage-http/pom.xml                       |   1 -
 .../drill/exec/store/http/HttpApiConfig.java       |  17 +-
 .../drill/exec/store/http/HttpBatchReader.java     |  24 ++-
 .../drill/exec/store/http/HttpCSVBatchReader.java  | 186 +++++++++++++++++++++
 .../exec/store/http/HttpScanBatchCreator.java      |  19 ++-
 .../exec/store/http/HttpStoragePluginConfig.java   |   9 -
 .../drill/exec/store/http/TestHttpPlugin.java      |  77 ++++++---
 .../src/test/resources/data/response.csv           |   3 +
 9 files changed, 282 insertions(+), 56 deletions(-)

diff --git a/contrib/storage-http/README.md b/contrib/storage-http/README.md
index 0daa74a..19da0d1 100644
--- a/contrib/storage-http/README.md
+++ b/contrib/storage-http/README.md
@@ -500,7 +500,7 @@ ORDER BY issue_count DESC
    supported). Join pushdown has the potential to improve performance if you use the HTTP service
    joined to another table.
 
-4. This plugin only reads JSON responses.
+4. This plugin only reads JSON and CSV responses.
 
 5. `POST` bodies can only be in the format of key/value pairs. Some APIs accept
     JSON based `POST` bodies but this is not currently supported.
diff --git a/contrib/storage-http/pom.xml b/contrib/storage-http/pom.xml
index 0c2c875..2e6fb3d 100644
--- a/contrib/storage-http/pom.xml
+++ b/contrib/storage-http/pom.xml
@@ -46,7 +46,6 @@
       <artifactId>okhttp</artifactId>
       <version>${okhttp.version}</version>
     </dependency>
-
     <!-- Test dependencies -->
     <dependency>
       <groupId>org.apache.drill.exec</groupId>
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpApiConfig.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpApiConfig.java
index e93ff92..d32821a 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpApiConfig.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpApiConfig.java
@@ -67,6 +67,8 @@ public class HttpApiConfig {
   private final String authType;
   private final String userName;
   private final String password;
+  private final String inputType;
+
 
   public enum HttpMethod {
     /**
@@ -88,7 +90,8 @@ public class HttpApiConfig {
                        @JsonProperty("postBody") String postBody,
                        @JsonProperty("params") List<String> params,
                        @JsonProperty("dataPath") String dataPath,
-                       @JsonProperty("requireTail") Boolean requireTail) {
+                       @JsonProperty("requireTail") Boolean requireTail,
+                       @JsonProperty("inputType") String inputType) {
 
     this.headers = headers;
     this.method = Strings.isNullOrEmpty(method)
@@ -125,6 +128,9 @@ public class HttpApiConfig {
 
     // Default to true for backward compatibility with first PR.
     this.requireTail = requireTail == null ? true : requireTail;
+
+    this.inputType = inputType == null
+      ? "json" : inputType.trim().toLowerCase();
   }
 
   @JsonProperty("url")
@@ -162,10 +168,13 @@ public class HttpApiConfig {
     return HttpMethod.valueOf(this.method());
   }
 
+  @JsonProperty("inputType")
+  public String inputType() { return inputType; }
+
   @Override
   public int hashCode() {
     return Objects.hash(url, method, requireTail, params, headers,
-        authType, userName, password, postBody);
+        authType, userName, password, postBody, inputType);
   }
 
   @Override
@@ -181,6 +190,7 @@ public class HttpApiConfig {
       .maskedField("password", password)
       .field("postBody", postBody)
       .field("filterFields", params)
+      .field("inputType", inputType)
       .toString();
   }
 
@@ -202,6 +212,7 @@ public class HttpApiConfig {
       && Objects.equals(postBody, other.postBody)
       && Objects.equals(params, other.params)
       && Objects.equals(dataPath, other.dataPath)
-      && Objects.equals(requireTail, other.requireTail);
+      && Objects.equals(requireTail, other.requireTail)
+      && Objects.equals(inputType, other.inputType);
   }
 }
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
index 7f0efd9..1b322dd 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
@@ -17,11 +17,9 @@
  */
 package org.apache.drill.exec.store.http;
 
-import java.io.File;
-import java.io.InputStream;
-import java.util.List;
-import java.util.Map;
-
+import com.typesafe.config.Config;
+import okhttp3.HttpUrl;
+import okhttp3.HttpUrl.Builder;
 import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.exceptions.ChildErrorContext;
 import org.apache.drill.common.exceptions.CustomErrorContext;
@@ -35,10 +33,10 @@ import org.apache.drill.exec.store.http.util.HttpProxyConfig;
 import org.apache.drill.exec.store.http.util.HttpProxyConfig.ProxyBuilder;
 import org.apache.drill.exec.store.http.util.SimpleHttp;
 
-import com.typesafe.config.Config;
-
-import okhttp3.HttpUrl;
-import okhttp3.HttpUrl.Builder;
+import java.io.File;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Map;
 
 public class HttpBatchReader implements ManagedReader<SchemaNegotiator> {
   private final HttpSubScan subScan;
@@ -95,7 +93,7 @@ public class HttpBatchReader implements ManagedReader<SchemaNegotiator> {
     return true; // Please read the first batch
   }
 
-  private HttpUrl buildUrl() {
+  protected HttpUrl buildUrl() {
     HttpApiConfig apiConfig = subScan.tableSpec().connectionConfig();
     String baseUrl = apiConfig.url();
 
@@ -115,7 +113,7 @@ public class HttpBatchReader implements ManagedReader<SchemaNegotiator> {
    * Convert equality filter conditions into HTTP query parameters
    * Parameters must appear in the order defined in the config.
    */
-  private void addFilters(Builder urlBuilder, List<String> params,
+  protected void addFilters(Builder urlBuilder, List<String> params,
       Map<String, String> filters) {
     for (String param : params) {
       String value = filters.get(param);
@@ -125,7 +123,7 @@ public class HttpBatchReader implements ManagedReader<SchemaNegotiator> {
     }
   }
 
-  private HttpProxyConfig proxySettings(Config drillConfig, HttpUrl url) {
+  protected HttpProxyConfig proxySettings(Config drillConfig, HttpUrl url) {
     final HttpStoragePluginConfig config = subScan.tableSpec().config();
     final ProxyBuilder builder = HttpProxyConfig.builder()
         .fromConfigForURL(drillConfig, url.toString());
@@ -141,7 +139,7 @@ public class HttpBatchReader implements ManagedReader<SchemaNegotiator> {
     return builder.build();
   }
 
- @Override
+  @Override
   public boolean next() {
     return jsonLoader.readBatch();
   }
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java
new file mode 100644
index 0000000..f7db13b
--- /dev/null
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.http;
+
+import com.univocity.parsers.csv.CsvParser;
+import com.univocity.parsers.csv.CsvParserSettings;
+import okhttp3.HttpUrl;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.ChildErrorContext;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.http.util.SimpleHttp;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Strings;
+
+import java.io.File;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Batch reader for HTTP endpoints that return CSV. The first parsed row is
+ * used as the header and every column is declared as a nullable VARCHAR.
+ */
+public class HttpCSVBatchReader extends HttpBatchReader {
+  private final HttpSubScan subScan;
+  private final CsvParserSettings csvSettings;
+  private CsvParser csvReader;
+  // Never null, so processRow() can iterate it even when no header row was read.
+  private final List<StringColumnWriter> columnWriters = new ArrayList<>();
+  private String[] firstRow;
+  private SchemaBuilder builder;
+  private RowSetLoader rowWriter;
+  private InputStream inStream;
+
+  public HttpCSVBatchReader(HttpSubScan subScan) {
+    super(subScan);
+    this.subScan = subScan;
+    this.csvSettings = new CsvParserSettings();
+    csvSettings.setLineSeparatorDetectionEnabled(true);
+  }
+
+  /** Requests the URL, starts CSV parsing on the response stream and builds the schema from the first row. */
+  @Override
+  public boolean open(SchemaNegotiator negotiator) {
+    String tempDirPath = negotiator.drillConfig().getString(ExecConstants.DRILL_TMP_DIR);
+    HttpUrl url = buildUrl();
+
+    CustomErrorContext errorContext = new ChildErrorContext(negotiator.parentErrorContext()) {
+      @Override
+      public void addContext(UserException.Builder builder) {
+        super.addContext(builder);
+        builder.addContext("URL", url.toString());
+      }
+    };
+    negotiator.setErrorContext(errorContext);
+
+    // Http client setup
+    SimpleHttp http = new SimpleHttp(subScan, url, new File(tempDirPath), proxySettings(negotiator.drillConfig(), url), errorContext);
+
+    // CSV loader setup
+    inStream = http.getInputStream();
+    this.csvReader = new CsvParser(csvSettings);
+    csvReader.beginParsing(inStream);
+
+    // Build the Schema
+    builder = new SchemaBuilder();
+    TupleMetadata drillSchema = buildSchema();
+    negotiator.tableSchema(drillSchema, true);
+    ResultSetLoader resultLoader = negotiator.build();
+
+    // Create ScalarWriters
+    rowWriter = resultLoader.writer();
+    populateWriterArray();
+    return true;
+  }
+
+  /** Loads rows until the row writer reports it is full; returns false once the parser yields no row. */
+  @Override
+  public boolean next() {
+    while (!rowWriter.isFull()) {
+      if (!processRow()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public void close() {
+    AutoCloseables.closeSilently(inStream);
+  }
+
+  /** Reads the header row and declares one nullable VARCHAR column per header value. */
+  private TupleMetadata buildSchema() {
+    firstRow = csvReader.parseNext();
+    if (firstRow != null) {
+      for (String value : firstRow) {
+        builder.addNullable(value, TypeProtos.MinorType.VARCHAR);
+      }
+    }
+    return builder.buildSchema();
+  }
+
+  /** Creates one writer per header column, recording each column's position within a row. */
+  private void populateWriterArray() {
+    // Case for empty result set
+    if (firstRow == null) {
+      return;
+    }
+    for (int colPosition = 0; colPosition < firstRow.length; colPosition++) {
+      columnWriters.add(new StringColumnWriter(firstRow[colPosition], rowWriter, colPosition));
+    }
+  }
+
+  /** Parses one row and saves it through the column writers; returns false when no row is left. */
+  private boolean processRow() {
+    String[] nextRow = csvReader.parseNext();
+    if (nextRow == null) {
+      return false;
+    }
+    rowWriter.start();
+    for (StringColumnWriter columnWriter : columnWriters) {
+      columnWriter.load(nextRow);
+    }
+    rowWriter.save();
+    return true;
+  }
+
+  /** Base for per-column writers: ties a column name and row position to a ScalarWriter. */
+  public abstract static class ColumnWriter {
+    final String colName;
+    ScalarWriter columnWriter;
+    int columnIndex;
+
+    public ColumnWriter(String colName, ScalarWriter writer, int columnIndex) {
+      this.colName = colName;
+      this.columnWriter = writer;
+      this.columnIndex = columnIndex;
+    }
+
+    /** Writes this column's value from the parsed row; the base implementation does nothing. */
+    public void load(String[] record) {}
+  }
+
+  /** Writes a column as a string, storing NULL for missing or empty fields. */
+  public static class StringColumnWriter extends ColumnWriter {
+    StringColumnWriter(String colName, RowSetLoader rowWriter, int columnIndex) {
+      super(colName, rowWriter.scalar(colName), columnIndex);
+    }
+
+    @Override
+    public void load(String[] record) {
+      // A row shorter than the header yields NULL here rather than ArrayIndexOutOfBoundsException.
+      String value = columnIndex < record.length ? record[columnIndex] : null;
+      if (Strings.isNullOrEmpty(value)) {
+        columnWriter.setNull();
+      } else {
+        columnWriter.setString(value);
+      }
+    }
+  }
+}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java
index effe0d3..0f93495 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java
@@ -17,8 +17,6 @@
  */
 package org.apache.drill.exec.store.http;
 
-import java.util.List;
-
 import org.apache.drill.common.exceptions.ChildErrorContext;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
@@ -36,6 +34,8 @@ import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 
+import java.util.List;
+
 public class HttpScanBatchCreator implements BatchCreator<HttpSubScan> {
 
   @Override
@@ -92,11 +92,18 @@ public class HttpScanBatchCreator implements BatchCreator<HttpSubScan> {
     @Override
     public ManagedReader<SchemaNegotiator> next() {
 
+      // Get the expected input type
+      String inputType = subScan.tableSpec().connectionConfig().inputType();
+
       // Only a single scan (in a single thread)
       if (count++ == 0) {
-        return new HttpBatchReader(subScan);
-      } else {
-        return null;
+        if (inputType.equalsIgnoreCase("csv")) {
+          return new HttpCSVBatchReader(subScan);
+        } else {
+          return new HttpBatchReader(subScan);
+        }
       }
+      return null;
     }
-  }}
+  }
+}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java
index 620fc9f..35e5654 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java
@@ -37,23 +37,14 @@ import java.util.Objects;
 @JsonTypeName(HttpStoragePluginConfig.NAME)
 public class HttpStoragePluginConfig extends StoragePluginConfigBase {
   private static final Logger logger = LoggerFactory.getLogger(HttpStoragePluginConfig.class);
-
   public static final String NAME = "http";
-
   public final Map<String, HttpApiConfig> connections;
-
   public final boolean cacheResults;
-
   public final String proxyHost;
-
   public final int proxyPort;
-
   public final String proxyType;
-
   public final String proxyUsername;
-
   public final String proxyPassword;
-
   /**
    * Timeout in seconds.
    */
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
index 6434cff..4c20e99 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
@@ -18,19 +18,9 @@
 
 package org.apache.drill.exec.store.http;
 
-import static org.apache.drill.test.rowSet.RowSetUtilities.mapValue;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.nio.file.Paths;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import okhttp3.mockwebserver.RecordedRequest;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.util.DrillFileUtils;
 import org.apache.drill.exec.physical.rowSet.RowSet;
@@ -46,9 +36,18 @@ import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import okhttp3.mockwebserver.MockResponse;
-import okhttp3.mockwebserver.MockWebServer;
-import okhttp3.mockwebserver.RecordedRequest;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.drill.test.rowSet.RowSetUtilities.mapValue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Tests the HTTP Storage plugin. Since the plugin makes use of REST requests,
@@ -64,12 +63,14 @@ public class TestHttpPlugin extends ClusterTest {
 
   private static final int MOCK_SERVER_PORT = 8091;
   private static String TEST_JSON_RESPONSE;
+  private static String TEST_CSV_RESPONSE;
 
   @BeforeClass
   public static void setup() throws Exception {
     startCluster(ClusterFixture.builder(dirTestWatcher));
 
     TEST_JSON_RESPONSE = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/response.json"), Charsets.UTF_8).read();
+    TEST_CSV_RESPONSE = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/response.csv"), Charsets.UTF_8).read();
 
     dirTestWatcher.copyResourceToRoot(Paths.get("data/"));
     makeLiveConfig();
@@ -83,12 +84,12 @@ public class TestHttpPlugin extends ClusterTest {
    */
   private static void makeLiveConfig() {
 
-    HttpApiConfig sunriseConfig = new HttpApiConfig("https://api.sunrise-sunset.org/json", "GET", null, null, null, null, null, null, null, null);
+    HttpApiConfig sunriseConfig = new HttpApiConfig("https://api.sunrise-sunset.org/json", "GET", null, null, null, null, null, null, null, null, null);
     HttpApiConfig sunriseWithParamsConfig = new HttpApiConfig("https://api.sunrise-sunset.org/json", "GET", null, null, null, null, null,
-        Arrays.asList("lat", "lng", "date"), "results", false);
+        Arrays.asList("lat", "lng", "date"), "results", false, null);
 
     HttpApiConfig stockConfig = new HttpApiConfig("https://api.worldtradingdata.com/api/v1/stock?symbol=SNAP,TWTR,VOD" +
-      ".L&api_token=zuHlu2vZaehdZN6GmJdTiVlp7xgZn6gl6sfgmI4G6TY4ej0NLOzvy0TUl4D4", "get", null, null, null, null, null, null, null, null);
+      ".L&api_token=zuHlu2vZaehdZN6GmJdTiVlp7xgZn6gl6sfgmI4G6TY4ej0NLOzvy0TUl4D4", "get", null, null, null, null, null, null, null, null, null);
 
     Map<String, HttpApiConfig> configs = new HashMap<>();
     configs.put("stock", stockConfig);
@@ -111,11 +112,11 @@ public class TestHttpPlugin extends ClusterTest {
     headers.put("header1", "value1");
     headers.put("header2", "value2");
 
-    // Use the mock server with HTTP parameters passed as  table name.
+    // Use the mock server with HTTP parameters passed as table name.
     // The connection acts like a schema.
     // Ignores the message body except for data.
     HttpApiConfig mockSchema = new HttpApiConfig("http://localhost:8091/json", "GET", headers,
-        "basic", "user", "pass", null, null, "results", null);
+        "basic", "user", "pass", null, null, "results", null, null);
 
     // Use the mock server with the HTTP parameters passed as WHERE
     // clause filters. The connection acts like a table.
@@ -123,14 +124,18 @@ public class TestHttpPlugin extends ClusterTest {
     // This is the preferred approach, the base URL contains as much info as possible;
     // all other parameters are specified in SQL. See README for an example.
     HttpApiConfig mockTable = new HttpApiConfig("http://localhost:8091/json", "GET", headers,
-        "basic", "user", "pass", null, Arrays.asList("lat", "lng", "date"), "results", false);
+        "basic", "user", "pass", null, Arrays.asList("lat", "lng", "date"), "results", false, null);
 
-    HttpApiConfig mockPostConfig = new HttpApiConfig("http://localhost:8091/", "POST", headers, null, null, null, "key1=value1\nkey2=value2", null, null, null);
+    HttpApiConfig mockPostConfig = new HttpApiConfig("http://localhost:8091/", "POST", headers, null, null, null, "key1=value1\nkey2=value2", null, null, null, null);
+
+    HttpApiConfig mockCsvConfig = new HttpApiConfig("http://localhost:8091/csv", "GET", headers,
+      "basic", "user", "pass", null, null, "results", null, "csv");
 
     Map<String, HttpApiConfig> configs = new HashMap<>();
     configs.put("sunrise", mockSchema);
     configs.put("mocktable", mockTable);
     configs.put("mockpost", mockPostConfig);
+    configs.put("mockcsv", mockCsvConfig);
 
     HttpStoragePluginConfig mockStorageConfigWithWorkspace = new HttpStoragePluginConfig(false, configs, 2, "", 80, "", "", "");
     mockStorageConfigWithWorkspace.setEnabled(true);
@@ -155,6 +160,7 @@ public class TestHttpPlugin extends ClusterTest {
         .addRow("live.stock", "http")
         .addRow("live.sunrise", "http")
         .addRow("local", "http")
+        .addRow("local.mockcsv", "http")
         .addRow("local.mockpost", "http")
         .addRow("local.sunrise", "http")
         .build();
@@ -321,6 +327,31 @@ public class TestHttpPlugin extends ClusterTest {
     doSimpleTestWithMockServer(sql);
   }
 
+  /** Serves the canned CSV body from the mock server and checks the rows and OPTIONAL VARCHAR schema returned. */
+  @Test
+  public void testCsvResponse() throws Exception {
+    try (MockWebServer mockServer = startServer()) {
+      MockResponse csvReply = new MockResponse().setResponseCode(200).setBody(TEST_CSV_RESPONSE);
+      mockServer.enqueue(csvReply);
+
+      // Every CSV column is expected to come back as an OPTIONAL VARCHAR.
+      SchemaBuilder schemaBuilder = new SchemaBuilder();
+      for (String columnName : Arrays.asList("col1", "col2", "col3")) {
+        schemaBuilder.add(columnName, TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL);
+      }
+      TupleMetadata csvSchema = schemaBuilder.build();
+
+      RowSet actualRows = client.queryBuilder().sql("SELECT * FROM local.mockcsv.`csv?arg1=4`").rowSet();
+      RowSet expectedRows = new RowSetBuilder(client.allocator(), csvSchema)
+        .addRow("1", "2", "3")
+        .addRow("4", "5", "6")
+        .build();
+
+      RowSetUtilities.verify(expectedRows, actualRows);
+    }
+  }
+
+
   private void doSimpleTestWithMockServer(String sql) throws Exception {
     try (MockWebServer server = startServer()) {
 
diff --git a/contrib/storage-http/src/test/resources/data/response.csv b/contrib/storage-http/src/test/resources/data/response.csv
new file mode 100644
index 0000000..5527e73
--- /dev/null
+++ b/contrib/storage-http/src/test/resources/data/response.csv
@@ -0,0 +1,3 @@
+"col1","col2","col3"
+"1","2","3"
+"4","5","6"
\ No newline at end of file