Posted to commits@spark.apache.org by yh...@apache.org on 2016/09/29 20:01:23 UTC

spark git commit: [SPARK-17699] Support for parsing JSON string columns

Repository: spark
Updated Branches:
  refs/heads/master 027dea8f2 -> fe33121a5


[SPARK-17699] Support for parsing JSON string columns

Spark SQL has great support for reading text files that contain JSON data.  However, in many cases the JSON data is just one column amongst others.  This is particularly true when reading from sources such as Kafka.  This PR adds a new function `from_json` that converts a string column into a nested `StructType` with a user-specified schema.

Example usage:
```scala
val df = Seq("""{"a": 1}""").toDS()
val schema = new StructType().add("a", IntegerType)

df.select(from_json($"value", schema) as 'json) // => [json: <a: int>]
```

This PR adds support for Java, Scala, and Python.  I leveraged our existing JSON parsing support by moving it into Catalyst (so that we could define expressions using it).  I left SQL out for now, because I'm not sure how users would specify a schema.
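For anyone trying the new API end to end, a self-contained variation on the example above (a sketch only; the `key`/`value` column names and local session setup are illustrative and not part of this patch):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val spark = SparkSession.builder().master("local[*]").appName("from_json-sketch").getOrCreate()
import spark.implicits._

// A JSON payload column sitting next to other columns, as in the Kafka case above.
val df = Seq(("k1", """{"a": 1}"""), ("k2", """{"a": 2}""")).toDF("key", "value")
val schema = new StructType().add("a", IntegerType)

// Pull the parsed struct apart; prints rows like [k1, 1], [k2, 2].
df.select($"key", from_json($"value", schema).getField("a") as "a").show()
```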

Author: Michael Armbrust <mi...@databricks.com>

Closes #15274 from marmbrus/jsonParser.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fe33121a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fe33121a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fe33121a

Branch: refs/heads/master
Commit: fe33121a53384811a8e094ab6c05dc85b7c7ca87
Parents: 027dea8
Author: Michael Armbrust <mi...@databricks.com>
Authored: Thu Sep 29 13:01:10 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Thu Sep 29 13:01:10 2016 -0700

----------------------------------------------------------------------
 python/pyspark/sql/functions.py                 |  23 +
 .../catalyst/expressions/jsonExpressions.scala  |  31 +-
 .../spark/sql/catalyst/json/JSONOptions.scala   |  84 ++++
 .../spark/sql/catalyst/json/JacksonParser.scala | 443 +++++++++++++++++++
 .../spark/sql/catalyst/json/JacksonUtils.scala  |  32 ++
 .../sql/catalyst/util/CompressionCodecs.scala   |  72 +++
 .../spark/sql/catalyst/util/ParseModes.scala    |  41 ++
 .../expressions/JsonExpressionsSuite.scala      |  26 ++
 .../org/apache/spark/sql/DataFrameReader.scala  |   5 +-
 .../datasources/CompressionCodecs.scala         |  72 ---
 .../sql/execution/datasources/ParseModes.scala  |  41 --
 .../datasources/csv/CSVFileFormat.scala         |   1 +
 .../execution/datasources/csv/CSVOptions.scala  |   2 +-
 .../datasources/json/InferSchema.scala          |   3 +-
 .../datasources/json/JSONOptions.scala          |  84 ----
 .../datasources/json/JacksonGenerator.scala     |   3 +-
 .../datasources/json/JacksonParser.scala        | 440 ------------------
 .../datasources/json/JacksonUtils.scala         |  32 --
 .../datasources/json/JsonFileFormat.scala       |   2 +
 .../datasources/text/TextFileFormat.scala       |   1 +
 .../scala/org/apache/spark/sql/functions.scala  |  58 +++
 .../apache/spark/sql/JsonFunctionsSuite.scala   |  29 ++
 .../json/JsonParsingOptionsSuite.scala          |   1 +
 .../execution/datasources/json/JsonSuite.scala  |   3 +-
 24 files changed, 852 insertions(+), 677 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fe33121a/python/pyspark/sql/functions.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 89b3c07..45d6bf9 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -1706,6 +1706,29 @@ def json_tuple(col, *fields):
     return Column(jc)
 
 
+@since(2.1)
+def from_json(col, schema, options={}):
+    """
+    Parses a column containing a JSON string into a [[StructType]] with the
+    specified schema. Returns `null` in the case of an unparseable string.
+
+    :param col: string column in json format
+    :param schema: a StructType to use when parsing the json column
+    :param options: options to control parsing. Accepts the same options as the JSON datasource.
+
+    >>> from pyspark.sql.types import *
+    >>> data = [(1, '''{"a": 1}''')]
+    >>> schema = StructType([StructField("a", IntegerType())])
+    >>> df = spark.createDataFrame(data, ("key", "value"))
+    >>> df.select(from_json(df.value, schema).alias("json")).collect()
+    [Row(json=Row(a=1))]
+    """
+
+    sc = SparkContext._active_spark_context
+    jc = sc._jvm.functions.from_json(_to_java_column(col), schema.json(), options)
+    return Column(jc)
+
+
 @since(1.5)
 def size(col):
     """

http://git-wip-us.apache.org/repos/asf/spark/blob/fe33121a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index c14a2fb..65dbd6a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -23,10 +23,12 @@ import scala.util.parsing.combinator.RegexParsers
 
 import com.fasterxml.jackson.core._
 
-import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
-import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions, SparkSQLJsonProcessingException}
+import org.apache.spark.sql.catalyst.util.ParseModes
+import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
 
@@ -467,3 +469,28 @@ case class JsonTuple(children: Seq[Expression])
   }
 }
 
+/**
+ * Converts a JSON input string to a [[StructType]] with the specified schema.
+ */
+case class JsonToStruct(schema: StructType, options: Map[String, String], child: Expression)
+  extends Expression with CodegenFallback with ExpectsInputTypes {
+  override def nullable: Boolean = true
+
+  @transient
+  lazy val parser =
+    new JacksonParser(
+      schema,
+      "invalid", // Not used since we force fail fast.  Invalid rows will be set to `null`.
+      new JSONOptions(options ++ Map("mode" -> ParseModes.FAIL_FAST_MODE)))
+
+  override def dataType: DataType = schema
+  override def children: Seq[Expression] = child :: Nil
+
+  override def eval(input: InternalRow): Any = {
+    try parser.parse(child.eval(input).toString).head catch {
+      case _: SparkSQLJsonProcessingException => null
+    }
+  }
+
+  override def inputTypes: Seq[AbstractDataType] = StringType :: Nil
+}
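For context, a minimal sketch of evaluating the new expression directly, mirroring the `from_json` tests added to `JsonExpressionsSuite` later in this diff (malformed input surfaces as `null` because parsing is forced into FAILFAST and the exception is swallowed):

```scala
import org.apache.spark.sql.catalyst.expressions.{JsonToStruct, Literal}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val schema = StructType(StructField("a", IntegerType) :: Nil)

// Well-formed input evaluates to an InternalRow matching the schema.
val ok = JsonToStruct(schema, Map.empty, Literal("""{"a": 1}""")).eval(null)

// Malformed input (missing the colon) is caught internally and returns null.
val bad = JsonToStruct(schema, Map.empty, Literal("""{"a" 1}""")).eval(null)
```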

http://git-wip-us.apache.org/repos/asf/spark/blob/fe33121a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
new file mode 100644
index 0000000..aec1892
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.json
+
+import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
+import org.apache.commons.lang3.time.FastDateFormat
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.util.{CompressionCodecs, ParseModes}
+
+/**
+ * Options for parsing JSON data into Spark SQL rows.
+ *
+ * Most of these map directly to Jackson's internal options, specified in [[JsonParser.Feature]].
+ */
+private[sql] class JSONOptions(
+    @transient private val parameters: Map[String, String])
+  extends Logging with Serializable  {
+
+  val samplingRatio =
+    parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
+  val primitivesAsString =
+    parameters.get("primitivesAsString").map(_.toBoolean).getOrElse(false)
+  val prefersDecimal =
+    parameters.get("prefersDecimal").map(_.toBoolean).getOrElse(false)
+  val allowComments =
+    parameters.get("allowComments").map(_.toBoolean).getOrElse(false)
+  val allowUnquotedFieldNames =
+    parameters.get("allowUnquotedFieldNames").map(_.toBoolean).getOrElse(false)
+  val allowSingleQuotes =
+    parameters.get("allowSingleQuotes").map(_.toBoolean).getOrElse(true)
+  val allowNumericLeadingZeros =
+    parameters.get("allowNumericLeadingZeros").map(_.toBoolean).getOrElse(false)
+  val allowNonNumericNumbers =
+    parameters.get("allowNonNumericNumbers").map(_.toBoolean).getOrElse(true)
+  val allowBackslashEscapingAnyCharacter =
+    parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false)
+  val compressionCodec = parameters.get("compression").map(CompressionCodecs.getCodecClassName)
+  private val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
+  val columnNameOfCorruptRecord = parameters.get("columnNameOfCorruptRecord")
+
+  // Uses `FastDateFormat`, a thread-safe drop-in replacement for `SimpleDateFormat`.
+  val dateFormat: FastDateFormat =
+    FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"))
+
+  val timestampFormat: FastDateFormat =
+    FastDateFormat.getInstance(
+      parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"))
+
+  // Parse mode flags
+  if (!ParseModes.isValidMode(parseMode)) {
+    logWarning(s"$parseMode is not a valid parse mode. Using ${ParseModes.DEFAULT}.")
+  }
+
+  val failFast = ParseModes.isFailFastMode(parseMode)
+  val dropMalformed = ParseModes.isDropMalformedMode(parseMode)
+  val permissive = ParseModes.isPermissiveMode(parseMode)
+
+  /** Sets config options on a Jackson [[JsonFactory]]. */
+  def setJacksonOptions(factory: JsonFactory): Unit = {
+    factory.configure(JsonParser.Feature.ALLOW_COMMENTS, allowComments)
+    factory.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, allowUnquotedFieldNames)
+    factory.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, allowSingleQuotes)
+    factory.configure(JsonParser.Feature.ALLOW_NUMERIC_LEADING_ZEROS, allowNumericLeadingZeros)
+    factory.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, allowNonNumericNumbers)
+    factory.configure(JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER,
+      allowBackslashEscapingAnyCharacter)
+  }
+}
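Because these options are forwarded verbatim from the `options` map of `from_json` (and of the JSON data source), parser behaviour can be relaxed per call. A hedged sketch, assuming the options-taking `from_json` overload added to `functions.scala` in this diff and a hypothetical DataFrame `df` with a string column `value`:

```scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{IntegerType, StructType}

val schema = new StructType().add("a", IntegerType)

// Accept payloads like {a: 1} (unquoted field names) and {'a': 1} (single quotes).
// The option keys are exactly the ones defined in JSONOptions above.
val relaxed = df.select(
  from_json(df("value"), schema, Map(
    "allowUnquotedFieldNames" -> "true",
    "allowSingleQuotes" -> "true")) as "json")
```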

http://git-wip-us.apache.org/repos/asf/spark/blob/fe33121a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
new file mode 100644
index 0000000..f80e637
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.json
+
+import java.io.ByteArrayOutputStream
+
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Try
+
+import com.fasterxml.jackson.core._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.Utils
+
+private[sql] class SparkSQLJsonProcessingException(msg: String) extends RuntimeException(msg)
+
+/**
+ * Constructs a parser for a given schema that translates a json string to an [[InternalRow]].
+ */
+class JacksonParser(
+    schema: StructType,
+    columnNameOfCorruptRecord: String,
+    options: JSONOptions) extends Logging {
+
+  import JacksonUtils._
+  import ParseModes._
+  import com.fasterxml.jackson.core.JsonToken._
+
+  // A `ValueConverter` is responsible for converting a value from `JsonParser`
+  // to a value in a field for `InternalRow`.
+  private type ValueConverter = (JsonParser) => Any
+
+  // `ValueConverter`s for the root schema for all fields in the schema
+  private val rootConverter: ValueConverter = makeRootConverter(schema)
+
+  private val factory = new JsonFactory()
+  options.setJacksonOptions(factory)
+
+  private val emptyRow: Seq[InternalRow] = Seq(new GenericInternalRow(schema.length))
+
+  @transient
+  private[this] var isWarningPrintedForMalformedRecord: Boolean = false
+
+  /**
+   * This function handles records that fail to parse. It is called when exceptions are caught
+   * during conversion, and it also honors the `mode` option.
+   */
+  private def failedRecord(record: String): Seq[InternalRow] = {
+    // create a row even if no corrupt record column is present
+    if (options.failFast) {
+      throw new SparkSQLJsonProcessingException(s"Malformed line in FAILFAST mode: $record")
+    }
+    if (options.dropMalformed) {
+      if (!isWarningPrintedForMalformedRecord) {
+        logWarning(
+          s"""Found at least one malformed records (sample: $record). The JSON reader will drop
+             |all malformed records in current $DROP_MALFORMED_MODE parser mode. To find out which
+             |corrupted records have been dropped, please switch the parser mode to $PERMISSIVE_MODE
+             |mode and use the default inferred schema.
+             |
+             |Code example to print all malformed records (scala):
+             |===================================================
+             |// The corrupted record exists in column ${columnNameOfCorruptRecord}
+             |val parsedJson = spark.read.json("/path/to/json/file/test.json")
+             |
+           """.stripMargin)
+        isWarningPrintedForMalformedRecord = true
+      }
+      Nil
+    } else if (schema.getFieldIndex(columnNameOfCorruptRecord).isEmpty) {
+      if (!isWarningPrintedForMalformedRecord) {
+        logWarning(
+          s"""Found at least one malformed records (sample: $record). The JSON reader will replace
+             |all malformed records with placeholder null in current $PERMISSIVE_MODE parser mode.
+             |To find out which corrupted records have been replaced with null, please use the
+             |default inferred schema instead of providing a custom schema.
+             |
+             |Code example to print all malformed records (scala):
+             |===================================================
+             |// The corrupted record exists in column ${columnNameOfCorruptRecord}.
+             |val parsedJson = spark.read.json("/path/to/json/file/test.json")
+             |
+           """.stripMargin)
+        isWarningPrintedForMalformedRecord = true
+      }
+      emptyRow
+    } else {
+      val row = new GenericMutableRow(schema.length)
+      for (corruptIndex <- schema.getFieldIndex(columnNameOfCorruptRecord)) {
+        require(schema(corruptIndex).dataType == StringType)
+        row.update(corruptIndex, UTF8String.fromString(record))
+      }
+      Seq(row)
+    }
+  }
+
+  /**
+   * Create a converter which converts the JSON documents held by the `JsonParser`
+   * to a value according to a desired schema. This is a wrapper for the method
+   * `makeConverter()` to handle a row wrapped with an array.
+   */
+  def makeRootConverter(dataType: DataType): ValueConverter = dataType match {
+    case st: StructType =>
+      val elementConverter = makeConverter(st)
+      val fieldConverters = st.map(_.dataType).map(makeConverter)
+      (parser: JsonParser) => parseJsonToken(parser, dataType) {
+        case START_OBJECT => convertObject(parser, st, fieldConverters)
+          // SPARK-3308: support reading top level JSON arrays and take every element
+          // in such an array as a row
+          //
+          // For example, we support the JSON data below:
+          //
+          // [{"a":"str_a_1"}]
+          // [{"a":"str_a_2"}, {"b":"str_b_3"}]
+          //
+          // resulting in:
+          //
+          // List([str_a_1,null])
+          // List([str_a_2,null], [null,str_b_3])
+          //
+        case START_ARRAY => convertArray(parser, elementConverter)
+      }
+
+    case ArrayType(st: StructType, _) =>
+      val elementConverter = makeConverter(st)
+      val fieldConverters = st.map(_.dataType).map(makeConverter)
+      (parser: JsonParser) => parseJsonToken(parser, dataType) {
+        // the business end of SPARK-3308:
+        // when an object is found but an array is requested just wrap it in a list.
+        // This is being wrapped in `JacksonParser.parse`.
+        case START_OBJECT => convertObject(parser, st, fieldConverters)
+        case START_ARRAY => convertArray(parser, elementConverter)
+      }
+
+    case _ => makeConverter(dataType)
+  }
+
+  /**
+   * Create a converter which converts the JSON documents held by the `JsonParser`
+   * to a value according to a desired schema.
+   */
+  private def makeConverter(dataType: DataType): ValueConverter = dataType match {
+    case BooleanType =>
+      (parser: JsonParser) => parseJsonToken(parser, dataType) {
+        case VALUE_TRUE => true
+        case VALUE_FALSE => false
+      }
+
+    case ByteType =>
+      (parser: JsonParser) => parseJsonToken(parser, dataType) {
+        case VALUE_NUMBER_INT => parser.getByteValue
+      }
+
+    case ShortType =>
+      (parser: JsonParser) => parseJsonToken(parser, dataType) {
+        case VALUE_NUMBER_INT => parser.getShortValue
+      }
+
+    case IntegerType =>
+      (parser: JsonParser) => parseJsonToken(parser, dataType) {
+        case VALUE_NUMBER_INT => parser.getIntValue
+      }
+
+    case LongType =>
+      (parser: JsonParser) => parseJsonToken(parser, dataType) {
+        case VALUE_NUMBER_INT => parser.getLongValue
+      }
+
+    case FloatType =>
+      (parser: JsonParser) => parseJsonToken(parser, dataType) {
+        case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT =>
+          parser.getFloatValue
+
+        case VALUE_STRING =>
+          // Special case handling for NaN and Infinity.
+          val value = parser.getText
+          val lowerCaseValue = value.toLowerCase
+          if (lowerCaseValue.equals("nan") ||
+            lowerCaseValue.equals("infinity") ||
+            lowerCaseValue.equals("-infinity") ||
+            lowerCaseValue.equals("inf") ||
+            lowerCaseValue.equals("-inf")) {
+            value.toFloat
+          } else {
+            throw new SparkSQLJsonProcessingException(s"Cannot parse $value as FloatType.")
+          }
+      }
+
+    case DoubleType =>
+      (parser: JsonParser) => parseJsonToken(parser, dataType) {
+        case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT =>
+          parser.getDoubleValue
+
+        case VALUE_STRING =>
+          // Special case handling for NaN and Infinity.
+          val value = parser.getText
+          val lowerCaseValue = value.toLowerCase
+          if (lowerCaseValue.equals("nan") ||
+            lowerCaseValue.equals("infinity") ||
+            lowerCaseValue.equals("-infinity") ||
+            lowerCaseValue.equals("inf") ||
+            lowerCaseValue.equals("-inf")) {
+            value.toDouble
+          } else {
+            throw new SparkSQLJsonProcessingException(s"Cannot parse $value as DoubleType.")
+          }
+      }
+
+    case StringType =>
+      (parser: JsonParser) => parseJsonToken(parser, dataType) {
+        case VALUE_STRING =>
+          UTF8String.fromString(parser.getText)
+
+        case _ =>
+          // Note that this always converts the data to a string; there is no failure case.
+          val writer = new ByteArrayOutputStream()
+          Utils.tryWithResource(factory.createGenerator(writer, JsonEncoding.UTF8)) {
+            generator => generator.copyCurrentStructure(parser)
+          }
+          UTF8String.fromBytes(writer.toByteArray)
+      }
+
+    case TimestampType =>
+      (parser: JsonParser) => parseJsonToken(parser, dataType) {
+        case VALUE_STRING =>
+          // This one will lose microseconds parts.
+          // See https://issues.apache.org/jira/browse/SPARK-10681.
+          Try(options.timestampFormat.parse(parser.getText).getTime * 1000L)
+            .getOrElse {
+              // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
+              // compatibility.
+              DateTimeUtils.stringToTime(parser.getText).getTime * 1000L
+            }
+
+        case VALUE_NUMBER_INT =>
+          parser.getLongValue * 1000000L
+      }
+
+    case DateType =>
+      (parser: JsonParser) => parseJsonToken(parser, dataType) {
+        case VALUE_STRING =>
+          val stringValue = parser.getText
+          // This one will lose microseconds parts.
+          // See https://issues.apache.org/jira/browse/SPARK-10681.
+          Try(DateTimeUtils.millisToDays(options.dateFormat.parse(parser.getText).getTime))
+            .getOrElse {
+            // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
+            // compatibility.
+            Try(DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(parser.getText).getTime))
+              .getOrElse {
+              // In Spark 1.5.0, we store the data as number of days since epoch in string.
+              // So, we just convert it to Int.
+              stringValue.toInt
+            }
+          }
+      }
+
+    case BinaryType =>
+      (parser: JsonParser) => parseJsonToken(parser, dataType) {
+        case VALUE_STRING => parser.getBinaryValue
+      }
+
+    case dt: DecimalType =>
+      (parser: JsonParser) => parseJsonToken(parser, dataType) {
+        case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT) =>
+          Decimal(parser.getDecimalValue, dt.precision, dt.scale)
+      }
+
+    case st: StructType =>
+      val fieldConverters = st.map(_.dataType).map(makeConverter)
+      (parser: JsonParser) => parseJsonToken(parser, dataType) {
+        case START_OBJECT => convertObject(parser, st, fieldConverters)
+      }
+
+    case at: ArrayType =>
+      val elementConverter = makeConverter(at.elementType)
+      (parser: JsonParser) => parseJsonToken(parser, dataType) {
+        case START_ARRAY => convertArray(parser, elementConverter)
+      }
+
+    case mt: MapType =>
+      val valueConverter = makeConverter(mt.valueType)
+      (parser: JsonParser) => parseJsonToken(parser, dataType) {
+        case START_OBJECT => convertMap(parser, valueConverter)
+      }
+
+    case udt: UserDefinedType[_] =>
+      makeConverter(udt.sqlType)
+
+    case _ =>
+      (parser: JsonParser) =>
+        // Here, we pass empty `PartialFunction` so that this case can be
+        // handled as a failed conversion. It will throw an exception as
+        // long as the value is not null.
+        parseJsonToken(parser, dataType)(PartialFunction.empty[JsonToken, Any])
+  }
+
+  /**
+   * This method skips `FIELD_NAME`s at the beginning and handles nulls before trying to parse
+   * the JSON token using the given function `f`. If `f` fails to parse and convert the token,
+   * `failedConversion` is called to handle it.
+   */
+  private def parseJsonToken(
+      parser: JsonParser,
+      dataType: DataType)(f: PartialFunction[JsonToken, Any]): Any = {
+    parser.getCurrentToken match {
+      case FIELD_NAME =>
+        // There are useless FIELD_NAMEs between START_OBJECT and END_OBJECT tokens
+        parser.nextToken()
+        parseJsonToken(parser, dataType)(f)
+
+      case null | VALUE_NULL => null
+
+      case other => f.applyOrElse(other, failedConversion(parser, dataType))
+    }
+  }
+
+  /**
+   * This function throws an exception for a failed conversion, but returns null for an empty
+   * string, to guard the non-string types.
+   */
+  private def failedConversion(
+      parser: JsonParser,
+      dataType: DataType): PartialFunction[JsonToken, Any] = {
+    case VALUE_STRING if parser.getTextLength < 1 =>
+      // If conversion fails, this produces `null` rather than throwing an exception.
+      // This guards against type mismatches.
+      null
+
+    case token =>
+      // We cannot parse this token based on the given data type. So, we throw a
+      // SparkSQLJsonProcessingException and this exception will be caught by
+      // `parse` method.
+      throw new SparkSQLJsonProcessingException(
+        s"Failed to parse a value for data type $dataType (current token: $token).")
+  }
+
+  /**
+   * Parse an object from the token stream into a new Row representing the schema.
+   * Fields in the json that are not defined in the requested schema will be dropped.
+   */
+  private def convertObject(
+      parser: JsonParser,
+      schema: StructType,
+      fieldConverters: Seq[ValueConverter]): InternalRow = {
+    val row = new GenericMutableRow(schema.length)
+    while (nextUntil(parser, JsonToken.END_OBJECT)) {
+      schema.getFieldIndex(parser.getCurrentName) match {
+        case Some(index) =>
+          row.update(index, fieldConverters(index).apply(parser))
+
+        case None =>
+          parser.skipChildren()
+      }
+    }
+
+    row
+  }
+
+  /**
+   * Parse an object as a Map, preserving all fields.
+   */
+  private def convertMap(
+      parser: JsonParser,
+      fieldConverter: ValueConverter): MapData = {
+    val keys = ArrayBuffer.empty[UTF8String]
+    val values = ArrayBuffer.empty[Any]
+    while (nextUntil(parser, JsonToken.END_OBJECT)) {
+      keys += UTF8String.fromString(parser.getCurrentName)
+      values += fieldConverter.apply(parser)
+    }
+
+    ArrayBasedMapData(keys.toArray, values.toArray)
+  }
+
+  /**
+   * Parse an object as an Array.
+   */
+  private def convertArray(
+      parser: JsonParser,
+      fieldConverter: ValueConverter): ArrayData = {
+    val values = ArrayBuffer.empty[Any]
+    while (nextUntil(parser, JsonToken.END_ARRAY)) {
+      values += fieldConverter.apply(parser)
+    }
+
+    new GenericArrayData(values.toArray)
+  }
+
+  /**
+   * Parse the string JSON input to the set of [[InternalRow]]s.
+   */
+  def parse(input: String): Seq[InternalRow] = {
+    if (input.trim.isEmpty) {
+      Nil
+    } else {
+      try {
+        Utils.tryWithResource(factory.createParser(input)) { parser =>
+          parser.nextToken()
+          rootConverter.apply(parser) match {
+            case null => failedRecord(input)
+            case row: InternalRow => row :: Nil
+            case array: ArrayData =>
+              // Here, as we support reading top level JSON arrays and take every element
+              // in such an array as a row, this case is possible.
+              if (array.numElements() == 0) {
+                Nil
+              } else {
+                array.toArray[InternalRow](schema)
+              }
+            case _ =>
+              failedRecord(input)
+          }
+        }
+      } catch {
+        case _: JsonProcessingException =>
+          failedRecord(input)
+        case _: SparkSQLJsonProcessingException =>
+          failedRecord(input)
+      }
+    }
+  }
+}
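A minimal sketch of driving the parser directly, wired up the same way `JsonToStruct` does above (note that `JSONOptions` is `private[sql]`, so this only compiles from within the `org.apache.spark.sql` package):

```scala
import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
import org.apache.spark.sql.catalyst.util.ParseModes
import org.apache.spark.sql.types.{IntegerType, StructType}

val schema = new StructType().add("a", IntegerType)
val parser = new JacksonParser(
  schema,
  "_corrupt_record", // never populated in FAILFAST mode
  new JSONOptions(Map("mode" -> ParseModes.FAIL_FAST_MODE)))

// parse() returns Seq[InternalRow]; a single JSON object yields exactly one row,
// while a malformed record throws SparkSQLJsonProcessingException in FAILFAST mode.
val rows = parser.parse("""{"a": 1}""")
```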

http://git-wip-us.apache.org/repos/asf/spark/blob/fe33121a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
new file mode 100644
index 0000000..c4d9abb
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.json
+
+import com.fasterxml.jackson.core.{JsonParser, JsonToken}
+
+object JacksonUtils {
+  /**
+   * Advance the parser until a null or a specific token is found
+   */
+  def nextUntil(parser: JsonParser, stopOn: JsonToken): Boolean = {
+    parser.nextToken() match {
+      case null => false
+      case x => x != stopOn
+    }
+  }
+}
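A small sketch of the iteration pattern `nextUntil` enables (this is how `convertObject` and `convertMap` in `JacksonParser` walk a JSON object):

```scala
import com.fasterxml.jackson.core.{JsonFactory, JsonToken}
import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil

val parser = new JsonFactory().createParser("""{"a": 1, "b": 2}""")
parser.nextToken() // position on START_OBJECT

// Visit each field name until END_OBJECT (or end of input) is reached.
while (nextUntil(parser, JsonToken.END_OBJECT)) {
  val name = parser.getCurrentName // "a", then "b"
  parser.nextToken()               // advance to the field's value
  println(s"$name -> ${parser.getIntValue}")
}
```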

http://git-wip-us.apache.org/repos/asf/spark/blob/fe33121a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CompressionCodecs.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CompressionCodecs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CompressionCodecs.scala
new file mode 100644
index 0000000..435fba9
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CompressionCodecs.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.SequenceFile.CompressionType
+import org.apache.hadoop.io.compress._
+
+import org.apache.spark.util.Utils
+
+object CompressionCodecs {
+  private val shortCompressionCodecNames = Map(
+    "none" -> null,
+    "uncompressed" -> null,
+    "bzip2" -> classOf[BZip2Codec].getName,
+    "deflate" -> classOf[DeflateCodec].getName,
+    "gzip" -> classOf[GzipCodec].getName,
+    "lz4" -> classOf[Lz4Codec].getName,
+    "snappy" -> classOf[SnappyCodec].getName)
+
+  /**
+   * Return the full version of the given codec class.
+   * If it is already a class name, just return it.
+   */
+  def getCodecClassName(name: String): String = {
+    val codecName = shortCompressionCodecNames.getOrElse(name.toLowerCase, name)
+    try {
+      // Validate the codec name
+      if (codecName != null) {
+        Utils.classForName(codecName)
+      }
+      codecName
+    } catch {
+      case e: ClassNotFoundException =>
+        throw new IllegalArgumentException(s"Codec [$codecName] " +
+          s"is not available. Known codecs are ${shortCompressionCodecNames.keys.mkString(", ")}.")
+    }
+  }
+
+  /**
+   * Set compression configurations to Hadoop `Configuration`.
+   * `codec` should be a full class path
+   */
+  def setCodecConfiguration(conf: Configuration, codec: String): Unit = {
+    if (codec != null) {
+      conf.set("mapreduce.output.fileoutputformat.compress", "true")
+      conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
+      conf.set("mapreduce.output.fileoutputformat.compress.codec", codec)
+      conf.set("mapreduce.map.output.compress", "true")
+      conf.set("mapreduce.map.output.compress.codec", codec)
+    } else {
+      // This implies the option `compression` is set to `uncompressed` or `none`.
+      conf.set("mapreduce.output.fileoutputformat.compress", "false")
+      conf.set("mapreduce.map.output.compress", "false")
+    }
+  }
+}
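For reference, a quick sketch of the lookup this helper performs (short names map to Hadoop codec classes, `none`/`uncompressed` map to `null`, and anything else must be a loadable class name):

```scala
import org.apache.spark.sql.catalyst.util.CompressionCodecs

CompressionCodecs.getCodecClassName("gzip")   // "org.apache.hadoop.io.compress.GzipCodec"
CompressionCodecs.getCodecClassName("none")   // null, i.e. leave output uncompressed
CompressionCodecs.getCodecClassName("org.apache.hadoop.io.compress.BZip2Codec") // passes through
// An unknown short name or class would throw IllegalArgumentException here.
```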

http://git-wip-us.apache.org/repos/asf/spark/blob/fe33121a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseModes.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseModes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseModes.scala
new file mode 100644
index 0000000..0e46696
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseModes.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+object ParseModes {
+  val PERMISSIVE_MODE = "PERMISSIVE"
+  val DROP_MALFORMED_MODE = "DROPMALFORMED"
+  val FAIL_FAST_MODE = "FAILFAST"
+
+  val DEFAULT = PERMISSIVE_MODE
+
+  def isValidMode(mode: String): Boolean = {
+    mode.toUpperCase match {
+      case PERMISSIVE_MODE | DROP_MALFORMED_MODE | FAIL_FAST_MODE => true
+      case _ => false
+    }
+  }
+
+  def isDropMalformedMode(mode: String): Boolean = mode.toUpperCase == DROP_MALFORMED_MODE
+  def isFailFastMode(mode: String): Boolean = mode.toUpperCase == FAIL_FAST_MODE
+  def isPermissiveMode(mode: String): Boolean = if (isValidMode(mode))  {
+    mode.toUpperCase == PERMISSIVE_MODE
+  } else {
+    true // We default to permissive if the mode string is not valid
+  }
+}
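And the mode helpers are case-insensitive, falling back to permissive for unrecognized strings:

```scala
import org.apache.spark.sql.catalyst.util.ParseModes

ParseModes.isValidMode("failfast")              // true: comparison is upper-cased
ParseModes.isFailFastMode("FailFast")           // true
ParseModes.isDropMalformedMode("DROPMALFORMED") // true
ParseModes.isPermissiveMode("bogus")            // true: invalid modes default to permissive
```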

http://git-wip-us.apache.org/repos/asf/spark/blob/fe33121a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
index 7b75409..8462393 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.expressions
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.ParseModes
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 import org.apache.spark.unsafe.types.UTF8String
 
 class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
@@ -317,4 +319,28 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
       JsonTuple(Literal("{\"a\":\"b\nc\"}") :: Literal("a") :: Nil),
       InternalRow.fromSeq(Seq(UTF8String.fromString("b\nc"))))
   }
+
+  test("from_json") {
+    val jsonData = """{"a": 1}"""
+    val schema = StructType(StructField("a", IntegerType) :: Nil)
+    checkEvaluation(
+      JsonToStruct(schema, Map.empty, Literal(jsonData)),
+      InternalRow.fromSeq(1 :: Nil)
+    )
+  }
+
+  test("from_json - invalid data") {
+    val jsonData = """{"a" 1}"""
+    val schema = StructType(StructField("a", IntegerType) :: Nil)
+    checkEvaluation(
+      JsonToStruct(schema, Map.empty, Literal(jsonData)),
+      null
+    )
+
+    // Other modes should still return `null`.
+    checkEvaluation(
+      JsonToStruct(schema, Map("mode" -> ParseModes.PERMISSIVE_MODE), Literal(jsonData)),
+      null
+    )
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/fe33121a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index b10d2c8..b84fb2f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -21,14 +21,15 @@ import java.util.Properties
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.Partition
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.internal.Logging
+import org.apache.spark.Partition
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
 import org.apache.spark.sql.execution.LogicalRDD
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
-import org.apache.spark.sql.execution.datasources.json.{InferSchema, JacksonParser, JSONOptions}
+import org.apache.spark.sql.execution.datasources.json.InferSchema
 import org.apache.spark.sql.types.StructType
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/fe33121a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala
deleted file mode 100644
index 41cff07..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.io.SequenceFile.CompressionType
-import org.apache.hadoop.io.compress.{BZip2Codec, DeflateCodec, GzipCodec, Lz4Codec, SnappyCodec}
-
-import org.apache.spark.util.Utils
-
-private[datasources] object CompressionCodecs {
-  private val shortCompressionCodecNames = Map(
-    "none" -> null,
-    "uncompressed" -> null,
-    "bzip2" -> classOf[BZip2Codec].getName,
-    "deflate" -> classOf[DeflateCodec].getName,
-    "gzip" -> classOf[GzipCodec].getName,
-    "lz4" -> classOf[Lz4Codec].getName,
-    "snappy" -> classOf[SnappyCodec].getName)
-
-  /**
-   * Return the full version of the given codec class.
-   * If it is already a class name, just return it.
-   */
-  def getCodecClassName(name: String): String = {
-    val codecName = shortCompressionCodecNames.getOrElse(name.toLowerCase, name)
-    try {
-      // Validate the codec name
-      if (codecName != null) {
-        Utils.classForName(codecName)
-      }
-      codecName
-    } catch {
-      case e: ClassNotFoundException =>
-        throw new IllegalArgumentException(s"Codec [$codecName] " +
-          s"is not available. Known codecs are ${shortCompressionCodecNames.keys.mkString(", ")}.")
-    }
-  }
-
-  /**
-   * Set compression configurations to Hadoop `Configuration`.
-   * `codec` should be a full class path
-   */
-  def setCodecConfiguration(conf: Configuration, codec: String): Unit = {
-    if (codec != null) {
-      conf.set("mapreduce.output.fileoutputformat.compress", "true")
-      conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
-      conf.set("mapreduce.output.fileoutputformat.compress.codec", codec)
-      conf.set("mapreduce.map.output.compress", "true")
-      conf.set("mapreduce.map.output.compress.codec", codec)
-    } else {
-      // This infers the option `compression` is set to `uncompressed` or `none`.
-      conf.set("mapreduce.output.fileoutputformat.compress", "false")
-      conf.set("mapreduce.map.output.compress", "false")
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/fe33121a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ParseModes.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ParseModes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ParseModes.scala
deleted file mode 100644
index 4682280..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ParseModes.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources
-
-private[datasources] object ParseModes {
-  val PERMISSIVE_MODE = "PERMISSIVE"
-  val DROP_MALFORMED_MODE = "DROPMALFORMED"
-  val FAIL_FAST_MODE = "FAILFAST"
-
-  val DEFAULT = PERMISSIVE_MODE
-
-  def isValidMode(mode: String): Boolean = {
-    mode.toUpperCase match {
-      case PERMISSIVE_MODE | DROP_MALFORMED_MODE | FAIL_FAST_MODE => true
-      case _ => false
-    }
-  }
-
-  def isDropMalformedMode(mode: String): Boolean = mode.toUpperCase == DROP_MALFORMED_MODE
-  def isFailFastMode(mode: String): Boolean = mode.toUpperCase == FAIL_FAST_MODE
-  def isPermissiveMode(mode: String): Boolean = if (isValidMode(mode))  {
-    mode.toUpperCase == PERMISSIVE_MODE
-  } else {
-    true // We default to permissive is the mode string is not valid
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/fe33121a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index 9610746..4e662a5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -29,6 +29,7 @@ import org.apache.spark.TaskContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.CompressionCodecs
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._

http://git-wip-us.apache.org/repos/asf/spark/blob/fe33121a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index e7dcc22..014614e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -22,7 +22,7 @@ import java.nio.charset.StandardCharsets
 import org.apache.commons.lang3.time.FastDateFormat
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes}
+import org.apache.spark.sql.catalyst.util.{CompressionCodecs, ParseModes}
 
 private[csv] class CSVOptions(@transient private val parameters: Map[String, String])
   extends Logging with Serializable {

http://git-wip-us.apache.org/repos/asf/spark/blob/fe33121a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
index 91c58d0..dc8bd81 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
@@ -23,7 +23,8 @@ import com.fasterxml.jackson.core._
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis.TypeCoercion
-import org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil
+import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil
+import org.apache.spark.sql.catalyst.json.JSONOptions
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 

http://git-wip-us.apache.org/repos/asf/spark/blob/fe33121a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
deleted file mode 100644
index 02d211d..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.json
-
-import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
-import org.apache.commons.lang3.time.FastDateFormat
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes}
-
-/**
- * Options for the JSON data source.
- *
- * Most of these map directly to Jackson's internal options, specified in [[JsonParser.Feature]].
- */
-private[sql] class JSONOptions(
-    @transient private val parameters: Map[String, String])
-  extends Logging with Serializable  {
-
-  val samplingRatio =
-    parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
-  val primitivesAsString =
-    parameters.get("primitivesAsString").map(_.toBoolean).getOrElse(false)
-  val prefersDecimal =
-    parameters.get("prefersDecimal").map(_.toBoolean).getOrElse(false)
-  val allowComments =
-    parameters.get("allowComments").map(_.toBoolean).getOrElse(false)
-  val allowUnquotedFieldNames =
-    parameters.get("allowUnquotedFieldNames").map(_.toBoolean).getOrElse(false)
-  val allowSingleQuotes =
-    parameters.get("allowSingleQuotes").map(_.toBoolean).getOrElse(true)
-  val allowNumericLeadingZeros =
-    parameters.get("allowNumericLeadingZeros").map(_.toBoolean).getOrElse(false)
-  val allowNonNumericNumbers =
-    parameters.get("allowNonNumericNumbers").map(_.toBoolean).getOrElse(true)
-  val allowBackslashEscapingAnyCharacter =
-    parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false)
-  val compressionCodec = parameters.get("compression").map(CompressionCodecs.getCodecClassName)
-  private val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
-  val columnNameOfCorruptRecord = parameters.get("columnNameOfCorruptRecord")
-
-  // Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.
-  val dateFormat: FastDateFormat =
-    FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"))
-
-  val timestampFormat: FastDateFormat =
-    FastDateFormat.getInstance(
-      parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"))
-
-  // Parse mode flags
-  if (!ParseModes.isValidMode(parseMode)) {
-    logWarning(s"$parseMode is not a valid parse mode. Using ${ParseModes.DEFAULT}.")
-  }
-
-  val failFast = ParseModes.isFailFastMode(parseMode)
-  val dropMalformed = ParseModes.isDropMalformedMode(parseMode)
-  val permissive = ParseModes.isPermissiveMode(parseMode)
-
-  /** Sets config options on a Jackson [[JsonFactory]]. */
-  def setJacksonOptions(factory: JsonFactory): Unit = {
-    factory.configure(JsonParser.Feature.ALLOW_COMMENTS, allowComments)
-    factory.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, allowUnquotedFieldNames)
-    factory.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, allowSingleQuotes)
-    factory.configure(JsonParser.Feature.ALLOW_NUMERIC_LEADING_ZEROS, allowNumericLeadingZeros)
-    factory.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, allowNonNumericNumbers)
-    factory.configure(JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER,
-      allowBackslashEscapingAnyCharacter)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/fe33121a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
index 270e7fb..5b55b70 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
@@ -21,8 +21,9 @@ import java.io.Writer
 
 import com.fasterxml.jackson.core._
 
-import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.json.JSONOptions
 import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData}
 import org.apache.spark.sql.types._
 

http://git-wip-us.apache.org/repos/asf/spark/blob/fe33121a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
deleted file mode 100644
index 5ce1bf7..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
+++ /dev/null
@@ -1,440 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.json
-
-import java.io.ByteArrayOutputStream
-
-import scala.collection.mutable.ArrayBuffer
-import scala.util.Try
-
-import com.fasterxml.jackson.core._
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.execution.datasources.ParseModes.{DROP_MALFORMED_MODE, PERMISSIVE_MODE}
-import org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil
-import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.UTF8String
-import org.apache.spark.util.Utils
-
-private[json] class SparkSQLJsonProcessingException(msg: String) extends RuntimeException(msg)
-
-class JacksonParser(
-    schema: StructType,
-    columnNameOfCorruptRecord: String,
-    options: JSONOptions) extends Logging {
-
-  import com.fasterxml.jackson.core.JsonToken._
-
-  // A `ValueConverter` is responsible for converting a value from `JsonParser`
-  // to a value in a field of an `InternalRow`.
-  private type ValueConverter = (JsonParser) => Any
-
-  // The `ValueConverter` for the root schema, covering all fields in the schema
-  private val rootConverter: ValueConverter = makeRootConverter(schema)
-
-  private val factory = new JsonFactory()
-  options.setJacksonOptions(factory)
-
-  private val emptyRow: Seq[InternalRow] = Seq(new GenericInternalRow(schema.length))
-
-  @transient
-  private[this] var isWarningPrintedForMalformedRecord: Boolean = false
-
-  /**
-   * This function deals with cases where parsing fails. It is called when exceptions are
-   * caught during conversion, and it also handles the `mode` option.
-   */
-  private def failedRecord(record: String): Seq[InternalRow] = {
-    // create a row even if no corrupt record column is present
-    if (options.failFast) {
-      throw new RuntimeException(s"Malformed line in FAILFAST mode: $record")
-    }
-    if (options.dropMalformed) {
-      if (!isWarningPrintedForMalformedRecord) {
-        logWarning(
-          s"""Found at least one malformed record (sample: $record). The JSON reader will drop
-             |all malformed records in the current $DROP_MALFORMED_MODE parser mode. To find out
-             |which corrupted records have been dropped, please switch the parser mode to
-             |$PERMISSIVE_MODE mode and use the default inferred schema.
-             |
-             |Code example to print all malformed records (scala):
-             |===================================================
-             |// The corrupted record exists in column ${columnNameOfCorruptRecord}
-             |val parsedJson = spark.read.json("/path/to/json/file/test.json")
-             |
-           """.stripMargin)
-        isWarningPrintedForMalformedRecord = true
-      }
-      Nil
-    } else if (schema.getFieldIndex(columnNameOfCorruptRecord).isEmpty) {
-      if (!isWarningPrintedForMalformedRecord) {
-        logWarning(
-          s"""Found at least one malformed record (sample: $record). The JSON reader will replace
-             |all malformed records with a placeholder null in the current $PERMISSIVE_MODE parser
-             |mode. To find out which corrupted records have been replaced with null, please use
-             |the default inferred schema instead of providing a custom schema.
-             |
-             |Code example to print all malformed records (scala):
-             |===================================================
-             |// The corrupted record exists in column ${columnNameOfCorruptRecord}.
-             |val parsedJson = spark.read.json("/path/to/json/file/test.json")
-             |
-           """.stripMargin)
-        isWarningPrintedForMalformedRecord = true
-      }
-      emptyRow
-    } else {
-      val row = new GenericMutableRow(schema.length)
-      for (corruptIndex <- schema.getFieldIndex(columnNameOfCorruptRecord)) {
-        require(schema(corruptIndex).dataType == StringType)
-        row.update(corruptIndex, UTF8String.fromString(record))
-      }
-      Seq(row)
-    }
-  }
-
-  /**
-   * Create a converter which converts the JSON documents held by the `JsonParser`
-   * to a value according to a desired schema. This is a wrapper for the method
-   * `makeConverter()` to handle a row wrapped with an array.
-   */
-  def makeRootConverter(dataType: DataType): ValueConverter = dataType match {
-    case st: StructType =>
-      val elementConverter = makeConverter(st)
-      val fieldConverters = st.map(_.dataType).map(makeConverter)
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
-        case START_OBJECT => convertObject(parser, st, fieldConverters)
-          // SPARK-3308: support reading top-level JSON arrays, taking every element
-          // in such an array as a row
-          //
-          // For example, we support, the JSON data as below:
-          //
-          // [{"a":"str_a_1"}]
-          // [{"a":"str_a_2"}, {"b":"str_b_3"}]
-          //
-          // resulting in:
-          //
-          // List([str_a_1,null])
-          // List([str_a_2,null], [null,str_b_3])
-          //
-        case START_ARRAY => convertArray(parser, elementConverter)
-      }
-
-    case ArrayType(st: StructType, _) =>
-      val elementConverter = makeConverter(st)
-      val fieldConverters = st.map(_.dataType).map(makeConverter)
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
-        // the business end of SPARK-3308:
-        // when an object is found but an array is requested, just wrap it in a list.
-        // This is being wrapped in `JacksonParser.parse`.
-        case START_OBJECT => convertObject(parser, st, fieldConverters)
-        case START_ARRAY => convertArray(parser, elementConverter)
-      }
-
-    case _ => makeConverter(dataType)
-  }
-
-  /**
-   * Create a converter which converts the JSON documents held by the `JsonParser`
-   * to a value according to a desired schema.
-   */
-  private def makeConverter(dataType: DataType): ValueConverter = dataType match {
-    case BooleanType =>
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
-        case VALUE_TRUE => true
-        case VALUE_FALSE => false
-      }
-
-    case ByteType =>
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
-        case VALUE_NUMBER_INT => parser.getByteValue
-      }
-
-    case ShortType =>
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
-        case VALUE_NUMBER_INT => parser.getShortValue
-      }
-
-    case IntegerType =>
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
-        case VALUE_NUMBER_INT => parser.getIntValue
-      }
-
-    case LongType =>
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
-        case VALUE_NUMBER_INT => parser.getLongValue
-      }
-
-    case FloatType =>
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
-        case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT =>
-          parser.getFloatValue
-
-        case VALUE_STRING =>
-          // Special case handling for NaN and Infinity.
-          val value = parser.getText
-          val lowerCaseValue = value.toLowerCase
-          if (lowerCaseValue.equals("nan") ||
-            lowerCaseValue.equals("infinity") ||
-            lowerCaseValue.equals("-infinity") ||
-            lowerCaseValue.equals("inf") ||
-            lowerCaseValue.equals("-inf")) {
-            value.toFloat
-          } else {
-            throw new SparkSQLJsonProcessingException(s"Cannot parse $value as FloatType.")
-          }
-      }
-
-    case DoubleType =>
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
-        case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT =>
-          parser.getDoubleValue
-
-        case VALUE_STRING =>
-          // Special case handling for NaN and Infinity.
-          val value = parser.getText
-          val lowerCaseValue = value.toLowerCase
-          if (lowerCaseValue.equals("nan") ||
-            lowerCaseValue.equals("infinity") ||
-            lowerCaseValue.equals("-infinity") ||
-            lowerCaseValue.equals("inf") ||
-            lowerCaseValue.equals("-inf")) {
-            value.toDouble
-          } else {
-            throw new SparkSQLJsonProcessingException(s"Cannot parse $value as DoubleType.")
-          }
-      }
-
-    case StringType =>
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
-        case VALUE_STRING =>
-          UTF8String.fromString(parser.getText)
-
-        case _ =>
-          // Note that this always converts the data to a string and never fails.
-          val writer = new ByteArrayOutputStream()
-          Utils.tryWithResource(factory.createGenerator(writer, JsonEncoding.UTF8)) {
-            generator => generator.copyCurrentStructure(parser)
-          }
-          UTF8String.fromBytes(writer.toByteArray)
-      }
-
-    case TimestampType =>
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
-        case VALUE_STRING =>
-          // This one will lose the microseconds part.
-          // See https://issues.apache.org/jira/browse/SPARK-10681.
-          Try(options.timestampFormat.parse(parser.getText).getTime * 1000L)
-            .getOrElse {
-              // If parsing fails, fall back to the approach used in 2.0 and 1.x for
-              // backwards compatibility.
-              DateTimeUtils.stringToTime(parser.getText).getTime * 1000L
-            }
-
-        case VALUE_NUMBER_INT =>
-          parser.getLongValue * 1000000L
-      }
-
-    case DateType =>
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
-        case VALUE_STRING =>
-          val stringValue = parser.getText
-          // This one will lose the microseconds part.
-          // See https://issues.apache.org/jira/browse/SPARK-10681.
-          Try(DateTimeUtils.millisToDays(options.dateFormat.parse(parser.getText).getTime))
-            .getOrElse {
-            // If parsing fails, fall back to the approach used in 2.0 and 1.x for
-            // backwards compatibility.
-            Try(DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(parser.getText).getTime))
-              .getOrElse {
-              // In Spark 1.5.0, we store the data as number of days since epoch in string.
-              // So, we just convert it to Int.
-              stringValue.toInt
-            }
-          }
-      }
-
-    case BinaryType =>
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
-        case VALUE_STRING => parser.getBinaryValue
-      }
-
-    case dt: DecimalType =>
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
-        case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT) =>
-          Decimal(parser.getDecimalValue, dt.precision, dt.scale)
-      }
-
-    case st: StructType =>
-      val fieldConverters = st.map(_.dataType).map(makeConverter)
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
-        case START_OBJECT => convertObject(parser, st, fieldConverters)
-      }
-
-    case at: ArrayType =>
-      val elementConverter = makeConverter(at.elementType)
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
-        case START_ARRAY => convertArray(parser, elementConverter)
-      }
-
-    case mt: MapType =>
-      val valueConverter = makeConverter(mt.valueType)
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
-        case START_OBJECT => convertMap(parser, valueConverter)
-      }
-
-    case udt: UserDefinedType[_] =>
-      makeConverter(udt.sqlType)
-
-    case _ =>
-      (parser: JsonParser) =>
-        // Here, we pass an empty `PartialFunction` so that this case is
-        // handled as a failed conversion. It throws an exception unless
-        // the value is null.
-        parseJsonToken(parser, dataType)(PartialFunction.empty[JsonToken, Any])
-  }
-
-  /**
-   * This method skips `FIELD_NAME`s at the beginning and handles nulls before trying to
-   * parse the JSON token using the given function `f`. If `f` fails to parse and convert
-   * the token, `failedConversion` is called to handle it.
-   */
-  private def parseJsonToken(
-      parser: JsonParser,
-      dataType: DataType)(f: PartialFunction[JsonToken, Any]): Any = {
-    parser.getCurrentToken match {
-      case FIELD_NAME =>
-        // There are useless FIELD_NAMEs between START_OBJECT and END_OBJECT tokens
-        parser.nextToken()
-        parseJsonToken(parser, dataType)(f)
-
-      case null | VALUE_NULL => null
-
-      case other => f.applyOrElse(other, failedConversion(parser, dataType))
-    }
-  }
-
-  /**
-   * This function throws an exception for a failed conversion, but returns null for an empty
-   * string, to guard the non-string types.
-   */
-  private def failedConversion(
-      parser: JsonParser,
-      dataType: DataType): PartialFunction[JsonToken, Any] = {
-    case VALUE_STRING if parser.getTextLength < 1 =>
-      // If the conversion fails, this produces `null` rather than throwing an exception.
-      // This guards against type mismatches.
-      null
-
-    case token =>
-      // We cannot parse this token based on the given data type. So, we throw a
-      // SparkSQLJsonProcessingException and this exception will be caught by
-      // the `parse` method.
-      throw new SparkSQLJsonProcessingException(
-        s"Failed to parse a value for data type $dataType (current token: $token).")
-  }
-
-  /**
-   * Parse an object from the token stream into a new Row representing the schema.
-   * Fields in the json that are not defined in the requested schema will be dropped.
-   */
-  private def convertObject(
-      parser: JsonParser,
-      schema: StructType,
-      fieldConverters: Seq[ValueConverter]): InternalRow = {
-    val row = new GenericMutableRow(schema.length)
-    while (nextUntil(parser, JsonToken.END_OBJECT)) {
-      schema.getFieldIndex(parser.getCurrentName) match {
-        case Some(index) =>
-          row.update(index, fieldConverters(index).apply(parser))
-
-        case None =>
-          parser.skipChildren()
-      }
-    }
-
-    row
-  }
-
-  /**
-   * Parse an object as a Map, preserving all fields.
-   */
-  private def convertMap(
-      parser: JsonParser,
-      fieldConverter: ValueConverter): MapData = {
-    val keys = ArrayBuffer.empty[UTF8String]
-    val values = ArrayBuffer.empty[Any]
-    while (nextUntil(parser, JsonToken.END_OBJECT)) {
-      keys += UTF8String.fromString(parser.getCurrentName)
-      values += fieldConverter.apply(parser)
-    }
-
-    ArrayBasedMapData(keys.toArray, values.toArray)
-  }
-
-  /**
-   * Parse a JSON array into an Array.
-   */
-  private def convertArray(
-      parser: JsonParser,
-      fieldConverter: ValueConverter): ArrayData = {
-    val values = ArrayBuffer.empty[Any]
-    while (nextUntil(parser, JsonToken.END_ARRAY)) {
-      values += fieldConverter.apply(parser)
-    }
-
-    new GenericArrayData(values.toArray)
-  }
-
-  /**
-   * Parse the JSON string input into a set of [[InternalRow]]s.
-   */
-  def parse(input: String): Seq[InternalRow] = {
-    if (input.trim.isEmpty) {
-      Nil
-    } else {
-      try {
-        Utils.tryWithResource(factory.createParser(input)) { parser =>
-          parser.nextToken()
-          rootConverter.apply(parser) match {
-            case null => failedRecord(input)
-            case row: InternalRow => row :: Nil
-            case array: ArrayData =>
-              // Here, since we support reading top-level JSON arrays and treat every element
-              // in such an array as a row, this case is possible.
-              if (array.numElements() == 0) {
-                Nil
-              } else {
-                array.toArray[InternalRow](schema)
-              }
-            case _ =>
-              failedRecord(input)
-          }
-        }
-      } catch {
-        case _: JsonProcessingException =>
-          failedRecord(input)
-        case _: SparkSQLJsonProcessingException =>
-          failedRecord(input)
-      }
-    }
-  }
-}
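
The `failedRecord` logic above is what the reader's `mode` option exposes to users. A minimal,
illustrative sketch of how the three modes behave when reading JSON files (the path and data
below are made up; any option accepted by the JSON data source applies):

```scala
// PERMISSIVE (default): malformed lines become rows of nulls, and the raw text is kept
// in the corrupt-record column when that column is part of the schema.
val permissive = spark.read.json("/tmp/events.json")

// DROPMALFORMED: malformed lines are dropped; a warning like the one above is logged once.
val dropped = spark.read.option("mode", "DROPMALFORMED").json("/tmp/events.json")

// FAILFAST: the first malformed line raises an exception when the data is actually read.
val strict = spark.read.option("mode", "FAILFAST").json("/tmp/events.json")
```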

http://git-wip-us.apache.org/repos/asf/spark/blob/fe33121a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonUtils.scala
deleted file mode 100644
index 005546f..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonUtils.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.json
-
-import com.fasterxml.jackson.core.{JsonParser, JsonToken}
-
-private object JacksonUtils {
-  /**
-   * Advance the parser until a null or a specific token is found
-   */
-  def nextUntil(parser: JsonParser, stopOn: JsonToken): Boolean = {
-    parser.nextToken() match {
-      case null => false
-      case x => x != stopOn
-    }
-  }
-}
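
`nextUntil` above drives the token loops in `convertObject`, `convertMap`, and `convertArray`.
A stand-alone sketch of its contract against the Jackson streaming API (input and printed
output are illustrative and assume flat scalar values):

```scala
import com.fasterxml.jackson.core.{JsonFactory, JsonParser, JsonToken}

// Same contract as nextUntil above: advance one token and report whether the
// stop token has been reached yet.
def nextUntil(parser: JsonParser, stopOn: JsonToken): Boolean =
  parser.nextToken() match {
    case null => false
    case x => x != stopOn
  }

val parser = new JsonFactory().createParser("""{"a": 1, "b": 2}""")
parser.nextToken()                                 // position the parser on START_OBJECT
while (nextUntil(parser, JsonToken.END_OBJECT)) {  // leaves the parser on a FIELD_NAME
  val name = parser.getCurrentName
  parser.nextToken()                               // advance onto the scalar value token
  println(s"$name -> ${parser.getText}")           // prints: a -> 1, then b -> 2
}
```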

http://git-wip-us.apache.org/repos/asf/spark/blob/fe33121a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 6882a6c..9fe38cc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -32,6 +32,8 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
+import org.apache.spark.sql.catalyst.util.CompressionCodecs
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType

http://git-wip-us.apache.org/repos/asf/spark/blob/fe33121a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
index a875b01..9f96667 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
+import org.apache.spark.sql.catalyst.util.CompressionCodecs
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{StringType, StructType}

http://git-wip-us.apache.org/repos/asf/spark/blob/fe33121a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 47bf41a..3bc1c5b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql
 
+import scala.collection.JavaConverters._
 import scala.language.implicitConversions
 import scala.reflect.runtime.universe.{typeTag, TypeTag}
 import scala.util.Try
@@ -2819,6 +2820,63 @@ object functions {
   }
 
   /**
+   * (Scala-specific) Parses a column containing a JSON string into a [[StructType]] with the
+   * specified schema. Returns `null` in the case of an unparseable string.
+   *
+   * @param e a string column containing JSON data.
+   * @param schema the schema to use when parsing the json string
+   * @param options options to control how the json is parsed. Accepts the same options as the
+   *                json data source.
+   *
+   * @group collection_funcs
+   * @since 2.1.0
+   */
+  def from_json(e: Column, schema: StructType, options: Map[String, String]): Column = withExpr {
+    JsonToStruct(schema, options, e.expr)
+  }
+
+  /**
+   * (Java-specific) Parses a column containing a JSON string into a [[StructType]] with the
+   * specified schema. Returns `null` in the case of an unparseable string.
+   *
+   * @param e a string column containing JSON data.
+   * @param schema the schema to use when parsing the json string
+   * @param options options to control how the json is parsed. Accepts the same options as the
+   *                json data source.
+   *
+   * @group collection_funcs
+   * @since 2.1.0
+   */
+  def from_json(e: Column, schema: StructType, options: java.util.Map[String, String]): Column =
+    from_json(e, schema, options.asScala.toMap)
+
+  /**
+   * Parses a column containing a JSON string into a [[StructType]] with the specified schema.
+   * Returns `null` in the case of an unparseable string.
+   *
+   * @param e a string column containing JSON data.
+   * @param schema the schema to use when parsing the json string
+   *
+   * @group collection_funcs
+   * @since 2.1.0
+   */
+  def from_json(e: Column, schema: StructType): Column =
+    from_json(e, schema, Map.empty[String, String])
+
+  /**
+   * Parses a column containing a JSON string into a [[StructType]] with the specified schema.
+   * Returns `null` in the case of an unparseable string.
+   *
+   * @param e a string column containing JSON data.
+   * @param schema the schema, as a JSON-formatted string, to use when parsing the json string
+   *
+   * @group collection_funcs
+   * @since 2.1.0
+   */
+  def from_json(e: Column, schema: String, options: java.util.Map[String, String]): Column =
+    from_json(e, DataType.fromJson(schema).asInstanceOf[StructType], options)
+
+  /**
    * Returns length of array or map.
    *
    * @group collection_funcs
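
A short, hypothetical usage sketch of the overload that takes reader options (the input record
and the `allowUnquotedFieldNames` option are only meant to show that JSON data source options
are forwarded to the parser):

```scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{IntegerType, StructType}
import spark.implicits._

val schema = new StructType().add("a", IntegerType)
val df = Seq("""{a: 1}""").toDS()   // field name is unquoted on purpose

// Options are passed through to the underlying Jackson parser, so the relaxed
// quoting rule lets the record above parse instead of producing null.
val parsed = df.select(
  from_json($"value", schema, Map("allowUnquotedFieldNames" -> "true")) as "parsed")

parsed.select($"parsed.a")          // nested fields of the struct can be selected directly
```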

http://git-wip-us.apache.org/repos/asf/spark/blob/fe33121a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index 1391c9d..518d6e9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -17,7 +17,9 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.sql.functions.from_json
 import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{IntegerType, StructType}
 
 class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
   import testImplicits._
@@ -94,4 +96,31 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
 
     checkAnswer(expr, expected)
   }
+
+  test("json_parser") {
+    val df = Seq("""{"a": 1}""").toDS()
+    val schema = new StructType().add("a", IntegerType)
+
+    checkAnswer(
+      df.select(from_json($"value", schema)),
+      Row(Row(1)) :: Nil)
+  }
+
+  test("json_parser missing columns") {
+    val df = Seq("""{"a": 1}""").toDS()
+    val schema = new StructType().add("b", IntegerType)
+
+    checkAnswer(
+      df.select(from_json($"value", schema)),
+      Row(Row(null)) :: Nil)
+  }
+
+  test("json_parser invalid json") {
+    val df = Seq("""{"a" 1}""").toDS()
+    val schema = new StructType().add("a", IntegerType)
+
+    checkAnswer(
+      df.select(from_json($"value", schema)),
+      Row(null) :: Nil)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/fe33121a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala
index c31dffe..0b72da5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.datasources.json
 
 import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.json.JSONOptions
 import org.apache.spark.sql.test.SharedSQLContext
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/fe33121a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 3d533c1..456052f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -26,9 +26,10 @@ import org.apache.hadoop.fs.{Path, PathFilter}
 import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.GzipCodec
 
-import org.apache.spark.SparkException
 import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkException
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.datasources.json.InferSchema.compatibleType


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org