Posted to commits@spark.apache.org by gu...@apache.org on 2021/08/15 01:33:12 UTC

[spark] branch master updated: [SPARK-35320][SQL] Align error message for unsupported key types in MapType in Json reader

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a9ab41a  [SPARK-35320][SQL] Align error message for unsupported key types in MapType in Json reader
a9ab41a is described below

commit a9ab41ad56c1b118ec28e4f9535f734980851179
Author: Pablo Langa <so...@gmail.com>
AuthorDate: Sun Aug 15 10:31:57 2021 +0900

    [SPARK-35320][SQL] Align error message for unsupported key types in MapType in Json reader
    
    ### What changes were proposed in this pull request?
    
    This PR is related to https://github.com/apache/spark/pull/33525.
    The purpose is to align the error message between the `from_json` function and the JSON reader for unsupported key types in MapType.
    Current behavior:
    ```
    scala> spark.read.schema(StructType(Seq(StructField("col", MapType(IntegerType, StringType))))).json(Seq("""{"1": "test"}""").toDS()).show
    +----+
    | col|
    +----+
    |null|
    +----+
    
    ```
    ```
    scala> Seq("""{"1": "test"}""").toDF("col").write.json("/tmp/jsontests1234")
    
    scala> spark.read.schema(StructType(Seq(StructField("col", MapType(IntegerType, StringType))))).json("/tmp/jsontests1234").show
    +----+
    | col|
    +----+
    |null|
    +----+
    ```
    With this change, an AnalysisException with the message `"Input schema $schema can only contain StringType as a key type for a MapType."` will be thrown.
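    
    For illustration, the new behavior in the REPL looks roughly like this (the real message prints the full schema string, elided here):
    ```
    scala> spark.read.schema(StructType(Seq(StructField("col", MapType(IntegerType, StringType))))).json(Seq("""{"1": "test"}""").toDS()).show
    org.apache.spark.sql.AnalysisException: Input schema ... can only contain StringType as a key type for a MapType.
    ```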
    
    ### Why are the changes needed?
    
    It is more consistent to align the behavior of the JSON reader with that of `from_json`.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, an AnalysisException is now thrown where a null value was previously returned.
    
    ### How was this patch tested?
    
    Unit testing, manual testing
    
    Closes #33672 from planga82/feature/spark35320_improve_error_message_reader.
    
    Authored-by: Pablo Langa <so...@gmail.com>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 core/src/main/resources/error/error-classes.json   |  3 +++
 .../spark/sql/catalyst/expressions/ExprUtils.scala | 17 ++++++++++++++
 .../sql/catalyst/expressions/jsonExpressions.scala | 12 +++-------
 .../spark/sql/errors/QueryCompilationErrors.scala  |  6 +++++
 .../org/apache/spark/sql/DataFrameReader.scala     |  6 ++++-
 .../spark/sql/streaming/DataStreamReader.scala     |  6 ++++-
 .../org/apache/spark/sql/DataFrameSuite.scala      | 27 ++++++++++++++++++++++
 .../sql/streaming/FileStreamSourceSuite.scala      | 19 +++++++++++++++
 8 files changed, 85 insertions(+), 11 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index 9ac5f06..d6895a1 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -33,6 +33,9 @@
     "message" : [ "Field name %s is invalid: %s is not a struct." ],
     "sqlState" : "42000"
   },
+  "INVALID_JSON_SCHEMA_MAPTYPE" : {
+    "message" : [ "Input schema %s can only contain StringType as a key type for a MapType." ]
+  },
   "MISSING_COLUMN" : {
     "message" : [ "cannot resolve '%s' given input columns: [%s]" ],
     "sqlState" : "42000"
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala
index 4f50802..204b753 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala
@@ -94,4 +94,21 @@ object ExprUtils {
       }
     }
   }
+
+  /**
+   * Check whether the given schema is valid for the JSON data source.
+   * @param schema the schema to check
+   * @return
+   *  None if the schema is valid
+   *  Some(throwable) carrying the error if the schema is not valid
+   */
+  def checkJsonSchema(schema: DataType): Option[Throwable] =
+    if (schema.existsRecursively {
+      case MapType(keyType, _, _) if keyType != StringType => true
+      case _ => false
+    }) {
+      Some(QueryCompilationErrors.invalidJsonSchema(schema))
+    } else {
+      None
+    }
 }
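
Note: `checkJsonSchema` returns an `Option[Throwable]` instead of throwing, so each call site decides how to surface the error. A minimal sketch of the two consumption patterns used by this commit (taken from the call sites in the hunks below; illustrative, not a complete compilation unit):

```
// Readers (DataFrameReader/DataStreamReader): throw eagerly when the
// user-specified schema contains a non-StringType map key.
userSpecifiedSchema.foreach(ExprUtils.checkJsonSchema(_).foreach(throw _))

// JsonToStructs: convert the error into a type-check failure instead.
ExprUtils.checkJsonSchema(nullableSchema)
  .map(e => TypeCheckResult.TypeCheckFailure(e.getMessage))
  .getOrElse(super.checkInputDataTypes())
```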
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 947cbf7..a807f49 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -561,15 +561,9 @@ case class JsonToStructs(
 
   override def checkInputDataTypes(): TypeCheckResult = nullableSchema match {
     case _: StructType | _: ArrayType | _: MapType =>
-      val invalidMapType = nullableSchema.existsRecursively(dataType => dataType match {
-        case MapType(keyType, _, _) if keyType != StringType => true
-        case _ => false
-      })
-      if (invalidMapType) {
-        TypeCheckResult.TypeCheckFailure(
-          s"Input schema ${nullableSchema.catalogString} can only contain StringType " +
-            "as a key type for a MapType.")
-      } else {
+      ExprUtils.checkJsonSchema(nullableSchema).map { e =>
+        TypeCheckResult.TypeCheckFailure(e.getMessage)
+      } getOrElse {
         super.checkInputDataTypes()
       }
     case _ => TypeCheckResult.TypeCheckFailure(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index b62729a..00648a9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -2366,4 +2366,10 @@ private[spark] object QueryCompilationErrors {
       messageParameters = Array(fieldName.quoted, path.quoted),
       origin = context)
   }
+
+  def invalidJsonSchema(schema: DataType): Throwable = {
+    new AnalysisException(
+      errorClass = "INVALID_JSON_SCHEMA_MAPTYPE",
+      messageParameters = Array(schema.toString))
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index b13a352..8a0e449 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -402,7 +402,10 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * @since 2.0.0
    */
   @scala.annotation.varargs
-  def json(paths: String*): DataFrame = format("json").load(paths : _*)
+  def json(paths: String*): DataFrame = {
+    userSpecifiedSchema.foreach(ExprUtils.checkJsonSchema(_).foreach(throw _))
+    format("json").load(paths : _*)
+  }
 
   /**
    * Loads a `JavaRDD[String]` storing JSON objects (<a href="http://jsonlines.org/">JSON
@@ -449,6 +452,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
       sparkSession.sessionState.conf.sessionLocalTimeZone,
       sparkSession.sessionState.conf.columnNameOfCorruptRecord)
 
+    userSpecifiedSchema.foreach(ExprUtils.checkJsonSchema(_).foreach(throw _))
     val schema = userSpecifiedSchema.map(_.asNullable).getOrElse {
       TextInputJsonDataSource.inferFromDataset(jsonDataset, parsedOptions)
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 4486398..ba825a2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -25,6 +25,7 @@ import org.apache.spark.annotation.Evolving
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.expressions.ExprUtils
 import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils}
 import org.apache.spark.sql.connector.catalog.{SupportsRead, TableProvider}
@@ -230,7 +231,10 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    *
    * @since 2.0.0
    */
-  def json(path: String): DataFrame = format("json").load(path)
+  def json(path: String): DataFrame = {
+    userSpecifiedSchema.foreach(ExprUtils.checkJsonSchema(_).foreach(throw _))
+    format("json").load(path)
+  }
 
   /**
    * Loads a CSV file stream and returns the result as a `DataFrame`.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index f2d0b60..bf161a1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -2993,6 +2993,33 @@ class DataFrameSuite extends QueryTest
       .withSequenceColumn("default_index").collect().map(_.getLong(0))
     assert(ids.toSet === Range(0, 10).toSet)
   }
+
+  test("SPARK-35320: Reading JSON with key type different to String in a map should fail") {
+    Seq(
+      (MapType(IntegerType, StringType), """{"1": "test"}"""),
+      (StructType(Seq(StructField("test", MapType(IntegerType, StringType)))),
+        """"test": {"1": "test"}"""),
+      (ArrayType(MapType(IntegerType, StringType)), """[{"1": "test"}]"""),
+      (MapType(StringType, MapType(IntegerType, StringType)), """{"key": {"1" : "test"}}""")
+    ).foreach { case (schema, jsonData) =>
+      withTempDir { dir =>
+        val colName = "col"
+        val msg = "can only contain StringType as a key type for a MapType"
+
+        val thrown1 = intercept[AnalysisException](
+          spark.read.schema(StructType(Seq(StructField(colName, schema))))
+            .json(Seq(jsonData).toDS()).collect())
+        assert(thrown1.getMessage.contains(msg))
+
+        val jsonDir = new File(dir, "json").getCanonicalPath
+        Seq(jsonData).toDF(colName).write.json(jsonDir)
+        val thrown2 = intercept[AnalysisException](
+          spark.read.schema(StructType(Seq(StructField(colName, schema))))
+            .json(jsonDir).collect())
+        assert(thrown2.getMessage.contains(msg))
+      }
+    }
+  }
 }
 
 case class GroupByKey(a: Int, b: Int)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index a91856a..868946f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -2240,6 +2240,25 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     require(stringToFile(tempFile, content).renameTo(finalFile))
     finalFile
   }
+
+  test("SPARK-35320: Reading JSON with key type different to String in a map should fail") {
+    Seq(
+      MapType(IntegerType, StringType),
+      StructType(Seq(StructField("test", MapType(IntegerType, StringType)))),
+      ArrayType(MapType(IntegerType, StringType)),
+      MapType(StringType, MapType(IntegerType, StringType))
+    ).foreach { schema =>
+      withTempDir { dir =>
+        val colName = "col"
+        val msg = "can only contain StringType as a key type for a MapType"
+
+        val thrown1 = intercept[AnalysisException](
+          spark.readStream.schema(StructType(Seq(StructField(colName, schema))))
+            .json(dir.getCanonicalPath).schema)
+        assert(thrown1.getMessage.contains(msg))
+      }
+    }
+  }
 }
 
 class FileStreamSourceStressTestSuite extends FileStreamSourceTest {
