Posted to commits@spark.apache.org by gu...@apache.org on 2021/08/15 01:33:12 UTC
[spark] branch master updated: [SPARK-35320][SQL] Align error message for unsupported key types in MapType in Json reader
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a9ab41a [SPARK-35320][SQL] Align error message for unsupported key types in MapType in Json reader
a9ab41a is described below
commit a9ab41ad56c1b118ec28e4f9535f734980851179
Author: Pablo Langa <so...@gmail.com>
AuthorDate: Sun Aug 15 10:31:57 2021 +0900
[SPARK-35320][SQL] Align error message for unsupported key types in MapType in Json reader
### What changes were proposed in this pull request?
This PR is related to https://github.com/apache/spark/pull/33525.
The purpose is to align the error messages between the `from_json` function and the Json reader for unsupported key types in MapType; a `from_json` comparison sketch follows the examples below.
Current behavior:
```
scala> spark.read.schema(StructType(Seq(StructField("col", MapType(IntegerType, StringType))))).json(Seq("""{"1": "test"}""").toDS()).show
+----+
| col|
+----+
|null|
+----+
```
```
scala> Seq("""{"1": "test"}""").toDF("col").write.json("/tmp/jsontests1234")
scala> spark.read.schema(StructType(Seq(StructField("col", MapType(IntegerType, StringType))))).json("/tmp/jsontests1234").show
+----+
| col|
+----+
|null|
+----+
```
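For comparison, a rough sketch of how `from_json` already fails for such a schema after https://github.com/apache/spark/pull/33525 (the unresolved-expression part of the error is abbreviated here):
```
scala> import org.apache.spark.sql.functions.from_json
scala> Seq("""{"1": "test"}""").toDF("col").select(from_json($"col", MapType(IntegerType, StringType))).show
org.apache.spark.sql.AnalysisException: cannot resolve ... due to data type mismatch: Input schema map<int,string> can only contain StringType as a key type for a MapType.
```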
With this change, an AnalysisException with the message `"Input schema $schema can only contain StringType as a key type for a MapType."` will be thrown
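A minimal sketch of the new behavior for the first example above (the rendering of the schema inside the message is elided here):
```
scala> spark.read.schema(StructType(Seq(StructField("col", MapType(IntegerType, StringType))))).json(Seq("""{"1": "test"}""").toDS()).show
org.apache.spark.sql.AnalysisException: Input schema ... can only contain StringType as a key type for a MapType.
```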
### Why are the changes needed?
It is more consistent to align the behavior of the Json reader with that of `from_json`.
### Does this PR introduce _any_ user-facing change?
Yes, an AnalysisException is now thrown where the reader previously returned null.
### How was this patch tested?
Unit testing, manual testing
Closes #33672 from planga82/feature/spark35320_improve_error_message_reader.
Authored-by: Pablo Langa <so...@gmail.com>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
core/src/main/resources/error/error-classes.json | 3 +++
.../spark/sql/catalyst/expressions/ExprUtils.scala | 17 ++++++++++++++
.../sql/catalyst/expressions/jsonExpressions.scala | 12 +++-------
.../spark/sql/errors/QueryCompilationErrors.scala | 6 +++++
.../org/apache/spark/sql/DataFrameReader.scala | 6 ++++-
.../spark/sql/streaming/DataStreamReader.scala | 6 ++++-
.../org/apache/spark/sql/DataFrameSuite.scala | 27 ++++++++++++++++++++++
.../sql/streaming/FileStreamSourceSuite.scala | 19 +++++++++++++++
8 files changed, 85 insertions(+), 11 deletions(-)
diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index 9ac5f06..d6895a1 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -33,6 +33,9 @@
"message" : [ "Field name %s is invalid: %s is not a struct." ],
"sqlState" : "42000"
},
+ "INVALID_JSON_SCHEMA_MAPTYPE" : {
+ "message" : [ "Input schema %s can only contain StringType as a key type for a MapType." ]
+ },
"MISSING_COLUMN" : {
"message" : [ "cannot resolve '%s' given input columns: [%s]" ],
"sqlState" : "42000"
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala
index 4f50802..204b753 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala
@@ -94,4 +94,21 @@ object ExprUtils {
}
}
}
+
+ /**
+ * Check if the schema is valid for Json
+ * @param schema
+ * @return
+ * None if the schema is valid
+ * Some(msg) with the error message if the schema is not valid
+ */
+ def checkJsonSchema(schema: DataType): Option[Throwable] =
+ if (schema.existsRecursively {
+ case MapType(keyType, _, _) if keyType != StringType => true
+ case _ => false
+ }) {
+ Some(QueryCompilationErrors.invalidJsonSchema(schema))
+ } else {
+ None
+ }
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 947cbf7..a807f49 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -561,15 +561,9 @@ case class JsonToStructs(
override def checkInputDataTypes(): TypeCheckResult = nullableSchema match {
case _: StructType | _: ArrayType | _: MapType =>
- val invalidMapType = nullableSchema.existsRecursively(dataType => dataType match {
- case MapType(keyType, _, _) if keyType != StringType => true
- case _ => false
- })
- if (invalidMapType) {
- TypeCheckResult.TypeCheckFailure(
- s"Input schema ${nullableSchema.catalogString} can only contain StringType " +
- "as a key type for a MapType.")
- } else {
+ ExprUtils.checkJsonSchema(nullableSchema).map { e =>
+ TypeCheckResult.TypeCheckFailure(e.getMessage)
+ } getOrElse {
super.checkInputDataTypes()
}
case _ => TypeCheckResult.TypeCheckFailure(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index b62729a..00648a9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -2366,4 +2366,10 @@ private[spark] object QueryCompilationErrors {
messageParameters = Array(fieldName.quoted, path.quoted),
origin = context)
}
+
+ def invalidJsonSchema(schema: DataType): Throwable = {
+ new AnalysisException(
+ errorClass = "INVALID_JSON_SCHEMA_MAPTYPE",
+ messageParameters = Array(schema.toString))
+ }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index b13a352..8a0e449 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -402,7 +402,10 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* @since 2.0.0
*/
@scala.annotation.varargs
- def json(paths: String*): DataFrame = format("json").load(paths : _*)
+ def json(paths: String*): DataFrame = {
+ userSpecifiedSchema.foreach(ExprUtils.checkJsonSchema(_).foreach(throw _))
+ format("json").load(paths : _*)
+ }
/**
* Loads a `JavaRDD[String]` storing JSON objects (<a href="http://jsonlines.org/">JSON
@@ -449,6 +452,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
sparkSession.sessionState.conf.sessionLocalTimeZone,
sparkSession.sessionState.conf.columnNameOfCorruptRecord)
+ userSpecifiedSchema.foreach(ExprUtils.checkJsonSchema(_).foreach(throw _))
val schema = userSpecifiedSchema.map(_.asNullable).getOrElse {
TextInputJsonDataSource.inferFromDataset(jsonDataset, parsedOptions)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 4486398..ba825a2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -25,6 +25,7 @@ import org.apache.spark.annotation.Evolving
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.expressions.ExprUtils
import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils}
import org.apache.spark.sql.connector.catalog.{SupportsRead, TableProvider}
@@ -230,7 +231,10 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
*
* @since 2.0.0
*/
- def json(path: String): DataFrame = format("json").load(path)
+ def json(path: String): DataFrame = {
+ userSpecifiedSchema.foreach(ExprUtils.checkJsonSchema(_).foreach(throw _))
+ format("json").load(path)
+ }
/**
* Loads a CSV file stream and returns the result as a `DataFrame`.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index f2d0b60..bf161a1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -2993,6 +2993,33 @@ class DataFrameSuite extends QueryTest
.withSequenceColumn("default_index").collect().map(_.getLong(0))
assert(ids.toSet === Range(0, 10).toSet)
}
+
+ test("SPARK-35320: Reading JSON with key type different to String in a map should fail") {
+ Seq(
+ (MapType(IntegerType, StringType), """{"1": "test"}"""),
+ (StructType(Seq(StructField("test", MapType(IntegerType, StringType)))),
+ """"test": {"1": "test"}"""),
+ (ArrayType(MapType(IntegerType, StringType)), """[{"1": "test"}]"""),
+ (MapType(StringType, MapType(IntegerType, StringType)), """{"key": {"1" : "test"}}""")
+ ).foreach { case (schema, jsonData) =>
+ withTempDir { dir =>
+ val colName = "col"
+ val msg = "can only contain StringType as a key type for a MapType"
+
+ val thrown1 = intercept[AnalysisException](
+ spark.read.schema(StructType(Seq(StructField(colName, schema))))
+ .json(Seq(jsonData).toDS()).collect())
+ assert(thrown1.getMessage.contains(msg))
+
+ val jsonDir = new File(dir, "json").getCanonicalPath
+ Seq(jsonData).toDF(colName).write.json(jsonDir)
+ val thrown2 = intercept[AnalysisException](
+ spark.read.schema(StructType(Seq(StructField(colName, schema))))
+ .json(jsonDir).collect())
+ assert(thrown2.getMessage.contains(msg))
+ }
+ }
+ }
}
case class GroupByKey(a: Int, b: Int)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index a91856a..868946f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -2240,6 +2240,25 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
require(stringToFile(tempFile, content).renameTo(finalFile))
finalFile
}
+
+ test("SPARK-35320: Reading JSON with key type different to String in a map should fail") {
+ Seq(
+ MapType(IntegerType, StringType),
+ StructType(Seq(StructField("test", MapType(IntegerType, StringType)))),
+ ArrayType(MapType(IntegerType, StringType)),
+ MapType(StringType, MapType(IntegerType, StringType))
+ ).foreach { schema =>
+ withTempDir { dir =>
+ val colName = "col"
+ val msg = "can only contain StringType as a key type for a MapType"
+
+ val thrown1 = intercept[AnalysisException](
+ spark.readStream.schema(StructType(Seq(StructField(colName, schema))))
+ .json(dir.getCanonicalPath).schema)
+ assert(thrown1.getMessage.contains(msg))
+ }
+ }
+ }
}
class FileStreamSourceStressTestSuite extends FileStreamSourceTest {