Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/11/15 17:55:36 UTC

[GitHub] [spark] WweiL opened a new pull request, #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

WweiL opened a new pull request, #38503:
URL: https://github.com/apache/spark/pull/38503

   
   ### What changes were proposed in this pull request?
   
   As a follow-up to [SPARK-40925] ([GitHub PR](https://github.com/apache/spark/pull/38405)), this PR removes the corresponding checks in `UnsupportedOperationChecker` so that users no longer have to set `withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false")` explicitly to use the new multi-stateful operators. In other words, multi-stateful operators are now enabled by default.
   
   As a side effect, the signature `checkStreamingQueryGlobalWatermarkLimit(LogicalPlan, OutputMode)` changes to `checkStreamingQueryGlobalWatermarkLimit(LogicalPlan)`.
   
   New tests are added to `MultiStatefulOperatorsSuite.scala`, but I could also add equivalent ones to `UnsupportedOperationsSuite.scala` if requested. 
   
   ### Why are the changes needed?
   To enable the new multi-stateful operators by default. Right now users need to set the SQL conf `spark.sql.streaming.unsupportedOperationCheck` to false explicitly, which also disables many other useful checks.
   
   ### Does this PR introduce _any_ user-facing change?
   No. Currently running queries are not affected, but new queries can use chained stateful operators.
   
   ### How was this patch tested?
   Unit Tests.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1016177560


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -169,16 +179,24 @@ object UnsupportedOperationChecker extends Logging {
           "DataFrames/Datasets")(plan)
     }
 
-    // Disallow multiple streaming aggregations
-    val aggregates = collectStreamingAggregates(plan)
+    val statefulOps = plan.collect {
+      case p: LogicalPlan if isStatefulOperation(p) => p
+    }
 
-    if (aggregates.size > 1 && outputMode != InternalOutputModes.Append) {
+    if (statefulOps.size > 1 &&
+      outputMode != InternalOutputModes.Append &&

Review Comment:
   nit: indent this line and the next line by 2 more spaces





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1013649051


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging {
     }
   }
 
+  def hasRangeExpr(e: Expression): Boolean = e.exists {
+    case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) =>
+      hasEventTimeColNeq(neq)
+    case _ => false
+  }
+
+  def hasEventTimeColNeq(neq: Expression): Boolean = {
+    val exp = neq.asInstanceOf[BinaryComparison]
+    hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
+  }
+
+  def hasEventTimeCol(exps: Expression): Boolean =
+    exps.exists {
+      case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey)
+      case _ => false
+    }
+
+  // TODO: This function and hasRangeExpr
+  // should be deleted after we support range join with states
+  def isStreamStreamIntervalJoin(plan: LogicalPlan): Boolean = {
+    plan match {
+      case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) =>
+        left.isStreaming && right.isStreaming
+        otherCondition.isDefined && hasRangeExpr(otherCondition.get)
+      case _ => false
+    }
+  }
+
   /**
    * Checks for possible correctness issue in chained stateful operators. The behavior is
    * controlled by SQL config `spark.sql.streaming.statefulOperator.checkCorrectness.enabled`.
    * Once it is enabled, an analysis exception will be thrown. Otherwise, Spark will just
    * print a warning message.
    */
   def checkStreamingQueryGlobalWatermarkLimit(
-      plan: LogicalPlan,
-      outputMode: OutputMode): Unit = {
+      plan: LogicalPlan): Unit = {
     def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p match {
-      case s: Aggregate
-        if s.isStreaming && outputMode == InternalOutputModes.Append => true
       case Join(left, right, joinType, _, _)

Review Comment:
   We can remove this line, as we support outer joins as well. We only have issues with stream-stream time-interval join and flatMapGroupsWithState.
   (Arguably flatMapGroupsWithState should be disallowed in all output modes, but I believe we have a separate check for output mode, so this is OK.)



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging {
     }
   }
 
+  def hasRangeExpr(e: Expression): Boolean = e.exists {
+    case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) =>
+      hasEventTimeColNeq(neq)
+    case _ => false
+  }
+
+  def hasEventTimeColNeq(neq: Expression): Boolean = {
+    val exp = neq.asInstanceOf[BinaryComparison]
+    hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
+  }
+
+  def hasEventTimeCol(exps: Expression): Boolean =
+    exps.exists {
+      case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey)
+      case _ => false
+    }
+
+  // TODO: This function and hasRangeExpr
+  // should be deleted after we support range join with states
+  def isStreamStreamIntervalJoin(plan: LogicalPlan): Boolean = {
+    plan match {
+      case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) =>
+        left.isStreaming && right.isStreaming

Review Comment:
   Maybe a `&&` is missing here? Otherwise this line is a no-op.
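
To illustrate the point of this comment: in Scala, a block evaluates to its last expression, so a Boolean expression on an earlier line is computed and thrown away. The following is a minimal standalone sketch, not code from the PR; `BlockValueDemo`, `withoutAnd`, and `withAnd` are hypothetical names, and plain Booleans stand in for `left.isStreaming`, `right.isStreaming`, and the range-expression check.

```scala
object BlockValueDemo {
  // Same shape as the reviewed lines: the first expression is evaluated and
  // discarded, because only the last expression of a block is its value.
  // (The compiler may warn that a pure expression does nothing here.)
  def withoutAnd(leftStreaming: Boolean, rightStreaming: Boolean, hasRange: Boolean): Boolean = {
    leftStreaming && rightStreaming
    hasRange
  }

  // With the trailing `&&`, both lines form a single expression.
  def withAnd(leftStreaming: Boolean, rightStreaming: Boolean, hasRange: Boolean): Boolean = {
    leftStreaming && rightStreaming &&
      hasRange
  }

  def main(args: Array[String]): Unit = {
    // The left side is not streaming, so an interval-join check should say false.
    println(withoutAnd(leftStreaming = false, rightStreaming = true, hasRange = true)) // prints "true"
    println(withAnd(leftStreaming = false, rightStreaming = true, hasRange = true))    // prints "false"
  }
}
```

Without the `&&`, the streaming flags have no influence on the result at all.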



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala:
##########
@@ -507,15 +507,13 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
       assertPassOnGlobalWatermarkLimit(
         s"single $joinType join in Append mode",
         streamRelation.join(streamRelation, joinType = RightOuter,
-          condition = Some(attributeWithWatermark === attribute)),

Review Comment:
   The code comment above and the related tests are outdated.
   
   For (time-window) equality join, all join types (inner, left outer, right outer, and full outer) should just work with the recent fix of late record filtering. For time-interval join, no join type works if a stateful operator follows it.
   
   That said, we should now have redundant test cases for time-interval join specifically to check the different expectation.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging {
     }
   }
 
+  def hasRangeExpr(e: Expression): Boolean = e.exists {
+    case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) =>
+      hasEventTimeColNeq(neq)
+    case _ => false
+  }
+
+  def hasEventTimeColNeq(neq: Expression): Boolean = {
+    val exp = neq.asInstanceOf[BinaryComparison]
+    hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
+  }
+
+  def hasEventTimeCol(exps: Expression): Boolean =
+    exps.exists {
+      case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey)
+      case _ => false
+    }
+
+  // TODO: This function and hasRangeExpr
+  // should be deleted after we support range join with states
+  def isStreamStreamIntervalJoin(plan: LogicalPlan): Boolean = {
+    plan match {
+      case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) =>
+        left.isStreaming && right.isStreaming
+        otherCondition.isDefined && hasRangeExpr(otherCondition.get)
+      case _ => false
+    }
+  }
+
   /**
    * Checks for possible correctness issue in chained stateful operators. The behavior is
    * controlled by SQL config `spark.sql.streaming.statefulOperator.checkCorrectness.enabled`.
    * Once it is enabled, an analysis exception will be thrown. Otherwise, Spark will just
    * print a warning message.
    */
   def checkStreamingQueryGlobalWatermarkLimit(
-      plan: LogicalPlan,
-      outputMode: OutputMode): Unit = {
+      plan: LogicalPlan): Unit = {
     def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p match {
-      case s: Aggregate
-        if s.isStreaming && outputMode == InternalOutputModes.Append => true
       case Join(left, right, joinType, _, _)
         if left.isStreaming && right.isStreaming && joinType != Inner => true
       case f: FlatMapGroupsWithState
         if f.isStreaming && f.outputMode == OutputMode.Append() => true
-      case _ => false
+      case _ =>

Review Comment:
   nit: unnecessary change



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -157,10 +193,11 @@ object UnsupportedOperationChecker extends Logging {
     // Disallow multiple streaming aggregations
     val aggregates = collectStreamingAggregates(plan)
 
-    if (aggregates.size > 1) {
+    if (aggregates.size > 1 && outputMode != InternalOutputModes.Append) {

Review Comment:
   I think it's not only multiple aggregations. Reasoning about a streaming aggregation followed by a time-window join in update mode would be very tricky. I'd say we should simply disallow multiple stateful operators in update and complete mode altogether.
   (There could be some combinations that still make sense in update/complete mode, but it doesn't seem trivial to reason about them and draw up an allowlist or blocklist.)
   
   cc. @alex-balikov 
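
The rule proposed in this comment (reject any plan with more than one stateful operator unless the output mode is Append) can be sketched without Spark. This is only an illustration under stated assumptions: `MultiStatefulCheckSketch`, `Op`, and `check` are hypothetical, a flat `Seq` stands in for a logical plan, and the local `OutputMode` objects are not Spark's classes.

```scala
object MultiStatefulCheckSketch {
  // Local stand-ins for the three streaming output modes (not Spark's types).
  sealed trait OutputMode
  case object Append extends OutputMode
  case object Update extends OutputMode
  case object Complete extends OutputMode

  // Hypothetical stand-in for a logical plan node.
  final case class Op(name: String, stateful: Boolean)

  // Rejects plans that chain stateful operators outside Append mode.
  def check(plan: Seq[Op], mode: OutputMode): Either[String, Unit] = {
    val statefulOps = plan.filter(_.stateful)
    if (statefulOps.size > 1 && mode != Append) {
      val names = statefulOps.map(_.name).mkString(", ")
      Left(s"Multiple stateful operators ($names) are not supported in $mode mode")
    } else {
      Right(())
    }
  }

  def main(args: Array[String]): Unit = {
    val plan = Seq(
      Op("source", stateful = false),
      Op("aggregate", stateful = true),
      Op("join", stateful = true))
    println(check(plan, Append))
    println(check(plan, Update))
  }
}
```

A blanket rule like this trades some valid update/complete combinations for a check that is easy to reason about, which is the trade-off the comment describes.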



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging {
     }
   }
 
+  def hasRangeExpr(e: Expression): Boolean = e.exists {
+    case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) =>
+      hasEventTimeColNeq(neq)
+    case _ => false
+  }
+
+  def hasEventTimeColNeq(neq: Expression): Boolean = {
+    val exp = neq.asInstanceOf[BinaryComparison]
+    hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
+  }
+
+  def hasEventTimeCol(exps: Expression): Boolean =
+    exps.exists {
+      case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey)
+      case _ => false
+    }
+
+  // TODO: This function and hasRangeExpr
+  // should be deleted after we support range join with states
+  def isStreamStreamIntervalJoin(plan: LogicalPlan): Boolean = {
+    plan match {
+      case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) =>
+        left.isStreaming && right.isStreaming
+        otherCondition.isDefined && hasRangeExpr(otherCondition.get)
+      case _ => false
+    }
+  }
+
   /**
    * Checks for possible correctness issue in chained stateful operators. The behavior is
    * controlled by SQL config `spark.sql.streaming.statefulOperator.checkCorrectness.enabled`.
    * Once it is enabled, an analysis exception will be thrown. Otherwise, Spark will just
    * print a warning message.
    */
   def checkStreamingQueryGlobalWatermarkLimit(
-      plan: LogicalPlan,
-      outputMode: OutputMode): Unit = {
+      plan: LogicalPlan): Unit = {
     def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p match {
-      case s: Aggregate
-        if s.isStreaming && outputMode == InternalOutputModes.Append => true
       case Join(left, right, joinType, _, _)

Review Comment:
   I think we should check the stream-stream time-interval join here. I understand that a separate check helps us produce a better error message, but we are disallowing the query altogether, whereas it could run with the correctness check config turned off.
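
The difference between a hard rejection and the config-gated correctness check described in the doc comment above (throw when enabled, otherwise only warn) can be sketched as follows. This is an assumption-laden illustration, not Spark's implementation: `CorrectnessCheckSketch`, `AnalysisError`, and `report` are hypothetical names, and the `failOnProblem` flag stands in for the `spark.sql.streaming.statefulOperator.checkCorrectness.enabled` config.

```scala
object CorrectnessCheckSketch {
  // Hypothetical stand-in for an analysis exception.
  final class AnalysisError(message: String) extends RuntimeException(message)

  // When the flag is on, a detected problem fails the query; when it is off,
  // the same problem is only reported through the `warn` callback.
  def report(problem: Option[String], failOnProblem: Boolean)(warn: String => Unit): Unit =
    problem.foreach { msg =>
      if (failOnProblem) throw new AnalysisError(msg) else warn(msg)
    }

  def main(args: Array[String]): Unit = {
    val problem = Some("time-interval join followed by a stateful operator")
    // Flag off: the query would still run, with a warning.
    report(problem, failOnProblem = false)(msg => println(s"WARN: $msg"))
    // Flag on: the query is rejected.
    try report(problem, failOnProblem = true)(_ => ())
    catch { case e: AnalysisError => println(s"ERROR: ${e.getMessage}") }
  }
}
```

Routing the interval-join case through a gate like this would keep the escape hatch the comment asks for, at the cost of a less specific error message.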



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging {
     }
   }
 
+  def hasRangeExpr(e: Expression): Boolean = e.exists {

Review Comment:
   The method name is more general than what it actually does. Maybe `hasRangeExprAgainstEventTimeCol`?
   Or just define the method at the start of `isStreamStreamIntervalJoin` so that it sits in the proper implicit context.
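
The second suggestion, scoping the helpers inside the method that uses them, might look like the sketch below. It runs on a toy expression tree rather than Catalyst: `IntervalJoinSketch`, the `Expr` case classes, and `isIntervalJoinCondition` are all hypothetical, and an `isEventTime` flag replaces the watermark-metadata lookup in the real code.

```scala
object IntervalJoinSketch {
  // Toy expression tree; these are not Catalyst classes.
  sealed trait Expr
  final case class Col(name: String, isEventTime: Boolean) extends Expr
  final case class Lit(value: Long) extends Expr
  final case class LessThan(left: Expr, right: Expr) extends Expr
  final case class EqualTo(left: Expr, right: Expr) extends Expr
  final case class And(left: Expr, right: Expr) extends Expr

  def isIntervalJoinCondition(condition: Option[Expr]): Boolean = {
    // Local helpers: visible only here, so the narrow purpose is clear from scope.
    def hasEventTimeCol(e: Expr): Boolean = e match {
      case Col(_, isEventTime) => isEventTime
      case Lit(_) => false
      case LessThan(l, r) => hasEventTimeCol(l) || hasEventTimeCol(r)
      case EqualTo(l, r) => hasEventTimeCol(l) || hasEventTimeCol(r)
      case And(l, r) => hasEventTimeCol(l) || hasEventTimeCol(r)
    }

    // Simplification: only conjunctions are searched for range comparisons.
    def hasRangeExprAgainstEventTimeCol(e: Expr): Boolean = e match {
      case LessThan(l, r) => hasEventTimeCol(l) || hasEventTimeCol(r)
      case And(l, r) =>
        hasRangeExprAgainstEventTimeCol(l) || hasRangeExprAgainstEventTimeCol(r)
      case _ => false
    }

    condition.exists(hasRangeExprAgainstEventTimeCol)
  }

  def main(args: Array[String]): Unit = {
    val t1 = Col("eventTime1", isEventTime = true)
    val t2 = Col("eventTime2", isEventTime = true)
    println(isIntervalJoinCondition(Some(LessThan(t1, t2)))) // range on event time
    println(isIntervalJoinCondition(Some(EqualTo(t1, t2))))  // equality only
  }
}
```

Nesting keeps the general-sounding helper out of the object's public surface, so a more specific name becomes less critical.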



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/MultiStatefulOperatorsSuite.scala:
##########
@@ -40,378 +40,465 @@ class MultiStatefulOperatorsSuite
   }
 
   test("window agg -> window agg, append mode") {
-    // TODO: SPARK-40940 - Fix the unsupported ops checker to allow chaining of stateful ops.
-    withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
-      val inputData = MemoryStream[Int]
-
-      val stream = inputData.toDF()
-        .withColumn("eventTime", timestamp_seconds($"value"))
-        .withWatermark("eventTime", "0 seconds")
-        .groupBy(window($"eventTime", "5 seconds").as("window"))
-        .agg(count("*").as("count"))
-        .groupBy(window($"window", "10 seconds"))
-        .agg(count("*").as("count"), sum("count").as("sum"))
-        .select($"window".getField("start").cast("long").as[Long],
-          $"count".as[Long], $"sum".as[Long])
-
-      testStream(stream)(
-        AddData(inputData, 10 to 21: _*),
-        // op1 W (0, 0)
-        // agg: [10, 15) 5, [15, 20) 5, [20, 25) 2
-        // output: None
-        // state: [10, 15) 5, [15, 20) 5, [20, 25) 2
-        // op2 W (0, 0)
-        // agg: None
-        // output: None
-        // state: None
-
-        // no-data batch triggered
-
-        // op1 W (0, 21)
-        // agg: None
-        // output: [10, 15) 5, [15, 20) 5
-        // state: [20, 25) 2
-        // op2 W (0, 21)
-        // agg: [10, 20) (2, 10)
-        // output: [10, 20) (2, 10)
-        // state: None
-        CheckNewAnswer((10, 2, 10)),
-        assertNumStateRows(Seq(0, 1)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0)),
-
-        AddData(inputData, 10 to 29: _*),
-        // op1 W (21, 21)
-        // agg: [10, 15) 5 - late, [15, 20) 5 - late, [20, 25) 5, [25, 30) 5
-        // output: None
-        // state: [20, 25) 7, [25, 30) 5
-        // op2 W (21, 21)
-        // agg: None
-        // output: None
-        // state: None
-
-        // no-data batch triggered
-
-        // op1 W (21, 29)
-        // agg: None
-        // output: [20, 25) 7
-        // state: [25, 30) 5
-        // op2 W (21, 29)
-        // agg: [20, 30) (1, 7)
-        // output: None
-        // state: [20, 30) (1, 7)
-        CheckNewAnswer(),
-        assertNumStateRows(Seq(1, 1)),
-        assertNumRowsDroppedByWatermark(Seq(0, 2)),
-
-        // Move the watermark.
-        AddData(inputData, 30, 31),
-        // op1 W (29, 29)
-        // agg: [30, 35) 2
-        // output: None
-        // state: [25, 30) 5 [30, 35) 2
-        // op2 W (29, 29)
-        // agg: None
-        // output: None
-        // state: [20, 30) (1, 7)
-
-        // no-data batch triggered
-
-        // op1 W (29, 31)
-        // agg: None
-        // output: [25, 30) 5
-        // state: [30, 35) 2
-        // op2 W (29, 31)
-        // agg: [20, 30) (2, 12)
-        // output: [20, 30) (2, 12)
-        // state: None
-        CheckNewAnswer((20, 2, 12)),
-        assertNumStateRows(Seq(0, 1)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0))
-      )
-    }
+    val inputData = MemoryStream[Int]
+
+    val stream = inputData.toDF()
+      .withColumn("eventTime", timestamp_seconds($"value"))
+      .withWatermark("eventTime", "0 seconds")
+      .groupBy(window($"eventTime", "5 seconds").as("window"))
+      .agg(count("*").as("count"))
+      .groupBy(window($"window", "10 seconds"))
+      .agg(count("*").as("count"), sum("count").as("sum"))
+      .select($"window".getField("start").cast("long").as[Long],
+        $"count".as[Long], $"sum".as[Long])
+
+    testStream(stream)(
+      AddData(inputData, 10 to 21: _*),
+      // op1 W (0, 0)
+      // agg: [10, 15) 5, [15, 20) 5, [20, 25) 2
+      // output: None
+      // state: [10, 15) 5, [15, 20) 5, [20, 25) 2
+      // op2 W (0, 0)
+      // agg: None
+      // output: None
+      // state: None
+
+      // no-data batch triggered
+
+      // op1 W (0, 21)
+      // agg: None
+      // output: [10, 15) 5, [15, 20) 5
+      // state: [20, 25) 2
+      // op2 W (0, 21)
+      // agg: [10, 20) (2, 10)
+      // output: [10, 20) (2, 10)
+      // state: None
+      CheckNewAnswer((10, 2, 10)),
+      assertNumStateRows(Seq(0, 1)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0)),
+
+      AddData(inputData, 10 to 29: _*),
+      // op1 W (21, 21)
+      // agg: [10, 15) 5 - late, [15, 20) 5 - late, [20, 25) 5, [25, 30) 5
+      // output: None
+      // state: [20, 25) 7, [25, 30) 5
+      // op2 W (21, 21)
+      // agg: None
+      // output: None
+      // state: None
+
+      // no-data batch triggered
+
+      // op1 W (21, 29)
+      // agg: None
+      // output: [20, 25) 7
+      // state: [25, 30) 5
+      // op2 W (21, 29)
+      // agg: [20, 30) (1, 7)
+      // output: None
+      // state: [20, 30) (1, 7)
+      CheckNewAnswer(),
+      assertNumStateRows(Seq(1, 1)),
+      assertNumRowsDroppedByWatermark(Seq(0, 2)),
+
+      // Move the watermark.
+      AddData(inputData, 30, 31),
+      // op1 W (29, 29)
+      // agg: [30, 35) 2
+      // output: None
+      // state: [25, 30) 5 [30, 35) 2
+      // op2 W (29, 29)
+      // agg: None
+      // output: None
+      // state: [20, 30) (1, 7)
+
+      // no-data batch triggered
+
+      // op1 W (29, 31)
+      // agg: None
+      // output: [25, 30) 5
+      // state: [30, 35) 2
+      // op2 W (29, 31)
+      // agg: [20, 30) (2, 12)
+      // output: [20, 30) (2, 12)
+      // state: None
+      CheckNewAnswer((20, 2, 12)),
+      assertNumStateRows(Seq(0, 1)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0))
+    )
   }
 
   test("agg -> agg -> agg, append mode") {
-    withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
-      val inputData = MemoryStream[Int]
-
-      val stream = inputData.toDF()
-        .withColumn("eventTime", timestamp_seconds($"value"))
-        .withWatermark("eventTime", "0 seconds")
-        .groupBy(window($"eventTime", "5 seconds").as("window"))
-        .agg(count("*").as("count"))
-        .groupBy(window(window_time($"window"), "10 seconds"))
-        .agg(count("*").as("count"), sum("count").as("sum"))
-        .groupBy(window(window_time($"window"), "20 seconds"))
-        .agg(count("*").as("count"), sum("sum").as("sum"))
-        .select(
-          $"window".getField("start").cast("long").as[Long],
-          $"window".getField("end").cast("long").as[Long],
-          $"count".as[Long], $"sum".as[Long])
-
-      testStream(stream)(
-        AddData(inputData, 0 to 37: _*),
-        // op1 W (0, 0)
-        // agg: [0, 5) 5, [5, 10) 5, [10, 15) 5, [15, 20) 5, [20, 25) 5, [25, 30) 5, [30, 35) 5,
-        //   [35, 40) 3
-        // output: None
-        // state: [0, 5) 5, [5, 10) 5, [10, 15) 5, [15, 20) 5, [20, 25) 5, [25, 30) 5, [30, 35) 5,
-        //   [35, 40) 3
-        // op2 W (0, 0)
-        // agg: None
-        // output: None
-        // state: None
-        // op3 W (0, 0)
-        // agg: None
-        // output: None
-        // state: None
-
-        // no-data batch triggered
-
-        // op1 W (0, 37)
-        // agg: None
-        // output: [0, 5) 5, [5, 10) 5, [10, 15) 5, [15, 20) 5, [20, 25) 5, [25, 30) 5, [30, 35) 5
-        // state: [35, 40) 3
-        // op2 W (0, 37)
-        // agg: [0, 10) (2, 10), [10, 20) (2, 10), [20, 30) (2, 10), [30, 40) (1, 5)
-        // output: [0, 10) (2, 10), [10, 20) (2, 10), [20, 30) (2, 10)
-        // state: [30, 40) (1, 5)
-        // op3 W (0, 37)
-        // agg: [0, 20) (2, 20), [20, 40) (1, 10)
-        // output: [0, 20) (2, 20)
-        // state: [20, 40) (1, 10)
-        CheckNewAnswer((0, 20, 2, 20)),
-        assertNumStateRows(Seq(1, 1, 1)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0, 0)),
-
-        AddData(inputData, 30 to 60: _*),
-        // op1 W (37, 37)
-        // dropped rows: [30, 35), 1 row <= note that 35, 36, 37 are still in effect
-        // agg: [35, 40) 8, [40, 45) 5, [45, 50) 5, [50, 55) 5, [55, 60) 5, [60, 65) 1
-        // output: None
-        // state: [35, 40) 8, [40, 45) 5, [45, 50) 5, [50, 55) 5, [55, 60) 5, [60, 65) 1
-        // op2 W (37, 37)
-        // output: None
-        // state: [30, 40) (1, 5)
-        // op3 W (37, 37)
-        // output: None
-        // state: [20, 40) (1, 10)
-
-        // no-data batch
-        // op1 W (37, 60)
-        // output: [35, 40) 8, [40, 45) 5, [45, 50) 5, [50, 55) 5, [55, 60) 5
-        // state: [60, 65) 1
-        // op2 W (37, 60)
-        // agg: [30, 40) (2, 13), [40, 50) (2, 10), [50, 60), (2, 10)
-        // output: [30, 40) (2, 13), [40, 50) (2, 10), [50, 60), (2, 10)
-        // state: None
-        // op3 W (37, 60)
-        // agg: [20, 40) (2, 23), [40, 60) (2, 20)
-        // output: [20, 40) (2, 23), [40, 60) (2, 20)
-        // state: None
-
-        CheckNewAnswer((20, 40, 2, 23), (40, 60, 2, 20)),
-        assertNumStateRows(Seq(0, 0, 1)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0, 1))
-      )
-    }
+    val inputData = MemoryStream[Int]
+
+    val stream = inputData.toDF()
+      .withColumn("eventTime", timestamp_seconds($"value"))
+      .withWatermark("eventTime", "0 seconds")
+      .groupBy(window($"eventTime", "5 seconds").as("window"))
+      .agg(count("*").as("count"))
+      .groupBy(window(window_time($"window"), "10 seconds"))
+      .agg(count("*").as("count"), sum("count").as("sum"))
+      .groupBy(window(window_time($"window"), "20 seconds"))
+      .agg(count("*").as("count"), sum("sum").as("sum"))
+      .select(
+        $"window".getField("start").cast("long").as[Long],
+        $"window".getField("end").cast("long").as[Long],
+        $"count".as[Long], $"sum".as[Long])
+
+    testStream(stream)(
+      AddData(inputData, 0 to 37: _*),
+      // op1 W (0, 0)
+      // agg: [0, 5) 5, [5, 10) 5, [10, 15) 5, [15, 20) 5, [20, 25) 5, [25, 30) 5, [30, 35) 5,
+      //   [35, 40) 3
+      // output: None
+      // state: [0, 5) 5, [5, 10) 5, [10, 15) 5, [15, 20) 5, [20, 25) 5, [25, 30) 5, [30, 35) 5,
+      //   [35, 40) 3
+      // op2 W (0, 0)
+      // agg: None
+      // output: None
+      // state: None
+      // op3 W (0, 0)
+      // agg: None
+      // output: None
+      // state: None
+
+      // no-data batch triggered
+
+      // op1 W (0, 37)
+      // agg: None
+      // output: [0, 5) 5, [5, 10) 5, [10, 15) 5, [15, 20) 5, [20, 25) 5, [25, 30) 5, [30, 35) 5
+      // state: [35, 40) 3
+      // op2 W (0, 37)
+      // agg: [0, 10) (2, 10), [10, 20) (2, 10), [20, 30) (2, 10), [30, 40) (1, 5)
+      // output: [0, 10) (2, 10), [10, 20) (2, 10), [20, 30) (2, 10)
+      // state: [30, 40) (1, 5)
+      // op3 W (0, 37)
+      // agg: [0, 20) (2, 20), [20, 40) (1, 10)
+      // output: [0, 20) (2, 20)
+      // state: [20, 40) (1, 10)
+      CheckNewAnswer((0, 20, 2, 20)),
+      assertNumStateRows(Seq(1, 1, 1)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0, 0)),
+
+      AddData(inputData, 30 to 60: _*),
+      // op1 W (37, 37)
+      // dropped rows: [30, 35), 1 row <= note that 35, 36, 37 are still in effect
+      // agg: [35, 40) 8, [40, 45) 5, [45, 50) 5, [50, 55) 5, [55, 60) 5, [60, 65) 1
+      // output: None
+      // state: [35, 40) 8, [40, 45) 5, [45, 50) 5, [50, 55) 5, [55, 60) 5, [60, 65) 1
+      // op2 W (37, 37)
+      // output: None
+      // state: [30, 40) (1, 5)
+      // op3 W (37, 37)
+      // output: None
+      // state: [20, 40) (1, 10)
+
+      // no-data batch
+      // op1 W (37, 60)
+      // output: [35, 40) 8, [40, 45) 5, [45, 50) 5, [50, 55) 5, [55, 60) 5
+      // state: [60, 65) 1
+      // op2 W (37, 60)
+      // agg: [30, 40) (2, 13), [40, 50) (2, 10), [50, 60), (2, 10)
+      // output: [30, 40) (2, 13), [40, 50) (2, 10), [50, 60), (2, 10)
+      // state: None
+      // op3 W (37, 60)
+      // agg: [20, 40) (2, 23), [40, 60) (2, 20)
+      // output: [20, 40) (2, 23), [40, 60) (2, 20)
+      // state: None
+
+      CheckNewAnswer((20, 40, 2, 23), (40, 60, 2, 20)),
+      assertNumStateRows(Seq(0, 0, 1)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0, 1))
+    )
   }
 
   test("stream deduplication -> aggregation, append mode") {
-    withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
-      val inputData = MemoryStream[Int]
-
-      val deduplication = inputData.toDF()
-        .withColumn("eventTime", timestamp_seconds($"value"))
-        .withWatermark("eventTime", "10 seconds")
-        .dropDuplicates("value", "eventTime")
-
-      val windowedAggregation = deduplication
-        .groupBy(window($"eventTime", "5 seconds").as("window"))
-        .agg(count("*").as("count"), sum("value").as("sum"))
-        .select($"window".getField("start").cast("long").as[Long],
-          $"count".as[Long])
-
-      testStream(windowedAggregation)(
-        AddData(inputData, 1 to 15: _*),
-        // op1 W (0, 0)
-        // input: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
-        // deduplicated: None
-        // output: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
-        // state: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
-        // op2 W (0, 0)
-        // agg: [0, 5) 4, [5, 10) 5 [10, 15) 5, [15, 20) 1
-        // output: None
-        // state: [0, 5) 4, [5, 10) 5 [10, 15) 5, [15, 20) 1
-
-        // no-data batch triggered
-
-        // op1 W (0, 5)
-        // agg: None
-        // output: None
-        // state: 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
-        // op2 W (0, 5)
-        // agg: None
-        // output: [0, 5) 4
-        // state: [5, 10) 5 [10, 15) 5, [15, 20) 1
-        CheckNewAnswer((0, 4)),
-        assertNumStateRows(Seq(3, 10)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0))
-      )
-    }
+    val inputData = MemoryStream[Int]
+
+    val deduplication = inputData.toDF()
+      .withColumn("eventTime", timestamp_seconds($"value"))
+      .withWatermark("eventTime", "10 seconds")
+      .dropDuplicates("value", "eventTime")
+
+    val windowedAggregation = deduplication
+      .groupBy(window($"eventTime", "5 seconds").as("window"))
+      .agg(count("*").as("count"), sum("value").as("sum"))
+      .select($"window".getField("start").cast("long").as[Long],
+        $"count".as[Long])
+
+    testStream(windowedAggregation)(
+      AddData(inputData, 1 to 15: _*),
+      // op1 W (0, 0)
+      // input: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
+      // deduplicated: None
+      // output: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
+      // state: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
+      // op2 W (0, 0)
+      // agg: [0, 5) 4, [5, 10) 5 [10, 15) 5, [15, 20) 1
+      // output: None
+      // state: [0, 5) 4, [5, 10) 5 [10, 15) 5, [15, 20) 1
+
+      // no-data batch triggered
+
+      // op1 W (0, 5)
+      // agg: None
+      // output: None
+      // state: 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
+      // op2 W (0, 5)
+      // agg: None
+      // output: [0, 5) 4
+      // state: [5, 10) 5 [10, 15) 5, [15, 20) 1
+      CheckNewAnswer((0, 4)),
+      assertNumStateRows(Seq(3, 10)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0))
+    )
   }
 
   test("join -> window agg, append mode") {
-    withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
-      val input1 = MemoryStream[Int]
-      val inputDF1 = input1.toDF
-        .withColumnRenamed("value", "value1")
-        .withColumn("eventTime1", timestamp_seconds($"value1"))
-        .withWatermark("eventTime1", "0 seconds")
-
-      val input2 = MemoryStream[Int]
-      val inputDF2 = input2.toDF
-        .withColumnRenamed("value", "value2")
-        .withColumn("eventTime2", timestamp_seconds($"value2"))
-        .withWatermark("eventTime2", "0 seconds")
-
-      val stream = inputDF1.join(inputDF2, expr("eventTime1 = eventTime2"), "inner")
-        .groupBy(window($"eventTime1", "5 seconds").as("window"))
-        .agg(count("*").as("count"))
-        .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+    val input1 = MemoryStream[Int]
+    val inputDF1 = input1.toDF
+      .withColumnRenamed("value", "value1")
+      .withColumn("eventTime1", timestamp_seconds($"value1"))
+      .withWatermark("eventTime1", "0 seconds")
+
+    val input2 = MemoryStream[Int]
+    val inputDF2 = input2.toDF
+      .withColumnRenamed("value", "value2")
+      .withColumn("eventTime2", timestamp_seconds($"value2"))
+      .withWatermark("eventTime2", "0 seconds")
+
+    val stream = inputDF1.join(inputDF2, expr("eventTime1 = eventTime2"), "inner")
+      .groupBy(window($"eventTime1", "5 seconds").as("window"))
+      .agg(count("*").as("count"))
+      .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+
+    testStream(stream)(
+      MultiAddData(input1, 1 to 4: _*)(input2, 1 to 4: _*),
+
+      // op1 W (0, 0)
+      // join output: (1, 1), (2, 2), (3, 3), (4, 4)
+      // state: (1, 1), (2, 2), (3, 3), (4, 4)
+      // op2 W (0, 0)
+      // agg: [0, 5) 4
+      // output: None
+      // state: [0, 5) 4
+
+      // no-data batch triggered
+
+      // op1 W (0, 4)
+      // join output: None
+      // state: None
+      // op2 W (0, 4)
+      // agg: None
+      // output: None
+      // state: [0, 5) 4
+      CheckNewAnswer(),
+      assertNumStateRows(Seq(1, 0)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0)),
+
+      // Move the watermark
+      MultiAddData(input1, 5)(input2, 5),
+
+      // op1 W (4, 4)
+      // join output: (5, 5)
+      // state: (5, 5)
+      // op2 W (4, 4)
+      // agg: [5, 10) 1
+      // output: None
+      // state: [0, 5) 4, [5, 10) 1
+
+      // no-data batch triggered
+
+      // op1 W (4, 5)
+      // join output: None
+      // state: None
+      // op2 W (4, 5)
+      // agg: None
+      // output: [0, 5) 4
+      // state: [5, 10) 1
+      CheckNewAnswer((0, 4)),
+      assertNumStateRows(Seq(1, 0)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0))
+    )
+  }
 
+  test("aggregation -> stream deduplication, append mode") {
+    val inputData = MemoryStream[Int]
+
+    val aggStream = inputData.toDF()
+      .withColumn("eventTime", timestamp_seconds($"value"))
+      .withWatermark("eventTime", "0 seconds")
+      .groupBy(window($"eventTime", "5 seconds").as("window"))
+      .agg(count("*").as("count"))
+      .withColumn("windowEnd", expr("window.end"))
+
+    // dropDuplicates from aggStream without event time column for dropDuplicates - the
+    // state does not get trimmed due to watermark advancement.
+    val dedupNoEventTime = aggStream
+      .dropDuplicates("count", "windowEnd")
+      .select(
+        $"windowEnd".cast("long").as[Long],
+        $"count".as[Long])
+
+    testStream(dedupNoEventTime)(
+      AddData(inputData, 1, 5, 10, 15),
+
+      // op1 W (0, 0)
+      // agg: [0, 5) 1, [5, 10) 1, [10, 15) 1, [15, 20) 1
+      // output: None
+      // state: [0, 5) 1, [5, 10) 1, [10, 15) 1, [15, 20) 1
+      // op2 W (0, 0)
+      // output: None
+      // state: None
+
+      // no-data batch triggered
+
+      // op1 W (0, 15)
+      // agg: None
+      // output: [0, 5) 1, [5, 10) 1, [10, 15) 1
+      // state: [15, 20) 1
+      // op2 W (0, 15)
+      // output: (5, 1), (10, 1), (15, 1)
+      // state: (5, 1), (10, 1), (15, 1)
+
+      CheckNewAnswer((5, 1), (10, 1), (15, 1)),
+      assertNumStateRows(Seq(3, 1)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0))
+    )
+
+    // Similar to the above but add event time. The dedup state will get trimmed.
+    val dedupWithEventTime = aggStream
+      .withColumn("windowTime", expr("window_time(window)"))
+      .withColumn("windowTimeMicros", expr("unix_micros(windowTime)"))
+      .dropDuplicates("count", "windowEnd", "windowTime")
+      .select(
+        $"windowEnd".cast("long").as[Long],
+        $"windowTimeMicros".cast("long").as[Long],
+        $"count".as[Long])
+
+    testStream(dedupWithEventTime)(
+      AddData(inputData, 1, 5, 10, 15),
+
+      // op1 W (0, 0)
+      // agg: [0, 5) 1, [5, 10) 1, [10, 15) 1, [15, 20) 1
+      // output: None
+      // state: [0, 5) 1, [5, 10) 1, [10, 15) 1, [15, 20) 1
+      // op2 W (0, 0)
+      // output: None
+      // state: None
+
+      // no-data batch triggered
+
+      // op1 W (0, 15)
+      // agg: None
+      // output: [0, 5) 1, [5, 10) 1, [10, 15) 1
+      // state: [15, 20) 1
+      // op2 W (0, 15)
+      // output: (5, 4999999, 1), (10, 9999999, 1), (15, 14999999, 1)
+      // state: None - trimmed by watermark
+
+      CheckNewAnswer((5, 4999999, 1), (10, 9999999, 1), (15, 14999999, 1)),
+      assertNumStateRows(Seq(0, 1)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0))
+    )
+  }
+
+  // TODO: This test should be modified or deleted after
+  // we support stream-stream join followed by aggregation
+  test("join on time interval -> window agg, append mode, should fail") {
+    val input1 = MemoryStream[Int]
+    val inputDF1 = input1.toDF
+      .withColumnRenamed("value", "value1")
+      .withColumn("eventTime1", timestamp_seconds($"value1"))
+      .withWatermark("eventTime1", "0 seconds")
+
+    val input2 = MemoryStream[(Int, Int)]
+    val inputDF2 = input2.toDS().toDF("start", "end")
+      .withColumn("eventTime2Start", timestamp_seconds($"start"))
+      .withColumn("eventTime2End", timestamp_seconds($"end"))
+      .withColumn("start2", timestamp_seconds($"start"))
+      .withWatermark("eventTime2Start", "0 seconds")
+
+    val stream = inputDF1.join(inputDF2,
+      expr("eventTime1 >= eventTime2Start AND eventTime1 < eventTime2End " +
+        "AND eventTime1 = start2"), "inner")
+      .groupBy(window($"eventTime1", "5 seconds") as 'window)
+      .agg(count("*") as 'count)
+      .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+
+    var excpetionThrown: Boolean = false
+    try {
       testStream(stream)(
-        MultiAddData(input1, 1 to 4: _*)(input2, 1 to 4: _*),
-
-        // op1 W (0, 0)
-        // join output: (1, 1), (2, 2), (3, 3), (4, 4)
-        // state: (1, 1), (2, 2), (3, 3), (4, 4)
-        // op2 W (0, 0)
-        // agg: [0, 5) 4
-        // output: None
-        // state: [0, 5) 4
-
-        // no-data batch triggered
-
-        // op1 W (0, 4)
-        // join output: None
-        // state: None
-        // op2 W (0, 4)
-        // agg: None
-        // output: None
-        // state: [0, 5) 4
-        CheckNewAnswer(),
-        assertNumStateRows(Seq(1, 0)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0)),
-
-        // Move the watermark
-        MultiAddData(input1, 5)(input2, 5),
-
-        // op1 W (4, 4)
-        // join output: (5, 5)
-        // state: (5, 5)
-        // op2 W (4, 4)
-        // agg: [5, 10) 1
-        // output: None
-        // state: [0, 5) 4, [5, 10) 1
-
-        // no-data batch triggered
-
-        // op1 W (4, 5)
-        // join output: None
-        // state: None
-        // op2 W (4, 5)
-        // agg: None
-        // output: [0, 5) 4
-        // state: [5, 10) 1
-        CheckNewAnswer((0, 4)),
-        assertNumStateRows(Seq(1, 0)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0))
+        StartStream()
       )
+    } catch {
+      case t: AnalysisException =>
+        assert(t.getMessage.contains("stream-stream interval join " +
+          "followed by any stateful operator is not supported yet"))
+        excpetionThrown = true
     }
+    assert(excpetionThrown)
   }
 
-  test("aggregation -> stream deduplication, append mode") {
-    withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
-      val inputData = MemoryStream[Int]
-
-      val aggStream = inputData.toDF()
-        .withColumn("eventTime", timestamp_seconds($"value"))
-        .withWatermark("eventTime", "0 seconds")
-        .groupBy(window($"eventTime", "5 seconds").as("window"))
-        .agg(count("*").as("count"))
-        .withColumn("windowEnd", expr("window.end"))
-
-      // dropDuplicates from aggStream without event time column for dropDuplicates - the
-      // state does not get trimmed due to watermark advancement.
-      val dedupNoEventTime = aggStream
-        .dropDuplicates("count", "windowEnd")
-        .select(
-          $"windowEnd".cast("long").as[Long],
-          $"count".as[Long])
-
-      testStream(dedupNoEventTime)(
-        AddData(inputData, 1, 5, 10, 15),
-
-        // op1 W (0, 0)
-        // agg: [0, 5) 1, [5, 10) 1, [10, 15) 1, [15, 20) 1
-        // output: None
-        // state: [0, 5) 1, [5, 10) 1, [10, 15) 1, [15, 20) 1
-        // op2 W (0, 0)
-        // output: None
-        // state: None
-
-        // no-data batch triggered
-
-        // op1 W (0, 15)
-        // agg: None
-        // output: [0, 5) 1, [5, 10) 1, [10, 15) 1
-        // state: [15, 20) 1
-        // op2 W (0, 15)
-        // output: (5, 1), (10, 1), (15, 1)
-        // state: (5, 1), (10, 1), (15, 1)
-
-        CheckNewAnswer((5, 1), (10, 1), (15, 1)),
-        assertNumStateRows(Seq(3, 1)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0))
-      )
+  // TODO: This test should be modified or deleted after
+  // we support stream-stream join followed by aggregation
+  test("join with range join on non-time intervals -> window agg, append mode, shouldn't fail") {
+    val input1 = MemoryStream[Int]
+    val inputDF1 = input1.toDF
+      .withColumnRenamed("value", "value1")
+      .withColumn("eventTime1", timestamp_seconds($"value1"))
+      .withColumn("v1", timestamp_seconds($"value1"))
+      .withWatermark("eventTime1", "0 seconds")
+
+    val input2 = MemoryStream[(Int, Int)]
+    val inputDF2 = input2.toDS().toDF("start", "end")
+      .withColumn("eventTime2Start", timestamp_seconds($"start"))
+      .withColumn("start2", timestamp_seconds($"start"))
+      .withColumn("end2", timestamp_seconds($"end"))
+      .withWatermark("eventTime2Start", "0 seconds")
+
+    val stream = inputDF1.join(inputDF2,
+      expr("v1 >= start2 AND v1 < end2 " +
+        "AND eventTime1 = start2"), "inner")
+      .groupBy(window($"eventTime1", "5 seconds") as 'window)
+      .agg(count("*") as 'count)
+      .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+
+    testStream(stream)(
+      AddData(input1, 1, 2, 3, 4),
+      AddData(input2, (1, 2), (2, 3), (3, 4), (4, 5)),
+      CheckNewAnswer(),
+      assertNumStateRows(Seq(1, 0)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0))
+    )
+  }
 
-      // Similar to the above but add event time. The dedup state will get trimmed.
-      val dedupWithEventTime = aggStream
-        .withColumn("windowTime", expr("window_time(window)"))
-        .withColumn("windowTimeMicros", expr("unix_micros(windowTime)"))
-        .dropDuplicates("count", "windowEnd", "windowTime")
-        .select(
-          $"windowEnd".cast("long").as[Long],
-          $"windowTimeMicros".cast("long").as[Long],
-          $"count".as[Long])
-
-      testStream(dedupWithEventTime)(
-        AddData(inputData, 1, 5, 10, 15),
-
-        // op1 W (0, 0)
-        // agg: [0, 5) 1, [5, 10) 1, [10, 15) 1, [15, 20) 1
-        // output: None
-        // state: [0, 5) 1, [5, 10) 1, [10, 15) 1, [15, 20) 1
-        // op2 W (0, 0)
-        // output: None
-        // state: None
-
-        // no-data batch triggered
-
-        // op1 W (0, 15)
-        // agg: None
-        // output: [0, 5) 1, [5, 10) 1, [10, 15) 1
-        // state: [15, 20) 1
-        // op2 W (0, 15)
-        // output: (5, 4999999, 1), (10, 9999999, 1), (15, 14999999, 1)
-        // state: None - trimmed by watermark
-
-        CheckNewAnswer((5, 4999999, 1), (10, 9999999, 1), (15, 14999999, 1)),
-        assertNumStateRows(Seq(0, 1)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0))
-      )
-    }
+  // TODO: This test should be modified or deleted after

Review Comment:
   This is just obvious and should have been covered in the stream-stream join suite. We can remove the test case.
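
For readers following the `op1 W (...)` trace comments in the quoted tests, here is a minimal, Spark-free sketch of the arithmetic behind the "stream deduplication -> aggregation" trace (inputs 1 to 15, 5-second tumbling windows, 10-second watermark delay). The object `WatermarkTraceSketch` and its helpers are hypothetical names invented for this illustration; they are not part of Spark or of this PR, and the eviction rule is an assumption inferred from the trace comments.

```scala
// Hypothetical model of the trace comments in the quoted test; not Spark code.
object WatermarkTraceSketch {
  // Start (in seconds) of the tumbling window that contains `eventTime`.
  def windowStart(eventTime: Int, width: Int): Int =
    eventTime - (eventTime % width)

  // Count events per tumbling window, keyed by window start.
  def countPerWindow(events: Seq[Int], width: Int): Map[Int, Int] =
    events.groupBy(windowStart(_, width)).map { case (start, es) => start -> es.size }

  // Assumed append-mode rule: a window is emitted (and leaves the state)
  // once the watermark has reached its end. Returns (emitted, remaining).
  def emit(state: Map[Int, Int], width: Int, watermark: Int): (Map[Int, Int], Map[Int, Int]) =
    state.partition { case (start, _) => start + width <= watermark }

  def main(args: Array[String]): Unit = {
    val events = 1 to 15
    val state = countPerWindow(events, width = 5)
    // Watermark after the first batch: max event time minus the 10-second delay.
    val watermark = events.max - 10
    val (emitted, remaining) = emit(state, width = 5, watermark = watermark)
    println(s"emitted windows: $emitted")
    println(s"windows left in state: ${remaining.size}")
  }
}
```

Under these assumptions the only emitted window is the one starting at 0 with count 4, and three windows remain in state, which lines up with `CheckNewAnswer((0, 4))` and the first entry of `assertNumStateRows(Seq(3, 10))` in the quoted test.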



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/MultiStatefulOperatorsSuite.scala:
##########
@@ -40,378 +40,465 @@ class MultiStatefulOperatorsSuite
   }
 
   test("window agg -> window agg, append mode") {
-    // TODO: SPARK-40940 - Fix the unsupported ops checker to allow chaining of stateful ops.
-    withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
-      val inputData = MemoryStream[Int]
-
-      val stream = inputData.toDF()
-        .withColumn("eventTime", timestamp_seconds($"value"))
-        .withWatermark("eventTime", "0 seconds")
-        .groupBy(window($"eventTime", "5 seconds").as("window"))
-        .agg(count("*").as("count"))
-        .groupBy(window($"window", "10 seconds"))
-        .agg(count("*").as("count"), sum("count").as("sum"))
-        .select($"window".getField("start").cast("long").as[Long],
-          $"count".as[Long], $"sum".as[Long])
-
-      testStream(stream)(
-        AddData(inputData, 10 to 21: _*),
-        // op1 W (0, 0)
-        // agg: [10, 15) 5, [15, 20) 5, [20, 25) 2
-        // output: None
-        // state: [10, 15) 5, [15, 20) 5, [20, 25) 2
-        // op2 W (0, 0)
-        // agg: None
-        // output: None
-        // state: None
-
-        // no-data batch triggered
-
-        // op1 W (0, 21)
-        // agg: None
-        // output: [10, 15) 5, [15, 20) 5
-        // state: [20, 25) 2
-        // op2 W (0, 21)
-        // agg: [10, 20) (2, 10)
-        // output: [10, 20) (2, 10)
-        // state: None
-        CheckNewAnswer((10, 2, 10)),
-        assertNumStateRows(Seq(0, 1)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0)),
-
-        AddData(inputData, 10 to 29: _*),
-        // op1 W (21, 21)
-        // agg: [10, 15) 5 - late, [15, 20) 5 - late, [20, 25) 5, [25, 30) 5
-        // output: None
-        // state: [20, 25) 7, [25, 30) 5
-        // op2 W (21, 21)
-        // agg: None
-        // output: None
-        // state: None
-
-        // no-data batch triggered
-
-        // op1 W (21, 29)
-        // agg: None
-        // output: [20, 25) 7
-        // state: [25, 30) 5
-        // op2 W (21, 29)
-        // agg: [20, 30) (1, 7)
-        // output: None
-        // state: [20, 30) (1, 7)
-        CheckNewAnswer(),
-        assertNumStateRows(Seq(1, 1)),
-        assertNumRowsDroppedByWatermark(Seq(0, 2)),
-
-        // Move the watermark.
-        AddData(inputData, 30, 31),
-        // op1 W (29, 29)
-        // agg: [30, 35) 2
-        // output: None
-        // state: [25, 30) 5 [30, 35) 2
-        // op2 W (29, 29)
-        // agg: None
-        // output: None
-        // state: [20, 30) (1, 7)
-
-        // no-data batch triggered
-
-        // op1 W (29, 31)
-        // agg: None
-        // output: [25, 30) 5
-        // state: [30, 35) 2
-        // op2 W (29, 31)
-        // agg: [20, 30) (2, 12)
-        // output: [20, 30) (2, 12)
-        // state: None
-        CheckNewAnswer((20, 2, 12)),
-        assertNumStateRows(Seq(0, 1)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0))
-      )
-    }
+    val inputData = MemoryStream[Int]
+
+    val stream = inputData.toDF()
+      .withColumn("eventTime", timestamp_seconds($"value"))
+      .withWatermark("eventTime", "0 seconds")
+      .groupBy(window($"eventTime", "5 seconds").as("window"))
+      .agg(count("*").as("count"))
+      .groupBy(window($"window", "10 seconds"))
+      .agg(count("*").as("count"), sum("count").as("sum"))
+      .select($"window".getField("start").cast("long").as[Long],
+        $"count".as[Long], $"sum".as[Long])
+
+    testStream(stream)(
+      AddData(inputData, 10 to 21: _*),
+      // op1 W (0, 0)
+      // agg: [10, 15) 5, [15, 20) 5, [20, 25) 2
+      // output: None
+      // state: [10, 15) 5, [15, 20) 5, [20, 25) 2
+      // op2 W (0, 0)
+      // agg: None
+      // output: None
+      // state: None
+
+      // no-data batch triggered
+
+      // op1 W (0, 21)
+      // agg: None
+      // output: [10, 15) 5, [15, 20) 5
+      // state: [20, 25) 2
+      // op2 W (0, 21)
+      // agg: [10, 20) (2, 10)
+      // output: [10, 20) (2, 10)
+      // state: None
+      CheckNewAnswer((10, 2, 10)),
+      assertNumStateRows(Seq(0, 1)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0)),
+
+      AddData(inputData, 10 to 29: _*),
+      // op1 W (21, 21)
+      // agg: [10, 15) 5 - late, [15, 20) 5 - late, [20, 25) 5, [25, 30) 5
+      // output: None
+      // state: [20, 25) 7, [25, 30) 5
+      // op2 W (21, 21)
+      // agg: None
+      // output: None
+      // state: None
+
+      // no-data batch triggered
+
+      // op1 W (21, 29)
+      // agg: None
+      // output: [20, 25) 7
+      // state: [25, 30) 5
+      // op2 W (21, 29)
+      // agg: [20, 30) (1, 7)
+      // output: None
+      // state: [20, 30) (1, 7)
+      CheckNewAnswer(),
+      assertNumStateRows(Seq(1, 1)),
+      assertNumRowsDroppedByWatermark(Seq(0, 2)),
+
+      // Move the watermark.
+      AddData(inputData, 30, 31),
+      // op1 W (29, 29)
+      // agg: [30, 35) 2
+      // output: None
+      // state: [25, 30) 5 [30, 35) 2
+      // op2 W (29, 29)
+      // agg: None
+      // output: None
+      // state: [20, 30) (1, 7)
+
+      // no-data batch triggered
+
+      // op1 W (29, 31)
+      // agg: None
+      // output: [25, 30) 5
+      // state: [30, 35) 2
+      // op2 W (29, 31)
+      // agg: [20, 30) (2, 12)
+      // output: [20, 30) (2, 12)
+      // state: None
+      CheckNewAnswer((20, 2, 12)),
+      assertNumStateRows(Seq(0, 1)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0))
+    )
   }
 
   test("agg -> agg -> agg, append mode") {
-    withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
-      val inputData = MemoryStream[Int]
-
-      val stream = inputData.toDF()
-        .withColumn("eventTime", timestamp_seconds($"value"))
-        .withWatermark("eventTime", "0 seconds")
-        .groupBy(window($"eventTime", "5 seconds").as("window"))
-        .agg(count("*").as("count"))
-        .groupBy(window(window_time($"window"), "10 seconds"))
-        .agg(count("*").as("count"), sum("count").as("sum"))
-        .groupBy(window(window_time($"window"), "20 seconds"))
-        .agg(count("*").as("count"), sum("sum").as("sum"))
-        .select(
-          $"window".getField("start").cast("long").as[Long],
-          $"window".getField("end").cast("long").as[Long],
-          $"count".as[Long], $"sum".as[Long])
-
-      testStream(stream)(
-        AddData(inputData, 0 to 37: _*),
-        // op1 W (0, 0)
-        // agg: [0, 5) 5, [5, 10) 5, [10, 15) 5, [15, 20) 5, [20, 25) 5, [25, 30) 5, [30, 35) 5,
-        //   [35, 40) 3
-        // output: None
-        // state: [0, 5) 5, [5, 10) 5, [10, 15) 5, [15, 20) 5, [20, 25) 5, [25, 30) 5, [30, 35) 5,
-        //   [35, 40) 3
-        // op2 W (0, 0)
-        // agg: None
-        // output: None
-        // state: None
-        // op3 W (0, 0)
-        // agg: None
-        // output: None
-        // state: None
-
-        // no-data batch triggered
-
-        // op1 W (0, 37)
-        // agg: None
-        // output: [0, 5) 5, [5, 10) 5, [10, 15) 5, [15, 20) 5, [20, 25) 5, [25, 30) 5, [30, 35) 5
-        // state: [35, 40) 3
-        // op2 W (0, 37)
-        // agg: [0, 10) (2, 10), [10, 20) (2, 10), [20, 30) (2, 10), [30, 40) (1, 5)
-        // output: [0, 10) (2, 10), [10, 20) (2, 10), [20, 30) (2, 10)
-        // state: [30, 40) (1, 5)
-        // op3 W (0, 37)
-        // agg: [0, 20) (2, 20), [20, 40) (1, 10)
-        // output: [0, 20) (2, 20)
-        // state: [20, 40) (1, 10)
-        CheckNewAnswer((0, 20, 2, 20)),
-        assertNumStateRows(Seq(1, 1, 1)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0, 0)),
-
-        AddData(inputData, 30 to 60: _*),
-        // op1 W (37, 37)
-        // dropped rows: [30, 35), 1 row <= note that 35, 36, 37 are still in effect
-        // agg: [35, 40) 8, [40, 45) 5, [45, 50) 5, [50, 55) 5, [55, 60) 5, [60, 65) 1
-        // output: None
-        // state: [35, 40) 8, [40, 45) 5, [45, 50) 5, [50, 55) 5, [55, 60) 5, [60, 65) 1
-        // op2 W (37, 37)
-        // output: None
-        // state: [30, 40) (1, 5)
-        // op3 W (37, 37)
-        // output: None
-        // state: [20, 40) (1, 10)
-
-        // no-data batch
-        // op1 W (37, 60)
-        // output: [35, 40) 8, [40, 45) 5, [45, 50) 5, [50, 55) 5, [55, 60) 5
-        // state: [60, 65) 1
-        // op2 W (37, 60)
-        // agg: [30, 40) (2, 13), [40, 50) (2, 10), [50, 60), (2, 10)
-        // output: [30, 40) (2, 13), [40, 50) (2, 10), [50, 60), (2, 10)
-        // state: None
-        // op3 W (37, 60)
-        // agg: [20, 40) (2, 23), [40, 60) (2, 20)
-        // output: [20, 40) (2, 23), [40, 60) (2, 20)
-        // state: None
-
-        CheckNewAnswer((20, 40, 2, 23), (40, 60, 2, 20)),
-        assertNumStateRows(Seq(0, 0, 1)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0, 1))
-      )
-    }
+    val inputData = MemoryStream[Int]
+
+    val stream = inputData.toDF()
+      .withColumn("eventTime", timestamp_seconds($"value"))
+      .withWatermark("eventTime", "0 seconds")
+      .groupBy(window($"eventTime", "5 seconds").as("window"))
+      .agg(count("*").as("count"))
+      .groupBy(window(window_time($"window"), "10 seconds"))
+      .agg(count("*").as("count"), sum("count").as("sum"))
+      .groupBy(window(window_time($"window"), "20 seconds"))
+      .agg(count("*").as("count"), sum("sum").as("sum"))
+      .select(
+        $"window".getField("start").cast("long").as[Long],
+        $"window".getField("end").cast("long").as[Long],
+        $"count".as[Long], $"sum".as[Long])
+
+    testStream(stream)(
+      AddData(inputData, 0 to 37: _*),
+      // op1 W (0, 0)
+      // agg: [0, 5) 5, [5, 10) 5, [10, 15) 5, [15, 20) 5, [20, 25) 5, [25, 30) 5, [30, 35) 5,
+      //   [35, 40) 3
+      // output: None
+      // state: [0, 5) 5, [5, 10) 5, [10, 15) 5, [15, 20) 5, [20, 25) 5, [25, 30) 5, [30, 35) 5,
+      //   [35, 40) 3
+      // op2 W (0, 0)
+      // agg: None
+      // output: None
+      // state: None
+      // op3 W (0, 0)
+      // agg: None
+      // output: None
+      // state: None
+
+      // no-data batch triggered
+
+      // op1 W (0, 37)
+      // agg: None
+      // output: [0, 5) 5, [5, 10) 5, [10, 15) 5, [15, 20) 5, [20, 25) 5, [25, 30) 5, [30, 35) 5
+      // state: [35, 40) 3
+      // op2 W (0, 37)
+      // agg: [0, 10) (2, 10), [10, 20) (2, 10), [20, 30) (2, 10), [30, 40) (1, 5)
+      // output: [0, 10) (2, 10), [10, 20) (2, 10), [20, 30) (2, 10)
+      // state: [30, 40) (1, 5)
+      // op3 W (0, 37)
+      // agg: [0, 20) (2, 20), [20, 40) (1, 10)
+      // output: [0, 20) (2, 20)
+      // state: [20, 40) (1, 10)
+      CheckNewAnswer((0, 20, 2, 20)),
+      assertNumStateRows(Seq(1, 1, 1)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0, 0)),
+
+      AddData(inputData, 30 to 60: _*),
+      // op1 W (37, 37)
+      // dropped rows: [30, 35), 1 row <= note that 35, 36, 37 are still in effect
+      // agg: [35, 40) 8, [40, 45) 5, [45, 50) 5, [50, 55) 5, [55, 60) 5, [60, 65) 1
+      // output: None
+      // state: [35, 40) 8, [40, 45) 5, [45, 50) 5, [50, 55) 5, [55, 60) 5, [60, 65) 1
+      // op2 W (37, 37)
+      // output: None
+      // state: [30, 40) (1, 5)
+      // op3 W (37, 37)
+      // output: None
+      // state: [20, 40) (1, 10)
+
+      // no-data batch
+      // op1 W (37, 60)
+      // output: [35, 40) 8, [40, 45) 5, [45, 50) 5, [50, 55) 5, [55, 60) 5
+      // state: [60, 65) 1
+      // op2 W (37, 60)
+      // agg: [30, 40) (2, 13), [40, 50) (2, 10), [50, 60), (2, 10)
+      // output: [30, 40) (2, 13), [40, 50) (2, 10), [50, 60), (2, 10)
+      // state: None
+      // op3 W (37, 60)
+      // agg: [20, 40) (2, 23), [40, 60) (2, 20)
+      // output: [20, 40) (2, 23), [40, 60) (2, 20)
+      // state: None
+
+      CheckNewAnswer((20, 40, 2, 23), (40, 60, 2, 20)),
+      assertNumStateRows(Seq(0, 0, 1)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0, 1))
+    )
   }
 
   test("stream deduplication -> aggregation, append mode") {
-    withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
-      val inputData = MemoryStream[Int]
-
-      val deduplication = inputData.toDF()
-        .withColumn("eventTime", timestamp_seconds($"value"))
-        .withWatermark("eventTime", "10 seconds")
-        .dropDuplicates("value", "eventTime")
-
-      val windowedAggregation = deduplication
-        .groupBy(window($"eventTime", "5 seconds").as("window"))
-        .agg(count("*").as("count"), sum("value").as("sum"))
-        .select($"window".getField("start").cast("long").as[Long],
-          $"count".as[Long])
-
-      testStream(windowedAggregation)(
-        AddData(inputData, 1 to 15: _*),
-        // op1 W (0, 0)
-        // input: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
-        // deduplicated: None
-        // output: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
-        // state: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
-        // op2 W (0, 0)
-        // agg: [0, 5) 4, [5, 10) 5 [10, 15) 5, [15, 20) 1
-        // output: None
-        // state: [0, 5) 4, [5, 10) 5 [10, 15) 5, [15, 20) 1
-
-        // no-data batch triggered
-
-        // op1 W (0, 5)
-        // agg: None
-        // output: None
-        // state: 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
-        // op2 W (0, 5)
-        // agg: None
-        // output: [0, 5) 4
-        // state: [5, 10) 5 [10, 15) 5, [15, 20) 1
-        CheckNewAnswer((0, 4)),
-        assertNumStateRows(Seq(3, 10)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0))
-      )
-    }
+    val inputData = MemoryStream[Int]
+
+    val deduplication = inputData.toDF()
+      .withColumn("eventTime", timestamp_seconds($"value"))
+      .withWatermark("eventTime", "10 seconds")
+      .dropDuplicates("value", "eventTime")
+
+    val windowedAggregation = deduplication
+      .groupBy(window($"eventTime", "5 seconds").as("window"))
+      .agg(count("*").as("count"), sum("value").as("sum"))
+      .select($"window".getField("start").cast("long").as[Long],
+        $"count".as[Long])
+
+    testStream(windowedAggregation)(
+      AddData(inputData, 1 to 15: _*),
+      // op1 W (0, 0)
+      // input: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
+      // deduplicated: None
+      // output: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
+      // state: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
+      // op2 W (0, 0)
+      // agg: [0, 5) 4, [5, 10) 5 [10, 15) 5, [15, 20) 1
+      // output: None
+      // state: [0, 5) 4, [5, 10) 5 [10, 15) 5, [15, 20) 1
+
+      // no-data batch triggered
+
+      // op1 W (0, 5)
+      // agg: None
+      // output: None
+      // state: 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
+      // op2 W (0, 5)
+      // agg: None
+      // output: [0, 5) 4
+      // state: [5, 10) 5 [10, 15) 5, [15, 20) 1
+      CheckNewAnswer((0, 4)),
+      assertNumStateRows(Seq(3, 10)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0))
+    )
   }
 
   test("join -> window agg, append mode") {
-    withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
-      val input1 = MemoryStream[Int]
-      val inputDF1 = input1.toDF
-        .withColumnRenamed("value", "value1")
-        .withColumn("eventTime1", timestamp_seconds($"value1"))
-        .withWatermark("eventTime1", "0 seconds")
-
-      val input2 = MemoryStream[Int]
-      val inputDF2 = input2.toDF
-        .withColumnRenamed("value", "value2")
-        .withColumn("eventTime2", timestamp_seconds($"value2"))
-        .withWatermark("eventTime2", "0 seconds")
-
-      val stream = inputDF1.join(inputDF2, expr("eventTime1 = eventTime2"), "inner")
-        .groupBy(window($"eventTime1", "5 seconds").as("window"))
-        .agg(count("*").as("count"))
-        .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+    val input1 = MemoryStream[Int]
+    val inputDF1 = input1.toDF
+      .withColumnRenamed("value", "value1")
+      .withColumn("eventTime1", timestamp_seconds($"value1"))
+      .withWatermark("eventTime1", "0 seconds")
+
+    val input2 = MemoryStream[Int]
+    val inputDF2 = input2.toDF
+      .withColumnRenamed("value", "value2")
+      .withColumn("eventTime2", timestamp_seconds($"value2"))
+      .withWatermark("eventTime2", "0 seconds")
+
+    val stream = inputDF1.join(inputDF2, expr("eventTime1 = eventTime2"), "inner")
+      .groupBy(window($"eventTime1", "5 seconds").as("window"))
+      .agg(count("*").as("count"))
+      .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+
+    testStream(stream)(
+      MultiAddData(input1, 1 to 4: _*)(input2, 1 to 4: _*),
+
+      // op1 W (0, 0)
+      // join output: (1, 1), (2, 2), (3, 3), (4, 4)
+      // state: (1, 1), (2, 2), (3, 3), (4, 4)
+      // op2 W (0, 0)
+      // agg: [0, 5) 4
+      // output: None
+      // state: [0, 5) 4
+
+      // no-data batch triggered
+
+      // op1 W (0, 4)
+      // join output: None
+      // state: None
+      // op2 W (0, 4)
+      // agg: None
+      // output: None
+      // state: [0, 5) 4
+      CheckNewAnswer(),
+      assertNumStateRows(Seq(1, 0)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0)),
+
+      // Move the watermark
+      MultiAddData(input1, 5)(input2, 5),
+
+      // op1 W (4, 4)
+      // join output: (5, 5)
+      // state: (5, 5)
+      // op2 W (4, 4)
+      // agg: [5, 10) 1
+      // output: None
+      // state: [0, 5) 4, [5, 10) 1
+
+      // no-data batch triggered
+
+      // op1 W (4, 5)
+      // join output: None
+      // state: None
+      // op2 W (4, 5)
+      // agg: None
+      // output: [0, 5) 4
+      // state: [5, 10) 1
+      CheckNewAnswer((0, 4)),
+      assertNumStateRows(Seq(1, 0)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0))
+    )
+  }
 
+  test("aggregation -> stream deduplication, append mode") {
+    val inputData = MemoryStream[Int]
+
+    val aggStream = inputData.toDF()
+      .withColumn("eventTime", timestamp_seconds($"value"))
+      .withWatermark("eventTime", "0 seconds")
+      .groupBy(window($"eventTime", "5 seconds").as("window"))
+      .agg(count("*").as("count"))
+      .withColumn("windowEnd", expr("window.end"))
+
+    // dropDuplicates from aggStream without event time column for dropDuplicates - the
+    // state does not get trimmed due to watermark advancement.
+    val dedupNoEventTime = aggStream
+      .dropDuplicates("count", "windowEnd")
+      .select(
+        $"windowEnd".cast("long").as[Long],
+        $"count".as[Long])
+
+    testStream(dedupNoEventTime)(
+      AddData(inputData, 1, 5, 10, 15),
+
+      // op1 W (0, 0)
+      // agg: [0, 5) 1, [5, 10) 1, [10, 15) 1, [15, 20) 1
+      // output: None
+      // state: [0, 5) 1, [5, 10) 1, [10, 15) 1, [15, 20) 1
+      // op2 W (0, 0)
+      // output: None
+      // state: None
+
+      // no-data batch triggered
+
+      // op1 W (0, 15)
+      // agg: None
+      // output: [0, 5) 1, [5, 10) 1, [10, 15) 1
+      // state: [15, 20) 1
+      // op2 W (0, 15)
+      // output: (5, 1), (10, 1), (15, 1)
+      // state: (5, 1), (10, 1), (15, 1)
+
+      CheckNewAnswer((5, 1), (10, 1), (15, 1)),
+      assertNumStateRows(Seq(3, 1)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0))
+    )
+
+    // Similar to the above but add event time. The dedup state will get trimmed.
+    val dedupWithEventTime = aggStream
+      .withColumn("windowTime", expr("window_time(window)"))
+      .withColumn("windowTimeMicros", expr("unix_micros(windowTime)"))
+      .dropDuplicates("count", "windowEnd", "windowTime")
+      .select(
+        $"windowEnd".cast("long").as[Long],
+        $"windowTimeMicros".cast("long").as[Long],
+        $"count".as[Long])
+
+    testStream(dedupWithEventTime)(
+      AddData(inputData, 1, 5, 10, 15),
+
+      // op1 W (0, 0)
+      // agg: [0, 5) 1, [5, 10) 1, [10, 15) 1, [15, 20) 1
+      // output: None
+      // state: [0, 5) 1, [5, 10) 1, [10, 15) 1, [15, 20) 1
+      // op2 W (0, 0)
+      // output: None
+      // state: None
+
+      // no-data batch triggered
+
+      // op1 W (0, 15)
+      // agg: None
+      // output: [0, 5) 1, [5, 10) 1, [10, 15) 1
+      // state: [15, 20) 1
+      // op2 W (0, 15)
+      // output: (5, 4999999, 1), (10, 9999999, 1), (15, 14999999, 1)
+      // state: None - trimmed by watermark
+
+      CheckNewAnswer((5, 4999999, 1), (10, 9999999, 1), (15, 14999999, 1)),
+      assertNumStateRows(Seq(0, 1)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0))
+    )
+  }
+
+  // TODO: This test should be modified or deleted after
+  // we support stream-stream join followed by aggregation
+  test("join on time interval -> window agg, append mode, should fail") {
+    val input1 = MemoryStream[Int]
+    val inputDF1 = input1.toDF
+      .withColumnRenamed("value", "value1")
+      .withColumn("eventTime1", timestamp_seconds($"value1"))
+      .withWatermark("eventTime1", "0 seconds")
+
+    val input2 = MemoryStream[(Int, Int)]
+    val inputDF2 = input2.toDS().toDF("start", "end")
+      .withColumn("eventTime2Start", timestamp_seconds($"start"))
+      .withColumn("eventTime2End", timestamp_seconds($"end"))
+      .withColumn("start2", timestamp_seconds($"start"))
+      .withWatermark("eventTime2Start", "0 seconds")
+
+    val stream = inputDF1.join(inputDF2,
+      expr("eventTime1 >= eventTime2Start AND eventTime1 < eventTime2End " +
+        "AND eventTime1 = start2"), "inner")
+      .groupBy(window($"eventTime1", "5 seconds") as 'window)
+      .agg(count("*") as 'count)
+      .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+
+    var excpetionThrown: Boolean = false

Review Comment:
   Shall we try `intercept` instead?
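
For reference, a minimal sketch of how `intercept` could replace a hand-rolled boolean flag. To stay self-contained it defines a small stand-in `intercept` helper rather than depending on ScalaTest; `startQuery` and its exception message are hypothetical placeholders, not the actual test body.

```scala
import scala.reflect.ClassTag

object InterceptSketch {
  // Stand-in for ScalaTest's `intercept`: runs `body` and returns the expected
  // exception, or fails if nothing (or a different exception) is thrown.
  def intercept[T <: Throwable](body: => Any)(implicit ct: ClassTag[T]): T = {
    val caught: Option[Throwable] =
      try { body; None } catch { case t: Throwable => Some(t) }
    caught match {
      case Some(t) if ct.runtimeClass.isInstance(t) => t.asInstanceOf[T]
      case Some(t) => throw new AssertionError(s"Unexpected exception: $t")
      case None =>
        throw new AssertionError(s"Expected ${ct.runtimeClass.getName}, but nothing was thrown")
    }
  }

  // Hypothetical placeholder for starting the unsupported streaming query.
  def startQuery(): Unit =
    throw new IllegalStateException("unsupported chain of stateful operators")

  def main(args: Array[String]): Unit = {
    // No mutable flag needed: the caught exception is returned for further checks.
    val e = intercept[IllegalStateException] { startQuery() }
    assert(e.getMessage.contains("unsupported"))
  }
}
```

The returned exception can then be asserted on directly, which also makes a typo in a flag name impossible.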



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging {
     }
   }
 
+  def hasRangeExpr(e: Expression): Boolean = e.exists {
+    case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) =>
+      hasEventTimeColNeq(neq)
+    case _ => false
+  }
+
+  def hasEventTimeColNeq(neq: Expression): Boolean = {
+    val exp = neq.asInstanceOf[BinaryComparison]
+    hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
+  }
+
+  def hasEventTimeCol(exps: Expression): Boolean =
+    exps.exists {
+      case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey)
+      case _ => false
+    }
+
+  // TODO: This function and hasRangeExpr
+  // should be deleted after we support range join with states
+  def isStreamStreamIntervalJoin(plan: LogicalPlan): Boolean = {
+    plan match {
+      case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) =>
+        left.isStreaming && right.isStreaming &&
+        otherCondition.isDefined && hasRangeExpr(otherCondition.get)
+      case _ => false
+    }
+  }
+
   /**
    * Checks for possible correctness issue in chained stateful operators. The behavior is
    * controlled by SQL config `spark.sql.streaming.statefulOperator.checkCorrectness.enabled`.
    * Once it is enabled, an analysis exception will be thrown. Otherwise, Spark will just
    * print a warning message.
    */
   def checkStreamingQueryGlobalWatermarkLimit(
-      plan: LogicalPlan,
-      outputMode: OutputMode): Unit = {
+      plan: LogicalPlan): Unit = {

Review Comment:
   nit: could this be combined with the line above now?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] WweiL closed pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

Posted by GitBox <gi...@apache.org>.
WweiL closed pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
URL: https://github.com/apache/spark/pull/38503




[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

Posted by GitBox <gi...@apache.org>.
WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1021870148


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##########
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
       .agg(sum("num"))
       .as[(String, Long)]
 
-    testStream(result, Update)(
-      AddData(inputData, "a" -> 1),
-      CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-      AddData(inputData, "a" -> 1), // Dropped
-      CheckLastBatch(),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-      AddData(inputData, "a" -> 2),
-      CheckLastBatch("a" -> 3L),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-      AddData(inputData, "b" -> 1),
-      CheckLastBatch("b" -> 1L),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-    )
+    // As of [SPARK-40940], multiple state operator with Complete mode is disabled by default

Review Comment:
   I've made a list; let's discuss this later.
   We only care about the parents (downstream ops) of each operator.
   
   - Deduplication: only counted as a streaming stateful operator when it has an event-time column.
   
   - In Complete and Update mode, aggregations followed by any stateful operator are disallowed.
     - Note that Dedup w/o event time is not counted here.
   
   - flatMapGroupsWithState (and mapGroupsWithState, also the pandas version):
     - If `flatMapGroupsWithState` is configured with processing time, no check is needed.
     - After this PR: `flatMapGroupsWithState` or `MapGroupsWithState` followed by any stateful operator is disallowed.
       - Note that Dedup w/o event time is not counted here.
     - After this PR: agg followed by `flatMapGroupsWithState` in Append mode is allowed.
     - Currently: `flatMapGroupsWithState` with agg (no matter before or after it) in Update mode is not allowed -> [keep this behavior]
   
   - stream-stream join:
     - Only allowed in Append mode, inner join with equality.
     - Outer join with equality and time-interval join are disallowed.
     - Append mode: time-interval join followed by any stateful op: disallowed; equality inner & outer join followed by any stateful op: supported.
     - Stream-stream join is not possible in the other two modes.
   
   - `MapGroupsWithState`:
     - Currently: `MapGroupsWithState` with aggregation is disallowed
     - Currently: `MapGroupsWithState` only allowed in Update mode
   
   [Q] Why doesn't Dedup require an event-time col? It has to create some kind of state store to do the deduplication; without a watermark, are we holding that state for the whole query?
   [A] There may be cases where the key space is bounded. That is also why Complete mode makes sense.





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1021196487


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##########
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
       .agg(sum("num"))
       .as[(String, Long)]
 
-    testStream(result, Update)(
-      AddData(inputData, "a" -> 1),
-      CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-      AddData(inputData, "a" -> 1), // Dropped
-      CheckLastBatch(),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-      AddData(inputData, "a" -> 2),
-      CheckLastBatch("a" -> 3L),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-      AddData(inputData, "b" -> 1),
-      CheckLastBatch("b" -> 1L),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-    )
+    // As of [SPARK-40940], multiple state operator with Complete mode is disabled by default

Review Comment:
   If we sort this out properly, we could probably come up with a couple of (or several) golden rules from here, e.g. no stateful operator is supported after stateful operator A in some output mode(s). The list of rules still has to be hand-made, though.





[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

Posted by GitBox <gi...@apache.org>.
WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1021870148


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##########
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
       .agg(sum("num"))
       .as[(String, Long)]
 
-    testStream(result, Update)(
-      AddData(inputData, "a" -> 1),
-      CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-      AddData(inputData, "a" -> 1), // Dropped
-      CheckLastBatch(),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-      AddData(inputData, "a" -> 2),
-      CheckLastBatch("a" -> 3L),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-      AddData(inputData, "b" -> 1),
-      CheckLastBatch("b" -> 1L),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-    )
+    // As of [SPARK-40940], multiple state operator with Complete mode is disabled by default

Review Comment:
   I've made a list; let's discuss this later.
   
   - In Complete and Update mode, aggregations followed by any stateful op are disallowed.
   - Dedup: doesn't count; it has no effect regardless of the other stateful op or the output mode.
   - stream-stream join:
     - Only allowed in Append mode, inner join with equality.
     - Outer join with equality and time-interval join are disallowed.
     - [?] Other than that, no need to check its compatibility with other stateful ops.
   - flatMapGroupsWithState (and mapGroupsWithState, also the pandas version):
     - Currently: `MapGroupsWithState` with aggregation is disallowed
     - Currently: `MapGroupsWithState` only allowed in Update mode
     - [?] After this PR: `MapGroupsWithState` what?
     - Currently: `flatMapGroupsWithState`'s output mode must match query output mode if no aggs -> [keep this behavior]
     - Currently: `flatMapGroupsWithState` with agg (no matter before or after it) in Update mode is not allowed -> [keep this behavior]
     - Currently: agg followed by `flatMapGroupsWithState` in Append mode is disallowed -> [change this behavior]
     - After this PR: agg followed by `flatMapGroupsWithState` in Append mode is allowed.
     - After this PR: `flatMapGroupsWithState` followed by any stateful operator is disallowed.
   
   [?] Why doesn't Dedup require an event-time col? It has to create some kind of state store to do the deduplication; without a watermark, are we holding that state for the whole query?





[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

Posted by GitBox <gi...@apache.org>.
WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1021870148


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##########
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
       .agg(sum("num"))
       .as[(String, Long)]
 
-    testStream(result, Update)(
-      AddData(inputData, "a" -> 1),
-      CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-      AddData(inputData, "a" -> 1), // Dropped
-      CheckLastBatch(),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-      AddData(inputData, "a" -> 2),
-      CheckLastBatch("a" -> 3L),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-      AddData(inputData, "b" -> 1),
-      CheckLastBatch("b" -> 1L),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-    )
+    // As of [SPARK-40940], multiple state operator with Complete mode is disabled by default

Review Comment:
   I've made a list; let's discuss this later.
   
   - Deduplication: only counted as a streaming stateful operator when it has an event-time column.
   
   - In Complete and Update mode, aggregations followed by any stateful operator are disallowed.
     - Note that Dedup w/o event time is not counted here.
   
   - flatMapGroupsWithState (and mapGroupsWithState, also the pandas version):
     - If `flatMapGroupsWithState` is configured with processing time, no check is needed.
     - After this PR: `flatMapGroupsWithState` or `MapGroupsWithState` followed by any stateful operator is disallowed.
       - Note that Dedup w/o event time is not counted here.
     - After this PR: agg followed by `flatMapGroupsWithState` in Append mode is allowed.
     - Currently: `flatMapGroupsWithState` with agg (no matter before or after it) in Update mode is not allowed -> [keep this behavior]
   
   - stream-stream join:
     - Append mode: time-interval join followed by any stateful op: disallowed.
     - Append mode: equality inner & outer join followed by any stateful op: supported.
     - Currently: only allowed in Append mode, inner join with equality -> [keep this behavior]
     - Currently: outer join with equality and time-interval join are disallowed -> [keep this behavior]
   
   TODO: MapGroupsWithState
     - Currently: `MapGroupsWithState` with aggregation is disallowed
     - Currently: `MapGroupsWithState` only allowed in Update mode
   
   Eventually, the above boils down to three simple golden rules:
   1. `flatMapGroupsWithState` or `MapGroupsWithState` followed by any stateful operator is disallowed.
   2. Stream-stream time-interval join followed by any stateful operator is disallowed. Note that stream-stream join itself is only allowed in Append mode.
   3. Aggregation followed by any stateful operator is disallowed in Complete and Update mode.
   
   [Q] Why doesn't Dedup require an event-time col? It has to create some kind of state store to do the deduplication; without a watermark, are we holding that state for the whole query?
   [A] There may be cases where the key space is bounded. That is also why Complete mode makes sense.
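
The golden rules in this list could be sketched roughly as below. This is a toy model, not the checker in `UnsupportedOperationChecker`: the `Op` and `Mode` types and all function names are hypothetical, and the processing-time exception for `flatMapGroupsWithState` is left out for brevity.

```scala
object GoldenRulesSketch {
  sealed trait Mode
  case object Append extends Mode
  case object Update extends Mode
  case object Complete extends Mode

  // Simplified stand-ins for the streaming operators discussed above.
  sealed trait Op
  case object Aggregation extends Op
  case object GroupsWithState extends Op   // flatMapGroupsWithState / MapGroupsWithState
  case object TimeIntervalJoin extends Op  // stream-stream join with a range condition on event time
  case object EqualityJoin extends Op
  final case class Dedup(hasEventTimeCol: Boolean) extends Op

  // Dedup without an event-time column is not counted as stateful here.
  def countsAsStateful(op: Op): Boolean = op match {
    case Dedup(hasEventTime) => hasEventTime
    case _ => true
  }

  // Rules 1-3: is `op` an operator that must not be followed by a stateful one?
  def cannotBeFollowedByStateful(op: Op, mode: Mode): Boolean = op match {
    case GroupsWithState => true        // rule 1
    case TimeIntervalJoin => true       // rule 2
    case Aggregation => mode != Append  // rule 3
    case _ => false
  }

  // A chain upstream -> downstream is rejected only when both predicates hold.
  def isAllowed(upstream: Op, downstream: Op, mode: Mode): Boolean =
    !(cannotBeFollowedByStateful(upstream, mode) && countsAsStateful(downstream))
}
```

Under this model, aggregation followed by aggregation passes in Append mode but not in Update mode, while aggregation followed by Dedup without an event-time column passes in every mode.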





[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

Posted by GitBox <gi...@apache.org>.
WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1021870148


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##########
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
       .agg(sum("num"))
       .as[(String, Long)]
 
-    testStream(result, Update)(
-      AddData(inputData, "a" -> 1),
-      CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-      AddData(inputData, "a" -> 1), // Dropped
-      CheckLastBatch(),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-      AddData(inputData, "a" -> 2),
-      CheckLastBatch("a" -> 3L),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-      AddData(inputData, "b" -> 1),
-      CheckLastBatch("b" -> 1L),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-    )
+    // As of [SPARK-40940], multiple state operator with Complete mode is disabled by default

Review Comment:
   I've made a list; let's discuss this later.
   We only care about the parents (downstream ops) of each operator.
   
   - In Complete and Update mode, aggregations followed by any stateful op are disallowed.
   - Dedup: should be counted in the rule above.
   - stream-stream join:
     - Only allowed in Append mode, inner join with equality.
     - Outer join with equality and time-interval join are disallowed.
     - Append mode: time-interval join followed by any stateful op: disallowed; equality inner & outer join followed by any stateful op: supported.
     - Stream-stream join is not possible in the other two modes.
   
   - flatMapGroupsWithState (and mapGroupsWithState, also the pandas version):
     - Currently: `MapGroupsWithState` with aggregation is disallowed
     - Currently: `MapGroupsWithState` only allowed in Update mode
     - After this PR: `MapGroupsWithState` followed by any stateful operator is disallowed.
     - [?] After this PR: `MapGroupsWithState` what?
     - Currently: `flatMapGroupsWithState`'s output mode must match query output mode if no aggs -> [keep this behavior]
     - Currently: `flatMapGroupsWithState` with agg (no matter before or after it) in Update mode is not allowed -> [keep this behavior]
     - Currently: agg followed by `flatMapGroupsWithState` in Append mode is disallowed -> [change this behavior]
     - After this PR: agg followed by `flatMapGroupsWithState` in Append mode is allowed.
     - After this PR: `flatMapGroupsWithState` followed by any stateful operator is disallowed.
     - But `flatMapGroupsWithState` followed by Dedup is allowed. In fact, it is allowed as long as the downstream stateful op doesn't require an event-time col.
     - If `flatMapGroupsWithState` is configured with processing time, no check is needed.
   
   [?] Why doesn't Dedup require an event-time col? It has to create some kind of state store to do the deduplication; without a watermark, are we holding that state for the whole query? => There may be cases where the key space is bounded. That is also why Complete mode makes sense.





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1023563189


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -42,40 +43,101 @@ object UnsupportedOperationChecker extends Logging {
   }
 
   /**
-   * Checks for possible correctness issue in chained stateful operators. The behavior is
-   * controlled by SQL config `spark.sql.streaming.statefulOperator.checkCorrectness.enabled`.
-   * Once it is enabled, an analysis exception will be thrown. Otherwise, Spark will just
-   * print a warning message.
+   * Checks if the expression has an event-time column.
+   * @param exp the expression to be checked
+   * @return true if the expression contains an event-time column.
    */
-  def checkStreamingQueryGlobalWatermarkLimit(
-      plan: LogicalPlan,
-      outputMode: OutputMode): Unit = {
-    def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p match {
-      case s: Aggregate
-        if s.isStreaming && outputMode == InternalOutputModes.Append => true
-      case Join(left, right, joinType, _, _)
-        if left.isStreaming && right.isStreaming && joinType != Inner => true
-      case f: FlatMapGroupsWithState
-        if f.isStreaming && f.outputMode == OutputMode.Append() => true
-      case _ => false
+  private def hasEventTimeCol(exp: Expression): Boolean = exp.exists {
+    case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey)
+    case _ => false
+  }
+
+  /**
+   * Checks if the expression contains a range comparison, in which
+   * either side of the comparison is an event-time column. This is used for checking
+   * stream-stream time interval join.
+   * @param e the expression to be checked
+   * @return true if there is a time-interval join.
+   */
+  private def hasRangeExprAgainstEventTimeCol(e: Expression): Boolean = {
+    def hasEventTimeColBinaryComp(neq: Expression): Boolean = {
+      val exp = neq.asInstanceOf[BinaryComparison]
+      hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
     }
 
-    def isStatefulOperation(p: LogicalPlan): Boolean = p match {
-      case s: Aggregate if s.isStreaming => true
-      case _ @ Join(left, right, _, _, _) if left.isStreaming && right.isStreaming => true
-      case f: FlatMapGroupsWithState if f.isStreaming => true
-      case f: FlatMapGroupsInPandasWithState if f.isStreaming => true
-      case d: Deduplicate if d.isStreaming => true
+    e.exists {
+      case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) =>
+        hasEventTimeColBinaryComp(neq)
       case _ => false
     }
+  }
 
-    val failWhenDetected = SQLConf.get.statefulOperatorCorrectnessCheckEnabled
+  /**
+   * This method, combined with isStatefulOperationPossiblyEmitLateRows, determines all disallowed
+   * behaviors in multiple stateful operators.
+   * Concretely, all conditions defined below cannot be followed by any streaming stateful
+   * operator as defined in isStatefulOperationPossiblyEmitLateRows.
+   * @param p logical plan to be checked
+   * @param outputMode query output mode
+   * @return true if it is not allowed when followed by any streaming stateful
+   * operator as defined in isStatefulOperationPossiblyEmitLateRows.
+   */
+  private def ifCannotBeFollowedByStatefulOperation(
+      p: LogicalPlan, outputMode: OutputMode): Boolean = p match {
+    case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) =>
+      left.isStreaming && right.isStreaming &&
+        otherCondition.isDefined && hasRangeExprAgainstEventTimeCol(otherCondition.get)
+    // FlatMapGroupsWithState configured with event time
+    case f @ FlatMapGroupsWithState(_, _, _, _, _, _, _, _, _, timeout, _, _, _, _, _, _)
+      if f.isStreaming && timeout == GroupStateTimeout.EventTimeTimeout => true
+    case p @ FlatMapGroupsInPandasWithState(_, _, _, _, _, timeout, _)
+      if p.isStreaming && timeout == GroupStateTimeout.EventTimeTimeout => true
+    case a: Aggregate if a.isStreaming && outputMode != InternalOutputModes.Append => true
+    // Since the Distinct node will be replaced to Aggregate in the optimizer rule
+    // [[ReplaceDistinctWithAggregate]], here we also need to check all Distinct node by
+    // assuming it as Aggregate.
+    case d @ Distinct(_: LogicalPlan) if d.isStreaming
+      && outputMode != InternalOutputModes.Append => true
+    case _ => false
+  }
 
+  /**
+   * This method is only used with ifCannotBeFollowedByStatefulOperation.
+   * As the name suggests, it doesn't cover ALL streaming stateful operations,
+   * only the stateful operations that can possibly emit late rows.
+   * For example, a Deduplicate without an event-time column is still a stateful operation,
+   * but is of less interest because it won't emit late records due to the watermark.
+   * @param p the logical plan to be checked
+   * @return true if there is a streaming stateful operation
+   */
+  private def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p match {
+    case s: Aggregate if s.isStreaming => true
+    // Since the Distinct node will be replaced to Aggregate in the optimizer rule
+    // [[ReplaceDistinctWithAggregate]], here we also need to check all Distinct node by
+    // assuming it as Aggregate.
+    case d @ Distinct(_: LogicalPlan) if d.isStreaming => true
+    case _ @ Join(left, right, _, _, _) if left.isStreaming && right.isStreaming => true
+    case f: FlatMapGroupsWithState if f.isStreaming => true
+    case f: FlatMapGroupsInPandasWithState if f.isStreaming => true
+    // Deduplicate also works without event time column even in streaming,
+    // in such cases, although Dedup is still a stateful operation in a streaming
+    // query, it could be ignored in all checks below, so let it return false.
+    case d: Deduplicate if d.isStreaming && d.keys.exists(hasEventTimeCol) => true
+    case _ => false
+  }
+  /**
+   * Checks for possible correctness issue in chained stateful operators. The behavior is
+   * controlled by SQL config `spark.sql.streaming.statefulOperator.checkCorrectness.enabled`.
+   * Once it is enabled, an analysis exception will be thrown. Otherwise, Spark will just
+   * print a warning message.
+   */
+  def checkStreamingQueryGlobalWatermarkLimit(plan: LogicalPlan, outputMode: OutputMode): Unit = {
+    val failWhenDetected = SQLConf.get.statefulOperatorCorrectnessCheckEnabled
     try {
       plan.foreach { subPlan =>
-        if (isStatefulOperation(subPlan)) {
+        if (isStatefulOperationPossiblyEmitLateRows(subPlan)) {

Review Comment:
   First of all, the method name is not correct. For example, deduplicate never produces delayed rows.
   
   Say the upstream operator (a descendant node in the tree structure) falls under "cannot be followed by stateful operation". Then the downstream operator just needs to be a "stateful operation" to be disallowed. Deduplication without an event-time column is just an exception.
   
   Could we rename the method to "isStatefulOperation", and also simplify the method doc (we list the stateful operators but make an exception for deduplication under some conditions)? Encoding the exceptional case in the method name doesn't seem easy.
   
   (We could be much stricter on semantics and classify the "cannot be followed by stateful operation" cases into several kinds/types, since there are multiple reasons an operator cannot be followed by another stateful operator, but I don't think we want to be super exhaustive.)
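
The upstream/downstream relationship described above could be sketched as follows. This is a toy tree, not Spark's `LogicalPlan`: `Node`, its two boolean fields, and `violations` are hypothetical names, and the flags stand in for the two predicates under discussion.

```scala
object ChainCheckSketch {
  // Toy plan node: `children` are the upstream inputs (descendants in the tree).
  final case class Node(
      name: String,
      stateful: Boolean,
      cannotBeFollowedByStateful: Boolean,
      children: Seq[Node] = Nil) {
    def descendants: Seq[Node] = children.flatMap(c => c +: c.descendants)
    def subtree: Seq[Node] = this +: descendants
  }

  // Returns (upstream, downstream) pairs that violate the rule: a stateful node
  // sitting above a node that cannot be followed by a stateful operation.
  def violations(plan: Node): Seq[(String, String)] =
    for {
      down <- plan.subtree if down.stateful
      up <- down.descendants if up.cannotBeFollowedByStateful
    } yield (up.name, down.name)
}
```

In this model, a Deduplicate without an event-time column is simply built with `stateful = false`, so the exception lives in how the node is classified rather than in the method name.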





[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

Posted by GitBox <gi...@apache.org>.
WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1017435393


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala:
##########
@@ -188,17 +194,26 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
     expectedMsgs = Seq("Complete"))
 
   // FlatMapGroupsWithState(Update) in streaming with aggregation
-  for (outputMode <- Seq(Append, Update, Complete)) {
+  for (outputMode <- Seq(Update, Complete)) {
     assertNotSupportedInStreamingPlan(
       "flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation " +
         s"with aggregation in $outputMode mode",
       TestFlatMapGroupsWithState(
         null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = false, null,
         Aggregate(Seq(attributeWithWatermark), aggExprs("c"), streamRelation)),
       outputMode = outputMode,
-      expectedMsgs = Seq("flatMapGroupsWithState in update mode", "with aggregation"))
+      expectedMsgs = Seq("Multiple stateful operators", "Update", "Complete"))
   }
 
+  assertNotSupportedInStreamingPlan(
+    "flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation " +
+      s"with aggregation in Append mode",
+    TestFlatMapGroupsWithState(
+      null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = false, null,

Review Comment:
   Sorry, I didn't quite follow, and I lack context here. Do you mean that flatMapGroupsWithState(Update) with aggregation doesn't make sense in Append output mode? Previously there was a test for each output mode: https://github.com/apache/spark/blob/master/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala#L191
   
   I separated (Update, Complete) from Append because, with this change, (Update, Complete) is caught by the new check before the original check runs, so the error message for those modes changes. In Append mode the query is still blocked by the original check, with the original error message.
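
The ordering effect described above can be sketched in plain Scala. This is a hypothetical model, not the checker's code: `CheckOrder`, `firstError`, and the parameter names are invented for illustration, and only the message fragments come from the `expectedMsgs` values in the test diff.

```scala
// Hypothetical model of check ordering; not Spark's UnsupportedOperationChecker.
object CheckOrder {
  sealed trait Mode
  case object Append extends Mode
  case object Update extends Mode
  case object Complete extends Mode

  /** Returns the message fragment of the first failing check, or None if every check passes.
    * `fmgwsUpdateWithAgg` models flatMapGroupsWithState(Update) combined with an aggregation. */
  def firstError(mode: Mode, statefulOps: Int, fmgwsUpdateWithAgg: Boolean): Option[String] =
    if (statefulOps > 1 && mode != Append) Some("Multiple stateful operators") // new check runs first
    else if (fmgwsUpdateWithAgg) Some("flatMapGroupsWithState in update mode") // original check
    else None
}
```

Under this model the same plan reports the new message in Update and Complete mode and the original message in Append mode, which is why the test is split in two.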





[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

Posted by GitBox <gi...@apache.org>.
WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1016172080


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -41,34 +42,58 @@ object UnsupportedOperationChecker extends Logging {
     }
   }
 
+  private def hasRangeExprAgainstEventTimeCol(e: Expression): Boolean = e.exists {
+    case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) =>
+      hasEventTimeColNeq(neq)
+    case _ => false
+  }
+
+  private def hasEventTimeColNeq(neq: Expression): Boolean = {
+    val exp = neq.asInstanceOf[BinaryComparison]
+    hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
+  }
+
+  private def hasEventTimeCol(exps: Expression): Boolean =
+    exps.exists {
+      case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey)
+      case _ => false
+    }
+
+  /**
+   * Check if the given logical plan is a streaming stateful operations.
+   * @param p: The logical plan to be checked.
+   */
+  def isStatefulOperation(p: LogicalPlan): Boolean = {
+    p match {
+      case s: Aggregate if s.isStreaming => true
+      // Since the Distinct node will be replaced to Aggregate in the optimizer rule
+      // [[ReplaceDistinctWithAggregate]], here we also need to check all Distinct node by
+      // assuming it as Aggregate.
+      case d @ Distinct(_: LogicalPlan) if d.isStreaming => true

Review Comment:
   This is borrowed from line 136 of `collectStreamingAggregates`. Note that the original `isStatefulOperation` (a function nested inside `checkStreamingQueryGlobalWatermarkLimit`) did not check this. Let me know if it is needed.





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1021172211


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsInPandasWithStateSuite.scala:
##########
@@ -240,25 +240,30 @@ class FlatMapGroupsInPandasWithStateSuite extends StateStoreMetricsTest {
         .groupBy("key")
         .count()
 
-    testStream(result, Complete)(
-      AddData(inputData, "a"),
-      CheckNewAnswer(("a", 1)),
-      AddData(inputData, "a", "b"),
-      // mapGroups generates ("a", "2"), ("b", "1"); so increases counts of a and b by 1
-      CheckNewAnswer(("a", 2), ("b", 1)),
-      StopStream,
-      StartStream(),
-      AddData(inputData, "a", "b"),
-      // mapGroups should remove state for "a" and generate ("a", "-1"), ("b", "2") ;
-      // so increment a and b by 1
-      CheckNewAnswer(("a", 3), ("b", 2)),
-      StopStream,
-      StartStream(),
-      AddData(inputData, "a", "c"),
-      // mapGroups should recreate state for "a" and generate ("a", "1"), ("c", "1") ;
-      // so increment a and c by 1
-      CheckNewAnswer(("a", 4), ("b", 2), ("c", 1))
-    )
+    // As of [SPARK-40940], multiple state operator with Complete mode is disabled by default
+    val exp = intercept[AnalysisException] {

Review Comment:
   (DISCLAIMER: I don't like complete mode, which complicates things.)
   
   Look closely at the aggregation: it does not use event time, so it doesn't hit the technical limitation on the output of flatMapGroupsWithState.





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1021193999


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##########
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
       .agg(sum("num"))
       .as[(String, Long)]
 
-    testStream(result, Update)(
-      AddData(inputData, "a" -> 1),
-      CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-      AddData(inputData, "a" -> 1), // Dropped
-      CheckLastBatch(),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-      AddData(inputData, "a" -> 2),
-      CheckLastBatch("a" -> 3L),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-      AddData(inputData, "b" -> 1),
-      CheckLastBatch("b" -> 1L),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-    )
+    // As of [SPARK-40940], multiple state operator with Complete mode is disabled by default

Review Comment:
   So each stateful operator has its own characteristics...
   
   1. streaming aggregation
   
   - Append mode requires the event time column to be part of the grouping key. It emits a record only after the watermark has passed the value of the record's event time column. (This is the "delaying the output records" behavior from the comment above.)
   - For update mode and complete mode, having the event time column in the grouping key is optional. It only helps to evict state. (In complete mode even this doesn't happen.) It won't delay the output records.
   - For update mode and complete mode, any downstream operator that follows streaming aggregation must handle the aggregation's outputs in a semantically correct way. E.g. update mode emits output multiple times for the same aggregate, as corrections; complete mode emits all the historical aggregated outputs.
   - I can't imagine a valid case for streaming aggregation in update mode or complete mode being followed by another stateful operator.
   
   2. deduplication
   
   - The behavior is the same in all output modes. (More precisely, it "ignores" the output mode.)
   - It won't delay the output records.
   - It produces the same output only once.
   - It does not require the event time column to be set. The event time column is only used for evicting state rows.
   
   So the deduplication operator itself does not have any compatibility limitation. If the combination of a stateful operator A and the deduplication operator should be blocked, it is because of operator A.
   
   3. stream-stream join
   
   - I cannot reason about the proper behavior for update and complete mode.
   - It only accepts append mode now, so it is good to retain that.
   - For the equality join, inner join won't delay the outputs, whereas outer join can still delay the outputs.
   - For the time interval join, both inner and outer joins can delay the output records.
   
   4. flatMapGroupsWithState
   
   - It doesn't support complete mode at all.
   - It supports append mode and update mode, although it relies purely on the user function to do the right thing for the output mode, which I believe most regular end users wouldn't do.
   - It loses the event time column in the output of the operator.
   - It supports processing time semantics, which IMHO we should never allow to be combined with event time semantics in other stateful operators.
   
   That said, maybe we still need a hand-made allow/block list for update/complete mode...
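
As a rough illustration of what such a hand-made enumeration could record, the characteristics listed above can be written down as plain Scala data. This is a sketch under assumptions: `OperatorTraits` and every name in it are hypothetical and are not Spark classes, and the functions encode only what the comment states.

```scala
// Hypothetical model of the four operators described above; not Spark's API.
object OperatorTraits {
  sealed trait Mode
  case object Append extends Mode
  case object Update extends Mode
  case object Complete extends Mode

  sealed trait Op
  case object StreamingAggregation extends Op
  case object Deduplication extends Op
  final case class StreamStreamJoin(outer: Boolean, timeInterval: Boolean) extends Op
  case object FlatMapGroupsWithState extends Op

  /** Does the operator accept this output mode at all? */
  def supportsMode(op: Op, mode: Mode): Boolean = (op, mode) match {
    case (_: StreamStreamJoin, m) => m == Append     // join only accepts append mode
    case (FlatMapGroupsWithState, Complete) => false // no complete mode at all
    case _ => true
  }

  /** Can the operator hold rows back until the watermark advances?
    * None means the comment above does not say. */
  def mayDelayOutput(op: Op, mode: Mode): Option[Boolean] = op match {
    case StreamingAggregation => Some(mode == Append)
    case Deduplication => Some(false)
    case StreamStreamJoin(outer, timeInterval) => Some(outer || timeInterval)
    case FlatMapGroupsWithState => None
  }

  /** Only flatMapGroupsWithState is described as dropping the event time column. */
  def losesEventTimeColumn(op: Op): Boolean = op == FlatMapGroupsWithState
}
```

Keeping the traits separate from any allow/block decision makes it possible to derive the decision later, or to override it by hand for update and complete mode.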





[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

Posted by GitBox <gi...@apache.org>.
WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1021870148


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##########
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
       .agg(sum("num"))
       .as[(String, Long)]
 
-    testStream(result, Update)(
-      AddData(inputData, "a" -> 1),
-      CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-      AddData(inputData, "a" -> 1), // Dropped
-      CheckLastBatch(),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-      AddData(inputData, "a" -> 2),
-      CheckLastBatch("a" -> 3L),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-      AddData(inputData, "b" -> 1),
-      CheckLastBatch("b" -> 1L),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-    )
+    // As of [SPARK-40940], multiple state operator with Complete mode is disabled by default

Review Comment:
   I've made a list; let's discuss it later.
   We only care about the parents (downstream ops) of each operator.
   
   - In Complete and Update mode, aggregations followed by any stateful op are disallowed.
   - Dedup: should count in the rule above.
   - stream-stream join:
     - Only allowed in Append mode, inner join with equality.
     - Outer join with equality and time-interval join are disallowed.
     - Append mode: time-interval join followed by any stateful op is disallowed; equality inner and outer join followed by any stateful op is supported.
     - Stream-stream join can't be done in the other two modes.
   
   - flatMapGroupsWithState (and mapGroupsWithState, plus the pandas versions):
     - Currently: `MapGroupsWithState` with aggregation is disallowed.
     - Currently: `MapGroupsWithState` is only allowed in Update mode.
     - [?] After this PR: `MapGroupsWithState` what?
     - Currently: `flatMapGroupsWithState`'s output mode must match the query output mode if there are no aggs -> [keep this behavior]
     - Currently: `flatMapGroupsWithState` with agg (whether before or after it) in Update mode is not allowed -> [keep this behavior]
     - Currently: agg followed by `flatMapGroupsWithState` in Append mode is disallowed -> [change this behavior]
     - After this PR: agg followed by `flatMapGroupsWithState` in Append mode is allowed.
     - After this PR: `flatMapGroupsWithState` followed by any stateful operator is disallowed.
     - But `flatMapGroupsWithState` followed by Dedup is allowed. In fact it is allowed as long as the downstream stateful op doesn't require the event time col.
     - If `flatMapGroupsWithState` is configured with processing time, we don't need to check.
   
   [?] Why doesn't Dedup require an event-time col? It must create some kind of state store to do the deduplication; without a watermark, do we hold that state for the whole query? => There may be cases where the key space is bounded. That is also why Complete mode makes sense.
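
One possible reading of the list above, written as a single decision function. This is a sketch, not the PR's implementation: `ChainRules` and its members are hypothetical names. Two points are assumptions rather than statements from the list: Dedup as the upstream operator is treated as always allowed (following the earlier remark that deduplication has no compatibility limitation of its own), and a processing-time `flatMapGroupsWithState` is treated as allowed because the list says it needs no check.

```scala
// Hypothetical model of the proposed rules; none of these names exist in Spark.
object ChainRules {
  sealed trait Mode
  case object Append extends Mode
  case object Update extends Mode
  case object Complete extends Mode

  sealed trait Op
  case object Agg extends Op
  case object Dedup extends Op
  case object EqualityJoin extends Op     // inner or outer stream-stream equality join
  case object TimeIntervalJoin extends Op // stream-stream time-interval join
  final case class FlatMapGroupsWithState(processingTime: Boolean) extends Op

  /** May `upstream` be followed by the stateful operator `downstream` in `mode`? */
  def mayBeFollowedBy(upstream: Op, downstream: Op, mode: Mode): Boolean =
    (upstream, mode) match {
      case (Agg, Update | Complete) => false
      case (Agg, Append) => true
      case (EqualityJoin | TimeIntervalJoin, Update | Complete) => false // join needs Append
      case (EqualityJoin, Append) => true
      case (TimeIntervalJoin, Append) => false
      case (Dedup, _) => true                        // assumption, see the lead-in
      case (FlatMapGroupsWithState(true), _) => true // assumption: no check for processing time
      case (FlatMapGroupsWithState(false), _) => downstream == Dedup
    }
}
```

The `[?]` items in the list are left unmodelled; anything the list does not settle would need its own case before a function like this could back a real check.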





[GitHub] [spark] HeartSaVioR closed pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

Posted by GitBox <gi...@apache.org>.
HeartSaVioR closed pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
URL: https://github.com/apache/spark/pull/38503




[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1021175378


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsInPandasWithStateSuite.scala:
##########
@@ -240,25 +240,30 @@ class FlatMapGroupsInPandasWithStateSuite extends StateStoreMetricsTest {
         .groupBy("key")
         .count()
 
-    testStream(result, Complete)(
-      AddData(inputData, "a"),
-      CheckNewAnswer(("a", 1)),
-      AddData(inputData, "a", "b"),
-      // mapGroups generates ("a", "2"), ("b", "1"); so increases counts of a and b by 1
-      CheckNewAnswer(("a", 2), ("b", 1)),
-      StopStream,
-      StartStream(),
-      AddData(inputData, "a", "b"),
-      // mapGroups should remove state for "a" and generate ("a", "-1"), ("b", "2") ;
-      // so increment a and b by 1
-      CheckNewAnswer(("a", 3), ("b", 2)),
-      StopStream,
-      StartStream(),
-      AddData(inputData, "a", "c"),
-      // mapGroups should recreate state for "a" and generate ("a", "1"), ("c", "1") ;
-      // so increment a and c by 1
-      CheckNewAnswer(("a", 4), ("b", 2), ("c", 1))
-    )
+    // As of [SPARK-40940], multiple state operator with Complete mode is disabled by default
+    val exp = intercept[AnalysisException] {

Review Comment:
   In update and complete mode, streaming aggregation does not require the event time column to be part of the grouping key.





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1013649051


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging {
     }
   }
 
+  def hasRangeExpr(e: Expression): Boolean = e.exists {
+    case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) =>
+      hasEventTimeColNeq(neq)
+    case _ => false
+  }
+
+  def hasEventTimeColNeq(neq: Expression): Boolean = {
+    val exp = neq.asInstanceOf[BinaryComparison]
+    hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
+  }
+
+  def hasEventTimeCol(exps: Expression): Boolean =
+    exps.exists {
+      case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey)
+      case _ => false
+    }
+
+  // TODO: This function and hasRangeExpr
+  // should be deleted after we support range join with states
+  def isStreamStreamIntervalJoin(plan: LogicalPlan): Boolean = {
+    plan match {
+      case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) =>
+        left.isStreaming && right.isStreaming
+        otherCondition.isDefined && hasRangeExpr(otherCondition.get)
+      case _ => false
+    }
+  }
+
   /**
    * Checks for possible correctness issue in chained stateful operators. The behavior is
    * controlled by SQL config `spark.sql.streaming.statefulOperator.checkCorrectness.enabled`.
    * Once it is enabled, an analysis exception will be thrown. Otherwise, Spark will just
    * print a warning message.
    */
   def checkStreamingQueryGlobalWatermarkLimit(
-      plan: LogicalPlan,
-      outputMode: OutputMode): Unit = {
+      plan: LogicalPlan): Unit = {
     def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p match {
-      case s: Aggregate
-        if s.isStreaming && outputMode == InternalOutputModes.Append => true
       case Join(left, right, joinType, _, _)

Review Comment:
   We can remove this line, as we now support outer join as well. The only remaining issues are with stream-stream time interval join (all join types) and flatMapGroupsWithState.
   (Arguably, flatMapGroupsWithState followed by another stateful operator should be disallowed in all output modes, but I believe we will have a separate check for output mode, so this is OK.)





[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

Posted by GitBox <gi...@apache.org>.
WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1016829965


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -169,16 +179,24 @@ object UnsupportedOperationChecker extends Logging {
           "DataFrames/Datasets")(plan)
     }
 
-    // Disallow multiple streaming aggregations
-    val aggregates = collectStreamingAggregates(plan)
+    val statefulOps = plan.collect {
+      case p: LogicalPlan if isStatefulOperation(p) => p
+    }
 
-    if (aggregates.size > 1 && outputMode != InternalOutputModes.Append) {
+    if (statefulOps.size > 1 &&
+      outputMode != InternalOutputModes.Append &&
+      SQLConf.get.statefulOperatorCorrectnessCheckEnabled) {

Review Comment:
   Sure. IMHO it's still needed right now, as we are still disabling 1) stream-stream time interval join, as you mentioned above; 2) multiple stateful ops when the output mode is not append; 3) MapGroupsWithState before aggregation.
   
   For 3), a few checks were added [as in this comment](https://github.com/apache/spark/pull/38503#discussion_r1016174482). Specifically, when `statefulOperatorCorrectnessCheckEnabled` is true, `MapGroupsWithState->agg` is disallowed; when it is false, `MapGroupsWithState->agg` is allowed to execute.
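
The gating condition in the diff above can be restated as a small pure function, which makes the role of the flag easier to see. This is only a sketch: `MultiStatefulGate` and `rejects` are hypothetical names, and the `checkEnabled` parameter stands in for `SQLConf.get.statefulOperatorCorrectnessCheckEnabled`.

```scala
// Hypothetical restatement of the condition in the diff; not the checker itself.
object MultiStatefulGate {
  sealed trait Mode
  case object Append extends Mode
  case object Update extends Mode
  case object Complete extends Mode

  /** True when the plan should be rejected: more than one stateful operator,
    * a non-Append output mode, and the correctness check switched on. */
  def rejects(statefulOps: Int, mode: Mode, checkEnabled: Boolean): Boolean =
    statefulOps > 1 && mode != Append && checkEnabled
}
```

With the flag off, the function never rejects, which matches the statement that `MapGroupsWithState->agg` is allowed to execute in that case.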





[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

Posted by GitBox <gi...@apache.org>.
WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1014302073


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging {
     }
   }
 
+  def hasRangeExpr(e: Expression): Boolean = e.exists {
+    case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) =>
+      hasEventTimeColNeq(neq)
+    case _ => false
+  }
+
+  def hasEventTimeColNeq(neq: Expression): Boolean = {
+    val exp = neq.asInstanceOf[BinaryComparison]
+    hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
+  }
+
+  def hasEventTimeCol(exps: Expression): Boolean =
+    exps.exists {
+      case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey)
+      case _ => false
+    }
+
+  // TODO: This function and hasRangeExpr
+  // should be deleted after we support range join with states
+  def isStreamStreamIntervalJoin(plan: LogicalPlan): Boolean = {
+    plan match {
+      case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) =>
+        left.isStreaming && right.isStreaming
+        otherCondition.isDefined && hasRangeExpr(otherCondition.get)
+      case _ => false
+    }
+  }
+
   /**
    * Checks for possible correctness issue in chained stateful operators. The behavior is
    * controlled by SQL config `spark.sql.streaming.statefulOperator.checkCorrectness.enabled`.
    * Once it is enabled, an analysis exception will be thrown. Otherwise, Spark will just
    * print a warning message.
    */
   def checkStreamingQueryGlobalWatermarkLimit(
-      plan: LogicalPlan,
-      outputMode: OutputMode): Unit = {
+      plan: LogicalPlan): Unit = {
     def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p match {
-      case s: Aggregate
-        if s.isStreaming && outputMode == InternalOutputModes.Append => true
       case Join(left, right, joinType, _, _)

Review Comment:
   Will do!





[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

Posted by GitBox <gi...@apache.org>.
WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1014391666


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala:
##########
@@ -507,15 +507,13 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
       assertPassOnGlobalWatermarkLimit(
         s"single $joinType join in Append mode",
         streamRelation.join(streamRelation, joinType = RightOuter,
-          condition = Some(attributeWithWatermark === attribute)),

Review Comment:
   Thanks for the check! Resolved.





[GitHub] [spark] alex-balikov commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

Posted by GitBox <gi...@apache.org>.
alex-balikov commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1014428293


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -157,10 +172,11 @@ object UnsupportedOperationChecker extends Logging {
     // Disallow multiple streaming aggregations
     val aggregates = collectStreamingAggregates(plan)
 
-    if (aggregates.size > 1) {
+    if (aggregates.size > 1 && outputMode != InternalOutputModes.Append) {
       throwError(
         "Multiple streaming aggregations are not supported with " +

Review Comment:
   and we need to change the message accordingly - 'aggregations' -> 'stateful operators'





[GitHub] [spark] WweiL commented on pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

Posted by GitBox <gi...@apache.org>.
WweiL commented on PR #38503:
URL: https://github.com/apache/spark/pull/38503#issuecomment-1309381602

   [From this comment](https://github.com/apache/spark/pull/38503#discussion_r1013665318), Jungtaek suggested that we check not only aggregates but all stateful operators, with the kind intent of saving us the engineering work of reasoning about every tricky combination of allowed / disallowed cases.
   
   I followed his idea and disallowed multiple stateful ops in Complete and Update mode. Some test cases then failed because the error message changed. To name a few:
   ```
   test: streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation with aggregation in Update mode: not supported
   exception message: flatMapGroupsWithState in update mode is not supported with aggregation on a streaming DataFrame/Dataset
   
   test: streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation with aggregation in Complete mode: not supported
   message: flatMapGroupsWithState in update mode is not supported with aggregation on a streaming DataFrame/Dataset
   
   test: streaming plan - mapGroupsWithState - mapGroupsWithState on streaming relation with aggregation in Update mode: not supported
   message: mapGroupsWithState is not supported with aggregation on a streaming DataFrame/Dataset;
   ```
   
   From this I sense that the tricky cases Jungtaek mentioned may already be handled by the checker. To put it another way, __if we only disallow multiple aggregates (in Update and Complete mode) but not multiple stateful ops in line 186 of [UnsupportedOperationChecker.scala](https://github.com/apache/spark/pull/38503/files/c0381591fa955042c1cf64221a6f4c909143405f..3637b807d57ff5c534386132fb9d47c7cce72705#diff-7c879c08d2f379c139d5229a88857229ae69bb48f0a138a3d64e1b2dde3502fe), things that failed before will still fail (in Update and Complete mode), and we still loosen the restriction by allowing some combinations of multiple stateful ops in Append mode__. 
   
   I raised this with Alex and Jungtaek, and in the end we agreed to proceed in Jungtaek's direction. But we then found that it also changes cases that are currently allowed, for example [here](https://github.com/apache/spark/pull/38503#discussion_r1017413562) and [here](https://github.com/apache/spark/pull/38503#discussion_r1017447511). If this is pushed, currently running pipelines will fail.
   
   We could certainly change the code to specifically allow these cases, but I sense that the least engineering effort is to revert the check from disallowing multiple stateful operators back to disallowing only multiple aggregates (in Update and Complete mode), given that the tricky cases are handled elsewhere in the checker.
   
   I believe that disallowing only multiple aggregates (in Update and Complete mode) loosens the restriction in a proper way: we still allow cases that were disallowed before (previously all multiple aggregations were disallowed, and now they are allowed in Append mode), and all tests that passed before still pass. 
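   
   To make the proposed condition concrete, here is a minimal sketch of it as a standalone function. It is a toy model, not the Spark code: `MultiAggCheckSketch`, its `OutputMode` hierarchy and `rejectsMultipleAggregates` are hypothetical names, and only the boolean condition itself is taken from the diff.
   
   ```scala
   // Toy model for illustration only; none of these names are Spark's.
   object MultiAggCheckSketch {
     sealed trait OutputMode
     case object Append extends OutputMode
     case object Update extends OutputMode
     case object Complete extends OutputMode

     // Same shape as the condition in the diff: more than one streaming
     // aggregation is rejected unless the query runs in Append mode.
     def rejectsMultipleAggregates(numStreamingAggregates: Int, mode: OutputMode): Boolean =
       numStreamingAggregates > 1 && mode != Append
   }
   ```
   
   With this shape, a single aggregation is never rejected by this check, and two aggregations are rejected in Update and Complete mode but pass in Append mode.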




[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

Posted by GitBox <gi...@apache.org>.
WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1021870148


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##########
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
       .agg(sum("num"))
       .as[(String, Long)]
 
-    testStream(result, Update)(
-      AddData(inputData, "a" -> 1),
-      CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-      AddData(inputData, "a" -> 1), // Dropped
-      CheckLastBatch(),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-      AddData(inputData, "a" -> 2),
-      CheckLastBatch("a" -> 3L),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-      AddData(inputData, "b" -> 1),
-      CheckLastBatch("b" -> 1L),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-    )
+    // As of [SPARK-40940], multiple state operator with Complete mode is disabled by default

Review Comment:
   I've made a list, let's discuss this later.
   We only care about the parents (downstream ops) of each operator.
   
   - Deduplication: only counted as a streaming stateful operator when it has an event time column.
   
   - In Complete and Update mode, aggregations followed by any stateful operator are disallowed.
     - Note that Dedup w/o event time is not counted here.
   
   - flatMapGroupsWithState (and mapGroupsWithState, also the pandas version): 
     - If `flatMapGroupsWithState` is configured with processing time, we don't need to check.
     - After this PR: `flatMapGroupsWithState` or `MapGroupsWithState` followed by any stateful operator is disallowed. 
       - Note that Dedup w/o event time is not counted here.
     - After this PR: agg followed by `flatMapGroupsWithState` in Append mode is allowed. 
     - Currently: `flatMapGroupsWithState` with agg (no matter before or after it) in Update mode is not allowed -> [keep this behavior]
     - Currently: `MapGroupsWithState` with aggregation is disallowed 
     - Currently: `MapGroupsWithState` is only allowed in Update mode
   
   - stream-stream join: 
     - Only allowed in Append mode, inner join with equality.
     - Outer join with equality and time-interval join are disallowed.
     - Append mode: time-interval join followed by any stateful op: disallowed 
     - Append mode: equality inner & outer join followed by any stateful op: supported
     - Can't do stream-stream join in the other two modes.
   
   [Q] Why doesn't Dedup require an event-time col? It should create some kind of state store to do the deduplication; if there is no watermark, are we holding these states throughout the query? 
   [A] There may be cases where the key space is bounded. This is also why Complete mode makes sense.
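   
   A rough sketch of how the first few rules in this list could be encoded, as a toy model over a linear chain of operators ordered from upstream to downstream. Everything here (`StatefulChainSketch`, the `Op` hierarchy, `violates`) is hypothetical and not part of Spark; reading "don't need to check" as "skip the downstream check for processing-time `flatMapGroupsWithState`" is an assumption, and the stream-stream join rules are left out.
   
   ```scala
   // Toy model for illustration only; none of these names are Spark's.
   object StatefulChainSketch {
     sealed trait OutputMode
     case object Append extends OutputMode
     case object Update extends OutputMode
     case object Complete extends OutputMode

     sealed trait Op
     case object Aggregation extends Op
     final case class Dedup(hasEventTimeCol: Boolean) extends Op
     final case class FlatMapGroupsWithState(usesProcessingTime: Boolean) extends Op
     case object Stateless extends Op // e.g. a projection or a filter

     // Dedup only counts as stateful when it has an event time column.
     def countsAsStateful(op: Op): Boolean = op match {
       case Dedup(hasEventTimeCol) => hasEventTimeCol
       case Stateless => false
       case _ => true
     }

     // `ops` is ordered upstream to downstream; each suffix starts with an
     // operator followed by everything downstream of it.
     def violates(ops: List[Op], mode: OutputMode): Boolean =
       ops.tails.exists {
         // Complete/Update: aggregation followed by any stateful operator.
         case Aggregation :: downstream if mode != Append =>
           downstream.exists(countsAsStateful)
         // Event-time flatMapGroupsWithState followed by any stateful operator.
         case FlatMapGroupsWithState(false) :: downstream =>
           downstream.exists(countsAsStateful)
         case _ => false
       }
   }
   ```
   
   Under these assumptions `violates(List(Aggregation, Dedup(false)), Complete)` is false, because Dedup without event time is not counted, while `violates(List(Aggregation, Aggregation), Update)` is true.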








[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

Posted by GitBox <gi...@apache.org>.
WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1016172080


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -41,34 +42,58 @@ object UnsupportedOperationChecker extends Logging {
     }
   }
 
+  private def hasRangeExprAgainstEventTimeCol(e: Expression): Boolean = e.exists {
+    case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) =>
+      hasEventTimeColNeq(neq)
+    case _ => false
+  }
+
+  private def hasEventTimeColNeq(neq: Expression): Boolean = {
+    val exp = neq.asInstanceOf[BinaryComparison]
+    hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
+  }
+
+  private def hasEventTimeCol(exps: Expression): Boolean =
+    exps.exists {
+      case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey)
+      case _ => false
+    }
+
+  /**
+   * Check if the given logical plan is a streaming stateful operations.
+   * @param p: The logical plan to be checked.
+   */
+  def isStatefulOperation(p: LogicalPlan): Boolean = {
+    p match {
+      case s: Aggregate if s.isStreaming => true
+      // Since the Distinct node will be replaced to Aggregate in the optimizer rule
+      // [[ReplaceDistinctWithAggregate]], here we also need to check all Distinct node by
+      // assuming it as Aggregate.
+      case d @ Distinct(_: LogicalPlan) if d.isStreaming => true

Review Comment:
   This is borrowed from line 136 of `collectStreamingAggregates`. Note that in the original `isStatefulOperation` we do not check this. lmk if this is needed.







[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

Posted by GitBox <gi...@apache.org>.
WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1014281945


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging {
     }
   }
 
+  def hasRangeExpr(e: Expression): Boolean = e.exists {
+    case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) =>
+      hasEventTimeColNeq(neq)
+    case _ => false
+  }
+
+  def hasEventTimeColNeq(neq: Expression): Boolean = {
+    val exp = neq.asInstanceOf[BinaryComparison]
+    hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
+  }
+
+  def hasEventTimeCol(exps: Expression): Boolean =
+    exps.exists {
+      case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey)
+      case _ => false
+    }
+
+  // TODO: This function and hasRangeExpr
+  // should be deleted after we support range join with states
+  def isStreamStreamIntervalJoin(plan: LogicalPlan): Boolean = {
+    plan match {
+      case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) =>
+        left.isStreaming && right.isStreaming

Review Comment:
   Oh thank you so much for spotting that out!



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging {
     }
   }
 
+  def hasRangeExpr(e: Expression): Boolean = e.exists {

Review Comment:
   Done, I put it under `isStreamStreamIntervalJoin`. I also added `private` before the functions to limit their scope. 





[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

Posted by GitBox <gi...@apache.org>.
WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1017447511


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsInPandasWithStateSuite.scala:
##########
@@ -240,25 +240,30 @@ class FlatMapGroupsInPandasWithStateSuite extends StateStoreMetricsTest {
         .groupBy("key")
         .count()
 
-    testStream(result, Complete)(
-      AddData(inputData, "a"),
-      CheckNewAnswer(("a", 1)),
-      AddData(inputData, "a", "b"),
-      // mapGroups generates ("a", "2"), ("b", "1"); so increases counts of a and b by 1
-      CheckNewAnswer(("a", 2), ("b", 1)),
-      StopStream,
-      StartStream(),
-      AddData(inputData, "a", "b"),
-      // mapGroups should remove state for "a" and generate ("a", "-1"), ("b", "2") ;
-      // so increment a and b by 1
-      CheckNewAnswer(("a", 3), ("b", 2)),
-      StopStream,
-      StartStream(),
-      AddData(inputData, "a", "c"),
-      // mapGroups should recreate state for "a" and generate ("a", "1"), ("c", "1") ;
-      // so increment a and c by 1
-      CheckNewAnswer(("a", 4), ("b", 2), ("c", 1))
-    )
+    // As of [SPARK-40940], multiple state operator with Complete mode is disabled by default
+    val exp = intercept[AnalysisException] {

Review Comment:
   This is the test I was mentioning @alex-balikov 
   
   Just to confirm: in the above suite, sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsInPandasWithStateSuite.scala, there is a query that is flatMapGroupsWithState followed by an agg in Complete mode, and it seems to have been supported before. After this change it throws, because there are two stateful ops in Complete mode. Should we also allow this case? I'm not sure, because I remember you mentioned that flatMapGroupsWithState could change the event time, so we should disallow it... So is the test wrong?





[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

Posted by GitBox <gi...@apache.org>.
WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1023192849


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -41,41 +42,70 @@ object UnsupportedOperationChecker extends Logging {
     }
   }
 
+  private def hasRangeExprAgainstEventTimeCol(e: Expression): Boolean = e.exists {
+    case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) =>
+      hasEventTimeColNeq(neq)
+    case _ => false
+  }
+
+  private def hasEventTimeColNeq(neq: Expression): Boolean = {

Review Comment:
   This function is only used in `hasRangeExprAgainstEventTimeCol`. Yes, it does mean "not equal"; maybe renaming it to `hasEventTimeColBinaryComp` makes more sense. 
   
   I did try changing the function signature to `private def hasEventTimeColNeq(neq: BinaryComparison): Boolean`, but the compiler complains, because here
   ```
   case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan)
   ```
   `neq` can only be typed as `Expression`.
   
   I'll just put these two helper functions inside `hasRangeExprAgainstEventTimeCol`; that makes the logic clearer.
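   
   For reference, one possible way around the typing issue, sketched on a toy expression hierarchy rather than on Catalyst: match on the common supertype and move the "is it a range comparison" test into a guard, so the bound name already has the narrower type and no `asInstanceOf` is needed. All names below (`BinderTypeSketch`, `Expr`, `Attr`, `isRange`, `isRangeAgainstEventTime`) are hypothetical, and unlike the `e.exists` version in the diff this sketch only inspects the top-level expression.
   
   ```scala
   // Toy model for illustration only; this is not Catalyst's Expression tree.
   object BinderTypeSketch {
     sealed trait Expr
     final case class Attr(name: String, isEventTime: Boolean) extends Expr
     sealed trait BinaryComparison extends Expr { def left: Expr; def right: Expr }
     final case class LessThan(left: Expr, right: Expr) extends BinaryComparison
     final case class GreaterThan(left: Expr, right: Expr) extends BinaryComparison
     final case class EqualTo(left: Expr, right: Expr) extends BinaryComparison

     // The alternative pattern is used without a binder, so its type is irrelevant.
     private def isRange(c: BinaryComparison): Boolean = c match {
       case _: LessThan | _: GreaterThan => true
       case _ => false
     }

     private def hasEventTimeCol(e: Expr): Boolean = e match {
       case Attr(_, isEventTime) => isEventTime
       case c: BinaryComparison => hasEventTimeCol(c.left) || hasEventTimeCol(c.right)
     }

     // `c` is typed as BinaryComparison here, so `left` / `right` are available.
     def isRangeAgainstEventTime(e: Expr): Boolean = e match {
       case c: BinaryComparison if isRange(c) =>
         hasEventTimeCol(c.left) || hasEventTimeCol(c.right)
       case _ => false
     }
   }
   ```
   
   Whether this reads better than nesting the helpers is a matter of taste; it only shows that the cast can be avoided.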





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1021193999


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##########
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
       .agg(sum("num"))
       .as[(String, Long)]
 
-    testStream(result, Update)(
-      AddData(inputData, "a" -> 1),
-      CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-      AddData(inputData, "a" -> 1), // Dropped
-      CheckLastBatch(),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-      AddData(inputData, "a" -> 2),
-      CheckLastBatch("a" -> 3L),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-      AddData(inputData, "b" -> 1),
-      CheckLastBatch("b" -> 1L),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-    )
+    // As of [SPARK-40940], multiple state operator with Complete mode is disabled by default

Review Comment:
   So every stateful operator has its own characteristics...
   
   1. streaming aggregation
   
   - Append mode requires the event time column to be set in the grouping key. It emits a record only once the watermark has passed the value of its event time column. (This is the "delaying the output records" from the comment above.)
   - For update mode and complete mode, having the event time column in the grouping key is optional. It only helps to evict the state. (In complete mode even this doesn't happen.) It won't delay the output records.
   - For update mode and complete mode, any downstream operator(s) following the streaming aggregation must handle its outputs in a semantically proper way. E.g. update mode will produce outputs multiple times for the same aggregated output, i.e. corrections. Complete mode will produce all the historical aggregated outputs.
   - I can't imagine a valid case for a streaming aggregation in update mode or complete mode to be followed by another stateful operator.
   
   2. deduplication
   
   - The behavior is the same across all output modes. (More precisely, it "ignores" the output mode.)
   - It won't delay the output records.
   - It produces the same output only once.
   
   So, the deduplication operator itself does not have any compatibility limitation. If the combination of stateful operator A and the deduplication operator should be blocked, it is due to operator A.
   
   3. stream-stream join
   
   - I cannot reason about the proper behavior for update and complete mode.
   - It only accepts append mode now, so it is good to retain that.
   - For a time-interval join, it can delay the output records in both the inner and the outer case. For an equality join, an inner join won't delay the outputs, whereas an outer join can still delay them.
   
   4. flatMapGroupsWithState
   
   - It doesn't support complete mode at all.
   - It supports append mode and update mode, although it relies purely on the user function to do the right thing for the output mode, which I believe most regular end users wouldn't do.
   - It loses the event time column on the output of the operator.
   - It supports processing time semantics, which IMHO we should never allow to be used together with event time semantics in other stateful operators.
   
   That said, maybe we still need a hand-made enumeration of the allow/block list for update/complete mode...
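   
   As a starting point for such an enumeration, the characteristics above could be written down as plain data. This is only a sketch on a toy model: `OperatorTraitsSketch`, `mayDelayOutput` and `rejectedOutright` are hypothetical names, not Spark APIs, and `None` is used wherever the list above does not say whether an operator delays its output.
   
   ```scala
   // Toy model for illustration only; none of these names are Spark's.
   object OperatorTraitsSketch {
     sealed trait OutputMode
     case object Append extends OutputMode
     case object Update extends OutputMode
     case object Complete extends OutputMode

     sealed trait Op
     case object StreamingAggregation extends Op
     case object Deduplication extends Op
     final case class StreamStreamJoin(timeInterval: Boolean, outer: Boolean) extends Op
     case object FlatMapGroupsWithState extends Op

     // Whether the operator may hold records back until the watermark advances.
     def mayDelayOutput(op: Op, mode: OutputMode): Option[Boolean] = op match {
       case StreamingAggregation => Some(mode == Append)
       case Deduplication => Some(false)
       // Time-interval joins (inner or outer) and equality outer joins can delay.
       case StreamStreamJoin(timeInterval, outer) => Some(timeInterval || outer)
       case FlatMapGroupsWithState => None // not stated in the list above
     }

     // Operator/mode pairs described above as not accepted at all.
     def rejectedOutright(op: Op, mode: OutputMode): Boolean = op match {
       case _: StreamStreamJoin => mode != Append
       case FlatMapGroupsWithState => mode == Complete
       case _ => false
     }
   }
   ```
   
   An allow/block list for combinations would then be a function of two operators and the mode, built on top of facts like these.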







[GitHub] [spark] alex-balikov commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

Posted by GitBox <gi...@apache.org>.
alex-balikov commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1014425174


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -157,10 +193,11 @@ object UnsupportedOperationChecker extends Logging {
     // Disallow multiple streaming aggregations
     val aggregates = collectStreamingAggregates(plan)
 
-    if (aggregates.size > 1) {
+    if (aggregates.size > 1 && outputMode != InternalOutputModes.Append) {

Review Comment:
   Absolutely. I agree that we should allow multiple stateful ops only in append mode. The other modes are not implemented.





[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

Posted by GitBox <gi...@apache.org>.
WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1017447320


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##########
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
       .agg(sum("num"))
       .as[(String, Long)]
 
-    testStream(result, Update)(
-      AddData(inputData, "a" -> 1),
-      CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-      AddData(inputData, "a" -> 1), // Dropped
-      CheckLastBatch(),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-      AddData(inputData, "a" -> 2),
-      CheckLastBatch("a" -> 3L),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-      AddData(inputData, "b" -> 1),
-      CheckLastBatch("b" -> 1L),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-    )
+    // As of [SPARK-40940], multiple state operator with Complete mode is disabled by default

Review Comment:
   I see. No problem at all! 
   
   Just to confirm: in the suite above, sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsInPandasWithStateSuite.scala, there is a query with flatMapGroupsWithState followed by an agg in Complete mode, which also seems to have been supported before. After this change it throws because there are two stateful ops in Complete mode. Should we also allow this case? I'm not sure, because I remember you mentioned that flatMapGroupsWithState could change the event time, so we should disallow it. So is the test wrong?





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1023563189


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -42,40 +43,101 @@ object UnsupportedOperationChecker extends Logging {
   }
 
   /**
-   * Checks for possible correctness issue in chained stateful operators. The behavior is
-   * controlled by SQL config `spark.sql.streaming.statefulOperator.checkCorrectness.enabled`.
-   * Once it is enabled, an analysis exception will be thrown. Otherwise, Spark will just
-   * print a warning message.
+   * Checks if the expression has a event time column
+   * @param exp the expression to be checked
+   * @return true if it is a event time column.
    */
-  def checkStreamingQueryGlobalWatermarkLimit(
-      plan: LogicalPlan,
-      outputMode: OutputMode): Unit = {
-    def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p match {
-      case s: Aggregate
-        if s.isStreaming && outputMode == InternalOutputModes.Append => true
-      case Join(left, right, joinType, _, _)
-        if left.isStreaming && right.isStreaming && joinType != Inner => true
-      case f: FlatMapGroupsWithState
-        if f.isStreaming && f.outputMode == OutputMode.Append() => true
-      case _ => false
+  private def hasEventTimeCol(exp: Expression): Boolean = exp.exists {
+    case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey)
+    case _ => false
+  }
+
+  /**
+   * Checks if the expression contains a range comparison, in which
+   * either side of the comparison is an event-time column. This is used for checking
+   * stream-stream time interval join.
+   * @param e the expression to be checked
+   * @return true if there is a time-interval join.
+   */
+  private def hasRangeExprAgainstEventTimeCol(e: Expression): Boolean = {
+    def hasEventTimeColBinaryComp(neq: Expression): Boolean = {
+      val exp = neq.asInstanceOf[BinaryComparison]
+      hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
     }
 
-    def isStatefulOperation(p: LogicalPlan): Boolean = p match {
-      case s: Aggregate if s.isStreaming => true
-      case _ @ Join(left, right, _, _, _) if left.isStreaming && right.isStreaming => true
-      case f: FlatMapGroupsWithState if f.isStreaming => true
-      case f: FlatMapGroupsInPandasWithState if f.isStreaming => true
-      case d: Deduplicate if d.isStreaming => true
+    e.exists {
+      case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) =>
+        hasEventTimeColBinaryComp(neq)
       case _ => false
     }
+  }
 
-    val failWhenDetected = SQLConf.get.statefulOperatorCorrectnessCheckEnabled
+  /**
+   * This method, combined with isStatefulOperationPossiblyEmitLateRows, determines all disallowed
+   * behaviors in multiple stateful operators.
+   * Concretely, All conditions defined below cannot be followed by any streaming stateful
+   * operator as defined in isStatefulOperationPossiblyEmitLateRows.
+   * @param p logical plan to be checked
+   * @param outputMode query output mode
+   * @return true if it is not allowed when followed by any streaming stateful
+   * operator as defined in isStatefulOperationPossiblyEmitLateRows.
+   */
+  private def ifCannotBeFollowedByStatefulOperation(
+      p: LogicalPlan, outputMode: OutputMode): Boolean = p match {
+    case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) =>
+      left.isStreaming && right.isStreaming &&
+        otherCondition.isDefined && hasRangeExprAgainstEventTimeCol(otherCondition.get)
+    // FlatMapGroupsWithState configured with event time
+    case f @ FlatMapGroupsWithState(_, _, _, _, _, _, _, _, _, timeout, _, _, _, _, _, _)
+      if f.isStreaming && timeout == GroupStateTimeout.EventTimeTimeout => true
+    case p @ FlatMapGroupsInPandasWithState(_, _, _, _, _, timeout, _)
+      if p.isStreaming && timeout == GroupStateTimeout.EventTimeTimeout => true
+    case a: Aggregate if a.isStreaming && outputMode != InternalOutputModes.Append => true
+    // Since the Distinct node will be replaced to Aggregate in the optimizer rule
+    // [[ReplaceDistinctWithAggregate]], here we also need to check all Distinct node by
+    // assuming it as Aggregate.
+    case d @ Distinct(_: LogicalPlan) if d.isStreaming
+      && outputMode != InternalOutputModes.Append => true
+    case _ => false
+  }
 
+  /**
+   * This method is only used with ifCannotBeFollowedByStatefulOperation.
+   * As can tell from the name, it doesn't contain ALL streaming stateful operations,
+   * only the stateful operations that are possible to emit late rows.
+   * for example, a Deduplicate without a event time column is still a stateful operation
+   * but of less interested because it won't emit late records because of watermark.
+   * @param p the logical plan to be checked
+   * @return true if there is a streaming stateful operation
+   */
+  private def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p match {
+    case s: Aggregate if s.isStreaming => true
+    // Since the Distinct node will be replaced to Aggregate in the optimizer rule
+    // [[ReplaceDistinctWithAggregate]], here we also need to check all Distinct node by
+    // assuming it as Aggregate.
+    case d @ Distinct(_: LogicalPlan) if d.isStreaming => true
+    case _ @ Join(left, right, _, _, _) if left.isStreaming && right.isStreaming => true
+    case f: FlatMapGroupsWithState if f.isStreaming => true
+    case f: FlatMapGroupsInPandasWithState if f.isStreaming => true
+    // Deduplicate also works without event time column even in streaming,
+    // in such cases, although Dedup is still a stateful operation in a streaming
+    // query, it could be ignored in all checks below, so let it return false.
+    case d: Deduplicate if d.isStreaming && d.keys.exists(hasEventTimeCol) => true
+    case _ => false
+  }
+  /**
+   * Checks for possible correctness issue in chained stateful operators. The behavior is
+   * controlled by SQL config `spark.sql.streaming.statefulOperator.checkCorrectness.enabled`.
+   * Once it is enabled, an analysis exception will be thrown. Otherwise, Spark will just
+   * print a warning message.
+   */
+  def checkStreamingQueryGlobalWatermarkLimit(plan: LogicalPlan, outputMode: OutputMode): Unit = {
+    val failWhenDetected = SQLConf.get.statefulOperatorCorrectnessCheckEnabled
     try {
       plan.foreach { subPlan =>
-        if (isStatefulOperation(subPlan)) {
+        if (isStatefulOperationPossiblyEmitLateRows(subPlan)) {

Review Comment:
   First of all, the method name is not correct. For example, deduplicate never produces delayed rows.
   
   Say the upstream operator (a descendant node in the tree structure) falls under the "cannot be followed by stateful operation" case. Then the downstream operator only needs to be a "stateful operation" to be disallowed. Deduplication without an event time column is just an exception.
   
   Could we rename the method to "isStatefulOperation" and also simplify the method doc? Encoding the exceptional case in the method name doesn't seem easy.
   
   (We could be much stricter on semantics and classify several kinds of "cannot be followed by stateful operation", as there are multiple reasons an operator cannot be followed by another stateful operator, but I don't think we want to be super exhaustive.)
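
The upstream/downstream pairing described above can be sketched on a toy plan tree. This is a hedged illustration under stated assumptions, not Spark's implementation: `Node`, `stateful`, `blocksDownstream`, and `ChainCheck.violations` are hypothetical names, and the two flags stand in for the results of the "is stateful operation" and "cannot be followed by stateful operation" predicates.

```scala
// Toy plan node (hypothetical): `stateful` marks a streaming stateful operator,
// `blocksDownstream` marks one that cannot be followed by a stateful operator.
final case class Node(
    name: String,
    stateful: Boolean,
    blocksDownstream: Boolean,
    children: List[Node] = Nil)

object ChainCheck {
  // All strict descendants (upstream operators) of a node.
  private def descendants(n: Node): List[Node] =
    n.children.flatMap(c => c :: descendants(c))

  // All nodes of the tree, root first.
  private def allNodes(n: Node): List[Node] = n :: descendants(n)

  // Names of (upstream, downstream) pairs that violate the rule:
  // a stateful node with a "blocking" operator somewhere below it.
  def violations(root: Node): List[(String, String)] =
    for {
      down <- allNodes(root) if down.stateful
      up <- descendants(down) if up.blocksDownstream
    } yield (up.name, down.name)
}
```

In this model, a Deduplicate without an event time column would simply be built with `stateful = false`, so it never appears as the downstream half of a violating pair.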



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala:
##########
@@ -315,15 +298,15 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
     // future.
     expectedMsgs = Seq("Complete"))
 
-  for (outputMode <- Seq(Append, Update, Complete)) {
+  for (outputMode <- Seq(Append, Complete)) {

Review Comment:
   The failure here is not caused by the aggregation operator that follows. It happens simply because mapGroupsWithState is not supported in Append/Complete mode.
   
   * Append/Complete mode: `mapGroupsWithState is not supported with Complete output mode on a streaming DataFrame/Dataset`
   * Update mode: `Detected pattern of possible 'correctness' issue due to global watermark. The query contains stateful operation ...`
   
   That said, the test cases here are effectively the same as the test cases above, which have no aggregation.
   
   If you have a test case for Update mode on mapGroupsWithState followed by aggregation, then it's OK to just remove this test case. Otherwise, let's change this one to test Update mode.





[GitHub] [spark] WweiL commented on pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

Posted by GitBox <gi...@apache.org>.
WweiL commented on PR #38503:
URL: https://github.com/apache/spark/pull/38503#issuecomment-1315727807

   > Shouldn't you also fix https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/streaming/MultiStatefulOperatorsSuite.scala to remove the flag->false setting?
   
   Oh, I'm sorry. There were some problems when I was rebasing the branch. I'll add it now.




[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1021160270


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala:
##########
@@ -188,17 +194,26 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
     expectedMsgs = Seq("Complete"))
 
   // FlatMapGroupsWithState(Update) in streaming with aggregation
-  for (outputMode <- Seq(Append, Update, Complete)) {
+  for (outputMode <- Seq(Update, Complete)) {
     assertNotSupportedInStreamingPlan(
       "flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation " +
         s"with aggregation in $outputMode mode",
       TestFlatMapGroupsWithState(
         null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = false, null,
         Aggregate(Seq(attributeWithWatermark), aggExprs("c"), streamRelation)),
       outputMode = outputMode,
-      expectedMsgs = Seq("flatMapGroupsWithState in update mode", "with aggregation"))
+      expectedMsgs = Seq("Multiple stateful operators", "Update", "Complete"))
   }
 
+  assertNotSupportedInStreamingPlan(
+    "flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation " +
+      s"with aggregation in Append mode",
+    TestFlatMapGroupsWithState(
+      null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = false, null,

Review Comment:
   This is one of the reasons I want the allowed operations to be more restrictive, rather than saying op A - op B is supported with some modes, op B - op A is supported with some modes, etc. 
   
   The major reason I blocked multiple stateful operators altogether was that we figured out that a stream-stream outer join followed by another stream-stream outer join does not work, and I realized that it is not the only such case. Similar risks are everywhere, and it's really hard to reason through the whole matrix.
   
   That said, unfortunately, the fact that the unsupported operation checker does not fail a case does not always mean we guarantee that the case works properly. We have probably verified some cases, but I don't believe it's backed by theory (otherwise we would have known the limits of the global watermark much earlier).





[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

Posted by GitBox <gi...@apache.org>.
WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1021870148


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##########
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
       .agg(sum("num"))
       .as[(String, Long)]
 
-    testStream(result, Update)(
-      AddData(inputData, "a" -> 1),
-      CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-      AddData(inputData, "a" -> 1), // Dropped
-      CheckLastBatch(),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-      AddData(inputData, "a" -> 2),
-      CheckLastBatch("a" -> 3L),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-      AddData(inputData, "b" -> 1),
-      CheckLastBatch("b" -> 1L),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-    )
+    // As of [SPARK-40940], multiple state operator with Complete mode is disabled by default

Review Comment:
   I've made a list, let's discuss this later.
   We only care about the parents (downstream ops) of each operator.
   
   - Deduplication: Only counted as a streaming stateful operator when it has event time column.
   
   - In Complete, Update mode, Aggregations followed by any stateful operators are disallowed
     - Note that Dedup w/o event time is not counted here.
    
   - flatMapGroupsWithState (and mapGroupsWithState, also the pandas version): 
     - If `flatMapGroupsWithState` is configured with processing time, don't need to check.
     - After this PR: `flatMapGroupsWithState`, `MapGroupsWithState` followed by any stateful operator is disallowed. 
       - Note that Dedup w/o event time is not counted here.
     - After this PR: agg followed by `flatMapGroupsWithState` in Append mode is allowed. 
     - Currently: `flatMapGroupsWithState` with agg (no matter before or after it) in Update mode is not allowed -> [keep this behavior]
   
   - stream-stream join: 
     - only allowed in append mode, inner join with equality.
     - Outer join with equality and time-interval join are disallowed.
     - Append mode: time interval join followed by any stateful ops: disallowed; 
     - Append mode: equality inner & outer join followed by any stateful op: supported
   
   
     - Currently: `MapGroupsWithState` with aggregation is disallowed 
     - Currently: `MapGroupsWithState` only allowed in Update mode
    
   [Q] Why doesn't Dedup require an event-time col? It should create some kind of state store to do the deduplication; if there is no watermark, are we holding this state throughout the query? 
   [A] There may be cases where the key space is bounded. This is also why Complete mode makes sense.
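
As a rough aid for the discussion, part of the list above can be encoded as a small decision function. This is only a sketch under stated assumptions: the types and `Rules` are hypothetical, and it covers only the "can operator X be followed by a stateful operator" items. It deliberately omits the other rules in the list, such as stream-stream joins being limited to Append mode and `flatMapGroupsWithState` with agg being disallowed in Update mode.

```scala
// Toy stand-ins for output modes and operators (hypothetical, not Spark types).
sealed trait Mode
case object Append extends Mode
case object Update extends Mode
case object Complete extends Mode

sealed trait Op
case object Aggregation extends Op
final case class Dedup(hasEventTimeCol: Boolean) extends Op
final case class FlatMapGroupsWithState(eventTimeTimeout: Boolean) extends Op
final case class StreamStreamJoin(timeInterval: Boolean) extends Op

object Rules {
  // Upstream side: operators that must not be followed by a stateful operator.
  def cannotBeFollowedByStateful(op: Op, mode: Mode): Boolean = op match {
    case Aggregation => mode != Append                  // Complete/Update aggregation
    case FlatMapGroupsWithState(eventTime) => eventTime // processing time: no check
    case StreamStreamJoin(timeInterval) => timeInterval // equality join is fine
    case Dedup(_) => false
  }

  // Downstream side: Dedup only counts when it has an event time column.
  def countsAsStateful(op: Op): Boolean = op match {
    case Dedup(hasEventTimeCol) => hasEventTimeCol
    case _ => true
  }

  def chainAllowed(upstream: Op, downstream: Op, mode: Mode): Boolean =
    !(cannotBeFollowedByStateful(upstream, mode) && countsAsStateful(downstream))
}
```

For example, `Rules.chainAllowed(Aggregation, FlatMapGroupsWithState(eventTimeTimeout = true), Append)` is true in this model, matching the "agg followed by `flatMapGroupsWithState` in Append mode is allowed" item, while `Rules.chainAllowed(Aggregation, Aggregation, Update)` is false.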





[GitHub] [spark] alex-balikov commented on pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

Posted by GitBox <gi...@apache.org>.
alex-balikov commented on PR #38503:
URL: https://github.com/apache/spark/pull/38503#issuecomment-1315726141

   Shouldn't you also fix https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/streaming/MultiStatefulOperatorsSuite.scala to remove the flag->false setting?




[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

Posted by GitBox <gi...@apache.org>.
WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1021870148


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##########
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
       .agg(sum("num"))
       .as[(String, Long)]
 
-    testStream(result, Update)(
-      AddData(inputData, "a" -> 1),
-      CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-      AddData(inputData, "a" -> 1), // Dropped
-      CheckLastBatch(),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-      AddData(inputData, "a" -> 2),
-      CheckLastBatch("a" -> 3L),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-      AddData(inputData, "b" -> 1),
-      CheckLastBatch("b" -> 1L),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-    )
+    // As of [SPARK-40940], multiple state operator with Complete mode is disabled by default

Review Comment:
   I've made a list, let's discuss this later.
   We only care about the parents (downstream ops) of each operator.
   
   - Dedup: Only counted as a streaming stateful operator when it has event time column.
   
   - In Complete, Update mode, Aggregations followed by any stateful op are disallowed
     - Note that Dedup w/o event time is not counted here.
    
   - flatMapGroupsWithState (and mapGroupsWithState, also the pandas version): 
     - If `flatMapGroupsWithState` is configured with processing time, don't need to check.
     - After this PR: `flatMapGroupsWithState`, `MapGroupsWithState` followed by any stateful operator is disallowed. Note that Dedup w/o event time is not counted here.
     - After this PR: agg followed by `flatMapGroupsWithState` in Append mode is allowed. 
     - Currently: `flatMapGroupsWithState` with agg (no matter before or after it) in Update mode is not allowed -> [keep this behavior]
   
   - stream-stream join: 
     - only allowed in append mode, inner join with equality.
     - Outer join with equality and time-interval join are disallowed.
     - Append mode: time interval join followed by any stateful ops: disallowed; equality inner & outer join followed by any stateful op: supported
     - Stream-stream join can't be used in the other two modes.
   
   
     - Currently: `MapGroupsWithState` with aggregation is disallowed 
     - Currently: `MapGroupsWithState` only allowed in Update mode
   [Q] Why doesn't Dedup require an event-time col? It should create some kind of state store to do the deduplication; if there is no watermark, are we holding this state throughout the query? 
   [A] There may be cases where the key space is bounded. This is also why Complete mode makes sense.





[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

Posted by GitBox <gi...@apache.org>.
WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1021870148


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##########
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
       .agg(sum("num"))
       .as[(String, Long)]
 
-    testStream(result, Update)(
-      AddData(inputData, "a" -> 1),
-      CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-      AddData(inputData, "a" -> 1), // Dropped
-      CheckLastBatch(),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-      AddData(inputData, "a" -> 2),
-      CheckLastBatch("a" -> 3L),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-      AddData(inputData, "b" -> 1),
-      CheckLastBatch("b" -> 1L),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-    )
+    // As of [SPARK-40940], multiple state operator with Complete mode is disabled by default

Review Comment:
   I've made a list, let's discuss this later.
   In short: we only care about the parents (downstream ops) of each operator.
   
   - In Complete, Update mode, Aggregations followed by any stateful op are disallowed
   - Dedup: should count above.
   - stream-stream join: 
     - only allowed in append mode, inner join with equality.
     - Outer join with equality and time-interval join are disallowed.
     - Append mode: time interval join followed by any stateful ops: disallowed; equality inner & outer join followed by any stateful op: supported
     - Stream-stream join can't be used in the other two modes.
   
   - flatMapGroupsWithState (and mapGroupsWithState, also the pandas version): 
     - Currently: `MapGroupsWithState` with aggregation is disallowed 
     - Currently: `MapGroupsWithState` only allowed in Update mode
     - [?] After this PR: `MapGroupsWithState` what?
     - Currently: `flatMapGroupsWithState`'s output mode must match query output mode if no aggs -> [keep this behavior] 
     - Currently: `flatMapGroupsWithState` with agg (no matter before or after it) in Update mode is not allowed -> [keep this behavior]
     - Currently: agg followed by `flatMapGroupsWithState` in Append mode is disallowed -> [change this behavior]
     - After this PR: agg followed by `flatMapGroupsWithState` in Append mode is allowed. 
     - After this PR: `flatMapGroupsWithState` followed by any stateful operator is disallowed.
     - But `flatMapGroupsWithState` followed by Dedup is allowed. Actually as long as downstream stateful op doesn't require event time col.
     - If `flatMapGroupsWithState` is configured with processing time, don't need to check.
   
   [?] Why doesn't Dedup require an event-time col? It should create some kind of state store to do the deduplication; if there is no watermark, are we holding this state throughout the query? => There may be cases where the key space is bounded. This is also why Complete mode makes sense.





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1016177874


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -41,34 +42,58 @@ object UnsupportedOperationChecker extends Logging {
     }
   }
 
+  private def hasRangeExprAgainstEventTimeCol(e: Expression): Boolean = e.exists {
+    case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) =>
+      hasEventTimeColNeq(neq)
+    case _ => false
+  }
+
+  private def hasEventTimeColNeq(neq: Expression): Boolean = {
+    val exp = neq.asInstanceOf[BinaryComparison]
+    hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
+  }
+
+  private def hasEventTimeCol(exps: Expression): Boolean =
+    exps.exists {
+      case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey)
+      case _ => false
+    }
+
+  /**
+   * Check if the given logical plan is a streaming stateful operations.
+   * @param p: The logical plan to be checked.
+   */
+  def isStatefulOperation(p: LogicalPlan): Boolean = {
+    p match {
+      case s: Aggregate if s.isStreaming => true
+      // Since the Distinct node will be replaced to Aggregate in the optimizer rule
+      // [[ReplaceDistinctWithAggregate]], here we also need to check all Distinct node by
+      // assuming it as Aggregate.
+      case d @ Distinct(_: LogicalPlan) if d.isStreaming => true

Review Comment:
   Yeah, nice finding!





[GitHub] [spark] alex-balikov commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

Posted by GitBox <gi...@apache.org>.
alex-balikov commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1017413562


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##########
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
       .agg(sum("num"))
       .as[(String, Long)]
 
-    testStream(result, Update)(
-      AddData(inputData, "a" -> 1),
-      CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-      AddData(inputData, "a" -> 1), // Dropped
-      CheckLastBatch(),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-      AddData(inputData, "a" -> 2),
-      CheckLastBatch("a" -> 3L),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-      AddData(inputData, "b" -> 1),
-      CheckLastBatch("b" -> 1L),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-    )
+    // As of [SPARK-40940], multiple state operator with Complete mode is disabled by default

Review Comment:
   Hmm, the above scenario (dropDuplicates -> aggregation) was supported before. So I was wrong: dropDuplicates, and also inner join with a timestamp equality condition, can be followed by a stateful operator in any mode, because these operators do not delay the output records. I apologize for the randomization; I think the above scenario is important to continue supporting.



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##########
@@ -215,20 +220,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
       .agg(sum("num"))
       .as[(String, Long)]
 
-    testStream(result, Complete)(
-      AddData(inputData, "a" -> 1),
-      CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-      AddData(inputData, "a" -> 1), // Dropped
-      CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-      AddData(inputData, "a" -> 2),
-      CheckLastBatch("a" -> 3L),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-      AddData(inputData, "b" -> 1),
-      CheckLastBatch("a" -> 3L, "b" -> 1L),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-    )
+    // As of [SPARK-40940], multiple state operator with Complete mode is disabled by default
+    val exp = intercept[AnalysisException] {

Review Comment:
   same here - the above scenario should work



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala:
##########
@@ -188,17 +194,26 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
     expectedMsgs = Seq("Complete"))
 
   // FlatMapGroupsWithState(Update) in streaming with aggregation
-  for (outputMode <- Seq(Append, Update, Complete)) {
+  for (outputMode <- Seq(Update, Complete)) {
     assertNotSupportedInStreamingPlan(
       "flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation " +
         s"with aggregation in $outputMode mode",
       TestFlatMapGroupsWithState(
         null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = false, null,
         Aggregate(Seq(attributeWithWatermark), aggExprs("c"), streamRelation)),
       outputMode = outputMode,
-      expectedMsgs = Seq("flatMapGroupsWithState in update mode", "with aggregation"))
+      expectedMsgs = Seq("Multiple stateful operators", "Update", "Complete"))
   }
 
+  assertNotSupportedInStreamingPlan(
+    "flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation " +
+      s"with aggregation in Append mode",
+    TestFlatMapGroupsWithState(
+      null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = false, null,

Review Comment:
   I am not sure what the semantics are of setting the output mode to Update on flatMapGroupsWithState, but it does not match the Append output mode below.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1013649051


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging {
     }
   }
 
+  def hasRangeExpr(e: Expression): Boolean = e.exists {
+    case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) =>
+      hasEventTimeColNeq(neq)
+    case _ => false
+  }
+
+  def hasEventTimeColNeq(neq: Expression): Boolean = {
+    val exp = neq.asInstanceOf[BinaryComparison]
+    hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
+  }
+
+  def hasEventTimeCol(exps: Expression): Boolean =
+    exps.exists {
+      case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey)
+      case _ => false
+    }
+
+  // TODO: This function and hasRangeExpr
+  // should be deleted after we support range join with states
+  def isStreamStreamIntervalJoin(plan: LogicalPlan): Boolean = {
+    plan match {
+      case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) =>
+        left.isStreaming && right.isStreaming
+        otherCondition.isDefined && hasRangeExpr(otherCondition.get)
+      case _ => false
+    }
+  }
+
   /**
    * Checks for possible correctness issue in chained stateful operators. The behavior is
    * controlled by SQL config `spark.sql.streaming.statefulOperator.checkCorrectness.enabled`.
    * Once it is enabled, an analysis exception will be thrown. Otherwise, Spark will just
    * print a warning message.
    */
   def checkStreamingQueryGlobalWatermarkLimit(
-      plan: LogicalPlan,
-      outputMode: OutputMode): Unit = {
+      plan: LogicalPlan): Unit = {
     def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p match {
-      case s: Aggregate
-        if s.isStreaming && outputMode == InternalOutputModes.Append => true
       case Join(left, right, joinType, _, _)

Review Comment:
   We can remove this line as we support outer join as well. We only have an issue with stream-stream time interval join (of all join types) and flatMapGroupsWithState.
   (Arguably flatMapGroupsWithState should be disallowed in all output modes, but I believe we will have a separate check for output mode, so this is OK.)





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1021193999


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##########
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
       .agg(sum("num"))
       .as[(String, Long)]
 
-    testStream(result, Update)(
-      AddData(inputData, "a" -> 1),
-      CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-      AddData(inputData, "a" -> 1), // Dropped
-      CheckLastBatch(),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-      AddData(inputData, "a" -> 2),
-      CheckLastBatch("a" -> 3L),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-      AddData(inputData, "b" -> 1),
-      CheckLastBatch("b" -> 1L),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-    )
+    // As of [SPARK-40940], multiple state operator with Complete mode is disabled by default

Review Comment:
   So every stateful operator has its own characteristics...
   
   1. streaming aggregation
   
   - Append mode requires the event time column to be set in the grouping key. It emits a record only once the watermark has passed the value of its event time column. (This is the "delaying the output records" mentioned in the comment above.)
   - For update mode and complete mode, having the event time column in the grouping key is optional. It only helps to evict the state. (In complete mode even this doesn't happen.) It won't delay the output records.
   - For update mode and complete mode, any downstream operator following the streaming aggregation must handle the outputs the aggregation produces in a semantically proper way. E.g. update mode will produce outputs multiple times for the same aggregated output (say, corrections); complete mode will produce all the historical aggregated outputs.
   - I can't imagine a valid case for a streaming aggregation in update mode or complete mode being followed by another stateful operator.
   
   2. deduplication
   
   - The behavior is the same in all output modes. (More precisely, it "ignores" the output mode.)
   - It won't delay the output records.
   - It produces the same output only once.
   - It does not require the event time column to be set. It's only used for eviction.
   
   So, the deduplication operator itself does not have any compatibility limitation. If the combination of stateful operator A and the deduplication operator should be blocked, it is due to operator A.
   
   3. stream-stream join
   
   - I cannot reason about the proper behavior for update and complete mode.
   - It only accepts append mode now, so it is good to retain that.
   - For the equality join, inner join won't delay the outputs, whereas outer join is still able to delay the outputs.
   - For the time interval join, it can delay the output records in both inner and outer joins.
   
   4. flatMapGroupsWithState
   
   - It doesn't support complete mode at all.
   - It supports append mode and update mode, although it relies purely on the user function to do the right thing for the output mode, which I believe most regular end users wouldn't do.
   - It loses the event time column on the output of the operator.
   - It supports processing time semantics, which IMHO we should never allow to be used together with event time semantics in other stateful operators.
   
   That said, maybe we still need a hand-made enumeration of an allow/block list for update/complete mode...
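
The "does it delay the output records" notes above can be summarized as a small lookup. This is a minimal sketch in plain Scala, not Spark's implementation: the `OperatorTraits` object, its `OutputMode` and `StatefulOp` types, and `mayDelayOutput` are hypothetical names introduced only for illustration. flatMapGroupsWithState is left out because the comment does not say whether it delays output.

```scala
// Hypothetical model of the notes above; none of these names exist in Spark.
object OperatorTraits {
  sealed trait OutputMode
  case object Append extends OutputMode
  case object Update extends OutputMode
  case object Complete extends OutputMode

  sealed trait StatefulOp
  case object StreamingAggregation extends StatefulOp
  case object Deduplication extends StatefulOp
  // timeInterval = false means an equality join; outer = false means an inner join.
  final case class StreamStreamJoin(timeInterval: Boolean, outer: Boolean) extends StatefulOp

  // True when the operator may hold records back until the watermark advances.
  def mayDelayOutput(op: StatefulOp, mode: OutputMode): Boolean = op match {
    // Aggregation waits for the watermark only in Append mode.
    case StreamingAggregation => mode == Append
    // Deduplication ignores the output mode and never delays.
    case Deduplication => false
    // Equality inner join does not delay; outer joins and time interval joins can.
    case StreamStreamJoin(timeInterval, outer) => timeInterval || outer
  }
}
```

Keeping the lookup as a single pattern match over a sealed trait means the compiler warns if a new operator kind is added without deciding how it behaves.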





[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

Posted by GitBox <gi...@apache.org>.
WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1016174482


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -154,15 +179,24 @@ object UnsupportedOperationChecker extends Logging {
           "DataFrames/Datasets")(plan)
     }
 
-    // Disallow multiple streaming aggregations
-    val aggregates = collectStreamingAggregates(plan)
+    val statefulOps = plan.collect {
+      case p: LogicalPlan if isStatefulOperation(p) => p
+    }
 
-    if (aggregates.size > 1) {
+    if (statefulOps.size > 1 &&
+      outputMode != InternalOutputModes.Append &&
+      SQLConf.get.statefulOperatorCorrectnessCheckEnabled) {
       throwError(
-        "Multiple streaming aggregations are not supported with " +
-          "streaming DataFrames/Datasets")(plan)
+        "Multiple stateful operators are not supported with " +
+          "streaming DataFrames/Datasets for Update and Complete mode. " +
+          "Only Append mode is supported. If you understand the possible risk of " +
+          "correctness issue and still need to run the query you can " +
+          "disable this check by setting the config " +
+          "`spark.sql.streaming.statefulOperator.checkCorrectness.enabled` to false.")(plan)

Review Comment:
   For related tests, see also lines 237-278 in https://github.com/apache/spark/blob/3637b807d57ff5c534386132fb9d47c7cce72705/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala#L237-L278





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1016181346


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -169,16 +179,24 @@ object UnsupportedOperationChecker extends Logging {
           "DataFrames/Datasets")(plan)
     }
 
-    // Disallow multiple streaming aggregations
-    val aggregates = collectStreamingAggregates(plan)
+    val statefulOps = plan.collect {
+      case p: LogicalPlan if isStatefulOperation(p) => p
+    }
 
-    if (aggregates.size > 1 && outputMode != InternalOutputModes.Append) {
+    if (statefulOps.size > 1 &&
+      outputMode != InternalOutputModes.Append &&
+      SQLConf.get.statefulOperatorCorrectnessCheckEnabled) {

Review Comment:
   I think this configuration should simply not exist anymore, now that we have unlocked various use cases (except stream-stream time interval join followed by another stateful operator), but let's handle it as a separate issue. (I'll file a JIRA ticket for it.)





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1014774005


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -157,10 +172,11 @@ object UnsupportedOperationChecker extends Logging {
     // Disallow multiple streaming aggregations
     val aggregates = collectStreamingAggregates(plan)
 
-    if (aggregates.size > 1) {
+    if (aggregates.size > 1 && outputMode != InternalOutputModes.Append) {

Review Comment:
   @WweiL You still need to change the above to collect all stateful operators rather than only streaming aggregates.





[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

Posted by GitBox <gi...@apache.org>.
WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1022251949


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##########
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
       .agg(sum("num"))
       .as[(String, Long)]
 
-    testStream(result, Update)(
-      AddData(inputData, "a" -> 1),
-      CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-      AddData(inputData, "a" -> 1), // Dropped
-      CheckLastBatch(),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-      AddData(inputData, "a" -> 2),
-      CheckLastBatch("a" -> 3L),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-      AddData(inputData, "b" -> 1),
-      CheckLastBatch("b" -> 1L),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-    )
+    // As of [SPARK-40940], multiple state operator with Complete mode is disabled by default

Review Comment:
   Must work: 
   - Join on equality chained with agg in Append mode
   - Time-interval join & flatMapGroupsWithState followed by a stateful op: disallowed
   - Just don't count dedup when it doesn't have an event-time column
   - ? only allow all stateful ops followed by dedup -> only in Update and Complete mode





[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

Posted by GitBox <gi...@apache.org>.
WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1021870148


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##########
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
       .agg(sum("num"))
       .as[(String, Long)]
 
-    testStream(result, Update)(
-      AddData(inputData, "a" -> 1),
-      CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-      AddData(inputData, "a" -> 1), // Dropped
-      CheckLastBatch(),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-      AddData(inputData, "a" -> 2),
-      CheckLastBatch("a" -> 3L),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-      AddData(inputData, "b" -> 1),
-      CheckLastBatch("b" -> 1L),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-    )
+    // As of [SPARK-40940], multiple state operator with Complete mode is disabled by default

Review Comment:
   I've made a list, let's discuss this later.
   
   - Deduplication: only counted as a streaming stateful operator when it has an event time column.
   
   - In Complete and Update mode, aggregations followed by any stateful operator are disallowed.
     - Note that Dedup w/o event time is not counted here.
    
   - flatMapGroupsWithState (and mapGroupsWithState, also the pandas version): 
     - If `flatMapGroupsWithState` is configured with processing time, there is no need to check.
     - After this PR: `flatMapGroupsWithState` or `MapGroupsWithState` followed by any stateful operator is disallowed. 
       - Note that Dedup w/o event time is not counted here.
     - After this PR: agg followed by `flatMapGroupsWithState` in Append mode is allowed. 
     - Currently: `flatMapGroupsWithState` with agg (no matter before or after it) in Update mode is not allowed -> [keep this behavior]
   
   - stream-stream join: 
     - Append mode: time interval join followed by any stateful op: disallowed
     - Append mode: equality inner & outer join followed by any stateful op: supported
     - Currently: only allowed in Append mode, inner join with equality -> [keep this behavior]
     - Currently: outer join with equality and time-interval join are disallowed -> [keep this behavior]
   
   [Q] Why doesn't Dedup require an event-time column? It has to create some kind of state store to do the deduplication; if there is no watermark, are we holding this state for the lifetime of the query? 
   [A] There may be cases where the key space is bounded. That is also why Complete mode makes sense.



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##########
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
       .agg(sum("num"))
       .as[(String, Long)]
 
-    testStream(result, Update)(
-      AddData(inputData, "a" -> 1),
-      CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-      AddData(inputData, "a" -> 1), // Dropped
-      CheckLastBatch(),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-      AddData(inputData, "a" -> 2),
-      CheckLastBatch("a" -> 3L),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-      AddData(inputData, "b" -> 1),
-      CheckLastBatch("b" -> 1L),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-    )
+    // As of [SPARK-40940], multiple state operator with Complete mode is disabled by default

Review Comment:
   Eventually, the above boils down to three simple golden rules:
   1. `flatMapGroupsWithState` or `MapGroupsWithState` followed by any stateful operator is disallowed. 
   2. Stream-stream time interval join followed by any stateful operator is disallowed. Note that stream-stream join itself is only allowed in Append mode.
   3. Aggregation followed by any stateful operator is disallowed in Complete and Update mode.
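
The three rules above can be sketched as a check over a linear chain of operators. This is a simplified illustration in plain Scala, not the code in `UnsupportedOperationChecker`: `ChainRules`, `Op`, `counted`, `blocksDownstream` and `isAllowed` are hypothetical names, the plan is assumed to be a simple upstream-to-downstream list rather than a tree, and the processing-time exception for `flatMapGroupsWithState` mentioned earlier in the thread is deliberately ignored.

```scala
// Hypothetical encoding of the three rules; none of these names exist in Spark.
object ChainRules {
  sealed trait OutputMode
  case object Append extends OutputMode
  case object Update extends OutputMode
  case object Complete extends OutputMode

  sealed trait Op
  case object Aggregation extends Op
  case object FlatMapGroupsWithState extends Op
  final case class StreamStreamJoin(timeInterval: Boolean) extends Op
  final case class Dedup(hasEventTime: Boolean) extends Op

  // Dedup without an event-time column is not counted as a stateful operator.
  def counted(op: Op): Boolean = op match {
    case Dedup(hasEventTime) => hasEventTime
    case _ => true
  }

  // True when `op` must not be followed by another counted stateful operator.
  def blocksDownstream(op: Op, mode: OutputMode): Boolean = op match {
    case FlatMapGroupsWithState => true                  // rule 1
    case StreamStreamJoin(timeInterval) => timeInterval  // rule 2
    case Aggregation => mode != Append                   // rule 3
    case Dedup(_) => false
  }

  // `chain` lists operators from upstream to downstream.
  def isAllowed(chain: List[Op], mode: OutputMode): Boolean =
    chain.tails.forall {
      case head :: rest => !(blocksDownstream(head, mode) && rest.exists(counted))
      case Nil => true
    }
}
```

Under this model, `ChainRules.isAllowed(List(ChainRules.Dedup(hasEventTime = false), ChainRules.Aggregation), ChainRules.Complete)` is true, which corresponds to the dropDuplicates -> aggregation scenario the reviewers wanted to keep working.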
   





[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

Posted by GitBox <gi...@apache.org>.
WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1024292204


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala:
##########
@@ -315,15 +298,15 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
     // future.
     expectedMsgs = Seq("Complete"))
 
-  for (outputMode <- Seq(Append, Update, Complete)) {
+  for (outputMode <- Seq(Append, Complete)) {

Review Comment:
   Thanks for the close look! Yes, I just realized that [there is indeed a test on mapGroupsWithState in Update mode](https://github.com/apache/spark/blob/255691553e2f8f531c81a1e5b5a81d8559d3ef7a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala#L516), so I'll just remove these tests.





[GitHub] [spark] HeartSaVioR commented on pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on PR #38503:
URL: https://github.com/apache/spark/pull/38503#issuecomment-1317644092

   Thanks! Merging to master.




[GitHub] [spark] alex-balikov commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

Posted by GitBox <gi...@apache.org>.
alex-balikov commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1023155184


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala:
##########
@@ -940,22 +1056,22 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
   def assertPassOnGlobalWatermarkLimit(
       testNamePostfix: String,
       plan: LogicalPlan,
-      outputMode: OutputMode): Unit = {
-    testGlobalWatermarkLimit(testNamePostfix, plan, outputMode, expectFailure = false)
+      outputMode: OutputMode = OutputMode.Append()): Unit = {
+    testGlobalWatermarkLimit(testNamePostfix, plan, expectFailure = false, outputMode)
   }
 
   def assertFailOnGlobalWatermarkLimit(
       testNamePostfix: String,
       plan: LogicalPlan,
-      outputMode: OutputMode): Unit = {
-    testGlobalWatermarkLimit(testNamePostfix, plan, outputMode, expectFailure = true)
+      outputMode: OutputMode = OutputMode.Append()): Unit = {

Review Comment:
   ditto



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -41,41 +42,70 @@ object UnsupportedOperationChecker extends Logging {
     }
   }
 
+  private def hasRangeExprAgainstEventTimeCol(e: Expression): Boolean = e.exists {
+    case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) =>
+      hasEventTimeColNeq(neq)
+    case _ => false
+  }
+
+  private def hasEventTimeColNeq(neq: Expression): Boolean = {
+    val exp = neq.asInstanceOf[BinaryComparison]
+    hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
+  }
+
+  private def hasEventTimeCol(exps: Expression): Boolean =
+    exps.exists {
+      case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey)
+      case _ => false
+    }
+
+  private def isStatefulOperationPossiblyEmitLateRows(
+      p: LogicalPlan, outputMode: OutputMode): Boolean = p match {
+    case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) =>
+      left.isStreaming && right.isStreaming &&
+        otherCondition.isDefined && hasRangeExprAgainstEventTimeCol(otherCondition.get)
+    // FlatMapGroupsWithState configured with event time
+    case f @ FlatMapGroupsWithState(_, _, _, _, _, _, _, _, _, timeout, _, _, _, _, _, _)
+      if f.isStreaming && timeout == GroupStateTimeout.EventTimeTimeout => true
+    case p @ FlatMapGroupsInPandasWithState(_, _, _, _, _, timeout, _)
+      if p.isStreaming && timeout == GroupStateTimeout.EventTimeTimeout => true
+    case a: Aggregate if a.isStreaming && outputMode != InternalOutputModes.Append => true
+    // Since the Distinct node will be replaced to Aggregate in the optimizer rule
+    // [[ReplaceDistinctWithAggregate]], here we also need to check all Distinct node by
+    // assuming it as Aggregate.
+    case d @ Distinct(_: LogicalPlan) if d.isStreaming
+      && outputMode != InternalOutputModes.Append => true
+    case _ => false
+  }
+
+  private def isStreamingStatefulOperation(p: LogicalPlan): Boolean = p match {
+    case s: Aggregate if s.isStreaming => true
+    // Since the Distinct node will be replaced to Aggregate in the optimizer rule
+    // [[ReplaceDistinctWithAggregate]], here we also need to check all Distinct node by
+    // assuming it as Aggregate.
+    case d @ Distinct(_: LogicalPlan) if d.isStreaming => true
+    case _ @ Join(left, right, _, _, _) if left.isStreaming && right.isStreaming => true
+    case f: FlatMapGroupsWithState if f.isStreaming => true
+    case f: FlatMapGroupsInPandasWithState if f.isStreaming => true
+    // Deduplicate also works without event time column even in streaming,
+    // in such cases, although Dedup is still a stateful operation in a streaming
+    // query, it could be ignored in all checks below, so let it return false.
+    case d: Deduplicate if d.isStreaming && d.keys.exists(hasEventTimeCol) => true
+    case _ => false
+  }
   /**
    * Checks for possible correctness issue in chained stateful operators. The behavior is
    * controlled by SQL config `spark.sql.streaming.statefulOperator.checkCorrectness.enabled`.
    * Once it is enabled, an analysis exception will be thrown. Otherwise, Spark will just
    * print a warning message.
    */
-  def checkStreamingQueryGlobalWatermarkLimit(
-      plan: LogicalPlan,
-      outputMode: OutputMode): Unit = {
-    def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p match {
-      case s: Aggregate
-        if s.isStreaming && outputMode == InternalOutputModes.Append => true
-      case Join(left, right, joinType, _, _)
-        if left.isStreaming && right.isStreaming && joinType != Inner => true
-      case f: FlatMapGroupsWithState
-        if f.isStreaming && f.outputMode == OutputMode.Append() => true
-      case _ => false
-    }
-
-    def isStatefulOperation(p: LogicalPlan): Boolean = p match {
-      case s: Aggregate if s.isStreaming => true
-      case _ @ Join(left, right, _, _, _) if left.isStreaming && right.isStreaming => true
-      case f: FlatMapGroupsWithState if f.isStreaming => true
-      case f: FlatMapGroupsInPandasWithState if f.isStreaming => true
-      case d: Deduplicate if d.isStreaming => true
-      case _ => false
-    }
-
+  def checkStreamingQueryGlobalWatermarkLimit(plan: LogicalPlan, outputMode: OutputMode): Unit = {

Review Comment:
   a comment 
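
For readers following along, the Scaladoc above describes a check that either throws or only warns, depending on a config flag. A minimal sketch of that shape is below. It uses a plain boolean in place of the real `spark.sql.streaming.statefulOperator.checkCorrectness.enabled` lookup, and `CorrectnessCheck`, `AnalysisError` and `report` are hypothetical names, not Spark's API.

```scala
object CorrectnessCheck {
  // Hypothetical stand-in for Spark's AnalysisException.
  final class AnalysisError(msg: String) extends Exception(msg)

  // `enforce` models the config flag (assumption: a plain boolean here).
  // When a problem is found: throw if enforcing, otherwise return the
  // warning text that would be logged. No problem means nothing to report.
  def report(problem: Option[String], enforce: Boolean): Option[String] = problem match {
    case Some(msg) if enforce => throw new AnalysisError(msg)
    case Some(msg) => Some(s"WARN: $msg")
    case None => None
  }
}
```

With `enforce = false` the caller gets the warning text back and the query would proceed. With `enforce = true` the same problem aborts analysis.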



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -41,41 +42,70 @@ object UnsupportedOperationChecker extends Logging {
     }
   }
 
+  private def hasRangeExprAgainstEventTimeCol(e: Expression): Boolean = e.exists {
+    case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) =>
+      hasEventTimeColNeq(neq)
+    case _ => false
+  }
+
+  private def hasEventTimeColNeq(neq: Expression): Boolean = {

Review Comment:
   What does 'Neq' stand for here: 'not equal'? How does this method know that it was given such an expression? It looks like it can be applied to any binary expression. Maybe add an assert?
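
One possible shape for such a guard, as a sketch only: the body of `hasEventTimeColNeq` is not shown in this hunk, so the example uses a hypothetical stand-in expression ADT (`Expr`, `Col`, `Lit`, ...) rather than Catalyst classes, and it uses `require` instead of `assert` so that the check cannot be elided at compile time.

```scala
// Hypothetical stand-ins for Catalyst expressions; not Spark's real classes.
sealed trait Expr
final case class Col(name: String, isEventTime: Boolean = false) extends Expr
final case class Lit(value: Long) extends Expr
final case class LessThan(left: Expr, right: Expr) extends Expr
final case class GreaterThan(left: Expr, right: Expr) extends Expr
final case class EqualTo(left: Expr, right: Expr) extends Expr

object RangeComparisonGuard {
  // True only for the inequality operators the caller is expected to pass in.
  def isRangeComparison(e: Expr): Boolean = e match {
    case _: LessThan | _: GreaterThan => true
    case _ => false
  }

  // Direct operands of a binary comparison; leaves have none.
  private def children(e: Expr): Seq[Expr] = e match {
    case LessThan(l, r) => Seq(l, r)
    case GreaterThan(l, r) => Seq(l, r)
    case EqualTo(l, r) => Seq(l, r)
    case _ => Seq.empty
  }

  // Fails fast with IllegalArgumentException when handed anything other than
  // a range comparison, then reports whether either operand is an
  // event-time column.
  def hasEventTimeColInRange(e: Expr): Boolean = {
    require(isRangeComparison(e), s"expected a range comparison, got: $e")
    children(e).exists {
      case Col(_, isEventTime) => isEventTime
      case _ => false
    }
  }
}
```

Making the precondition explicit means the method name no longer has to carry it. A more descriptive name than `Neq` would help as well.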



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala:
##########
@@ -940,22 +1056,22 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
   def assertPassOnGlobalWatermarkLimit(
       testNamePostfix: String,
       plan: LogicalPlan,
-      outputMode: OutputMode): Unit = {
-    testGlobalWatermarkLimit(testNamePostfix, plan, outputMode, expectFailure = false)
+      outputMode: OutputMode = OutputMode.Append()): Unit = {

Review Comment:
   I would suggest not making Append a default parameter value, but specifying it explicitly in each test for better readability.
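
To illustrate the readability point, here is a small sketch with hypothetical names (`Mode`, `WatermarkTestHelpers` and `describeCase` are not part of the suite): without a default, each call site states the output mode it exercises.

```scala
// Hypothetical stand-ins for OutputMode values; not Spark's real classes.
sealed trait Mode
case object Append extends Mode
case object Update extends Mode

object WatermarkTestHelpers {
  // No default for `mode`: every caller has to name the output mode under test.
  def describeCase(testNamePostfix: String, expectFailure: Boolean, mode: Mode): String = {
    val verdict = if (expectFailure) "fail" else "pass"
    s"$testNamePostfix [$mode] should $verdict"
  }
}
```

A call then reads `WatermarkTestHelpers.describeCase("agg after join", expectFailure = false, mode = Append)`, so the mode is visible without looking up the helper's signature.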



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -41,41 +42,70 @@ object UnsupportedOperationChecker extends Logging {
     }
   }
 
+  private def hasRangeExprAgainstEventTimeCol(e: Expression): Boolean = e.exists {

Review Comment:
   A short comment on the methods may be helpful.



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala:
##########
@@ -940,22 +1056,22 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
   def assertPassOnGlobalWatermarkLimit(
       testNamePostfix: String,
       plan: LogicalPlan,
-      outputMode: OutputMode): Unit = {
-    testGlobalWatermarkLimit(testNamePostfix, plan, outputMode, expectFailure = false)
+      outputMode: OutputMode = OutputMode.Append()): Unit = {
+    testGlobalWatermarkLimit(testNamePostfix, plan, expectFailure = false, outputMode)
   }
 
   def assertFailOnGlobalWatermarkLimit(
       testNamePostfix: String,
       plan: LogicalPlan,
-      outputMode: OutputMode): Unit = {
-    testGlobalWatermarkLimit(testNamePostfix, plan, outputMode, expectFailure = true)
+      outputMode: OutputMode = OutputMode.Append()): Unit = {
+    testGlobalWatermarkLimit(testNamePostfix, plan, expectFailure = true, outputMode)
   }
 
   def testGlobalWatermarkLimit(
       testNamePostfix: String,
       plan: LogicalPlan,
-      outputMode: OutputMode,
-      expectFailure: Boolean): Unit = {
+      expectFailure: Boolean,

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on PR #38503:
URL: https://github.com/apache/spark/pull/38503#issuecomment-1302620081

   Can one of the admins verify this patch?

