You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "WweiL (via GitHub)" <gi...@apache.org> on 2023/12/29 02:24:48 UTC

Re: [PR] [DO-NOT-REVIEW][DRAFT] Spark 45637 multiple state test [spark]

WweiL commented on code in PR #44076:
URL: https://github.com/apache/spark/pull/44076#discussion_r1437952483


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/MultiStatefulOperatorsSuite.scala:
##########
@@ -814,68 +814,183 @@ class MultiStatefulOperatorsSuite
     }
   }
 
-  test("stream-stream time interval join - output watermark for various intervals") {
-    def testOutputWatermarkInJoin(
-        df: DataFrame,
-        input: MemoryStream[(String, Timestamp)],
-        expectedOutputWatermark: Long): Unit = {
-      testStream(df)(
-        // dummy row to trigger execution
-        AddData(input, ("1", Timestamp.valueOf("2023-01-01 01:00:10"))),
-        CheckAnswer(),
-        Execute { query =>
-          val lastExecution = query.lastExecution
-          val joinOperator = lastExecution.executedPlan.collect {
-            case j: StreamingSymmetricHashJoinExec => j
-          }.head
+  test("SPARK-45637-1 join on window, append mode") {

Review Comment:
   ```
   -=-=-=-=-=-=-=
   currentBatchId = 0, lastExecutionRequiresAnotherBatch = false, isNewDataAvailable = true, shouldConstructNextBatch (with currbatchID) = true
   stateKeyWatermarkPredicateFunc: (input[0, struct<start:timestamp,end:timestamp>, false].end <= 0)
   stateKeyWatermarkPredicateFunc: (input[0, struct<start:timestamp,end:timestamp>, false].end <= 0)
   wei==thisrow: [0,1000000018,0,0,4c4b40]
   this row filtered: [0,1000000018,0,0,4c4b40]
   getJoinedRows ------ key: [0,1000000018,0,0,4c4b40] numValues: 0
   !stateKeyWatermarkPredicateFunc(key): true !stateValueWatermarkPredicateFunc(thisRow): true
   wei==add to state: [0,1000000018,0,0,4c4b40]
   wei==thisrow: [0,1000000018,0,4c4b40,989680]
   this row filtered: [0,1000000018,0,4c4b40,989680]
   getJoinedRows ------ key: [0,1000000018,0,4c4b40,989680] numValues: 0
   !stateKeyWatermarkPredicateFunc(key): true !stateValueWatermarkPredicateFunc(thisRow): true
   wei==add to state: [0,1000000018,0,4c4b40,989680]
   wei==thisrow: [0,1000000018,0,0,4c4b40]
   this row filtered: [0,1000000018,0,0,4c4b40]
   getJoinedRows ------ key: [0,1000000018,0,0,4c4b40] numValues: 1
   joinedRow: {[0,1000000018,0,0,4c4b40] + [0,1000000018,0,0,4c4b40]}
   ---joinedRow emitted
   !stateKeyWatermarkPredicateFunc(key): true !stateValueWatermarkPredicateFunc(thisRow): true
   wei==add to state: [0,1000000018,0,0,4c4b40]
   wei==thisrow: [0,1000000018,0,0,4c4b40]
   this row filtered: [0,1000000018,0,0,4c4b40]
   getJoinedRows ------ key: [0,1000000018,0,0,4c4b40] numValues: 1
   joinedRow: {[0,1000000018,0,0,4c4b40] + [0,1000000018,0,0,4c4b40]}
   ---joinedRow emitted
   !stateKeyWatermarkPredicateFunc(key): true !stateValueWatermarkPredicateFunc(thisRow): true
   wei==add to state: [0,1000000018,0,0,4c4b40]
   wei==thisrow: [0,1000000018,0,0,4c4b40]
   this row filtered: [0,1000000018,0,0,4c4b40]
   getJoinedRows ------ key: [0,1000000018,0,0,4c4b40] numValues: 1
   joinedRow: {[0,1000000018,0,0,4c4b40] + [0,1000000018,0,0,4c4b40]}
   ---joinedRow emitted
   !stateKeyWatermarkPredicateFunc(key): true !stateValueWatermarkPredicateFunc(thisRow): true
   wei==add to state: [0,1000000018,0,0,4c4b40]
   wei==thisrow: [0,1000000018,0,0,4c4b40]
   this row filtered: [0,1000000018,0,0,4c4b40]
   getJoinedRows ------ key: [0,1000000018,0,0,4c4b40] numValues: 1
   joinedRow: {[0,1000000018,0,0,4c4b40] + [0,1000000018,0,0,4c4b40]}
   ---joinedRow emitted
   !stateKeyWatermarkPredicateFunc(key): true !stateValueWatermarkPredicateFunc(thisRow): true
   wei==add to state: [0,1000000018,0,0,4c4b40]
   wei==thisrow: [0,1000000018,0,0,4c4b40]
   this row filtered: [0,1000000018,0,0,4c4b40]
   getJoinedRows ------ key: [0,1000000018,0,0,4c4b40] numValues: 1
   joinedRow: {[0,1000000018,0,0,4c4b40] + [0,1000000018,0,0,4c4b40]}
   ---joinedRow emitted
   !stateKeyWatermarkPredicateFunc(key): true !stateValueWatermarkPredicateFunc(thisRow): true
   wei==add to state: [0,1000000018,0,0,4c4b40]
   wei==thisrow: [0,1000000018,0,4c4b40,989680]
   this row filtered: [0,1000000018,0,4c4b40,989680]
   getJoinedRows ------ key: [0,1000000018,0,4c4b40,989680] numValues: 1
   joinedRow: {[0,1000000018,0,4c4b40,989680] + [0,1000000018,0,4c4b40,989680]}
   ---joinedRow emitted
   !stateKeyWatermarkPredicateFunc(key): true !stateValueWatermarkPredicateFunc(thisRow): true
   wei==add to state: [0,1000000018,0,4c4b40,989680]
   wei==thisrow: [0,1000000018,0,4c4b40,989680]
   this row filtered: [0,1000000018,0,4c4b40,989680]
   getJoinedRows ------ key: [0,1000000018,0,4c4b40,989680] numValues: 1
   joinedRow: {[0,1000000018,0,4c4b40,989680] + [0,1000000018,0,4c4b40,989680]}
   ---joinedRow emitted
   !stateKeyWatermarkPredicateFunc(key): true !stateValueWatermarkPredicateFunc(thisRow): true
   wei==add to state: [0,1000000018,0,4c4b40,989680]
   wei==thisrow: [0,1000000018,0,4c4b40,989680]
   this row filtered: [0,1000000018,0,4c4b40,989680]
   getJoinedRows ------ key: [0,1000000018,0,4c4b40,989680] numValues: 1
   joinedRow: {[0,1000000018,0,4c4b40,989680] + [0,1000000018,0,4c4b40,989680]}
   ---joinedRow emitted
   !stateKeyWatermarkPredicateFunc(key): true !stateValueWatermarkPredicateFunc(thisRow): true
   wei==add to state: [0,1000000018,0,4c4b40,989680]
   wei==thisrow: [0,1000000018,0,4c4b40,989680]
   this row filtered: [0,1000000018,0,4c4b40,989680]
   getJoinedRows ------ key: [0,1000000018,0,4c4b40,989680] numValues: 1
   joinedRow: {[0,1000000018,0,4c4b40,989680] + [0,1000000018,0,4c4b40,989680]}
   ---joinedRow emitted
   !stateKeyWatermarkPredicateFunc(key): true !stateValueWatermarkPredicateFunc(thisRow): true
   wei==add to state: [0,1000000018,0,4c4b40,989680]
   allKeyToNumValues.hasNext: true
   ---------------removeByKeyCondition--------------- allKeyToNumValues.hasNext
   18:21:47.427 ERROR org.apache.spark.util.Utils: Aborting task
   java.lang.NullPointerException: Cannot invoke "org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$KeyAndNumValues.key()" because the return value of "org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$1.currentKeyToNumValue()" is null
   	at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$1.currentKey(SymmetricHashJoinStateManager.scala:155)
   	at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$1.getNext(SymmetricHashJoinStateManager.scala:178)
   	at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$1.getNext(SymmetricHashJoinStateManager.scala:148)
   	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
   	at scala.collection.Iterator$ConcatIterator.hasNext(Iterator.scala:1149)
   	at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.$anonfun$processPartitions$25(StreamingSymmetricHashJoinExec.scala:479)
   	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
   18:21:47.429 ERROR org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask: Aborting commit for partition 0 (task 2, attempt 0, stage 2.0)
   18:21:47.429 ERROR org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask: Aborted commit for partition 0 (task 2, attempt 0, stage 2.0)
   18:21:47.438 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 2.0 (TID 2)
   java.lang.NullPointerException: Cannot invoke "org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$KeyAndNumValues.key()" because the return value of "org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$1.currentKeyToNumValue()" is null
   	at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$1.currentKey(SymmetricHashJoinStateManager.scala:155)
   	at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$1.getNext(SymmetricHashJoinStateManager.scala:178)
   	at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$1.getNext(SymmetricHashJoinStateManager.scala:148)
   	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
   	at scala.collection.Iterator$ConcatIterator.hasNext(Iterator.scala:1149)
   	at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.$anonfun$processPartitions$25(StreamingSymmetricHashJoinExec.scala:479)
   	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
   18:21:47.445 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2) (10.0.0.11 executor driver): java.lang.NullPointerException: Cannot invoke "org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$KeyAndNumValues.key()" because the return value of "org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$1.currentKeyToNumValue()" is null
   	at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$1.currentKey(SymmetricHashJoinStateManager.scala:155)
   	at org.apache...
   18:21:47.446 ERROR org.apache.spark.scheduler.TaskSetManager: Task 0 in stage 2.0 failed 1 times; aborting job
   18:21:47.450 ERROR org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec: Data source write support MicroBatchWrite[epoch: 0, writer: org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@553911d0] is aborting.
   18:21:47.450 ERROR org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec: Data source write support MicroBatchWrite[epoch: 0, writer: org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@553911d0] aborted.
   18:21:47.496 ERROR org.apache.spark.sql.execution.streaming.MicroBatchExecution: Query [id = 716405ff-bb4c-4d94-8604-6d160c7b6ab9, runId = 39081f98-c9a5-4def-a219-d5a119cbda74] terminated with error
   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2) (10.0.0.11 executor driver): java.lang.NullPointerException: Cannot invoke "org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$KeyAndNumValues.key()" because the return value of "org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$1.currentKeyToNumValue()" is null
   	at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$1.currentKey(SymmetricHashJoinStateManager.scala:155)
   	at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$1.getNext(SymmetricHashJoinStateManager.scala:178)
   	at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$1.getNext(SymmetricHashJoinStateManager.scala:148)
   	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
   	at scala.collection.Iterator$ConcatIterator.hasNext(Iterator.scala:1149)
   	at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.$anonfun$processPartitions$25(StreamingSymmetricHashJoinExec.scala:479)
   	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org