You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/11/04 06:46:16 UTC

[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

HeartSaVioR commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1013649051


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging {
     }
   }
 
+  def hasRangeExpr(e: Expression): Boolean = e.exists {
+    case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) =>
+      hasEventTimeColNeq(neq)
+    case _ => false
+  }
+
+  def hasEventTimeColNeq(neq: Expression): Boolean = {
+    val exp = neq.asInstanceOf[BinaryComparison]
+    hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
+  }
+
+  def hasEventTimeCol(exps: Expression): Boolean =
+    exps.exists {
+      case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey)
+      case _ => false
+    }
+
+  // TODO: This function and hasRangeExpr
+  // should be deleted after we support range join with states
+  def isStreamStreamIntervalJoin(plan: LogicalPlan): Boolean = {
+    plan match {
+      case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) =>
+        left.isStreaming && right.isStreaming
+        otherCondition.isDefined && hasRangeExpr(otherCondition.get)
+      case _ => false
+    }
+  }
+
   /**
    * Checks for possible correctness issue in chained stateful operators. The behavior is
    * controlled by SQL config `spark.sql.streaming.statefulOperator.checkCorrectness.enabled`.
    * Once it is enabled, an analysis exception will be thrown. Otherwise, Spark will just
    * print a warning message.
    */
   def checkStreamingQueryGlobalWatermarkLimit(
-      plan: LogicalPlan,
-      outputMode: OutputMode): Unit = {
+      plan: LogicalPlan): Unit = {
     def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p match {
-      case s: Aggregate
-        if s.isStreaming && outputMode == InternalOutputModes.Append => true
       case Join(left, right, joinType, _, _)

Review Comment:
   We can remove this line, as we support outer joins as well. We only have an issue with stream-stream time-interval joins and flatMapGroupsWithState.
   (Arguably flatMapGroupsWithState with all output modes should be disallowed, but I believe we have a separate check for output mode so OK.)



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging {
     }
   }
 
+  def hasRangeExpr(e: Expression): Boolean = e.exists {
+    case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) =>
+      hasEventTimeColNeq(neq)
+    case _ => false
+  }
+
+  def hasEventTimeColNeq(neq: Expression): Boolean = {
+    val exp = neq.asInstanceOf[BinaryComparison]
+    hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
+  }
+
+  def hasEventTimeCol(exps: Expression): Boolean =
+    exps.exists {
+      case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey)
+      case _ => false
+    }
+
+  // TODO: This function and hasRangeExpr
+  // should be deleted after we support range join with states
+  def isStreamStreamIntervalJoin(plan: LogicalPlan): Boolean = {
+    plan match {
+      case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) =>
+        left.isStreaming && right.isStreaming

Review Comment:
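   Maybe a `&&` is missing at the end of this line? Otherwise this line would be a no-op: its result is discarded and only the expression on the following line is returned.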



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala:
##########
@@ -507,15 +507,13 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
       assertPassOnGlobalWatermarkLimit(
         s"single $joinType join in Append mode",
         streamRelation.join(streamRelation, joinType = RightOuter,
-          condition = Some(attributeWithWatermark === attribute)),

Review Comment:
   The above code comment and related tests are outdated. 
   
   For a (time-window) equality join, all join types (inner, left outer, right outer, and full outer) should just work with the recent fix of late record filtering. For a time-interval join, none of the join types will work if there is a following stateful operator.
   
   That said, we should now have redundant test cases for time-interval join specifically to check the different expectation.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging {
     }
   }
 
+  def hasRangeExpr(e: Expression): Boolean = e.exists {
+    case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) =>
+      hasEventTimeColNeq(neq)
+    case _ => false
+  }
+
+  def hasEventTimeColNeq(neq: Expression): Boolean = {
+    val exp = neq.asInstanceOf[BinaryComparison]
+    hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
+  }
+
+  def hasEventTimeCol(exps: Expression): Boolean =
+    exps.exists {
+      case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey)
+      case _ => false
+    }
+
+  // TODO: This function and hasRangeExpr
+  // should be deleted after we support range join with states
+  def isStreamStreamIntervalJoin(plan: LogicalPlan): Boolean = {
+    plan match {
+      case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) =>
+        left.isStreaming && right.isStreaming
+        otherCondition.isDefined && hasRangeExpr(otherCondition.get)
+      case _ => false
+    }
+  }
+
   /**
    * Checks for possible correctness issue in chained stateful operators. The behavior is
    * controlled by SQL config `spark.sql.streaming.statefulOperator.checkCorrectness.enabled`.
    * Once it is enabled, an analysis exception will be thrown. Otherwise, Spark will just
    * print a warning message.
    */
   def checkStreamingQueryGlobalWatermarkLimit(
-      plan: LogicalPlan,
-      outputMode: OutputMode): Unit = {
+      plan: LogicalPlan): Unit = {
     def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p match {
-      case s: Aggregate
-        if s.isStreaming && outputMode == InternalOutputModes.Append => true
       case Join(left, right, joinType, _, _)
         if left.isStreaming && right.isStreaming && joinType != Inner => true
       case f: FlatMapGroupsWithState
         if f.isStreaming && f.outputMode == OutputMode.Append() => true
-      case _ => false
+      case _ =>

Review Comment:
   nit: unnecessary change



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -157,10 +193,11 @@ object UnsupportedOperationChecker extends Logging {
     // Disallow multiple streaming aggregations
     val aggregates = collectStreamingAggregates(plan)
 
-    if (aggregates.size > 1) {
+    if (aggregates.size > 1 && outputMode != InternalOutputModes.Append) {

Review Comment:
   I think it's not only multiple aggregations. Reasoning about a streaming aggregation followed by a time-window join in update mode would be very tricky. I'd say we should simply disallow multiple stateful operators in update and complete mode altogether.
   (There could be some combinations that still make sense for update/complete mode, but it doesn't seem trivial to reason about them and compile an allowlist or blocklist.)
   
   cc. @alex-balikov 



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging {
     }
   }
 
+  def hasRangeExpr(e: Expression): Boolean = e.exists {
+    case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) =>
+      hasEventTimeColNeq(neq)
+    case _ => false
+  }
+
+  def hasEventTimeColNeq(neq: Expression): Boolean = {
+    val exp = neq.asInstanceOf[BinaryComparison]
+    hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
+  }
+
+  def hasEventTimeCol(exps: Expression): Boolean =
+    exps.exists {
+      case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey)
+      case _ => false
+    }
+
+  // TODO: This function and hasRangeExpr
+  // should be deleted after we support range join with states
+  def isStreamStreamIntervalJoin(plan: LogicalPlan): Boolean = {
+    plan match {
+      case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) =>
+        left.isStreaming && right.isStreaming
+        otherCondition.isDefined && hasRangeExpr(otherCondition.get)
+      case _ => false
+    }
+  }
+
   /**
    * Checks for possible correctness issue in chained stateful operators. The behavior is
    * controlled by SQL config `spark.sql.streaming.statefulOperator.checkCorrectness.enabled`.
    * Once it is enabled, an analysis exception will be thrown. Otherwise, Spark will just
    * print a warning message.
    */
   def checkStreamingQueryGlobalWatermarkLimit(
-      plan: LogicalPlan,
-      outputMode: OutputMode): Unit = {
+      plan: LogicalPlan): Unit = {
     def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p match {
-      case s: Aggregate
-        if s.isStreaming && outputMode == InternalOutputModes.Append => true
       case Join(left, right, joinType, _, _)

Review Comment:
   I think we should check for the stream-stream time-interval join in here. I understand that having a separate check helps us produce a better error message, but with it we are disallowing the query entirely, whereas it could run with the correctness check config turned off.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging {
     }
   }
 
+  def hasRangeExpr(e: Expression): Boolean = e.exists {

Review Comment:
   The method name is more general than what the method actually does. Maybe `hasRangeExprAgainstEventTimeCol`?
   Or just define it as a local method at the start of `isStreamStreamIntervalJoin`, so that it sits under the proper implicit context.



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/MultiStatefulOperatorsSuite.scala:
##########
@@ -40,378 +40,465 @@ class MultiStatefulOperatorsSuite
   }
 
   test("window agg -> window agg, append mode") {
-    // TODO: SPARK-40940 - Fix the unsupported ops checker to allow chaining of stateful ops.
-    withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
-      val inputData = MemoryStream[Int]
-
-      val stream = inputData.toDF()
-        .withColumn("eventTime", timestamp_seconds($"value"))
-        .withWatermark("eventTime", "0 seconds")
-        .groupBy(window($"eventTime", "5 seconds").as("window"))
-        .agg(count("*").as("count"))
-        .groupBy(window($"window", "10 seconds"))
-        .agg(count("*").as("count"), sum("count").as("sum"))
-        .select($"window".getField("start").cast("long").as[Long],
-          $"count".as[Long], $"sum".as[Long])
-
-      testStream(stream)(
-        AddData(inputData, 10 to 21: _*),
-        // op1 W (0, 0)
-        // agg: [10, 15) 5, [15, 20) 5, [20, 25) 2
-        // output: None
-        // state: [10, 15) 5, [15, 20) 5, [20, 25) 2
-        // op2 W (0, 0)
-        // agg: None
-        // output: None
-        // state: None
-
-        // no-data batch triggered
-
-        // op1 W (0, 21)
-        // agg: None
-        // output: [10, 15) 5, [15, 20) 5
-        // state: [20, 25) 2
-        // op2 W (0, 21)
-        // agg: [10, 20) (2, 10)
-        // output: [10, 20) (2, 10)
-        // state: None
-        CheckNewAnswer((10, 2, 10)),
-        assertNumStateRows(Seq(0, 1)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0)),
-
-        AddData(inputData, 10 to 29: _*),
-        // op1 W (21, 21)
-        // agg: [10, 15) 5 - late, [15, 20) 5 - late, [20, 25) 5, [25, 30) 5
-        // output: None
-        // state: [20, 25) 7, [25, 30) 5
-        // op2 W (21, 21)
-        // agg: None
-        // output: None
-        // state: None
-
-        // no-data batch triggered
-
-        // op1 W (21, 29)
-        // agg: None
-        // output: [20, 25) 7
-        // state: [25, 30) 5
-        // op2 W (21, 29)
-        // agg: [20, 30) (1, 7)
-        // output: None
-        // state: [20, 30) (1, 7)
-        CheckNewAnswer(),
-        assertNumStateRows(Seq(1, 1)),
-        assertNumRowsDroppedByWatermark(Seq(0, 2)),
-
-        // Move the watermark.
-        AddData(inputData, 30, 31),
-        // op1 W (29, 29)
-        // agg: [30, 35) 2
-        // output: None
-        // state: [25, 30) 5 [30, 35) 2
-        // op2 W (29, 29)
-        // agg: None
-        // output: None
-        // state: [20, 30) (1, 7)
-
-        // no-data batch triggered
-
-        // op1 W (29, 31)
-        // agg: None
-        // output: [25, 30) 5
-        // state: [30, 35) 2
-        // op2 W (29, 31)
-        // agg: [20, 30) (2, 12)
-        // output: [20, 30) (2, 12)
-        // state: None
-        CheckNewAnswer((20, 2, 12)),
-        assertNumStateRows(Seq(0, 1)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0))
-      )
-    }
+    val inputData = MemoryStream[Int]
+
+    val stream = inputData.toDF()
+      .withColumn("eventTime", timestamp_seconds($"value"))
+      .withWatermark("eventTime", "0 seconds")
+      .groupBy(window($"eventTime", "5 seconds").as("window"))
+      .agg(count("*").as("count"))
+      .groupBy(window($"window", "10 seconds"))
+      .agg(count("*").as("count"), sum("count").as("sum"))
+      .select($"window".getField("start").cast("long").as[Long],
+        $"count".as[Long], $"sum".as[Long])
+
+    testStream(stream)(
+      AddData(inputData, 10 to 21: _*),
+      // op1 W (0, 0)
+      // agg: [10, 15) 5, [15, 20) 5, [20, 25) 2
+      // output: None
+      // state: [10, 15) 5, [15, 20) 5, [20, 25) 2
+      // op2 W (0, 0)
+      // agg: None
+      // output: None
+      // state: None
+
+      // no-data batch triggered
+
+      // op1 W (0, 21)
+      // agg: None
+      // output: [10, 15) 5, [15, 20) 5
+      // state: [20, 25) 2
+      // op2 W (0, 21)
+      // agg: [10, 20) (2, 10)
+      // output: [10, 20) (2, 10)
+      // state: None
+      CheckNewAnswer((10, 2, 10)),
+      assertNumStateRows(Seq(0, 1)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0)),
+
+      AddData(inputData, 10 to 29: _*),
+      // op1 W (21, 21)
+      // agg: [10, 15) 5 - late, [15, 20) 5 - late, [20, 25) 5, [25, 30) 5
+      // output: None
+      // state: [20, 25) 7, [25, 30) 5
+      // op2 W (21, 21)
+      // agg: None
+      // output: None
+      // state: None
+
+      // no-data batch triggered
+
+      // op1 W (21, 29)
+      // agg: None
+      // output: [20, 25) 7
+      // state: [25, 30) 5
+      // op2 W (21, 29)
+      // agg: [20, 30) (1, 7)
+      // output: None
+      // state: [20, 30) (1, 7)
+      CheckNewAnswer(),
+      assertNumStateRows(Seq(1, 1)),
+      assertNumRowsDroppedByWatermark(Seq(0, 2)),
+
+      // Move the watermark.
+      AddData(inputData, 30, 31),
+      // op1 W (29, 29)
+      // agg: [30, 35) 2
+      // output: None
+      // state: [25, 30) 5 [30, 35) 2
+      // op2 W (29, 29)
+      // agg: None
+      // output: None
+      // state: [20, 30) (1, 7)
+
+      // no-data batch triggered
+
+      // op1 W (29, 31)
+      // agg: None
+      // output: [25, 30) 5
+      // state: [30, 35) 2
+      // op2 W (29, 31)
+      // agg: [20, 30) (2, 12)
+      // output: [20, 30) (2, 12)
+      // state: None
+      CheckNewAnswer((20, 2, 12)),
+      assertNumStateRows(Seq(0, 1)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0))
+    )
   }
 
   test("agg -> agg -> agg, append mode") {
-    withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
-      val inputData = MemoryStream[Int]
-
-      val stream = inputData.toDF()
-        .withColumn("eventTime", timestamp_seconds($"value"))
-        .withWatermark("eventTime", "0 seconds")
-        .groupBy(window($"eventTime", "5 seconds").as("window"))
-        .agg(count("*").as("count"))
-        .groupBy(window(window_time($"window"), "10 seconds"))
-        .agg(count("*").as("count"), sum("count").as("sum"))
-        .groupBy(window(window_time($"window"), "20 seconds"))
-        .agg(count("*").as("count"), sum("sum").as("sum"))
-        .select(
-          $"window".getField("start").cast("long").as[Long],
-          $"window".getField("end").cast("long").as[Long],
-          $"count".as[Long], $"sum".as[Long])
-
-      testStream(stream)(
-        AddData(inputData, 0 to 37: _*),
-        // op1 W (0, 0)
-        // agg: [0, 5) 5, [5, 10) 5, [10, 15) 5, [15, 20) 5, [20, 25) 5, [25, 30) 5, [30, 35) 5,
-        //   [35, 40) 3
-        // output: None
-        // state: [0, 5) 5, [5, 10) 5, [10, 15) 5, [15, 20) 5, [20, 25) 5, [25, 30) 5, [30, 35) 5,
-        //   [35, 40) 3
-        // op2 W (0, 0)
-        // agg: None
-        // output: None
-        // state: None
-        // op3 W (0, 0)
-        // agg: None
-        // output: None
-        // state: None
-
-        // no-data batch triggered
-
-        // op1 W (0, 37)
-        // agg: None
-        // output: [0, 5) 5, [5, 10) 5, [10, 15) 5, [15, 20) 5, [20, 25) 5, [25, 30) 5, [30, 35) 5
-        // state: [35, 40) 3
-        // op2 W (0, 37)
-        // agg: [0, 10) (2, 10), [10, 20) (2, 10), [20, 30) (2, 10), [30, 40) (1, 5)
-        // output: [0, 10) (2, 10), [10, 20) (2, 10), [20, 30) (2, 10)
-        // state: [30, 40) (1, 5)
-        // op3 W (0, 37)
-        // agg: [0, 20) (2, 20), [20, 40) (1, 10)
-        // output: [0, 20) (2, 20)
-        // state: [20, 40) (1, 10)
-        CheckNewAnswer((0, 20, 2, 20)),
-        assertNumStateRows(Seq(1, 1, 1)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0, 0)),
-
-        AddData(inputData, 30 to 60: _*),
-        // op1 W (37, 37)
-        // dropped rows: [30, 35), 1 row <= note that 35, 36, 37 are still in effect
-        // agg: [35, 40) 8, [40, 45) 5, [45, 50) 5, [50, 55) 5, [55, 60) 5, [60, 65) 1
-        // output: None
-        // state: [35, 40) 8, [40, 45) 5, [45, 50) 5, [50, 55) 5, [55, 60) 5, [60, 65) 1
-        // op2 W (37, 37)
-        // output: None
-        // state: [30, 40) (1, 5)
-        // op3 W (37, 37)
-        // output: None
-        // state: [20, 40) (1, 10)
-
-        // no-data batch
-        // op1 W (37, 60)
-        // output: [35, 40) 8, [40, 45) 5, [45, 50) 5, [50, 55) 5, [55, 60) 5
-        // state: [60, 65) 1
-        // op2 W (37, 60)
-        // agg: [30, 40) (2, 13), [40, 50) (2, 10), [50, 60), (2, 10)
-        // output: [30, 40) (2, 13), [40, 50) (2, 10), [50, 60), (2, 10)
-        // state: None
-        // op3 W (37, 60)
-        // agg: [20, 40) (2, 23), [40, 60) (2, 20)
-        // output: [20, 40) (2, 23), [40, 60) (2, 20)
-        // state: None
-
-        CheckNewAnswer((20, 40, 2, 23), (40, 60, 2, 20)),
-        assertNumStateRows(Seq(0, 0, 1)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0, 1))
-      )
-    }
+    val inputData = MemoryStream[Int]
+
+    val stream = inputData.toDF()
+      .withColumn("eventTime", timestamp_seconds($"value"))
+      .withWatermark("eventTime", "0 seconds")
+      .groupBy(window($"eventTime", "5 seconds").as("window"))
+      .agg(count("*").as("count"))
+      .groupBy(window(window_time($"window"), "10 seconds"))
+      .agg(count("*").as("count"), sum("count").as("sum"))
+      .groupBy(window(window_time($"window"), "20 seconds"))
+      .agg(count("*").as("count"), sum("sum").as("sum"))
+      .select(
+        $"window".getField("start").cast("long").as[Long],
+        $"window".getField("end").cast("long").as[Long],
+        $"count".as[Long], $"sum".as[Long])
+
+    testStream(stream)(
+      AddData(inputData, 0 to 37: _*),
+      // op1 W (0, 0)
+      // agg: [0, 5) 5, [5, 10) 5, [10, 15) 5, [15, 20) 5, [20, 25) 5, [25, 30) 5, [30, 35) 5,
+      //   [35, 40) 3
+      // output: None
+      // state: [0, 5) 5, [5, 10) 5, [10, 15) 5, [15, 20) 5, [20, 25) 5, [25, 30) 5, [30, 35) 5,
+      //   [35, 40) 3
+      // op2 W (0, 0)
+      // agg: None
+      // output: None
+      // state: None
+      // op3 W (0, 0)
+      // agg: None
+      // output: None
+      // state: None
+
+      // no-data batch triggered
+
+      // op1 W (0, 37)
+      // agg: None
+      // output: [0, 5) 5, [5, 10) 5, [10, 15) 5, [15, 20) 5, [20, 25) 5, [25, 30) 5, [30, 35) 5
+      // state: [35, 40) 3
+      // op2 W (0, 37)
+      // agg: [0, 10) (2, 10), [10, 20) (2, 10), [20, 30) (2, 10), [30, 40) (1, 5)
+      // output: [0, 10) (2, 10), [10, 20) (2, 10), [20, 30) (2, 10)
+      // state: [30, 40) (1, 5)
+      // op3 W (0, 37)
+      // agg: [0, 20) (2, 20), [20, 40) (1, 10)
+      // output: [0, 20) (2, 20)
+      // state: [20, 40) (1, 10)
+      CheckNewAnswer((0, 20, 2, 20)),
+      assertNumStateRows(Seq(1, 1, 1)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0, 0)),
+
+      AddData(inputData, 30 to 60: _*),
+      // op1 W (37, 37)
+      // dropped rows: [30, 35), 1 row <= note that 35, 36, 37 are still in effect
+      // agg: [35, 40) 8, [40, 45) 5, [45, 50) 5, [50, 55) 5, [55, 60) 5, [60, 65) 1
+      // output: None
+      // state: [35, 40) 8, [40, 45) 5, [45, 50) 5, [50, 55) 5, [55, 60) 5, [60, 65) 1
+      // op2 W (37, 37)
+      // output: None
+      // state: [30, 40) (1, 5)
+      // op3 W (37, 37)
+      // output: None
+      // state: [20, 40) (1, 10)
+
+      // no-data batch
+      // op1 W (37, 60)
+      // output: [35, 40) 8, [40, 45) 5, [45, 50) 5, [50, 55) 5, [55, 60) 5
+      // state: [60, 65) 1
+      // op2 W (37, 60)
+      // agg: [30, 40) (2, 13), [40, 50) (2, 10), [50, 60), (2, 10)
+      // output: [30, 40) (2, 13), [40, 50) (2, 10), [50, 60), (2, 10)
+      // state: None
+      // op3 W (37, 60)
+      // agg: [20, 40) (2, 23), [40, 60) (2, 20)
+      // output: [20, 40) (2, 23), [40, 60) (2, 20)
+      // state: None
+
+      CheckNewAnswer((20, 40, 2, 23), (40, 60, 2, 20)),
+      assertNumStateRows(Seq(0, 0, 1)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0, 1))
+    )
   }
 
   test("stream deduplication -> aggregation, append mode") {
-    withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
-      val inputData = MemoryStream[Int]
-
-      val deduplication = inputData.toDF()
-        .withColumn("eventTime", timestamp_seconds($"value"))
-        .withWatermark("eventTime", "10 seconds")
-        .dropDuplicates("value", "eventTime")
-
-      val windowedAggregation = deduplication
-        .groupBy(window($"eventTime", "5 seconds").as("window"))
-        .agg(count("*").as("count"), sum("value").as("sum"))
-        .select($"window".getField("start").cast("long").as[Long],
-          $"count".as[Long])
-
-      testStream(windowedAggregation)(
-        AddData(inputData, 1 to 15: _*),
-        // op1 W (0, 0)
-        // input: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
-        // deduplicated: None
-        // output: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
-        // state: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
-        // op2 W (0, 0)
-        // agg: [0, 5) 4, [5, 10) 5 [10, 15) 5, [15, 20) 1
-        // output: None
-        // state: [0, 5) 4, [5, 10) 5 [10, 15) 5, [15, 20) 1
-
-        // no-data batch triggered
-
-        // op1 W (0, 5)
-        // agg: None
-        // output: None
-        // state: 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
-        // op2 W (0, 5)
-        // agg: None
-        // output: [0, 5) 4
-        // state: [5, 10) 5 [10, 15) 5, [15, 20) 1
-        CheckNewAnswer((0, 4)),
-        assertNumStateRows(Seq(3, 10)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0))
-      )
-    }
+    val inputData = MemoryStream[Int]
+
+    val deduplication = inputData.toDF()
+      .withColumn("eventTime", timestamp_seconds($"value"))
+      .withWatermark("eventTime", "10 seconds")
+      .dropDuplicates("value", "eventTime")
+
+    val windowedAggregation = deduplication
+      .groupBy(window($"eventTime", "5 seconds").as("window"))
+      .agg(count("*").as("count"), sum("value").as("sum"))
+      .select($"window".getField("start").cast("long").as[Long],
+        $"count".as[Long])
+
+    testStream(windowedAggregation)(
+      AddData(inputData, 1 to 15: _*),
+      // op1 W (0, 0)
+      // input: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
+      // deduplicated: None
+      // output: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
+      // state: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
+      // op2 W (0, 0)
+      // agg: [0, 5) 4, [5, 10) 5 [10, 15) 5, [15, 20) 1
+      // output: None
+      // state: [0, 5) 4, [5, 10) 5 [10, 15) 5, [15, 20) 1
+
+      // no-data batch triggered
+
+      // op1 W (0, 5)
+      // agg: None
+      // output: None
+      // state: 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
+      // op2 W (0, 5)
+      // agg: None
+      // output: [0, 5) 4
+      // state: [5, 10) 5 [10, 15) 5, [15, 20) 1
+      CheckNewAnswer((0, 4)),
+      assertNumStateRows(Seq(3, 10)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0))
+    )
   }
 
   test("join -> window agg, append mode") {
-    withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
-      val input1 = MemoryStream[Int]
-      val inputDF1 = input1.toDF
-        .withColumnRenamed("value", "value1")
-        .withColumn("eventTime1", timestamp_seconds($"value1"))
-        .withWatermark("eventTime1", "0 seconds")
-
-      val input2 = MemoryStream[Int]
-      val inputDF2 = input2.toDF
-        .withColumnRenamed("value", "value2")
-        .withColumn("eventTime2", timestamp_seconds($"value2"))
-        .withWatermark("eventTime2", "0 seconds")
-
-      val stream = inputDF1.join(inputDF2, expr("eventTime1 = eventTime2"), "inner")
-        .groupBy(window($"eventTime1", "5 seconds").as("window"))
-        .agg(count("*").as("count"))
-        .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+    val input1 = MemoryStream[Int]
+    val inputDF1 = input1.toDF
+      .withColumnRenamed("value", "value1")
+      .withColumn("eventTime1", timestamp_seconds($"value1"))
+      .withWatermark("eventTime1", "0 seconds")
+
+    val input2 = MemoryStream[Int]
+    val inputDF2 = input2.toDF
+      .withColumnRenamed("value", "value2")
+      .withColumn("eventTime2", timestamp_seconds($"value2"))
+      .withWatermark("eventTime2", "0 seconds")
+
+    val stream = inputDF1.join(inputDF2, expr("eventTime1 = eventTime2"), "inner")
+      .groupBy(window($"eventTime1", "5 seconds").as("window"))
+      .agg(count("*").as("count"))
+      .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+
+    testStream(stream)(
+      MultiAddData(input1, 1 to 4: _*)(input2, 1 to 4: _*),
+
+      // op1 W (0, 0)
+      // join output: (1, 1), (2, 2), (3, 3), (4, 4)
+      // state: (1, 1), (2, 2), (3, 3), (4, 4)
+      // op2 W (0, 0)
+      // agg: [0, 5) 4
+      // output: None
+      // state: [0, 5) 4
+
+      // no-data batch triggered
+
+      // op1 W (0, 4)
+      // join output: None
+      // state: None
+      // op2 W (0, 4)
+      // agg: None
+      // output: None
+      // state: [0, 5) 4
+      CheckNewAnswer(),
+      assertNumStateRows(Seq(1, 0)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0)),
+
+      // Move the watermark
+      MultiAddData(input1, 5)(input2, 5),
+
+      // op1 W (4, 4)
+      // join output: (5, 5)
+      // state: (5, 5)
+      // op2 W (4, 4)
+      // agg: [5, 10) 1
+      // output: None
+      // state: [0, 5) 4, [5, 10) 1
+
+      // no-data batch triggered
+
+      // op1 W (4, 5)
+      // join output: None
+      // state: None
+      // op2 W (4, 5)
+      // agg: None
+      // output: [0, 5) 4
+      // state: [5, 10) 1
+      CheckNewAnswer((0, 4)),
+      assertNumStateRows(Seq(1, 0)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0))
+    )
+  }
 
+  test("aggregation -> stream deduplication, append mode") {
+    val inputData = MemoryStream[Int]
+
+    val aggStream = inputData.toDF()
+      .withColumn("eventTime", timestamp_seconds($"value"))
+      .withWatermark("eventTime", "0 seconds")
+      .groupBy(window($"eventTime", "5 seconds").as("window"))
+      .agg(count("*").as("count"))
+      .withColumn("windowEnd", expr("window.end"))
+
+    // dropDuplicates from aggStream without event time column for dropDuplicates - the
+    // state does not get trimmed due to watermark advancement.
+    val dedupNoEventTime = aggStream
+      .dropDuplicates("count", "windowEnd")
+      .select(
+        $"windowEnd".cast("long").as[Long],
+        $"count".as[Long])
+
+    testStream(dedupNoEventTime)(
+      AddData(inputData, 1, 5, 10, 15),
+
+      // op1 W (0, 0)
+      // agg: [0, 5) 1, [5, 10) 1, [10, 15) 1, [15, 20) 1
+      // output: None
+      // state: [0, 5) 1, [5, 10) 1, [10, 15) 1, [15, 20) 1
+      // op2 W (0, 0)
+      // output: None
+      // state: None
+
+      // no-data batch triggered
+
+      // op1 W (0, 15)
+      // agg: None
+      // output: [0, 5) 1, [5, 10) 1, [10, 15) 1
+      // state: [15, 20) 1
+      // op2 W (0, 15)
+      // output: (5, 1), (10, 1), (15, 1)
+      // state: (5, 1), (10, 1), (15, 1)
+
+      CheckNewAnswer((5, 1), (10, 1), (15, 1)),
+      assertNumStateRows(Seq(3, 1)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0))
+    )
+
+    // Similar to the above but add event time. The dedup state will get trimmed.
+    val dedupWithEventTime = aggStream
+      .withColumn("windowTime", expr("window_time(window)"))
+      .withColumn("windowTimeMicros", expr("unix_micros(windowTime)"))
+      .dropDuplicates("count", "windowEnd", "windowTime")
+      .select(
+        $"windowEnd".cast("long").as[Long],
+        $"windowTimeMicros".cast("long").as[Long],
+        $"count".as[Long])
+
+    testStream(dedupWithEventTime)(
+      AddData(inputData, 1, 5, 10, 15),
+
+      // op1 W (0, 0)
+      // agg: [0, 5) 1, [5, 10) 1, [10, 15) 1, [15, 20) 1
+      // output: None
+      // state: [0, 5) 1, [5, 10) 1, [10, 15) 1, [15, 20) 1
+      // op2 W (0, 0)
+      // output: None
+      // state: None
+
+      // no-data batch triggered
+
+      // op1 W (0, 15)
+      // agg: None
+      // output: [0, 5) 1, [5, 10) 1, [10, 15) 1
+      // state: [15, 20) 1
+      // op2 W (0, 15)
+      // output: (5, 4999999, 1), (10, 9999999, 1), (15, 14999999, 1)
+      // state: None - trimmed by watermark
+
+      CheckNewAnswer((5, 4999999, 1), (10, 9999999, 1), (15, 14999999, 1)),
+      assertNumStateRows(Seq(0, 1)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0))
+    )
+  }
+
+  // TODO: This test should be modified or deleted after
+  // we support stream-stream join followed by aggregation
+  test("join on time interval -> window agg, append mode, should fail") {
+    val input1 = MemoryStream[Int]
+    val inputDF1 = input1.toDF
+      .withColumnRenamed("value", "value1")
+      .withColumn("eventTime1", timestamp_seconds($"value1"))
+      .withWatermark("eventTime1", "0 seconds")
+
+    val input2 = MemoryStream[(Int, Int)]
+    val inputDF2 = input2.toDS().toDF("start", "end")
+      .withColumn("eventTime2Start", timestamp_seconds($"start"))
+      .withColumn("eventTime2End", timestamp_seconds($"end"))
+      .withColumn("start2", timestamp_seconds($"start"))
+      .withWatermark("eventTime2Start", "0 seconds")
+
+    val stream = inputDF1.join(inputDF2,
+      expr("eventTime1 >= eventTime2Start AND eventTime1 < eventTime2End " +
+        "AND eventTime1 = start2"), "inner")
+      .groupBy(window($"eventTime1", "5 seconds") as 'window)
+      .agg(count("*") as 'count)
+      .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+
+    var excpetionThrown: Boolean = false
+    try {
       testStream(stream)(
-        MultiAddData(input1, 1 to 4: _*)(input2, 1 to 4: _*),
-
-        // op1 W (0, 0)
-        // join output: (1, 1), (2, 2), (3, 3), (4, 4)
-        // state: (1, 1), (2, 2), (3, 3), (4, 4)
-        // op2 W (0, 0)
-        // agg: [0, 5) 4
-        // output: None
-        // state: [0, 5) 4
-
-        // no-data batch triggered
-
-        // op1 W (0, 4)
-        // join output: None
-        // state: None
-        // op2 W (0, 4)
-        // agg: None
-        // output: None
-        // state: [0, 5) 4
-        CheckNewAnswer(),
-        assertNumStateRows(Seq(1, 0)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0)),
-
-        // Move the watermark
-        MultiAddData(input1, 5)(input2, 5),
-
-        // op1 W (4, 4)
-        // join output: (5, 5)
-        // state: (5, 5)
-        // op2 W (4, 4)
-        // agg: [5, 10) 1
-        // output: None
-        // state: [0, 5) 4, [5, 10) 1
-
-        // no-data batch triggered
-
-        // op1 W (4, 5)
-        // join output: None
-        // state: None
-        // op2 W (4, 5)
-        // agg: None
-        // output: [0, 5) 4
-        // state: [5, 10) 1
-        CheckNewAnswer((0, 4)),
-        assertNumStateRows(Seq(1, 0)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0))
+        StartStream()
       )
+    } catch {
+      case t: AnalysisException =>
+        assert(t.getMessage.contains("stream-stream interval join " +
+          "followed by any stateful operator is not supported yet"))
+        excpetionThrown = true
     }
+    assert(excpetionThrown)
   }
 
-  test("aggregation -> stream deduplication, append mode") {
-    withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
-      val inputData = MemoryStream[Int]
-
-      val aggStream = inputData.toDF()
-        .withColumn("eventTime", timestamp_seconds($"value"))
-        .withWatermark("eventTime", "0 seconds")
-        .groupBy(window($"eventTime", "5 seconds").as("window"))
-        .agg(count("*").as("count"))
-        .withColumn("windowEnd", expr("window.end"))
-
-      // dropDuplicates from aggStream without event time column for dropDuplicates - the
-      // state does not get trimmed due to watermark advancement.
-      val dedupNoEventTime = aggStream
-        .dropDuplicates("count", "windowEnd")
-        .select(
-          $"windowEnd".cast("long").as[Long],
-          $"count".as[Long])
-
-      testStream(dedupNoEventTime)(
-        AddData(inputData, 1, 5, 10, 15),
-
-        // op1 W (0, 0)
-        // agg: [0, 5) 1, [5, 10) 1, [10, 15) 1, [15, 20) 1
-        // output: None
-        // state: [0, 5) 1, [5, 10) 1, [10, 15) 1, [15, 20) 1
-        // op2 W (0, 0)
-        // output: None
-        // state: None
-
-        // no-data batch triggered
-
-        // op1 W (0, 15)
-        // agg: None
-        // output: [0, 5) 1, [5, 10) 1, [10, 15) 1
-        // state: [15, 20) 1
-        // op2 W (0, 15)
-        // output: (5, 1), (10, 1), (15, 1)
-        // state: (5, 1), (10, 1), (15, 1)
-
-        CheckNewAnswer((5, 1), (10, 1), (15, 1)),
-        assertNumStateRows(Seq(3, 1)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0))
-      )
+  // TODO: This test should be modified or deleted after
+  // we support stream-stream join followed by aggregation
+  test("join with range join on non-time intervals -> window agg, append mode, shouldn't fail") {
+    val input1 = MemoryStream[Int]
+    val inputDF1 = input1.toDF
+      .withColumnRenamed("value", "value1")
+      .withColumn("eventTime1", timestamp_seconds($"value1"))
+      .withColumn("v1", timestamp_seconds($"value1"))
+      .withWatermark("eventTime1", "0 seconds")
+
+    val input2 = MemoryStream[(Int, Int)]
+    val inputDF2 = input2.toDS().toDF("start", "end")
+      .withColumn("eventTime2Start", timestamp_seconds($"start"))
+      .withColumn("start2", timestamp_seconds($"start"))
+      .withColumn("end2", timestamp_seconds($"end"))
+      .withWatermark("eventTime2Start", "0 seconds")
+
+    val stream = inputDF1.join(inputDF2,
+      expr("v1 >= start2 AND v1 < end2 " +
+        "AND eventTime1 = start2"), "inner")
+      .groupBy(window($"eventTime1", "5 seconds") as 'window)
+      .agg(count("*") as 'count)
+      .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+
+    testStream(stream)(
+      AddData(input1, 1, 2, 3, 4),
+      AddData(input2, (1, 2), (2, 3), (3, 4), (4, 5)),
+      CheckNewAnswer(),
+      assertNumStateRows(Seq(1, 0)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0))
+    )
+  }
 
-      // Similar to the above but add event time. The dedup state will get trimmed.
-      val dedupWithEventTime = aggStream
-        .withColumn("windowTime", expr("window_time(window)"))
-        .withColumn("windowTimeMicros", expr("unix_micros(windowTime)"))
-        .dropDuplicates("count", "windowEnd", "windowTime")
-        .select(
-          $"windowEnd".cast("long").as[Long],
-          $"windowTimeMicros".cast("long").as[Long],
-          $"count".as[Long])
-
-      testStream(dedupWithEventTime)(
-        AddData(inputData, 1, 5, 10, 15),
-
-        // op1 W (0, 0)
-        // agg: [0, 5) 1, [5, 10) 1, [10, 15) 1, [15, 20) 1
-        // output: None
-        // state: [0, 5) 1, [5, 10) 1, [10, 15) 1, [15, 20) 1
-        // op2 W (0, 0)
-        // output: None
-        // state: None
-
-        // no-data batch triggered
-
-        // op1 W (0, 15)
-        // agg: None
-        // output: [0, 5) 1, [5, 10) 1, [10, 15) 1
-        // state: [15, 20) 1
-        // op2 W (0, 15)
-        // output: (5, 4999999, 1), (10, 9999999, 1), (15, 14999999, 1)
-        // state: None - trimmed by watermark
-
-        CheckNewAnswer((5, 4999999, 1), (10, 9999999, 1), (15, 14999999, 1)),
-        assertNumStateRows(Seq(0, 1)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0))
-      )
-    }
+  // TODO: This test should be modified or deleted after

Review Comment:
   This is obvious and should already be covered in the stream-stream join suite. We can remove the test case.



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/MultiStatefulOperatorsSuite.scala:
##########
@@ -40,378 +40,465 @@ class MultiStatefulOperatorsSuite
   }
 
   test("window agg -> window agg, append mode") {
-    // TODO: SPARK-40940 - Fix the unsupported ops checker to allow chaining of stateful ops.
-    withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
-      val inputData = MemoryStream[Int]
-
-      val stream = inputData.toDF()
-        .withColumn("eventTime", timestamp_seconds($"value"))
-        .withWatermark("eventTime", "0 seconds")
-        .groupBy(window($"eventTime", "5 seconds").as("window"))
-        .agg(count("*").as("count"))
-        .groupBy(window($"window", "10 seconds"))
-        .agg(count("*").as("count"), sum("count").as("sum"))
-        .select($"window".getField("start").cast("long").as[Long],
-          $"count".as[Long], $"sum".as[Long])
-
-      testStream(stream)(
-        AddData(inputData, 10 to 21: _*),
-        // op1 W (0, 0)
-        // agg: [10, 15) 5, [15, 20) 5, [20, 25) 2
-        // output: None
-        // state: [10, 15) 5, [15, 20) 5, [20, 25) 2
-        // op2 W (0, 0)
-        // agg: None
-        // output: None
-        // state: None
-
-        // no-data batch triggered
-
-        // op1 W (0, 21)
-        // agg: None
-        // output: [10, 15) 5, [15, 20) 5
-        // state: [20, 25) 2
-        // op2 W (0, 21)
-        // agg: [10, 20) (2, 10)
-        // output: [10, 20) (2, 10)
-        // state: None
-        CheckNewAnswer((10, 2, 10)),
-        assertNumStateRows(Seq(0, 1)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0)),
-
-        AddData(inputData, 10 to 29: _*),
-        // op1 W (21, 21)
-        // agg: [10, 15) 5 - late, [15, 20) 5 - late, [20, 25) 5, [25, 30) 5
-        // output: None
-        // state: [20, 25) 7, [25, 30) 5
-        // op2 W (21, 21)
-        // agg: None
-        // output: None
-        // state: None
-
-        // no-data batch triggered
-
-        // op1 W (21, 29)
-        // agg: None
-        // output: [20, 25) 7
-        // state: [25, 30) 5
-        // op2 W (21, 29)
-        // agg: [20, 30) (1, 7)
-        // output: None
-        // state: [20, 30) (1, 7)
-        CheckNewAnswer(),
-        assertNumStateRows(Seq(1, 1)),
-        assertNumRowsDroppedByWatermark(Seq(0, 2)),
-
-        // Move the watermark.
-        AddData(inputData, 30, 31),
-        // op1 W (29, 29)
-        // agg: [30, 35) 2
-        // output: None
-        // state: [25, 30) 5 [30, 35) 2
-        // op2 W (29, 29)
-        // agg: None
-        // output: None
-        // state: [20, 30) (1, 7)
-
-        // no-data batch triggered
-
-        // op1 W (29, 31)
-        // agg: None
-        // output: [25, 30) 5
-        // state: [30, 35) 2
-        // op2 W (29, 31)
-        // agg: [20, 30) (2, 12)
-        // output: [20, 30) (2, 12)
-        // state: None
-        CheckNewAnswer((20, 2, 12)),
-        assertNumStateRows(Seq(0, 1)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0))
-      )
-    }
+    val inputData = MemoryStream[Int]
+
+    val stream = inputData.toDF()
+      .withColumn("eventTime", timestamp_seconds($"value"))
+      .withWatermark("eventTime", "0 seconds")
+      .groupBy(window($"eventTime", "5 seconds").as("window"))
+      .agg(count("*").as("count"))
+      .groupBy(window($"window", "10 seconds"))
+      .agg(count("*").as("count"), sum("count").as("sum"))
+      .select($"window".getField("start").cast("long").as[Long],
+        $"count".as[Long], $"sum".as[Long])
+
+    testStream(stream)(
+      AddData(inputData, 10 to 21: _*),
+      // op1 W (0, 0)
+      // agg: [10, 15) 5, [15, 20) 5, [20, 25) 2
+      // output: None
+      // state: [10, 15) 5, [15, 20) 5, [20, 25) 2
+      // op2 W (0, 0)
+      // agg: None
+      // output: None
+      // state: None
+
+      // no-data batch triggered
+
+      // op1 W (0, 21)
+      // agg: None
+      // output: [10, 15) 5, [15, 20) 5
+      // state: [20, 25) 2
+      // op2 W (0, 21)
+      // agg: [10, 20) (2, 10)
+      // output: [10, 20) (2, 10)
+      // state: None
+      CheckNewAnswer((10, 2, 10)),
+      assertNumStateRows(Seq(0, 1)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0)),
+
+      AddData(inputData, 10 to 29: _*),
+      // op1 W (21, 21)
+      // agg: [10, 15) 5 - late, [15, 20) 5 - late, [20, 25) 5, [25, 30) 5
+      // output: None
+      // state: [20, 25) 7, [25, 30) 5
+      // op2 W (21, 21)
+      // agg: None
+      // output: None
+      // state: None
+
+      // no-data batch triggered
+
+      // op1 W (21, 29)
+      // agg: None
+      // output: [20, 25) 7
+      // state: [25, 30) 5
+      // op2 W (21, 29)
+      // agg: [20, 30) (1, 7)
+      // output: None
+      // state: [20, 30) (1, 7)
+      CheckNewAnswer(),
+      assertNumStateRows(Seq(1, 1)),
+      assertNumRowsDroppedByWatermark(Seq(0, 2)),
+
+      // Move the watermark.
+      AddData(inputData, 30, 31),
+      // op1 W (29, 29)
+      // agg: [30, 35) 2
+      // output: None
+      // state: [25, 30) 5 [30, 35) 2
+      // op2 W (29, 29)
+      // agg: None
+      // output: None
+      // state: [20, 30) (1, 7)
+
+      // no-data batch triggered
+
+      // op1 W (29, 31)
+      // agg: None
+      // output: [25, 30) 5
+      // state: [30, 35) 2
+      // op2 W (29, 31)
+      // agg: [20, 30) (2, 12)
+      // output: [20, 30) (2, 12)
+      // state: None
+      CheckNewAnswer((20, 2, 12)),
+      assertNumStateRows(Seq(0, 1)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0))
+    )
   }
 
   test("agg -> agg -> agg, append mode") {
-    withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
-      val inputData = MemoryStream[Int]
-
-      val stream = inputData.toDF()
-        .withColumn("eventTime", timestamp_seconds($"value"))
-        .withWatermark("eventTime", "0 seconds")
-        .groupBy(window($"eventTime", "5 seconds").as("window"))
-        .agg(count("*").as("count"))
-        .groupBy(window(window_time($"window"), "10 seconds"))
-        .agg(count("*").as("count"), sum("count").as("sum"))
-        .groupBy(window(window_time($"window"), "20 seconds"))
-        .agg(count("*").as("count"), sum("sum").as("sum"))
-        .select(
-          $"window".getField("start").cast("long").as[Long],
-          $"window".getField("end").cast("long").as[Long],
-          $"count".as[Long], $"sum".as[Long])
-
-      testStream(stream)(
-        AddData(inputData, 0 to 37: _*),
-        // op1 W (0, 0)
-        // agg: [0, 5) 5, [5, 10) 5, [10, 15) 5, [15, 20) 5, [20, 25) 5, [25, 30) 5, [30, 35) 5,
-        //   [35, 40) 3
-        // output: None
-        // state: [0, 5) 5, [5, 10) 5, [10, 15) 5, [15, 20) 5, [20, 25) 5, [25, 30) 5, [30, 35) 5,
-        //   [35, 40) 3
-        // op2 W (0, 0)
-        // agg: None
-        // output: None
-        // state: None
-        // op3 W (0, 0)
-        // agg: None
-        // output: None
-        // state: None
-
-        // no-data batch triggered
-
-        // op1 W (0, 37)
-        // agg: None
-        // output: [0, 5) 5, [5, 10) 5, [10, 15) 5, [15, 20) 5, [20, 25) 5, [25, 30) 5, [30, 35) 5
-        // state: [35, 40) 3
-        // op2 W (0, 37)
-        // agg: [0, 10) (2, 10), [10, 20) (2, 10), [20, 30) (2, 10), [30, 40) (1, 5)
-        // output: [0, 10) (2, 10), [10, 20) (2, 10), [20, 30) (2, 10)
-        // state: [30, 40) (1, 5)
-        // op3 W (0, 37)
-        // agg: [0, 20) (2, 20), [20, 40) (1, 10)
-        // output: [0, 20) (2, 20)
-        // state: [20, 40) (1, 10)
-        CheckNewAnswer((0, 20, 2, 20)),
-        assertNumStateRows(Seq(1, 1, 1)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0, 0)),
-
-        AddData(inputData, 30 to 60: _*),
-        // op1 W (37, 37)
-        // dropped rows: [30, 35), 1 row <= note that 35, 36, 37 are still in effect
-        // agg: [35, 40) 8, [40, 45) 5, [45, 50) 5, [50, 55) 5, [55, 60) 5, [60, 65) 1
-        // output: None
-        // state: [35, 40) 8, [40, 45) 5, [45, 50) 5, [50, 55) 5, [55, 60) 5, [60, 65) 1
-        // op2 W (37, 37)
-        // output: None
-        // state: [30, 40) (1, 5)
-        // op3 W (37, 37)
-        // output: None
-        // state: [20, 40) (1, 10)
-
-        // no-data batch
-        // op1 W (37, 60)
-        // output: [35, 40) 8, [40, 45) 5, [45, 50) 5, [50, 55) 5, [55, 60) 5
-        // state: [60, 65) 1
-        // op2 W (37, 60)
-        // agg: [30, 40) (2, 13), [40, 50) (2, 10), [50, 60), (2, 10)
-        // output: [30, 40) (2, 13), [40, 50) (2, 10), [50, 60), (2, 10)
-        // state: None
-        // op3 W (37, 60)
-        // agg: [20, 40) (2, 23), [40, 60) (2, 20)
-        // output: [20, 40) (2, 23), [40, 60) (2, 20)
-        // state: None
-
-        CheckNewAnswer((20, 40, 2, 23), (40, 60, 2, 20)),
-        assertNumStateRows(Seq(0, 0, 1)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0, 1))
-      )
-    }
+    val inputData = MemoryStream[Int]
+
+    val stream = inputData.toDF()
+      .withColumn("eventTime", timestamp_seconds($"value"))
+      .withWatermark("eventTime", "0 seconds")
+      .groupBy(window($"eventTime", "5 seconds").as("window"))
+      .agg(count("*").as("count"))
+      .groupBy(window(window_time($"window"), "10 seconds"))
+      .agg(count("*").as("count"), sum("count").as("sum"))
+      .groupBy(window(window_time($"window"), "20 seconds"))
+      .agg(count("*").as("count"), sum("sum").as("sum"))
+      .select(
+        $"window".getField("start").cast("long").as[Long],
+        $"window".getField("end").cast("long").as[Long],
+        $"count".as[Long], $"sum".as[Long])
+
+    testStream(stream)(
+      AddData(inputData, 0 to 37: _*),
+      // op1 W (0, 0)
+      // agg: [0, 5) 5, [5, 10) 5, [10, 15) 5, [15, 20) 5, [20, 25) 5, [25, 30) 5, [30, 35) 5,
+      //   [35, 40) 3
+      // output: None
+      // state: [0, 5) 5, [5, 10) 5, [10, 15) 5, [15, 20) 5, [20, 25) 5, [25, 30) 5, [30, 35) 5,
+      //   [35, 40) 3
+      // op2 W (0, 0)
+      // agg: None
+      // output: None
+      // state: None
+      // op3 W (0, 0)
+      // agg: None
+      // output: None
+      // state: None
+
+      // no-data batch triggered
+
+      // op1 W (0, 37)
+      // agg: None
+      // output: [0, 5) 5, [5, 10) 5, [10, 15) 5, [15, 20) 5, [20, 25) 5, [25, 30) 5, [30, 35) 5
+      // state: [35, 40) 3
+      // op2 W (0, 37)
+      // agg: [0, 10) (2, 10), [10, 20) (2, 10), [20, 30) (2, 10), [30, 40) (1, 5)
+      // output: [0, 10) (2, 10), [10, 20) (2, 10), [20, 30) (2, 10)
+      // state: [30, 40) (1, 5)
+      // op3 W (0, 37)
+      // agg: [0, 20) (2, 20), [20, 40) (1, 10)
+      // output: [0, 20) (2, 20)
+      // state: [20, 40) (1, 10)
+      CheckNewAnswer((0, 20, 2, 20)),
+      assertNumStateRows(Seq(1, 1, 1)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0, 0)),
+
+      AddData(inputData, 30 to 60: _*),
+      // op1 W (37, 37)
+      // dropped rows: [30, 35), 1 row <= note that 35, 36, 37 are still in effect
+      // agg: [35, 40) 8, [40, 45) 5, [45, 50) 5, [50, 55) 5, [55, 60) 5, [60, 65) 1
+      // output: None
+      // state: [35, 40) 8, [40, 45) 5, [45, 50) 5, [50, 55) 5, [55, 60) 5, [60, 65) 1
+      // op2 W (37, 37)
+      // output: None
+      // state: [30, 40) (1, 5)
+      // op3 W (37, 37)
+      // output: None
+      // state: [20, 40) (1, 10)
+
+      // no-data batch
+      // op1 W (37, 60)
+      // output: [35, 40) 8, [40, 45) 5, [45, 50) 5, [50, 55) 5, [55, 60) 5
+      // state: [60, 65) 1
+      // op2 W (37, 60)
+      // agg: [30, 40) (2, 13), [40, 50) (2, 10), [50, 60), (2, 10)
+      // output: [30, 40) (2, 13), [40, 50) (2, 10), [50, 60), (2, 10)
+      // state: None
+      // op3 W (37, 60)
+      // agg: [20, 40) (2, 23), [40, 60) (2, 20)
+      // output: [20, 40) (2, 23), [40, 60) (2, 20)
+      // state: None
+
+      CheckNewAnswer((20, 40, 2, 23), (40, 60, 2, 20)),
+      assertNumStateRows(Seq(0, 0, 1)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0, 1))
+    )
   }
 
   test("stream deduplication -> aggregation, append mode") {
-    withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
-      val inputData = MemoryStream[Int]
-
-      val deduplication = inputData.toDF()
-        .withColumn("eventTime", timestamp_seconds($"value"))
-        .withWatermark("eventTime", "10 seconds")
-        .dropDuplicates("value", "eventTime")
-
-      val windowedAggregation = deduplication
-        .groupBy(window($"eventTime", "5 seconds").as("window"))
-        .agg(count("*").as("count"), sum("value").as("sum"))
-        .select($"window".getField("start").cast("long").as[Long],
-          $"count".as[Long])
-
-      testStream(windowedAggregation)(
-        AddData(inputData, 1 to 15: _*),
-        // op1 W (0, 0)
-        // input: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
-        // deduplicated: None
-        // output: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
-        // state: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
-        // op2 W (0, 0)
-        // agg: [0, 5) 4, [5, 10) 5 [10, 15) 5, [15, 20) 1
-        // output: None
-        // state: [0, 5) 4, [5, 10) 5 [10, 15) 5, [15, 20) 1
-
-        // no-data batch triggered
-
-        // op1 W (0, 5)
-        // agg: None
-        // output: None
-        // state: 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
-        // op2 W (0, 5)
-        // agg: None
-        // output: [0, 5) 4
-        // state: [5, 10) 5 [10, 15) 5, [15, 20) 1
-        CheckNewAnswer((0, 4)),
-        assertNumStateRows(Seq(3, 10)),
-        assertNumRowsDroppedByWatermark(Seq(0, 0))
-      )
-    }
+    val inputData = MemoryStream[Int]
+
+    val deduplication = inputData.toDF()
+      .withColumn("eventTime", timestamp_seconds($"value"))
+      .withWatermark("eventTime", "10 seconds")
+      .dropDuplicates("value", "eventTime")
+
+    val windowedAggregation = deduplication
+      .groupBy(window($"eventTime", "5 seconds").as("window"))
+      .agg(count("*").as("count"), sum("value").as("sum"))
+      .select($"window".getField("start").cast("long").as[Long],
+        $"count".as[Long])
+
+    testStream(windowedAggregation)(
+      AddData(inputData, 1 to 15: _*),
+      // op1 W (0, 0)
+      // input: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
+      // deduplicated: None
+      // output: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
+      // state: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
+      // op2 W (0, 0)
+      // agg: [0, 5) 4, [5, 10) 5 [10, 15) 5, [15, 20) 1
+      // output: None
+      // state: [0, 5) 4, [5, 10) 5 [10, 15) 5, [15, 20) 1
+
+      // no-data batch triggered
+
+      // op1 W (0, 5)
+      // agg: None
+      // output: None
+      // state: 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
+      // op2 W (0, 5)
+      // agg: None
+      // output: [0, 5) 4
+      // state: [5, 10) 5 [10, 15) 5, [15, 20) 1
+      CheckNewAnswer((0, 4)),
+      assertNumStateRows(Seq(3, 10)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0))
+    )
   }
 
   test("join -> window agg, append mode") {
-    withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
-      val input1 = MemoryStream[Int]
-      val inputDF1 = input1.toDF
-        .withColumnRenamed("value", "value1")
-        .withColumn("eventTime1", timestamp_seconds($"value1"))
-        .withWatermark("eventTime1", "0 seconds")
-
-      val input2 = MemoryStream[Int]
-      val inputDF2 = input2.toDF
-        .withColumnRenamed("value", "value2")
-        .withColumn("eventTime2", timestamp_seconds($"value2"))
-        .withWatermark("eventTime2", "0 seconds")
-
-      val stream = inputDF1.join(inputDF2, expr("eventTime1 = eventTime2"), "inner")
-        .groupBy(window($"eventTime1", "5 seconds").as("window"))
-        .agg(count("*").as("count"))
-        .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+    val input1 = MemoryStream[Int]
+    val inputDF1 = input1.toDF
+      .withColumnRenamed("value", "value1")
+      .withColumn("eventTime1", timestamp_seconds($"value1"))
+      .withWatermark("eventTime1", "0 seconds")
+
+    val input2 = MemoryStream[Int]
+    val inputDF2 = input2.toDF
+      .withColumnRenamed("value", "value2")
+      .withColumn("eventTime2", timestamp_seconds($"value2"))
+      .withWatermark("eventTime2", "0 seconds")
+
+    val stream = inputDF1.join(inputDF2, expr("eventTime1 = eventTime2"), "inner")
+      .groupBy(window($"eventTime1", "5 seconds").as("window"))
+      .agg(count("*").as("count"))
+      .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+
+    testStream(stream)(
+      MultiAddData(input1, 1 to 4: _*)(input2, 1 to 4: _*),
+
+      // op1 W (0, 0)
+      // join output: (1, 1), (2, 2), (3, 3), (4, 4)
+      // state: (1, 1), (2, 2), (3, 3), (4, 4)
+      // op2 W (0, 0)
+      // agg: [0, 5) 4
+      // output: None
+      // state: [0, 5) 4
+
+      // no-data batch triggered
+
+      // op1 W (0, 4)
+      // join output: None
+      // state: None
+      // op2 W (0, 4)
+      // agg: None
+      // output: None
+      // state: [0, 5) 4
+      CheckNewAnswer(),
+      assertNumStateRows(Seq(1, 0)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0)),
+
+      // Move the watermark
+      MultiAddData(input1, 5)(input2, 5),
+
+      // op1 W (4, 4)
+      // join output: (5, 5)
+      // state: (5, 5)
+      // op2 W (4, 4)
+      // agg: [5, 10) 1
+      // output: None
+      // state: [0, 5) 4, [5, 10) 1
+
+      // no-data batch triggered
+
+      // op1 W (4, 5)
+      // join output: None
+      // state: None
+      // op2 W (4, 5)
+      // agg: None
+      // output: [0, 5) 4
+      // state: [5, 10) 1
+      CheckNewAnswer((0, 4)),
+      assertNumStateRows(Seq(1, 0)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0))
+    )
+  }
 
+  test("aggregation -> stream deduplication, append mode") {
+    val inputData = MemoryStream[Int]
+
+    val aggStream = inputData.toDF()
+      .withColumn("eventTime", timestamp_seconds($"value"))
+      .withWatermark("eventTime", "0 seconds")
+      .groupBy(window($"eventTime", "5 seconds").as("window"))
+      .agg(count("*").as("count"))
+      .withColumn("windowEnd", expr("window.end"))
+
+    // dropDuplicates from aggStream without event time column for dropDuplicates - the
+    // state does not get trimmed due to watermark advancement.
+    val dedupNoEventTime = aggStream
+      .dropDuplicates("count", "windowEnd")
+      .select(
+        $"windowEnd".cast("long").as[Long],
+        $"count".as[Long])
+
+    testStream(dedupNoEventTime)(
+      AddData(inputData, 1, 5, 10, 15),
+
+      // op1 W (0, 0)
+      // agg: [0, 5) 1, [5, 10) 1, [10, 15) 1, [15, 20) 1
+      // output: None
+      // state: [0, 5) 1, [5, 10) 1, [10, 15) 1, [15, 20) 1
+      // op2 W (0, 0)
+      // output: None
+      // state: None
+
+      // no-data batch triggered
+
+      // op1 W (0, 15)
+      // agg: None
+      // output: [0, 5) 1, [5, 10) 1, [10, 15) 1
+      // state: [15, 20) 1
+      // op2 W (0, 15)
+      // output: (5, 1), (10, 1), (15, 1)
+      // state: (5, 1), (10, 1), (15, 1)
+
+      CheckNewAnswer((5, 1), (10, 1), (15, 1)),
+      assertNumStateRows(Seq(3, 1)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0))
+    )
+
+    // Similar to the above but add event time. The dedup state will get trimmed.
+    val dedupWithEventTime = aggStream
+      .withColumn("windowTime", expr("window_time(window)"))
+      .withColumn("windowTimeMicros", expr("unix_micros(windowTime)"))
+      .dropDuplicates("count", "windowEnd", "windowTime")
+      .select(
+        $"windowEnd".cast("long").as[Long],
+        $"windowTimeMicros".cast("long").as[Long],
+        $"count".as[Long])
+
+    testStream(dedupWithEventTime)(
+      AddData(inputData, 1, 5, 10, 15),
+
+      // op1 W (0, 0)
+      // agg: [0, 5) 1, [5, 10) 1, [10, 15) 1, [15, 20) 1
+      // output: None
+      // state: [0, 5) 1, [5, 10) 1, [10, 15) 1, [15, 20) 1
+      // op2 W (0, 0)
+      // output: None
+      // state: None
+
+      // no-data batch triggered
+
+      // op1 W (0, 15)
+      // agg: None
+      // output: [0, 5) 1, [5, 10) 1, [10, 15) 1
+      // state: [15, 20) 1
+      // op2 W (0, 15)
+      // output: (5, 4999999, 1), (10, 9999999, 1), (15, 14999999, 1)
+      // state: None - trimmed by watermark
+
+      CheckNewAnswer((5, 4999999, 1), (10, 9999999, 1), (15, 14999999, 1)),
+      assertNumStateRows(Seq(0, 1)),
+      assertNumRowsDroppedByWatermark(Seq(0, 0))
+    )
+  }
+
+  // TODO: This test should be modified or deleted after
+  // we support stream-stream join followed by aggregation
+  test("join on time interval -> window agg, append mode, should fail") {
+    val input1 = MemoryStream[Int]
+    val inputDF1 = input1.toDF
+      .withColumnRenamed("value", "value1")
+      .withColumn("eventTime1", timestamp_seconds($"value1"))
+      .withWatermark("eventTime1", "0 seconds")
+
+    val input2 = MemoryStream[(Int, Int)]
+    val inputDF2 = input2.toDS().toDF("start", "end")
+      .withColumn("eventTime2Start", timestamp_seconds($"start"))
+      .withColumn("eventTime2End", timestamp_seconds($"end"))
+      .withColumn("start2", timestamp_seconds($"start"))
+      .withWatermark("eventTime2Start", "0 seconds")
+
+    val stream = inputDF1.join(inputDF2,
+      expr("eventTime1 >= eventTime2Start AND eventTime1 < eventTime2End " +
+        "AND eventTime1 = start2"), "inner")
+      .groupBy(window($"eventTime1", "5 seconds") as 'window)
+      .agg(count("*") as 'count)
+      .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+
+    var excpetionThrown: Boolean = false

Review Comment:
   Shall we use `intercept[AnalysisException]` here instead of the try/catch with a boolean flag?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging {
     }
   }
 
+  def hasRangeExpr(e: Expression): Boolean = e.exists {
+    case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) =>
+      hasEventTimeColNeq(neq)
+    case _ => false
+  }
+
+  def hasEventTimeColNeq(neq: Expression): Boolean = {
+    val exp = neq.asInstanceOf[BinaryComparison]
+    hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
+  }
+
+  def hasEventTimeCol(exps: Expression): Boolean =
+    exps.exists {
+      case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey)
+      case _ => false
+    }
+
+  // TODO: This function and hasRangeExpr
+  // should be deleted after we support range join with states
+  def isStreamStreamIntervalJoin(plan: LogicalPlan): Boolean = {
+    plan match {
+      case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) =>
+        left.isStreaming && right.isStreaming
+        otherCondition.isDefined && hasRangeExpr(otherCondition.get)
+      case _ => false
+    }
+  }
+
   /**
    * Checks for possible correctness issue in chained stateful operators. The behavior is
    * controlled by SQL config `spark.sql.streaming.statefulOperator.checkCorrectness.enabled`.
    * Once it is enabled, an analysis exception will be thrown. Otherwise, Spark will just
    * print a warning message.
    */
   def checkStreamingQueryGlobalWatermarkLimit(
-      plan: LogicalPlan,
-      outputMode: OutputMode): Unit = {
+      plan: LogicalPlan): Unit = {

Review Comment:
   nit: could this be combined with the line above now?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org