You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2018/12/11 08:11:18 UTC

[GitHub] asfgit closed pull request #23253: [SPARK-26303][SQL] Return partial results for bad JSON records

asfgit closed pull request #23253: [SPARK-26303][SQL] Return partial results for bad JSON records
URL: https://github.com/apache/spark/pull/23253
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md
index ed2ff139bcc33..3c346be4c45d0 100644
--- a/docs/sql-migration-guide-upgrade.md
+++ b/docs/sql-migration-guide-upgrade.md
@@ -35,7 +35,9 @@ displayTitle: Spark SQL Upgrading Guide
 
   - Since Spark 3.0, CSV datasource uses java.time API for parsing and generating CSV content. New formatting implementation supports date/timestamp patterns conformed to ISO 8601. To switch back to the implementation used in Spark 2.4 and earlier, set `spark.sql.legacy.timeParser.enabled` to `true`.
 
-  - In Spark version 2.4 and earlier, CSV datasource converts a malformed CSV string to a row with all `null`s in the PERMISSIVE mode. Since Spark 3.0, returned row can contain non-`null` fields if some of CSV column values were parsed and converted to desired types successfully.
+  - In Spark version 2.4 and earlier, CSV datasource converts a malformed CSV string to a row with all `null`s in the PERMISSIVE mode. Since Spark 3.0, the returned row can contain non-`null` fields if some of CSV column values were parsed and converted to desired types successfully.
+
+  - In Spark version 2.4 and earlier, JSON datasource and JSON functions like `from_json` convert a bad JSON record to a row with all `null`s in the PERMISSIVE mode when specified schema is `StructType`. Since Spark 3.0, the returned row can contain non-`null` fields if some of JSON column values were parsed and converted to desired types successfully.
 
 ## Upgrading From Spark SQL 2.3 to 2.4
 
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 1d2dd4d808930..7b10512a43294 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -211,7 +211,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
                      set, it uses the default value, ``PERMISSIVE``.
 
                 * ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \
-                  into a field configured by ``columnNameOfCorruptRecord``, and sets other \
+                  into a field configured by ``columnNameOfCorruptRecord``, and sets malformed \
                   fields to ``null``. To keep corrupt records, an user can set a string type \
                   field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \
                   schema does not have the field, it drops corrupt records during parsing. \
@@ -424,7 +424,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
                      set, it uses the default value, ``PERMISSIVE``.
 
                 * ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \
-                  into a field configured by ``columnNameOfCorruptRecord``, and sets other \
+                  into a field configured by ``columnNameOfCorruptRecord``, and sets malformed \
                   fields to ``null``. To keep corrupt records, an user can set a string type \
                   field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \
                   schema does not have the field, it drops corrupt records during parsing. \
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index d92b0d5677e25..fc23b9d99c34a 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -441,7 +441,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
                      set, it uses the default value, ``PERMISSIVE``.
 
                 * ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \
-                  into a field configured by ``columnNameOfCorruptRecord``, and sets other \
+                  into a field configured by ``columnNameOfCorruptRecord``, and sets malformed \
                   fields to ``null``. To keep corrupt records, an user can set a string type \
                   field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \
                   schema does not have the field, it drops corrupt records during parsing. \
@@ -648,7 +648,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
                      set, it uses the default value, ``PERMISSIVE``.
 
                 * ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \
-                  into a field configured by ``columnNameOfCorruptRecord``, and sets other \
+                  into a field configured by ``columnNameOfCorruptRecord``, and sets malformed \
                   fields to ``null``. To keep corrupt records, an user can set a string type \
                   field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \
                   schema does not have the field, it drops corrupt records during parsing. \
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 2357595906b11..7e3bd4df51bb7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -22,6 +22,7 @@ import java.nio.charset.MalformedInputException
 
 import scala.collection.mutable.ArrayBuffer
 import scala.util.Try
+import scala.util.control.NonFatal
 
 import com.fasterxml.jackson.core._
 
@@ -29,7 +30,6 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
@@ -347,17 +347,28 @@ class JacksonParser(
       schema: StructType,
       fieldConverters: Array[ValueConverter]): InternalRow = {
     val row = new GenericInternalRow(schema.length)
+    var badRecordException: Option[Throwable] = None
+
     while (nextUntil(parser, JsonToken.END_OBJECT)) {
       schema.getFieldIndex(parser.getCurrentName) match {
         case Some(index) =>
-          row.update(index, fieldConverters(index).apply(parser))
-
+          try {
+            row.update(index, fieldConverters(index).apply(parser))
+          } catch {
+            case NonFatal(e) =>
+              badRecordException = badRecordException.orElse(Some(e))
+              parser.skipChildren()
+          }
         case None =>
           parser.skipChildren()
       }
     }
 
-    row
+    if (badRecordException.isEmpty) {
+      row
+    } else {
+      throw PartialResultException(row, badRecordException.get)
+    }
   }
 
   /**
@@ -428,6 +439,11 @@ class JacksonParser(
         val wrappedCharException = new CharConversionException(msg)
         wrappedCharException.initCause(e)
         throw BadRecordException(() => recordLiteral(record), () => None, wrappedCharException)
+      case PartialResultException(row, cause) =>
+        throw BadRecordException(
+          record = () => recordLiteral(record),
+          partialResult = () => Some(row),
+          cause)
     }
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
index 985f0dc1cd60e..d719a33929fcc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
@@ -20,6 +20,16 @@ package org.apache.spark.sql.catalyst.util
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.unsafe.types.UTF8String
 
+/**
+ * Exception thrown when the underlying parser returns a partial result of parsing.
+ * @param partialResult the partial result of parsing a bad record.
+ * @param cause the actual exception about why the parser cannot return full result.
+ */
+case class PartialResultException(
+     partialResult: InternalRow,
+     cause: Throwable)
+  extends Exception(cause)
+
 /**
  * Exception thrown when the underlying parser meet a bad record and can't parse it.
  * @param record a function to return the record that cause the parser to fail
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 661fe98d8c901..9751528654ffb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -362,7 +362,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * during parsing.
    *   <ul>
    *     <li>`PERMISSIVE` : when it meets a corrupted record, puts the malformed string into a
-   *     field configured by `columnNameOfCorruptRecord`, and sets other fields to `null`. To
+   *     field configured by `columnNameOfCorruptRecord`, and sets malformed fields to `null`. To
    *     keep corrupt records, an user can set a string type field named
    *     `columnNameOfCorruptRecord` in an user-defined schema. If a schema does not have the
    *     field, it drops corrupt records during parsing. When inferring a schema, it implicitly
@@ -598,13 +598,13 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    *    during parsing. It supports the following case-insensitive modes.
    *   <ul>
    *     <li>`PERMISSIVE` : when it meets a corrupted record, puts the malformed string into a
-   *     field configured by `columnNameOfCorruptRecord`, and sets other fields to `null`. To keep
-   *     corrupt records, an user can set a string type field named `columnNameOfCorruptRecord`
-   *     in an user-defined schema. If a schema does not have the field, it drops corrupt records
-   *     during parsing. A record with less/more tokens than schema is not a corrupted record to
-   *     CSV. When it meets a record having fewer tokens than the length of the schema, sets
-   *     `null` to extra fields. When the record has more tokens than the length of the schema,
-   *     it drops extra tokens.</li>
+   *     field configured by `columnNameOfCorruptRecord`, and sets malformed fields to `null`.
+   *     To keep corrupt records, an user can set a string type field named
+   *     `columnNameOfCorruptRecord` in an user-defined schema. If a schema does not have
+   *     the field, it drops corrupt records during parsing. A record with less/more tokens
+   *     than schema is not a corrupted record to CSV. When it meets a record having fewer
+   *     tokens than the length of the schema, sets `null` to extra fields. When the record
+   *     has more tokens than the length of the schema, it drops extra tokens.</li>
    *     <li>`DROPMALFORMED` : ignores the whole corrupted records.</li>
    *     <li>`FAILFAST` : throws an exception when it meets corrupted records.</li>
    *   </ul>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index c8e3e1c191044..914fa90ae7e14 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -273,7 +273,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    * during parsing.
    *   <ul>
    *     <li>`PERMISSIVE` : when it meets a corrupted record, puts the malformed string into a
-   *     field configured by `columnNameOfCorruptRecord`, and sets other fields to `null`. To
+   *     field configured by `columnNameOfCorruptRecord`, and sets malformed fields to `null`. To
    *     keep corrupt records, an user can set a string type field named
    *     `columnNameOfCorruptRecord` in an user-defined schema. If a schema does not have the
    *     field, it drops corrupt records during parsing. When inferring a schema, it implicitly
@@ -360,13 +360,13 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    *    during parsing. It supports the following case-insensitive modes.
    *   <ul>
    *     <li>`PERMISSIVE` : when it meets a corrupted record, puts the malformed string into a
-   *     field configured by `columnNameOfCorruptRecord`, and sets other fields to `null`. To keep
-   *     corrupt records, an user can set a string type field named `columnNameOfCorruptRecord`
-   *     in an user-defined schema. If a schema does not have the field, it drops corrupt records
-   *     during parsing. A record with less/more tokens than schema is not a corrupted record to
-   *     CSV. When it meets a record having fewer tokens than the length of the schema, sets
-   *     `null` to extra fields. When the record has more tokens than the length of the schema,
-   *     it drops extra tokens.</li>
+   *     field configured by `columnNameOfCorruptRecord`, and sets malformed fields to `null`.
+   *     To keep corrupt records, an user can set a string type field named
+   *     `columnNameOfCorruptRecord` in an user-defined schema. If a schema does not have
+   *     the field, it drops corrupt records during parsing. A record with less/more tokens
+   *     than schema is not a corrupted record to CSV. When it meets a record having fewer
+   *     tokens than the length of the schema, sets `null` to extra fields. When the record
+   *     has more tokens than the length of the schema, it drops extra tokens.</li>
    *     <li>`DROPMALFORMED` : ignores the whole corrupted records.</li>
    *     <li>`FAILFAST` : throws an exception when it meets corrupted records.</li>
    *   </ul>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index dff37ca2d40f0..3330de3584ebb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -248,7 +248,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
 
     checkAnswer(
       sql("select nullstr, headers.Host from jsonTable"),
-      Seq(Row("", "1.abc.com"), Row("", null), Row(null, null), Row(null, null))
+      Seq(Row("", "1.abc.com"), Row("", null), Row("", null), Row(null, null))
     )
   }
 
@@ -2563,4 +2563,17 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       assert(!files.exists(_.getName.endsWith("json")))
     }
   }
+
+  test("return partial result for bad records") {
+    val schema = "a double, b array<int>, c string, _corrupt_record string"
+    val badRecords = Seq(
+      """{"a":"-","b":[0, 1, 2],"c":"abc"}""",
+      """{"a":0.1,"b":{},"c":"def"}""").toDS()
+    val df = spark.read.schema(schema).json(badRecords)
+
+    checkAnswer(
+      df,
+      Row(null, Array(0, 1, 2), "abc", """{"a":"-","b":[0, 1, 2],"c":"abc"}""") ::
+      Row(0.1, null, "def", """{"a":0.1,"b":{},"c":"def"}""") :: Nil)
+  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org