You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by cg...@apache.org on 2022/06/07 15:41:47 UTC

[drill] branch master updated: DRILL-8242: Fix output for HttpHelperFunctions (#2568)

This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 570ce63088 DRILL-8242: Fix output for HttpHelperFunctions (#2568)
570ce63088 is described below

commit 570ce63088ab4e2b479b53a9b41696adbb0c9278
Author: Vitalii Diravka <vi...@apache.org>
AuthorDate: Tue Jun 7 18:41:41 2022 +0300

    DRILL-8242: Fix output for HttpHelperFunctions (#2568)
    
    * DRILL-8242: Fix output for HttpHelperFunctions
    
    - add test to check the http_get function output
    
    * DRILL-8242: Fix output for HttpHelperFunctions
    
    * DRILL-8242: Fix old approach for other functions
    
    * DRILL-8242: replace String with InputStream for SimpleHTTP requests
    
    * Fix TestHttpPlugin#testSlowResponse: the failure is sometimes reported as "DATA_READ ERROR: Read timed out" instead of "DATA_READ ERROR: timeout", so accept either message
---
 .../exec/store/http/udfs/HttpHelperFunctions.java  | 32 ++++-----------
 .../drill/exec/store/http/util/SimpleHttp.java     | 40 ++++++++++++------
 .../drill/exec/store/http/TestHttpPlugin.java      |  3 +-
 .../exec/store/http/TestHttpUDFFunctions.java      | 38 +++++++++++++++++
 .../exec/expr/fn/DrillComplexWriterFuncHolder.java | 34 ++++++++++-----
 .../apache/drill/exec/ops/BufferManagerImpl.java   |  9 ++--
 .../physical/impl/project/ProjectBatchBuilder.java |  4 +-
 .../impl/project/ProjectMemoryManager.java         |  2 +-
 .../physical/impl/project/ProjectRecordBatch.java  | 48 +++++++++++++++-------
 .../impl/project/ProjectionMaterializer.java       |  3 +-
 .../exec/physical/impl/project/Projector.java      |  8 ++--
 .../physical/impl/project/ProjectorTemplate.java   |  3 +-
 .../physical/resultSet/impl/ColumnBuilder.java     |  6 +--
 .../resultSet/impl/ResultSetLoaderImpl.java        |  3 +-
 .../drill/exec/record/AbstractRecordBatch.java     | 14 +------
 .../org/apache/drill/exec/record/BatchSchema.java  |  3 +-
 .../store/easy/json/loader/JsonLoaderImpl.java     | 12 ------
 .../vector/complex/impl/VectorContainerWriter.java |  4 ++
 .../rest/spnego/TestDrillSpnegoAuthenticator.java  |  1 +
 .../vector/complex/impl/ComplexWriterImpl.java     | 14 +++----
 20 files changed, 165 insertions(+), 116 deletions(-)

diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java
index c669677f70..1ae05a501f 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java
@@ -45,7 +45,7 @@ public class HttpHelperFunctions {
     @Param
     NullableVarCharHolder[] inputReaders;
 
-    @Output
+    @Output // todo: remove. Not used in this UDF
     ComplexWriter writer;
 
     @Inject
@@ -75,32 +75,25 @@ public class HttpHelperFunctions {
       // as an approximation of null-if-null handling.
       if (args == null) {
         // Return empty map
-        org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter mapWriter = writer.rootAsMap();
-        mapWriter.start();
-        mapWriter.end();
         return;
       }
 
       String finalUrl = org.apache.drill.exec.store.http.util.SimpleHttp.mapPositionalParameters(url, args);
 
       // Make the API call
-      String results = org.apache.drill.exec.store.http.util.SimpleHttp.makeSimpleGetRequest(finalUrl);
+      java.io.InputStream results = org.apache.drill.exec.store.http.util.SimpleHttp.getRequestAndStreamResponse(finalUrl);
 
       // If the result string is null or empty, return an empty map
-      if (results == null || results.length() == 0) {
+      if (results == null) {
         // Return empty map
-        org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter mapWriter = writer.rootAsMap();
-        mapWriter.start();
-        mapWriter.end();
         return;
       }
 
       try {
-        jsonLoaderBuilder.fromString(results);
+        jsonLoaderBuilder.fromStream(results);
         org.apache.drill.exec.store.easy.json.loader.JsonLoader jsonLoader = jsonLoaderBuilder.build();
         loader.startBatch();
         jsonLoader.readBatch();
-        loader.close();
       } catch (Exception e) {
         throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e);
       }
@@ -119,7 +112,7 @@ public class HttpHelperFunctions {
     @Param
     NullableVarCharHolder[] inputReaders;
 
-    @Output
+    @Output // todo: remove. Not used in this UDF
     ComplexWriter writer;
 
     @Inject
@@ -174,34 +167,27 @@ public class HttpHelperFunctions {
       // as an approximation of null-if-null handling.
       if (args == null) {
         // Return empty map
-        org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter mapWriter = writer.rootAsMap();
-        mapWriter.start();
-        mapWriter.end();
         return;
       }
 
-      String results = org.apache.drill.exec.store.http.util.SimpleHttp.makeAPICall(
+      java.io.InputStream results = org.apache.drill.exec.store.http.util.SimpleHttp.apiCall(
         plugin,
         endpointConfig,
         drillbitContext,
         args
-      );
+      ).getInputStream();
 
       // If the result string is null or empty, return an empty map
-      if (results == null || results.length() == 0) {
+      if (results == null) {
         // Return empty map
-        org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter mapWriter = writer.rootAsMap();
-        mapWriter.start();
-        mapWriter.end();
         return;
       }
 
       try {
-        jsonLoaderBuilder.fromString(results);
+        jsonLoaderBuilder.fromStream(results);
         org.apache.drill.exec.store.easy.json.loader.JsonLoader jsonLoader = jsonLoaderBuilder.build();
         loader.startBatch();
         jsonLoader.readBatch();
-        loader.close();
       } catch (Exception e) {
         throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e);
       }
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
index 0ec18de858..e5f0ce70d5 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
@@ -30,6 +30,7 @@ import okhttp3.Request;
 import okhttp3.RequestBody;
 import okhttp3.Response;
 
+import okhttp3.ResponseBody;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.drill.common.exceptions.EmptyErrorContext;
 import org.apache.drill.common.logical.OAuthConfig;
@@ -873,7 +874,7 @@ public class SimpleHttp {
    * @param args An optional list of parameter arguments which will be included in the URL
    * @return A String of the results.
    */
-  public static String makeAPICall(
+  public static SimpleHttp apiCall(
     HttpStoragePlugin plugin,
     HttpApiConfig endpointConfig,
     DrillbitContext context,
@@ -895,7 +896,7 @@ public class SimpleHttp {
     }
 
     // Now get the client
-    SimpleHttp client = new SimpleHttpBuilder()
+    return new SimpleHttpBuilder()
       .pluginConfig(pluginConfig)
       .endpointConfig(endpointConfig)
       .tempDir(new File(context.getConfig().getString(ExecConstants.DRILL_TMP_DIR)))
@@ -903,8 +904,6 @@ public class SimpleHttp {
       .proxyConfig(proxyConfig)
       .tokenTable(plugin.getTokenTable())
       .build();
-
-    return client.getResultsFromApiCall();
   }
 
   public static OkHttpClient getSimpleHttpClient() {
@@ -915,7 +914,29 @@ public class SimpleHttp {
       .build();
   }
 
-  public static String makeSimpleGetRequest(String url) {
+  public static String getRequestAndStringResponse(String url) {
+    try {
+      return makeSimpleGetRequest(url).string();
+    } catch (IOException e) {
+      throw UserException
+        .dataReadError(e)
+        .message("HTTP request failed")
+        .build(logger);
+    }
+  }
+
+  public static InputStream getRequestAndStreamResponse(String url) {
+    try {
+      return makeSimpleGetRequest(url).byteStream();
+    } catch (IOException e) {
+      throw UserException
+        .dataReadError(e)
+        .message("HTTP request failed")
+        .build(logger);
+    }
+  }
+
+  public static ResponseBody makeSimpleGetRequest(String url) throws IOException {
     OkHttpClient client = getSimpleHttpClient();
     Request.Builder requestBuilder = new Request.Builder()
       .url(url);
@@ -924,15 +945,8 @@ public class SimpleHttp {
     Request request = requestBuilder.build();
 
     // Execute the request
-    try {
       Response response = client.newCall(request).execute();
-      return response.body().string();
-    } catch (IOException e) {
-      throw UserException
-        .dataReadError(e)
-        .message("HTTP request failed")
-        .build(logger);
-    }
+      return response.body();
   }
 
   /**
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
index 458003f5f3..928975f17a 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
@@ -1127,7 +1127,8 @@ public class TestHttpPlugin extends ClusterTest {
         client.queryBuilder().sql(sql).rowSet();
         fail();
       } catch (Exception e) {
-         assertTrue("Not timeout exception, " + e, e.getMessage().contains("DATA_READ ERROR: timeout"));
+         assertTrue("Not timeout exception, " + e,
+           e.getMessage().contains("DATA_READ ERROR: timeout") || e.getMessage().contains("DATA_READ ERROR: Read timed out"));
       }
     }
   }
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java
index 24d73baa02..fb30bef5aa 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java
@@ -24,10 +24,16 @@ import okhttp3.mockwebserver.MockWebServer;
 import okhttp3.mockwebserver.RecordedRequest;
 import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
 import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.util.DrillFileUtils;
+import org.apache.drill.exec.physical.impl.project.ProjectMemoryManager;
 import org.apache.drill.exec.physical.impl.project.ProjectRecordBatch;
 import org.apache.drill.exec.physical.impl.validate.IteratorValidatorBatchIterator;
+import org.apache.drill.exec.physical.resultSet.impl.ResultSetLoaderImpl;
 import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl;
 import org.apache.drill.exec.store.http.util.SimpleHttp;
 import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
@@ -37,6 +43,7 @@ import org.apache.drill.shaded.guava.com.google.common.io.Files;
 import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.ClusterTest;
 import org.apache.drill.test.LogFixture;
+import org.apache.drill.test.rowSet.RowSetUtilities;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -47,6 +54,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.drill.test.rowSet.RowSetUtilities.mapValue;
+import static org.apache.drill.test.rowSet.RowSetUtilities.singleMap;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -63,9 +72,11 @@ public class TestHttpUDFFunctions extends ClusterTest {
   public static void setup() throws Exception {
     logFixture = LogFixture.builder()
       .toConsole()
+      .logger(ProjectMemoryManager.class, CURRENT_LOG_LEVEL)
       .logger(ProjectRecordBatch.class, CURRENT_LOG_LEVEL)
       .logger(JsonLoaderImpl.class, CURRENT_LOG_LEVEL)
       .logger(IteratorValidatorBatchIterator.class, CURRENT_LOG_LEVEL)
+      .logger(ResultSetLoaderImpl.class, CURRENT_LOG_LEVEL)
       .build();
     startCluster(ClusterFixture.builder(dirTestWatcher));
     TEST_JSON_RESPONSE = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/simple.json"), Charsets.UTF_8).read();
@@ -98,6 +109,33 @@ public class TestHttpUDFFunctions extends ClusterTest {
 
       RowSet results = client.queryBuilder().sql(sql).rowSet();
       assertEquals(1, results.rowCount());
+      TupleMetadata expectedSchema = new SchemaBuilder()
+        .addMap("result")
+          .addMap("results")
+            .addNullable("sunrise", TypeProtos.MinorType.VARCHAR)
+            .addNullable("sunset", TypeProtos.MinorType.VARCHAR)
+            .addNullable("solar_noon", TypeProtos.MinorType.VARCHAR)
+            .addNullable("day_length", TypeProtos.MinorType.VARCHAR)
+            .addNullable("civil_twilight_begin", TypeProtos.MinorType.VARCHAR)
+            .addNullable("civil_twilight_end", TypeProtos.MinorType.VARCHAR)
+            .addNullable("nautical_twilight_begin", TypeProtos.MinorType.VARCHAR)
+            .addNullable("nautical_twilight_end", TypeProtos.MinorType.VARCHAR)
+            .addNullable("astronomical_twilight_begin", TypeProtos.MinorType.VARCHAR)
+            .addNullable("astronomical_twilight_end", TypeProtos.MinorType.VARCHAR)
+            .resumeMap()
+          .addNullable("status", TypeProtos.MinorType.VARCHAR)
+          .resumeSchema()
+        .build();
+
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+        .addRow(singleMap(
+                  mapValue(
+                    mapValue("6:13:58 AM", "5:59:55 PM", "12:06:56 PM", "11:45:57", "5:48:14 AM",
+          "6:25:38 PM", "5:18:16 AM", "6:55:36 PM", "4:48:07 AM", "7:25:45 PM"),
+                    "OK")))
+        .build();
+
+      RowSetUtilities.verify(expected, results);
       results.clear();
 
       RecordedRequest recordedRequest = server.takeRequest();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterFuncHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterFuncHolder.java
index 92fb340128..c95415ef81 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterFuncHolder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterFuncHolder.java
@@ -23,6 +23,7 @@ import org.apache.drill.exec.expr.ClassGenerator;
 import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
 import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling;
 import org.apache.drill.exec.physical.impl.project.ProjectRecordBatch;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
 import org.apache.drill.exec.record.VectorAccessibleComplexWriter;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
 
@@ -56,26 +57,39 @@ public class DrillComplexWriterFuncHolder extends DrillSimpleFuncHolder {
     JBlock sub = new JBlock(true, true);
     JBlock topSub = sub;
 
-    JVar complexWriter = classGenerator.declareClassField("complexWriter", classGenerator.getModel()._ref(ComplexWriter.class));
+    JVar rsLoader = null;
+    JVar complexWriter = null;
+    JInvocation container = null;
 
+    for (JVar workspaceJVar : workspaceJVars) {
+      if ("ResultSetLoader".equals(workspaceJVar.type().name())) {
+        rsLoader = workspaceJVar;
+      }
+    }
+    if (rsLoader == null) {
+      complexWriter = classGenerator.declareClassField("complexWriter", classGenerator.getModel()._ref(ComplexWriter.class));
 
-    JInvocation container = classGenerator.getMappingSet().getOutgoing().invoke("getOutgoingContainer");
 
+      container = classGenerator.getMappingSet().getOutgoing().invoke("getOutgoingContainer");
+    }
     //Default name is "col", if not passed in a reference name for the output vector.
     String refName = fieldReference == null ? "col" : fieldReference.getRootSegment().getPath();
 
     JClass cwClass = classGenerator.getModel().ref(VectorAccessibleComplexWriter.class);
-    classGenerator.getSetupBlock().assign(complexWriter, cwClass.staticInvoke("getWriter").arg(refName).arg(container));
+    if (rsLoader == null) {
+      classGenerator.getSetupBlock().assign(complexWriter, cwClass.staticInvoke("getWriter").arg(refName).arg(container));
+    }
 
     JClass projBatchClass = classGenerator.getModel().ref(ProjectRecordBatch.class);
     JExpression projBatch = JExpr.cast(projBatchClass, classGenerator.getMappingSet().getOutgoing());
-
-    classGenerator.getSetupBlock().add(projBatch.invoke("addComplexWriter").arg(complexWriter));
-
-
-    classGenerator.getEvalBlock().add(complexWriter.invoke("setPosition").arg(classGenerator.getMappingSet().getValueWriteIndex()));
-
-    sub.decl(classGenerator.getModel()._ref(ComplexWriter.class), getReturnValue().getName(), complexWriter);
+    if (rsLoader == null) {
+      classGenerator.getSetupBlock().add(projBatch.invoke("addComplexWriter").arg(complexWriter));
+      classGenerator.getEvalBlock().add(complexWriter.invoke("setPosition").arg(classGenerator.getMappingSet().getValueWriteIndex()));
+      sub.decl(classGenerator.getModel()._ref(ComplexWriter.class), getReturnValue().getName(), complexWriter);
+    } else {
+      classGenerator.getSetupBlock().add(projBatch.invoke("addLoader").arg(rsLoader));
+      sub.decl(classGenerator.getModel()._ref(ResultSetLoader.class), getReturnValue().getName(), rsLoader);
+    }
 
     // add the subblock after the out declaration.
     classGenerator.getEvalBlock().add(topSub);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManagerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManagerImpl.java
index c42e7de93a..ea516c6b5b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManagerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManagerImpl.java
@@ -34,12 +34,9 @@ public class BufferManagerImpl implements BufferManager {
 
   @Override
   public void close() {
-    managedBuffers.forEach(new LongObjectPredicate<DrillBuf>() {
-      @Override
-      public boolean apply(long key, DrillBuf value) {
-        value.release();
-        return true;
-      }
+    managedBuffers.forEach((LongObjectPredicate<DrillBuf>) (key, value) -> {
+      value.release();
+      return true;
     });
     managedBuffers.clear();
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchBuilder.java
index efb40d4c49..d4a7e69d19 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchBuilder.java
@@ -95,7 +95,9 @@ public class ProjectBatchBuilder implements ProjectionMaterializer.BatchBuilder
 
   @Override
   public void addComplexField(FieldReference ref) {
-    initComplexWriters();
+    if (projectBatch.rsLoader == null) {
+      initComplexWriters();
+    }
     if (projectBatch.complexFieldReferencesList == null) {
       projectBatch.complexFieldReferencesList = Lists.newArrayList();
     } else {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
index 571871b9fc..1391d64480 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
@@ -278,7 +278,7 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager {
                 + ", total variable width {}, total complex width {}, batchSizer time {} ms, update time {}  ms"
                 + ", manager {}, incoming {}",outPutRowCount, batchSizer.rowCount(), incomingBatch.getRecordCount(),
                 rowWidth, totalFixedWidthColumnWidth, totalVariableColumnWidth, totalComplexColumnWidth,
-                (batchSizerEndTime - updateStartTime),(updateEndTime - updateStartTime), this, incomingBatch);
+                  (batchSizerEndTime - updateStartTime),(updateEndTime - updateStartTime), this, incomingBatch);
 
     RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT, getRecordBatchSizer(), outgoingBatch.getRecordBatchStatsContext());
     updateIncomingStats();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 60f8a1edf4..89e1beb6ea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -19,20 +19,25 @@ package org.apache.drill.exec.physical.impl.project;
 
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.Project;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
 import org.apache.drill.exec.record.AbstractSingleRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.SimpleRecordBatch;
 import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.util.record.RecordBatchStats;
 import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.MapVector;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,7 +51,9 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
   private static final Logger logger = LoggerFactory.getLogger(ProjectRecordBatch.class);
 
   protected List<ValueVector> allocationVectors;
+  @Deprecated // use new writer rsLoader
   protected List<ComplexWriter> complexWriters;
+  protected ResultSetLoader rsLoader;
   protected List<FieldReference> complexFieldReferencesList;
   protected ProjectMemoryManager memoryManager;
   private Projector projector;
@@ -103,7 +110,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
     memoryManager.update();
 
     if (first && incomingRecordCount == 0) {
-      if (complexWriters != null) {
+      if (complexWriters != null || rsLoader != null ) {
         IterOutcome next = null;
         while (incomingRecordCount == 0) {
           if (getLastKnownOutcome() == EMIT) {
@@ -138,7 +145,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
       }
     }
 
-    if (complexWriters != null && getLastKnownOutcome() == EMIT) {
+    if ((complexWriters != null || rsLoader != null) && getLastKnownOutcome() == EMIT) {
       throw UserException.unsupportedError()
           .message("Currently functions producing complex types as output are not " +
             "supported in project list for subquery between LATERAL and UNNEST. Please re-write the query using this " +
@@ -170,13 +177,20 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
     }
     // In case of complex writer expression, vectors would be added to batch run-time.
     // We have to re-build the schema.
-    if (complexWriters != null) {
+    if (rsLoader != null && !rsLoader.isProjectionEmpty()) {
+      MapVector map = container.addOrGet(container.getLast().getField().getName(), Types.required(TypeProtos.MinorType.MAP), MapVector.class);
+      map.setMapValueCount(recordCount);
+      for (VectorWrapper<?> vectorWrapper : rsLoader.harvest()) {
+        ValueVector valueVector = vectorWrapper.getValueVector();
+        map.putChild(valueVector.getField().getName(), valueVector);
+      }
+      container.buildSchema(SelectionVectorMode.NONE);
+    } else if (complexWriters != null) {
       container.buildSchema(SelectionVectorMode.NONE);
     }
 
     memoryManager.updateOutgoingStats(outputRecords);
     RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext());
-
     // Get the final outcome based on hasRemainder since that will determine if all the incoming records were
     // consumed in current output batch or not
     return getFinalOutcome(hasRemainder);
@@ -210,7 +224,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
     }
     // In case of complex writer expression, vectors would be added to batch run-time.
     // We have to re-build the schema.
-    if (complexWriters != null) {
+    if (complexWriters != null || rsLoader != null) {
       container.buildSchema(SelectionVectorMode.NONE);
     }
 
@@ -224,6 +238,10 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
     complexWriters.add(writer);
   }
 
+  public void addLoader(ResultSetLoader loader) {
+    rsLoader = loader;
+  }
+
   private void doAlloc(int recordCount) {
     // Allocate vv in the allocationVectors.
     for (ValueVector v : allocationVectors) {
@@ -251,12 +269,12 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
     // the transfer pairs or vector copies.
     container.setRecordCount(count);
 
-    if (complexWriters == null) {
-      return;
-    }
-
-    for (ComplexWriter writer : complexWriters) {
-      writer.setValueCount(count);
+    if (complexWriters != null) {
+      for (ComplexWriter writer : complexWriters) {
+        writer.setValueCount(count);
+      }
+    } else if (rsLoader != null) {
+      rsLoader.setTargetRowCount(count);
     }
   }
 
@@ -276,8 +294,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
     int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
     setupNewSchema(incomingBatch, configuredBatchSize);
 
-    ProjectBatchBuilder batchBuilder = new ProjectBatchBuilder(this,
-        container, callBack, incomingBatch);
+    ProjectBatchBuilder batchBuilder = new ProjectBatchBuilder(this, container, callBack, incomingBatch);
     ProjectionMaterializer em = new ProjectionMaterializer(context.getOptions(),
         incomingBatch, popConfig.getExprs(), context.getFunctionRegistry(),
         batchBuilder, unionTypeEnabled);
@@ -341,7 +358,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
   protected IterOutcome getFinalOutcome(boolean hasMoreRecordInBoundary) {
     // In a case of complex writers vectors are added at runtime, so the schema
     // may change (e.g. when a batch contains new column(s) not present in previous batches)
-    if (complexWriters != null) {
+    if (complexWriters != null || rsLoader != null) {
       return IterOutcome.OK_NEW_SCHEMA;
     }
     return super.getFinalOutcome(hasMoreRecordInBoundary);
@@ -359,6 +376,9 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
 
     if (complexWriters != null) {
       container.clear();
+    } else if (rsLoader != null) {
+      container.clear();
+      rsLoader.close();
     } else {
       // Release the underlying DrillBufs and reset the ValueVectors to empty
       // Not clearing the container here is fine since Project output schema is
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectionMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectionMaterializer.java
index ee011b658e..55c630ac23 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectionMaterializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectionMaterializer.java
@@ -342,8 +342,7 @@ class ProjectionMaterializer {
     transferFieldIds.add(fid);
   }
 
-  private void setupFnCall(NamedExpression namedExpression,
-      LogicalExpression expr) {
+  private void setupFnCall(NamedExpression namedExpression, LogicalExpression expr) {
 
     // Need to process ComplexWriter function evaluation.
     // The reference name will be passed to ComplexWriter, used as the name of
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java
index 5e5e7ebe55..53d27807c8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java
@@ -27,9 +27,9 @@ import java.util.List;
 
 public interface Projector {
 
-  public abstract void setup(FragmentContext context, RecordBatch incoming,  RecordBatch outgoing, List<TransferPair> transfers)  throws SchemaChangeException;
-  public abstract int projectRecords(RecordBatch incomingBatch, int startIndex, int recordCount, int firstOutputIndex);
+  void setup(FragmentContext context, RecordBatch incoming,  RecordBatch outgoing, List<TransferPair> transfers)  throws SchemaChangeException;
+  int projectRecords(RecordBatch incomingBatch, int startIndex, int recordCount, int firstOutputIndex);
 
-  public static TemplateClassDefinition<Projector> TEMPLATE_DEFINITION = new TemplateClassDefinition<Projector>(Projector.class, ProjectorTemplate.class);
+  TemplateClassDefinition<Projector> TEMPLATE_DEFINITION = new TemplateClassDefinition<Projector>(Projector.class, ProjectorTemplate.class);
 
-}
\ No newline at end of file
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
index dd30c443f0..9c1aced107 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
@@ -88,8 +88,7 @@ public abstract class ProjectorTemplate implements Projector {
 
   @Override
   public final void setup(FragmentContext context, RecordBatch incoming,
-      RecordBatch outgoing, List<TransferPair> transfers)  throws SchemaChangeException{
-
+      RecordBatch outgoing, List<TransferPair> transfers) throws SchemaChangeException {
     this.svMode = incoming.getSchema().getSelectionVectorMode();
     switch (svMode) {
     case FOUR_BYTE:
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ColumnBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ColumnBuilder.java
index 2731d3f6af..45f4b1f456 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ColumnBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ColumnBuilder.java
@@ -253,8 +253,7 @@ public class ColumnBuilder {
       // have content that varies from batch to batch. Only the leaf
       // vectors can be cached.
       assert columnSchema.tupleSchema().isEmpty();
-      mapVector = new RepeatedMapVector(mapColSchema.schema(),
-          parent.loader().allocator(), null);
+      mapVector = new RepeatedMapVector(mapColSchema.schema(), parent.loader().allocator(), null);
       offsetVector = mapVector.getOffsetVector();
     } else {
       mapVector = null;
@@ -262,8 +261,7 @@ public class ColumnBuilder {
     }
 
     // Create the writer using the offset vector
-    final AbstractObjectWriter writer = MapWriter.buildMapArray(
-        columnSchema, mapVector, new ArrayList<>());
+    final AbstractObjectWriter writer = MapWriter.buildMapArray(columnSchema, mapVector, new ArrayList<>());
 
     // Wrap the offset vector in a vector state
     VectorState offsetVectorState;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java
index 5ec24ac871..10cca0b648 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java
@@ -34,8 +34,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Implementation of the result set loader. Caches vectors
- * for a row or map.
+ * Implementation of the result set loader. Caches vectors for a row or map.
  *
  * @see {@link ResultSetLoader}
  */
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index 6e363036a2..96e1fb16a9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -31,7 +31,6 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.impl.aggregate.SpilledRecordBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.drill.exec.server.options.OptionValue;
 import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchStatsContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -68,17 +67,8 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
     this.batchStatsContext = new RecordBatchStatsContext(context, oContext);
     stats = oContext.getStats();
     container = new VectorContainer(this.oContext.getAllocator());
-    if (buildSchema) {
-      state = BatchState.BUILD_SCHEMA;
-    } else {
-      state = BatchState.FIRST;
-    }
-    OptionValue option = context.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE.getOptionName());
-    if (option != null) {
-      unionTypeEnabled = option.bool_val;
-    } else {
-      unionTypeEnabled = false;
-    }
+    state = buildSchema ? BatchState.BUILD_SCHEMA : BatchState.FIRST;
+    unionTypeEnabled = context.getOptions().getBoolean(ExecConstants.ENABLE_UNION_TYPE_KEY);
   }
 
   public enum BatchState {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
index d4d385c248..fcffeea8aa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
@@ -247,8 +247,7 @@ public class BatchSchema implements Iterable<MaterializedField> {
         otherSchema.selectionVectorMode != SelectionVectorMode.NONE) {
       throw new IllegalArgumentException("Cannot merge schemas with selection vectors");
     }
-    List<MaterializedField> mergedFields =
-        new ArrayList<>(fields.size() + otherSchema.fields.size());
+    List<MaterializedField> mergedFields = new ArrayList<>(fields.size() + otherSchema.fields.size());
     mergedFields.addAll(this.fields);
     mergedFields.addAll(otherSchema.fields);
     return new BatchSchema(selectionVectorMode, mergedFields);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
index a4ee14f075..e5755cb07b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
@@ -20,12 +20,10 @@ package org.apache.drill.exec.store.easy.json.loader;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.Reader;
-import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.exceptions.EmptyErrorContext;
 import org.apache.drill.common.exceptions.UserException;
@@ -191,16 +189,6 @@ public class JsonLoaderImpl implements JsonLoader, ErrorFactory {
       return this;
     }
 
-    public JsonLoaderBuilder fromString(String jsonString) {
-      try (InputStream targetStream = IOUtils.toInputStream(jsonString, Charset.defaultCharset())) {
-        return fromStream(targetStream);
-      } catch (IOException e) {
-        throw UserException.dataReadError(e)
-          .message("Could not read JSON string: " + jsonString)
-          .build(logger);
-      }
-    }
-
     public JsonLoaderBuilder fromReader(Reader reader) {
       this.reader = reader;
       return this;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java
index 5866a9562d..04e30cf0e6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java
@@ -26,6 +26,10 @@ import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.MapVector;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
 
+/**
+ * @deprecated Use {@link org.apache.drill.exec.physical.resultSet.ResultSetLoader}, the replacement for this class.
+ */
+@Deprecated
 public class VectorContainerWriter extends AbstractFieldWriter implements ComplexWriter {
   //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorContainerWriter.class);
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/spnego/TestDrillSpnegoAuthenticator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/spnego/TestDrillSpnegoAuthenticator.java
index 914688e9b2..643af83bf2 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/spnego/TestDrillSpnegoAuthenticator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/spnego/TestDrillSpnegoAuthenticator.java
@@ -67,6 +67,7 @@ import static org.mockito.Mockito.verify;
 /**
  * Test for validating {@link DrillSpnegoAuthenticator}
  */
+@Ignore("See DRILL-5387")
 @Category(SecurityTest.class)
 public class TestDrillSpnegoAuthenticator extends BaseTest {
 
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java
index 0a254adb9f..8a2ffeab02 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java
@@ -105,13 +105,13 @@ public class ComplexWriterImpl extends AbstractFieldWriter implements ComplexWri
   @Override
   public void setPosition(int index){
     super.setPosition(index);
-    switch(mode){
-    case MAP:
-      mapRoot.setPosition(index);
-      break;
-    case LIST:
-      listRoot.setPosition(index);
-      break;
+    switch (mode) {
+      case MAP:
+        mapRoot.setPosition(index);
+        break;
+      case LIST:
+        listRoot.setPosition(index);
+        break;
     }
   }