You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2021/01/04 21:00:59 UTC
[spark] branch branch-3.1 updated: [SPARK-33980][SS] Invalidate
char/varchar in spark.readStream.schema
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 390316e [SPARK-33980][SS] Invalidate char/varchar in spark.readStream.schema
390316e is described below
commit 390316ef919e25cec710c5229245818efee88a71
Author: Kent Yao <ya...@apache.org>
AuthorDate: Mon Jan 4 12:59:45 2021 -0800
[SPARK-33980][SS] Invalidate char/varchar in spark.readStream.schema
### What changes were proposed in this pull request?
Invalidate char/varchar in `spark.readStream.schema`, just as was previously done for `spark.read.schema` in da72b87374a7be5416b99ed016dc2fc9da0ed88a.
### Why are the changes needed?
Bug fix: when `spark.sql.legacy.charVarcharAsString=false`, char/varchar types are only valid in table schemas.
### Does this PR introduce _any_ user-facing change?
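Yes. When `spark.sql.legacy.charVarcharAsString=false`, specifying char/varchar in a Structured Streaming reader schema (`spark.readStream.schema`) now fails. When the flag is `true`, such columns are read as string type instead.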
### How was this patch tested?
Added a new test to `CharVarcharTestSuite`.
Closes #31003 from yaooqinn/SPARK-33980.
Authored-by: Kent Yao <ya...@apache.org>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
(cherry picked from commit ac4651a7d19b248c86290d419ac3f6d69ed2b61e)
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
.../org/apache/spark/sql/streaming/DataStreamReader.scala | 7 +++++--
.../scala/org/apache/spark/sql/CharVarcharTestSuite.scala | 15 +++++++++++++++
2 files changed, 20 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index eb7bb5c..d82fa9e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -64,7 +64,8 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
* @since 2.0.0
*/
def schema(schema: StructType): DataStreamReader = {
- this.userSpecifiedSchema = Option(CharVarcharUtils.replaceCharVarcharWithStringInSchema(schema))
+ val replaced = CharVarcharUtils.failIfHasCharVarchar(schema).asInstanceOf[StructType]
+ this.userSpecifiedSchema = Option(replaced)
this
}
@@ -76,7 +77,9 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
* @since 2.3.0
*/
def schema(schemaString: String): DataStreamReader = {
- this.userSpecifiedSchema = Option(StructType.fromDDL(schemaString))
+ val rawSchema = StructType.fromDDL(schemaString)
+ val schema = CharVarcharUtils.failIfHasCharVarchar(rawSchema).asInstanceOf[StructType]
+ this.userSpecifiedSchema = Option(schema)
this
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
index 9d4b7c4..62d0f51 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
@@ -549,6 +549,21 @@ class BasicCharVarcharTestSuite extends QueryTest with SharedSparkSession {
assert(df2.schema.head.dataType === StringType)
}
}
+
+ test("invalidate char/varchar in spark.readStream.schema") {
+ failWithInvalidCharUsage(spark.readStream.schema(new StructType().add("id", CharType(5))))
+ failWithInvalidCharUsage(spark.readStream.schema("id char(5)"))
+ withSQLConf((SQLConf.LEGACY_CHAR_VARCHAR_AS_STRING.key, "true")) {
+ withTempPath { dir =>
+ spark.range(2).write.save(dir.toString)
+ val df1 = spark.readStream.schema(new StructType().add("id", CharType(5)))
+ .load(dir.toString)
+ assert(df1.schema.map(_.dataType) == Seq(StringType))
+ val df2 = spark.readStream.schema("id char(5)").load(dir.toString)
+ assert(df2.schema.map(_.dataType) == Seq(StringType))
+ }
+ }
+ }
}
class FileSourceCharVarcharTestSuite extends CharVarcharTestSuite with SharedSparkSession {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org