You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by cg...@apache.org on 2020/06/03 20:28:40 UTC
[drill] branch master updated: DRILL-7746: Add REST API Ability to
Accept CSV Input
This is an automated email from the ASF dual-hosted git repository.
cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 7eafac6 DRILL-7746: Add REST API Ability to Accept CSV Input
7eafac6 is described below
commit 7eafac6629cd50b8e329d74b69fd78379c7d6666
Author: Charles Givre <cg...@apache.org>
AuthorDate: Tue Jun 2 15:23:21 2020 -0400
DRILL-7746: Add REST API Ability to Accept CSV Input
---
contrib/storage-http/README.md | 2 +-
contrib/storage-http/pom.xml | 1 -
.../drill/exec/store/http/HttpApiConfig.java | 17 +-
.../drill/exec/store/http/HttpBatchReader.java | 24 ++-
.../drill/exec/store/http/HttpCSVBatchReader.java | 186 +++++++++++++++++++++
.../exec/store/http/HttpScanBatchCreator.java | 19 ++-
.../exec/store/http/HttpStoragePluginConfig.java | 9 -
.../drill/exec/store/http/TestHttpPlugin.java | 77 ++++++---
.../src/test/resources/data/response.csv | 3 +
9 files changed, 282 insertions(+), 56 deletions(-)
diff --git a/contrib/storage-http/README.md b/contrib/storage-http/README.md
index 0daa74a..19da0d1 100644
--- a/contrib/storage-http/README.md
+++ b/contrib/storage-http/README.md
@@ -500,7 +500,7 @@ ORDER BY issue_count DESC
supported). Join pushdown has the potential to improve performance if you use the HTTP service
joined to another table.
-4. This plugin only reads JSON responses.
+4. This plugin only reads JSON and CSV responses.
5. `POST` bodies can only be in the format of key/value pairs. Some APIs accept
JSON based `POST` bodies but this is not currently supported.
diff --git a/contrib/storage-http/pom.xml b/contrib/storage-http/pom.xml
index 0c2c875..2e6fb3d 100644
--- a/contrib/storage-http/pom.xml
+++ b/contrib/storage-http/pom.xml
@@ -46,7 +46,6 @@
<artifactId>okhttp</artifactId>
<version>${okhttp.version}</version>
</dependency>
-
<!-- Test dependencies -->
<dependency>
<groupId>org.apache.drill.exec</groupId>
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpApiConfig.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpApiConfig.java
index e93ff92..d32821a 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpApiConfig.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpApiConfig.java
@@ -67,6 +67,8 @@ public class HttpApiConfig {
private final String authType;
private final String userName;
private final String password;
+ private final String inputType;
+
public enum HttpMethod {
/**
@@ -88,7 +90,8 @@ public class HttpApiConfig {
@JsonProperty("postBody") String postBody,
@JsonProperty("params") List<String> params,
@JsonProperty("dataPath") String dataPath,
- @JsonProperty("requireTail") Boolean requireTail) {
+ @JsonProperty("requireTail") Boolean requireTail,
+ @JsonProperty("inputType") String inputType) {
this.headers = headers;
this.method = Strings.isNullOrEmpty(method)
@@ -125,6 +128,9 @@ public class HttpApiConfig {
// Default to true for backward compatibility with first PR.
this.requireTail = requireTail == null ? true : requireTail;
+
+ this.inputType = inputType == null
+ ? "json" : inputType.trim().toLowerCase();
}
@JsonProperty("url")
@@ -162,10 +168,13 @@ public class HttpApiConfig {
return HttpMethod.valueOf(this.method());
}
+ @JsonProperty("inputType")
+ public String inputType() { return inputType; }
+
@Override
public int hashCode() {
return Objects.hash(url, method, requireTail, params, headers,
- authType, userName, password, postBody);
+ authType, userName, password, postBody, inputType);
}
@Override
@@ -181,6 +190,7 @@ public class HttpApiConfig {
.maskedField("password", password)
.field("postBody", postBody)
.field("filterFields", params)
+ .field("inputType", inputType)
.toString();
}
@@ -202,6 +212,7 @@ public class HttpApiConfig {
&& Objects.equals(postBody, other.postBody)
&& Objects.equals(params, other.params)
&& Objects.equals(dataPath, other.dataPath)
- && Objects.equals(requireTail, other.requireTail);
+ && Objects.equals(requireTail, other.requireTail)
+ && Objects.equals(inputType, other.inputType);
}
}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
index 7f0efd9..1b322dd 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
@@ -17,11 +17,9 @@
*/
package org.apache.drill.exec.store.http;
-import java.io.File;
-import java.io.InputStream;
-import java.util.List;
-import java.util.Map;
-
+import com.typesafe.config.Config;
+import okhttp3.HttpUrl;
+import okhttp3.HttpUrl.Builder;
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.exceptions.ChildErrorContext;
import org.apache.drill.common.exceptions.CustomErrorContext;
@@ -35,10 +33,10 @@ import org.apache.drill.exec.store.http.util.HttpProxyConfig;
import org.apache.drill.exec.store.http.util.HttpProxyConfig.ProxyBuilder;
import org.apache.drill.exec.store.http.util.SimpleHttp;
-import com.typesafe.config.Config;
-
-import okhttp3.HttpUrl;
-import okhttp3.HttpUrl.Builder;
+import java.io.File;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Map;
public class HttpBatchReader implements ManagedReader<SchemaNegotiator> {
private final HttpSubScan subScan;
@@ -95,7 +93,7 @@ public class HttpBatchReader implements ManagedReader<SchemaNegotiator> {
return true; // Please read the first batch
}
- private HttpUrl buildUrl() {
+ protected HttpUrl buildUrl() {
HttpApiConfig apiConfig = subScan.tableSpec().connectionConfig();
String baseUrl = apiConfig.url();
@@ -115,7 +113,7 @@ public class HttpBatchReader implements ManagedReader<SchemaNegotiator> {
* Convert equality filter conditions into HTTP query parameters
* Parameters must appear in the order defined in the config.
*/
- private void addFilters(Builder urlBuilder, List<String> params,
+ protected void addFilters(Builder urlBuilder, List<String> params,
Map<String, String> filters) {
for (String param : params) {
String value = filters.get(param);
@@ -125,7 +123,7 @@ public class HttpBatchReader implements ManagedReader<SchemaNegotiator> {
}
}
- private HttpProxyConfig proxySettings(Config drillConfig, HttpUrl url) {
+ protected HttpProxyConfig proxySettings(Config drillConfig, HttpUrl url) {
final HttpStoragePluginConfig config = subScan.tableSpec().config();
final ProxyBuilder builder = HttpProxyConfig.builder()
.fromConfigForURL(drillConfig, url.toString());
@@ -141,7 +139,7 @@ public class HttpBatchReader implements ManagedReader<SchemaNegotiator> {
return builder.build();
}
- @Override
+ @Override
public boolean next() {
return jsonLoader.readBatch();
}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java
new file mode 100644
index 0000000..f7db13b
--- /dev/null
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.http;
+
+import com.univocity.parsers.csv.CsvParser;
+import com.univocity.parsers.csv.CsvParserSettings;
+import okhttp3.HttpUrl;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.ChildErrorContext;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.http.util.SimpleHttp;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Strings;
+
+import java.io.File;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Batch reader for HTTP connections whose response body is CSV.
+ * The first row of the response supplies the column names and every
+ * column is exposed as a nullable VARCHAR.
+ */
+public class HttpCSVBatchReader extends HttpBatchReader {
+  private final HttpSubScan subScan;
+  private final CsvParserSettings csvSettings;
+  // Stays empty until open() has read a header row, so processRow() can
+  // always iterate it safely.
+  private final List<StringColumnWriter> columnWriters = new ArrayList<>();
+  private CsvParser csvReader;
+  private String[] firstRow;
+  private SchemaBuilder builder;
+  private RowSetLoader rowWriter;
+  private InputStream inStream;
+
+  /**
+   * Creates a reader for the given sub scan.
+   *
+   * @param subScan the scan definition; it is also handed to the parent reader
+   */
+  public HttpCSVBatchReader(HttpSubScan subScan) {
+    super(subScan);
+    this.subScan = subScan;
+
+    this.csvSettings = new CsvParserSettings();
+    // Let the parser work out the line separator from the input itself.
+    csvSettings.setLineSeparatorDetectionEnabled(true);
+  }
+
+  /**
+   * Issues the HTTP request, starts parsing the response as CSV, derives the
+   * schema from the first row and creates one writer per column.
+   *
+   * @param negotiator the schema negotiator supplied by the scan framework
+   * @return always {@code true}
+   */
+  @Override
+  public boolean open(SchemaNegotiator negotiator) {
+
+    // Result set loader setup
+    String tempDirPath = negotiator.drillConfig().getString(ExecConstants.DRILL_TMP_DIR);
+
+    HttpUrl url = buildUrl();
+
+    // Attach the request URL to any user-facing error raised by this reader.
+    CustomErrorContext errorContext = new ChildErrorContext(negotiator.parentErrorContext()) {
+      @Override
+      public void addContext(UserException.Builder builder) {
+        super.addContext(builder);
+        builder.addContext("URL", url.toString());
+      }
+    };
+    negotiator.setErrorContext(errorContext);
+
+    // Http client setup
+    SimpleHttp http = new SimpleHttp(subScan, url, new File(tempDirPath), proxySettings(negotiator.drillConfig(), url), errorContext);
+
+    // CSV loader setup
+    inStream = http.getInputStream();
+
+    this.csvReader = new CsvParser(csvSettings);
+    csvReader.beginParsing(inStream);
+
+    // Build the Schema
+    builder = new SchemaBuilder();
+    TupleMetadata drillSchema = buildSchema();
+    negotiator.tableSchema(drillSchema, true);
+    ResultSetLoader resultLoader = negotiator.build();
+
+    // Create ScalarWriters
+    rowWriter = resultLoader.writer();
+    populateWriterArray();
+
+    return true;
+  }
+
+  /**
+   * Loads rows into the current batch until the batch is full or the
+   * parser has no more rows.
+   *
+   * @return {@code false} once the input is exhausted, {@code true} when the
+   *         loop stopped because the row writer reported it is full
+   */
+  @Override
+  public boolean next() {
+    while (!rowWriter.isFull()) {
+      if (!processRow()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /** Closes the HTTP response stream, ignoring any failure while closing. */
+  @Override
+  public void close() {
+    AutoCloseables.closeSilently(inStream);
+  }
+
+  /**
+   * Reads the first CSV row and declares one nullable VARCHAR column per
+   * value in it. An empty response yields an empty schema.
+   */
+  private TupleMetadata buildSchema() {
+    firstRow = csvReader.parseNext();
+    if (firstRow != null) {
+      for (String value : firstRow) {
+        builder.addNullable(value, TypeProtos.MinorType.VARCHAR);
+      }
+    }
+    return builder.buildSchema();
+  }
+
+  /**
+   * Creates one column writer per header value, remembering the position of
+   * that value within a parsed row.
+   */
+  private void populateWriterArray() {
+    // Case for empty result set
+    if (firstRow == null || firstRow.length == 0) {
+      return;
+    }
+
+    for (int colPosition = 0; colPosition < firstRow.length; colPosition++) {
+      columnWriters.add(new StringColumnWriter(firstRow[colPosition], rowWriter, colPosition));
+    }
+  }
+
+  /**
+   * Parses the next CSV row and writes it as one row of the batch.
+   *
+   * @return {@code false} when the parser has no more rows
+   */
+  private boolean processRow() {
+    String[] nextRow = csvReader.parseNext();
+    if (nextRow == null) {
+      return false;
+    }
+    rowWriter.start();
+    for (StringColumnWriter columnWriter : columnWriters) {
+      columnWriter.load(nextRow);
+    }
+    rowWriter.save();
+    return true;
+  }
+
+  /**
+   * Base class for writers that copy one field of a parsed CSV row into a
+   * column of the result set.
+   */
+  public abstract static class ColumnWriter {
+
+    final String colName;
+    ScalarWriter columnWriter;
+    int columnIndex;
+
+    /**
+     * @param colName name of the column being written
+     * @param writer writer for that column
+     * @param columnIndex position of the column's value within a parsed row
+     */
+    public ColumnWriter(String colName, ScalarWriter writer, int columnIndex) {
+      this.colName = colName;
+      this.columnWriter = writer;
+      this.columnIndex = columnIndex;
+    }
+
+    /**
+     * Writes this column's value from the given row. The base implementation
+     * does nothing; subclasses override it.
+     *
+     * @param record the fields of one parsed CSV row
+     */
+    public void load(String[] record) {}
+  }
+
+  /** Writes a CSV field as a VARCHAR value, or as NULL when it is absent. */
+  public static class StringColumnWriter extends ColumnWriter {
+
+    StringColumnWriter(String colName, RowSetLoader rowWriter, int columnIndex) {
+      super(colName, rowWriter.scalar(colName), columnIndex);
+    }
+
+    /**
+     * Writes the field at this writer's index. A null or empty field, or a
+     * row that is too short to contain the field, is written as NULL.
+     *
+     * @param record the fields of one parsed CSV row
+     */
+    @Override
+    public void load(String[] record) {
+      // A row may hold fewer fields than the header; treat the missing
+      // trailing fields as NULL instead of indexing past the end of the row.
+      String value = columnIndex < record.length ? record[columnIndex] : null;
+      if (Strings.isNullOrEmpty(value)) {
+        columnWriter.setNull();
+      } else {
+        columnWriter.setString(value);
+      }
+    }
+  }
+}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java
index effe0d3..0f93495 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java
@@ -17,8 +17,6 @@
*/
package org.apache.drill.exec.store.http;
-import java.util.List;
-
import org.apache.drill.common.exceptions.ChildErrorContext;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
@@ -36,6 +34,8 @@ import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import java.util.List;
+
public class HttpScanBatchCreator implements BatchCreator<HttpSubScan> {
@Override
@@ -92,11 +92,18 @@ public class HttpScanBatchCreator implements BatchCreator<HttpSubScan> {
@Override
public ManagedReader<SchemaNegotiator> next() {
+ // Get the expected input type
+ String inputType = subScan.tableSpec().connectionConfig().inputType();
+
// Only a single scan (in a single thread)
if (count++ == 0) {
- return new HttpBatchReader(subScan);
- } else {
- return null;
+ if (inputType.equalsIgnoreCase("csv")) {
+ return new HttpCSVBatchReader(subScan);
+ } else {
+ return new HttpBatchReader(subScan);
+ }
}
+ return null;
}
- }}
+ }
+}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java
index 620fc9f..35e5654 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java
@@ -37,23 +37,14 @@ import java.util.Objects;
@JsonTypeName(HttpStoragePluginConfig.NAME)
public class HttpStoragePluginConfig extends StoragePluginConfigBase {
private static final Logger logger = LoggerFactory.getLogger(HttpStoragePluginConfig.class);
-
public static final String NAME = "http";
-
public final Map<String, HttpApiConfig> connections;
-
public final boolean cacheResults;
-
public final String proxyHost;
-
public final int proxyPort;
-
public final String proxyType;
-
public final String proxyUsername;
-
public final String proxyPassword;
-
/**
* Timeout in seconds.
*/
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
index 6434cff..4c20e99 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
@@ -18,19 +18,9 @@
package org.apache.drill.exec.store.http;
-import static org.apache.drill.test.rowSet.RowSetUtilities.mapValue;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.nio.file.Paths;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import okhttp3.mockwebserver.RecordedRequest;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.util.DrillFileUtils;
import org.apache.drill.exec.physical.rowSet.RowSet;
@@ -46,9 +36,18 @@ import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
-import okhttp3.mockwebserver.MockResponse;
-import okhttp3.mockwebserver.MockWebServer;
-import okhttp3.mockwebserver.RecordedRequest;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.drill.test.rowSet.RowSetUtilities.mapValue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
/**
* Tests the HTTP Storage plugin. Since the plugin makes use of REST requests,
@@ -64,12 +63,14 @@ public class TestHttpPlugin extends ClusterTest {
private static final int MOCK_SERVER_PORT = 8091;
private static String TEST_JSON_RESPONSE;
+ private static String TEST_CSV_RESPONSE;
@BeforeClass
public static void setup() throws Exception {
startCluster(ClusterFixture.builder(dirTestWatcher));
TEST_JSON_RESPONSE = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/response.json"), Charsets.UTF_8).read();
+ TEST_CSV_RESPONSE = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/response.csv"), Charsets.UTF_8).read();
dirTestWatcher.copyResourceToRoot(Paths.get("data/"));
makeLiveConfig();
@@ -83,12 +84,12 @@ public class TestHttpPlugin extends ClusterTest {
*/
private static void makeLiveConfig() {
- HttpApiConfig sunriseConfig = new HttpApiConfig("https://api.sunrise-sunset.org/json", "GET", null, null, null, null, null, null, null, null);
+ HttpApiConfig sunriseConfig = new HttpApiConfig("https://api.sunrise-sunset.org/json", "GET", null, null, null, null, null, null, null, null, null);
HttpApiConfig sunriseWithParamsConfig = new HttpApiConfig("https://api.sunrise-sunset.org/json", "GET", null, null, null, null, null,
- Arrays.asList("lat", "lng", "date"), "results", false);
+ Arrays.asList("lat", "lng", "date"), "results", false, null);
HttpApiConfig stockConfig = new HttpApiConfig("https://api.worldtradingdata.com/api/v1/stock?symbol=SNAP,TWTR,VOD" +
- ".L&api_token=zuHlu2vZaehdZN6GmJdTiVlp7xgZn6gl6sfgmI4G6TY4ej0NLOzvy0TUl4D4", "get", null, null, null, null, null, null, null, null);
+ ".L&api_token=zuHlu2vZaehdZN6GmJdTiVlp7xgZn6gl6sfgmI4G6TY4ej0NLOzvy0TUl4D4", "get", null, null, null, null, null, null, null, null, null);
Map<String, HttpApiConfig> configs = new HashMap<>();
configs.put("stock", stockConfig);
@@ -111,11 +112,11 @@ public class TestHttpPlugin extends ClusterTest {
headers.put("header1", "value1");
headers.put("header2", "value2");
- // Use the mock server with HTTP parameters passed as table name.
+ // Use the mock server with HTTP parameters passed as table name.
// The connection acts like a schema.
// Ignores the message body except for data.
HttpApiConfig mockSchema = new HttpApiConfig("http://localhost:8091/json", "GET", headers,
- "basic", "user", "pass", null, null, "results", null);
+ "basic", "user", "pass", null, null, "results", null, null);
// Use the mock server with the HTTP parameters passed as WHERE
// clause filters. The connection acts like a table.
@@ -123,14 +124,18 @@ public class TestHttpPlugin extends ClusterTest {
// This is the preferred approach, the base URL contains as much info as possible;
// all other parameters are specified in SQL. See README for an example.
HttpApiConfig mockTable = new HttpApiConfig("http://localhost:8091/json", "GET", headers,
- "basic", "user", "pass", null, Arrays.asList("lat", "lng", "date"), "results", false);
+ "basic", "user", "pass", null, Arrays.asList("lat", "lng", "date"), "results", false, null);
- HttpApiConfig mockPostConfig = new HttpApiConfig("http://localhost:8091/", "POST", headers, null, null, null, "key1=value1\nkey2=value2", null, null, null);
+ HttpApiConfig mockPostConfig = new HttpApiConfig("http://localhost:8091/", "POST", headers, null, null, null, "key1=value1\nkey2=value2", null, null, null, null);
+
+ HttpApiConfig mockCsvConfig = new HttpApiConfig("http://localhost:8091/csv", "GET", headers,
+ "basic", "user", "pass", null, null, "results", null, "csv");
Map<String, HttpApiConfig> configs = new HashMap<>();
configs.put("sunrise", mockSchema);
configs.put("mocktable", mockTable);
configs.put("mockpost", mockPostConfig);
+ configs.put("mockcsv", mockCsvConfig);
HttpStoragePluginConfig mockStorageConfigWithWorkspace = new HttpStoragePluginConfig(false, configs, 2, "", 80, "", "", "");
mockStorageConfigWithWorkspace.setEnabled(true);
@@ -155,6 +160,7 @@ public class TestHttpPlugin extends ClusterTest {
.addRow("live.stock", "http")
.addRow("live.sunrise", "http")
.addRow("local", "http")
+ .addRow("local.mockcsv", "http")
.addRow("local.mockpost", "http")
.addRow("local.sunrise", "http")
.build();
@@ -321,6 +327,31 @@ public class TestHttpPlugin extends ClusterTest {
doSimpleTestWithMockServer(sql);
}
+ /**
+  * Queries the mock CSV connection and checks that the CSV body comes back
+  * as rows: the header line supplies the column names and every value is
+  * returned as a nullable VARCHAR.
+  */
+ @Test
+ public void testCsvResponse() throws Exception {
+   final String query = "SELECT * FROM local.mockcsv.`csv?arg1=4`";
+   try (MockWebServer mockServer = startServer()) {
+
+     // Serve the canned CSV document for the single request the query makes.
+     MockResponse csvReply = new MockResponse()
+       .setResponseCode(200)
+       .setBody(TEST_CSV_RESPONSE);
+     mockServer.enqueue(csvReply);
+
+     RowSet actual = client.queryBuilder().sql(query).rowSet();
+
+     TupleMetadata schema = new SchemaBuilder()
+       .add("col1", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+       .add("col2", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+       .add("col3", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+       .build();
+
+     RowSetBuilder expectedRows = new RowSetBuilder(client.allocator(), schema);
+     expectedRows.addRow("1", "2", "3");
+     expectedRows.addRow("4", "5", "6");
+     RowSet expected = expectedRows.build();
+
+     RowSetUtilities.verify(expected, actual);
+   }
+ }
+
+
private void doSimpleTestWithMockServer(String sql) throws Exception {
try (MockWebServer server = startServer()) {
diff --git a/contrib/storage-http/src/test/resources/data/response.csv b/contrib/storage-http/src/test/resources/data/response.csv
new file mode 100644
index 0000000..5527e73
--- /dev/null
+++ b/contrib/storage-http/src/test/resources/data/response.csv
@@ -0,0 +1,3 @@
+"col1","col2","col3"
+"1","2","3"
+"4","5","6"
\ No newline at end of file