You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by GitBox <gi...@apache.org> on 2019/01/15 05:05:42 UTC
[spark] Diff for: [GitHub] cloud-fan closed pull request #23325:
[SPARK-26376][SQL] Skip inputs without tokens by JSON datasource
diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md
index 0fcdd420bcfe3..8cb7ee78b00d2 100644
--- a/docs/sql-migration-guide-upgrade.md
+++ b/docs/sql-migration-guide-upgrade.md
@@ -17,7 +17,7 @@ displayTitle: Spark SQL Upgrading Guide
- Since Spark 3.0, the `from_json` functions supports two modes - `PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The default mode became `PERMISSIVE`. In previous versions, behavior of `from_json` did not conform to either `PERMISSIVE` or `FAILFAST`, especially in processing of malformed JSON records. For example, the JSON string `{"a" 1}` with the schema `a INT` is converted to `null` by previous versions but Spark 3.0 converts it to `Row(null)`.
- - In Spark version 2.4 and earlier, the `from_json` function produces `null`s for JSON strings and JSON datasource skips the same independently of its mode if there is no valid root JSON token in its input (` ` for example). Since Spark 3.0, such input is treated as a bad record and handled according to specified mode. For example, in the `PERMISSIVE` mode the ` ` input is converted to `Row(null, null)` if specified schema is `key STRING, value INT`.
+ - In Spark version 2.4 and earlier, the `from_json` function produces `null`s for JSON strings without valid root JSON tokens (` ` for example). Since Spark 3.0, such input is treated as a bad record and handled according to specified mode. For example, in the `PERMISSIVE` mode the ` ` input is converted to `Row(null, null)` if specified schema is `key STRING, value INT`.
- The `ADD JAR` command previously returned a result set with the single value 0. It now returns an empty result set.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index e0cab537ce1c6..27b6a63956198 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -583,7 +583,11 @@ case class JsonToStructs(
(StructType(StructField("value", other) :: Nil), other)
}
- val rawParser = new JacksonParser(actualSchema, parsedOptions, allowArrayAsStructs = false)
+ val rawParser = new JacksonParser(
+ actualSchema,
+ parsedOptions,
+ allowArrayAsStructs = false,
+ skipInputWithoutTokens = false)
val createParser = CreateJacksonParser.utf8String _
new FailureSafeParser[UTF8String](
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 3f245e1400fa1..0f206f843cc6f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -40,7 +40,8 @@ import org.apache.spark.util.Utils
class JacksonParser(
schema: DataType,
val options: JSONOptions,
- allowArrayAsStructs: Boolean) extends Logging {
+ allowArrayAsStructs: Boolean,
+ skipInputWithoutTokens: Boolean) extends Logging {
import JacksonUtils._
import com.fasterxml.jackson.core.JsonToken._
@@ -399,6 +400,7 @@ class JacksonParser(
// a null first token is equivalent to testing for input.trim.isEmpty
// but it works on any token stream and not just strings
parser.nextToken() match {
+ case null if skipInputWithoutTokens => Nil
case null => throw new RuntimeException("Not found any JSON token")
case _ => rootConverter.apply(parser) match {
case null => throw new RuntimeException("Root converter returned null")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index ce8e4c8f5b82b..96b3897f1d038 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -455,7 +455,11 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
val createParser = CreateJacksonParser.string _
val parsed = jsonDataset.rdd.mapPartitions { iter =>
- val rawParser = new JacksonParser(actualSchema, parsedOptions, allowArrayAsStructs = true)
+ val rawParser = new JacksonParser(
+ actualSchema,
+ parsedOptions,
+ allowArrayAsStructs = true,
+ skipInputWithoutTokens = true)
val parser = new FailureSafeParser[String](
input => rawParser.parse(input, createParser, UTF8String.fromString),
parsedOptions.parseMode,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 40f55e7068010..49b6bc12ce18f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -125,7 +125,11 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
}
(file: PartitionedFile) => {
- val parser = new JacksonParser(actualSchema, parsedOptions, allowArrayAsStructs = true)
+ val parser = new JacksonParser(
+ actualSchema,
+ parsedOptions,
+ allowArrayAsStructs = true,
+ skipInputWithoutTokens = true)
JsonDataSource(parsedOptions).readFile(
broadcastedHadoopConf.value.value,
file,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 8f575a371c98e..eb18bf8ece999 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -70,7 +70,11 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
val dummyOption = new JSONOptions(options, SQLConf.get.sessionLocalTimeZone)
val dummySchema = StructType(Seq.empty)
- val parser = new JacksonParser(dummySchema, dummyOption, allowArrayAsStructs = true)
+ val parser = new JacksonParser(
+ dummySchema,
+ dummyOption,
+ allowArrayAsStructs = true,
+ skipInputWithoutTokens = true)
Utils.tryWithResource(factory.createParser(writer.toString)) { jsonParser =>
jsonParser.nextToken()
@@ -1126,7 +1130,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
Row(null, null, null),
Row(null, null, null),
Row(null, null, null),
- Row(null, null, null),
Row("str_a_4", "str_b_4", "str_c_4"),
Row(null, null, null))
)
@@ -1148,7 +1151,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
checkAnswer(
jsonDF.select($"a", $"b", $"c", $"_unparsed"),
Row(null, null, null, "{") ::
- Row(null, null, null, "") ::
Row(null, null, null, """{"a":1, b:2}""") ::
Row(null, null, null, """{"a":{, b:3}""") ::
Row("str_a_4", "str_b_4", "str_c_4", null) ::
@@ -1163,7 +1165,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
checkAnswer(
jsonDF.filter($"_unparsed".isNotNull).select($"_unparsed"),
Row("{") ::
- Row("") ::
Row("""{"a":1, b:2}""") ::
Row("""{"a":{, b:3}""") ::
Row("]") :: Nil
@@ -1185,7 +1186,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
checkAnswer(
jsonDF.selectExpr("a", "b", "c", "_malformed"),
Row(null, null, null, "{") ::
- Row(null, null, null, "") ::
Row(null, null, null, """{"a":1, b:2}""") ::
Row(null, null, null, """{"a":{, b:3}""") ::
Row("str_a_4", "str_b_4", "str_c_4", null) ::
@@ -2531,7 +2531,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
}
checkCount(2)
- countForMalformedJSON(1, Seq(""))
+ countForMalformedJSON(0, Seq(""))
}
test("SPARK-25040: empty strings should be disallowed") {
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org