You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by vi...@apache.org on 2022/06/15 08:31:08 UTC

[drill] branch master updated: DRILL-8248: Fix http_request for several rows (#2573)

This is an automated email from the ASF dual-hosted git repository.

vitalii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new c58d8a080c DRILL-8248: Fix http_request for several rows (#2573)
c58d8a080c is described below

commit c58d8a080ccdf2b1361159de8c792906770be5f8
Author: Vitalii Diravka <vi...@apache.org>
AuthorDate: Wed Jun 15 11:31:01 2022 +0300

    DRILL-8248: Fix http_request for several rows (#2573)
---
 .../exec/store/http/udfs/HttpHelperFunctions.java  | 82 ++++++++++++----------
 .../drill/exec/store/http/udfs/HttpUdfUtils.java   | 56 ++++++++-------
 .../exec/store/http/TestHttpUDFFunctions.java      | 35 ++++++++-
 .../storage-http/src/test/resources/data/p4.json   |  2 +
 .../store/kafka/decoders/JsonMessageReader.java    | 22 +-----
 .../physical/impl/project/ProjectRecordBatch.java  | 19 ++---
 .../resultSet/impl/ResultSetLoaderImpl.java        |  3 +-
 .../store/easy/json/loader/JsonLoaderImpl.java     | 15 ++++
 .../easy/json/loader/SingleElementIterator.java    | 45 ++++++++++++
 9 files changed, 182 insertions(+), 97 deletions(-)

diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java
index 5e73b716ca..a95bc9ff60 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java
@@ -52,55 +52,57 @@ public class HttpHelperFunctions {
     OptionManager options;
 
     @Inject
-    ResultSetLoader loader;
+    ResultSetLoader rsLoader;
 
     @Workspace
-    org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder jsonLoaderBuilder;
+    org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl jsonLoader;
+
+    @Workspace
+    org.apache.drill.exec.store.easy.json.loader.SingleElementIterator<java.io.InputStream> stream;
 
     @Override
     public void setup() {
-      jsonLoaderBuilder = new org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder()
-        .resultSetLoader(loader)
-        .standardOptions(options);
+      stream = new org.apache.drill.exec.store.easy.json.loader.SingleElementIterator<>();
+      rsLoader.startBatch();
     }
 
     @Override
     public void eval() {
       // Get the URL
       String url = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(rawInput.start, rawInput.end, rawInput.buffer);
-
       // Process Positional Arguments
       java.util.List args = org.apache.drill.exec.store.http.util.SimpleHttp.buildParameterList(inputReaders);
       // If the arg list is null, indicating at least one null arg, return an empty map
       // as an approximation of null-if-null handling.
       if (args == null) {
-        // Return empty map
         return;
       }
-
       String finalUrl = org.apache.drill.exec.store.http.util.SimpleHttp.mapPositionalParameters(url, args);
-
       // Make the API call
       java.io.InputStream results = org.apache.drill.exec.store.http.util.SimpleHttp.getRequestAndStreamResponse(finalUrl);
-
       // If the result string is null or empty, return an empty map
       if (results == null) {
-        // Return empty map
         return;
       }
-
       try {
-        jsonLoaderBuilder.fromStream(results);
-        org.apache.drill.exec.store.easy.json.loader.JsonLoader jsonLoader = jsonLoaderBuilder.build();
-        loader.startBatch();
-        jsonLoader.readBatch();
+        stream.setValue(results);
+        if (jsonLoader == null) {
+          jsonLoader = org.apache.drill.exec.store.http.udfs.HttpUdfUtils.createJsonLoader(rsLoader, options, stream);
+        }
+        org.apache.drill.exec.physical.resultSet.RowSetLoader rowWriter = rsLoader.writer();
+        rowWriter.start();
+        if (jsonLoader.parser().next()) {
+          rowWriter.save();
+        }
       } catch (Exception e) {
-        throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e);
+        throw org.apache.drill.common.exceptions.UserException.dataReadError(e)
+          .message("Error while reading JSON. ")
+          .addContext(e.getMessage())
+          .build();
       }
     }
   }
 
-
   @FunctionTemplate(names = {"http_request", "httpRequest"},
     scope = FunctionTemplate.FunctionScope.SIMPLE,
     isVarArg = true)
@@ -122,10 +124,10 @@ public class HttpHelperFunctions {
     DrillbitContext drillbitContext;
 
     @Inject
-    ResultSetLoader loader;
+    ResultSetLoader rsLoader;
 
     @Workspace
-    org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder jsonLoaderBuilder;
+    org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl jsonLoader;
 
     @Workspace
     org.apache.drill.exec.store.http.HttpStoragePlugin plugin;
@@ -133,6 +135,9 @@ public class HttpHelperFunctions {
     @Workspace
     org.apache.drill.exec.store.http.HttpApiConfig endpointConfig;
 
+    @Workspace
+    org.apache.drill.exec.store.easy.json.loader.SingleElementIterator<java.io.InputStream> stream;
+
     @Override
     public void setup() {
       String schemaPath = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(rawInput.start, rawInput.end, rawInput.buffer);
@@ -154,10 +159,9 @@ public class HttpHelperFunctions {
         endpointName,
         plugin.getConfig()
       );
-
+      stream = new org.apache.drill.exec.store.easy.json.loader.SingleElementIterator<>();
       // Add JSON configuration from Storage plugin, if present.
-      jsonLoaderBuilder = org.apache.drill.exec.store.http.udfs.HttpUdfUtils.setupJsonBuilder(endpointConfig, loader, options);
-
+      rsLoader.startBatch();
     }
 
     @Override
@@ -167,30 +171,30 @@ public class HttpHelperFunctions {
       // If the arg list is null, indicating at least one null arg, return an empty map
       // as an approximation of null-if-null handling.
       if (args == null) {
-        // Return empty map
         return;
       }
-
-      java.io.InputStream results = org.apache.drill.exec.store.http.util.SimpleHttp.apiCall(
-        plugin,
-        endpointConfig,
-        drillbitContext,
-        args
-      ).getInputStream();
-
+      java.io.InputStream results = org.apache.drill.exec.store.http.util.SimpleHttp.apiCall(plugin, endpointConfig, drillbitContext, args)
+        .getInputStream();
       // If the result string is null or empty, return an empty map
       if (results == null) {
-        // Return empty map
         return;
       }
-
       try {
-        jsonLoaderBuilder.fromStream(results);
-        org.apache.drill.exec.store.easy.json.loader.JsonLoader jsonLoader = jsonLoaderBuilder.build();
-        loader.startBatch();
-        jsonLoader.readBatch();
+        stream.setValue(results);
+        if (jsonLoader == null) {
+          // Add JSON configuration from Storage plugin, if present.
+          jsonLoader = org.apache.drill.exec.store.http.udfs.HttpUdfUtils.createJsonLoader(endpointConfig, rsLoader, options, stream);
+        }
+        org.apache.drill.exec.physical.resultSet.RowSetLoader rowWriter = rsLoader.writer();
+        rowWriter.start();
+        if (jsonLoader.parser().next()) {
+          rowWriter.save();
+        }
       } catch (Exception e) {
-        throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e);
+        throw org.apache.drill.common.exceptions.UserException.dataReadError(e)
+          .message("Error while reading JSON. ")
+          .addContext(e.getMessage())
+          .build();
       }
     }
   }
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpUdfUtils.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpUdfUtils.java
index 93f078f7e2..69dc859a35 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpUdfUtils.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpUdfUtils.java
@@ -21,42 +21,50 @@ package org.apache.drill.exec.store.http.udfs;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
 import org.apache.drill.exec.server.options.OptionManager;
-import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder;
-import org.apache.drill.exec.store.easy.json.loader.JsonLoaderOptions;
+import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl;
+import org.apache.drill.exec.store.easy.json.loader.SingleElementIterator;
 import org.apache.drill.exec.store.http.HttpApiConfig;
-import org.apache.drill.exec.store.http.HttpJsonOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.InputStream;
+
 public class HttpUdfUtils {
 
   private static final Logger logger = LoggerFactory.getLogger(HttpUdfUtils.class);
 
-  public static JsonLoaderBuilder setupJsonBuilder(HttpApiConfig endpointConfig, ResultSetLoader loader, OptionManager options) {
-    loader.setTargetRowCount(1);
+  public static JsonLoaderImpl createJsonLoader(ResultSetLoader rsLoader,
+                                                OptionManager options,
+                                                SingleElementIterator<InputStream> stream) {
+    return createJsonLoader(null, rsLoader, options, stream);
+  }
+  public static JsonLoaderImpl createJsonLoader(HttpApiConfig endpointConfig, ResultSetLoader rsLoader,
+                                                OptionManager options, SingleElementIterator<InputStream> stream) {
     // Add JSON configuration from Storage plugin, if present.
-    HttpJsonOptions jsonOptions = endpointConfig.jsonOptions();
-    JsonLoaderBuilder jsonLoaderBuilder = new JsonLoaderBuilder()
-      .resultSetLoader(loader)
-      .maxRows(1)
-      .standardOptions(options);
-
+    org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder jsonLoaderBuilder =
+      new org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder()
+      .resultSetLoader(rsLoader)
+      .standardOptions(options)
+      .fromStream(() -> stream);
     // Add data path if present
-    if (StringUtils.isNotEmpty(endpointConfig.dataPath())) {
-      jsonLoaderBuilder.dataPath(endpointConfig.dataPath());
-    }
-
-    if (jsonOptions != null) {
-      // Add options from endpoint configuration to jsonLoader
-      JsonLoaderOptions jsonLoaderOptions = jsonOptions.getJsonOptions(options);
-      jsonLoaderBuilder.options(jsonLoaderOptions);
+    if (endpointConfig != null) {
+      if (StringUtils.isNotEmpty(endpointConfig.dataPath())) {
+        jsonLoaderBuilder.dataPath(endpointConfig.dataPath());
+      }
+      // Add JSON configuration from Storage plugin, if present.
+      org.apache.drill.exec.store.http.HttpJsonOptions jsonOptions = endpointConfig.jsonOptions();
+      if (jsonOptions != null) {
+        // Add options from endpoint configuration to jsonLoader
+        org.apache.drill.exec.store.easy.json.loader.JsonLoaderOptions jsonLoaderOptions = jsonOptions.getJsonOptions(options);
+        jsonLoaderBuilder.options(jsonLoaderOptions);
 
-      // Add provided schema if present
-      if (jsonOptions.schema() != null) {
-        logger.debug("Found schema: {}", jsonOptions.schema());
-        jsonLoaderBuilder.providedSchema(jsonOptions.schema());
+        // Add provided schema if present
+        if (jsonOptions.schema() != null) {
+          logger.debug("Found schema: {}", jsonOptions.schema());
+          jsonLoaderBuilder.providedSchema(jsonOptions.schema());
+        }
       }
     }
-    return jsonLoaderBuilder;
+    return (org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl) jsonLoaderBuilder.build();
   }
 }
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java
index 2cbe8d4b85..c5901b9087 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java
@@ -36,6 +36,7 @@ import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl;
+import org.apache.drill.exec.store.http.udfs.HttpUdfUtils;
 import org.apache.drill.exec.store.http.util.SimpleHttp;
 import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
 import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
@@ -68,7 +69,7 @@ public class TestHttpUDFFunctions extends ClusterTest {
   private static String TEST_JSON_PAGE1;
   private static String DUMMY_URL = "http://localhost:" + MOCK_SERVER_PORT;
   protected static LogFixture logFixture;
-  private final static Level CURRENT_LOG_LEVEL = Level.INFO;
+  private final static Level CURRENT_LOG_LEVEL = Level.DEBUG;
 
   @BeforeClass
   public static void setup() throws Exception {
@@ -79,6 +80,7 @@ public class TestHttpUDFFunctions extends ClusterTest {
       .logger(JsonLoaderImpl.class, CURRENT_LOG_LEVEL)
       .logger(IteratorValidatorBatchIterator.class, CURRENT_LOG_LEVEL)
       .logger(ResultSetLoaderImpl.class, CURRENT_LOG_LEVEL)
+      .logger(HttpUdfUtils.class, CURRENT_LOG_LEVEL)
       .build();
     startCluster(ClusterFixture.builder(dirTestWatcher));
     TEST_JSON_RESPONSE = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/simple.json"), Charsets.UTF_8).read();
@@ -115,7 +117,7 @@ public class TestHttpUDFFunctions extends ClusterTest {
     configs.put("basicJson", basicJson);
 
     HttpStoragePluginConfig mockStorageConfigWithWorkspace =
-      new HttpStoragePluginConfig(false, configs, 2, "globaluser", "globalpass", "",
+      new HttpStoragePluginConfig(false, configs, 200, "globaluser", "globalpass", "",
         80, "", "", "", null, new PlainCredentialsProvider(ImmutableMap.of(
         UsernamePasswordCredentials.USERNAME, "globaluser",
         UsernamePasswordCredentials.PASSWORD, "globalpass")), AuthMode.SHARED_USER.name());
@@ -130,6 +132,8 @@ public class TestHttpUDFFunctions extends ClusterTest {
       server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_PAGE1));
       RowSet results = client.queryBuilder().sql(sql).rowSet();
 
+      assertEquals(1, results.rowCount());
+
       TupleMetadata expectedSchema = new SchemaBuilder()
         .addMap("data")
           .addNullable("col_1", MinorType.FLOAT8)
@@ -147,6 +151,33 @@ public class TestHttpUDFFunctions extends ClusterTest {
     }
   }
 
+  @Test
+  public void testSeveralRowsAndRequests() throws Exception {
+    String sql = "SELECT http_request('local.basicJson', `col1`) as data FROM cp.`/data/p4.json`";
+    try (MockWebServer server = startServer()) {
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_PAGE1));
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_PAGE1));
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+      assertEquals(2, results.rowCount());
+
+      TupleMetadata expectedSchema = new SchemaBuilder()
+        .addMap("data")
+          .addNullable("col_1", MinorType.FLOAT8)
+          .addNullable("col_2", MinorType.FLOAT8)
+          .addNullable("col_3", MinorType.FLOAT8)
+        .resumeSchema()
+        .build();
+
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+        .addRow(singleMap(mapValue(1.0, 2.0, 3.0)))
+        .addRow(singleMap(mapValue(4.0, 5.0, 6.0)))
+        .build();
+
+      RowSetUtilities.verify(expected, results);
+    }
+  }
+
   @Test
   public void testHttpGetWithNoParams() throws Exception {
     try (MockWebServer server = startServer()) {
diff --git a/contrib/storage-http/src/test/resources/data/p4.json b/contrib/storage-http/src/test/resources/data/p4.json
new file mode 100644
index 0000000000..43a890b171
--- /dev/null
+++ b/contrib/storage-http/src/test/resources/data/p4.json
@@ -0,0 +1,2 @@
+{"col1": "apache"}
+{"col1": "ddr"}
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
index a9aee5a990..ab135ade82 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.physical.resultSet.RowSetLoader;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.MetadataUtils;
 import org.apache.drill.exec.store.easy.json.loader.JsonLoaderOptions;
+import org.apache.drill.exec.store.easy.json.loader.SingleElementIterator;
 import org.apache.drill.exec.store.easy.json.parser.TokenIterator;
 import org.apache.drill.exec.store.kafka.KafkaStoragePlugin;
 import org.apache.drill.exec.store.kafka.MetaDataField;
@@ -38,7 +39,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayInputStream;
 import java.io.InputStream;
-import java.util.Iterator;
 import java.util.Properties;
 import java.util.StringJoiner;
 
@@ -156,24 +156,4 @@ public class JsonMessageReader implements MessageReader {
         .add("resultSetLoader=" + resultSetLoader)
         .toString();
   }
-
-  public static class SingleElementIterator<T> implements Iterator<T> {
-    private T value;
-
-    @Override
-    public boolean hasNext() {
-      return value != null;
-    }
-
-    @Override
-    public T next() {
-      T value = this.value;
-      this.value = null;
-      return value;
-    }
-
-    public void setValue(T value) {
-      this.value = value;
-    }
-  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 89e1beb6ea..a23227a275 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.physical.impl.project;
 
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.types.TypeProtos;
@@ -110,7 +111,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
     memoryManager.update();
 
     if (first && incomingRecordCount == 0) {
-      if (complexWriters != null || rsLoader != null ) {
+      if (!CollectionUtils.isEmpty(complexWriters) || rsLoader != null ) {
         IterOutcome next = null;
         while (incomingRecordCount == 0) {
           if (getLastKnownOutcome() == EMIT) {
@@ -145,7 +146,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
       }
     }
 
-    if ((complexWriters != null || rsLoader != null) && getLastKnownOutcome() == EMIT) {
+    if ((!CollectionUtils.isEmpty(complexWriters) || rsLoader != null) && getLastKnownOutcome() == EMIT) {
       throw UserException.unsupportedError()
           .message("Currently functions producing complex types as output are not " +
             "supported in project list for subquery between LATERAL and UNNEST. Please re-write the query using this " +
@@ -177,7 +178,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
     }
     // In case of complex writer expression, vectors would be added to batch run-time.
     // We have to re-build the schema.
-    if (rsLoader != null && !rsLoader.isProjectionEmpty()) {
+    if (rsLoader != null) {
       MapVector map = container.addOrGet(container.getLast().getField().getName(), Types.required(TypeProtos.MinorType.MAP), MapVector.class);
       map.setMapValueCount(recordCount);
       for (VectorWrapper<?> vectorWrapper : rsLoader.harvest()) {
@@ -185,7 +186,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
         map.putChild(valueVector.getField().getName(), valueVector);
       }
       container.buildSchema(SelectionVectorMode.NONE);
-    } else if (complexWriters != null) {
+    } else if (!CollectionUtils.isEmpty(complexWriters)) {
       container.buildSchema(SelectionVectorMode.NONE);
     }
 
@@ -224,7 +225,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
     }
     // In case of complex writer expression, vectors would be added to batch run-time.
     // We have to re-build the schema.
-    if (complexWriters != null || rsLoader != null) {
+    if (!CollectionUtils.isEmpty(complexWriters) || rsLoader != null) {
       container.buildSchema(SelectionVectorMode.NONE);
     }
 
@@ -358,7 +359,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
   protected IterOutcome getFinalOutcome(boolean hasMoreRecordInBoundary) {
     // In a case of complex writers vectors are added at runtime, so the schema
     // may change (e.g. when a batch contains new column(s) not present in previous batches)
-    if (complexWriters != null || rsLoader != null) {
+    if (!CollectionUtils.isEmpty(complexWriters) || rsLoader != null) {
       return IterOutcome.OK_NEW_SCHEMA;
     }
     return super.getFinalOutcome(hasMoreRecordInBoundary);
@@ -374,11 +375,11 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
     }
     allocationVectors = new ArrayList<>();
 
-    if (complexWriters != null) {
-      container.clear();
-    } else if (rsLoader != null) {
+    if (rsLoader != null) {
       container.clear();
       rsLoader.close();
+    } else if (!CollectionUtils.isEmpty(complexWriters)) {
+      container.clear();
     } else {
       // Release the underlying DrillBufs and reset the ValueVectors to empty
       // Not clearing the container here is fine since Project output schema is
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java
index 10cca0b648..dd5973c482 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java
@@ -434,8 +434,7 @@ public class ResultSetLoaderImpl implements ResultSetLoader, LoaderInternals {
         throw new IllegalStateException("Unexpected state: " + state);
     }
 
-    // Update the visible schema with any pending overflow batch
-    // updates
+    // Update the visible schema with any pending overflow batch updates
     harvestSchemaVersion = activeSchemaVersion;
     pendingRowCount = 0;
     batchSizeLimit = (int) Math.min(targetRowCount, options.scanLimit - totalRowCount());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
index e5755cb07b..7e9d9d541e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
@@ -20,10 +20,14 @@ package org.apache.drill.exec.store.easy.json.loader;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.Reader;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
+import io.netty.buffer.DrillBuf;
+import org.apache.commons.io.IOUtils;
 import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.exceptions.EmptyErrorContext;
 import org.apache.drill.common.exceptions.UserException;
@@ -43,6 +47,7 @@ import org.apache.drill.exec.store.easy.json.parser.TokenIterator.RecoverableJso
 import org.apache.drill.exec.store.easy.json.parser.ValueDef;
 import org.apache.drill.exec.store.easy.json.parser.ValueDef.JsonType;
 import org.apache.drill.exec.vector.accessor.UnsupportedConversionError;
+import org.apache.drill.exec.vector.complex.fn.DrillBufInputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -189,6 +194,16 @@ public class JsonLoaderImpl implements JsonLoader, ErrorFactory {
       return this;
     }
 
+    public JsonLoaderBuilder fromStream(int start, int end, DrillBuf buf) {
+      this.streams = Collections.singletonList(DrillBufInputStream.getStream(start, end, buf));
+      return this;
+    }
+
+    public JsonLoaderBuilder fromString(String jsonString) {
+      this.streams = Collections.singletonList(IOUtils.toInputStream(jsonString, Charset.defaultCharset()));
+      return this;
+    }
+
     public JsonLoaderBuilder fromReader(Reader reader) {
       this.reader = reader;
       return this;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/SingleElementIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/SingleElementIterator.java
new file mode 100644
index 0000000000..6073c0e06e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/SingleElementIterator.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.json.loader;
+
+import java.util.Iterator;
+
+/**
+ * Iterator over at most one element: the value passed to {@link #setValue} is returned once by {@link #next}, after which {@link #hasNext} is false until a new value is set.
+ *
+ * @param <T> type of the value
+ */
+public class SingleElementIterator<T> implements Iterator<T> {
+    private T value;
+
+    @Override
+    public boolean hasNext() {
+      return value != null;
+    }
+
+    @Override
+    public T next() {
+      T value = this.value;
+      this.value = null;
+      return value;
+    }
+
+    public void setValue(T value) {
+      this.value = value;
+    }
+  }