Posted to commits@drill.apache.org by cg...@apache.org on 2022/03/15 12:21:48 UTC

[drill] branch master updated: DRILL-8167: Add JSON Config Options to Format Config (#2494)

This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 9475181  DRILL-8167: Add JSON Config Options to Format Config (#2494)
9475181 is described below

commit 9475181e7781803b55f28bac057e0b79b47f9da4
Author: Charles S. Givre <cg...@apache.org>
AuthorDate: Tue Mar 15 08:21:37 2022 -0400

    DRILL-8167: Add JSON Config Options to Format Config (#2494)
    
    * Initial work
    
    * Initial commit
    
    * Added unit tests
    
    * Code cleanup and final param
    
    * Fixed Unit Test
    
    * Addressed Review Comments
---
 .../exec/store/easy/json/JSONFormatPlugin.java     |  65 ++++++++-
 .../exec/store/easy/json/JSONRecordReader.java     | 115 +++++++++++++--
 .../drill/exec/store/DropboxFileSystemTest.java    |   2 +-
 .../store/dfs/TestFormatPluginOptionExtractor.java |   2 +-
 .../drill/exec/store/json/TestJsonModes.java       | 162 +++++++++++++++++++++
 .../apache/drill/test/PhysicalOpUnitTestBase.java  |   2 +-
 .../src/test/resources/jsoninput/nan_test.json     |   1 +
 7 files changed, 324 insertions(+), 25 deletions(-)
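
In user-facing terms, the JSON format config now accepts allTextMode, readNumbersAsDouble, skipMalformedJSONRecords, escapeAnyChar and nanInf alongside extensions, so these behaviours can be set per format config (or per query through Drill's table() function, as exercised in the new TestJsonModes below) rather than only through session/system options. A minimal construction sketch against the new constructor signature follows; the wrapper class, option values and printout are illustrative only, and a null argument simply defers to the matching session/system option:

    import java.util.Collections;
    import java.util.List;

    import org.apache.drill.exec.store.easy.json.JSONFormatPlugin.JSONFormatConfig;

    public class JsonFormatConfigExample {   // illustrative wrapper, not part of the commit
      public static void main(String[] args) {
        List<String> extensions = Collections.singletonList("json");
        // A null option means "not set here"; the reader then falls back to the
        // corresponding session/system option (see JSONRecordReader below).
        JSONFormatConfig config = new JSONFormatConfig(
            extensions,
            true,   // allTextMode
            null,   // readNumbersAsDouble
            true,   // skipMalformedJSONRecords
            null,   // escapeAnyChar
            null);  // nanInf
        System.out.println(config);
      }
    }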

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
index 34da747..f7a25d5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
 import org.apache.drill.common.PlanStringBuilder;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
@@ -70,7 +71,7 @@ public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> {
 
   public JSONFormatPlugin(String name, DrillbitContext context,
       Configuration fsConf, StoragePluginConfig storageConfig) {
-    this(name, context, fsConf, storageConfig, new JSONFormatConfig(null));
+    this(name, context, fsConf, storageConfig, new JSONFormatConfig(null, null, null, null, null, null));
   }
 
   public JSONFormatPlugin(String name, DrillbitContext context,
@@ -85,7 +86,7 @@ public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> {
                                       FileWork fileWork,
                                       List<SchemaPath> columns,
                                       String userName) {
-    return new JSONRecordReader(context, fileWork.getPath(), dfs, columns);
+    return new JSONRecordReader(context, fileWork.getPath(), dfs, columns, formatConfig);
   }
 
   @Override
@@ -178,12 +179,27 @@ public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> {
     private static final List<String> DEFAULT_EXTS = ImmutableList.of("json");
 
     private final List<String> extensions;
+    private final Boolean allTextMode;
+    private final Boolean readNumbersAsDouble;
+    private final Boolean skipMalformedJSONRecords;
+    private final Boolean escapeAnyChar;
+    private final Boolean nanInf;
 
     @JsonCreator
     public JSONFormatConfig(
-        @JsonProperty("extensions") List<String> extensions) {
+        @JsonProperty("extensions") List<String> extensions,
+        @JsonProperty("allTextMode") Boolean allTextMode,
+        @JsonProperty("readNumbersAsDouble") Boolean readNumbersAsDouble,
+        @JsonProperty("skipMalformedJSONRecords") Boolean skipMalformedJSONRecords,
+        @JsonProperty("escapeAnyChar") Boolean escapeAnyChar,
+        @JsonProperty("nanInf") Boolean nanInf) {
       this.extensions = extensions == null ?
           DEFAULT_EXTS : ImmutableList.copyOf(extensions);
+      this.allTextMode = allTextMode;
+      this.readNumbersAsDouble = readNumbersAsDouble;
+      this.skipMalformedJSONRecords = skipMalformedJSONRecords;
+      this.escapeAnyChar = escapeAnyChar;
+      this.nanInf = nanInf;
     }
 
     @JsonInclude(JsonInclude.Include.NON_DEFAULT)
@@ -191,9 +207,34 @@ public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> {
       return extensions;
     }
 
+    @JsonInclude(JsonInclude.Include.NON_ABSENT)
+    public Boolean getAllTextMode() {
+      return allTextMode;
+    }
+
+    @JsonInclude(JsonInclude.Include.NON_ABSENT)
+    public Boolean getReadNumbersAsDouble() {
+      return readNumbersAsDouble;
+    }
+
+    @JsonInclude(JsonInclude.Include.NON_ABSENT)
+    public Boolean getSkipMalformedJSONRecords() {
+      return skipMalformedJSONRecords;
+    }
+
+    @JsonInclude(Include.NON_ABSENT)
+    public Boolean getEscapeAnyChar() {
+      return escapeAnyChar;
+    }
+
+    @JsonInclude(JsonInclude.Include.NON_ABSENT)
+    public Boolean getNanInf() {
+      return nanInf;
+    }
+
     @Override
     public int hashCode() {
-      return Objects.hash(extensions);
+      return Objects.hash(extensions, allTextMode, readNumbersAsDouble, skipMalformedJSONRecords, escapeAnyChar, nanInf);
     }
 
     @Override
@@ -205,14 +246,24 @@ public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> {
         return false;
       }
       JSONFormatConfig other = (JSONFormatConfig) obj;
-      return Objects.deepEquals(extensions, other.extensions);
+      return Objects.deepEquals(extensions, other.extensions) &&
+        Objects.equals(allTextMode, other.allTextMode) &&
+        Objects.equals(readNumbersAsDouble, other.readNumbersAsDouble) &&
+        Objects.equals(skipMalformedJSONRecords, other.skipMalformedJSONRecords) &&
+        Objects.equals(escapeAnyChar, other.escapeAnyChar) &&
+        Objects.equals(nanInf, other.nanInf);
     }
 
     @Override
     public String toString() {
       return new PlanStringBuilder(this)
-          .field("extensions", extensions)
-          .toString();
+        .field("extensions", extensions)
+        .field("allTextMode", allTextMode)
+        .field("readNumbersAsDouble", readNumbersAsDouble)
+        .field("skipMalformedRecords", skipMalformedJSONRecords)
+        .field("escapeAnyChar", escapeAnyChar)
+        .field("nanInf", nanInf)
+        .toString();
     }
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
index 19367c3..d5f498b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.List;
 
+import org.apache.commons.lang3.ObjectUtils;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
@@ -29,8 +30,11 @@ import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.OptionValue.OptionScope;
 import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.easy.json.JSONFormatPlugin.JSONFormatConfig;
 import org.apache.drill.exec.store.easy.json.JsonProcessor.ReadState;
 import org.apache.drill.exec.store.easy.json.reader.CountingJsonReader;
 import org.apache.drill.exec.vector.BaseValueVector;
@@ -49,6 +53,7 @@ public class JSONRecordReader extends AbstractRecordReader {
   private static final Logger logger = LoggerFactory.getLogger(JSONRecordReader.class);
 
   public static final long DEFAULT_ROWS_PER_BATCH = BaseValueVector.INITIAL_VALUE_ALLOCATION;
+  private static final OptionScope MIN_SCOPE = OptionScope.SESSION;
 
   private VectorContainerWriter writer;
 
@@ -69,6 +74,7 @@ public class JSONRecordReader extends AbstractRecordReader {
   private long parseErrorCount;
   private final boolean skipMalformedJSONRecords;
   private final boolean printSkippedMalformedJSONRecordLineNumber;
+  private final JSONFormatConfig config;
   private ReadState write;
   private InputStream inputStream;
 
@@ -78,38 +84,51 @@ public class JSONRecordReader extends AbstractRecordReader {
    * @param inputPath the input path
    * @param fileSystem a Drill file system wrapper around the file system implementation
    * @param columns path names of columns/subfields to read
-   * @throws OutOfMemoryException
+   * @param config The JSONFormatConfig for the storage plugin
+   * @throws OutOfMemoryException if there is insufficient memory to construct the reader
    */
   public JSONRecordReader(FragmentContext fragmentContext, Path inputPath, DrillFileSystem fileSystem,
-      List<SchemaPath> columns) throws OutOfMemoryException {
-    this(fragmentContext, inputPath, null, fileSystem, columns, false);
+      List<SchemaPath> columns, JSONFormatConfig config) throws OutOfMemoryException {
+    this(fragmentContext, inputPath, null, fileSystem, columns, false, config);
   }
 
   /**
-   * Create a new JSON Record Reader that uses a in memory materialized JSON stream.
+   * Create a new JSON Record Reader that uses an in memory materialized JSON stream.
    * @param fragmentContext the Drill fragment
    * @param embeddedContent embedded content
    * @param fileSystem a Drill file system wrapper around the file system implementation
    * @param columns path names of columns/subfields to read
-   * @throws OutOfMemoryException
+   * @throws OutOfMemoryException if there is insufficient memory to construct the reader
    */
   public JSONRecordReader(FragmentContext fragmentContext, JsonNode embeddedContent, DrillFileSystem fileSystem,
       List<SchemaPath> columns) throws OutOfMemoryException {
-    this(fragmentContext, null, embeddedContent, fileSystem, columns, false);
+    this(fragmentContext, null, embeddedContent, fileSystem, columns, false,
+      new JSONFormatConfig(null,
+        embeddedContent == null && fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_ALL_TEXT_MODE_VALIDATOR),
+        embeddedContent == null && fragmentContext.getOptions().getOption(ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE_VALIDATOR),
+        fragmentContext.getOptions().getOption(ExecConstants.JSON_SKIP_MALFORMED_RECORDS_VALIDATOR),
+        fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_ESCAPE_ANY_CHAR_VALIDATOR),
+        fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_NAN_INF_NUMBERS_VALIDATOR)));
   }
 
   /**
    * Create a JSON Record Reader that uses an InputStream directly
    * @param fragmentContext the Drill fragment
    * @param columns path names of columns/subfields to read
-   * @throws OutOfMemoryException
+   * @throws OutOfMemoryException if there is insufficient memory to construct the reader
    */
   public JSONRecordReader(FragmentContext fragmentContext, List<SchemaPath> columns) throws OutOfMemoryException {
-    this(fragmentContext, null, null, null, columns, true);
+    this(fragmentContext, null, null, null, columns, true,
+      new JSONFormatConfig(null,
+        fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_ALL_TEXT_MODE_VALIDATOR),
+        fragmentContext.getOptions().getOption(ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE_VALIDATOR),
+        fragmentContext.getOptions().getOption(ExecConstants.JSON_SKIP_MALFORMED_RECORDS_VALIDATOR),
+        fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_ESCAPE_ANY_CHAR_VALIDATOR),
+        fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_NAN_INF_NUMBERS_VALIDATOR)));
   }
 
   private JSONRecordReader(FragmentContext fragmentContext, Path inputPath, JsonNode embeddedContent,
-      DrillFileSystem fileSystem, List<SchemaPath> columns, boolean hasInputStream) {
+      DrillFileSystem fileSystem, List<SchemaPath> columns, boolean hasInputStream, JSONFormatConfig config) {
 
     Preconditions.checkArgument(
         (inputPath == null && embeddedContent != null && !hasInputStream) ||
@@ -118,25 +137,91 @@ public class JSONRecordReader extends AbstractRecordReader {
       "One of inputPath, inputStream or embeddedContent must be set but not all."
         );
 
+    OptionManager contextOpts = fragmentContext.getOptions();
+
     if (inputPath != null) {
       this.hadoopPath = inputPath;
     } else {
       this.embeddedContent = embeddedContent;
     }
 
+    // If the config is null, create a temporary one with the global options.
+    if (config == null) {
+      this.config = new JSONFormatConfig(null,
+        embeddedContent == null && fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_ALL_TEXT_MODE_VALIDATOR),
+        embeddedContent == null && fragmentContext.getOptions().getOption(ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE_VALIDATOR),
+        fragmentContext.getOptions().getOption(ExecConstants.JSON_SKIP_MALFORMED_RECORDS_VALIDATOR),
+        fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_ESCAPE_ANY_CHAR_VALIDATOR),
+        fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_NAN_INF_NUMBERS_VALIDATOR));
+    } else {
+      this.config = config;
+    }
+
     this.fileSystem = fileSystem;
     this.fragmentContext = fragmentContext;
-    // only enable all text mode if we aren't using embedded content mode.
-    this.enableAllTextMode = embeddedContent == null && fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_ALL_TEXT_MODE_VALIDATOR);
-    this.enableNanInf = fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_NAN_INF_NUMBERS_VALIDATOR);
-    this.enableEscapeAnyChar = fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_ESCAPE_ANY_CHAR_VALIDATOR);
-    this.readNumbersAsDouble = embeddedContent == null && fragmentContext.getOptions().getOption(ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE_VALIDATOR);
+
+    this.enableAllTextMode = allTextMode(contextOpts);
+    this.enableNanInf = nanInf(contextOpts);
+    this.enableEscapeAnyChar = escapeAnyChar(contextOpts);
+    this.readNumbersAsDouble = readNumbersAsDouble(contextOpts);
     this.unionEnabled = embeddedContent == null && fragmentContext.getOptions().getBoolean(ExecConstants.ENABLE_UNION_TYPE_KEY);
-    this.skipMalformedJSONRecords = fragmentContext.getOptions().getOption(ExecConstants.JSON_SKIP_MALFORMED_RECORDS_VALIDATOR);
+    this.skipMalformedJSONRecords = skipMalformedJSONRecords(contextOpts);
     this.printSkippedMalformedJSONRecordLineNumber = fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_PRINT_INVALID_RECORDS_LINE_NOS_FLAG_VALIDATOR);
     setColumns(columns);
   }
 
+  /**
+   * Returns the value of all text mode. A value set in the format config overrides the system default, but a session-scoped option still takes precedence.
+   * @return The value of allTextMode
+   */
+  private boolean allTextMode(OptionManager contextOpts) {
+    // only enable all text mode if we aren't using embedded content mode.
+    boolean allTextMode = (Boolean) ObjectUtils.firstNonNull(
+      contextOpts.getOption(ExecConstants.JSON_ALL_TEXT_MODE).getValueMinScope(MIN_SCOPE),
+      config.getAllTextMode(),
+      contextOpts.getBoolean(ExecConstants.JSON_ALL_TEXT_MODE)
+    );
+
+    return embeddedContent == null && allTextMode;
+  }
+
+  private boolean readNumbersAsDouble(OptionManager contextOpts) {
+    boolean numbersAsDouble = (Boolean) ObjectUtils.firstNonNull(
+      contextOpts.getOption(ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE).getValueMinScope(MIN_SCOPE),
+      config.getReadNumbersAsDouble(),
+      contextOpts.getBoolean(ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE)
+    );
+
+    return embeddedContent == null && numbersAsDouble;
+  }
+
+  private boolean skipMalformedJSONRecords(OptionManager contextOpts) {
+    boolean skipMalformedRecords = (Boolean) ObjectUtils.firstNonNull(
+      contextOpts.getOption(ExecConstants.JSON_READER_SKIP_INVALID_RECORDS_FLAG).getValueMinScope(MIN_SCOPE),
+      config.getSkipMalformedJSONRecords(),
+      contextOpts.getBoolean(ExecConstants.JSON_READER_SKIP_INVALID_RECORDS_FLAG)
+    );
+    return embeddedContent == null && skipMalformedRecords;
+  }
+
+  private boolean escapeAnyChar(OptionManager contextOpts) {
+    boolean escapeAnyChar = (Boolean) ObjectUtils.firstNonNull(
+      contextOpts.getOption(ExecConstants.JSON_READER_ESCAPE_ANY_CHAR).getValueMinScope(MIN_SCOPE),
+      config.getEscapeAnyChar(),
+      contextOpts.getBoolean(ExecConstants.JSON_READER_ESCAPE_ANY_CHAR)
+    );
+    return embeddedContent == null && escapeAnyChar;
+  }
+
+  private boolean nanInf(OptionManager contextOpts) {
+    boolean allowNaN = (Boolean) ObjectUtils.firstNonNull(
+      contextOpts.getOption(ExecConstants.JSON_READER_NAN_INF_NUMBERS).getValueMinScope(MIN_SCOPE),
+      config.getNanInf(),
+      contextOpts.getBoolean(ExecConstants.JSON_READER_NAN_INF_NUMBERS)
+    );
+    return embeddedContent == null && allowNaN;
+  }
+
   @Override
   public String toString() {
     return super.toString()
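
The new allTextMode(), readNumbersAsDouble(), skipMalformedJSONRecords(), escapeAnyChar() and nanInf() helpers above all apply the same resolution order: a session-scoped (or narrower) option wins, then the value from the format config, then the system default, and each result is forced to false when reading embedded content. A self-contained sketch of that pattern; only the ObjectUtils.firstNonNull idiom comes from the change, while the resolve() helper, class name and sample values are hypothetical:

    import org.apache.commons.lang3.ObjectUtils;

    public class OptionPrecedenceSketch {   // hypothetical illustration, not part of the commit
      // First non-null value wins: an explicitly set session option beats the
      // format config value, which in turn beats the system-level default.
      static boolean resolve(Boolean sessionValue, Boolean formatConfigValue, boolean systemDefault) {
        return ObjectUtils.firstNonNull(sessionValue, formatConfigValue, systemDefault);
      }

      public static void main(String[] args) {
        System.out.println(resolve(null, true, false));  // format config wins -> true
        System.out.println(resolve(false, true, true));  // session setting wins -> false
        System.out.println(resolve(null, null, true));   // system default applies -> true
      }
    }
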
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/DropboxFileSystemTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/DropboxFileSystemTest.java
index 9385e38..317c0c7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/DropboxFileSystemTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/DropboxFileSystemTest.java
@@ -79,7 +79,7 @@ public class DropboxFileSystemTest extends ClusterTest {
     Map<String, FormatPluginConfig> formats = new HashMap<>();
     List<String> jsonExtensions = new ArrayList<>();
     jsonExtensions.add("json");
-    FormatPluginConfig jsonFormatConfig = new JSONFormatConfig(jsonExtensions);
+    FormatPluginConfig jsonFormatConfig = new JSONFormatConfig(jsonExtensions, null, null, null, null, null);
 
     // CSV Format
     List<String> csvExtensions = new ArrayList<>();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
index d9848b6..88a7b94 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
@@ -64,7 +64,7 @@ public class TestFormatPluginOptionExtractor extends BaseTest {
           );
           break;
         case "json":
-          assertEquals(d.typeName, "(type: String)", d.presentParams());
+          assertEquals(d.typeName, "(type: String, allTextMode: Boolean, readNumbersAsDouble: Boolean, skipMalformedJSONRecords: Boolean, escapeAnyChar: Boolean, nanInf: Boolean)", d.presentParams());
           break;
         case "sequencefile":
           assertEquals(d.typeName, "(type: String)", d.presentParams());
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonModes.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonModes.java
new file mode 100644
index 0000000..1af8e1d
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonModes.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.json;
+
+import org.apache.drill.categories.RowSetTests;
+
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.DirectRowSet;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category(RowSetTests.class)
+public class TestJsonModes extends ClusterTest {
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));
+  }
+
+  @Test
+  public void testAllTextMode() throws Exception {
+    String sql = "SELECT `integer`, `float` FROM cp.`jsoninput/input2.json`";
+    RowSet results  = client.queryBuilder().sql(sql).rowSet();
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .addNullable("integer", MinorType.BIGINT)
+      .addNullable("float", MinorType.FLOAT8)
+      .build();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(2010, 17.4)
+      .addRow(-2002, -1.2)
+      .addRow(2001, 1.2)
+      .addRow(6005, 1.2)
+      .build();
+    new RowSetComparison(expected).verifyAndClearAll(results);
+
+    // Now try with all text mode
+    sql = "SELECT `integer`, `float` FROM table(cp.`jsoninput/input2.json` (type => 'json', allTextMode => True))";
+    results  = client.queryBuilder().sql(sql).rowSet();
+    expectedSchema = new SchemaBuilder()
+      .addNullable("integer", MinorType.VARCHAR)
+      .addNullable("float", MinorType.VARCHAR)
+      .build();
+
+    expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow("2010", "17.4")
+      .addRow("-2002", "-1.2")
+      .addRow("2001", "1.2")
+      .addRow("6005", "1.2")
+      .build();
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testReadDoubles() throws Exception {
+    String sql = "SELECT `integer`, `float` FROM table(cp.`jsoninput/input2.json` (type => 'json', readNumbersAsDouble => True))";
+    DirectRowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .addNullable("integer", MinorType.FLOAT8)
+      .addNullable("float", MinorType.FLOAT8)
+      .build();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(2010.0, 17.4)
+      .addRow(-2002.0, -1.2)
+      .addRow(2001.0, 1.2)
+      .addRow(6005.0, 1.2)
+      .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testSkipInvalidRecords() throws Exception {
+    String sql = "SELECT SUM(balance) FROM cp.`jsoninput/drill4653/file.json`";
+    DirectRowSet results;
+    try {
+      client.queryBuilder().sql(sql).rowSet();
+      fail();
+    } catch (UserRemoteException e) {
+      assertTrue(e.getMessage().contains("Error parsing JSON - Illegal unquoted character"));
+    }
+
+    sql = "SELECT SUM(balance) AS total FROM table(cp.`jsoninput/drill4653/file.json` (type => 'json', skipMalformedJSONRecords => True))";
+    results = client.queryBuilder().sql(sql).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .addNullable("total", MinorType.FLOAT8)
+      .build();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(6003.9)
+      .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testNanInf() throws Exception {
+    String sql = "SELECT * FROM cp.`jsoninput/nan_test.json`";
+    DirectRowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .addNullable("nan_col", MinorType.FLOAT8)
+      .addNullable("inf_col", MinorType.FLOAT8)
+      .build();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(Double.NaN, Double.POSITIVE_INFINITY)
+      .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+
+    // Now disallow NaN
+    sql = "SELECT * FROM table(cp.`jsoninput/nan_test.json` (type => 'json', nanInf => False))";
+    try {
+      client.queryBuilder().sql(sql).rowSet();
+      fail();
+    } catch (UserRemoteException e) {
+      assertTrue(e.getMessage().contains("Error parsing JSON - Non-standard token 'NaN'"));
+    }
+  }
+
+  @Test
+  public void testSerDe() throws Exception {
+    String sql = "SELECT COUNT(*) as cnt FROM cp.`jsoninput/input2.json`";
+    String plan = queryBuilder().sql(sql).explainJson();
+    long cnt = queryBuilder().physical(plan).singletonLong();
+    assertEquals("Counts should match", 4L, cnt);
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
index a329b53..8d20789 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
@@ -399,7 +399,7 @@ public class PhysicalOpUnitTestBase extends ExecTest {
   public static Iterator<RecordReader> getJsonReadersFromInputFiles(DrillFileSystem fs, List<Path> inputPaths, FragmentContext fragContext, List<SchemaPath> columnsToRead) {
     List<RecordReader> readers = new ArrayList<>();
     for (Path inputPath : inputPaths) {
-      readers.add(new JSONRecordReader(fragContext, inputPath, fs, columnsToRead));
+      readers.add(new JSONRecordReader(fragContext, inputPath, fs, columnsToRead, null));
     }
     return readers.iterator();
   }
diff --git a/exec/java-exec/src/test/resources/jsoninput/nan_test.json b/exec/java-exec/src/test/resources/jsoninput/nan_test.json
new file mode 100644
index 0000000..31335dc
--- /dev/null
+++ b/exec/java-exec/src/test/resources/jsoninput/nan_test.json
@@ -0,0 +1 @@
+{"nan_col":NaN, "inf_col":Infinity}