You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ka...@apache.org on 2022/03/09 02:23:18 UTC

[spark] branch branch-3.2 updated: [SPARK-38412][SS] Fix the swapped sequence of from and to in StateSchemaCompatibilityChecker

This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new f1efc95  [SPARK-38412][SS] Fix the swapped sequence of from and to in StateSchemaCompatibilityChecker
f1efc95 is described below

commit f1efc955940176c9fa84cdf7b2c71563c5df47d2
Author: Jungtaek Lim <ka...@gmail.com>
AuthorDate: Wed Mar 9 11:03:57 2022 +0900

    [SPARK-38412][SS] Fix the swapped sequence of from and to in StateSchemaCompatibilityChecker
    
    ### What changes were proposed in this pull request?
    
    This PR fixes the StateSchemaCompatibilityChecker which mistakenly swapped `from` (should be provided schema) and `to` (should be existing schema).
    
    ### Why are the changes needed?
    
    The bug mistakenly allows a case that should not be allowed, and disallows a case that should be allowed.
    
    It allows a nullable column to be stored into a non-nullable column, which should be prohibited. This is less likely to cause a runtime problem, since the state schema is a conceptual one and a row can be stored even without respecting the state schema.
    
    The opposite case is worse: it disallows a non-nullable column from being stored into a nullable column, which should be allowed. Spark fails the query for this case.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. After the fix, storing a non-nullable column into a nullable column for state will be allowed, as it should have been.
    
    ### How was this patch tested?
    
    Modified UTs.
    
    Closes #35731 from HeartSaVioR/SPARK-38412.
    
    Authored-by: Jungtaek Lim <ka...@gmail.com>
    Signed-off-by: Jungtaek Lim <ka...@gmail.com>
    (cherry picked from commit 43c7824bba40ebfb64dcd50d8d0e84b5a4d3c8c7)
    Signed-off-by: Jungtaek Lim <ka...@gmail.com>
---
 .../state/StateSchemaCompatibilityChecker.scala    |  2 +-
 .../StateSchemaCompatibilityCheckerSuite.scala     | 51 ++++++++++++++++++----
 2 files changed, 44 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
index 20625e1..0c8cabb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
@@ -72,7 +72,7 @@ class StateSchemaCompatibilityChecker(
   }
 
   private def schemasCompatible(storedSchema: StructType, schema: StructType): Boolean =
-    DataType.equalsIgnoreNameAndCompatibleNullability(storedSchema, schema)
+    DataType.equalsIgnoreNameAndCompatibleNullability(schema, storedSchema)
 
   // Visible for testing
   private[sql] def readSchemaFile(): (StructType, StructType) = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala
index a9cc90c..1539341 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala
@@ -63,6 +63,8 @@ class StateSchemaCompatibilityCheckerSuite extends SharedSparkSession {
   private val valueSchema65535Bytes = new StructType()
     .add(StructField("v" * (65535 - 87), IntegerType, nullable = true))
 
+  // Checks on adding/removing (nested) field.
+
   test("adding field to key should fail") {
     val fieldAddedKeySchema = keySchema.add(StructField("newKey", IntegerType))
     verifyException(keySchema, valueSchema, fieldAddedKeySchema, valueSchema)
@@ -107,6 +109,8 @@ class StateSchemaCompatibilityCheckerSuite extends SharedSparkSession {
     verifyException(keySchema, valueSchema, keySchema, newValueSchema)
   }
 
+  // Checks on changing type of (nested) field.
+
   test("changing the type of field in key should fail") {
     val typeChangedKeySchema = StructType(keySchema.map(_.copy(dataType = TimestampType)))
     verifyException(keySchema, valueSchema, typeChangedKeySchema, valueSchema)
@@ -129,28 +133,59 @@ class StateSchemaCompatibilityCheckerSuite extends SharedSparkSession {
     verifyException(keySchema, valueSchema, keySchema, newValueSchema)
   }
 
-  test("changing the nullability of nullable to non-nullable in key should fail") {
+  // Checks on changing nullability of (nested) field.
+  // Note that these tests have different format of the test name compared to others, since it was
+  // misleading to understand the assignment as the opposite way.
+
+  test("storing non-nullable column into nullable column in key should be allowed") {
     val nonNullChangedKeySchema = StructType(keySchema.map(_.copy(nullable = false)))
-    verifyException(keySchema, valueSchema, nonNullChangedKeySchema, valueSchema)
+    verifySuccess(keySchema, valueSchema, nonNullChangedKeySchema, valueSchema)
   }
 
-  test("changing the nullability of nullable to non-nullable in value should fail") {
+  test("storing non-nullable column into nullable column in value schema should be allowed") {
     val nonNullChangedValueSchema = StructType(valueSchema.map(_.copy(nullable = false)))
-    verifyException(keySchema, valueSchema, keySchema, nonNullChangedValueSchema)
+    verifySuccess(keySchema, valueSchema, keySchema, nonNullChangedValueSchema)
   }
 
-  test("changing the nullability of nullable to nonnullable in nested field in key should fail") {
+  test("storing non-nullable into nullable in nested field in key should be allowed") {
     val typeChangedNestedSchema = StructType(structSchema.map(_.copy(nullable = false)))
     val newKeySchema = applyNewSchemaToNestedFieldInKey(typeChangedNestedSchema)
-    verifyException(keySchema, valueSchema, newKeySchema, valueSchema)
+    verifySuccess(keySchema, valueSchema, newKeySchema, valueSchema)
   }
 
-  test("changing the nullability of nullable to nonnullable in nested field in value should fail") {
+  test("storing non-nullable into nullable in nested field in value should be allowed") {
     val typeChangedNestedSchema = StructType(structSchema.map(_.copy(nullable = false)))
     val newValueSchema = applyNewSchemaToNestedFieldInValue(typeChangedNestedSchema)
-    verifyException(keySchema, valueSchema, keySchema, newValueSchema)
+    verifySuccess(keySchema, valueSchema, keySchema, newValueSchema)
+  }
+
+  test("storing nullable column into non-nullable column in key should fail") {
+    val nonNullChangedKeySchema = StructType(keySchema.map(_.copy(nullable = false)))
+    verifyException(nonNullChangedKeySchema, valueSchema, keySchema, valueSchema)
+  }
+
+  test("storing nullable column into non-nullable column in value schema should fail") {
+    val nonNullChangedValueSchema = StructType(valueSchema.map(_.copy(nullable = false)))
+    verifyException(keySchema, nonNullChangedValueSchema, keySchema, valueSchema)
+  }
+
+  test("storing nullable column into non-nullable column in nested field in key should fail") {
+    val typeChangedNestedSchema = StructType(structSchema.map(_.copy(nullable = false)))
+    val newKeySchema = applyNewSchemaToNestedFieldInKey(typeChangedNestedSchema)
+    verifyException(newKeySchema, valueSchema, keySchema, valueSchema)
   }
 
+  test("storing nullable column into non-nullable column in nested field in value should fail") {
+    val typeChangedNestedSchema = StructType(structSchema.map(_.copy(nullable = false)))
+    val newValueSchema = applyNewSchemaToNestedFieldInValue(typeChangedNestedSchema)
+    verifyException(keySchema, newValueSchema, keySchema, valueSchema)
+  }
+
+  // Checks on changing name of (nested) field.
+  // Changing the name is allowed since it may be possible Spark can make relevant changes from
+  // operators/functions by chance. This opens a risk that end users swap two fields having same
+  // data type, but there is no way to address both.
+
   test("changing the name of field in key should be allowed") {
     val newName: StructField => StructField = f => f.copy(name = f.name + "_new")
     val fieldNameChangedKeySchema = StructType(keySchema.map(newName))

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org