Posted to commits@spark.apache.org by we...@apache.org on 2017/02/23 05:39:26 UTC

spark git commit: [SPARK-19695][SQL] Throw an exception if a `columnNameOfCorruptRecord` field violates requirements in the JSON format

Repository: spark
Updated Branches:
  refs/heads/master 66c4b79af -> 769aa0f1d


[SPARK-19695][SQL] Throw an exception if a `columnNameOfCorruptRecord` field violates requirements in the JSON format

## What changes were proposed in this pull request?
This PR comes from #16928 and applies the same fix to the JSON data source that was made there for CSV.
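
For illustration, the user-facing effect is roughly the following (a minimal sketch; the `SparkSession` named `spark`, the `_corrupt_record` column name, and the input path are illustrative stand-ins, not part of this patch):

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.types._

// Declaring the corrupt-record column as anything other than a nullable
// StringType now fails fast with an AnalysisException on the driver,
// instead of failing later on the executors.
val badSchema = StructType(
  StructField("_corrupt_record", IntegerType, nullable = true) ::
  StructField("a", StringType, nullable = true) :: Nil)

try {
  spark.read
    .option("columnNameOfCorruptRecord", "_corrupt_record")
    .schema(badSchema)
    .json("/path/to/input.json")
} catch {
  case e: AnalysisException =>
    // Message begins: "The field for corrupt records must be string type and nullable"
    println(e.getMessage)
}
```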

## How was this patch tested?
Added tests in `JsonSuite`.

Author: Takeshi Yamamuro <ya...@apache.org>

Closes #17023 from maropu/SPARK-19695.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/769aa0f1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/769aa0f1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/769aa0f1

Branch: refs/heads/master
Commit: 769aa0f1d22d3c6d4c7871468344d82c8dc36260
Parents: 66c4b79
Author: Takeshi Yamamuro <ya...@apache.org>
Authored: Wed Feb 22 21:39:20 2017 -0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Wed Feb 22 21:39:20 2017 -0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/json/JacksonParser.scala |  5 +++-
 .../org/apache/spark/sql/DataFrameReader.scala  | 11 ++++++-
 .../datasources/json/JsonFileFormat.scala       | 13 ++++++--
 .../execution/datasources/json/JsonSuite.scala  | 31 ++++++++++++++++++++
 4 files changed, 56 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/769aa0f1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 9950959..9b80c0f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -58,7 +58,10 @@ class JacksonParser(
   private val emptyRow: Seq[InternalRow] = Seq(new GenericInternalRow(schema.length))
 
   private val corruptFieldIndex = schema.getFieldIndex(options.columnNameOfCorruptRecord)
-  corruptFieldIndex.foreach(idx => require(schema(idx).dataType == StringType))
+  corruptFieldIndex.foreach { corrFieldIndex =>
+    require(schema(corrFieldIndex).dataType == StringType)
+    require(schema(corrFieldIndex).nullable)
+  }
 
   @transient
   private[this] var isWarningPrinted: Boolean = false

http://git-wip-us.apache.org/repos/asf/spark/blob/769aa0f1/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 4c1341e..2be2276 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.datasources.jdbc._
 import org.apache.spark.sql.execution.datasources.json.JsonInferSchema
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.unsafe.types.UTF8String
 
 /**
@@ -365,6 +365,15 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
         createParser)
     }
 
+    // Check the field requirement for corrupt records here so that the exception is thrown on the driver side
+    schema.getFieldIndex(parsedOptions.columnNameOfCorruptRecord).foreach { corruptFieldIndex =>
+      val f = schema(corruptFieldIndex)
+      if (f.dataType != StringType || !f.nullable) {
+        throw new AnalysisException(
+          "The field for corrupt records must be string type and nullable")
+      }
+    }
+
     val parsed = jsonDataset.rdd.mapPartitions { iter =>
       val parser = new JacksonParser(schema, parsedOptions)
       iter.flatMap(parser.parse(_, createParser, UTF8String.fromString))

http://git-wip-us.apache.org/repos/asf/spark/blob/769aa0f1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 2cbf4ea..902fee5a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -22,13 +22,13 @@ import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JacksonParser, JSONOptions}
 import org.apache.spark.sql.catalyst.util.CompressionCodecs
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.util.SerializableConfiguration
 
 class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
@@ -102,6 +102,15 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
       sparkSession.sessionState.conf.sessionLocalTimeZone,
       sparkSession.sessionState.conf.columnNameOfCorruptRecord)
 
+    // Check the field requirement for corrupt records here so that the exception is thrown on the driver side
+    dataSchema.getFieldIndex(parsedOptions.columnNameOfCorruptRecord).foreach { corruptFieldIndex =>
+      val f = dataSchema(corruptFieldIndex)
+      if (f.dataType != StringType || !f.nullable) {
+        throw new AnalysisException(
+          "The field for corrupt records must be string type and nullable")
+      }
+    }
+
     (file: PartitionedFile) => {
       val parser = new JacksonParser(requiredSchema, parsedOptions)
       JsonDataSource(parsedOptions).readFile(

http://git-wip-us.apache.org/repos/asf/spark/blob/769aa0f1/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 05aa2ab..0e01be2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -1944,4 +1944,35 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       assert(exceptionTwo.getMessage.contains("Malformed line in FAILFAST mode"))
     }
   }
+
+  test("Throw an exception if a `columnNameOfCorruptRecord` field violates requirements") {
+    val columnNameOfCorruptRecord = "_unparsed"
+    val schema = StructType(
+      StructField(columnNameOfCorruptRecord, IntegerType, true) ::
+        StructField("a", StringType, true) ::
+        StructField("b", StringType, true) ::
+        StructField("c", StringType, true) :: Nil)
+    val errMsg = intercept[AnalysisException] {
+      spark.read
+        .option("mode", "PERMISSIVE")
+        .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
+        .schema(schema)
+        .json(corruptRecords)
+    }.getMessage
+    assert(errMsg.startsWith("The field for corrupt records must be string type and nullable"))
+
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      corruptRecords.toDF("value").write.text(path)
+      val errMsg = intercept[AnalysisException] {
+        spark.read
+          .option("mode", "PERMISSIVE")
+          .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
+          .schema(schema)
+          .json(path)
+          .collect
+      }.getMessage
+      assert(errMsg.startsWith("The field for corrupt records must be string type and nullable"))
+    }
+  }
 }
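
For contrast, a corrupt-record column that meets the requirement (nullable `StringType`) continues to behave as before; a minimal sketch, again with `spark`, the column name, and the path as illustrative stand-ins:

```scala
import org.apache.spark.sql.types._

val okSchema = StructType(
  StructField("_corrupt_record", StringType, nullable = true) ::
  StructField("a", StringType, nullable = true) :: Nil)

// In PERMISSIVE mode, malformed lines land in the corrupt-record column;
// well-formed lines leave it null.
val df = spark.read
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .schema(okSchema)
  .json("/path/to/input.json")
df.show()
```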

