You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2021/01/04 21:00:59 UTC

[spark] branch branch-3.1 updated: [SPARK-33980][SS] Invalidate char/varchar in spark.readStream.schema

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 390316e  [SPARK-33980][SS] Invalidate char/varchar in spark.readStream.schema
390316e is described below

commit 390316ef919e25cec710c5229245818efee88a71
Author: Kent Yao <ya...@apache.org>
AuthorDate: Mon Jan 4 12:59:45 2021 -0800

    [SPARK-33980][SS] Invalidate char/varchar in spark.readStream.schema
    
    ### What changes were proposed in this pull request?
    
    Invalidate char/varchar in `spark.readStream.schema`, just as was done for `spark.read.schema` in da72b87374a7be5416b99ed016dc2fc9da0ed88a.
    
    ### Why are the changes needed?
    
    Bug fix: when `spark.sql.legacy.charVarcharAsString=false`, char/varchar types are only allowed in table schemas.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Defining a Structured Streaming reader with a char/varchar schema will now fail when `spark.sql.legacy.charVarcharAsString=false`.
    
    ### How was this patch tested?
    
    new tests
    
    Closes #31003 from yaooqinn/SPARK-33980.
    
    Authored-by: Kent Yao <ya...@apache.org>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
    (cherry picked from commit ac4651a7d19b248c86290d419ac3f6d69ed2b61e)
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../org/apache/spark/sql/streaming/DataStreamReader.scala |  7 +++++--
 .../scala/org/apache/spark/sql/CharVarcharTestSuite.scala | 15 +++++++++++++++
 2 files changed, 20 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index eb7bb5c..d82fa9e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -64,7 +64,8 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    * @since 2.0.0
    */
   def schema(schema: StructType): DataStreamReader = {
-    this.userSpecifiedSchema = Option(CharVarcharUtils.replaceCharVarcharWithStringInSchema(schema))
+    val replaced = CharVarcharUtils.failIfHasCharVarchar(schema).asInstanceOf[StructType]
+    this.userSpecifiedSchema = Option(replaced)
     this
   }
 
@@ -76,7 +77,9 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    * @since 2.3.0
    */
   def schema(schemaString: String): DataStreamReader = {
-    this.userSpecifiedSchema = Option(StructType.fromDDL(schemaString))
+    val rawSchema = StructType.fromDDL(schemaString)
+    val schema = CharVarcharUtils.failIfHasCharVarchar(rawSchema).asInstanceOf[StructType]
+    this.userSpecifiedSchema = Option(schema)
     this
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
index 9d4b7c4..62d0f51 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
@@ -549,6 +549,21 @@ class BasicCharVarcharTestSuite extends QueryTest with SharedSparkSession {
       assert(df2.schema.head.dataType === StringType)
     }
   }
+
+  test("invalidate char/varchar in spark.readStream.schema") {
+    failWithInvalidCharUsage(spark.readStream.schema(new StructType().add("id", CharType(5))))
+    failWithInvalidCharUsage(spark.readStream.schema("id char(5)"))
+    withSQLConf((SQLConf.LEGACY_CHAR_VARCHAR_AS_STRING.key, "true")) {
+      withTempPath { dir =>
+        spark.range(2).write.save(dir.toString)
+        val df1 = spark.readStream.schema(new StructType().add("id", CharType(5)))
+          .load(dir.toString)
+        assert(df1.schema.map(_.dataType) == Seq(StringType))
+        val df2 = spark.readStream.schema("id char(5)").load(dir.toString)
+        assert(df2.schema.map(_.dataType) == Seq(StringType))
+      }
+    }
+  }
 }
 
 class FileSourceCharVarcharTestSuite extends CharVarcharTestSuite with SharedSparkSession {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org