You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by vi...@apache.org on 2022/04/27 09:47:28 UTC

[drill] 02/03: DRILL-8037: Add V2 JSON Format Plugin based on EVF

This is an automated email from the ASF dual-hosted git repository.

vitalii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit e98793badd27272316e60ea2a1ea1c230bff7036
Author: Vitalii Diravka <vi...@apache.org>
AuthorDate: Fri Nov 5 04:21:40 2021 +0200

    DRILL-8037: Add V2 JSON Format Plugin based on EVF
    
    * Enable store.json.enable_v2_reader by default
    * Fix TestJsonReader doulbe quotes test cases. Update jackson 2.12.1 -> 2.13.0
    * Disable V2 for experimental UNION datatype
    * Fix regressions
    * Fix json Schema Provision (it wasn't provided for JsonLoaderBuilder). The previous schema provision was a fake, the reader schema was infered from the json content. It fixes the scan and reader schema validation. And it starts to apply the provided schema to ANALYZE COMMANDS, fixed TestMetastoreWithEasyFormatPlugin#testAnalyzeOnJsonTable
---
 .../drill/exec/store/esri/ShpFormatPlugin.java     |   7 +-
 .../drill/exec/store/excel/ExcelFormatPlugin.java  |   6 +-
 .../drill/exec/store/hdf5/HDF5FormatPlugin.java    |   5 +-
 .../exec/store/httpd/HttpdLogFormatPlugin.java     |   1 -
 .../drill/exec/store/image/ImageFormatPlugin.java  |   8 +-
 .../store/pcap/plugin/BasePcapFormatPlugin.java    |   6 +-
 .../drill/exec/store/pdf/PdfFormatPlugin.java      |   7 +-
 .../drill/exec/store/sas/SasFormatPlugin.java      |   7 +-
 .../drill/exec/store/spss/SpssFormatPlugin.java    |   7 +-
 .../exec/store/syslog/SyslogFormatPlugin.java      |   7 +-
 docs/dev/{JUnit.md => JUnit4.md}                   |   0
 docs/dev/Testing.md                                |   2 +-
 .../org/apache/drill/exec/client/DrillClient.java  |   2 +-
 .../physical/impl/common/HashTableTemplate.java    |   4 +-
 .../exec/physical/impl/scan/ScanOperatorExec.java  |   3 +-
 .../impl/scan/file/FileMetadataColumnsParser.java  |  11 +-
 .../scan/project/ExplicitSchemaProjection.java     |  71 ++++----
 .../impl/scan/project/ReaderLevelProjection.java   |   3 +-
 .../scan/project/ReaderSchemaOrchestrator.java     |  11 +-
 .../physical/impl/scan/project/ResolvedTuple.java  |  22 +--
 .../impl/scan/project/ScanLevelProjection.java     |   6 +-
 .../physical/impl/validate/BatchValidator.java     |   4 +-
 .../physical/resultSet/impl/ProjectionFilter.java  |  12 +-
 .../resultSet/impl/ResultSetLoaderImpl.java        |   2 +-
 .../physical/resultSet/impl/SingleVectorState.java |   2 +-
 .../resultSet/project/ProjectionChecker.java       |   5 +-
 .../planner/sql/handlers/CreateTableHandler.java   |   3 +-
 .../exec/record/VectorAccessibleUtilities.java     |   2 +-
 .../org/apache/drill/exec/store/RecordReader.java  |   3 +
 .../exec/store/dfs/WorkspaceSchemaFactory.java     |   6 +-
 .../exec/store/dfs/easy/EasyFormatPlugin.java      |  13 +-
 .../drill/exec/store/dfs/easy/EasyGroupScan.java   |   6 +-
 .../exec/store/dfs/easy/EvfV1ScanBuilder.java      |  10 +-
 .../exec/store/easy/json/JSONFormatConfig.java     | 123 ++++++++++++++
 .../exec/store/easy/json/JSONFormatPlugin.java     | 187 ++++++++-------------
 .../exec/store/easy/json/JSONRecordReader.java     |   6 +-
 .../exec/store/easy/json/JsonBatchReader.java      |   5 +
 .../store/easy/json/loader/BaseFieldFactory.java   |  38 ++---
 .../exec/store/easy/json/loader/FieldDefn.java     |   6 +-
 .../easy/json/loader/InferredFieldFactory.java     |  29 ++--
 .../store/easy/json/loader/JsonLoaderImpl.java     |   3 +-
 .../exec/store/easy/json/loader/TupleParser.java   |   9 +-
 .../easy/json/parser/JsonStructureParser.java      |  35 ++--
 .../store/easy/json/parser/JsonValueParser.java    |   4 +-
 .../store/easy/json/parser/ObjectValueParser.java  |   1 +
 .../store/easy/json/values/DateValueListener.java  |   6 +-
 .../drill/exec/store/log/LogFormatPlugin.java      |   3 +-
 .../drill/exec/vector/complex/fn/JsonReader.java   |  14 +-
 .../drill/exec/vector/complex/fn/VectorOutput.java |  23 ++-
 .../java/org/apache/drill/TestFrameworkTest.java   |  40 +++--
 .../java/org/apache/drill/TestStarQueries.java     |   1 -
 .../drill/exec/expr/fn/impl/TestTypeFns.java       |  54 +++---
 .../TestMetastoreWithEasyFormatPlugin.java         |  63 +------
 .../physical/impl/TopN/TestTopNSchemaChanges.java  | 111 ++++++------
 .../impl/join/TestMergeJoinWithSchemaChanges.java  |  16 +-
 .../impl/lateraljoin/TestE2EUnnestAndLateral.java  |  28 ++-
 .../scan/project/TestReaderLevelProjection.java    |   4 +-
 .../exec/physical/impl/xsort/TestExternalSort.java |  42 +++--
 .../impl/TestResultSetLoaderProtocol.java          |   3 -
 .../drill/exec/server/rest/TestRestJson.java       |   2 +-
 .../drill/exec/sql/TestMetastoreCommands.java      | 165 +++++++++---------
 .../drill/exec/store/DropboxFileSystemTest.java    |   2 +-
 .../drill/exec/store/TestImplicitFileColumns.java  |   4 +-
 .../store/easy/json/loader/TestRepeatedList.java   |   2 +-
 .../exec/store/easy/json/loader/TestUnknowns.java  |   2 +-
 .../json}/TestJsonEscapeAnyChar.java               |  12 +-
 .../drill/exec/store/json/TestJsonModes.java       |   8 +
 .../writer => store/json}/TestJsonNanInf.java      | 102 +++++------
 .../writer => store/json}/TestJsonReader.java      |  59 ++++---
 .../drill/exec/store/json/TestJsonReaderFns.java   |  25 ++-
 .../exec/store/json/TestJsonReaderQueries.java     |  13 +-
 .../exec/store/json/TestJsonReaderWithSchema.java  |  25 +++
 .../exec/store/json/TestJsonRecordReader.java      | 141 +++++++++-------
 .../exec/store/parquet/TestVarlenDecimal.java      |   4 -
 .../vector/complex/writer/TestExtendedTypes.java   |   4 +-
 .../java/org/apache/drill/test/ClusterFixture.java |   4 +-
 .../java/org/apache/drill/test/ClusterTest.java    |   2 +-
 .../java/org/apache/drill/test/TestBuilder.java    |  10 ++
 exec/java-exec/src/test/resources/rest/cust20.json |  44 ++---
 exec/java-exec/src/test/resources/rest/small.json  |  24 +--
 .../record/metadata/AbstractColumnMetadata.java    |   8 +-
 .../drill/exec/record/metadata/ColumnMetadata.java |   1 +
 .../drill/exec/record/metadata/MetadataUtils.java  |  21 ++-
 .../record/metadata/PrimitiveColumnMetadata.java   |  17 ++
 .../apache/drill/exec/vector/NullableVector.java   |   2 +-
 .../accessor/writer/AbstractTupleWriter.java       |   6 +-
 86 files changed, 971 insertions(+), 856 deletions(-)

diff --git a/contrib/format-esri/src/main/java/org/apache/drill/exec/store/esri/ShpFormatPlugin.java b/contrib/format-esri/src/main/java/org/apache/drill/exec/store/esri/ShpFormatPlugin.java
index 1d84491058..5023ff61ce 100644
--- a/contrib/format-esri/src/main/java/org/apache/drill/exec/store/esri/ShpFormatPlugin.java
+++ b/contrib/format-esri/src/main/java/org/apache/drill/exec/store/esri/ShpFormatPlugin.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.store.esri;
 
-import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
@@ -27,7 +26,7 @@ import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchem
 import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
 import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
 import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.OptionSet;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
 import org.apache.drill.exec.store.dfs.easy.EasySubScan;
 import org.apache.hadoop.conf.Configuration;
@@ -55,12 +54,12 @@ public class ShpFormatPlugin extends EasyFormatPlugin<ShpFormatConfig> {
   }
 
   @Override
-  public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(EasySubScan scan, OptionManager options) throws ExecutionSetupException {
+  public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(EasySubScan scan, OptionSet options) {
     return new ShpBatchReader(scan.getMaxRecords());
   }
 
   @Override
-  protected FileScanFramework.FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) {
+  protected FileScanFramework.FileScanBuilder frameworkBuilder(EasySubScan scan, OptionSet options) {
     FileScanFramework.FileScanBuilder builder = new FileScanFramework.FileScanBuilder();
     builder.setReaderFactory(new ShpReaderFactory(scan.getMaxRecords()));
     initScanBuilder(builder, scan);
diff --git a/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelFormatPlugin.java b/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelFormatPlugin.java
index 063f1f7a96..e37027bb61 100644
--- a/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelFormatPlugin.java
+++ b/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelFormatPlugin.java
@@ -28,7 +28,7 @@ import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchem
 
 import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
 import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.OptionSet;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
 import org.apache.drill.exec.store.dfs.easy.EasySubScan;
 import org.apache.hadoop.conf.Configuration;
@@ -80,12 +80,12 @@ public class ExcelFormatPlugin extends EasyFormatPlugin<ExcelFormatConfig> {
 
   @Override
   public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(
-    EasySubScan scan, OptionManager options) {
+    EasySubScan scan, OptionSet options) {
     return new ExcelBatchReader(formatConfig.getReaderConfig(this), scan.getMaxRecords());
   }
 
   @Override
-  protected FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) {
+  protected FileScanBuilder frameworkBuilder(EasySubScan scan, OptionSet options) {
     FileScanBuilder builder = new FileScanBuilder();
     ExcelReaderConfig readerConfig = new ExcelReaderConfig(this);
 
diff --git a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5FormatPlugin.java b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5FormatPlugin.java
index 5551af50f7..d56ffa462b 100644
--- a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5FormatPlugin.java
+++ b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5FormatPlugin.java
@@ -26,14 +26,13 @@ import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanB
 
 import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
 import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.OptionSet;
 import org.apache.drill.exec.store.dfs.easy.EasySubScan;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
 import org.apache.drill.exec.store.hdf5.HDF5BatchReader.HDF5ReaderConfig;
 
 
-
 public class HDF5FormatPlugin extends EasyFormatPlugin<HDF5FormatConfig> {
 
   public static final String DEFAULT_NAME = "hdf5";
@@ -62,7 +61,7 @@ public class HDF5FormatPlugin extends EasyFormatPlugin<HDF5FormatConfig> {
   }
 
   @Override
-  protected FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) {
+  protected FileScanBuilder frameworkBuilder(EasySubScan scan, OptionSet options) {
     FileScanBuilder builder = new FileScanBuilder();
 
     builder.setReaderFactory(new HDF5ReaderFactory(new HDF5BatchReader.HDF5ReaderConfig(this, formatConfig), scan.getMaxRecords()));
diff --git a/contrib/format-httpd/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java b/contrib/format-httpd/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java
index 5c04fa3247..282859a5e4 100644
--- a/contrib/format-httpd/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java
+++ b/contrib/format-httpd/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java
@@ -29,7 +29,6 @@ import org.apache.drill.exec.physical.impl.scan.v3.file.FileScanLifecycleBuilder
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
 import org.apache.drill.exec.store.dfs.easy.EasySubScan;
-import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin.ScanFrameworkVersion;
 import org.apache.hadoop.conf.Configuration;
 
 public class HttpdLogFormatPlugin extends EasyFormatPlugin<HttpdLogFormatConfig> {
diff --git a/contrib/format-image/src/main/java/org/apache/drill/exec/store/image/ImageFormatPlugin.java b/contrib/format-image/src/main/java/org/apache/drill/exec/store/image/ImageFormatPlugin.java
index 977faecdba..f3bd1ec527 100644
--- a/contrib/format-image/src/main/java/org/apache/drill/exec/store/image/ImageFormatPlugin.java
+++ b/contrib/format-image/src/main/java/org/apache/drill/exec/store/image/ImageFormatPlugin.java
@@ -26,7 +26,7 @@ import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanB
 import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
 import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
 import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.OptionSet;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
 import org.apache.drill.exec.store.dfs.easy.EasySubScan;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin.ScanFrameworkVersion;
@@ -74,13 +74,13 @@ public class ImageFormatPlugin extends EasyFormatPlugin<ImageFormatConfig> {
   }
 
   @Override
-  public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(EasySubScan scan, OptionManager options)
+  public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(EasySubScan scan, OptionSet options)
       throws ExecutionSetupException {
     return new ImageBatchReader(formatConfig, scan);
   }
 
   @Override
-  protected FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan)
+  protected FileScanBuilder frameworkBuilder(EasySubScan scan, OptionSet options)
       throws ExecutionSetupException {
     FileScanBuilder builder = new FileScanBuilder();
     builder.setReaderFactory(new ImageReaderFactory(formatConfig, scan));
@@ -89,4 +89,4 @@ public class ImageFormatPlugin extends EasyFormatPlugin<ImageFormatConfig> {
     builder.nullType(Types.optional(MinorType.VARCHAR));
     return builder;
   }
-}
\ No newline at end of file
+}
diff --git a/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/BasePcapFormatPlugin.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/BasePcapFormatPlugin.java
index 836c254ed9..87b7c9562f 100644
--- a/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/BasePcapFormatPlugin.java
+++ b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/BasePcapFormatPlugin.java
@@ -27,7 +27,7 @@ import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchem
 import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
 import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework;
 import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.OptionSet;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
 import org.apache.drill.exec.store.dfs.easy.EasySubScan;
@@ -111,7 +111,7 @@ public abstract class BasePcapFormatPlugin<T extends PcapFormatConfig> extends E
   }
 
   @Override
-  public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(EasySubScan scan, OptionManager options) {
+  public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(EasySubScan scan, OptionSet options) {
     return createReader(scan, formatConfig);
   }
 
@@ -125,7 +125,7 @@ public abstract class BasePcapFormatPlugin<T extends PcapFormatConfig> extends E
   }
 
   @Override
-  protected FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) {
+  protected FileScanBuilder frameworkBuilder(EasySubScan scan, OptionSet options) {
     FileScanBuilder builder = new FileScanBuilder();
     builder.setReaderFactory(new PcapReaderFactory(formatConfig, scan));
 
diff --git a/contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfFormatPlugin.java b/contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfFormatPlugin.java
index 53fb4870bc..380653d66d 100644
--- a/contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfFormatPlugin.java
+++ b/contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfFormatPlugin.java
@@ -26,7 +26,7 @@ import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanB
 import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
 import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
 import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.OptionSet;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
 import org.apache.drill.exec.store.dfs.easy.EasySubScan;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin.ScanFrameworkVersion;
@@ -74,13 +74,12 @@ public class PdfFormatPlugin extends EasyFormatPlugin<PdfFormatConfig> {
   }
 
   @Override
-  public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(
-    EasySubScan scan, OptionManager options) {
+  public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(EasySubScan scan, OptionSet options) {
     return new PdfBatchReader(formatConfig.getReaderConfig(this), scan.getMaxRecords());
   }
 
   @Override
-  protected FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) {
+  protected FileScanBuilder frameworkBuilder(EasySubScan scan, OptionSet options) {
     FileScanBuilder builder = new FileScanBuilder();
     PdfBatchReader.PdfReaderConfig readerConfig = new PdfBatchReader.PdfReaderConfig(this);
     builder.setReaderFactory(new PdfReaderFactory(readerConfig, scan.getMaxRecords()));
diff --git a/contrib/format-sas/src/main/java/org/apache/drill/exec/store/sas/SasFormatPlugin.java b/contrib/format-sas/src/main/java/org/apache/drill/exec/store/sas/SasFormatPlugin.java
index da8bcbc607..b5a135d482 100644
--- a/contrib/format-sas/src/main/java/org/apache/drill/exec/store/sas/SasFormatPlugin.java
+++ b/contrib/format-sas/src/main/java/org/apache/drill/exec/store/sas/SasFormatPlugin.java
@@ -27,7 +27,7 @@ import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchem
 
 import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
 import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.OptionSet;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
 import org.apache.drill.exec.store.dfs.easy.EasySubScan;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin.ScanFrameworkVersion;
@@ -75,13 +75,12 @@ public class SasFormatPlugin extends EasyFormatPlugin<SasFormatConfig> {
   }
 
   @Override
-  public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(
-    EasySubScan scan, OptionManager options)  {
+  public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(EasySubScan scan, OptionSet options)  {
     return new SasBatchReader(scan.getMaxRecords());
   }
 
   @Override
-  protected FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) {
+  protected FileScanBuilder frameworkBuilder(EasySubScan scan, OptionSet options) {
     FileScanBuilder builder = new FileScanBuilder();
     builder.setReaderFactory(new SasReaderFactory(scan.getMaxRecords()));
 
diff --git a/contrib/format-spss/src/main/java/org/apache/drill/exec/store/spss/SpssFormatPlugin.java b/contrib/format-spss/src/main/java/org/apache/drill/exec/store/spss/SpssFormatPlugin.java
index e62699a273..35210e7928 100644
--- a/contrib/format-spss/src/main/java/org/apache/drill/exec/store/spss/SpssFormatPlugin.java
+++ b/contrib/format-spss/src/main/java/org/apache/drill/exec/store/spss/SpssFormatPlugin.java
@@ -27,7 +27,7 @@ import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchem
 
 import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
 import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.OptionSet;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
 import org.apache.drill.exec.store.dfs.easy.EasySubScan;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin.ScanFrameworkVersion;
@@ -74,13 +74,12 @@ public class SpssFormatPlugin extends EasyFormatPlugin<SpssFormatConfig> {
   }
 
   @Override
-  public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(
-    EasySubScan scan, OptionManager options)  {
+  public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(EasySubScan scan, OptionSet options)  {
     return new SpssBatchReader(scan.getMaxRecords());
   }
 
   @Override
-  protected FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) {
+  protected FileScanBuilder frameworkBuilder(EasySubScan scan, OptionSet options) {
     FileScanBuilder builder = new FileScanBuilder();
     builder.setReaderFactory(new SpssReaderFactory(scan.getMaxRecords()));
 
diff --git a/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatPlugin.java b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatPlugin.java
index aec9369d18..c8af2325e7 100644
--- a/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatPlugin.java
+++ b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatPlugin.java
@@ -26,7 +26,7 @@ import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanB
 import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
 import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
 import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.OptionSet;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
 import org.apache.drill.exec.store.dfs.easy.EasySubScan;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin.ScanFrameworkVersion;
@@ -76,13 +76,12 @@ public class SyslogFormatPlugin extends EasyFormatPlugin<SyslogFormatConfig> {
   }
 
   @Override
-  public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(
-    EasySubScan scan, OptionManager options)  {
+  public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(EasySubScan scan, OptionSet options)  {
     return new SyslogBatchReader(scan.getMaxRecords(), formatConfig, scan);
   }
 
   @Override
-  protected FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) {
+  protected FileScanBuilder frameworkBuilder(EasySubScan scan, OptionSet options) {
     FileScanBuilder builder = new FileScanBuilder();
     builder.setReaderFactory(new SyslogReaderFactory(scan.getMaxRecords(), formatConfig, scan));
 
diff --git a/docs/dev/JUnit.md b/docs/dev/JUnit4.md
similarity index 100%
rename from docs/dev/JUnit.md
rename to docs/dev/JUnit4.md
diff --git a/docs/dev/Testing.md b/docs/dev/Testing.md
index 337f932ba0..c465e814c2 100644
--- a/docs/dev/Testing.md
+++ b/docs/dev/Testing.md
@@ -8,7 +8,7 @@ Drill makes extensive use of [JUnit](http://junit.org/junit4/) and other librari
 
 * [Test Data Sets](TestDataSets.md)
 * [Temp Directory Utilities](TempDirectories.md)
-* [Testing with JUnit](JUnit.md)
+* [Testing with JUnit4](JUnit4.md)
 * [Test Logging](TestLogging.md)
 
 ## Deprecated Drill Testing Techniques
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 1e84608eea..372bbe666f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -894,7 +894,7 @@ public class DrillClient implements Closeable, ConnectionThrottle {
       results.add(result);
     }
 
-    public List<QueryDataBatch> getResults() throws RpcException{
+    public List<QueryDataBatch> getResults() throws RpcException {
       try {
         return future.get();
       } catch (Throwable t) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index c93de9ef47..ab176905fd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -981,11 +981,11 @@ public abstract class HashTableTemplate implements HashTable {
 
   @Override
   public void setTargetBatchRowCount(int batchRowCount) {
-    batchHolders.get(batchHolders.size()-1).targetBatchRowCount = batchRowCount;
+    batchHolders.get(batchHolders.size() - 1).targetBatchRowCount = batchRowCount;
   }
 
   @Override
   public int getTargetBatchRowCount() {
-    return batchHolders.get(batchHolders.size()-1).targetBatchRowCount;
+    return batchHolders.get(batchHolders.size() - 1).targetBatchRowCount;
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorExec.java
index e1038d5600..c0ac30bc17 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorExec.java
@@ -180,8 +180,7 @@ public class ScanOperatorExec implements OperatorExec {
   private int readerCount;
   private ReaderState readerState;
 
-  public ScanOperatorExec(ScanOperatorEvents factory,
-      boolean allowEmptyResult) {
+  public ScanOperatorExec(ScanOperatorEvents factory, boolean allowEmptyResult) {
     this.factory = factory;
     this.allowEmptyResult = allowEmptyResult;
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataColumnsParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataColumnsParser.java
index cb51c76247..7d61f44e82 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataColumnsParser.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataColumnsParser.java
@@ -85,10 +85,7 @@ public class FileMetadataColumnsParser implements ScanProjectionParser {
     // Partition column
     int partitionIndex = Integer.parseInt(m.group(1));
     if (! referencedPartitions.contains(partitionIndex)) {
-      builder.addMetadataColumn(
-          new PartitionColumn(
-            inCol.name(),
-            partitionIndex));
+      builder.addMetadataColumn(new PartitionColumn(inCol.name(), partitionIndex));
 
       // Remember the partition for later wildcard expansion
       referencedPartitions.add(partitionIndex);
@@ -97,8 +94,7 @@ public class FileMetadataColumnsParser implements ScanProjectionParser {
     return true;
   }
 
-  private boolean buildMetadataColumn(FileMetadataColumnDefn defn,
-      RequestedColumn inCol) {
+  private boolean buildMetadataColumn(FileMetadataColumnDefn defn, RequestedColumn inCol) {
 
     // If the projected column is a map or array, then it shadows the
     // metadata column. Example: filename.x, filename[2].
@@ -136,8 +132,7 @@ public class FileMetadataColumnsParser implements ScanProjectionParser {
       if (referencedPartitions.contains(i)) {
         continue;
       }
-      builder.addMetadataColumn(new PartitionColumn(
-          metadataManager.partitionName(i), i));
+      builder.addMetadataColumn(new PartitionColumn(metadataManager.partitionName(i), i));
       referencedPartitions.add(i);
     }
     hasImplicitCols = true;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java
index b287e6c822..7f1785da78 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java
@@ -26,6 +26,8 @@ import org.apache.drill.exec.physical.resultSet.project.RequestedColumn;
 import org.apache.drill.exec.physical.resultSet.project.RequestedTuple;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.PrimitiveColumnMetadata;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.vector.complex.DictVector;
 import org.slf4j.Logger;
@@ -58,8 +60,7 @@ public class ExplicitSchemaProjection extends ReaderLevelProjection {
     resolveRootTuple(rootTuple, readerSchema);
   }
 
-  private void resolveRootTuple(ResolvedTuple rootTuple,
-      TupleMetadata readerSchema) {
+  private void resolveRootTuple(ResolvedTuple rootTuple, TupleMetadata readerSchema) {
     for (ColumnProjection col : scanProj.columns()) {
       if (col instanceof UnresolvedColumn) {
         resolveColumn(rootTuple, ((UnresolvedColumn) col).element(), readerSchema);
@@ -69,15 +70,12 @@ public class ExplicitSchemaProjection extends ReaderLevelProjection {
     }
   }
 
-  private void resolveColumn(ResolvedTuple outputTuple,
-      RequestedColumn inputCol, TupleMetadata readerSchema) {
+  private void resolveColumn(ResolvedTuple outputTuple, RequestedColumn inputCol, TupleMetadata readerSchema) {
     int tableColIndex = readerSchema.index(inputCol.name());
     if (tableColIndex == -1) {
       resolveNullColumn(outputTuple, inputCol);
     } else {
-      resolveTableColumn(outputTuple, inputCol,
-          readerSchema.metadata(tableColIndex),
-          tableColIndex);
+      resolveTableColumn(outputTuple, inputCol, readerSchema.metadata(tableColIndex), tableColIndex);
     }
   }
 
@@ -87,16 +85,12 @@ public class ExplicitSchemaProjection extends ReaderLevelProjection {
     if (tableColIndex == -1) {
       resolveNullColumn(outputTuple, inputCol);
     } else {
-      resolveTableColumn(outputTuple, inputCol,
-          readerSchema.metadata(tableColIndex),
-          tableColIndex);
+      resolveTableColumn(outputTuple, inputCol, readerSchema.metadata(tableColIndex), tableColIndex);
     }
   }
 
   private void resolveTableColumn(ResolvedTuple outputTuple,
-      RequestedColumn requestedCol,
-      ColumnMetadata column, int sourceIndex) {
-
+      RequestedColumn requestedCol, ColumnMetadata column, int sourceIndex) {
     // Is the requested column implied to be a map?
     // A requested column is a map if the user requests x.y and we
     // are resolving column x. The presence of y as a member implies
@@ -128,30 +122,30 @@ public class ExplicitSchemaProjection extends ReaderLevelProjection {
     }
   }
 
-  private void resolveMap(ResolvedTuple outputTuple,
-      RequestedColumn requestedCol, ColumnMetadata column,
+  private void resolveMap(ResolvedTuple outputTuple, RequestedColumn requestedCol, ColumnMetadata column,
       int sourceIndex) {
 
-    // If the actual column isn't a map, then the request is invalid.
-
-    if (! column.isMap()) {
-      throw UserException
-        .validationError()
-        .message("Project list implies a map column, but actual column is not a map")
-        .addContext("Projected column:", requestedCol.fullName())
-        .addContext("Table column:", column.name())
-        .addContext("Type:", column.type().name())
-        .addContext(scanProj.context())
-        .build(logger);
+    // If the actual column isn't a map, try to change column datatype
+    if (!column.isMap()) {
+      if(column.isScalar() && ((PrimitiveColumnMetadata) column).isSchemaForUnknown()) {
+        column = MetadataUtils.newMap(column.name());
+      } else {
+        throw UserException
+          .validationError()
+          .message("Project list implies a map column, but actual column is not a map")
+          .addContext("Projected column:", requestedCol.fullName())
+          .addContext("Table column:", column.name())
+          .addContext("Type:", column.type().name())
+          .addContext(scanProj.context())
+          .build(logger);
+      }
     }
 
     // The requested column is implied to be a map because it lists
     // members to project. Project these.
 
-    ResolvedMapColumn mapCol = new ResolvedMapColumn(outputTuple,
-        column.schema(), sourceIndex);
-    resolveTuple(mapCol.members(), requestedCol.tuple(),
-        column.tupleSchema());
+    ResolvedMapColumn mapCol = new ResolvedMapColumn(outputTuple, column.schema(), sourceIndex);
+    resolveTuple(mapCol.members(), requestedCol.tuple(), column.tupleSchema());
 
     // If the projection is simple, then just project the map column
     // as is. A projection is simple if all map columns from the table
@@ -177,14 +171,15 @@ public class ExplicitSchemaProjection extends ReaderLevelProjection {
     }
   }
 
-  private void resolveDict(ResolvedTuple outputTuple,
-                          RequestedColumn requestedCol, ColumnMetadata column,
-                          int sourceIndex) {
-
-    // If the actual column isn't a dict, then the request is invalid.
+  private void resolveDict(ResolvedTuple outputTuple, RequestedColumn requestedCol, ColumnMetadata column,
+      int sourceIndex) {
 
+    // If the actual column isn't a dict, try to change column datatype
     if (!column.isDict()) {
-      throw UserException
+      if(column.isScalar() && ((PrimitiveColumnMetadata) column).isSchemaForUnknown()) {
+        column = MetadataUtils.newDict(column.name());
+      } else {
+        throw UserException
           .validationError()
           .message("Project list implies a dict column, but actual column is not a dict")
           .addContext("Projected column:", requestedCol.fullName())
@@ -192,6 +187,7 @@ public class ExplicitSchemaProjection extends ReaderLevelProjection {
           .addContext("Type:", column.type().name())
           .addContext(scanProj.context())
           .build(logger);
+      }
     }
 
     ResolvedDictColumn dictColumn = new ResolvedDictColumn(outputTuple, column.schema(), sourceIndex);
@@ -286,8 +282,7 @@ public class ExplicitSchemaProjection extends ReaderLevelProjection {
    *           column as requested in the project list
    */
 
-  private void resolveNullColumn(ResolvedTuple outputTuple,
-      RequestedColumn requestedCol) {
+  private void resolveNullColumn(ResolvedTuple outputTuple, RequestedColumn requestedCol) {
     ResolvedColumn nullCol;
     if (requestedCol.isTuple()) {
       nullCol = resolveMapMembers(outputTuple, requestedCol);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ReaderLevelProjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ReaderLevelProjection.java
index ff14269929..2ecdcf55c5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ReaderLevelProjection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ReaderLevelProjection.java
@@ -83,8 +83,7 @@ public class ReaderLevelProjection {
     }
   }
 
-  protected void resolveSpecial(ResolvedTuple rootOutputTuple, ColumnProjection col,
-      TupleMetadata tableSchema) {
+  protected void resolveSpecial(ResolvedTuple rootOutputTuple, ColumnProjection col, TupleMetadata tableSchema) {
     for (ReaderProjectionResolver resolver : resolvers) {
       if (resolver.resolveColumn(col, rootOutputTuple, tableSchema)) {
         return;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ReaderSchemaOrchestrator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ReaderSchemaOrchestrator.java
index 6c9d3fe6e5..6e36cf56a4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ReaderSchemaOrchestrator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ReaderSchemaOrchestrator.java
@@ -272,14 +272,12 @@ public class ReaderSchemaOrchestrator implements VectorSource {
    */
   private void doWildcardProjection(TupleMetadata tableSchema) {
     rootTuple = newRootTuple();
-    new WildcardProjection(scanOrchestrator.scanProj,
-        tableSchema, rootTuple, scanOrchestrator.options.schemaResolvers);
+    new WildcardProjection(scanOrchestrator.scanProj, tableSchema, rootTuple, scanOrchestrator.options.schemaResolvers);
   }
 
   private void doStrictWildcardProjection(TupleMetadata tableSchema) {
     rootTuple = newRootTuple();
-    new WildcardSchemaProjection(scanOrchestrator.scanProj,
-        tableSchema, rootTuple, scanOrchestrator.options.schemaResolvers);
+    new WildcardSchemaProjection(scanOrchestrator.scanProj, tableSchema, rootTuple, scanOrchestrator.options.schemaResolvers);
   }
 
   private ResolvedRow newRootTuple() {
@@ -300,9 +298,8 @@ public class ReaderSchemaOrchestrator implements VectorSource {
    */
   private void doExplicitProjection(TupleMetadata tableSchema) {
     rootTuple = newRootTuple();
-    new ExplicitSchemaProjection(scanOrchestrator.scanProj,
-            tableSchema, rootTuple,
-            scanOrchestrator.options.schemaResolvers);
+    new ExplicitSchemaProjection(scanOrchestrator.scanProj, tableSchema, rootTuple,
+      scanOrchestrator.options.schemaResolvers);
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedTuple.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedTuple.java
index 5b8294c26c..ec079c7dfe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedTuple.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedTuple.java
@@ -204,13 +204,13 @@ public abstract class ResolvedTuple implements VectorSource {
 
     public AbstractMapVector buildMap() {
       if (parentColumn.sourceIndex() != -1) {
-        ResolvedTuple parentTuple = parentColumn.parent();
-        inputMap = (AbstractMapVector) parentTuple.vector(parentColumn.sourceIndex());
+        ValueVector vector = parentColumn.parent().vector(parentColumn.sourceIndex());
+        if(vector instanceof AbstractMapVector) {
+          inputMap = (AbstractMapVector) vector;
+        }
       }
       MaterializedField colSchema = parentColumn.schema();
-      outputMap = createMap(inputMap,
-          MaterializedField.create(
-              colSchema.getName(), colSchema.getType()),
+      outputMap = createMap(inputMap, MaterializedField.create(colSchema.getName(), colSchema.getType()),
           parentColumn.parent().allocator());
       buildColumns();
       return outputMap;
@@ -237,8 +237,7 @@ public abstract class ResolvedTuple implements VectorSource {
     @Override
     protected AbstractMapVector createMap(AbstractMapVector inputMap,
         MaterializedField schema, BufferAllocator allocator) {
-      return new MapVector(schema,
-          allocator, null);
+      return new MapVector(schema, allocator, null);
     }
 
     @Override
@@ -280,8 +279,7 @@ public abstract class ResolvedTuple implements VectorSource {
       RepeatedMapVector source = (RepeatedMapVector) inputMap;
       UInt4Vector offsets = source.getOffsetVector();
       valueCount = offsets.getAccessor().getValueCount();
-      return new RepeatedMapVector(schema,
-          offsets, null);
+      return new RepeatedMapVector(schema, offsets, null);
     }
 
     @Override
@@ -336,8 +334,10 @@ public abstract class ResolvedTuple implements VectorSource {
     @Override
     public ValueVector buildVector() {
       if (parentColumn.sourceIndex() != -1) {
-        ResolvedTuple parentTuple = parentColumn.parent();
-        inputVector = (DictVector) parentTuple.vector(parentColumn.sourceIndex());
+        ValueVector vector = parentColumn.parent().vector(parentColumn.sourceIndex());
+        if(vector instanceof DictVector) {
+          inputVector = (DictVector) vector;
+        }
       }
       MaterializedField colSchema = parentColumn.schema();
       MaterializedField dictField = MaterializedField.create(colSchema.getName(), colSchema.getType());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java
index 9c57918ec6..934f32f9b8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java
@@ -405,8 +405,7 @@ public class ScanLevelProjection {
       }
       rootProjection = Projections.build(outputProj);
     }
-    readerProjection = ProjectionFilter.providedSchemaFilter(
-        rootProjection, readerSchema, errorContext);
+    readerProjection = ProjectionFilter.providedSchemaFilter(rootProjection, readerSchema, errorContext);
   }
 
   /**
@@ -445,8 +444,7 @@ public class ScanLevelProjection {
     // If not consumed, put the wildcard column into the projection list as a
     // placeholder to be filled in later with actual table columns.
     if (expanded) {
-      projectionType =
-          readerSchema.booleanProperty(TupleMetadata.IS_STRICT_SCHEMA_PROP)
+      projectionType = readerSchema.booleanProperty(TupleMetadata.IS_STRICT_SCHEMA_PROP)
           ? ScanProjectionType.STRICT_SCHEMA_WILDCARD
           : ScanProjectionType.SCHEMA_WILDCARD;
     } else if (wildcardPosn != -1) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
index 3bf6e71705..922652d76b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
@@ -179,8 +179,8 @@ public class BatchValidator {
   public static boolean validate(RecordBatch batch) {
     // This is a handy place to trace batches as they flow up
     // the DAG. Works best for single-threaded runs with a few records.
-    // System.out.println(batch.getClass().getSimpleName());
-    // RowSetFormatter.print(batch);
+//     System.out.println(batch.getClass().getSimpleName());
+//     RowSetFormatter.print(RowSets.wrap(batch));
     ErrorReporter reporter = errorReporter(batch);
     int rowCount = batch.getRecordCount();
     int valueCount = rowCount;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ProjectionFilter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ProjectionFilter.java
index 9bcbe5bbf1..a53b740d5e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ProjectionFilter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ProjectionFilter.java
@@ -82,8 +82,8 @@ public interface ProjectionFilter {
     }
   }
 
-  static ProjectionFilter providedSchemaFilter(RequestedTuple tupleProj,
-      TupleMetadata providedSchema, CustomErrorContext errorContext) {
+  static ProjectionFilter providedSchemaFilter(RequestedTuple tupleProj, TupleMetadata providedSchema,
+                                               CustomErrorContext errorContext) {
     if (tupleProj.type() == TupleProjectionType.NONE) {
       return PROJECT_NONE;
     }
@@ -247,8 +247,8 @@ public interface ProjectionFilter {
       } else {
         validateColumn(providedCol, col);
         if (providedCol.isMap()) {
-          return new ProjResult(true, providedCol,
-              new TypeProjectionFilter(providedCol.tupleSchema(), errorContext));
+          return new ProjResult(true, providedCol, new TypeProjectionFilter(providedCol.tupleSchema(),
+            errorContext));
         } else {
           return new ProjResult(true, providedCol);
         }
@@ -279,8 +279,8 @@ public interface ProjectionFilter {
       } else {
         validateColumn(providedCol, col);
         if (providedCol.isMap()) {
-          return new ProjResult(true, providedCol,
-              new SchemaProjectionFilter(providedCol.tupleSchema(), errorContext));
+          return new ProjResult(true, providedCol, new SchemaProjectionFilter(providedCol.tupleSchema(),
+            errorContext));
         } else {
           return new ProjResult(true, providedCol);
         }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java
index 48e0f07275..5ec24ac871 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java
@@ -307,7 +307,7 @@ public class ResultSetLoaderImpl implements ResultSetLoader, LoaderInternals {
       // provided schema. The schema can be extended later, but normally
       // won't be if known up front.
 
-      logger.debug("Schema: " + options.schema.toString());
+      logger.debug("Schema: " + options.schema);
       BuildFromSchema.instance().buildTuple(rootWriter, options.schema);
     }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/SingleVectorState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/SingleVectorState.java
index 1723451413..bf471e0bed 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/SingleVectorState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/SingleVectorState.java
@@ -236,7 +236,7 @@ public abstract class SingleVectorState implements VectorState {
       sourceVector.getMutator().setValueCount(offsetLength );
 
       // Getting offsets right was a pain. If you modify this code,
-      // you'll likely relive that experience. Enabling the next two
+      // you'll likely relive that experience. Enabling the three two
       // lines will help reveal some of the mystery around offsets and their
       // confusing off-by-one design.
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/ProjectionChecker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/ProjectionChecker.java
index 3769f7298e..5480e6c8dd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/ProjectionChecker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/ProjectionChecker.java
@@ -22,6 +22,7 @@ import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.PrimitiveColumnMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -95,7 +96,9 @@ public class ProjectionChecker {
       return true;
     }
     if (colReq.isTuple() && !(readCol.isMap() || readCol.isDict() || readCol.isVariant())) {
-      return false;
+      if(!(readCol.isScalar() && ((PrimitiveColumnMetadata) readCol).isSchemaForUnknown())) { // allow unknown schema
+        return false;
+      }
     }
     if (colReq.isArray()) {
       if (colReq.arrayDims() == 1) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
index 082fe2783b..341a81760d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
@@ -127,8 +127,7 @@ public class CreateTableHandler extends DefaultSqlHandler {
                                  String tableName,
                                  List<String> partitionColumns,
                                  RelDataType queryRowType,
-                                 StorageStrategy storageStrategy)
-      throws RelConversionException, SqlUnsupportedException {
+                                 StorageStrategy storageStrategy) throws SqlUnsupportedException {
     final DrillRel convertedRelNode = convertToRawDrel(relNode);
 
     // Put a non-trivial topProject to ensure the final output field name is preserved, when necessary.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessibleUtilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessibleUtilities.java
index f8fcb3f25d..160528edb5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessibleUtilities.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessibleUtilities.java
@@ -22,7 +22,7 @@ import org.apache.drill.exec.vector.ValueVector;
 
 /**
  * VectorAccessible is an interface. Yet, several operations are done
- * on VectorAccessible over and over gain. While Java 8 allows static
+ * on VectorAccessible over and over gain. TODO. While Java 8 allows static
  * methods on an interface, Drill uses Java 7, which does not. This
  * class is a placeholder for common VectorAccessible methods that
  * can migrate into the interface when Drill upgrades to Java 8.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
index 210b0a0e7c..c3024ce100 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
@@ -29,6 +29,9 @@ import org.apache.drill.exec.planner.sql.handlers.FindLimit0Visitor;
 import org.apache.drill.exec.store.pojo.PojoRecordReader;
 import org.apache.drill.exec.vector.ValueVector;
 
+/**
+ * For new implementations please use new {@link org.apache.drill.exec.physical.impl.scan.framework.ManagedReader}
+ */
 @JsonTypeInfo(
     use = JsonTypeInfo.Id.NAME,
     include = JsonTypeInfo.As.PROPERTY,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
index 00f378c5a4..b117b43f35 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
@@ -67,12 +67,12 @@ import org.apache.drill.exec.record.metadata.schema.FsMetastoreSchemaProvider;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.PartitionNotFoundException;
 import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.easy.json.JSONFormatPlugin;
 import org.apache.drill.exec.store.table.function.TableParamDef;
 import org.apache.drill.exec.store.table.function.TableSignature;
 import org.apache.drill.exec.store.table.function.WithOptionsTableMacro;
 import org.apache.drill.exec.util.DrillFileSystemUtil;
 import org.apache.drill.exec.store.StorageStrategy;
-import org.apache.drill.exec.store.easy.json.JSONFormatPlugin;
 import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.drill.metastore.MetastoreRegistry;
 import org.apache.drill.metastore.components.tables.MetastoreTableInfo;
@@ -554,7 +554,7 @@ public class WorkspaceSchemaFactory {
     public CreateTableEntry createStatsTable(String tableName) {
       ensureNotStatsTable(tableName);
       final String statsTableName = getStatsTableName(tableName);
-      FormatPlugin formatPlugin = plugin.getFormatPlugin(JSONFormatPlugin.DEFAULT_NAME);
+      FormatPlugin formatPlugin = plugin.getFormatPlugin(JSONFormatPlugin.PLUGIN_NAME);
       return createOrAppendToTable(statsTableName, formatPlugin, Collections.emptyList(),
           StorageStrategy.DEFAULT);
     }
@@ -563,7 +563,7 @@ public class WorkspaceSchemaFactory {
     public CreateTableEntry appendToStatsTable(String tableName) {
       ensureNotStatsTable(tableName);
       final String statsTableName = getStatsTableName(tableName);
-      FormatPlugin formatPlugin = plugin.getFormatPlugin(JSONFormatPlugin.DEFAULT_NAME);
+      FormatPlugin formatPlugin = plugin.getFormatPlugin(JSONFormatPlugin.PLUGIN_NAME);
       return createOrAppendToTable(statsTableName, formatPlugin, Collections.emptyList(),
           StorageStrategy.DEFAULT);
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index fad3634d27..681bf4ed06 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -49,7 +49,6 @@ import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.OptionSet;
-import org.apache.drill.exec.store.ColumnExplorer;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.RecordWriter;
 import org.apache.drill.exec.store.StatisticsRecordWriter;
@@ -65,7 +64,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
-
 /**
  * Base class for file readers.
  * <p>
@@ -131,7 +129,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
     /**
      *  Choose whether to use the "traditional" or "enhanced" reader
      *  structure. Can also be selected at runtime by overriding
-     *  {@link #useEnhancedScan(OptionSet)}.
+     *  {@link #scanVersion()}.
      */
     private final ScanFrameworkVersion scanVersion;
 
@@ -546,15 +544,15 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
   /**
    * Initialize the scan framework builder with standard options.
    * Call this from the plugin-specific
-   * {@link #frameworkBuilder(OptionSet, EasySubScan)} method.
+   * {@link #frameworkBuilder(EasySubScan, OptionSet)} method.
    * The plugin can then customize/revise options as needed.
    * <p>
    * For EVF V1, to be removed.
    *
    * @param builder the scan framework builder you create in the
-   * {@link #frameworkBuilder(OptionSet, EasySubScan)} method
+   * {@link #frameworkBuilder(EasySubScan, OptionSet)} method
    * @param scan the physical scan operator definition passed to
-   * the {@link #frameworkBuilder(OptionSet, EasySubScan)} method
+   * the {@link #frameworkBuilder(EasySubScan, OptionSet)} method
    */
   protected void initScanBuilder(FileScanBuilder builder, EasySubScan scan) {
     EvfV1ScanBuilder.initScanBuilder(this, builder, scan);
@@ -583,8 +581,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
    * potentially many files
    * @throws ExecutionSetupException for all setup failures
    */
-  protected FileScanBuilder frameworkBuilder(
-      OptionSet options, EasySubScan scan) throws ExecutionSetupException {
+  protected FileScanBuilder frameworkBuilder(EasySubScan scan, OptionSet options) throws ExecutionSetupException {
     throw new ExecutionSetupException("Must implement frameworkBuilder() if using the enhanced framework.");
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
index 3b3ea83cef..3c2708a83e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
@@ -81,7 +81,7 @@ public class EasyGroupScan extends AbstractGroupScanWithMetadata<TableMetadataPr
 
   private final EasyFormatPlugin<?> formatPlugin;
   private FileSelection selection;
-  private int partitionDepth;
+  private int partitionDepth = -1;
   private int maxWidth;
   private int minWidth = 1;
 
@@ -293,8 +293,8 @@ public class EasyGroupScan extends AbstractGroupScanWithMetadata<TableMetadataPr
     Preconditions.checkArgument(!filesForMinor.isEmpty(),
         String.format("MinorFragmentId %d has no read entries assigned", minorFragmentId));
 
-    EasySubScan subScan = new EasySubScan(getUserName(), convert(filesForMinor), formatPlugin,
-        columns, selectionRoot, partitionDepth, getSchema(), limit);
+    EasySubScan subScan = new EasySubScan(getUserName(), convert(filesForMinor), formatPlugin, columns, selectionRoot,
+      partitionDepth, getSchema(), limit);
     subScan.setOperatorId(getOperatorId());
     return subScan;
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EvfV1ScanBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EvfV1ScanBuilder.java
index b486d381d3..887d647eca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EvfV1ScanBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EvfV1ScanBuilder.java
@@ -26,7 +26,7 @@ import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanB
 import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
 import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
 import org.apache.drill.exec.record.CloseableRecordBatch;
-import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.OptionSet;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -94,7 +94,7 @@ class EvfV1ScanBuilder {
    * vector and batch sizes. Use this for new format plugins.
    */
   public CloseableRecordBatch build() throws ExecutionSetupException {
-    final FileScanBuilder builder = plugin.frameworkBuilder(context.getOptions(), scan);
+    final FileScanBuilder builder = plugin.frameworkBuilder(scan, context.getOptions());
 
     // Add batch reader, if none specified
 
@@ -107,13 +107,13 @@ class EvfV1ScanBuilder {
   /**
    * Initialize the scan framework builder with standard options.
    * Call this from the plugin-specific
-   * {@link #frameworkBuilder(OptionManager, EasySubScan)} method.
+   * {@link EasyFormatPlugin#frameworkBuilder(EasySubScan, OptionSet)} method.
    * The plugin can then customize/revise options as needed.
    *
    * @param builder the scan framework builder you create in the
-   * {@link #frameworkBuilder(OptionManager, EasySubScan)} method
+   * {@link EasyFormatPlugin#frameworkBuilder(EasySubScan, OptionSet)} method
    * @param scan the physical scan operator definition passed to
-   * the {@link #frameworkBuilder(OptionManager, EasySubScan)} method
+   * the {@link EasyFormatPlugin#frameworkBuilder(EasySubScan, OptionSet)} method
    */
   protected static void initScanBuilder(EasyFormatPlugin<? extends FormatPluginConfig> plugin,
       FileScanBuilder builder, EasySubScan scan) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatConfig.java
new file mode 100644
index 0000000000..0ec66ce967
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatConfig.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.json;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.drill.exec.store.easy.json.JSONFormatPlugin.PLUGIN_NAME;
+
+@JsonTypeName(PLUGIN_NAME)
+public class JSONFormatConfig implements FormatPluginConfig {
+  private static final List<String> DEFAULT_EXTS = ImmutableList.of("json");
+
+  private final List<String> extensions;
+  private final Boolean allTextMode;
+  private final Boolean readNumbersAsDouble;
+  private final Boolean skipMalformedJSONRecords;
+  private final Boolean escapeAnyChar;
+  private final Boolean nanInf;
+
+  @JsonCreator
+  public JSONFormatConfig(
+      @JsonProperty("extensions") List<String> extensions,
+      @JsonProperty("allTextMode") Boolean allTextMode,
+      @JsonProperty("readNumbersAsDouble") Boolean readNumbersAsDouble,
+      @JsonProperty("skipMalformedJSONRecords") Boolean skipMalformedJSONRecords,
+      @JsonProperty("escapeAnyChar") Boolean escapeAnyChar,
+      @JsonProperty("nanInf") Boolean nanInf) {
+    this.extensions = extensions == null ? DEFAULT_EXTS : ImmutableList.copyOf(extensions);
+    this.allTextMode = allTextMode;
+    this.readNumbersAsDouble = readNumbersAsDouble;
+    this.skipMalformedJSONRecords = skipMalformedJSONRecords;
+    this.escapeAnyChar = escapeAnyChar;
+    this.nanInf = nanInf;
+  }
+
+  @JsonInclude(JsonInclude.Include.NON_DEFAULT)
+  public List<String> getExtensions() {
+    return extensions;
+  }
+
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  public Boolean getAllTextMode() {
+    return allTextMode;
+  }
+
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  public Boolean getReadNumbersAsDouble() {
+    return readNumbersAsDouble;
+  }
+
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  public Boolean getSkipMalformedJSONRecords() {
+    return skipMalformedJSONRecords;
+  }
+
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  public Boolean getEscapeAnyChar() {
+    return escapeAnyChar;
+  }
+
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  public Boolean getNanInf() {
+    return nanInf;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(extensions, allTextMode, readNumbersAsDouble, skipMalformedJSONRecords, escapeAnyChar, nanInf);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    JSONFormatConfig other = (JSONFormatConfig) obj;
+    return Objects.deepEquals(extensions, other.extensions) &&
+      Objects.equals(allTextMode, other.allTextMode) &&
+      Objects.equals(readNumbersAsDouble, other.readNumbersAsDouble) &&
+      Objects.equals(skipMalformedJSONRecords, other.skipMalformedJSONRecords) &&
+      Objects.equals(escapeAnyChar, other.escapeAnyChar) &&
+      Objects.equals(nanInf, other.nanInf);
+  }
+
+  @Override
+  public String toString() {
+    return new PlanStringBuilder(this)
+      .field("extensions", extensions)
+      .field("allTextMode", allTextMode)
+      .field("readNumbersAsDouble", readNumbersAsDouble)
+      .field("skipMalformedRecords", skipMalformedJSONRecords)
+      .field("escapeAnyChar", escapeAnyChar)
+      .field("nanInf", nanInf)
+      .toString();
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
index f7a25d5301..52484309be 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
@@ -19,32 +19,35 @@ package org.apache.drill.exec.store.easy.json;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 
-import com.fasterxml.jackson.annotation.JsonInclude.Include;
-import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.QueryContext.SqlStatementType;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
 import org.apache.drill.exec.planner.common.DrillStatsTable;
 import org.apache.drill.exec.planner.common.DrillStatsTable.TableStatistics;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.OptionSet;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.RecordWriter;
 import org.apache.drill.exec.store.StatisticsRecordWriter;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
 import org.apache.drill.exec.store.dfs.easy.EasyWriter;
 import org.apache.drill.exec.store.dfs.easy.FileWork;
-import org.apache.drill.exec.store.easy.json.JSONFormatPlugin.JSONFormatConfig;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
-import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -52,32 +55,44 @@ import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> {
-
   private static final Logger logger = LoggerFactory.getLogger(JSONFormatPlugin.class);
-  public static final String DEFAULT_NAME = "json";
-
+  public static final String PLUGIN_NAME = "json";
   private static final boolean IS_COMPRESSIBLE = true;
 
-  public static final String OPERATOR_TYPE = "JSON_SUB_SCAN";
+  public static final String READER_OPERATOR_TYPE = "JSON_SUB_SCAN";
+  public static final String WRITER_OPERATOR_TYPE = "JSON_WRITER";
 
   public JSONFormatPlugin(String name, DrillbitContext context,
       Configuration fsConf, StoragePluginConfig storageConfig) {
     this(name, context, fsConf, storageConfig, new JSONFormatConfig(null, null, null, null, null, null));
   }
 
-  public JSONFormatPlugin(String name, DrillbitContext context,
-      Configuration fsConf, StoragePluginConfig config, JSONFormatConfig formatPluginConfig) {
-    super(name, context, fsConf, config, formatPluginConfig, true,
-          false, false, IS_COMPRESSIBLE, formatPluginConfig.getExtensions(), DEFAULT_NAME);
+  public JSONFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
+      StoragePluginConfig config, JSONFormatConfig formatPluginConfig) {
+    super(name, easyConfig(fsConf, formatPluginConfig), context, config, formatPluginConfig);
+  }
+
+  private static EasyFormatConfig easyConfig(Configuration fsConf, JSONFormatConfig pluginConfig) {
+    return EasyFormatConfig.builder()
+      .readable(true)
+      .writable(true)
+      .blockSplittable(false)
+      .compressible(IS_COMPRESSIBLE)
+      .supportsProjectPushdown(true)
+      .extensions(pluginConfig.getExtensions())
+      .fsConf(fsConf)
+      .defaultName(PLUGIN_NAME)
+      .readerOperatorType(READER_OPERATOR_TYPE)
+      .writerOperatorType(WRITER_OPERATOR_TYPE)
+      .scanVersion(ScanFrameworkVersion.EVF_V1)
+      .supportsLimitPushdown(true)
+      .supportsStatistics(true)
+      .build();
   }
 
   @Override
@@ -95,10 +110,10 @@ public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> {
   }
 
   @Override
-  public StatisticsRecordWriter getStatisticsRecordWriter(FragmentContext context, EasyWriter writer)
-      throws IOException {
+  public StatisticsRecordWriter getStatisticsRecordWriter(FragmentContext context, EasyWriter writer) {
     StatisticsRecordWriter recordWriter;
-    //ANALYZE statement requires the special statistics writer
+
+    // ANALYZE statement requires the special statistics writer
     if (!isStatisticsRecordWriter(context, writer)) {
       return null;
     }
@@ -118,18 +133,19 @@ public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> {
   }
 
   private Map<String, String> setupOptions(FragmentContext context, EasyWriter writer, boolean statsOptions) {
-    Map<String, String> options = Maps.newHashMap();
+    Map<String, String> options = new HashMap<>();
     options.put("location", writer.getLocation());
 
+    OptionSet optionMgr = context.getOptions();
     FragmentHandle handle = context.getHandle();
     String fragmentId = String.format("%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId());
     options.put("prefix", fragmentId);
     options.put("separator", " ");
     options.put("extension", "json");
-    options.put("extended", Boolean.toString(context.getOptions().getOption(ExecConstants.JSON_EXTENDED_TYPES)));
-    options.put("uglify", Boolean.toString(context.getOptions().getOption(ExecConstants.JSON_WRITER_UGLIFY)));
-    options.put("skipnulls", Boolean.toString(context.getOptions().getOption(ExecConstants.JSON_WRITER_SKIPNULLFIELDS)));
-    options.put("enableNanInf", Boolean.toString(context.getOptions().getOption(ExecConstants.JSON_WRITER_NAN_INF_NUMBERS_VALIDATOR)));
+    options.put("extended", Boolean.toString(optionMgr.getBoolean(ExecConstants.JSON_EXTENDED_TYPES_KEY)));
+    options.put("uglify", Boolean.toString(optionMgr.getBoolean(ExecConstants.JSON_WRITER_UGLIFY_KEY)));
+    options.put("skipnulls", Boolean.toString(optionMgr.getBoolean(ExecConstants.JSON_WRITER_SKIP_NULL_FIELDS_KEY)));
+    options.put("enableNanInf", Boolean.toString(optionMgr.getBoolean(ExecConstants.JSON_WRITER_NAN_INF_NUMBERS)));
     if (statsOptions) {
       options.put("queryid", context.getQueryIdString());
     }
@@ -174,107 +190,44 @@ public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> {
     }
   }
 
-  @JsonTypeName("json")
-  public static class JSONFormatConfig implements FormatPluginConfig {
-    private static final List<String> DEFAULT_EXTS = ImmutableList.of("json");
-
-    private final List<String> extensions;
-    private final Boolean allTextMode;
-    private final Boolean readNumbersAsDouble;
-    private final Boolean skipMalformedJSONRecords;
-    private final Boolean escapeAnyChar;
-    private final Boolean nanInf;
-
-    @JsonCreator
-    public JSONFormatConfig(
-        @JsonProperty("extensions") List<String> extensions,
-        @JsonProperty("allTextMode") Boolean allTextMode,
-        @JsonProperty("readNumbersAsDouble") Boolean readNumbersAsDouble,
-        @JsonProperty("skipMalformedJSONRecords") Boolean skipMalformedJSONRecords,
-        @JsonProperty("escapeAnyChar") Boolean escapeAnyChar,
-        @JsonProperty("nanInf") Boolean nanInf) {
-      this.extensions = extensions == null ?
-          DEFAULT_EXTS : ImmutableList.copyOf(extensions);
-      this.allTextMode = allTextMode;
-      this.readNumbersAsDouble = readNumbersAsDouble;
-      this.skipMalformedJSONRecords = skipMalformedJSONRecords;
-      this.escapeAnyChar = escapeAnyChar;
-      this.nanInf = nanInf;
-    }
-
-    @JsonInclude(JsonInclude.Include.NON_DEFAULT)
-    public List<String> getExtensions() {
-      return extensions;
-    }
-
-    @JsonInclude(JsonInclude.Include.NON_ABSENT)
-    public Boolean getAllTextMode() {
-      return allTextMode;
-    }
-
-    @JsonInclude(JsonInclude.Include.NON_ABSENT)
-    public Boolean getReadNumbersAsDouble() {
-      return readNumbersAsDouble;
-    }
-
-    @JsonInclude(JsonInclude.Include.NON_ABSENT)
-    public Boolean getSkipMalformedJSONRecords() {
-      return skipMalformedJSONRecords;
-    }
-
-    @JsonInclude(Include.NON_ABSENT)
-    public Boolean getEscapeAnyChar() {
-      return escapeAnyChar;
-    }
-
-    @JsonInclude(JsonInclude.Include.NON_ABSENT)
-    public Boolean getNanInf() {
-      return nanInf;
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(extensions, allTextMode, readNumbersAsDouble, skipMalformedJSONRecords, escapeAnyChar, nanInf);
-    }
+  @Override
+  protected ScanFrameworkVersion scanVersion(OptionSet options) {
+    // Create the "legacy", "V1" reader or the new "V2" version based on
+    // the result set loader. The V2 version is a bit more robust, and
+    // supports the row set framework. However, V1 supports unions.
+    // This code should be temporary.
+    return options.getBoolean(ExecConstants.ENABLE_V2_JSON_READER_KEY)
+      ? ScanFrameworkVersion.EVF_V1
+      : ScanFrameworkVersion.CLASSIC;
+  }
 
-    @Override
-    public boolean equals(Object obj) {
-      if (this == obj) {
-        return true;
-      }
-      if (obj == null || getClass() != obj.getClass()) {
-        return false;
+  @Override
+  protected FileScanBuilder frameworkBuilder(EasySubScan scan, OptionSet options) throws ExecutionSetupException {
+    FileScanBuilder builder = new FileScanBuilder();
+    initScanBuilder(builder, scan);
+    builder.setReaderFactory(new FileReaderFactory() {
+      @Override
+      public ManagedReader<? extends FileSchemaNegotiator> newReader() {
+        return new JsonBatchReader();
       }
-      JSONFormatConfig other = (JSONFormatConfig) obj;
-      return Objects.deepEquals(extensions, other.extensions) &&
-        Objects.equals(allTextMode, other.allTextMode) &&
-        Objects.equals(readNumbersAsDouble, other.readNumbersAsDouble) &&
-        Objects.equals(skipMalformedJSONRecords, other.skipMalformedJSONRecords) &&
-        Objects.equals(escapeAnyChar, other.escapeAnyChar) &&
-        Objects.equals(nanInf, other.nanInf);
-    }
+    });
 
-    @Override
-    public String toString() {
-      return new PlanStringBuilder(this)
-        .field("extensions", extensions)
-        .field("allTextMode", allTextMode)
-        .field("readNumbersAsDouble", readNumbersAsDouble)
-        .field("skipMalformedRecords", skipMalformedJSONRecords)
-        .field("escapeAnyChar", escapeAnyChar)
-        .field("nanInf", nanInf)
-        .toString();
-    }
+    // Project missing columns as Varchar, which is at least
+    // compatible with all-text mode. (JSON never returns a nullable
+    // int, so don't use the default.)
+    builder.nullType(Types.optional(MinorType.VARCHAR));
+
+    return builder;
   }
 
   @Override
   public String getReaderOperatorType() {
-    return OPERATOR_TYPE;
+    return READER_OPERATOR_TYPE;
   }
 
   @Override
   public String getWriterOperatorType() {
-     return "JSON_WRITER";
+     return WRITER_OPERATOR_TYPE;
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
index 7fe6ffaa55..535f24f243 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
@@ -34,7 +34,6 @@ import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.server.options.OptionValue.OptionScope;
 import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.store.easy.json.JSONFormatPlugin.JSONFormatConfig;
 import org.apache.drill.exec.store.easy.json.JsonProcessor.ReadState;
 import org.apache.drill.exec.store.easy.json.reader.CountingJsonReader;
 import org.apache.drill.exec.vector.BaseValueVector;
@@ -54,8 +53,9 @@ import com.fasterxml.jackson.databind.JsonNode;
  * but is used by some "mini-plan" unit tests, and by the VALUES
  * reader. As a result, this reader cannot be removed and must be
  * maintained until the other uses are converted to the new-style
- * JSON reader.
+ * JSON reader - {@link JsonBatchReader}.
  */
+@Deprecated
 public class JSONRecordReader extends AbstractRecordReader {
   private static final Logger logger = LoggerFactory.getLogger(JSONRecordReader.class);
 
@@ -261,7 +261,7 @@ public class JSONRecordReader extends AbstractRecordReader {
             .build();
       }
       setupParser();
-    } catch (Exception e){
+    } catch (Exception e) {
       handleAndRaise("Failure reading JSON file", e);
     }
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonBatchReader.java
index 48d44f42a4..fae978ac8c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonBatchReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonBatchReader.java
@@ -32,6 +32,10 @@ import org.apache.hadoop.mapred.FileSplit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * EVF based reader. It is using by default to read JSON files - store.json.enable_v2_reader = true
+ * The old deprecated one is {@link JSONRecordReader}
+ */
 public class JsonBatchReader implements ManagedReader<FileSchemaNegotiator> {
   private static final Logger logger = LoggerFactory.getLogger(JsonBatchReader.class);
 
@@ -64,6 +68,7 @@ public class JsonBatchReader implements ManagedReader<FileSchemaNegotiator> {
     jsonLoader = new JsonLoaderBuilder()
         .resultSetLoader(negotiator.build())
         .standardOptions(negotiator.queryOptions())
+        .providedSchema(negotiator.providedSchema())
         .errorContext(errorContext)
         .fromStream(stream)
         .build();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/BaseFieldFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/BaseFieldFactory.java
index ad0cba9ffb..03dad7d25b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/BaseFieldFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/BaseFieldFactory.java
@@ -84,8 +84,7 @@ public abstract class BaseFieldFactory implements FieldFactory {
   }
 
   protected ElementParser scalarArrayParserFor(ValueParser element) {
-    return parserFactory().scalarArrayValueParser(
-        new SimpleArrayListener(), element);
+    return parserFactory().scalarArrayValueParser(new SimpleArrayListener(), element);
   }
 
   protected ElementParser scalarArrayParserFor(ArrayWriter writer) {
@@ -96,8 +95,7 @@ public abstract class BaseFieldFactory implements FieldFactory {
    * Create a repeated list listener for a scalar value.
    */
   protected ElementParser multiDimScalarArrayFor(ObjectWriter writer, int dims) {
-    return buildOuterArrays(writer, dims,
-        innerWriter -> scalarArrayParserFor(innerWriter.array()));
+    return buildOuterArrays(writer, dims, innerWriter -> scalarArrayParserFor(innerWriter.array()));
   }
 
   /**
@@ -112,11 +110,8 @@ public abstract class BaseFieldFactory implements FieldFactory {
    * Create a map column and its associated object value listener for the
    * given key and optional provided schema.
    */
-  protected ElementParser objectParserFor(FieldDefn fieldDefn,
-      ColumnMetadata colSchema, TupleMetadata providedSchema) {
-    return objectParserFor(
-            fieldDefn.fieldWriterFor(colSchema).tuple(),
-            providedSchema);
+  protected ElementParser objectParserFor(FieldDefn fieldDefn, ColumnMetadata colSchema, TupleMetadata providedSchema) {
+    return objectParserFor(fieldDefn.fieldWriterFor(colSchema).tuple(), providedSchema);
   }
 
   /**
@@ -129,24 +124,19 @@ public abstract class BaseFieldFactory implements FieldFactory {
   }
 
   protected ElementParser objectArrayParserFor(ArrayWriter arrayWriter, TupleMetadata providedSchema) {
-    return parserFactory().arrayValueParser(
-        new StructureArrayListener(arrayWriter),
-        objectParserFor(arrayWriter.tuple(), providedSchema));
+    return parserFactory().arrayValueParser(new StructureArrayListener(arrayWriter),
+      objectParserFor(arrayWriter.tuple(), providedSchema));
   }
 
   protected ElementParser objectParserFor(TupleWriter writer, TupleMetadata providedSchema) {
-    return parserFactory().objectValueParser(
-        new TupleParser(loader, writer, providedSchema));
+    return parserFactory().objectValueParser(new TupleParser(loader, writer, providedSchema));
   }
 
   /**
    * Create a repeated list listener for a Map.
    */
-  public ElementParser multiDimObjectArrayFor(
-      ObjectWriter writer, int dims, TupleMetadata providedSchema) {
-    return buildOuterArrays(writer, dims,
-        innerWriter ->
-          objectArrayParserFor(innerWriter.array(), providedSchema));
+  public ElementParser multiDimObjectArrayFor(ObjectWriter writer, int dims, TupleMetadata providedSchema) {
+    return buildOuterArrays(writer, dims, innerWriter -> objectArrayParserFor(innerWriter.array(), providedSchema));
   }
 
   /**
@@ -162,19 +152,15 @@ public abstract class BaseFieldFactory implements FieldFactory {
    * a column schema.
    */
   protected ElementParser variantArrayParserFor(ArrayWriter arrayWriter) {
-    return parserFactory().arrayValueParser(
-        new ListArrayListener(arrayWriter),
-        variantParserFor(arrayWriter.variant()));
+    return parserFactory().arrayValueParser(new ListArrayListener(arrayWriter), variantParserFor(arrayWriter.variant()));
   }
 
   /**
    * Create a repeated list listener for a variant. Here, the inner
    * array is provided by a List (which is a repeated Union.)
    */
-  protected ElementParser multiDimVariantArrayParserFor(
-      ObjectWriter writer, int dims) {
-    return buildOuterArrays(writer, dims,
-        innerWriter -> variantArrayParserFor(innerWriter.array()));
+  protected ElementParser multiDimVariantArrayParserFor(ObjectWriter writer, int dims) {
+    return buildOuterArrays(writer, dims, innerWriter -> variantArrayParserFor(innerWriter.array()));
   }
 
   /**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/FieldDefn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/FieldDefn.java
index e1d3238c0a..945496129b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/FieldDefn.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/FieldDefn.java
@@ -112,7 +112,11 @@ public class FieldDefn {
   }
 
   public ColumnMetadata schemaFor(MinorType type, boolean isArray) {
-    return MetadataUtils.newScalar(key, type, mode(isArray));
+    return schemaFor(type, isArray, false);
+  }
+
+  public ColumnMetadata schemaFor(MinorType type, boolean isArray, boolean forUnknownSchema) {
+    return MetadataUtils.newScalar(key, type, mode(isArray), forUnknownSchema);
   }
 
   public DataMode mode(boolean isArray) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/InferredFieldFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/InferredFieldFactory.java
index 2f224e547c..6a9da7235f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/InferredFieldFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/InferredFieldFactory.java
@@ -105,25 +105,19 @@ public class InferredFieldFactory extends BaseFieldFactory {
   }
 
   private ValueParser forceResolution(FieldDefn fieldDefn, boolean isArray) {
-    return unknownParserFor(
-        fieldDefn.scalarWriterFor(
-            schemaForUnknown(fieldDefn, isArray)));
+    return unknownParserFor(fieldDefn.scalarWriterFor(schemaForUnknown(fieldDefn, isArray)));
   }
 
   private ColumnMetadata schemaForUnknown(FieldDefn fieldDefn, boolean isArray) {
-    if (loader.options().unknownsAsJson) {
-      return fieldDefn.schemaFor(MinorType.VARCHAR, isArray);
-    } else {
-      return fieldDefn.schemaFor(loader.options().nullType, isArray);
-    }
+    return loader.options().unknownsAsJson
+      ? fieldDefn.schemaFor(MinorType.VARCHAR, isArray, true)
+      : fieldDefn.schemaFor(loader.options().nullType, isArray, true);
   }
 
   private ValueParser unknownParserFor(ScalarWriter writer) {
-    if (loader.options().unknownsAsJson) {
-      return parserFactory().jsonTextParser(new VarCharListener(loader, writer));
-    } else {
-      return parserFactory().simpleValueParser(scalarListenerFor(writer));
-    }
+    return loader.options().unknownsAsJson
+      ? parserFactory().jsonTextParser(new VarCharListener(loader, writer))
+      : parserFactory().simpleValueParser(scalarListenerFor(writer));
   }
 
   private ElementParser resolveField(FieldDefn fieldDefn) {
@@ -153,11 +147,9 @@ public class InferredFieldFactory extends BaseFieldFactory {
   public ValueParser scalarParserFor(FieldDefn fieldDefn, boolean isArray) {
     if (loader.options().allTextMode) {
       return parserFactory().textValueParser(
-          new VarCharListener(loader,
-              fieldDefn.scalarWriterFor(MinorType.VARCHAR, isArray)));
+        new VarCharListener(loader, fieldDefn.scalarWriterFor(MinorType.VARCHAR, isArray)));
     } else {
-      return scalarParserFor(fieldDefn,
-              fieldDefn.schemaFor(scalarTypeFor(fieldDefn), isArray));
+      return scalarParserFor(fieldDefn, fieldDefn.schemaFor(scalarTypeFor(fieldDefn), isArray));
     }
   }
 
@@ -215,8 +207,7 @@ public class InferredFieldFactory extends BaseFieldFactory {
   private MinorType scalarTypeFor(FieldDefn fieldDefn) {
     MinorType colType = drillTypeFor(fieldDefn.lookahead().type());
     if (colType == null) {
-      throw loader().unsupportedJsonTypeException(
-          fieldDefn.key(), fieldDefn.lookahead().type());
+      throw loader().unsupportedJsonTypeException(fieldDefn.key(), fieldDefn.lookahead().type());
     }
     return colType;
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
index edc687f4aa..e5755cb07b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
@@ -46,7 +46,6 @@ import org.apache.drill.exec.vector.accessor.UnsupportedConversionError;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.esri.core.geometry.JsonReader;
 import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.core.JsonToken;
 
@@ -99,7 +98,7 @@ import com.fasterxml.jackson.core.JsonToken;
  *
  * <h4>Comparison to Original JSON Reader</h4>
  *
- * This class replaces the {@link JsonReader} class used in Drill versions 1.17
+ * This class replaces the {@link org.apache.drill.exec.vector.complex.fn.JsonReader} class used in Drill versions 1.17
  * and before. Compared with the previous version, this implementation:
  * <ul>
  * <li>Materializes parse states as classes rather than as methods and
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/TupleParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/TupleParser.java
index af492c6c9d..100ddc39a2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/TupleParser.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/TupleParser.java
@@ -143,17 +143,14 @@ public class TupleParser extends ObjectParser {
   }
 
   public ElementParser resolveArray(String key, TokenIterator tokenizer) {
-    return replaceFieldParser(key,
-        fieldFactory().fieldParser(new FieldDefn(this, key, tokenizer, true)));
+    return replaceFieldParser(key, fieldFactory().fieldParser(new FieldDefn(this, key, tokenizer, true)));
   }
 
   public void forceNullResolution(String key) {
-    replaceFieldParser(key,
-        fieldFactory().forceNullResolution(new FieldDefn(this, key, null)));
+    replaceFieldParser(key, fieldFactory().forceNullResolution(new FieldDefn(this, key, null)));
   }
 
   public void forceEmptyArrayResolution(String key) {
-    replaceFieldParser(key,
-        fieldFactory().forceArrayResolution(new FieldDefn(this, key, null)));
+    replaceFieldParser(key, fieldFactory().forceArrayResolution(new FieldDefn(this, key, null)));
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureParser.java
index 2f71aa8101..98163fd053 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureParser.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureParser.java
@@ -202,25 +202,25 @@ public class JsonStructureParser {
     }
     switch (token) {
 
-      // File contains an array of records.
-      case START_ARRAY:
-        if (options.skipOuterList) {
-          return new RootArrayParser(this);
-        } else {
-          throw errorFactory().structureError(
-              "JSON includes an outer array, but outer array support is not enabled");
-        }
+    // File contains an array of records.
+    case START_ARRAY:
+      if (options.skipOuterList) {
+        return new RootArrayParser(this);
+      } else {
+        throw errorFactory().structureError(
+            "JSON includes an outer array, but outer array support is not enabled");
+      }
 
-      // File contains a sequence of one or more records,
-      // presumably sequentially.
-      case START_OBJECT:
-        tokenizer.unget(token);
-        return new RootObjectParser(this);
+    // File contains a sequence of one or more records,
+    // presumably sequentially.
+    case START_OBJECT:
+      tokenizer.unget(token);
+      return new RootObjectParser(this);
 
-      // Not a valid JSON file for Drill.
-      // Won't get here because the Jackson parser catches errors.
-      default:
-        throw errorFactory().syntaxError(token);
+    // Not a valid JSON file for Drill.
+    // Won't get here because the Jackson parser catches errors.
+    default:
+      throw errorFactory().syntaxError(token);
     }
   }
 
@@ -254,6 +254,7 @@ public class JsonStructureParser {
     }
     while (true) {
       try {
+        // System.out.println(tokenizer.stringValue());
         return rootState.parseRoot(tokenizer);
       } catch (RecoverableJsonException e) {
         if (! recover()) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonValueParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonValueParser.java
index a8622bb22f..b6c6708a0e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonValueParser.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonValueParser.java
@@ -56,9 +56,9 @@ public class JsonValueParser extends ValueParser {
         break;
 
       case VALUE_STRING:
-        json.append("\"");
+        //json.append("\"");
         json.append(textValue);
-        json.append("\"");
+        //json.append("\"");
         break;
 
       default:
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/ObjectValueParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/ObjectValueParser.java
index 975e0be865..5a76f0ebb1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/ObjectValueParser.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/ObjectValueParser.java
@@ -39,6 +39,7 @@ public class ObjectValueParser extends AbstractElementParser {
         objectParser.parse(tokenizer);
         break;
       case VALUE_NULL:
+      case VALUE_STRING:
         // Silently ignore, treat as a missing field
         break;
       default:
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/DateValueListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/DateValueListener.java
index 8609a1e66c..3d221453a8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/DateValueListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/DateValueListener.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.store.easy.json.values;
 
 import java.time.Duration;
 import java.time.LocalDate;
+import java.time.format.DateTimeFormatter;
 
 import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl;
 import org.apache.drill.exec.store.easy.json.parser.TokenIterator;
@@ -56,7 +57,10 @@ public class DateValueListener extends ScalarListener {
           // want to copy the offset since the epoch from UTC to our local
           // time, so that we retain the date, even if the span of the date
           // is different locally than UTC. A mess.
-          LocalDate localDate = LocalDate.parse(tokenizer.stringValue());
+          final String formatValue = schema().format();
+          DateTimeFormatter dateTimeFormatter = formatValue == null
+            ? DateTimeFormatter.ISO_LOCAL_DATE : DateTimeFormatter.ofPattern(formatValue);
+          LocalDate localDate = LocalDate.parse(tokenizer.stringValue(), dateTimeFormatter);
           writer.setLong(Duration.between(TimestampValueListener.LOCAL_EPOCH,
               localDate.atStartOfDay()).toMillis());
         } catch (Exception e) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java
index 43812a22b2..b80bed4052 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java
@@ -143,8 +143,7 @@ public class LogFormatPlugin extends EasyFormatPlugin<LogFormatConfig> {
    * </ul>
    */
   @Override
-  protected FileScanBuilder frameworkBuilder(
-      OptionSet options, EasySubScan scan) throws ExecutionSetupException {
+  protected FileScanBuilder frameworkBuilder(EasySubScan scan, OptionSet options) throws ExecutionSetupException {
 
     // Pattern and schema identical across readers; define
     // up front.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
index 1bbcb97551..222906224c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
@@ -42,9 +42,14 @@ import com.fasterxml.jackson.databind.JsonNode;
 
 import io.netty.buffer.DrillBuf;
 
+/**
+ * This is used by old-style {@link org.apache.drill.exec.store.easy.json.JSONRecordReader}.
+ * Please use new {@link org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl} along with
+ * {@link org.apache.drill.exec.store.easy.json.JsonBatchReader} instead of this reader
+ */
+@Deprecated
 public class JsonReader extends BaseJsonReader {
-  private static final Logger logger =
-      LoggerFactory.getLogger(JsonReader.class);
+  private static final Logger logger = LoggerFactory.getLogger(JsonReader.class);
   public final static int MAX_RECORD_SIZE = 128 * 1024;
 
   private final WorkingBuffer workingBuffer;
@@ -380,8 +385,7 @@ public class JsonReader extends BaseJsonReader {
    * @return
    * @throws IOException
    */
-  private boolean writeMapDataIfTyped(MapWriter writer, String fieldName)
-      throws IOException {
+  private boolean writeMapDataIfTyped(MapWriter writer, String fieldName) throws IOException {
     if (extended) {
       return mapOutput.run(writer, fieldName);
     } else {
@@ -426,7 +430,7 @@ public class JsonReader extends BaseJsonReader {
         workingBuffer.getBuf());
   }
 
-  private void writeData(ListWriter list) throws IOException {
+  private void writeData(ListWriter list) {
     list.startList();
     outside: while (true) {
       try {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/VectorOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/VectorOutput.java
index ea3bfd4150..040679a2c3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/VectorOutput.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/VectorOutput.java
@@ -88,7 +88,7 @@ abstract class VectorOutput {
     this.parser = parser;
   }
 
-  protected boolean innerRun() throws IOException{
+  protected boolean innerRun() throws IOException {
     JsonToken t = parser.nextToken();
     if (t != JsonToken.FIELD_NAME) {
       return false;
@@ -155,25 +155,24 @@ abstract class VectorOutput {
     return checkToken(parser.getCurrentToken(), expected1, expected2);
   }
 
-  boolean hasType() throws JsonParseException, IOException {
+  boolean hasType() throws IOException {
     JsonToken token = parser.nextToken();
     return token == JsonToken.FIELD_NAME && parser.getText().equals(ExtendedTypeName.TYPE);
   }
 
-  boolean hasBinary() throws JsonParseException, IOException {
+  boolean hasBinary() throws IOException {
     JsonToken token = parser.nextToken();
     return token == JsonToken.FIELD_NAME && parser.getText().equals(ExtendedTypeName.BINARY);
   }
 
-  long getType() throws JsonParseException, IOException {
+  long getType() throws IOException {
     if (!checkNextToken(JsonToken.VALUE_NUMBER_INT, JsonToken.VALUE_STRING)) {
       long type = parser.getValueAsLong();
       //Advancing the token, as checking current token in binary
       parser.nextToken();
       return type;
     }
-    throw new JsonParseException("Failure while reading $type value. Expected a NUMBER or STRING",
-        parser.getCurrentLocation());
+    throw new JsonParseException(parser, "Failure while reading $type value. Expected a NUMBER or STRING");
   }
 
   public boolean checkToken(final JsonToken t, final JsonToken expected1, final JsonToken expected2) throws IOException{
@@ -184,8 +183,8 @@ abstract class VectorOutput {
     } else if (t == expected2) {
       return false;
     } else {
-      throw new JsonParseException(String.format("Failure while reading ExtendedJSON typed value. Expected a %s but "
-          + "received a token of type %s", expected1, t), parser.getCurrentLocation());
+      throw new JsonParseException(parser, String.format("Failure while reading ExtendedJSON typed value. Expected a %s but "
+          + "received a token of type %s", expected1, t));
     }
   }
 
@@ -197,7 +196,7 @@ abstract class VectorOutput {
   public abstract void writeInteger(boolean isNull) throws IOException;
   public abstract void writeDecimal(boolean isNull) throws IOException;
 
-  static class ListVectorOutput extends VectorOutput{
+  static class ListVectorOutput extends VectorOutput {
     private ListWriter writer;
 
     public ListVectorOutput(WorkingBuffer work) {
@@ -262,7 +261,7 @@ abstract class VectorOutput {
           // 1. https://docs.mongodb.com/manual/reference/mongodb-extended-json
           // 2. org.apache.drill.exec.store.easy.json.values.UtcTimestampValueListener
           Instant instant = isoDateTimeFormatter.parse(parser.getValueAsString(), Instant::from);
-          long offset = ZoneId.systemDefault().getRules().getOffset(instant).getTotalSeconds() * 1000;
+          long offset = ZoneId.systemDefault().getRules().getOffset(instant).getTotalSeconds() * 1000L;
           ts.writeTimeStamp(instant.toEpochMilli() + offset);
           break;
         default:
@@ -295,7 +294,7 @@ abstract class VectorOutput {
 
     @Override
     public void writeDecimal(boolean isNull) throws IOException {
-      throw new JsonParseException("Decimal Extended types not yet supported.", parser.getCurrentLocation());
+      throw new JsonParseException(parser, "Decimal Extended types not yet supported");
     }
   }
 
@@ -368,7 +367,7 @@ abstract class VectorOutput {
           // 1. https://docs.mongodb.com/manual/reference/mongodb-extended-json
           // 2. org.apache.drill.exec.store.easy.json.values.UtcTimestampValueListener
           Instant instant = isoDateTimeFormatter.parse(parser.getValueAsString(), Instant::from);
-          long offset = ZoneId.systemDefault().getRules().getOffset(instant).getTotalSeconds() * 1000;
+          long offset = ZoneId.systemDefault().getRules().getOffset(instant).getTotalSeconds() * 1000L;
           ts.writeTimeStamp(instant.toEpochMilli() + offset);
           break;
         default:
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestFrameworkTest.java b/exec/java-exec/src/test/java/org/apache/drill/TestFrameworkTest.java
index 5ff50b9610..1acfecdd26 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestFrameworkTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestFrameworkTest.java
@@ -36,6 +36,7 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.test.BaseTestQuery;
@@ -212,23 +213,28 @@ public class TestFrameworkTest extends BaseTestQuery {
     LocalDateTime localDT = LocalDateTime.of(2019, 9, 30, 20, 47, 43, 123);
     Instant instant = localDT.atZone(ZoneId.systemDefault()).toInstant();
     long ts = instant.toEpochMilli() + instant.getNano();
-    ts = ts + ZoneId.systemDefault().getRules().getOffset(instant).getTotalSeconds() * 1000;
-    testBuilder()
-        .sqlQuery("select * from cp.`jsoninput/input2.json` limit 1")
-        .ordered()
-        .baselineColumns("integer", "float", "x", "z", "l", "rl", "`date`")
-        .baselineValues(2010l,
-                        17.4,
-                        mapOf("y", "kevin",
-                            "z", "paul"),
-                        listOf(mapOf("orange", "yellow",
-                                "pink", "red"),
-                            mapOf("pink", "purple")),
-                        listOf(4l, 2l),
-                        listOf(listOf(2l, 1l),
-                            listOf(4l, 6l)),
-                        LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneId.systemDefault()))
-        .build().run();
+    ts = ts + ZoneId.systemDefault().getRules().getOffset(instant).getTotalSeconds() * 1000L;
+    try {
+      testBuilder()
+          .ordered()
+          .enableSessionOption(ExecConstants.JSON_EXTENDED_TYPES_KEY)
+          .sqlQuery("select * from cp.`jsoninput/input2.json` limit 1")
+          .baselineColumns("integer", "float", "x", "z", "l", "rl", "`date`")
+          .baselineValues(2010l,
+                          17.4,
+                          mapOf("y", "kevin",
+                              "z", "paul"),
+                          listOf(mapOf("orange", "yellow",
+                                  "pink", "red"),
+                              mapOf("pink", "purple")),
+                          listOf(4l, 2l),
+                          listOf(listOf(2l, 1l),
+                              listOf(4l, 6l)),
+                          LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneId.systemDefault()))
+          .build().run();
+    } finally {
+      resetSessionOption(ExecConstants.JSON_EXTENDED_TYPES_KEY);
+    }
   }
 
   @Test
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestStarQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestStarQueries.java
index df83dbc159..4c042d04b4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestStarQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestStarQueries.java
@@ -221,7 +221,6 @@ public class TestStarQueries extends BaseTestQuery {
         .sqlQuery("select *, n_nationkey as key2 from cp.`tpch/nation.parquet` order by n_name limit 2")
         .sqlBaselineQuery("select n_comment, n_name, n_nationkey, n_regionkey, n_nationkey as key2 from cp.`tpch/nation.parquet` order by n_name limit 2")
         .build().run();
-
   }
 
   @Test
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/expr/fn/impl/TestTypeFns.java b/exec/java-exec/src/test/java/org/apache/drill/exec/expr/fn/impl/TestTypeFns.java
index 3870c9d657..ebeb3c0dd8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/expr/fn/impl/TestTypeFns.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/expr/fn/impl/TestTypeFns.java
@@ -17,9 +17,10 @@
  */
 package org.apache.drill.exec.expr.fn.impl;
 
+import static org.apache.drill.exec.ExecConstants.ENABLE_UNION_TYPE_KEY;
+import static org.apache.drill.exec.ExecConstants.ENABLE_V2_JSON_READER_KEY;
 import static org.junit.Assert.assertEquals;
 
-import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.test.ClusterFixture;
@@ -313,7 +314,7 @@ public class TestTypeFns extends ClusterTest {
   @Test
   public void testTypeOfWithFileV1() throws Exception {
     try {
-      enableV2Reader(false);
+      client.alterSession(ENABLE_V2_JSON_READER_KEY, false);
       // Column `x` does not actually appear in the file.
       String sql ="SELECT typeof(bi) AS bi_t, typeof(fl) AS fl_t, typeof(st) AS st_t,\n" +
                   "       typeof(mp) AS mp_t, typeof(ar) AS ar_t, typeof(nu) AS nu_t,\n" +
@@ -326,7 +327,7 @@ public class TestTypeFns extends ClusterTest {
         .baselineValues( "BIGINT", "FLOAT8", "VARCHAR", "MAP",  "BIGINT", "NULL", "NULL")
         .go();
     } finally {
-      resetV2Reader();
+      client.resetSession(ENABLE_V2_JSON_READER_KEY);
     }
   }
 
@@ -335,31 +336,26 @@ public class TestTypeFns extends ClusterTest {
    */
   @Test
   public void testTypeOfWithFileV2() throws Exception {
-    try {
-      enableV2Reader(true);
-      // Column `x` does not actually appear in the file.
-      String sql ="SELECT typeof(bi) AS bi_t, typeof(fl) AS fl_t, typeof(st) AS st_t,\n" +
-                  "       typeof(mp) AS mp_t, typeof(ar) AS ar_t, typeof(nu) AS nu_t,\n" +
-                  "       typeof(x) AS x_t\n" +
-                  "FROM cp.`jsoninput/allTypes.json`";
-       testBuilder()
-        .sqlQuery(sql)
-        .ordered()
-        .baselineColumns("bi_t",   "fl_t",   "st_t",    "mp_t", "ar_t",   "nu_t",    "x_t")
-        .baselineValues( "BIGINT", "FLOAT8", "VARCHAR", "MAP",  "BIGINT", "VARCHAR", "VARCHAR")
-        .go();
-    } finally {
-      resetV2Reader();
-    }
+    // Column `x` does not actually appear in the file.
+    String sql ="SELECT typeof(bi) AS bi_t, typeof(fl) AS fl_t, typeof(st) AS st_t,\n" +
+                "       typeof(mp) AS mp_t, typeof(ar) AS ar_t, typeof(nu) AS nu_t,\n" +
+                "       typeof(x) AS x_t\n" +
+                "FROM cp.`jsoninput/allTypes.json`";
+     testBuilder()
+      .sqlQuery(sql)
+      .ordered()
+      .baselineColumns("bi_t",   "fl_t",   "st_t",    "mp_t", "ar_t",   "nu_t",    "x_t")
+      .baselineValues( "BIGINT", "FLOAT8", "VARCHAR", "MAP",  "BIGINT", "VARCHAR", "VARCHAR")
+      .go();
   }
 
   @Test
   public void testUnionType() throws Exception {
-    String sql ="SELECT typeof(a) AS t, modeof(a) AS m, drilltypeof(a) AS dt\n" +
-                "FROM cp.`jsoninput/union/c.json`";
+    String sql ="SELECT typeof(a) AS t, modeof(a) AS m, drilltypeof(a) AS dt FROM cp.`jsoninput/union/c.json`";
     try {
       testBuilder()
-        .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true")
+        .enableSessionOption(ENABLE_UNION_TYPE_KEY)
+        .disableSessionOption(ENABLE_V2_JSON_READER_KEY)
         .sqlQuery(sql)
         .ordered()
         .baselineColumns("t",       "m",        "dt")
@@ -371,17 +367,9 @@ public class TestTypeFns extends ClusterTest {
         .baselineValues( "LIST",    "NULLABLE", "UNION")
         .baselineValues( "NULL",    "NULLABLE", "UNION")
         .go();
+    } finally {
+      client.resetSession(ENABLE_UNION_TYPE_KEY);
+      client.resetSession(ENABLE_V2_JSON_READER_KEY);
     }
-    finally {
-      client.resetSession(ExecConstants.ENABLE_UNION_TYPE_KEY);
-    }
-  }
-
-  private void enableV2Reader(boolean enable) throws Exception {
-    client.alterSession(ExecConstants.ENABLE_V2_JSON_READER_KEY, enable);
-  }
-
-  private void resetV2Reader() throws Exception {
-    client.resetSession(ExecConstants.ENABLE_V2_JSON_READER_KEY);
   }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/metastore/TestMetastoreWithEasyFormatPlugin.java b/exec/java-exec/src/test/java/org/apache/drill/exec/metastore/TestMetastoreWithEasyFormatPlugin.java
index ec88737e3e..d0cb8954a1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/metastore/TestMetastoreWithEasyFormatPlugin.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/metastore/TestMetastoreWithEasyFormatPlugin.java
@@ -610,38 +610,12 @@ public class TestMetastoreWithEasyFormatPlugin extends ClusterTest {
 
     Path tablePath = new Path(table.toURI().getPath());
 
-    TupleMetadata schema = new SchemaBuilder()
-        .addNullable("dir0", TypeProtos.MinorType.VARCHAR)
-        .addNullable("dir1", TypeProtos.MinorType.VARCHAR)
-        .addNullable("o_orderkey", TypeProtos.MinorType.BIGINT)
-        .addNullable("o_custkey", TypeProtos.MinorType.BIGINT)
-        .addNullable("o_orderstatus", TypeProtos.MinorType.VARCHAR)
-        .addNullable("o_totalprice", TypeProtos.MinorType.FLOAT8)
-        .addNullable("o_orderdate", TypeProtos.MinorType.VARCHAR)
-        .addNullable("o_orderpriority", TypeProtos.MinorType.VARCHAR)
-        .addNullable("o_clerk", TypeProtos.MinorType.VARCHAR)
-        .addNullable("o_shippriority", TypeProtos.MinorType.BIGINT)
-        .addNullable("o_comment", TypeProtos.MinorType.VARCHAR)
-        .build();
-
     Map<SchemaPath, ColumnStatistics<?>> tableColumnStatistics = new HashMap<>(TABLE_COLUMN_STATISTICS);
-    tableColumnStatistics.put(SchemaPath.getSimplePath("o_custkey"),
-        getColumnStatistics(25L,
-            1498L, 120L, TypeProtos.MinorType.BIGINT));
-    tableColumnStatistics.put(SchemaPath.getSimplePath("o_orderdate"),
-        getColumnStatistics("1994-01-01T00:00:00.000-08:00",
-            "1996-12-19T00:00:00.000-08:00", 120L, TypeProtos.MinorType.VARCHAR));
-    tableColumnStatistics.put(SchemaPath.getSimplePath("o_orderkey"),
-        getColumnStatistics(1L,
-            1319L, 120L, TypeProtos.MinorType.BIGINT));
-    tableColumnStatistics.put(SchemaPath.getSimplePath("o_shippriority"),
-        getColumnStatistics(0L,
-            0L, 120L, TypeProtos.MinorType.BIGINT));
 
     BaseTableMetadata expectedTableMetadata = BaseTableMetadata.builder()
         .tableInfo(tableInfo)
         .metadataInfo(TABLE_META_INFO)
-        .schema(schema)
+        .schema(SCHEMA)
         .location(new Path(table.toURI().getPath()))
         .columnsStatistics(tableColumnStatistics)
         .metadataStatistics(Arrays.asList(new StatisticsHolder<>(120L, TableStatisticsKind.ROW_COUNT),
@@ -657,18 +631,6 @@ public class TestMetastoreWithEasyFormatPlugin extends ClusterTest {
         .build();
 
     Map<SchemaPath, ColumnStatistics<?>> dir0CSVStats = new HashMap<>(DIR0_1994_SEGMENT_COLUMN_STATISTICS);
-    dir0CSVStats.put(SchemaPath.getSimplePath("o_custkey"),
-        getColumnStatistics(25L,
-            1469L, 40L, TypeProtos.MinorType.BIGINT));
-    dir0CSVStats.put(SchemaPath.getSimplePath("o_orderdate"),
-        getColumnStatistics("1994-01-01T00:00:00.000-08:00",
-            "1994-12-23T00:00:00.000-08:00", 40L, TypeProtos.MinorType.VARCHAR));
-    dir0CSVStats.put(SchemaPath.getSimplePath("o_orderkey"),
-        getColumnStatistics(5L,
-            1031L, 40L, TypeProtos.MinorType.BIGINT));
-    dir0CSVStats.put(SchemaPath.getSimplePath("o_shippriority"),
-        getColumnStatistics(0L,
-            0L, 40L, TypeProtos.MinorType.BIGINT));
 
     SegmentMetadata dir0 = SegmentMetadata.builder()
         .tableInfo(baseTableInfo)
@@ -678,7 +640,7 @@ public class TestMetastoreWithEasyFormatPlugin extends ClusterTest {
             .key("1994")
             .build())
         .path(new Path(tablePath, "1994"))
-        .schema(schema)
+        .schema(SCHEMA)
         .lastModifiedTime(getMaxLastModified(new File(table, "1994")))
         .column(SchemaPath.getSimplePath("dir0"))
         .columnsStatistics(dir0CSVStats)
@@ -720,19 +682,6 @@ public class TestMetastoreWithEasyFormatPlugin extends ClusterTest {
     expectedSegmentFilesLocations.add(segmentFiles);
 
     Map<SchemaPath, ColumnStatistics<?>> dir0q1Stats = new HashMap<>(DIR0_1994_Q1_SEGMENT_COLUMN_STATISTICS);
-    dir0q1Stats.put(SchemaPath.getSimplePath("o_custkey"),
-        getColumnStatistics(392L,
-            1411L, 10L, TypeProtos.MinorType.BIGINT));
-    dir0q1Stats.put(SchemaPath.getSimplePath("o_orderdate"),
-        getColumnStatistics("1994-01-01T00:00:00.000-08:00",
-            "1994-03-26T00:00:00.000-08:00", 10L, TypeProtos.MinorType.VARCHAR));
-    dir0q1Stats.put(SchemaPath.getSimplePath("o_orderkey"),
-        getColumnStatistics(66L,
-            833L, 10L, TypeProtos.MinorType.BIGINT));
-    dir0q1Stats.put(SchemaPath.getSimplePath("o_shippriority"),
-        getColumnStatistics(0L,
-            0L, 10L, TypeProtos.MinorType.BIGINT));
-
     long dir0q1lastModified = new File(new File(new File(table, "1994"), "Q1"), "orders_94_q1.json").lastModified();
     FileMetadata dir01994q1File = FileMetadata.builder()
         .tableInfo(baseTableInfo)
@@ -741,7 +690,7 @@ public class TestMetastoreWithEasyFormatPlugin extends ClusterTest {
             .identifier("1994/Q1/orders_94_q1.json")
             .key("1994")
             .build())
-        .schema(schema)
+        .schema(SCHEMA)
         .lastModifiedTime(dir0q1lastModified)
         .columnsStatistics(dir0q1Stats)
         .metadataStatistics(Collections.singletonList(new StatisticsHolder<>(10L, TableStatisticsKind.ROW_COUNT)))
@@ -785,9 +734,7 @@ public class TestMetastoreWithEasyFormatPlugin extends ClusterTest {
           .collect(Collectors.toSet());
 
       // verify top segments locations
-      assertEquals(
-          expectedTopLevelSegmentLocations,
-          topLevelSegmentLocations);
+      assertEquals(expectedTopLevelSegmentLocations, topLevelSegmentLocations);
 
       Set<Set<Path>> segmentFilesLocations = topSegmentMetadata.stream()
           .map(SegmentMetadata::getLocations)
@@ -815,7 +762,7 @@ public class TestMetastoreWithEasyFormatPlugin extends ClusterTest {
               .key("1994")
               .build())
           .path(new Path(new Path(tablePath, "1994"), "Q1"))
-          .schema(schema)
+          .schema(SCHEMA)
           .lastModifiedTime(getMaxLastModified(new File(new File(table, "1994"), "Q1")))
           .column(SchemaPath.getSimplePath("dir1"))
           .columnsStatistics(dir0q1Stats)
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNSchemaChanges.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNSchemaChanges.java
index 4aab0a4275..a91cb6d88f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNSchemaChanges.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNSchemaChanges.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.physical.impl.TopN;
 
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.test.BaseTestQuery;
 import org.apache.drill.categories.OperatorTest;
 import org.apache.drill.test.TestBuilder;
@@ -145,21 +146,27 @@ public class TestTopNSchemaChanges extends BaseTestQuery {
     }
     writer.close();
 
-    TestBuilder builder = testBuilder()
-      .sqlQuery("select * from dfs.`%s` order by kl limit 8", TABLE)
-      .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true")
-      .ordered()
-      .baselineColumns("kl", "vl");
-
-    builder.baselineValues(0l, 0l);
-    builder.baselineValues(1.0d, 1.0d);
-    builder.baselineValues("2", "2");
-    builder.baselineValues(3l, 3l);
-    builder.baselineValues(4.0d, 4.0d);
-    builder.baselineValues("5", "5");
-    builder.baselineValues(6l, 6l);
-    builder.baselineValues(7.0d, 7.0d);
-    builder.go();
+    try {
+      TestBuilder builder = testBuilder()
+        .sqlQuery("select * from dfs.`%s` order by kl limit 8", TABLE)
+        .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true")
+        .optionSettingQueriesForTestQuery("alter session set `store.json.enable_v2_reader` = false")
+        .ordered()
+        .baselineColumns("kl", "vl");
+
+      builder.baselineValues(0l, 0l);
+      builder.baselineValues(1.0d, 1.0d);
+      builder.baselineValues("2", "2");
+      builder.baselineValues(3l, 3l);
+      builder.baselineValues(4.0d, 4.0d);
+      builder.baselineValues("5", "5");
+      builder.baselineValues(6l, 6l);
+      builder.baselineValues(7.0d, 7.0d);
+      builder.go();
+    } finally {
+      resetSessionOption(ExecConstants.ENABLE_UNION_TYPE_KEY);
+      resetSessionOption(ExecConstants.ENABLE_V2_JSON_READER_KEY);
+    }
   }
 
   @Test
@@ -181,38 +188,46 @@ public class TestTopNSchemaChanges extends BaseTestQuery {
     }
     writer.close();
 
-    TestBuilder builder = testBuilder()
-      .sqlQuery("select kl, vl, kl1, vl1, kl2, vl2 from dfs.`%s` order by kl limit 3", TABLE)
-      .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true")
-      .ordered()
-      .baselineColumns("kl", "vl", "kl1", "vl1", "kl2", "vl2")
-      .baselineValues(100.0d, 100.0d, null, null, null, null)
-      .baselineValues(101.0d, 101.0d, null, null, null, null)
-      .baselineValues(102.0d, 102.0d, null, null, null, null);
-    builder.go();
-
-    builder = testBuilder()
-      .sqlQuery("select kl, vl, kl1, vl1, kl2, vl2  from dfs.`%s` order by kl1 limit 3", TABLE)
-      .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true")
-      .ordered()
-      .baselineColumns("kl", "vl", "kl1", "vl1", "kl2", "vl2")
-      .baselineValues(null, null, 0l, 0l, null, null)
-      .baselineValues(null, null, 1l, 1l, null, null)
-      .baselineValues(null, null, 2l, 2l, null, null);
-    builder.go();
-
-    builder = testBuilder()
-      .sqlQuery("select kl, vl, kl1, vl1, kl2, vl2 from dfs.`%s` order by kl2 limit 3", TABLE)
-      .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true")
-      .ordered()
-      .baselineColumns("kl", "vl", "kl1", "vl1", "kl2", "vl2")
-      .baselineValues(null, null, null, null, "200", "200")
-      .baselineValues(null, null, null, null, "201", "201")
-      .baselineValues(null, null, null, null, "202", "202");
-    builder.go();
-
-    // Since client can't handle new columns which are not in first batch, we won't test output of query.
-    // Query should run w/o any errors.
-    test("select * from dfs.`%s` order by kl limit 3", TABLE);
+    try {
+      TestBuilder builder = testBuilder()
+        .sqlQuery("select kl, vl, kl1, vl1, kl2, vl2 from dfs.`%s` order by kl limit 3", TABLE)
+        .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true")
+        .optionSettingQueriesForTestQuery("alter session set `store.json.enable_v2_reader` = false")
+        .ordered()
+        .baselineColumns("kl", "vl", "kl1", "vl1", "kl2", "vl2")
+        .baselineValues(100.0d, 100.0d, null, null, null, null)
+        .baselineValues(101.0d, 101.0d, null, null, null, null)
+        .baselineValues(102.0d, 102.0d, null, null, null, null);
+      builder.go();
+
+      builder = testBuilder()
+        .sqlQuery("select kl, vl, kl1, vl1, kl2, vl2  from dfs.`%s` order by kl1 limit 3", TABLE)
+        .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true")
+        .optionSettingQueriesForTestQuery("alter session set `store.json.enable_v2_reader` = false")
+        .ordered()
+        .baselineColumns("kl", "vl", "kl1", "vl1", "kl2", "vl2")
+        .baselineValues(null, null, 0l, 0l, null, null)
+        .baselineValues(null, null, 1l, 1l, null, null)
+        .baselineValues(null, null, 2l, 2l, null, null);
+      builder.go();
+
+      builder = testBuilder()
+        .sqlQuery("select kl, vl, kl1, vl1, kl2, vl2 from dfs.`%s` order by kl2 limit 3", TABLE)
+        .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true")
+        .optionSettingQueriesForTestQuery("alter session set `store.json.enable_v2_reader` = false")
+        .ordered()
+        .baselineColumns("kl", "vl", "kl1", "vl1", "kl2", "vl2")
+        .baselineValues(null, null, null, null, "200", "200")
+        .baselineValues(null, null, null, null, "201", "201")
+        .baselineValues(null, null, null, null, "202", "202");
+      builder.go();
+
+      // Since client can't handle new columns which are not in first batch, we won't test output of query.
+      // Query should run w/o any errors.
+      test("select * from dfs.`%s` order by kl limit 3", TABLE);
+    } finally {
+      resetSessionOption(ExecConstants.ENABLE_UNION_TYPE_KEY);
+      resetSessionOption(ExecConstants.ENABLE_V2_JSON_READER_KEY);
+    }
   }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinWithSchemaChanges.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinWithSchemaChanges.java
index e109557d55..3eba1f47a4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinWithSchemaChanges.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinWithSchemaChanges.java
@@ -32,6 +32,10 @@ import java.io.FileWriter;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 
+import static org.apache.drill.exec.ExecConstants.ENABLE_UNION_TYPE_KEY;
+import static org.apache.drill.exec.ExecConstants.ENABLE_V2_JSON_READER_KEY;
+import static org.apache.drill.exec.planner.physical.PlannerSettings.ENABLE_HASH_JOIN_OPTION;
+
 @Category(OperatorTest.class)
 public class TestMergeJoinWithSchemaChanges extends BaseTestQuery {
   public static final Path LEFT_DIR = Paths.get("mergejoin-schemachanges-left");
@@ -263,7 +267,9 @@ public class TestMergeJoinWithSchemaChanges extends BaseTestQuery {
 
     TestBuilder builder = testBuilder()
       .sqlQuery(query)
-      .optionSettingQueriesForTestQuery("alter session set `planner.enable_hashjoin` = false; alter session set `exec.enable_union_type` = true")
+      .disableSessionOption(ENABLE_HASH_JOIN_OPTION)
+      .enableSessionOption(ENABLE_UNION_TYPE_KEY)
+      .disableSessionOption(ENABLE_V2_JSON_READER_KEY)
       .unOrdered()
       .baselineColumns("kl", "vl", "kr", "vr", "kl1", "vl1", "kl2", "vl2", "kr1", "vr1", "kr2", "vr2");
 
@@ -278,7 +284,9 @@ public class TestMergeJoinWithSchemaChanges extends BaseTestQuery {
 
     builder = testBuilder()
       .sqlQuery(query)
-      .optionSettingQueriesForTestQuery("alter session set `planner.enable_hashjoin` = false; alter session set `exec.enable_union_type` = true")
+      .disableSessionOption(ENABLE_HASH_JOIN_OPTION)
+      .enableSessionOption(ENABLE_UNION_TYPE_KEY)
+      .disableSessionOption(ENABLE_V2_JSON_READER_KEY)
       .unOrdered()
       .baselineColumns("kl", "vl", "kr", "vr", "kl1", "vl1", "kl2", "vl2", "kr1", "vr1", "kr2", "vr2");
 
@@ -299,7 +307,9 @@ public class TestMergeJoinWithSchemaChanges extends BaseTestQuery {
 
     builder = testBuilder()
       .sqlQuery(query)
-      .optionSettingQueriesForTestQuery("alter session set `planner.enable_hashjoin` = false; alter session set `exec.enable_union_type` = true")
+      .disableSessionOption(ENABLE_HASH_JOIN_OPTION)
+      .enableSessionOption(ENABLE_UNION_TYPE_KEY)
+      .disableSessionOption(ENABLE_V2_JSON_READER_KEY)
       .unOrdered()
       .baselineColumns("kl", "vl", "kr", "vr", "kl1", "vl1", "kl2", "vl2", "kr1", "vr1", "kr2", "vr2");
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
index 7b29091636..e277022072 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
@@ -17,13 +17,21 @@
  */
 package org.apache.drill.exec.physical.impl.lateraljoin;
 
+import ch.qos.logback.classic.Level;
 import org.apache.drill.categories.OperatorTest;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.physical.impl.aggregate.HashAggBatch;
+import org.apache.drill.exec.physical.impl.aggregate.HashAggTemplate;
+import org.apache.drill.exec.physical.impl.join.LateralJoinBatch;
+import org.apache.drill.exec.physical.impl.protocol.OperatorRecordBatch;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.ClusterFixtureBuilder;
 import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.LogFixture;
 import org.apache.drill.test.TestBuilder;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -33,6 +41,8 @@ import static junit.framework.TestCase.fail;
 
 @Category(OperatorTest.class)
 public class TestE2EUnnestAndLateral extends ClusterTest {
+  private static LogFixture logFixture;
+  private final static Level CURRENT_LOG_LEVEL = Level.INFO;
 
   private static final String regularTestFile_1 = "cust_order_10_1.json";
   private static final String regularTestFile_2 = "cust_order_10_2.json";
@@ -48,6 +58,14 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
         .sessionOption(PlannerSettings.ENABLE_UNNEST_LATERAL_KEY, true)
         .maxParallelization(1);
     startCluster(builder);
+    logFixture = LogFixture.builder()
+      .toConsole()
+      .logger(HashAggBatch.class, CURRENT_LOG_LEVEL)
+      .logger(HashAggTemplate.class, CURRENT_LOG_LEVEL)
+      .logger(ScanBatch.class, CURRENT_LOG_LEVEL)
+      .logger(OperatorRecordBatch.class, CURRENT_LOG_LEVEL)
+      .logger(LateralJoinBatch.class, CURRENT_LOG_LEVEL)
+      .build();
   }
 
   /***********************************************************************************************
@@ -370,7 +388,6 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
       "FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " +
       "(SELECT t.ord.o_orderkey as o_orderkey, t.ord.o_totalprice as o_totalprice FROM UNNEST(customer.c_orders) t(ord)" +
       " ORDER BY o_totalprice DESC) orders WHERE customer.c_custkey = '7180' LIMIT 1";
-
     testBuilder()
       .sqlQuery(sql)
       .ordered()
@@ -499,7 +516,7 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
    *****************************************************************************************/
 
   @Test
-  public void testMultipleBatchesLateral_WithLimitInParent() throws Exception {
+  public void testMultipleBatchesLateral_WithLimitInParent() {
     String sql = "SELECT customer.c_name, customer.c_address, orders.o_orderkey, orders.o_totalprice " +
       "FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " +
       "(SELECT t.ord.o_orderkey as o_orderkey, t.ord.o_totalprice  as o_totalprice FROM UNNEST(customer.c_orders) t(ord) WHERE t.ord.o_totalprice > 100000 LIMIT 2) " +
@@ -508,7 +525,7 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
   }
 
   @Test
-  public void testMultipleBatchesLateral_WithFilterInParent() throws Exception {
+  public void testMultipleBatchesLateral_WithFilterInParent() {
     String sql = "SELECT customer.c_name, customer.c_address, orders.o_orderkey, orders.o_totalprice " +
       "FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " +
       "(SELECT t.ord.o_orderkey as o_orderkey, t.ord.o_totalprice as o_totalprice FROM UNNEST(customer.c_orders) t(ord) WHERE t.ord.o_totalprice > 100000 LIMIT 2) " +
@@ -517,7 +534,8 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
   }
 
   @Test
-  public void testMultipleBatchesLateral_WithGroupByInParent() throws Exception {
+  @Ignore("Disable until SchemaChange in HashAgg fixed")
+  public void testMultipleBatchesLateral_WithGroupByInParent() {
     String sql = "SELECT customer.c_name, avg(orders.o_totalprice) AS avgPrice " +
       "FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " +
       "(SELECT t.ord.o_totalprice as o_totalprice FROM UNNEST(customer.c_orders) t(ord) WHERE t.ord.o_totalprice > 100000 LIMIT 2) " +
@@ -526,7 +544,7 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
   }
 
   @Test
-  public void testMultipleBatchesLateral_WithOrderByInParent() throws Exception {
+  public void testMultipleBatchesLateral_WithOrderByInParent() {
     String sql = "SELECT customer.c_name, customer.c_address, orders.o_orderkey, orders.o_totalprice " +
       "FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " +
       "(SELECT t.ord.o_orderkey as o_orderkey, t.ord.o_totalprice as o_totalprice FROM UNNEST(customer.c_orders) t(ord)) orders " +
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestReaderLevelProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestReaderLevelProjection.java
index e0cccb0b71..5c91df6b21 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestReaderLevelProjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestReaderLevelProjection.java
@@ -503,9 +503,7 @@ public class TestReaderLevelProjection extends SubOperatorTest {
     final NullColumnBuilder builder = new NullBuilderBuilder().build();
     final ResolvedRow rootTuple = new ResolvedRow(builder);
     try {
-      new ExplicitSchemaProjection(
-          scanProj, tableSchema, rootTuple,
-          ScanTestUtils.resolvers());
+      new ExplicitSchemaProjection(scanProj, tableSchema, rootTuple, ScanTestUtils.resolvers());
       fail();
     } catch (final UserException e) {
       // Expected
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestExternalSort.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestExternalSort.java
index 32cddc0680..e74af23f9b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestExternalSort.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestExternalSort.java
@@ -35,6 +35,9 @@ import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import static org.apache.drill.exec.ExecConstants.ENABLE_UNION_TYPE_KEY;
+import static org.apache.drill.exec.ExecConstants.ENABLE_V2_JSON_READER_KEY;
+
 @Category({SlowTest.class, OperatorTest.class})
 public class TestExternalSort extends BaseTestQuery {
 
@@ -156,7 +159,7 @@ public class TestExternalSort extends BaseTestQuery {
     builder.go();
   }
 
-  @Test
+  @Test // V2_UNION
   public void testNewColumns() throws Exception {
     final int record_count = 10000;
     final String tableDirName = "newColumns";
@@ -194,24 +197,29 @@ public class TestExternalSort extends BaseTestQuery {
       new JsonFileBuilder(rowSet).build(tableFile);
       rowSet.clear();
     }
-
-    // Test framework currently doesn't handle changing schema (i.e. new
-    // columns) on the client side
-    TestBuilder builder = testBuilder()
-        .sqlQuery("select a, b, c from dfs.`%s` order by a desc", tableDirName)
-        .ordered()
-        .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true")
-        .baselineColumns("a", "b", "c");
-    for (int i = record_count; i >= 0;) {
-      builder.baselineValues((long) i, (long) i--, null);
-      if (i >= 0) {
-        builder.baselineValues((long) i, null, (long) i--);
+    try {
+      // Test framework currently doesn't handle changing schema (i.e. new
+      // columns) on the client side
+      TestBuilder builder = testBuilder()
+          .sqlQuery("select a, b, c from dfs.`%s` order by a desc", tableDirName)
+          .ordered()
+          .enableSessionOption(ENABLE_UNION_TYPE_KEY)
+          .disableSessionOption(ENABLE_V2_JSON_READER_KEY)
+          .baselineColumns("a", "b", "c");
+      for (int i = record_count; i >= 0;) {
+        builder.baselineValues((long) i, (long) i--, null);
+        if (i >= 0) {
+          builder.baselineValues((long) i, null, (long) i--);
+        }
       }
-    }
-    builder.go();
+      builder.go();
 
-    // TODO: Useless test: just dumps to console
-    test("select * from dfs.`%s` order by a desc", tableDirName);
+      // TODO: Useless test: just dumps to console
+      test("select * from dfs.`%s` order by a desc", tableDirName);
+    } finally {
+      resetSessionOption(ENABLE_UNION_TYPE_KEY);
+      resetSessionOption(ENABLE_V2_JSON_READER_KEY);
+    }
   }
 
   private File createTableFile(final String tableDirName, final String fileName) {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderProtocol.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderProtocol.java
index 14d5bc6e16..645cdc54c0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderProtocol.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderProtocol.java
@@ -52,9 +52,6 @@ import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
 import org.apache.drill.exec.vector.accessor.TupleWriter.UndefinedColumnException;
 import org.apache.drill.test.SubOperatorTest;
-import org.apache.drill.exec.physical.rowSet.RowSet;
-import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
-import org.apache.drill.exec.physical.rowSet.RowSetReader;
 import org.apache.drill.test.rowSet.RowSetUtilities;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/TestRestJson.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/TestRestJson.java
index b4d25a0c5f..5449a3c42e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/TestRestJson.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/TestRestJson.java
@@ -182,7 +182,7 @@ public class TestRestJson extends ClusterTest {
       System.out.println(
           client.queryBuilder().sql(sql).singletonLong());
       long end = System.currentTimeMillis();
-      System.out.println(String.format("COUNT(*) - Elapsed: %d ms", end - start));
+      System.out.printf("COUNT(*) - Elapsed: %d ms%n", end - start);
     }
 
     // Run the query and dump to a file to do a rough check
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestMetastoreCommands.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestMetastoreCommands.java
index 75f90868e4..5172e47fc9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestMetastoreCommands.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestMetastoreCommands.java
@@ -64,10 +64,12 @@ import java.nio.file.Paths;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -100,85 +102,87 @@ public class TestMetastoreCommands extends ClusterTest {
       .build();
 
   public static final Map<SchemaPath, ColumnStatistics<?>> TABLE_COLUMN_STATISTICS =
-      ImmutableMap.<SchemaPath, ColumnStatistics<?>>builder()
-      .put(SchemaPath.getSimplePath("o_shippriority"),
-          getColumnStatistics(0, 0, 120L, TypeProtos.MinorType.INT))
-      .put(SchemaPath.getSimplePath("o_orderstatus"),
-          getColumnStatistics("F", "P", 120L, TypeProtos.MinorType.VARCHAR))
-      .put(SchemaPath.getSimplePath("o_orderpriority"),
-          getColumnStatistics("1-URGENT", "5-LOW", 120L, TypeProtos.MinorType.VARCHAR))
-      .put(SchemaPath.getSimplePath("o_orderkey"),
-          getColumnStatistics(1, 1319, 120L, TypeProtos.MinorType.INT))
-      .put(SchemaPath.getSimplePath("o_clerk"),
-          getColumnStatistics("Clerk#000000004", "Clerk#000000995", 120L, TypeProtos.MinorType.VARCHAR))
-      .put(SchemaPath.getSimplePath("o_totalprice"),
-          getColumnStatistics(3266.69, 350110.21, 120L, TypeProtos.MinorType.FLOAT8))
-      .put(SchemaPath.getSimplePath("o_comment"),
+    new LinkedHashMap<SchemaPath, ColumnStatistics<?>>()
+    {{
+      put(SchemaPath.getSimplePath("o_shippriority"),
+          getColumnStatistics(0, 0, 120L, TypeProtos.MinorType.INT));
+      put(SchemaPath.getSimplePath("o_orderstatus"),
+          getColumnStatistics("F", "P", 120L, TypeProtos.MinorType.VARCHAR));
+      put(SchemaPath.getSimplePath("o_orderpriority"),
+          getColumnStatistics("1-URGENT", "5-LOW", 120L, TypeProtos.MinorType.VARCHAR));
+      put(SchemaPath.getSimplePath("o_orderkey"),
+          getColumnStatistics(1, 1319, 120L, TypeProtos.MinorType.INT));
+      put(SchemaPath.getSimplePath("o_clerk"),
+          getColumnStatistics("Clerk#000000004", "Clerk#000000995", 120L, TypeProtos.MinorType.VARCHAR));
+      put(SchemaPath.getSimplePath("o_totalprice"),
+          getColumnStatistics(3266.69, 350110.21, 120L, TypeProtos.MinorType.FLOAT8));
+      put(SchemaPath.getSimplePath("o_comment"),
           getColumnStatistics(" about the final platelets. dependen",
-              "zzle. carefully enticing deposits nag furio", 120L, TypeProtos.MinorType.VARCHAR))
-      .put(SchemaPath.getSimplePath("o_custkey"),
-          getColumnStatistics(25, 1498, 120L, TypeProtos.MinorType.INT))
-      .put(SchemaPath.getSimplePath("dir0"),
-          getColumnStatistics("1994", "1996", 120L, TypeProtos.MinorType.VARCHAR))
-      .put(SchemaPath.getSimplePath("dir1"),
-          getColumnStatistics("Q1", "Q4", 120L, TypeProtos.MinorType.VARCHAR))
-      .put(SchemaPath.getSimplePath("o_orderdate"),
-          getColumnStatistics(757382400000L, 850953600000L, 120L, TypeProtos.MinorType.DATE))
-      .build();
+              "zzle. carefully enticing deposits nag furio", 120L, TypeProtos.MinorType.VARCHAR));
+      put(SchemaPath.getSimplePath("o_custkey"),
+          getColumnStatistics(25, 1498, 120L, TypeProtos.MinorType.INT));
+      put(SchemaPath.getSimplePath("dir0"),
+          getColumnStatistics("1994", "1996", 120L, TypeProtos.MinorType.VARCHAR));
+      put(SchemaPath.getSimplePath("dir1"),
+          getColumnStatistics("Q1", "Q4", 120L, TypeProtos.MinorType.VARCHAR));
+      put(SchemaPath.getSimplePath("o_orderdate"),
+          getColumnStatistics(757382400000L, 850953600000L, 120L, TypeProtos.MinorType.DATE));
+    }};
 
   public static final Map<SchemaPath, ColumnStatistics<?>> DIR0_1994_SEGMENT_COLUMN_STATISTICS =
-      ImmutableMap.<SchemaPath, ColumnStatistics<?>>builder()
-      .put(SchemaPath.getSimplePath("o_shippriority"),
-          getColumnStatistics(0, 0, 40L, TypeProtos.MinorType.INT))
-      .put(SchemaPath.getSimplePath("o_orderstatus"),
-          getColumnStatistics("F", "F", 40L, TypeProtos.MinorType.VARCHAR))
-      .put(SchemaPath.getSimplePath("o_orderpriority"),
-          getColumnStatistics("1-URGENT", "5-LOW", 40L, TypeProtos.MinorType.VARCHAR))
-      .put(SchemaPath.getSimplePath("o_orderkey"),
-          getColumnStatistics(5, 1031, 40L, TypeProtos.MinorType.INT))
-      .put(SchemaPath.getSimplePath("o_clerk"),
-          getColumnStatistics("Clerk#000000004", "Clerk#000000973", 40L, TypeProtos.MinorType.VARCHAR))
-      .put(SchemaPath.getSimplePath("o_totalprice"),
-          getColumnStatistics(3266.69, 350110.21, 40L, TypeProtos.MinorType.FLOAT8))
-      .put(SchemaPath.getSimplePath("o_comment"),
+    new LinkedHashMap<SchemaPath, ColumnStatistics<?>>()
+    {{
+        put(SchemaPath.getSimplePath("o_shippriority"),
+          getColumnStatistics(0, 0, 40L, TypeProtos.MinorType.INT));
+        put(SchemaPath.getSimplePath("o_orderstatus"),
+          getColumnStatistics("F", "F", 40L, TypeProtos.MinorType.VARCHAR));
+        put(SchemaPath.getSimplePath("o_orderpriority"),
+          getColumnStatistics("1-URGENT", "5-LOW", 40L, TypeProtos.MinorType.VARCHAR));
+        put(SchemaPath.getSimplePath("o_orderkey"),
+          getColumnStatistics(5, 1031, 40L, TypeProtos.MinorType.INT));
+        put(SchemaPath.getSimplePath("o_clerk"),
+          getColumnStatistics("Clerk#000000004", "Clerk#000000973", 40L, TypeProtos.MinorType.VARCHAR));
+        put(SchemaPath.getSimplePath("o_totalprice"),
+          getColumnStatistics(3266.69, 350110.21, 40L, TypeProtos.MinorType.FLOAT8));
+        put(SchemaPath.getSimplePath("o_comment"),
           getColumnStatistics(" accounts nag slyly. ironic, ironic accounts wake blithel",
-              "yly final requests over the furiously regula", 40L, TypeProtos.MinorType.VARCHAR))
-      .put(SchemaPath.getSimplePath("o_custkey"),
-          getColumnStatistics(25, 1469, 40L, TypeProtos.MinorType.INT))
-      .put(SchemaPath.getSimplePath("dir0"),
-          getColumnStatistics("1994", "1994", 40L, TypeProtos.MinorType.VARCHAR))
-      .put(SchemaPath.getSimplePath("dir1"),
-          getColumnStatistics("Q1", "Q4", 40L, TypeProtos.MinorType.VARCHAR))
-      .put(SchemaPath.getSimplePath("o_orderdate"),
-          getColumnStatistics(757382400000L, 788140800000L, 40L, TypeProtos.MinorType.DATE))
-      .build();
+            "yly final requests over the furiously regula", 40L, TypeProtos.MinorType.VARCHAR));
+        put(SchemaPath.getSimplePath("o_custkey"),
+          getColumnStatistics(25, 1469, 40L, TypeProtos.MinorType.INT));
+        put(SchemaPath.getSimplePath("dir0"),
+          getColumnStatistics("1994", "1994", 40L, TypeProtos.MinorType.VARCHAR));
+        put(SchemaPath.getSimplePath("dir1"),
+          getColumnStatistics("Q1", "Q4", 40L, TypeProtos.MinorType.VARCHAR));
+        put(SchemaPath.getSimplePath("o_orderdate"),
+          getColumnStatistics(757382400000L, 788140800000L, 40L, TypeProtos.MinorType.DATE));
+    }};
 
   public static final Map<SchemaPath, ColumnStatistics<?>> DIR0_1994_Q1_SEGMENT_COLUMN_STATISTICS =
-      ImmutableMap.<SchemaPath, ColumnStatistics<?>>builder()
-      .put(SchemaPath.getSimplePath("o_shippriority"),
-          getColumnStatistics(0, 0, 10L, TypeProtos.MinorType.INT))
-      .put(SchemaPath.getSimplePath("o_orderstatus"),
-          getColumnStatistics("F", "F", 10L, TypeProtos.MinorType.VARCHAR))
-      .put(SchemaPath.getSimplePath("o_orderpriority"),
-          getColumnStatistics("1-URGENT", "5-LOW", 10L, TypeProtos.MinorType.VARCHAR))
-      .put(SchemaPath.getSimplePath("o_orderkey"),
-          getColumnStatistics(66, 833, 10L, TypeProtos.MinorType.INT))
-      .put(SchemaPath.getSimplePath("o_clerk"),
-          getColumnStatistics("Clerk#000000062", "Clerk#000000973", 10L, TypeProtos.MinorType.VARCHAR))
-      .put(SchemaPath.getSimplePath("o_totalprice"),
-          getColumnStatistics(3266.69, 132531.73, 10L, TypeProtos.MinorType.FLOAT8))
-      .put(SchemaPath.getSimplePath("o_comment"),
+    new LinkedHashMap<SchemaPath, ColumnStatistics<?>>() {{
+        put(SchemaPath.getSimplePath("o_shippriority"),
+          getColumnStatistics(0, 0, 10L, TypeProtos.MinorType.INT));
+        put(SchemaPath.getSimplePath("o_orderstatus"),
+          getColumnStatistics("F", "F", 10L, TypeProtos.MinorType.VARCHAR));
+        put(SchemaPath.getSimplePath("o_orderpriority"),
+          getColumnStatistics("1-URGENT", "5-LOW", 10L, TypeProtos.MinorType.VARCHAR));
+        put(SchemaPath.getSimplePath("o_orderkey"),
+          getColumnStatistics(66, 833, 10L, TypeProtos.MinorType.INT));
+        put(SchemaPath.getSimplePath("o_clerk"),
+          getColumnStatistics("Clerk#000000062", "Clerk#000000973", 10L, TypeProtos.MinorType.VARCHAR));
+        put(SchemaPath.getSimplePath("o_totalprice"),
+          getColumnStatistics(3266.69, 132531.73, 10L, TypeProtos.MinorType.FLOAT8));
+        put(SchemaPath.getSimplePath("o_comment"),
           getColumnStatistics(" special pinto beans use quickly furiously even depende",
-              "y pending requests integrate", 10L, TypeProtos.MinorType.VARCHAR))
-      .put(SchemaPath.getSimplePath("o_custkey"),
-          getColumnStatistics(392, 1411, 10L, TypeProtos.MinorType.INT))
-      .put(SchemaPath.getSimplePath("dir0"),
-          getColumnStatistics("1994", "1994", 10L, TypeProtos.MinorType.VARCHAR))
-      .put(SchemaPath.getSimplePath("dir1"),
-          getColumnStatistics("Q1", "Q1", 10L, TypeProtos.MinorType.VARCHAR))
-      .put(SchemaPath.getSimplePath("o_orderdate"),
-          getColumnStatistics(757382400000L, 764640000000L, 10L, TypeProtos.MinorType.DATE))
-      .build();
+            "y pending requests integrate", 10L, TypeProtos.MinorType.VARCHAR));
+        put(SchemaPath.getSimplePath("o_custkey"),
+          getColumnStatistics(392, 1411, 10L, TypeProtos.MinorType.INT));
+        put(SchemaPath.getSimplePath("dir0"),
+          getColumnStatistics("1994", "1994", 10L, TypeProtos.MinorType.VARCHAR));
+        put(SchemaPath.getSimplePath("dir1"),
+          getColumnStatistics("Q1", "Q1", 10L, TypeProtos.MinorType.VARCHAR));
+        put(SchemaPath.getSimplePath("o_orderdate"),
+          getColumnStatistics(757382400000L, 764640000000L, 10L, TypeProtos.MinorType.DATE));
+    }};
 
   public static final MetadataInfo TABLE_META_INFO = MetadataInfo.builder()
       .type(MetadataType.TABLE)
@@ -3564,15 +3568,16 @@ public class TestMetastoreCommands extends ClusterTest {
     }
   }
 
-  public static <T> ColumnStatistics<T> getColumnStatistics(T minValue, T maxValue,
-      long rowCount, TypeProtos.MinorType minorType) {
+  public static <T> ColumnStatistics<T> getColumnStatistics(T minValue, T maxValue, long rowCount,
+                                                            TypeProtos.MinorType minorType) {
     return new ColumnStatistics<>(
-        Arrays.asList(
-            new StatisticsHolder<>(minValue, ColumnStatisticsKind.MIN_VALUE),
-            new StatisticsHolder<>(maxValue, ColumnStatisticsKind.MAX_VALUE),
-            new StatisticsHolder<>(rowCount, TableStatisticsKind.ROW_COUNT),
-            new StatisticsHolder<>(rowCount, ColumnStatisticsKind.NON_NULL_VALUES_COUNT),
-            new StatisticsHolder<>(0L, ColumnStatisticsKind.NULLS_COUNT)),
+      new ArrayList() {{
+          add(new StatisticsHolder<>(minValue, ColumnStatisticsKind.MIN_VALUE));
+          add(new StatisticsHolder<>(maxValue, ColumnStatisticsKind.MAX_VALUE));
+          add(new StatisticsHolder<>(rowCount, TableStatisticsKind.ROW_COUNT));
+          add(new StatisticsHolder<>(rowCount, ColumnStatisticsKind.NON_NULL_VALUES_COUNT));
+          add(new StatisticsHolder<>(0L, ColumnStatisticsKind.NULLS_COUNT));
+        }},
         minorType);
   }
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/DropboxFileSystemTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/DropboxFileSystemTest.java
index b48f29fc22..ae198ad731 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/DropboxFileSystemTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/DropboxFileSystemTest.java
@@ -27,7 +27,7 @@ import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.store.dfs.FileSystemConfig;
 import org.apache.drill.exec.store.dfs.WorkspaceConfig;
-import org.apache.drill.exec.store.easy.json.JSONFormatPlugin.JSONFormatConfig;
+import org.apache.drill.exec.store.easy.json.JSONFormatConfig;
 import org.apache.drill.exec.store.easy.text.TextFormatConfig;
 import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.ClusterTest;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestImplicitFileColumns.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestImplicitFileColumns.java
index 923f12d05c..d533c7d268 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestImplicitFileColumns.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestImplicitFileColumns.java
@@ -164,9 +164,9 @@ public class TestImplicitFileColumns extends BaseTestQuery {
   @Test
   public void testStarColumnJson() throws Exception {
     SchemaBuilder schemaBuilder = new SchemaBuilder()
-        .addNullable("dir0", TypeProtos.MinorType.VARCHAR)
         .addNullable("id", TypeProtos.MinorType.BIGINT)
-        .addNullable("name", TypeProtos.MinorType.VARCHAR);
+        .addNullable("name", TypeProtos.MinorType.VARCHAR)
+        .addNullable("dir0", TypeProtos.MinorType.VARCHAR);
     final BatchSchema expectedSchema = new BatchSchemaBuilder()
         .withSchemaBuilder(schemaBuilder)
         .build();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/loader/TestRepeatedList.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/loader/TestRepeatedList.java
index 997655eaa7..631b17b59d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/loader/TestRepeatedList.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/loader/TestRepeatedList.java
@@ -445,7 +445,7 @@ public class TestRepeatedList extends BaseJsonLoaderTest {
         .addSingleCol(objArray())
         .addSingleCol(objArray())
         .addSingleCol(singleObjArray(strArray()))
-        .addSingleCol(objArray(strArray("\"foo\""), strArray("20")))
+        .addSingleCol(objArray(strArray("foo"), strArray("20")))
         .build();
     RowSetUtilities.verify(expected, results);
     assertNull(loader.next());
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/loader/TestUnknowns.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/loader/TestUnknowns.java
index 8f8ad2425f..1d227c08cc 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/loader/TestUnknowns.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/loader/TestUnknowns.java
@@ -246,7 +246,7 @@ public class TestUnknowns extends BaseJsonLoaderTest {
     RowSet expected = fixture.rowSetBuilder(expectedSchema)
         .addSingleCol(strArray())
         .addSingleCol(strArray("null"))
-        .addSingleCol(strArray("\"foo\""))
+        .addSingleCol(strArray("foo"))
         .addSingleCol(strArray("10", "[20]", "{\"b\": 30}"))
         .build();
     RowSetUtilities.verify(expected, results);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonEscapeAnyChar.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonEscapeAnyChar.java
similarity index 92%
rename from exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonEscapeAnyChar.java
rename to exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonEscapeAnyChar.java
index dcd65a6f92..cb148b8807 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonEscapeAnyChar.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonEscapeAnyChar.java
@@ -15,12 +15,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.vector.complex.writer;
+package org.apache.drill.exec.store.json;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.drill.common.exceptions.UserRemoteException;
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.vector.complex.writer.TestJsonReader.TestWrapper;
+import org.apache.drill.exec.store.json.TestJsonReader.TestWrapper;
 import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.ClusterTest;
 import org.junit.After;
@@ -59,7 +59,7 @@ public class TestJsonEscapeAnyChar extends ClusterTest {
 
   @Test
   public void testwithOptionEnabled() throws Exception {
-    runBoth(() -> doTestWithOptionEnabled());
+    runBoth(this::doTestWithOptionEnabled);
   }
 
   private void doTestWithOptionEnabled() throws Exception {
@@ -76,14 +76,16 @@ public class TestJsonEscapeAnyChar extends ClusterTest {
       resetJsonReaderEscapeAnyChar();
     }
   }
+
   @Test
   public void testwithOptionDisabled() throws Exception {
-    runBoth(() -> doTestWithOptionDisabled());
+    runBoth(this::doTestWithOptionDisabled);
   }
 
   private void doTestWithOptionDisabled() throws Exception {
     try {
-      queryBuilder().sql(QUERY)
+      queryBuilder()
+        .sql(QUERY)
         .run();
     } catch (UserRemoteException e) {
       assertThat(e.getMessage(), containsString("DATA_READ ERROR: Error parsing JSON - Unrecognized character escape"));
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonModes.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonModes.java
index 19b0b7be72..d6ce06496e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonModes.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonModes.java
@@ -22,6 +22,7 @@ import org.apache.drill.categories.RowSetTest;
 
 import org.apache.drill.common.exceptions.UserRemoteException;
 import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.physical.rowSet.DirectRowSet;
 import org.apache.drill.exec.physical.rowSet.RowSet;
 import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
@@ -30,6 +31,7 @@ import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.ClusterTest;
 import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -45,6 +47,7 @@ public class TestJsonModes extends ClusterTest {
   @BeforeClass
   public static void setup() throws Exception {
     ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));
+    client.alterSession(ExecConstants.ENABLE_V2_JSON_READER_KEY, false);
   }
 
   @Test
@@ -159,4 +162,9 @@ public class TestJsonModes extends ClusterTest {
     long cnt = queryBuilder().physical(plan).singletonLong();
     assertEquals("Counts should match", 4L, cnt);
   }
+
+  @AfterClass
+  public static void resetOptions() {
+    client.resetSession(ExecConstants.ENABLE_V2_JSON_READER_KEY);
+  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonNanInf.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonNanInf.java
similarity index 87%
rename from exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonNanInf.java
rename to exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonNanInf.java
index 5b440a740b..e556ec16ea 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonNanInf.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonNanInf.java
@@ -15,13 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.vector.complex.writer;
+package org.apache.drill.exec.store.json;
 
 import static org.apache.drill.test.TestBuilder.mapOf;
 import static org.hamcrest.CoreMatchers.containsString;
-import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.fail;
 
 import java.io.File;
@@ -36,12 +36,11 @@ import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.exec.vector.VarCharVector;
-import org.apache.drill.exec.vector.complex.writer.TestJsonReader.TestWrapper;
+import org.apache.drill.exec.store.json.TestJsonReader.TestWrapper;
 import org.apache.drill.test.BaseTestQuery;
 import org.junit.Ignore;
 import org.junit.Test;
 
-// TODO: Move to JSON reader package after code review
 // TODO: Split or rename: this tests mor than NanInf
 public class TestJsonNanInf extends BaseTestQuery {
 
@@ -58,7 +57,7 @@ public class TestJsonNanInf extends BaseTestQuery {
 
   @Test
   public void testNanInfSelect() throws Exception {
-    runBoth(() -> doTestNanInfSelect());
+    runBoth(this::doTestNanInfSelect);
   }
 
   private void doTestNanInfSelect() throws Exception {
@@ -73,8 +72,7 @@ public class TestJsonNanInf extends BaseTestQuery {
         .unOrdered()
         .baselineColumns("nan_col", "inf_col")
         .baselineValues(Double.NaN, Double.POSITIVE_INFINITY)
-        .build()
-        .run();
+        .go();
     } finally {
       FileUtils.deleteQuietly(file);
     }
@@ -83,7 +81,7 @@ public class TestJsonNanInf extends BaseTestQuery {
   @Test
   @Ignore // see DRILL-6018
   public void testExcludePositiveInfinity() throws Exception {
-    runBoth(() -> doTestExcludePositiveInfinity());
+    runBoth(this::doTestExcludePositiveInfinity);
   }
 
   private void doTestExcludePositiveInfinity() throws Exception {
@@ -99,8 +97,7 @@ public class TestJsonNanInf extends BaseTestQuery {
         .unOrdered()
         .baselineColumns("inf_col")
         .baselineValues(5.0)
-        .build()
-        .run();
+        .go();
     } finally {
       FileUtils.deleteQuietly(file);
     }
@@ -109,7 +106,7 @@ public class TestJsonNanInf extends BaseTestQuery {
   @Test
   @Ignore // see DRILL-6018
   public void testExcludeNegativeInfinity() throws Exception {
-    runBoth(() -> doTestExcludeNegativeInfinity());
+    runBoth(this::doTestExcludeNegativeInfinity);
   }
 
   private void doTestExcludeNegativeInfinity() throws Exception {
@@ -125,8 +122,7 @@ public class TestJsonNanInf extends BaseTestQuery {
         .unOrdered()
         .baselineColumns("inf_col")
         .baselineValues(5.0)
-        .build()
-        .run();
+        .go();
     } finally {
       FileUtils.deleteQuietly(file);
     }
@@ -135,7 +131,7 @@ public class TestJsonNanInf extends BaseTestQuery {
   @Test
   @Ignore // see DRILL-6018
   public void testIncludePositiveInfinity() throws Exception {
-    runBoth(() -> doTestIncludePositiveInfinity());
+    runBoth(this::doTestIncludePositiveInfinity);
   }
 
   private void doTestIncludePositiveInfinity() throws Exception {
@@ -160,7 +156,7 @@ public class TestJsonNanInf extends BaseTestQuery {
 
   @Test
   public void testExcludeNan() throws Exception {
-    runBoth(() -> doTestExcludeNan());
+    runBoth(this::doTestExcludeNan);
   }
 
   private void doTestExcludeNan() throws Exception {
@@ -176,8 +172,7 @@ public class TestJsonNanInf extends BaseTestQuery {
           .unOrdered()
           .baselineColumns("nan_col")
           .baselineValues(5.0)
-          .build()
-          .run();
+          .go();
     } finally {
       FileUtils.deleteQuietly(file);
     }
@@ -185,7 +180,7 @@ public class TestJsonNanInf extends BaseTestQuery {
 
   @Test
   public void testIncludeNan() throws Exception {
-    runBoth(() -> doTestIncludeNan());
+    runBoth(this::doTestIncludeNan);
   }
 
   private void doTestIncludeNan() throws Exception {
@@ -201,8 +196,7 @@ public class TestJsonNanInf extends BaseTestQuery {
           .unOrdered()
           .baselineColumns("nan_col")
           .baselineValues(Double.NaN)
-          .build()
-          .run();
+          .go();
     } finally {
       FileUtils.deleteQuietly(file);
     }
@@ -210,7 +204,7 @@ public class TestJsonNanInf extends BaseTestQuery {
 
   @Test
   public void testNanInfFailure() throws Exception {
-    runBoth(() -> doTestNanInfFailure());
+    runBoth(this::doTestNanInfFailure);
   }
 
   private void doTestNanInfFailure() throws Exception {
@@ -232,7 +226,7 @@ public class TestJsonNanInf extends BaseTestQuery {
 
   @Test
   public void testCreateTableNanInf() throws Exception {
-    runBoth(() -> doTestCreateTableNanInf());
+    runBoth(this::doTestCreateTableNanInf);
   }
 
   private void doTestCreateTableNanInf() throws Exception {
@@ -249,11 +243,11 @@ public class TestJsonNanInf extends BaseTestQuery {
       File resultFile = new File(new File(file.getParent(), newTable),"0_0_0.json");
       String resultJson = FileUtils.readFileToString(resultFile);
       int nanIndex = resultJson.indexOf("NaN");
-      assertFalse("`NaN` must not be enclosed with \"\" ", resultJson.charAt(nanIndex - 1) == '"');
-      assertFalse("`NaN` must not be enclosed with \"\" ", resultJson.charAt(nanIndex + "NaN".length()) == '"');
+      assertNotEquals("`NaN` must not be enclosed with \"\" ", '"', resultJson.charAt(nanIndex - 1));
+      assertNotEquals("`NaN` must not be enclosed with \"\" ", '"', resultJson.charAt(nanIndex + "NaN".length()));
       int infIndex = resultJson.indexOf("Infinity");
-      assertFalse("`Infinity` must not be enclosed with \"\" ", resultJson.charAt(infIndex - 1) == '"');
-      assertFalse("`Infinity` must not be enclosed with \"\" ", resultJson.charAt(infIndex + "Infinity".length()) == '"');
+      assertNotEquals("`Infinity` must not be enclosed with \"\" ", '"', resultJson.charAt(infIndex - 1));
+      assertNotEquals("`Infinity` must not be enclosed with \"\" ", '"', resultJson.charAt(infIndex + "Infinity".length()));
     } finally {
       test("drop table if exists dfs.`%s`", newTable);
       FileUtils.deleteQuietly(file);
@@ -262,7 +256,7 @@ public class TestJsonNanInf extends BaseTestQuery {
 
   @Test
   public void testConvertFromJsonFunction() throws Exception {
-    runBoth(() -> doTestConvertFromJsonFunction());
+    runBoth(this::doTestConvertFromJsonFunction);
   }
 
   private void doTestConvertFromJsonFunction() throws Exception {
@@ -276,8 +270,7 @@ public class TestJsonNanInf extends BaseTestQuery {
           .unOrdered()
           .baselineColumns("col")
           .baselineValues(mapOf("nan_col", Double.NaN))
-          .build()
-          .run();
+          .go();
     } finally {
       FileUtils.deleteQuietly(file);
     }
@@ -313,7 +306,7 @@ public class TestJsonNanInf extends BaseTestQuery {
       FileUtils.writeStringToFile(file, csv);
       List<QueryDataBatch> results = testSqlWithResults(query);
       RecordBatchLoader batchLoader = new RecordBatchLoader(getAllocator());
-      assertTrue("Query result must contain 1 row", results.size() == 1);
+      assertEquals("Query result must contain 1 row", 1, results.size());
       QueryDataBatch batch = results.get(0);
 
       batchLoader.load(batch.getHeader().getDef(), batch.getData());
@@ -321,8 +314,8 @@ public class TestJsonNanInf extends BaseTestQuery {
       // ensuring that `NaN` token ARE NOT enclosed with double quotes
       String resultJson = vw.getValueVector().getAccessor().getObject(0).toString();
       int nanIndex = resultJson.indexOf("NaN");
-      assertFalse("`NaN` must not be enclosed with \"\" ", resultJson.charAt(nanIndex - 1) == '"');
-      assertFalse("`NaN` must not be enclosed with \"\" ", resultJson.charAt(nanIndex + "NaN".length()) == '"');
+      assertNotEquals("`NaN` must not be enclosed with \"\" ", '"', resultJson.charAt(nanIndex - 1));
+      assertNotEquals("`NaN` must not be enclosed with \"\" ", '"', resultJson.charAt(nanIndex + "NaN".length()));
       batch.release();
       batchLoader.clear();
     } finally {
@@ -339,13 +332,12 @@ public class TestJsonNanInf extends BaseTestQuery {
           .unOrdered()
           .baselineColumns("sin_col", "sum_col")
           .baselineValues(Double.NaN, Double.POSITIVE_INFINITY)
-          .build()
-          .run();
+          .go();
   }
 
   @Test
   public void testOrderByWithNaN() throws Exception {
-    runBoth(() -> doTestOrderByWithNaN());
+    runBoth(this::doTestOrderByWithNaN);
   }
 
   private void doTestOrderByWithNaN() throws Exception {
@@ -368,8 +360,7 @@ public class TestJsonNanInf extends BaseTestQuery {
           .baselineValues("obj1", Double.NaN)
           .baselineValues("obj2", Double.NEGATIVE_INFINITY)
           .baselineValues("obj2", Double.NaN)
-          .build()
-          .run();
+          .go();
     } finally {
       test("alter session set `%s` = false", ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE);
       FileUtils.deleteQuietly(file);
@@ -378,7 +369,7 @@ public class TestJsonNanInf extends BaseTestQuery {
 
   @Test
   public void testNestedLoopJoinWithNaN() throws Exception {
-    runBoth(() -> doTestNestedLoopJoinWithNaN());
+    runBoth(this::doTestNestedLoopJoinWithNaN);
   }
 
   private void doTestNestedLoopJoinWithNaN() throws Exception {
@@ -410,8 +401,7 @@ public class TestJsonNanInf extends BaseTestQuery {
           .baselineValues("object2")
           .baselineValues("object3")
           .baselineValues("object4")
-          .build()
-          .run();
+          .go();
     } finally {
       test("alter session set `%s` = false", ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE);
       JoinTestBase.resetJoinOptions();
@@ -421,7 +411,7 @@ public class TestJsonNanInf extends BaseTestQuery {
 
   @Test
   public void testHashJoinWithNaN() throws Exception {
-    runBoth(() -> doTestHashJoinWithNaN());
+    runBoth(this::doTestHashJoinWithNaN);
   }
 
   private void doTestHashJoinWithNaN() throws Exception {
@@ -439,13 +429,12 @@ public class TestJsonNanInf extends BaseTestQuery {
       FileUtils.writeStringToFile(file, json);
       test("alter session set `%s` = true", ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE);
       testBuilder()
-              .sqlQuery(query)
-              .ordered()
-              .baselineColumns("name")
-              .baselineValues("obj1")
-              .baselineValues("obj2")
-              .build()
-              .run();
+          .sqlQuery(query)
+          .ordered()
+          .baselineColumns("name")
+          .baselineValues("obj1")
+          .baselineValues("obj2")
+          .go();
     } finally {
       test("alter session set `%s` = false", ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE);
       JoinTestBase.resetJoinOptions();
@@ -455,7 +444,7 @@ public class TestJsonNanInf extends BaseTestQuery {
 
   @Test
   public void testMergeJoinWithNaN() throws Exception {
-    runBoth(() -> doTestMergeJoinWithNaN());
+    runBoth(this::doTestMergeJoinWithNaN);
   }
 
   private void doTestMergeJoinWithNaN() throws Exception {
@@ -473,13 +462,12 @@ public class TestJsonNanInf extends BaseTestQuery {
       FileUtils.writeStringToFile(file, json);
       test("alter session set `%s` = true", ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE);
       testBuilder()
-              .sqlQuery(query)
-              .ordered()
-              .baselineColumns("name")
-              .baselineValues("obj1")
-              .baselineValues("obj2")
-              .build()
-              .run();
+          .sqlQuery(query)
+          .ordered()
+          .baselineColumns("name")
+          .baselineValues("obj1")
+          .baselineValues("obj2")
+          .go();
     } finally {
       test("alter session set `%s` = false", ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE);
       JoinTestBase.resetJoinOptions();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonReader.java
similarity index 89%
rename from exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
rename to exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonReader.java
index bd2517ceea..4ec019b5b6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonReader.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.vector.complex.writer;
+package org.apache.drill.exec.store.json;
 
 import static org.apache.drill.test.TestBuilder.listOf;
 import static org.apache.drill.test.TestBuilder.mapOf;
@@ -53,7 +53,6 @@ import org.slf4j.LoggerFactory;
  * <li><tt>TestJsonReaderQuery</tt></li>
  * </ul>
  */
-//TODO: Move to JSON reader package after code review
 @Category(RowSetTest.class)
 public class TestJsonReader extends BaseTestQuery {
   private static final Logger logger = LoggerFactory.getLogger(TestJsonReader.class);
@@ -64,11 +63,11 @@ public class TestJsonReader extends BaseTestQuery {
     dirTestWatcher.copyResourceToRoot(Paths.get("vector","complex", "writer"));
   }
 
-  private void enableV2Reader(boolean enable) throws Exception {
+  private void enableV2Reader(boolean enable) {
     alterSession(ExecConstants.ENABLE_V2_JSON_READER_KEY, enable);
   }
 
-  private void resetV2Reader() throws Exception {
+  private void resetV2Reader() {
     resetSessionOption(ExecConstants.ENABLE_V2_JSON_READER_KEY);
   }
 
@@ -89,7 +88,7 @@ public class TestJsonReader extends BaseTestQuery {
 
    @Test
   public void schemaChange() throws Exception {
-    runBoth(() -> doSchemaChange());
+    runBoth(this::doSchemaChange);
   }
 
   private void doSchemaChange() throws Exception {
@@ -98,7 +97,7 @@ public class TestJsonReader extends BaseTestQuery {
 
   @Test
   public void testSplitAndTransferFailure() throws Exception {
-    runBoth(() -> doTestSplitAndTransferFailure());
+    runBoth(this::doTestSplitAndTransferFailure);
   }
 
   private void doTestSplitAndTransferFailure() throws Exception {
@@ -131,7 +130,7 @@ public class TestJsonReader extends BaseTestQuery {
 
   @Test // DRILL-1824
   public void schemaChangeValidate() throws Exception {
-    runBoth(() -> doSchemaChangeValidate());
+    runBoth(this::doSchemaChangeValidate);
   }
 
   private void doSchemaChangeValidate() throws Exception {
@@ -179,6 +178,7 @@ public class TestJsonReader extends BaseTestQuery {
               .sqlQuery("select * from cp.`jsoninput/union/a.json`")
               .ordered()
               .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true")
+              .optionSettingQueriesForTestQuery("alter session set `store.json.enable_v2_reader` = false")
               .baselineColumns("field1", "field2")
               .baselineValues(
                       1L, 1.2
@@ -221,6 +221,7 @@ public class TestJsonReader extends BaseTestQuery {
               ).go();
     } finally {
       resetSessionOption(ExecConstants.ENABLE_UNION_TYPE_KEY);
+      resetSessionOption(ExecConstants.ENABLE_V2_JSON_READER_KEY);
     }
   }
 
@@ -235,11 +236,13 @@ public class TestJsonReader extends BaseTestQuery {
                 "from cp.`jsoninput/union/a.json`) where a is not null")
               .ordered()
               .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true")
+              .optionSettingQueriesForTestQuery("alter session set `store.json.enable_v2_reader` = false")
               .baselineColumns("a", "type")
               .baselineValues(13L, "BIGINT")
               .go();
     } finally {
       resetSessionOption(ExecConstants.ENABLE_UNION_TYPE_KEY);
+      resetSessionOption(ExecConstants.ENABLE_V2_JSON_READER_KEY);
     }
   }
 
@@ -254,6 +257,7 @@ public class TestJsonReader extends BaseTestQuery {
                 "when is_map(field1) then t.field1.inner1 end f1 from cp.`jsoninput/union/a.json` t")
               .ordered()
               .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true")
+              .optionSettingQueriesForTestQuery("alter session set `store.json.enable_v2_reader` = false")
               .baselineColumns("f1")
               .baselineValues(1L)
               .baselineValues(2L)
@@ -262,6 +266,7 @@ public class TestJsonReader extends BaseTestQuery {
               .go();
     } finally {
       resetSessionOption(ExecConstants.ENABLE_UNION_TYPE_KEY);
+      resetSessionOption(ExecConstants.ENABLE_V2_JSON_READER_KEY);
     }
   }
 
@@ -271,17 +276,19 @@ public class TestJsonReader extends BaseTestQuery {
   public void testSumWithTypeCase() throws Exception {
     try {
       testBuilder()
-              .sqlQuery("select sum(cast(f1 as bigint)) sum_f1 from " +
-                "(select case when is_bigint(field1) then field1 " +
-                "when is_list(field1) then field1[0] when is_map(field1) then t.field1.inner1 end f1 " +
-                "from cp.`jsoninput/union/a.json` t)")
-              .ordered()
-              .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true")
-              .baselineColumns("sum_f1")
-              .baselineValues(9L)
-              .go();
+          .sqlQuery("select sum(cast(f1 as bigint)) sum_f1 from " +
+            "(select case when is_bigint(field1) then field1 " +
+            "when is_list(field1) then field1[0] when is_map(field1) then t.field1.inner1 end f1 " +
+            "from cp.`jsoninput/union/a.json` t)")
+          .ordered()
+          .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true")
+          .optionSettingQueriesForTestQuery("alter session set `store.json.enable_v2_reader` = false")
+          .baselineColumns("sum_f1")
+          .baselineValues(9L)
+          .go();
     } finally {
       resetSessionOption(ExecConstants.ENABLE_UNION_TYPE_KEY);
+      resetSessionOption(ExecConstants.ENABLE_V2_JSON_READER_KEY);
     }
   }
 
@@ -294,6 +301,7 @@ public class TestJsonReader extends BaseTestQuery {
               .sqlQuery("select a + b c from cp.`jsoninput/union/b.json`")
               .ordered()
               .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true")
+              .optionSettingQueriesForTestQuery("alter session set `store.json.enable_v2_reader` = false")
               .baselineColumns("c")
               .baselineValues(3L)
               .baselineValues(7.0)
@@ -301,6 +309,7 @@ public class TestJsonReader extends BaseTestQuery {
               .go();
     } finally {
       resetSessionOption(ExecConstants.ENABLE_UNION_TYPE_KEY);
+      resetSessionOption(ExecConstants.ENABLE_V2_JSON_READER_KEY);
     }
   }
 
@@ -322,11 +331,13 @@ public class TestJsonReader extends BaseTestQuery {
               .sqlQuery("select sum(cast(case when `type` = 'map' then t.data.a else data end as bigint)) `sum` from dfs.tmp.multi_batch t")
               .ordered()
               .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true")
+              .optionSettingQueriesForTestQuery("alter session set `store.json.enable_v2_reader` = false")
               .baselineColumns("sum")
               .baselineValues(20000L)
               .go();
     } finally {
       resetSessionOption(ExecConstants.ENABLE_UNION_TYPE_KEY);
+      resetSessionOption(ExecConstants.ENABLE_V2_JSON_READER_KEY);
     }
   }
 
@@ -353,15 +364,17 @@ public class TestJsonReader extends BaseTestQuery {
               .sqlQuery("select sum(cast(case when `type` = 'map' then t.data.a else data end as bigint)) `sum` from dfs.tmp.multi_file t")
               .ordered()
               .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true")
+              .optionSettingQueriesForTestQuery("alter session set `store.json.enable_v2_reader` = false")
               .baselineColumns("sum")
               .baselineValues(20000L)
               .go();
     } finally {
       resetSessionOption(ExecConstants.ENABLE_UNION_TYPE_KEY);
+      resetSessionOption(ExecConstants.ENABLE_V2_JSON_READER_KEY);
     }
   }
 
-  // V1 version of the test. See TsetJsonReaderQueries for the V2 version.
+  // V1 version of the test. See TestJsonReaderQueries for the V2 version.
 
   @Test
   public void drill_4032() throws Exception {
@@ -377,10 +390,10 @@ public class TestJsonReader extends BaseTestQuery {
     os.write("{\"col1\": \"val1\",\"col2\": null}".getBytes());
     os.flush();
     os.close();
-    testNoResult("select t.col2.col3 from dfs.tmp.drill_4032 t");
+    runBoth(() -> testNoResult("select t.col2.col3 from dfs.tmp.drill_4032 t"));
   }
 
-  @Test
+  @Test // todo: place this logic to beforeClass. And divide doDrill_4479 into 3 tests
   public void drill_4479() throws Exception {
     File table_dir = dirTestWatcher.makeTestTmpSubDir(Paths.get("drill_4479"));
     table_dir.mkdir();
@@ -394,7 +407,7 @@ public class TestJsonReader extends BaseTestQuery {
     os.flush();
     os.close();
 
-    runBoth(() -> doDrill_4479());
+    runBoth(this::doDrill_4479);
   }
 
   private void doDrill_4479() throws Exception {
@@ -436,7 +449,7 @@ public class TestJsonReader extends BaseTestQuery {
       writer.write("{ \"a\": { \"b\": { \"c\": [] }, \"c\": [] } }");
     }
 
-    runBoth(() -> doTestFlattenEmptyArrayWithAllTextMode());
+    runBoth(this::doTestFlattenEmptyArrayWithAllTextMode);
   }
 
   private void doTestFlattenEmptyArrayWithAllTextMode() throws Exception {
@@ -468,7 +481,7 @@ public class TestJsonReader extends BaseTestQuery {
       writer.write("{ \"a\": { \"b\": { \"c\": [] }, \"c\": [] } }");
     }
 
-    runBoth(() -> doTestFlattenEmptyArrayWithUnionType());
+    runBoth(this::doTestFlattenEmptyArrayWithUnionType);
   }
 
   private void doTestFlattenEmptyArrayWithUnionType() throws Exception {
@@ -554,6 +567,7 @@ public class TestJsonReader extends BaseTestQuery {
         .sqlQuery("select t.rk.a as a from dfs.`%s` t", fileName)
         .ordered()
         .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type`=true")
+        .optionSettingQueriesForTestQuery("alter session set `store.json.enable_v2_reader` = false")
         .baselineColumns("a")
         .baselineValues(map)
         .baselineValues("2")
@@ -561,6 +575,7 @@ public class TestJsonReader extends BaseTestQuery {
 
     } finally {
       resetSessionOption(ExecConstants.ENABLE_UNION_TYPE_KEY);
+      resetSessionOption(ExecConstants.ENABLE_V2_JSON_READER_KEY);
     }
   }
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonReaderFns.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonReaderFns.java
index 5b2fb24741..000dbaac39 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonReaderFns.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonReaderFns.java
@@ -24,7 +24,6 @@ import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.physical.rowSet.RowSet;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
-import org.apache.drill.exec.vector.complex.writer.TestJsonReader;
 import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.rowSet.RowSetComparison;
 import org.junit.BeforeClass;
@@ -56,7 +55,7 @@ public class TestJsonReaderFns extends BaseTestJsonReader {
 
   @Test
   public void testEmptyList() throws Exception {
-    runBoth(() -> doTestEmptyList());
+    runBoth(this::doTestEmptyList);
   }
 
   private void doTestEmptyList() throws Exception {
@@ -77,7 +76,7 @@ public class TestJsonReaderFns extends BaseTestJsonReader {
 
   @Test
   public void testRepeatedCountStr() throws Exception {
-    runBoth(() -> doTestRepeatedCountStr());
+    runBoth(this::doTestRepeatedCountStr);
   }
 
   private void doTestRepeatedCountStr() throws Exception {
@@ -93,7 +92,7 @@ public class TestJsonReaderFns extends BaseTestJsonReader {
 
   @Test
   public void testRepeatedCountInt() throws Exception {
-    runBoth(() -> doTestRepeatedCountInt());
+    runBoth(this::doTestRepeatedCountInt);
   }
 
   private void doTestRepeatedCountInt() throws Exception {
@@ -109,7 +108,7 @@ public class TestJsonReaderFns extends BaseTestJsonReader {
 
   @Test
   public void testRepeatedCountFloat4() throws Exception {
-    runBoth(() -> doTestRepeatedCountFloat4());
+    runBoth(this::doTestRepeatedCountFloat4);
   }
 
   private void doTestRepeatedCountFloat4() throws Exception {
@@ -125,7 +124,7 @@ public class TestJsonReaderFns extends BaseTestJsonReader {
 
   @Test
   public void testRepeatedCountVarchar() throws Exception {
-    runBoth(() -> doTestRepeatedCountVarchar());
+    runBoth(this::doTestRepeatedCountVarchar);
   }
 
   private void doTestRepeatedCountVarchar() throws Exception {
@@ -141,7 +140,7 @@ public class TestJsonReaderFns extends BaseTestJsonReader {
 
   @Test
   public void testRepeatedCountBit() throws Exception {
-    runBoth(() -> doTestRepeatedCountBit());
+    runBoth(this::doTestRepeatedCountBit);
   }
 
   private void doTestRepeatedCountBit() throws Exception {
@@ -167,7 +166,7 @@ public class TestJsonReaderFns extends BaseTestJsonReader {
 
   @Test
   public void testRepeatedContainsStr() throws Exception {
-    runBoth(() -> doTestRepeatedContainsStr());
+    runBoth(this::doTestRepeatedContainsStr);
   }
 
   private void doTestRepeatedContainsStr() throws Exception {
@@ -183,7 +182,7 @@ public class TestJsonReaderFns extends BaseTestJsonReader {
 
   @Test
   public void testRepeatedContainsInt() throws Exception {
-    runBoth(() -> doTestRepeatedContainsInt());
+    runBoth(this::doTestRepeatedContainsInt);
   }
 
   private void doTestRepeatedContainsInt() throws Exception {
@@ -199,7 +198,7 @@ public class TestJsonReaderFns extends BaseTestJsonReader {
 
   @Test
   public void testRepeatedContainsFloat4() throws Exception {
-    runBoth(() -> doTestRepeatedContainsFloat4());
+    runBoth(this::doTestRepeatedContainsFloat4);
   }
 
   private void doTestRepeatedContainsFloat4() throws Exception {
@@ -215,7 +214,7 @@ public class TestJsonReaderFns extends BaseTestJsonReader {
 
   @Test
   public void testRepeatedContainsVarchar() throws Exception {
-    runBoth(() -> doTestRepeatedContainsVarchar());
+    runBoth(this::doTestRepeatedContainsVarchar);
   }
 
   private void doTestRepeatedContainsVarchar() throws Exception {
@@ -231,7 +230,7 @@ public class TestJsonReaderFns extends BaseTestJsonReader {
 
   @Test
   public void testRepeatedContainsBitTrue() throws Exception {
-    runBoth(() -> doTestRepeatedContainsBitTrue());
+    runBoth(this::doTestRepeatedContainsBitTrue);
   }
 
   private void doTestRepeatedContainsBitTrue() throws Exception {
@@ -247,7 +246,7 @@ public class TestJsonReaderFns extends BaseTestJsonReader {
 
   @Test
   public void testRepeatedContainsBitFalse() throws Exception {
-    runBoth(() -> doTestRepeatedContainsBitFalse());
+    runBoth(this::doTestRepeatedContainsBitFalse);
   }
 
   private void doTestRepeatedContainsBitFalse() throws Exception {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonReaderQueries.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonReaderQueries.java
index 3fcad38e00..f8dff11f54 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonReaderQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonReaderQueries.java
@@ -184,7 +184,7 @@ public class TestJsonReaderQueries extends BaseTestJsonReader {
   @Test
   @Ignore("broken")
   public void testFieldSelectionBug() throws Exception {
-    runBoth(() -> doTestFieldSelectionBug());
+    runBoth(this::doTestFieldSelectionBug);
   }
 
   private void doTestFieldSelectionBug() throws Exception {
@@ -221,7 +221,7 @@ public class TestJsonReaderQueries extends BaseTestJsonReader {
 
   @Test
   public void testReadCompressed() throws Exception {
-    runBoth(() -> doTestReadCompressed());
+    runBoth(this::doTestReadCompressed);
   }
 
   private void doTestReadCompressed() throws Exception {
@@ -269,7 +269,7 @@ public class TestJsonReaderQueries extends BaseTestJsonReader {
 
   @Test
   public void testDrill_1419() throws Exception {
-    runBoth(() -> doTestDrill_1419());
+    runBoth(this::doTestDrill_1419);
   }
 
   private void doTestDrill_1419() throws Exception {
@@ -293,7 +293,7 @@ public class TestJsonReaderQueries extends BaseTestJsonReader {
 
   @Test
   public void testSingleColumnRead_vector_fill_bug() throws Exception {
-    runBoth(() -> doTestSingleColumnRead_vector_fill_bug());
+    runBoth(this::doTestSingleColumnRead_vector_fill_bug);
   }
 
   private void doTestSingleColumnRead_vector_fill_bug() throws Exception {
@@ -304,7 +304,7 @@ public class TestJsonReaderQueries extends BaseTestJsonReader {
 
   @Test
   public void testNonExistentColumnReadAlone() throws Exception {
-    runBoth(() -> doTestNonExistentColumnReadAlone());
+    runBoth(this::doTestNonExistentColumnReadAlone);
   }
 
   private void doTestNonExistentColumnReadAlone() throws Exception {
@@ -315,7 +315,7 @@ public class TestJsonReaderQueries extends BaseTestJsonReader {
 
   @Test
   public void testAllTextMode() throws Exception {
-    runBoth(() -> doTestAllTextMode());
+    runBoth(this::doTestAllTextMode);
   }
 
   private void doTestAllTextMode() throws Exception {
@@ -583,7 +583,6 @@ public class TestJsonReaderQueries extends BaseTestJsonReader {
         os.write("{\"col1\": \"val4\", \"col2\": null}");
       }
       String sql = "select t.col1, t.col2.col3 from dfs.tmp.drill_4032 t order by col1";
-//      String sql = "select t.col1, t.col2.col3 from dfs.tmp.drill_4032 t";
       RowSet results = runTest(sql);
       results.print();
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonReaderWithSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonReaderWithSchema.java
index c0d4a4b80f..b2c26a6824 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonReaderWithSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonReaderWithSchema.java
@@ -1,10 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.drill.exec.store.json;
 
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.test.ClusterFixture;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestJsonReaderWithSchema extends BaseTestJsonReader {
 
+  @BeforeClass
+  public static void setup() throws Exception {
+    startCluster(ClusterFixture.builder(dirTestWatcher));
+  }
+
   @Test
   public void testSelectFromListWithCase() throws Exception {
     try {
@@ -14,6 +38,7 @@ public class TestJsonReaderWithSchema extends BaseTestJsonReader {
                 "from cp.`jsoninput/union/a.json`) where a is not null")
               .ordered()
               .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true")
+              .optionSettingQueriesForTestQuery("alter session set `store.json.enable_v2_reader` = false")
               .baselineColumns("a", "type")
               .baselineValues(13L, "BIGINT")
               .go();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonRecordReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonRecordReader.java
index 7b0a61c496..aa55f791e0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonRecordReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonRecordReader.java
@@ -17,6 +17,9 @@
  */
 package org.apache.drill.exec.store.json;
 
+import static org.apache.drill.exec.ExecConstants.ENABLE_V2_JSON_READER_KEY;
+import static org.apache.drill.exec.ExecConstants.JSON_READER_PRINT_INVALID_RECORDS_LINE_NOS_FLAG;
+import static org.apache.drill.exec.ExecConstants.JSON_READER_SKIP_INVALID_RECORDS_FLAG;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -45,12 +48,12 @@ public class TestJsonRecordReader extends BaseTestQuery {
     dirTestWatcher.copyResourceToRoot(Paths.get("jsoninput/drill_3353"));
   }
 
-  private void enableV2Reader(boolean enable) throws Exception {
-    alterSession(ExecConstants.ENABLE_V2_JSON_READER_KEY, enable);
+  private void enableV2Reader(boolean enable) {
+    alterSession(ENABLE_V2_JSON_READER_KEY, enable);
   }
 
-  private void resetV2Reader() throws Exception {
-    resetSessionOption(ExecConstants.ENABLE_V2_JSON_READER_KEY);
+  private void resetV2Reader() {
+    resetSessionOption(ENABLE_V2_JSON_READER_KEY);
   }
 
   public interface TestWrapper {
@@ -79,7 +82,12 @@ public class TestJsonRecordReader extends BaseTestQuery {
 
   @Test
   public void testDateJsonInput() throws Exception {
-    test("select `date`, AGE(`date`, CAST('2019-09-30 20:47:43' as timestamp)) from cp.`jsoninput/input2.json` limit 10 ");
+    try{
+      alterSession(ExecConstants.JSON_EXTENDED_TYPES_KEY, true);
+      test("select `date`, AGE(`date`, CAST('2019-09-30 20:47:43' as timestamp)) from cp.`jsoninput/input2.json` limit 10 ");
+    } finally {
+      resetSessionOption(ExecConstants.JSON_EXTENDED_TYPES_KEY);
+    }
   }
 
   @Test
@@ -115,7 +123,7 @@ public class TestJsonRecordReader extends BaseTestQuery {
   // DRILL-1634 : retrieve an element in a nested array in a repeated map.
   // RepeatedMap (Repeated List (Repeated varchar))
   public void testNestedArrayInRepeatedMap() throws Exception {
-    runBoth(() -> doTestNestedArrayInRepeatedMap());
+    runBoth(this::doTestNestedArrayInRepeatedMap);
   }
 
   private void doTestNestedArrayInRepeatedMap() throws Exception {
@@ -126,7 +134,7 @@ public class TestJsonRecordReader extends BaseTestQuery {
 
   @Test
   public void testEmptyMapDoesNotFailValueCapacityCheck() throws Exception {
-    runBoth(() -> doTestEmptyMapDoesNotFailValueCapacityCheck());
+    runBoth(this::doTestEmptyMapDoesNotFailValueCapacityCheck);
   }
 
   private void doTestEmptyMapDoesNotFailValueCapacityCheck() throws Exception {
@@ -136,13 +144,16 @@ public class TestJsonRecordReader extends BaseTestQuery {
 
   @Test
   public void testEnableAllTextMode() throws Exception {
-    runBoth(() -> doTestEnableAllTextMode());
+    runBoth(this::doTestEnableAllTextMode);
   }
 
   private void doTestEnableAllTextMode() throws Exception {
-    alterSession(ExecConstants.JSON_ALL_TEXT_MODE, true);
-    test("select * from cp.`jsoninput/big_numeric.json`");
-    resetSessionOption(ExecConstants.JSON_ALL_TEXT_MODE);
+    try{
+      alterSession(ExecConstants.JSON_ALL_TEXT_MODE, true);
+      test("select * from cp.`jsoninput/big_numeric.json`");
+    } finally {
+      resetSessionOption(ExecConstants.JSON_ALL_TEXT_MODE);
+    }
   }
 
   @Test
@@ -168,26 +179,32 @@ public class TestJsonRecordReader extends BaseTestQuery {
   @Category(UnlikelyTest.class)
   // DRILL-1832
   public void testJsonWithNulls1() throws Exception {
-    runBoth(() -> doTestJsonWithNulls1());
+    runBoth(this::doTestJsonWithNulls1);
   }
 
   private void doTestJsonWithNulls1() throws Exception {
     final String query = "select * from cp.`jsoninput/twitter_43.json`";
-    testBuilder().sqlQuery(query).unOrdered()
-        .jsonBaselineFile("jsoninput/drill-1832-1-result.json").go();
+    testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .jsonBaselineFile("jsoninput/drill-1832-1-result.json")
+      .go();
   }
 
   @Test
   @Category(UnlikelyTest.class)
   // DRILL-1832
   public void testJsonWithNulls2() throws Exception {
-    runBoth(() -> doTestJsonWithNulls2());
+    runBoth(this::doTestJsonWithNulls2);
   }
 
   private void doTestJsonWithNulls2() throws Exception {
     final String query = "select SUM(1) as `sum_Number_of_Records_ok` from cp.`jsoninput/twitter_43.json` having (COUNT(1) > 0)";
-    testBuilder().sqlQuery(query).unOrdered()
-        .jsonBaselineFile("jsoninput/drill-1832-2-result.json").go();
+    testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .jsonBaselineFile("jsoninput/drill-1832-2-result.json")
+      .go();
   }
 
   // V1-only test. In V2, this works. See TestJsonReaderQueries.
@@ -197,16 +214,14 @@ public class TestJsonRecordReader extends BaseTestQuery {
     try {
       enableV2Reader(false);
       testBuilder()
-          .sqlQuery("select * from cp.`jsoninput/mixed_number_types.json`")
-          .unOrdered().jsonBaselineFile("jsoninput/mixed_number_types.json")
-          .build().run();
+        .sqlQuery("select * from cp.`jsoninput/mixed_number_types.json`")
+        .unOrdered().jsonBaselineFile("jsoninput/mixed_number_types.json")
+        .go();
       fail("Mixed number types verification failed, expected failure on conflicting number types.");
     } catch (Exception ex) {
       // this indicates successful completion of the test
-      assertTrue(ex
-          .getMessage()
-          .contains(
-              "You tried to write a BigInt type when you are using a ValueWriter of type NullableFloat8WriterImpl."));
+      assertTrue(ex.getMessage()
+        .contains("You tried to write a BigInt type when you are using a ValueWriter of type NullableFloat8WriterImpl."));
     } finally {
       resetV2Reader();
     }
@@ -214,16 +229,19 @@ public class TestJsonRecordReader extends BaseTestQuery {
 
   @Test
   public void testMixedNumberTypesInAllTextMode() throws Exception {
-    runBoth(() -> doTestMixedNumberTypesInAllTextMode());
+    runBoth(this::doTestMixedNumberTypesInAllTextMode);
   }
 
   private void doTestMixedNumberTypesInAllTextMode() throws Exception {
     try {
       alterSession("store.json.all_text_mode", true);
       testBuilder()
-          .sqlQuery("select * from cp.`jsoninput/mixed_number_types.json`")
-          .unOrdered().baselineColumns("a").baselineValues("5.2")
-          .baselineValues("6").build().run();
+        .sqlQuery("select * from cp.`jsoninput/mixed_number_types.json`")
+        .unOrdered()
+        .baselineColumns("a")
+        .baselineValues("5.2")
+        .baselineValues("6")
+        .go();
     } finally {
       resetSessionOption("store.json.all_text_mode");
     }
@@ -234,9 +252,12 @@ public class TestJsonRecordReader extends BaseTestQuery {
     try {
       alterSession(ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE, true);
       testBuilder()
-          .sqlQuery("select * from cp.`jsoninput/mixed_number_types.json`")
-          .unOrdered().baselineColumns("a").baselineValues(5.2D)
-          .baselineValues(6D).build().run();
+        .sqlQuery("select * from cp.`jsoninput/mixed_number_types.json`")
+        .unOrdered()
+        .baselineColumns("a")
+        .baselineValues(5.2D)
+        .baselineValues(6D)
+        .go();
     } finally {
       resetSessionOption(ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE);
     }
@@ -246,7 +267,7 @@ public class TestJsonRecordReader extends BaseTestQuery {
   public void drill_3353() throws Exception {
     try {
       alterSession(ExecConstants.JSON_ALL_TEXT_MODE, true);
-       test("create table dfs.tmp.drill_3353 as select a from dfs.`jsoninput/drill_3353` where e = true");
+      test("create table dfs.tmp.drill_3353 as select a from dfs.`jsoninput/drill_3353` where e = true");
       runBoth(this::doDrill_3353);
     } finally {
       resetSessionOption(ExecConstants.JSON_ALL_TEXT_MODE);
@@ -274,8 +295,11 @@ public class TestJsonRecordReader extends BaseTestQuery {
   private void doTestNestedFilter() throws Exception {
     String query = "select a from cp.`jsoninput/nestedFilter.json` t where t.a.b = 1";
     String baselineQuery = "select * from cp.`jsoninput/nestedFilter.json` t where t.a.b = 1";
-    testBuilder().sqlQuery(query).unOrdered().sqlBaselineQuery(baselineQuery)
-        .go();
+    testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .sqlBaselineQuery(baselineQuery)
+      .go();
   }
 
   @Test
@@ -284,24 +308,20 @@ public class TestJsonRecordReader extends BaseTestQuery {
   /* Test for CountingJSONReader */
   public void testCountingQuerySkippingInvalidJSONRecords() throws Exception {
     try {
-      String set = "alter session set `"
-        + ExecConstants.JSON_READER_SKIP_INVALID_RECORDS_FLAG + "` = true";
-      String set1 = "alter session set `"
-        + ExecConstants.JSON_READER_PRINT_INVALID_RECORDS_LINE_NOS_FLAG
-        + "` = true";
       String query = "select count(*) from cp.`jsoninput/drill4653/file.json`";
 
-      testNoResult(set);
-      testNoResult(set1);
       testBuilder()
         .unOrdered()
+        .disableSessionOption(ENABLE_V2_JSON_READER_KEY)
+        .enableSessionOption(JSON_READER_SKIP_INVALID_RECORDS_FLAG)
+        .enableSessionOption(JSON_READER_PRINT_INVALID_RECORDS_LINE_NOS_FLAG)
         .sqlQuery(query)
         .sqlBaselineQuery(query)
         .go();
     } finally {
-      String set = "alter session set `"
-        + ExecConstants.JSON_READER_SKIP_INVALID_RECORDS_FLAG + "` = false";
-      testNoResult(set);
+      resetSessionOption(ENABLE_V2_JSON_READER_KEY);
+      resetSessionOption(JSON_READER_PRINT_INVALID_RECORDS_LINE_NOS_FLAG);
+      resetSessionOption(JSON_READER_SKIP_INVALID_RECORDS_FLAG);
     }
   }
 
@@ -316,8 +336,11 @@ public class TestJsonRecordReader extends BaseTestQuery {
   private void doTestCountingQueryNotSkippingInvalidJSONRecords() throws Exception {
     try {
       String query = "select count(*) from cp.`jsoninput/drill4653/file.json`";
-      testBuilder().unOrdered().sqlQuery(query).sqlBaselineQuery(query).build()
-          .run();
+      testBuilder()
+        .unOrdered()
+        .sqlQuery(query)
+        .sqlBaselineQuery(query)
+        .go();
     } catch (Exception ex) {
       // do nothing just return
        return;
@@ -331,24 +354,20 @@ public class TestJsonRecordReader extends BaseTestQuery {
   /* Test for JSONReader */
   public void testNotCountingQuerySkippingInvalidJSONRecords() throws Exception {
     try {
-      String set = "alter session set `"
-        + ExecConstants.JSON_READER_SKIP_INVALID_RECORDS_FLAG + "` = true";
-      String set1 = "alter session set `"
-        + ExecConstants.JSON_READER_PRINT_INVALID_RECORDS_LINE_NOS_FLAG
-        + "` = true";
       String query = "select sum(balance) from cp.`jsoninput/drill4653/file.json`";
-      testNoResult(set);
-      testNoResult(set1);
       testBuilder()
         .unOrdered()
+        .disableSessionOption(ENABLE_V2_JSON_READER_KEY)
+        .enableSessionOption(JSON_READER_PRINT_INVALID_RECORDS_LINE_NOS_FLAG)
+        .enableSessionOption(JSON_READER_SKIP_INVALID_RECORDS_FLAG)
         .sqlQuery(query)
         .sqlBaselineQuery(query)
         .go();
     }
     finally {
-      String set = "alter session set `"
-        + ExecConstants.JSON_READER_SKIP_INVALID_RECORDS_FLAG + "` = false";
-      testNoResult(set);
+      resetSessionOption(ENABLE_V2_JSON_READER_KEY);
+      resetSessionOption(JSON_READER_PRINT_INVALID_RECORDS_LINE_NOS_FLAG);
+      resetSessionOption(JSON_READER_SKIP_INVALID_RECORDS_FLAG);
     }
   }
 
@@ -356,16 +375,18 @@ public class TestJsonRecordReader extends BaseTestQuery {
   @Category(UnlikelyTest.class)
   // See DRILL-4653
   /* Test for JSONReader */
-  public void testNotCountingQueryNotSkippingInvalidJSONRecords()
-      throws Exception {
+  public void testNotCountingQueryNotSkippingInvalidJSONRecords() throws Exception {
     runBoth(this::doTestNotCountingQueryNotSkippingInvalidJSONRecords);
   }
 
   private void doTestNotCountingQueryNotSkippingInvalidJSONRecords() throws Exception {
     try {
       String query = "select sum(balance) from cp.`jsoninput/drill4653/file.json`";
-      testBuilder().unOrdered().sqlQuery(query).sqlBaselineQuery(query).build()
-          .run();
+      testBuilder()
+        .unOrdered()
+        .sqlQuery(query)
+        .sqlBaselineQuery(query)
+        .go();
     } catch (Exception ex) {
       // do nothing just return
       return;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestVarlenDecimal.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestVarlenDecimal.java
index 8cae6689ab..822497cd8a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestVarlenDecimal.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestVarlenDecimal.java
@@ -49,10 +49,6 @@ public class TestVarlenDecimal extends ClusterTest {
   @BeforeClass
   public static void setUp() throws Exception {
     startCluster(ClusterFixture.builder(dirTestWatcher));
-  }
-
-  @BeforeClass
-  public static void enableDecimalDataType() {
     client.alterSession(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY, true);
   }
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestExtendedTypes.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestExtendedTypes.java
index cb205453b1..2bc80d3b84 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestExtendedTypes.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestExtendedTypes.java
@@ -40,7 +40,7 @@ public class TestExtendedTypes extends BaseTestQuery {
 
   @Test
   public void checkReadWriteExtended() throws Exception {
-    runBoth(() -> doCheckReadWriteExtended());
+    runBoth(this::doCheckReadWriteExtended);
   }
 
   private void doCheckReadWriteExtended() throws Exception {
@@ -73,7 +73,7 @@ public class TestExtendedTypes extends BaseTestQuery {
 
   @Test
   public void testMongoExtendedTypes() throws Exception {
-    runBoth(() -> doTestMongoExtendedTypes());
+    runBoth(this::doTestMongoExtendedTypes);
   }
 
   private void doTestMongoExtendedTypes() throws Exception {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
index 16f302e750..3c94f4f4ea 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
@@ -196,7 +196,7 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable {
     }
   }
 
-  private void createConfig() throws Exception {
+  private void createConfig() {
 
     // Create a config
     // Because of the way DrillConfig works, we can set the ZK
@@ -278,7 +278,7 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable {
     pluginRegistry.put(MockStorageEngineConfig.NAME, config);
   }
 
-  private void applyOptions() throws Exception {
+  private void applyOptions() {
 
     // Apply system options
     if (builder.systemOptions != null) {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterTest.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterTest.java
index 43f1396070..1fb916b5f7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterTest.java
@@ -106,7 +106,7 @@ public class ClusterTest extends DrillTest {
     return ClusterFixture.getResource(resource);
   }
 
-  public void runAndLog(String sqlQuery) throws Exception {
+  public void runAndLog(String sqlQuery) {
     client.runQueriesAndLog(sqlQuery);
   }
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/TestBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/TestBuilder.java
index fecb540c1e..80ee9ac602 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/TestBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/TestBuilder.java
@@ -220,6 +220,16 @@ public class TestBuilder {
     return this;
   }
 
+  public TestBuilder enableSessionOption(String option) {
+    optionSettingQueriesForTestQuery("ALTER SESSION SET `" + option + "` = 'true'");
+    return this;
+  }
+
+  public TestBuilder disableSessionOption(String option) {
+    optionSettingQueriesForTestQuery("ALTER SESSION SET `" + option + "` = 'false'");
+    return this;
+  }
+
   public TestBuilder approximateEquality() {
     return approximateEquality(0.1);
   }
diff --git a/exec/java-exec/src/test/resources/rest/cust20.json b/exec/java-exec/src/test/resources/rest/cust20.json
index eb0e55dd14..f7ec9ca404 100644
--- a/exec/java-exec/src/test/resources/rest/cust20.json
+++ b/exec/java-exec/src/test/resources/rest/cust20.json
@@ -1,28 +1,28 @@
 !\{"queryId":"[^"]+"
-,"columns":["employee_id","full_name","first_name","last_name","position_id","position_title","store_id","department_id","birth_date","hire_date","salary","supervisor_id","education_level","marital_status","gender","management_role"]
-,"metadata":["BIGINT","VARCHAR","VARCHAR","VARCHAR","BIGINT","VARCHAR","BIGINT","BIGINT","VARCHAR","VARCHAR","FLOAT8","BIGINT","VARCHAR","VARCHAR","VARCHAR","VARCHAR"]
+,"columns":["employee_id","full_name","first_name","last_name","position_id","position_title","store_id","department_id","birth_date","hire_date","salary","supervisor_id","education_level","marital_status","gender","management_role","end_date"]
+,"metadata":["BIGINT","VARCHAR","VARCHAR","VARCHAR","BIGINT","VARCHAR","BIGINT","BIGINT","VARCHAR","VARCHAR","FLOAT8","BIGINT","VARCHAR","VARCHAR","VARCHAR","VARCHAR","VARCHAR"]
 ,"attemptedAutoLimit":0
 ,"rows":[
-{"employee_id":1,"full_name":"Sheri Nowmer","first_name":"Sheri","last_name":"Nowmer","position_id":1,"position_title":"President","store_id":0,"department_id":1,"birth_date":"1961-08-26","hire_date":"1994-12-01 00:00:00.0","salary":80000.0,"supervisor_id":0,"education_level":"Graduate Degree","marital_status":"S","gender":"F","management_role":"Senior Management"}
-,{"employee_id":2,"full_name":"Derrick Whelply","first_name":"Derrick","last_name":"Whelply","position_id":2,"position_title":"VP Country Manager","store_id":0,"department_id":1,"birth_date":"1915-07-03","hire_date":"1994-12-01 00:00:00.0","salary":40000.0,"supervisor_id":1,"education_level":"Graduate Degree","marital_status":"M","gender":"M","management_role":"Senior Management"}
-,{"employee_id":4,"full_name":"Michael Spence","first_name":"Michael","last_name":"Spence","position_id":2,"position_title":"VP Country Manager","store_id":0,"department_id":1,"birth_date":"1969-06-20","hire_date":"1998-01-01 00:00:00.0","salary":40000.0,"supervisor_id":1,"education_level":"Graduate Degree","marital_status":"S","gender":"M","management_role":"Senior Management"}
-,{"employee_id":5,"full_name":"Maya Gutierrez","first_name":"Maya","last_name":"Gutierrez","position_id":2,"position_title":"VP Country Manager","store_id":0,"department_id":1,"birth_date":"1951-05-10","hire_date":"1998-01-01 00:00:00.0","salary":35000.0,"supervisor_id":1,"education_level":"Bachelors Degree","marital_status":"M","gender":"F","management_role":"Senior Management"}
-,{"employee_id":6,"full_name":"Roberta Damstra","first_name":"Roberta","last_name":"Damstra","position_id":3,"position_title":"VP Information Systems","store_id":0,"department_id":2,"birth_date":"1942-10-08","hire_date":"1994-12-01 00:00:00.0","salary":25000.0,"supervisor_id":1,"education_level":"Bachelors Degree","marital_status":"M","gender":"F","management_role":"Senior Management"}
-,{"employee_id":7,"full_name":"Rebecca Kanagaki","first_name":"Rebecca","last_name":"Kanagaki","position_id":4,"position_title":"VP Human Resources","store_id":0,"department_id":3,"birth_date":"1949-03-27","hire_date":"1994-12-01 00:00:00.0","salary":15000.0,"supervisor_id":1,"education_level":"Bachelors Degree","marital_status":"M","gender":"F","management_role":"Senior Management"}
-,{"employee_id":8,"full_name":"Kim Brunner","first_name":"Kim","last_name":"Brunner","position_id":11,"position_title":"Store Manager","store_id":9,"department_id":11,"birth_date":"1922-08-10","hire_date":"1998-01-01 00:00:00.0","salary":10000.0,"supervisor_id":5,"education_level":"Bachelors Degree","marital_status":"S","gender":"F","management_role":"Store Management"}
-,{"employee_id":9,"full_name":"Brenda Blumberg","first_name":"Brenda","last_name":"Blumberg","position_id":11,"position_title":"Store Manager","store_id":21,"department_id":11,"birth_date":"1979-06-23","hire_date":"1998-01-01 00:00:00.0","salary":17000.0,"supervisor_id":5,"education_level":"Graduate Degree","marital_status":"M","gender":"F","management_role":"Store Management"}
-,{"employee_id":10,"full_name":"Darren Stanz","first_name":"Darren","last_name":"Stanz","position_id":5,"position_title":"VP Finance","store_id":0,"department_id":5,"birth_date":"1949-08-26","hire_date":"1994-12-01 00:00:00.0","salary":50000.0,"supervisor_id":1,"education_level":"Partial College","marital_status":"M","gender":"M","management_role":"Senior Management"}
-,{"employee_id":11,"full_name":"Jonathan Murraiin","first_name":"Jonathan","last_name":"Murraiin","position_id":11,"position_title":"Store Manager","store_id":1,"department_id":11,"birth_date":"1967-06-20","hire_date":"1998-01-01 00:00:00.0","salary":15000.0,"supervisor_id":5,"education_level":"Graduate Degree","marital_status":"S","gender":"M","management_role":"Store Management"}
-,{"employee_id":12,"full_name":"Jewel Creek","first_name":"Jewel","last_name":"Creek","position_id":11,"position_title":"Store Manager","store_id":5,"department_id":11,"birth_date":"1971-10-18","hire_date":"1998-01-01 00:00:00.0","salary":8500.0,"supervisor_id":5,"education_level":"Graduate Degree","marital_status":"S","gender":"F","management_role":"Store Management"}
-,{"employee_id":13,"full_name":"Peggy Medina","first_name":"Peggy","last_name":"Medina","position_id":11,"position_title":"Store Manager","store_id":10,"department_id":11,"birth_date":"1975-10-12","hire_date":"1998-01-01 00:00:00.0","salary":15000.0,"supervisor_id":5,"education_level":"Bachelors Degree","marital_status":"S","gender":"F","management_role":"Store Management"}
-,{"employee_id":14,"full_name":"Bryan Rutledge","first_name":"Bryan","last_name":"Rutledge","position_id":11,"position_title":"Store Manager","store_id":8,"department_id":11,"birth_date":"1912-07-09","hire_date":"1998-01-01 00:00:00.0","salary":17000.0,"supervisor_id":5,"education_level":"Bachelors Degree","marital_status":"M","gender":"M","management_role":"Store Management"}
-,{"employee_id":15,"full_name":"Walter Cavestany","first_name":"Walter","last_name":"Cavestany","position_id":11,"position_title":"Store Manager","store_id":4,"department_id":11,"birth_date":"1941-11-05","hire_date":"1998-01-01 00:00:00.0","salary":12000.0,"supervisor_id":5,"education_level":"Bachelors Degree","marital_status":"M","gender":"M","management_role":"Store Management"}
-,{"employee_id":16,"full_name":"Peggy Planck","first_name":"Peggy","last_name":"Planck","position_id":11,"position_title":"Store Manager","store_id":12,"department_id":11,"birth_date":"1919-06-02","hire_date":"1998-01-01 00:00:00.0","salary":17000.0,"supervisor_id":5,"education_level":"Bachelors Degree","marital_status":"S","gender":"F","management_role":"Store Management"}
-,{"employee_id":17,"full_name":"Brenda Marshall","first_name":"Brenda","last_name":"Marshall","position_id":11,"position_title":"Store Manager","store_id":18,"department_id":11,"birth_date":"1928-03-20","hire_date":"1998-01-01 00:00:00.0","salary":10000.0,"supervisor_id":5,"education_level":"Partial College","marital_status":"S","gender":"F","management_role":"Store Management"}
-,{"employee_id":18,"full_name":"Daniel Wolter","first_name":"Daniel","last_name":"Wolter","position_id":11,"position_title":"Store Manager","store_id":19,"department_id":11,"birth_date":"1914-09-21","hire_date":"1998-01-01 00:00:00.0","salary":17000.0,"supervisor_id":4,"education_level":"Partial College","marital_status":"S","gender":"M","management_role":"Store Management"}
-,{"employee_id":19,"full_name":"Dianne Collins","first_name":"Dianne","last_name":"Collins","position_id":11,"position_title":"Store Manager","store_id":20,"department_id":11,"birth_date":"1953-07-20","hire_date":"1998-01-01 00:00:00.0","salary":10000.0,"supervisor_id":4,"education_level":"Bachelors Degree","marital_status":"S","gender":"F","management_role":"Store Management"}
-,{"employee_id":20,"full_name":"Beverly Baker","first_name":"Beverly","last_name":"Baker","position_id":2,"position_title":"VP Country Manager","store_id":0,"department_id":1,"birth_date":"1974-04-16","hire_date":"1994-12-01 00:00:00.0","salary":30000.0,"supervisor_id":2,"education_level":"Bachelors Degree","marital_status":"M","gender":"F","management_role":"Senior Management"}
-,{"employee_id":21,"full_name":"Pedro Castillo","first_name":"Pedro","last_name":"Castillo","position_id":2,"position_title":"VP Country Manager","store_id":0,"department_id":1,"birth_date":"1918-11-04","hire_date":"1994-12-01 00:00:00.0","salary":35000.0,"supervisor_id":2,"education_level":"Bachelors Degree","marital_status":"M","gender":"M","management_role":"Senior Management"}
+{"employee_id":1,"full_name":"Sheri Nowmer","first_name":"Sheri","last_name":"Nowmer","position_id":1,"position_title":"President","store_id":0,"department_id":1,"birth_date":"1961-08-26","hire_date":"1994-12-01 00:00:00.0","salary":80000.0,"supervisor_id":0,"education_level":"Graduate Degree","marital_status":"S","gender":"F","management_role":"Senior Management","end_date":null}
+,{"employee_id":2,"full_name":"Derrick Whelply","first_name":"Derrick","last_name":"Whelply","position_id":2,"position_title":"VP Country Manager","store_id":0,"department_id":1,"birth_date":"1915-07-03","hire_date":"1994-12-01 00:00:00.0","salary":40000.0,"supervisor_id":1,"education_level":"Graduate Degree","marital_status":"M","gender":"M","management_role":"Senior Management","end_date":null}
+,{"employee_id":4,"full_name":"Michael Spence","first_name":"Michael","last_name":"Spence","position_id":2,"position_title":"VP Country Manager","store_id":0,"department_id":1,"birth_date":"1969-06-20","hire_date":"1998-01-01 00:00:00.0","salary":40000.0,"supervisor_id":1,"education_level":"Graduate Degree","marital_status":"S","gender":"M","management_role":"Senior Management","end_date":null}
+,{"employee_id":5,"full_name":"Maya Gutierrez","first_name":"Maya","last_name":"Gutierrez","position_id":2,"position_title":"VP Country Manager","store_id":0,"department_id":1,"birth_date":"1951-05-10","hire_date":"1998-01-01 00:00:00.0","salary":35000.0,"supervisor_id":1,"education_level":"Bachelors Degree","marital_status":"M","gender":"F","management_role":"Senior Management","end_date":null}
+,{"employee_id":6,"full_name":"Roberta Damstra","first_name":"Roberta","last_name":"Damstra","position_id":3,"position_title":"VP Information Systems","store_id":0,"department_id":2,"birth_date":"1942-10-08","hire_date":"1994-12-01 00:00:00.0","salary":25000.0,"supervisor_id":1,"education_level":"Bachelors Degree","marital_status":"M","gender":"F","management_role":"Senior Management","end_date":null}
+,{"employee_id":7,"full_name":"Rebecca Kanagaki","first_name":"Rebecca","last_name":"Kanagaki","position_id":4,"position_title":"VP Human Resources","store_id":0,"department_id":3,"birth_date":"1949-03-27","hire_date":"1994-12-01 00:00:00.0","salary":15000.0,"supervisor_id":1,"education_level":"Bachelors Degree","marital_status":"M","gender":"F","management_role":"Senior Management","end_date":null}
+,{"employee_id":8,"full_name":"Kim Brunner","first_name":"Kim","last_name":"Brunner","position_id":11,"position_title":"Store Manager","store_id":9,"department_id":11,"birth_date":"1922-08-10","hire_date":"1998-01-01 00:00:00.0","salary":10000.0,"supervisor_id":5,"education_level":"Bachelors Degree","marital_status":"S","gender":"F","management_role":"Store Management","end_date":null}
+,{"employee_id":9,"full_name":"Brenda Blumberg","first_name":"Brenda","last_name":"Blumberg","position_id":11,"position_title":"Store Manager","store_id":21,"department_id":11,"birth_date":"1979-06-23","hire_date":"1998-01-01 00:00:00.0","salary":17000.0,"supervisor_id":5,"education_level":"Graduate Degree","marital_status":"M","gender":"F","management_role":"Store Management","end_date":null}
+,{"employee_id":10,"full_name":"Darren Stanz","first_name":"Darren","last_name":"Stanz","position_id":5,"position_title":"VP Finance","store_id":0,"department_id":5,"birth_date":"1949-08-26","hire_date":"1994-12-01 00:00:00.0","salary":50000.0,"supervisor_id":1,"education_level":"Partial College","marital_status":"M","gender":"M","management_role":"Senior Management","end_date":null}
+,{"employee_id":11,"full_name":"Jonathan Murraiin","first_name":"Jonathan","last_name":"Murraiin","position_id":11,"position_title":"Store Manager","store_id":1,"department_id":11,"birth_date":"1967-06-20","hire_date":"1998-01-01 00:00:00.0","salary":15000.0,"supervisor_id":5,"education_level":"Graduate Degree","marital_status":"S","gender":"M","management_role":"Store Management","end_date":null}
+,{"employee_id":12,"full_name":"Jewel Creek","first_name":"Jewel","last_name":"Creek","position_id":11,"position_title":"Store Manager","store_id":5,"department_id":11,"birth_date":"1971-10-18","hire_date":"1998-01-01 00:00:00.0","salary":8500.0,"supervisor_id":5,"education_level":"Graduate Degree","marital_status":"S","gender":"F","management_role":"Store Management","end_date":null}
+,{"employee_id":13,"full_name":"Peggy Medina","first_name":"Peggy","last_name":"Medina","position_id":11,"position_title":"Store Manager","store_id":10,"department_id":11,"birth_date":"1975-10-12","hire_date":"1998-01-01 00:00:00.0","salary":15000.0,"supervisor_id":5,"education_level":"Bachelors Degree","marital_status":"S","gender":"F","management_role":"Store Management","end_date":null}
+,{"employee_id":14,"full_name":"Bryan Rutledge","first_name":"Bryan","last_name":"Rutledge","position_id":11,"position_title":"Store Manager","store_id":8,"department_id":11,"birth_date":"1912-07-09","hire_date":"1998-01-01 00:00:00.0","salary":17000.0,"supervisor_id":5,"education_level":"Bachelors Degree","marital_status":"M","gender":"M","management_role":"Store Management","end_date":null}
+,{"employee_id":15,"full_name":"Walter Cavestany","first_name":"Walter","last_name":"Cavestany","position_id":11,"position_title":"Store Manager","store_id":4,"department_id":11,"birth_date":"1941-11-05","hire_date":"1998-01-01 00:00:00.0","salary":12000.0,"supervisor_id":5,"education_level":"Bachelors Degree","marital_status":"M","gender":"M","management_role":"Store Management","end_date":null}
+,{"employee_id":16,"full_name":"Peggy Planck","first_name":"Peggy","last_name":"Planck","position_id":11,"position_title":"Store Manager","store_id":12,"department_id":11,"birth_date":"1919-06-02","hire_date":"1998-01-01 00:00:00.0","salary":17000.0,"supervisor_id":5,"education_level":"Bachelors Degree","marital_status":"S","gender":"F","management_role":"Store Management","end_date":null}
+,{"employee_id":17,"full_name":"Brenda Marshall","first_name":"Brenda","last_name":"Marshall","position_id":11,"position_title":"Store Manager","store_id":18,"department_id":11,"birth_date":"1928-03-20","hire_date":"1998-01-01 00:00:00.0","salary":10000.0,"supervisor_id":5,"education_level":"Partial College","marital_status":"S","gender":"F","management_role":"Store Management","end_date":null}
+,{"employee_id":18,"full_name":"Daniel Wolter","first_name":"Daniel","last_name":"Wolter","position_id":11,"position_title":"Store Manager","store_id":19,"department_id":11,"birth_date":"1914-09-21","hire_date":"1998-01-01 00:00:00.0","salary":17000.0,"supervisor_id":4,"education_level":"Partial College","marital_status":"S","gender":"M","management_role":"Store Management","end_date":null}
+,{"employee_id":19,"full_name":"Dianne Collins","first_name":"Dianne","last_name":"Collins","position_id":11,"position_title":"Store Manager","store_id":20,"department_id":11,"birth_date":"1953-07-20","hire_date":"1998-01-01 00:00:00.0","salary":10000.0,"supervisor_id":4,"education_level":"Bachelors Degree","marital_status":"S","gender":"F","management_role":"Store Management","end_date":null}
+,{"employee_id":20,"full_name":"Beverly Baker","first_name":"Beverly","last_name":"Baker","position_id":2,"position_title":"VP Country Manager","store_id":0,"department_id":1,"birth_date":"1974-04-16","hire_date":"1994-12-01 00:00:00.0","salary":30000.0,"supervisor_id":2,"education_level":"Bachelors Degree","marital_status":"M","gender":"F","management_role":"Senior Management","end_date":null}
+,{"employee_id":21,"full_name":"Pedro Castillo","first_name":"Pedro","last_name":"Castillo","position_id":2,"position_title":"VP Country Manager","store_id":0,"department_id":1,"birth_date":"1918-11-04","hire_date":"1994-12-01 00:00:00.0","salary":35000.0,"supervisor_id":2,"education_level":"Bachelors Degree","marital_status":"M","gender":"M","management_role":"Senior Management","end_date":null}
 ]
 ,"queryState":"COMPLETED"
 }
diff --git a/exec/java-exec/src/test/resources/rest/small.json b/exec/java-exec/src/test/resources/rest/small.json
index f97df9ac3a..3fa5beedc0 100644
--- a/exec/java-exec/src/test/resources/rest/small.json
+++ b/exec/java-exec/src/test/resources/rest/small.json
@@ -1,18 +1,18 @@
 !\{"queryId":"[^"]+"
-,"columns":["employee_id","full_name","first_name","last_name","position_id","position_title","store_id","department_id","birth_date","hire_date","salary","supervisor_id","education_level","marital_status","gender","management_role"]
-,"metadata":["BIGINT","VARCHAR","VARCHAR","VARCHAR","BIGINT","VARCHAR","BIGINT","BIGINT","VARCHAR","VARCHAR","FLOAT8","BIGINT","VARCHAR","VARCHAR","VARCHAR","VARCHAR"]
+,"columns":["employee_id","full_name","first_name","last_name","position_id","position_title","store_id","department_id","birth_date","hire_date","salary","supervisor_id","education_level","marital_status","gender","management_role","end_date"]
+,"metadata":["BIGINT","VARCHAR","VARCHAR","VARCHAR","BIGINT","VARCHAR","BIGINT","BIGINT","VARCHAR","VARCHAR","FLOAT8","BIGINT","VARCHAR","VARCHAR","VARCHAR","VARCHAR","VARCHAR"]
 ,"attemptedAutoLimit":10
 ,"rows":[
-{"employee_id":1,"full_name":"Sheri Nowmer","first_name":"Sheri","last_name":"Nowmer","position_id":1,"position_title":"President","store_id":0,"department_id":1,"birth_date":"1961-08-26","hire_date":"1994-12-01 00:00:00.0","salary":80000.0,"supervisor_id":0,"education_level":"Graduate Degree","marital_status":"S","gender":"F","management_role":"Senior Management"}
-,{"employee_id":2,"full_name":"Derrick Whelply","first_name":"Derrick","last_name":"Whelply","position_id":2,"position_title":"VP Country Manager","store_id":0,"department_id":1,"birth_date":"1915-07-03","hire_date":"1994-12-01 00:00:00.0","salary":40000.0,"supervisor_id":1,"education_level":"Graduate Degree","marital_status":"M","gender":"M","management_role":"Senior Management"}
-,{"employee_id":4,"full_name":"Michael Spence","first_name":"Michael","last_name":"Spence","position_id":2,"position_title":"VP Country Manager","store_id":0,"department_id":1,"birth_date":"1969-06-20","hire_date":"1998-01-01 00:00:00.0","salary":40000.0,"supervisor_id":1,"education_level":"Graduate Degree","marital_status":"S","gender":"M","management_role":"Senior Management"}
-,{"employee_id":5,"full_name":"Maya Gutierrez","first_name":"Maya","last_name":"Gutierrez","position_id":2,"position_title":"VP Country Manager","store_id":0,"department_id":1,"birth_date":"1951-05-10","hire_date":"1998-01-01 00:00:00.0","salary":35000.0,"supervisor_id":1,"education_level":"Bachelors Degree","marital_status":"M","gender":"F","management_role":"Senior Management"}
-,{"employee_id":6,"full_name":"Roberta Damstra","first_name":"Roberta","last_name":"Damstra","position_id":3,"position_title":"VP Information Systems","store_id":0,"department_id":2,"birth_date":"1942-10-08","hire_date":"1994-12-01 00:00:00.0","salary":25000.0,"supervisor_id":1,"education_level":"Bachelors Degree","marital_status":"M","gender":"F","management_role":"Senior Management"}
-,{"employee_id":7,"full_name":"Rebecca Kanagaki","first_name":"Rebecca","last_name":"Kanagaki","position_id":4,"position_title":"VP Human Resources","store_id":0,"department_id":3,"birth_date":"1949-03-27","hire_date":"1994-12-01 00:00:00.0","salary":15000.0,"supervisor_id":1,"education_level":"Bachelors Degree","marital_status":"M","gender":"F","management_role":"Senior Management"}
-,{"employee_id":8,"full_name":"Kim Brunner","first_name":"Kim","last_name":"Brunner","position_id":11,"position_title":"Store Manager","store_id":9,"department_id":11,"birth_date":"1922-08-10","hire_date":"1998-01-01 00:00:00.0","salary":10000.0,"supervisor_id":5,"education_level":"Bachelors Degree","marital_status":"S","gender":"F","management_role":"Store Management"}
-,{"employee_id":9,"full_name":"Brenda Blumberg","first_name":"Brenda","last_name":"Blumberg","position_id":11,"position_title":"Store Manager","store_id":21,"department_id":11,"birth_date":"1979-06-23","hire_date":"1998-01-01 00:00:00.0","salary":17000.0,"supervisor_id":5,"education_level":"Graduate Degree","marital_status":"M","gender":"F","management_role":"Store Management"}
-,{"employee_id":10,"full_name":"Darren Stanz","first_name":"Darren","last_name":"Stanz","position_id":5,"position_title":"VP Finance","store_id":0,"department_id":5,"birth_date":"1949-08-26","hire_date":"1994-12-01 00:00:00.0","salary":50000.0,"supervisor_id":1,"education_level":"Partial College","marital_status":"M","gender":"M","management_role":"Senior Management"}
-,{"employee_id":11,"full_name":"Jonathan Murraiin","first_name":"Jonathan","last_name":"Murraiin","position_id":11,"position_title":"Store Manager","store_id":1,"department_id":11,"birth_date":"1967-06-20","hire_date":"1998-01-01 00:00:00.0","salary":15000.0,"supervisor_id":5,"education_level":"Graduate Degree","marital_status":"S","gender":"M","management_role":"Store Management"}
+{"employee_id":1,"full_name":"Sheri Nowmer","first_name":"Sheri","last_name":"Nowmer","position_id":1,"position_title":"President","store_id":0,"department_id":1,"birth_date":"1961-08-26","hire_date":"1994-12-01 00:00:00.0","salary":80000.0,"supervisor_id":0,"education_level":"Graduate Degree","marital_status":"S","gender":"F","management_role":"Senior Management","end_date":null}
+,{"employee_id":2,"full_name":"Derrick Whelply","first_name":"Derrick","last_name":"Whelply","position_id":2,"position_title":"VP Country Manager","store_id":0,"department_id":1,"birth_date":"1915-07-03","hire_date":"1994-12-01 00:00:00.0","salary":40000.0,"supervisor_id":1,"education_level":"Graduate Degree","marital_status":"M","gender":"M","management_role":"Senior Management","end_date":null}
+,{"employee_id":4,"full_name":"Michael Spence","first_name":"Michael","last_name":"Spence","position_id":2,"position_title":"VP Country Manager","store_id":0,"department_id":1,"birth_date":"1969-06-20","hire_date":"1998-01-01 00:00:00.0","salary":40000.0,"supervisor_id":1,"education_level":"Graduate Degree","marital_status":"S","gender":"M","management_role":"Senior Management","end_date":null}
+,{"employee_id":5,"full_name":"Maya Gutierrez","first_name":"Maya","last_name":"Gutierrez","position_id":2,"position_title":"VP Country Manager","store_id":0,"department_id":1,"birth_date":"1951-05-10","hire_date":"1998-01-01 00:00:00.0","salary":35000.0,"supervisor_id":1,"education_level":"Bachelors Degree","marital_status":"M","gender":"F","management_role":"Senior Management","end_date":null}
+,{"employee_id":6,"full_name":"Roberta Damstra","first_name":"Roberta","last_name":"Damstra","position_id":3,"position_title":"VP Information Systems","store_id":0,"department_id":2,"birth_date":"1942-10-08","hire_date":"1994-12-01 00:00:00.0","salary":25000.0,"supervisor_id":1,"education_level":"Bachelors Degree","marital_status":"M","gender":"F","management_role":"Senior Management","end_date":null}
+,{"employee_id":7,"full_name":"Rebecca Kanagaki","first_name":"Rebecca","last_name":"Kanagaki","position_id":4,"position_title":"VP Human Resources","store_id":0,"department_id":3,"birth_date":"1949-03-27","hire_date":"1994-12-01 00:00:00.0","salary":15000.0,"supervisor_id":1,"education_level":"Bachelors Degree","marital_status":"M","gender":"F","management_role":"Senior Management","end_date":null}
+,{"employee_id":8,"full_name":"Kim Brunner","first_name":"Kim","last_name":"Brunner","position_id":11,"position_title":"Store Manager","store_id":9,"department_id":11,"birth_date":"1922-08-10","hire_date":"1998-01-01 00:00:00.0","salary":10000.0,"supervisor_id":5,"education_level":"Bachelors Degree","marital_status":"S","gender":"F","management_role":"Store Management","end_date":null}
+,{"employee_id":9,"full_name":"Brenda Blumberg","first_name":"Brenda","last_name":"Blumberg","position_id":11,"position_title":"Store Manager","store_id":21,"department_id":11,"birth_date":"1979-06-23","hire_date":"1998-01-01 00:00:00.0","salary":17000.0,"supervisor_id":5,"education_level":"Graduate Degree","marital_status":"M","gender":"F","management_role":"Store Management","end_date":null}
+,{"employee_id":10,"full_name":"Darren Stanz","first_name":"Darren","last_name":"Stanz","position_id":5,"position_title":"VP Finance","store_id":0,"department_id":5,"birth_date":"1949-08-26","hire_date":"1994-12-01 00:00:00.0","salary":50000.0,"supervisor_id":1,"education_level":"Partial College","marital_status":"M","gender":"M","management_role":"Senior Management","end_date":null}
+,{"employee_id":11,"full_name":"Jonathan Murraiin","first_name":"Jonathan","last_name":"Murraiin","position_id":11,"position_title":"Store Manager","store_id":1,"department_id":11,"birth_date":"1967-06-20","hire_date":"1998-01-01 00:00:00.0","salary":15000.0,"supervisor_id":5,"education_level":"Graduate Degree","marital_status":"S","gender":"M","management_role":"Store Management","end_date":null}
 ]
 ,"queryState":"COMPLETED"
 }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractColumnMetadata.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractColumnMetadata.java
index 3a20fadb2b..1604ff24a3 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractColumnMetadata.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractColumnMetadata.java
@@ -55,8 +55,7 @@ import com.fasterxml.jackson.annotation.JsonPropertyOrder;
 @JsonInclude(JsonInclude.Include.NON_DEFAULT)
 @JsonPropertyOrder({ "name", "type", "mode", "format", "default",
     "properties" })
-public abstract class AbstractColumnMetadata extends AbstractPropertied
-    implements ColumnMetadata {
+public abstract class AbstractColumnMetadata extends AbstractPropertied implements ColumnMetadata {
 
   // Capture the key schema information. We cannot use the MaterializedField
   // or MajorType because they encode child information that we encode here
@@ -156,6 +155,9 @@ public abstract class AbstractColumnMetadata extends AbstractPropertied
   @Override
   public boolean isDict() { return false; }
 
+  @Override
+  public boolean isScalar() { return false; }
+
   @Override
   public boolean isDynamic() { return false; }
 
@@ -376,4 +378,4 @@ public abstract class AbstractColumnMetadata extends AbstractPropertied
   protected String escapeSpecialSymbols(String value) {
     return value.replaceAll("(\\\\)|(`)", "\\\\$0");
   }
-}
\ No newline at end of file
+}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ColumnMetadata.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ColumnMetadata.java
index 2117bed12f..0f78f55ee9 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ColumnMetadata.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ColumnMetadata.java
@@ -232,6 +232,7 @@ public interface ColumnMetadata extends Propertied {
   boolean isMap();
   boolean isVariant();
   boolean isDict();
+  boolean isScalar();
 
   /**
    * Reports if the column is dynamic. A dynamic column is one with
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MetadataUtils.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MetadataUtils.java
index e79412a2e1..375a43d589 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MetadataUtils.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MetadataUtils.java
@@ -200,10 +200,13 @@ public class MetadataUtils {
     return new DictColumnMetadata(name, DataMode.REPEATED);
   }
 
-  public static PrimitiveColumnMetadata newScalar(String name, MinorType type,
-      DataMode mode) {
+  public static PrimitiveColumnMetadata newScalar(String name, MinorType type, DataMode mode) {
+    return newScalar(name, type, mode, false);
+  }
+
+  public static PrimitiveColumnMetadata newScalar(String name, MinorType type, DataMode mode, boolean schemaForUnknown) {
     assert isScalar(type);
-    return new PrimitiveColumnMetadata(name, type, mode);
+    return new PrimitiveColumnMetadata(name, type, mode, schemaForUnknown);
   }
 
   public static PrimitiveColumnMetadata newScalar(String name, MajorType type) {
@@ -212,13 +215,11 @@ public class MetadataUtils {
     return new PrimitiveColumnMetadata(name, type);
   }
 
-  public static ColumnMetadata newDecimal(String name, DataMode mode,
-      int precision, int scale) {
+  public static ColumnMetadata newDecimal(String name, DataMode mode, int precision, int scale) {
     return newDecimal(name, MinorType.VARDECIMAL, mode, precision, scale);
   }
 
-  public static ColumnMetadata newDecimal(String name, MinorType type, DataMode mode,
-      int precision, int scale) {
+  public static ColumnMetadata newDecimal(String name, MinorType type, DataMode mode, int precision, int scale) {
     if (precision < 0 ) {
       throw new IllegalArgumentException("Precision cannot be negative : " +
           precision);
@@ -278,8 +279,7 @@ public class MetadataUtils {
            col.name().equals(DynamicColumn.WILDCARD);
   }
 
-  public static ColumnMetadata cloneMapWithSchema(ColumnMetadata source,
-      TupleMetadata members) {
+  public static ColumnMetadata cloneMapWithSchema(ColumnMetadata source, TupleMetadata members) {
     return newMap(source.name(), source.mode(), members);
   }
 
@@ -292,8 +292,7 @@ public class MetadataUtils {
     }
   }
 
-  public static TupleMetadata diffTuple(TupleMetadata base,
-      TupleMetadata subtend) {
+  public static TupleMetadata diffTuple(TupleMetadata base, TupleMetadata subtend) {
     TupleMetadata diff = new TupleSchema();
     for (ColumnMetadata col : base) {
       ColumnMetadata other = subtend.metadata(col.name());
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/PrimitiveColumnMetadata.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/PrimitiveColumnMetadata.java
index bb37b0ff66..78f4008caa 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/PrimitiveColumnMetadata.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/PrimitiveColumnMetadata.java
@@ -53,6 +53,8 @@ import org.slf4j.LoggerFactory;
 public class PrimitiveColumnMetadata extends AbstractColumnMetadata {
   private static final Logger logger = LoggerFactory.getLogger(PrimitiveColumnMetadata.class);
 
+  private boolean forUnknownSchema;
+
   public PrimitiveColumnMetadata(MaterializedField schema) {
     super(schema);
   }
@@ -65,6 +67,11 @@ public class PrimitiveColumnMetadata extends AbstractColumnMetadata {
     super(name, type, mode);
   }
 
+  public PrimitiveColumnMetadata(String name, MinorType type, DataMode mode, boolean forUnknownSchema) {
+    this(name, type, mode);
+    this.forUnknownSchema = forUnknownSchema;
+  }
+
   private int estimateWidth(MajorType majorType) {
     if (type() == MinorType.NULL || type() == MinorType.LATE) {
       return 0;
@@ -332,4 +339,14 @@ public class PrimitiveColumnMetadata extends AbstractColumnMetadata {
         return true;
     }
   }
+
+  @Override
+  public boolean isScalar() { return true; }
+
+  /**
+   * @return true in case this primitive is created for unknown schema, for instance the column with all null values
+   */
+  public boolean isSchemaForUnknown() {
+    return forUnknownSchema;
+  }
 }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/NullableVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/NullableVector.java
index 099f113111..ed429eb178 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/NullableVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/NullableVector.java
@@ -23,7 +23,7 @@ import org.apache.drill.exec.record.MaterializedField;
 
 public interface NullableVector extends ValueVector {
 
-  public interface Mutator extends ValueVector.Mutator {
+  interface Mutator extends ValueVector.Mutator {
 
     /**
      * Used by the vector accessors to force the last set value.
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java
index 16155f4a0a..897df0c54e 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java
@@ -246,15 +246,13 @@ public abstract class AbstractTupleWriter implements TupleWriter, WriterEvents {
   @Override
   public int addColumn(ColumnMetadata column) {
     verifyAddColumn(column.name());
-    return addColumnWriter(
-        (AbstractObjectWriter) listener.addColumn(this, column));
+    return addColumnWriter((AbstractObjectWriter) listener.addColumn(this, column));
   }
 
   @Override
   public int addColumn(MaterializedField field) {
     verifyAddColumn(field.getName());
-    return addColumnWriter(
-        (AbstractObjectWriter) listener.addColumn(this, field));
+    return addColumnWriter((AbstractObjectWriter) listener.addColumn(this, field));
   }
 
   private void verifyAddColumn(String colName) {