Posted to commits@spark.apache.org by ka...@apache.org on 2022/07/02 14:23:19 UTC

[spark] branch branch-3.2 updated: [SPARK-39650][SS] Fix incorrect value schema in streaming deduplication with backward compatibility

This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 9adfc3a21fd [SPARK-39650][SS] Fix incorrect value schema in streaming deduplication with backward compatibility
9adfc3a21fd is described below

commit 9adfc3a21fd566f0eb37537e4137dd448af3aee1
Author: Jungtaek Lim <ka...@gmail.com>
AuthorDate: Sat Jul 2 22:46:03 2022 +0900

    [SPARK-39650][SS] Fix incorrect value schema in streaming deduplication with backward compatibility
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to fix the incorrect value schema in streaming deduplication. The operator stores an empty row having a single column with null (using NullType), but the value schema is specified as all input columns, which leads to incorrect behavior from the state store schema compatibility checker.
    
    This PR proposes to set the schema of value as `StructType(Array(StructField("__dummy__", NullType)))` to fit with the empty row. With this change, the streaming queries creating the checkpoint after this fix would work smoothly.
    
    To not break existing streaming queries whose checkpoints have the incorrect value schema, this PR proposes to disable the value schema check on streaming deduplication. The option to disable the value check already existed for format validation (we have two different checkers for the state store), but it was missing for the state store schema compatibility check. To avoid adding another config, this PR reuses the existing config that format validation uses.
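    
    A rough sketch of the resulting decision, not the actual Spark code: schemas are modeled as plain (field name, data type name) pairs, and `compatible` is a simplified stand-in for the checker's real comparison, assumed here to ignore field names and compare only field count and data types. The names `ValueSchemaCheckSketch`, `Schema`, `compatible` and `rejects` are hypothetical.
    
    ```scala
    object ValueSchemaCheckSketch {
      // Hypothetical stand-in for a schema: (field name, data type name) pairs.
      type Schema = Seq[(String, String)]
    
      // Simplified assumption: field names may differ, but the field count
      // and the data type of each field must match.
      def compatible(stored: Schema, provided: Schema): Boolean =
        stored.map(_._2) == provided.map(_._2)
    
      // Returns true when the check would reject the provided schemas.
      def rejects(
          storedKey: Schema,
          storedValue: Schema,
          key: Schema,
          value: Schema,
          ignoreValueSchema: Boolean): Boolean =
        !compatible(storedKey, key) ||
          (!ignoreValueSchema && !compatible(storedValue, value))
    
      def main(args: Array[String]): Unit = {
        val key: Schema = Seq("a" -> "string")
        val oldValue: Schema = Seq("a" -> "string", "b" -> "int")
        val newValue: Schema = oldValue :+ ("d" -> "string")
    
        // With the value check on, a changed value schema is rejected.
        assert(rejects(key, oldValue, key, newValue, ignoreValueSchema = false))
        // With the value check skipped, only the key schema matters.
        assert(!rejects(key, oldValue, key, newValue, ignoreValueSchema = true))
        // A changed key schema is rejected either way.
        assert(rejects(key, oldValue, Seq("a" -> "int"), oldValue, ignoreValueSchema = true))
      }
    }
    ```
    
    The key schema is still checked when the value check is skipped, so a real change to the deduplication keys is still reported.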
    
    ### Why are the changes needed?
    
    This is a bug fix. Suppose the streaming query below:
    
    ```
    // df has the columns `a`, `b`, `c`
    val df = spark.readStream.format("...").load()
    val query = df.dropDuplicates("a").writeStream.format("...").start()
    ```
    
    While the query is running, df can produce a different set of columns (e.g. `a`, `b`, `c`, `d`) from the same source due to schema evolution. Since we only deduplicate the rows by column `a`, the schema change should not matter for streaming deduplication, but before this fix the state store schema checker throws an error saying "value schema is not compatible".
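    
    The mismatch can be illustrated with schemas alone, without starting a query. This is a minimal sketch that needs only the Spark SQL types on the classpath. It assumes all columns are strings, and the object name `DedupValueSchemaSketch` is hypothetical.
    
    ```scala
    import org.apache.spark.sql.types.{NullType, StringType, StructField, StructType}
    
    object DedupValueSchemaSketch {
      def main(args: Array[String]): Unit = {
        // Hypothetical input schemas before and after the source adds column `d`.
        val inputBefore = StructType(Seq("a", "b", "c").map(n => StructField(n, StringType)))
        val inputAfter = inputBefore.add("d", StringType)
    
        // The key schema only covers column `a`, so the new column does not affect it.
        val keyBefore = StructType(inputBefore.filter(_.name == "a"))
        val keyAfter = StructType(inputAfter.filter(_.name == "a"))
        assert(keyBefore == keyAfter)
    
        // Before the fix, the full input schema was registered as the value schema,
        // so the added column makes the stored and provided value schemas differ.
        assert(inputBefore != inputAfter)
    
        // After the fix, the value schema is this fixed single-column schema,
        // which does not depend on the input columns.
        val schemaForEmptyRow = StructType(Array(StructField("__dummy__", NullType)))
        assert(schemaForEmptyRow.length == 1)
        assert(schemaForEmptyRow.head.dataType == NullType)
      }
    }
    ```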
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, this is basically a bug fix which end users wouldn't notice unless they encountered a bug.
    
    ### How was this patch tested?
    
    New tests.
    
    Closes #37041 from HeartSaVioR/SPARK-39650.
    
    Authored-by: Jungtaek Lim <ka...@gmail.com>
    Signed-off-by: Jungtaek Lim <ka...@gmail.com>
    (cherry picked from commit fe536033bdd00d921b3c86af329246ca55a4f46a)
    Signed-off-by: Jungtaek Lim <ka...@gmail.com>
---
 .../state/StateSchemaCompatibilityChecker.scala    |  26 ++++++--
 .../sql/execution/streaming/state/StateStore.scala |   7 ++-
 .../execution/streaming/state/StateStoreConf.scala |   7 ++-
 .../execution/streaming/statefulOperators.scala    |   4 +-
 .../commits/.0.crc                                 | Bin 0 -> 12 bytes
 .../commits/.1.crc                                 | Bin 0 -> 12 bytes
 .../commits/0                                      |   2 +
 .../commits/1                                      |   2 +
 .../metadata                                       |   1 +
 .../offsets/.0.crc                                 | Bin 0 -> 16 bytes
 .../offsets/.1.crc                                 | Bin 0 -> 16 bytes
 .../offsets/0                                      |   3 +
 .../offsets/1                                      |   3 +
 .../state/0/0/.1.delta.crc                         | Bin 0 -> 12 bytes
 .../state/0/0/.2.delta.crc                         | Bin 0 -> 12 bytes
 .../state/0/0/1.delta                              | Bin 0 -> 77 bytes
 .../state/0/0/2.delta                              | Bin 0 -> 46 bytes
 .../state/0/0/_metadata/.schema.crc                | Bin 0 -> 12 bytes
 .../state/0/0/_metadata/schema                     | Bin 0 -> 254 bytes
 .../state/0/1/.1.delta.crc                         | Bin 0 -> 12 bytes
 .../state/0/1/.2.delta.crc                         | Bin 0 -> 12 bytes
 .../state/0/1/1.delta                              | Bin 0 -> 46 bytes
 .../state/0/1/2.delta                              | Bin 0 -> 77 bytes
 .../state/0/2/.1.delta.crc                         | Bin 0 -> 12 bytes
 .../state/0/2/.2.delta.crc                         | Bin 0 -> 12 bytes
 .../state/0/2/1.delta                              | Bin 0 -> 46 bytes
 .../state/0/2/2.delta                              | Bin 0 -> 46 bytes
 .../state/0/3/.1.delta.crc                         | Bin 0 -> 12 bytes
 .../state/0/3/.2.delta.crc                         | Bin 0 -> 12 bytes
 .../state/0/3/1.delta                              | Bin 0 -> 46 bytes
 .../state/0/3/2.delta                              | Bin 0 -> 46 bytes
 .../state/0/4/.1.delta.crc                         | Bin 0 -> 12 bytes
 .../state/0/4/.2.delta.crc                         | Bin 0 -> 12 bytes
 .../state/0/4/1.delta                              | Bin 0 -> 46 bytes
 .../state/0/4/2.delta                              | Bin 0 -> 46 bytes
 .../StateSchemaCompatibilityCheckerSuite.scala     |  49 +++++++++++----
 .../streaming/StreamingDeduplicationSuite.scala    |  70 +++++++++++++++++++++
 37 files changed, 152 insertions(+), 22 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
index 0c8cabb75ed..80384f8cb3b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
@@ -41,20 +41,34 @@ class StateSchemaCompatibilityChecker(
   fm.mkdirs(schemaFileLocation.getParent)
 
   def check(keySchema: StructType, valueSchema: StructType): Unit = {
+    check(keySchema, valueSchema, ignoreValueSchema = false)
+  }
+
+  def check(keySchema: StructType, valueSchema: StructType, ignoreValueSchema: Boolean): Unit = {
     if (fm.exists(schemaFileLocation)) {
       logDebug(s"Schema file for provider $providerId exists. Comparing with provided schema.")
       val (storedKeySchema, storedValueSchema) = readSchemaFile()
-      if (storedKeySchema.equals(keySchema) && storedValueSchema.equals(valueSchema)) {
+      if (storedKeySchema.equals(keySchema) &&
+        (ignoreValueSchema || storedValueSchema.equals(valueSchema))) {
         // schema is exactly same
       } else if (!schemasCompatible(storedKeySchema, keySchema) ||
-        !schemasCompatible(storedValueSchema, valueSchema)) {
+        (!ignoreValueSchema && !schemasCompatible(storedValueSchema, valueSchema))) {
+        val errorMsgForKeySchema = s"- Provided key schema: $keySchema\n" +
+          s"- Existing key schema: $storedKeySchema\n"
+
+        // If it is requested to skip checking the value schema, we also don't expose the value
+        // schema information to the error message.
+        val errorMsgForValueSchema = if (!ignoreValueSchema) {
+          s"- Provided value schema: $valueSchema\n" +
+            s"- Existing value schema: $storedValueSchema\n"
+        } else {
+          ""
+        }
         val errorMsg = "Provided schema doesn't match to the schema for existing state! " +
           "Please note that Spark allow difference of field name: check count of fields " +
           "and data type of each field.\n" +
-          s"- Provided key schema: $keySchema\n" +
-          s"- Provided value schema: $valueSchema\n" +
-          s"- Existing key schema: $storedKeySchema\n" +
-          s"- Existing value schema: $storedValueSchema\n" +
+          errorMsgForKeySchema +
+          errorMsgForValueSchema +
           s"If you want to force running query without schema validation, please set " +
           s"${SQLConf.STATE_SCHEMA_CHECK_ENABLED.key} to false.\n" +
           "Please note running query with incompatible schema could cause indeterministic" +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 5020638abc4..5d65c8e9f20 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -511,7 +511,12 @@ object StateStore extends Logging {
           val checker = new StateSchemaCompatibilityChecker(storeProviderId, hadoopConf)
           // regardless of configuration, we check compatibility to at least write schema file
           // if necessary
-          val ret = Try(checker.check(keySchema, valueSchema)).toEither.fold(Some(_), _ => None)
+          // if the format validation for value schema is disabled, we also disable the schema
+          // compatibility checker for value schema as well.
+          val ret = Try(
+            checker.check(keySchema, valueSchema,
+              ignoreValueSchema = !storeConf.formatValidationCheckValue)
+          ).toEither.fold(Some(_), _ => None)
           if (storeConf.stateSchemaCheckEnabled) {
             ret
           } else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
index 529db2609cd..66bb37d7a57 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
@@ -48,7 +48,12 @@ class StateStoreConf(
   /** Whether validate the underlying format or not. */
   val formatValidationEnabled: Boolean = sqlConf.stateStoreFormatValidationEnabled
 
-  /** Whether validate the value format when the format invalidation enabled. */
+  /**
+   * Whether to validate the value side. This config is applied to both validators as below:
+   *
+   * - whether to validate the value format when the format validation is enabled.
+   * - whether to validate the value schema when the state schema check is enabled.
+   */
   val formatValidationCheckValue: Boolean =
     extraOptions.getOrElse(StateStoreConf.FORMAT_VALIDATION_CHECK_VALUE_CONFIG, "true") == "true"
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index bcfdeb4f85c..11a9776fa9b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -750,13 +750,15 @@ case class StreamingDeduplicateExec(
       keyExpressions, getStateInfo, conf) :: Nil
   }
 
+  private val schemaForEmptyRow: StructType = StructType(Array(StructField("__dummy__", NullType)))
+
   override protected def doExecute(): RDD[InternalRow] = {
     metrics // force lazy init at driver
 
     child.execute().mapPartitionsWithStateStore(
       getStateInfo,
       keyExpressions.toStructType,
-      child.output.toStructType,
+      schemaForEmptyRow,
       numColsPrefixKey = 0,
       session.sessionState,
       Some(session.streams.stateStoreCoordinator),
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/commits/.0.crc b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/commits/.0.crc
new file mode 100644
index 00000000000..1aee7033161
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/commits/.0.crc differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/commits/.1.crc b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/commits/.1.crc
new file mode 100644
index 00000000000..1aee7033161
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/commits/.1.crc differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/commits/0 b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/commits/0
new file mode 100644
index 00000000000..9c1e3021c3e
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/commits/0
@@ -0,0 +1,2 @@
+v1
+{"nextBatchWatermarkMs":0}
\ No newline at end of file
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/commits/1 b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/commits/1
new file mode 100644
index 00000000000..9c1e3021c3e
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/commits/1
@@ -0,0 +1,2 @@
+v1
+{"nextBatchWatermarkMs":0}
\ No newline at end of file
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/metadata b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/metadata
new file mode 100644
index 00000000000..78bd74a789f
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/metadata
@@ -0,0 +1 @@
+{"id":"33e8de33-00b8-4b60-8246-df2f433257ff"}
\ No newline at end of file
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/offsets/.0.crc b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/offsets/.0.crc
new file mode 100644
index 00000000000..726c678bc6a
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/offsets/.0.crc differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/offsets/.1.crc b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/offsets/.1.crc
new file mode 100644
index 00000000000..790f681f1aa
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/offsets/.1.crc differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/offsets/0 b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/offsets/0
new file mode 100644
index 00000000000..443c6824358
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/offsets/0
@@ -0,0 +1,3 @@
+v1
+{"batchWatermarkMs":0,"batchTimestampMs":1656644489789,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.statefulOperator.useStrictDistribution":"true","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","s [...]
+0
\ No newline at end of file
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/offsets/1 b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/offsets/1
new file mode 100644
index 00000000000..67b42175563
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/offsets/1
@@ -0,0 +1,3 @@
+v1
+{"batchWatermarkMs":0,"batchTimestampMs":1656644492462,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.statefulOperator.useStrictDistribution":"true","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","s [...]
+1
\ No newline at end of file
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/.1.delta.crc b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/.1.delta.crc
new file mode 100644
index 00000000000..1992982c58f
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/.1.delta.crc differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/.2.delta.crc b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/.2.delta.crc
new file mode 100644
index 00000000000..cf1d68e2ace
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/.2.delta.crc differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/1.delta
new file mode 100644
index 00000000000..fec40e83a54
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/1.delta differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/2.delta
new file mode 100644
index 00000000000..63529780518
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/2.delta differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/_metadata/.schema.crc b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/_metadata/.schema.crc
new file mode 100644
index 00000000000..022717c6b50
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/_metadata/.schema.crc differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/_metadata/schema b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/_metadata/schema
new file mode 100644
index 00000000000..f132f9601b7
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/_metadata/schema differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/1/.1.delta.crc b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/1/.1.delta.crc
new file mode 100644
index 00000000000..cf1d68e2ace
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/1/.1.delta.crc differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/1/.2.delta.crc b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/1/.2.delta.crc
new file mode 100644
index 00000000000..d18b77b93af
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/1/.2.delta.crc differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/1/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/1/1.delta
new file mode 100644
index 00000000000..63529780518
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/1/1.delta differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/1/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/1/2.delta
new file mode 100644
index 00000000000..fcbf8df80f5
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/1/2.delta differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/2/.1.delta.crc b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/2/.1.delta.crc
new file mode 100644
index 00000000000..cf1d68e2ace
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/2/.1.delta.crc differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/2/.2.delta.crc b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/2/.2.delta.crc
new file mode 100644
index 00000000000..cf1d68e2ace
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/2/.2.delta.crc differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/2/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/2/1.delta
new file mode 100644
index 00000000000..63529780518
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/2/1.delta differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/2/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/2/2.delta
new file mode 100644
index 00000000000..63529780518
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/2/2.delta differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/3/.1.delta.crc b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/3/.1.delta.crc
new file mode 100644
index 00000000000..cf1d68e2ace
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/3/.1.delta.crc differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/3/.2.delta.crc b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/3/.2.delta.crc
new file mode 100644
index 00000000000..cf1d68e2ace
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/3/.2.delta.crc differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/3/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/3/1.delta
new file mode 100644
index 00000000000..63529780518
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/3/1.delta differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/3/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/3/2.delta
new file mode 100644
index 00000000000..63529780518
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/3/2.delta differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/4/.1.delta.crc b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/4/.1.delta.crc
new file mode 100644
index 00000000000..cf1d68e2ace
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/4/.1.delta.crc differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/4/.2.delta.crc b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/4/.2.delta.crc
new file mode 100644
index 00000000000..cf1d68e2ace
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/4/.2.delta.crc differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/4/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/4/1.delta
new file mode 100644
index 00000000000..63529780518
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/4/1.delta differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/4/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/4/2.delta
new file mode 100644
index 00000000000..63529780518
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/4/2.delta differ
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala
index 15393413593..7ba18a81404 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala
@@ -231,6 +231,16 @@ class StateSchemaCompatibilityCheckerSuite extends SharedSparkSession {
     assert((resultKeySchema, resultValueSchema) === (keySchema, valueSchema))
   }
 
+  test("SPARK-39650: ignore value schema on compatibility check") {
+    val typeChangedValueSchema = StructType(valueSchema.map(_.copy(dataType = TimestampType)))
+    verifySuccess(keySchema, valueSchema, keySchema, typeChangedValueSchema,
+      ignoreValueSchema = true)
+
+    val typeChangedKeySchema = StructType(keySchema.map(_.copy(dataType = TimestampType)))
+    verifyException(keySchema, valueSchema, typeChangedKeySchema, valueSchema,
+      ignoreValueSchema = true)
+  }
+
   private def applyNewSchemaToNestedFieldInKey(newNestedSchema: StructType): StructType = {
     applyNewSchemaToNestedField(keySchema, newNestedSchema, "key3")
   }
@@ -257,44 +267,57 @@ class StateSchemaCompatibilityCheckerSuite extends SharedSparkSession {
       dir: String,
       queryId: UUID,
       newKeySchema: StructType,
-      newValueSchema: StructType): Unit = {
+      newValueSchema: StructType,
+      ignoreValueSchema: Boolean): Unit = {
     // in fact, Spark doesn't support online state schema change, so need to check
     // schema only once for each running of JVM
     val providerId = StateStoreProviderId(
       StateStoreId(dir, opId, partitionId), queryId)
 
     new StateSchemaCompatibilityChecker(providerId, hadoopConf)
-      .check(newKeySchema, newValueSchema)
+      .check(newKeySchema, newValueSchema, ignoreValueSchema = ignoreValueSchema)
   }
 
   private def verifyException(
       oldKeySchema: StructType,
       oldValueSchema: StructType,
       newKeySchema: StructType,
-      newValueSchema: StructType): Unit = {
+      newValueSchema: StructType,
+      ignoreValueSchema: Boolean = false): Unit = {
     val dir = newDir()
     val queryId = UUID.randomUUID()
-    runSchemaChecker(dir, queryId, oldKeySchema, oldValueSchema)
+    runSchemaChecker(dir, queryId, oldKeySchema, oldValueSchema,
+      ignoreValueSchema = ignoreValueSchema)
 
     val e = intercept[StateSchemaNotCompatible] {
-      runSchemaChecker(dir, queryId, newKeySchema, newValueSchema)
+      runSchemaChecker(dir, queryId, newKeySchema, newValueSchema,
+        ignoreValueSchema = ignoreValueSchema)
     }
 
-    e.getMessage.contains("Provided schema doesn't match to the schema for existing state!")
-    e.getMessage.contains(newKeySchema.json)
-    e.getMessage.contains(newValueSchema.json)
-    e.getMessage.contains(oldKeySchema.json)
-    e.getMessage.contains(oldValueSchema.json)
+    assert(e.getMessage.contains("Provided schema doesn't match to the schema for existing state!"))
+    assert(e.getMessage.contains(newKeySchema.toString()))
+    assert(e.getMessage.contains(oldKeySchema.toString()))
+
+    if (ignoreValueSchema) {
+      assert(!e.getMessage.contains(newValueSchema.toString()))
+      assert(!e.getMessage.contains(oldValueSchema.toString()))
+    } else {
+      assert(e.getMessage.contains(newValueSchema.toString()))
+      assert(e.getMessage.contains(oldValueSchema.toString()))
+    }
   }
 
   private def verifySuccess(
       oldKeySchema: StructType,
       oldValueSchema: StructType,
       newKeySchema: StructType,
-      newValueSchema: StructType): Unit = {
+      newValueSchema: StructType,
+      ignoreValueSchema: Boolean = false): Unit = {
     val dir = newDir()
     val queryId = UUID.randomUUID()
-    runSchemaChecker(dir, queryId, oldKeySchema, oldValueSchema)
-    runSchemaChecker(dir, queryId, newKeySchema, newValueSchema)
+    runSchemaChecker(dir, queryId, oldKeySchema, oldValueSchema,
+      ignoreValueSchema = ignoreValueSchema)
+    runSchemaChecker(dir, queryId, newKeySchema, newValueSchema,
+      ignoreValueSchema = ignoreValueSchema)
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
index aa03da6c584..39e46a12d65 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
@@ -17,11 +17,16 @@
 
 package org.apache.spark.sql.streaming
 
+import java.io.File
+
+import org.apache.commons.io.FileUtils
+
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Utils
 
 class StreamingDeduplicationSuite extends StateStoreMetricsTest {
 
@@ -413,4 +418,69 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
       assertStateOperatorCustomMetric("numDroppedDuplicateRows", expected = 1)
     )
   }
+
+  test("SPARK-39650: duplicate with specific keys should allow input to change schema") {
+    withTempDir { checkpoint =>
+      val dedupeInputData = MemoryStream[(String, Int)]
+      val dedupe = dedupeInputData.toDS().dropDuplicates("_1")
+
+      testStream(dedupe, Append)(
+        StartStream(checkpointLocation = checkpoint.getCanonicalPath),
+
+        AddData(dedupeInputData, "a" -> 1),
+        CheckLastBatch("a" -> 1),
+
+        AddData(dedupeInputData, "a" -> 2, "b" -> 3),
+        CheckLastBatch("b" -> 3)
+      )
+
+      val dedupeInputData2 = MemoryStream[(String, Int, String)]
+      val dedupe2 = dedupeInputData2.toDS().dropDuplicates("_1")
+
+      // initialize new memory stream with previously executed batches
+      dedupeInputData2.addData(("a", 1, "dummy"))
+      dedupeInputData2.addData(Seq(("a", 2, "dummy"), ("b", 3, "dummy")))
+
+      testStream(dedupe2, Append)(
+        StartStream(checkpointLocation = checkpoint.getCanonicalPath),
+
+        AddData(dedupeInputData2, ("a", 5, "a"), ("b", 2, "b"), ("c", 9, "c")),
+        CheckLastBatch(("c", 9, "c"))
+      )
+    }
+  }
+
+  test("SPARK-39650: recovery from checkpoint having all columns as value schema") {
+    // NOTE: We are also changing the schema of input compared to the checkpoint. In the checkpoint
+    // we define the input schema as (String, Int).
+    val inputData = MemoryStream[(String, Int, String)]
+    val dedupe = inputData.toDS().dropDuplicates("_1")
+
+    // The fix will land after Spark 3.3.0, hence we can check backward compatibility with
+    // checkpoint being built from Spark 3.3.0.
+    val resourceUri = this.getClass.getResource(
+      "/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/").toURI
+
+    val checkpointDir = Utils.createTempDir().getCanonicalFile
+    // Copy the checkpoint to a temp dir to prevent changes to the original.
+    // Not doing this will lead to the test passing on the first run, but fail subsequent runs.
+    FileUtils.copyDirectory(new File(resourceUri), checkpointDir)
+
+    inputData.addData(("a", 1, "dummy"))
+    inputData.addData(("a", 2, "dummy"), ("b", 3, "dummy"))
+
+    testStream(dedupe, Append)(
+      StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+      /*
+        Note: The checkpoint was generated using the following input in Spark version 3.3.0
+        AddData(inputData, ("a", 1)),
+        CheckLastBatch(("a", 1)),
+        AddData(inputData, ("a", 2), ("b", 3)),
+        CheckLastBatch(("b", 3))
+       */
+
+      AddData(inputData, ("a", 5, "a"), ("b", 2, "b"), ("c", 9, "c")),
+      CheckLastBatch(("c", 9, "c"))
+    )
+  }
 }

