You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by vi...@apache.org on 2022/06/15 08:31:08 UTC
[drill] branch master updated: DRILL-8248: Fix http_request for several rows (#2573)
This is an automated email from the ASF dual-hosted git repository.
vitalii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new c58d8a080c DRILL-8248: Fix http_request for several rows (#2573)
c58d8a080c is described below
commit c58d8a080ccdf2b1361159de8c792906770be5f8
Author: Vitalii Diravka <vi...@apache.org>
AuthorDate: Wed Jun 15 11:31:01 2022 +0300
DRILL-8248: Fix http_request for several rows (#2573)
---
.../exec/store/http/udfs/HttpHelperFunctions.java | 82 ++++++++++++----------
.../drill/exec/store/http/udfs/HttpUdfUtils.java | 56 ++++++++-------
.../exec/store/http/TestHttpUDFFunctions.java | 35 ++++++++-
.../storage-http/src/test/resources/data/p4.json | 2 +
.../store/kafka/decoders/JsonMessageReader.java | 22 +-----
.../physical/impl/project/ProjectRecordBatch.java | 19 ++---
.../resultSet/impl/ResultSetLoaderImpl.java | 3 +-
.../store/easy/json/loader/JsonLoaderImpl.java | 15 ++++
.../easy/json/loader/SingleElementIterator.java | 45 ++++++++++++
9 files changed, 182 insertions(+), 97 deletions(-)
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java
index 5e73b716ca..a95bc9ff60 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java
@@ -52,55 +52,57 @@ public class HttpHelperFunctions {
OptionManager options;
@Inject
- ResultSetLoader loader;
+ ResultSetLoader rsLoader;
@Workspace
- org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder jsonLoaderBuilder;
+ org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl jsonLoader;
+
+ @Workspace
+ org.apache.drill.exec.store.easy.json.loader.SingleElementIterator<java.io.InputStream> stream;
@Override
public void setup() {
- jsonLoaderBuilder = new org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder()
- .resultSetLoader(loader)
- .standardOptions(options);
+ stream = new org.apache.drill.exec.store.easy.json.loader.SingleElementIterator<>();
+ rsLoader.startBatch();
}
@Override
public void eval() {
// Get the URL
String url = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(rawInput.start, rawInput.end, rawInput.buffer);
-
// Process Positional Arguments
java.util.List args = org.apache.drill.exec.store.http.util.SimpleHttp.buildParameterList(inputReaders);
// If the arg list is null, indicating at least one null arg, return an empty map
// as an approximation of null-if-null handling.
if (args == null) {
- // Return empty map
return;
}
-
String finalUrl = org.apache.drill.exec.store.http.util.SimpleHttp.mapPositionalParameters(url, args);
-
// Make the API call
java.io.InputStream results = org.apache.drill.exec.store.http.util.SimpleHttp.getRequestAndStreamResponse(finalUrl);
-
// If the result string is null or empty, return an empty map
if (results == null) {
- // Return empty map
return;
}
-
try {
- jsonLoaderBuilder.fromStream(results);
- org.apache.drill.exec.store.easy.json.loader.JsonLoader jsonLoader = jsonLoaderBuilder.build();
- loader.startBatch();
- jsonLoader.readBatch();
+ stream.setValue(results);
+ if (jsonLoader == null) {
+ jsonLoader = org.apache.drill.exec.store.http.udfs.HttpUdfUtils.createJsonLoader(rsLoader, options, stream);
+ }
+ org.apache.drill.exec.physical.resultSet.RowSetLoader rowWriter = rsLoader.writer();
+ rowWriter.start();
+ if (jsonLoader.parser().next()) {
+ rowWriter.save();
+ }
} catch (Exception e) {
- throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e);
+ throw org.apache.drill.common.exceptions.UserException.dataReadError(e)
+ .message("Error while reading JSON. ")
+ .addContext(e.getMessage())
+ .build();
}
}
}
-
@FunctionTemplate(names = {"http_request", "httpRequest"},
scope = FunctionTemplate.FunctionScope.SIMPLE,
isVarArg = true)
@@ -122,10 +124,10 @@ public class HttpHelperFunctions {
DrillbitContext drillbitContext;
@Inject
- ResultSetLoader loader;
+ ResultSetLoader rsLoader;
@Workspace
- org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder jsonLoaderBuilder;
+ org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl jsonLoader;
@Workspace
org.apache.drill.exec.store.http.HttpStoragePlugin plugin;
@@ -133,6 +135,9 @@ public class HttpHelperFunctions {
@Workspace
org.apache.drill.exec.store.http.HttpApiConfig endpointConfig;
+ @Workspace
+ org.apache.drill.exec.store.easy.json.loader.SingleElementIterator<java.io.InputStream> stream;
+
@Override
public void setup() {
String schemaPath = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(rawInput.start, rawInput.end, rawInput.buffer);
@@ -154,10 +159,9 @@ public class HttpHelperFunctions {
endpointName,
plugin.getConfig()
);
-
+ stream = new org.apache.drill.exec.store.easy.json.loader.SingleElementIterator<>();
// Add JSON configuration from Storage plugin, if present.
- jsonLoaderBuilder = org.apache.drill.exec.store.http.udfs.HttpUdfUtils.setupJsonBuilder(endpointConfig, loader, options);
-
+ rsLoader.startBatch();
}
@Override
@@ -167,30 +171,30 @@ public class HttpHelperFunctions {
// If the arg list is null, indicating at least one null arg, return an empty map
// as an approximation of null-if-null handling.
if (args == null) {
- // Return empty map
return;
}
-
- java.io.InputStream results = org.apache.drill.exec.store.http.util.SimpleHttp.apiCall(
- plugin,
- endpointConfig,
- drillbitContext,
- args
- ).getInputStream();
-
+ java.io.InputStream results = org.apache.drill.exec.store.http.util.SimpleHttp.apiCall(plugin, endpointConfig, drillbitContext, args)
+ .getInputStream();
// If the result string is null or empty, return an empty map
if (results == null) {
- // Return empty map
return;
}
-
try {
- jsonLoaderBuilder.fromStream(results);
- org.apache.drill.exec.store.easy.json.loader.JsonLoader jsonLoader = jsonLoaderBuilder.build();
- loader.startBatch();
- jsonLoader.readBatch();
+ stream.setValue(results);
+ if (jsonLoader == null) {
+ // Add JSON configuration from Storage plugin, if present.
+ jsonLoader = org.apache.drill.exec.store.http.udfs.HttpUdfUtils.createJsonLoader(endpointConfig, rsLoader, options, stream);
+ }
+ org.apache.drill.exec.physical.resultSet.RowSetLoader rowWriter = rsLoader.writer();
+ rowWriter.start();
+ if (jsonLoader.parser().next()) {
+ rowWriter.save();
+ }
} catch (Exception e) {
- throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e);
+ throw org.apache.drill.common.exceptions.UserException.dataReadError(e)
+ .message("Error while reading JSON. ")
+ .addContext(e.getMessage())
+ .build();
}
}
}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpUdfUtils.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpUdfUtils.java
index 93f078f7e2..69dc859a35 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpUdfUtils.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpUdfUtils.java
@@ -21,42 +21,50 @@ package org.apache.drill.exec.store.http.udfs;
import org.apache.commons.lang3.StringUtils;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.server.options.OptionManager;
-import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder;
-import org.apache.drill.exec.store.easy.json.loader.JsonLoaderOptions;
+import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl;
+import org.apache.drill.exec.store.easy.json.loader.SingleElementIterator;
import org.apache.drill.exec.store.http.HttpApiConfig;
-import org.apache.drill.exec.store.http.HttpJsonOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.InputStream;
+
public class HttpUdfUtils {
private static final Logger logger = LoggerFactory.getLogger(HttpUdfUtils.class);
- public static JsonLoaderBuilder setupJsonBuilder(HttpApiConfig endpointConfig, ResultSetLoader loader, OptionManager options) {
- loader.setTargetRowCount(1);
+ public static JsonLoaderImpl createJsonLoader(ResultSetLoader rsLoader,
+ OptionManager options,
+ SingleElementIterator<InputStream> stream) {
+ return createJsonLoader(null, rsLoader, options, stream);
+ }
+ public static JsonLoaderImpl createJsonLoader(HttpApiConfig endpointConfig, ResultSetLoader rsLoader,
+ OptionManager options, SingleElementIterator<InputStream> stream) {
// Add JSON configuration from Storage plugin, if present.
- HttpJsonOptions jsonOptions = endpointConfig.jsonOptions();
- JsonLoaderBuilder jsonLoaderBuilder = new JsonLoaderBuilder()
- .resultSetLoader(loader)
- .maxRows(1)
- .standardOptions(options);
-
+ org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder jsonLoaderBuilder =
+ new org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder()
+ .resultSetLoader(rsLoader)
+ .standardOptions(options)
+ .fromStream(() -> stream);
// Add data path if present
- if (StringUtils.isNotEmpty(endpointConfig.dataPath())) {
- jsonLoaderBuilder.dataPath(endpointConfig.dataPath());
- }
-
- if (jsonOptions != null) {
- // Add options from endpoint configuration to jsonLoader
- JsonLoaderOptions jsonLoaderOptions = jsonOptions.getJsonOptions(options);
- jsonLoaderBuilder.options(jsonLoaderOptions);
+ if (endpointConfig != null) {
+ if (StringUtils.isNotEmpty(endpointConfig.dataPath())) {
+ jsonLoaderBuilder.dataPath(endpointConfig.dataPath());
+ }
+ // Add JSON configuration from Storage plugin, if present.
+ org.apache.drill.exec.store.http.HttpJsonOptions jsonOptions = endpointConfig.jsonOptions();
+ if (jsonOptions != null) {
+ // Add options from endpoint configuration to jsonLoader
+ org.apache.drill.exec.store.easy.json.loader.JsonLoaderOptions jsonLoaderOptions = jsonOptions.getJsonOptions(options);
+ jsonLoaderBuilder.options(jsonLoaderOptions);
- // Add provided schema if present
- if (jsonOptions.schema() != null) {
- logger.debug("Found schema: {}", jsonOptions.schema());
- jsonLoaderBuilder.providedSchema(jsonOptions.schema());
+ // Add provided schema if present
+ if (jsonOptions.schema() != null) {
+ logger.debug("Found schema: {}", jsonOptions.schema());
+ jsonLoaderBuilder.providedSchema(jsonOptions.schema());
+ }
}
}
- return jsonLoaderBuilder;
+ return (org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl) jsonLoaderBuilder.build();
}
}
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java
index 2cbe8d4b85..c5901b9087 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java
@@ -36,6 +36,7 @@ import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl;
+import org.apache.drill.exec.store.http.udfs.HttpUdfUtils;
import org.apache.drill.exec.store.http.util.SimpleHttp;
import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
@@ -68,7 +69,7 @@ public class TestHttpUDFFunctions extends ClusterTest {
private static String TEST_JSON_PAGE1;
private static String DUMMY_URL = "http://localhost:" + MOCK_SERVER_PORT;
protected static LogFixture logFixture;
- private final static Level CURRENT_LOG_LEVEL = Level.INFO;
+ private final static Level CURRENT_LOG_LEVEL = Level.DEBUG;
@BeforeClass
public static void setup() throws Exception {
@@ -79,6 +80,7 @@ public class TestHttpUDFFunctions extends ClusterTest {
.logger(JsonLoaderImpl.class, CURRENT_LOG_LEVEL)
.logger(IteratorValidatorBatchIterator.class, CURRENT_LOG_LEVEL)
.logger(ResultSetLoaderImpl.class, CURRENT_LOG_LEVEL)
+ .logger(HttpUdfUtils.class, CURRENT_LOG_LEVEL)
.build();
startCluster(ClusterFixture.builder(dirTestWatcher));
TEST_JSON_RESPONSE = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/simple.json"), Charsets.UTF_8).read();
@@ -115,7 +117,7 @@ public class TestHttpUDFFunctions extends ClusterTest {
configs.put("basicJson", basicJson);
HttpStoragePluginConfig mockStorageConfigWithWorkspace =
- new HttpStoragePluginConfig(false, configs, 2, "globaluser", "globalpass", "",
+ new HttpStoragePluginConfig(false, configs, 200, "globaluser", "globalpass", "",
80, "", "", "", null, new PlainCredentialsProvider(ImmutableMap.of(
UsernamePasswordCredentials.USERNAME, "globaluser",
UsernamePasswordCredentials.PASSWORD, "globalpass")), AuthMode.SHARED_USER.name());
@@ -130,6 +132,8 @@ public class TestHttpUDFFunctions extends ClusterTest {
server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_PAGE1));
RowSet results = client.queryBuilder().sql(sql).rowSet();
+ assertEquals(1, results.rowCount());
+
TupleMetadata expectedSchema = new SchemaBuilder()
.addMap("data")
.addNullable("col_1", MinorType.FLOAT8)
@@ -147,6 +151,33 @@ public class TestHttpUDFFunctions extends ClusterTest {
}
}
+ @Test
+ public void testSeveralRowsAndRequests() throws Exception {
+ String sql = "SELECT http_request('local.basicJson', `col1`) as data FROM cp.`/data/p4.json`";
+ try (MockWebServer server = startServer()) {
+ server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_PAGE1));
+ server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_PAGE1));
+ RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+ assertEquals(2, results.rowCount());
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addMap("data")
+ .addNullable("col_1", MinorType.FLOAT8)
+ .addNullable("col_2", MinorType.FLOAT8)
+ .addNullable("col_3", MinorType.FLOAT8)
+ .resumeSchema()
+ .build();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(singleMap(mapValue(1.0, 2.0, 3.0)))
+ .addRow(singleMap(mapValue(4.0, 5.0, 6.0)))
+ .build();
+
+ RowSetUtilities.verify(expected, results);
+ }
+ }
+
@Test
public void testHttpGetWithNoParams() throws Exception {
try (MockWebServer server = startServer()) {
diff --git a/contrib/storage-http/src/test/resources/data/p4.json b/contrib/storage-http/src/test/resources/data/p4.json
new file mode 100644
index 0000000000..43a890b171
--- /dev/null
+++ b/contrib/storage-http/src/test/resources/data/p4.json
@@ -0,0 +1,2 @@
+{"col1": "apache"}
+{"col1": "ddr"}
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
index a9aee5a990..ab135ade82 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.record.metadata.MetadataUtils;
import org.apache.drill.exec.store.easy.json.loader.JsonLoaderOptions;
+import org.apache.drill.exec.store.easy.json.loader.SingleElementIterator;
import org.apache.drill.exec.store.easy.json.parser.TokenIterator;
import org.apache.drill.exec.store.kafka.KafkaStoragePlugin;
import org.apache.drill.exec.store.kafka.MetaDataField;
@@ -38,7 +39,6 @@ import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
-import java.util.Iterator;
import java.util.Properties;
import java.util.StringJoiner;
@@ -156,24 +156,4 @@ public class JsonMessageReader implements MessageReader {
.add("resultSetLoader=" + resultSetLoader)
.toString();
}
-
- public static class SingleElementIterator<T> implements Iterator<T> {
- private T value;
-
- @Override
- public boolean hasNext() {
- return value != null;
- }
-
- @Override
- public T next() {
- T value = this.value;
- this.value = null;
- return value;
- }
-
- public void setValue(T value) {
- this.value = value;
- }
- }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 89e1beb6ea..a23227a275 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.physical.impl.project;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.types.TypeProtos;
@@ -110,7 +111,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
memoryManager.update();
if (first && incomingRecordCount == 0) {
- if (complexWriters != null || rsLoader != null ) {
+ if (!CollectionUtils.isEmpty(complexWriters) || rsLoader != null ) {
IterOutcome next = null;
while (incomingRecordCount == 0) {
if (getLastKnownOutcome() == EMIT) {
@@ -145,7 +146,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
}
}
- if ((complexWriters != null || rsLoader != null) && getLastKnownOutcome() == EMIT) {
+ if ((!CollectionUtils.isEmpty(complexWriters) || rsLoader != null) && getLastKnownOutcome() == EMIT) {
throw UserException.unsupportedError()
.message("Currently functions producing complex types as output are not " +
"supported in project list for subquery between LATERAL and UNNEST. Please re-write the query using this " +
@@ -177,7 +178,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
}
// In case of complex writer expression, vectors would be added to batch run-time.
// We have to re-build the schema.
- if (rsLoader != null && !rsLoader.isProjectionEmpty()) {
+ if (rsLoader != null) {
MapVector map = container.addOrGet(container.getLast().getField().getName(), Types.required(TypeProtos.MinorType.MAP), MapVector.class);
map.setMapValueCount(recordCount);
for (VectorWrapper<?> vectorWrapper : rsLoader.harvest()) {
@@ -185,7 +186,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
map.putChild(valueVector.getField().getName(), valueVector);
}
container.buildSchema(SelectionVectorMode.NONE);
- } else if (complexWriters != null) {
+ } else if (!CollectionUtils.isEmpty(complexWriters)) {
container.buildSchema(SelectionVectorMode.NONE);
}
@@ -224,7 +225,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
}
// In case of complex writer expression, vectors would be added to batch run-time.
// We have to re-build the schema.
- if (complexWriters != null || rsLoader != null) {
+ if (!CollectionUtils.isEmpty(complexWriters) || rsLoader != null) {
container.buildSchema(SelectionVectorMode.NONE);
}
@@ -358,7 +359,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
protected IterOutcome getFinalOutcome(boolean hasMoreRecordInBoundary) {
// In a case of complex writers vectors are added at runtime, so the schema
// may change (e.g. when a batch contains new column(s) not present in previous batches)
- if (complexWriters != null || rsLoader != null) {
+ if (!CollectionUtils.isEmpty(complexWriters) || rsLoader != null) {
return IterOutcome.OK_NEW_SCHEMA;
}
return super.getFinalOutcome(hasMoreRecordInBoundary);
@@ -374,11 +375,11 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
}
allocationVectors = new ArrayList<>();
- if (complexWriters != null) {
- container.clear();
- } else if (rsLoader != null) {
+ if (rsLoader != null) {
container.clear();
rsLoader.close();
+ } else if (!CollectionUtils.isEmpty(complexWriters)) {
+ container.clear();
} else {
// Release the underlying DrillBufs and reset the ValueVectors to empty
// Not clearing the container here is fine since Project output schema is
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java
index 10cca0b648..dd5973c482 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java
@@ -434,8 +434,7 @@ public class ResultSetLoaderImpl implements ResultSetLoader, LoaderInternals {
throw new IllegalStateException("Unexpected state: " + state);
}
- // Update the visible schema with any pending overflow batch
- // updates
+ // Update the visible schema with any pending overflow batch updates
harvestSchemaVersion = activeSchemaVersion;
pendingRowCount = 0;
batchSizeLimit = (int) Math.min(targetRowCount, options.scanLimit - totalRowCount());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
index e5755cb07b..7e9d9d541e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
@@ -20,10 +20,14 @@ package org.apache.drill.exec.store.easy.json.loader;
import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
+import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
+import io.netty.buffer.DrillBuf;
+import org.apache.commons.io.IOUtils;
import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.EmptyErrorContext;
import org.apache.drill.common.exceptions.UserException;
@@ -43,6 +47,7 @@ import org.apache.drill.exec.store.easy.json.parser.TokenIterator.RecoverableJso
import org.apache.drill.exec.store.easy.json.parser.ValueDef;
import org.apache.drill.exec.store.easy.json.parser.ValueDef.JsonType;
import org.apache.drill.exec.vector.accessor.UnsupportedConversionError;
+import org.apache.drill.exec.vector.complex.fn.DrillBufInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -189,6 +194,16 @@ public class JsonLoaderImpl implements JsonLoader, ErrorFactory {
return this;
}
+ public JsonLoaderBuilder fromStream(int start, int end, DrillBuf buf) {
+ this.streams = Collections.singletonList(DrillBufInputStream.getStream(start, end, buf));
+ return this;
+ }
+
+ public JsonLoaderBuilder fromString(String jsonString) {
+ this.streams = Collections.singletonList(IOUtils.toInputStream(jsonString, Charset.defaultCharset()));
+ return this;
+ }
+
public JsonLoaderBuilder fromReader(Reader reader) {
this.reader = reader;
return this;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/SingleElementIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/SingleElementIterator.java
new file mode 100644
index 0000000000..6073c0e06e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/SingleElementIterator.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.json.loader;
+
+import java.util.Iterator;
+
+/**
+ * It allows setting the current value in the iterator and can be used once after {@link #next} call
+ *
+ * @param <T> type of the value
+ */
+public class SingleElementIterator<T> implements Iterator<T> {
+ private T value;
+
+ @Override
+ public boolean hasNext() {
+ return value != null;
+ }
+
+ @Override
+ public T next() {
+ T value = this.value;
+ this.value = null;
+ return value;
+ }
+
+ public void setValue(T value) {
+ this.value = value;
+ }
+ }