Posted to commits@spark.apache.org by ka...@apache.org on 2022/11/20 21:45:08 UTC

[spark] branch master updated: [SPARK-41198][SS] Fix metrics in streaming query having CTE and DSv1 streaming source

This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1669af118ad [SPARK-41198][SS] Fix metrics in streaming query having CTE and DSv1 streaming source
1669af118ad is described below

commit 1669af118adb02fcdc409510d6d8d79f93abae61
Author: Jungtaek Lim <ka...@gmail.com>
AuthorDate: Mon Nov 21 06:44:11 2022 +0900

    [SPARK-41198][SS] Fix metrics in streaming query having CTE and DSv1 streaming source
    
    ### What changes were proposed in this pull request?
    
    This PR fixes the broken metrics of a streaming query that contains a CTE and a DSv1 streaming source, by manually applying InlineCTE to the analyzed plan when collecting metrics.
    
    Suppose a streaming query contains the following as its batch side, which is joined with a streaming source:
    
    ```
    with batch_tbl as (
      SELECT col1, col2 FROM parquet_tbl
    )
    
    SELECT col1 AS key, col2 as value_batch FROM batch_tbl
    ```
    
    Currently, Spark adds a WithCTE node with CTERelationDef and CTERelationRef when a CTE is used. Below is the analyzed plan:
    
    ```
    WriteToMicroBatchDataSource MemorySink, 2cbb2afa-6513-4a23-b4c2-37910fc9cdf9, Append, 0
    +- Project [key#15, value_stream#16, value_batch#9L]
       +- Join Inner, (cast(key#15 as bigint) = key#8L)
          :- SubqueryAlias spark_catalog.default.parquet_streaming_tbl
          :  +- Project [key#55 AS key#15, value_stream#56 AS value_stream#16]
          :     +- Relation spark_catalog.default.parquet_streaming_tbl[key#55,value_stream#56] parquet
          +- WithCTE
             :- CTERelationDef 0, false
             :  +- SubqueryAlias batch_tbl
             :     +- Project [col1#10L, col2#11L]
             :        +- SubqueryAlias spark_catalog.default.parquet_tbl
             :           +- Relation spark_catalog.default.parquet_tbl[col1#10L,col2#11L] parquet
             +- Project [col1#10L AS key#8L, col2#11L AS value_batch#9L]
                +- SubqueryAlias batch_tbl
                   +- CTERelationRef 0, true, [col1#10L, col2#11L]
    ```
    
    Here, the plan has 3 leaf nodes, but they represent only 2 actual sources: the CTERelationRef leaf merely points at the CTERelationDef, whose own leaf is the parquet_tbl relation. During optimization the CTE is inlined, leaving 2 leaf nodes. Below is the optimized plan:
    
    ```
    WriteToDataSourceV2 MicroBatchWrite[epoch: 0, writer: org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite622c7c7f]
    +- Project [key#55, value_stream#56, value_batch#9L]
       +- Join Inner, (cast(key#55 as bigint) = key#8L)
          :- Filter isnotnull(key#55)
          :  +- Relation spark_catalog.default.parquet_streaming_tbl[key#55,value_stream#56] parquet
          +- Project [col1#10L AS key#8L, col2#11L AS value_batch#9L]
             +- Filter isnotnull(col1#10L)
                +- Relation spark_catalog.default.parquet_tbl[col1#10L,col2#11L] parquet
    ```
    
    Hence the executed plan also has 2 leaf nodes, which does not match the number of leaf nodes in the analyzed plan, and ProgressReporter gives up collecting metrics.
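    
    The mismatch can be illustrated with a toy model. This is a minimal sketch in plain Scala, not Spark code: the node classes below only borrow Catalyst's names, and `pairLeaves` is a hypothetical helper that mimics the "zip leaves only if the counts agree" check in ProgressReporter.
    
    ```scala
    // Toy stand-ins for Catalyst plan nodes. These are NOT Spark classes;
    // they only reuse the names to mirror the plans shown above.
    sealed trait Plan {
      def children: Seq[Plan]
      // Leaves in left-to-right order, analogous to collectLeaves().
      def collectLeaves: Seq[Plan] =
        if (children.isEmpty) Seq(this) else children.flatMap(_.collectLeaves)
    }
    final case class Relation(table: String) extends Plan {
      def children: Seq[Plan] = Nil
    }
    final case class CTERelationRef(id: Int) extends Plan {
      def children: Seq[Plan] = Nil
    }
    final case class CTERelationDef(id: Int, child: Plan) extends Plan {
      def children: Seq[Plan] = Seq(child)
    }
    final case class WithCTE(defs: Seq[CTERelationDef], plan: Plan) extends Plan {
      def children: Seq[Plan] = defs :+ plan
    }
    final case class Project(child: Plan) extends Plan {
      def children: Seq[Plan] = Seq(child)
    }
    final case class Join(left: Plan, right: Plan) extends Plan {
      def children: Seq[Plan] = Seq(left, right)
    }
    
    object LeafCountMismatch {
      // Shape of the analyzed plan: the CTE contributes a definition leaf
      // (the relation) and a reference leaf (CTERelationRef).
      val analyzed: Plan = Join(
        Project(Relation("parquet_streaming_tbl")),
        WithCTE(
          Seq(CTERelationDef(0, Project(Relation("parquet_tbl")))),
          Project(CTERelationRef(0))))
    
      // Shape of the plan after optimization: the CTE has been inlined.
      val executed: Plan = Join(
        Relation("parquet_streaming_tbl"),
        Project(Relation("parquet_tbl")))
    
      // Hypothetical helper: pair leaves by position only when counts agree.
      def pairLeaves(logical: Plan, exec: Plan): Option[Seq[(Plan, Plan)]] = {
        val logicalLeaves = logical.collectLeaves
        val execLeaves = exec.collectLeaves
        if (logicalLeaves.size == execLeaves.size) Some(logicalLeaves.zip(execLeaves))
        else None
      }
    
      def main(args: Array[String]): Unit = {
        println(s"analyzed leaves: ${analyzed.collectLeaves.size}")
        println(s"executed leaves: ${executed.collectLeaves.size}")
        println(s"leaves can be paired: ${pairLeaves(analyzed, executed).isDefined}")
      }
    }
    ```
    
    In this model the analyzed shape yields three leaves and the executed shape two, so the positional pairing returns `None`, which corresponds to the case where no per-source metrics can be attributed.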
    
    Applying InlineCTE to the analyzed plan while collecting metrics resolves this. For example, below is the logical plan produced by applying InlineCTE to the analyzed plan above:
    
    ```
    WriteToMicroBatchDataSource MemorySink, 2cbb2afa-6513-4a23-b4c2-37910fc9cdf9, Append, 0
    +- Project [key#15, value_stream#16, value_batch#9L]
       +- Join Inner, (cast(key#15 as bigint) = key#8L)
          :- SubqueryAlias spark_catalog.default.parquet_streaming_tbl
          :  +- Project [key#55 AS key#15, value_stream#56 AS value_stream#16]
          :     +- Relation spark_catalog.default.parquet_streaming_tbl[key#55,value_stream#56] parquet
          +- Project [col1#10L AS key#8L, col2#11L AS value_batch#9L]
             +- SubqueryAlias batch_tbl
                +- SubqueryAlias batch_tbl
                   +- Project [col1#10L, col2#11L]
                      +- SubqueryAlias spark_catalog.default.parquet_tbl
                         +- Relation spark_catalog.default.parquet_tbl[col1#10L,col2#11L] parquet
    ```
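    
    The effect of inlining on the leaf count can also be checked outside a streaming query. The following is a rough sketch, assuming Spark SQL is on the classpath and a local session is acceptable; the object name and the `src_view` temp view are made up for illustration, while `InlineCTE(alwaysInline = true)`, `WithCTE`, and `collectLeaves()` are the same calls this patch uses.
    
    ```scala
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.optimizer.InlineCTE
    import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, WithCTE}
    
    // Hypothetical standalone check, not part of this patch.
    object InlineCteLeafCount {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[1]")
          .appName("inline-cte-leaf-count")
          .getOrCreate()
    
        // Illustrative temp view standing in for parquet_tbl.
        spark.range(0, 10).selectExpr("id AS col1", "id AS col2")
          .createOrReplaceTempView("src_view")
    
        val df = spark.sql(
          """
            |WITH batch_tbl AS (
            |  SELECT col1, col2 FROM src_view
            |)
            |SELECT col1 AS key, col2 AS value_batch FROM batch_tbl
            |""".stripMargin)
    
        val analyzed: LogicalPlan = df.queryExecution.analyzed
    
        // Same detection logic as unrollCTE in this patch.
        val containsCTE = analyzed.exists {
          case _: WithCTE => true
          case _ => false
        }
        val inlined =
          if (containsCTE) InlineCTE(alwaysInline = true).apply(analyzed) else analyzed
    
        println(s"contains WithCTE: $containsCTE")
        println(s"analyzed leaves:  ${analyzed.collectLeaves().size}")
        println(s"inlined leaves:   ${inlined.collectLeaves().size}")
    
        spark.stop()
      }
    }
    ```
    
    If the analyzed plan contains a WithCTE node, the inlined plan is expected to report fewer leaves, because each CTERelationRef leaf is replaced by the subtree of its definition rather than counted alongside it.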
    
    Note that this is only required when the streaming query has at least one DSv1 streaming source. If the streaming query contains only DSv2 data sources as streaming sources, ProgressReporter can simply pick up the dedicated physical node(s) from the executed plan.
    
    ### Why are the changes needed?
    
    The metrics of a streaming query are broken if the query contains a CTE and at least one DSv1 streaming source.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    New test case.
    
    Closes #38717 from HeartSaVioR/SPARK-41198.
    
    Authored-by: Jungtaek Lim <ka...@gmail.com>
    Signed-off-by: Jungtaek Lim <ka...@gmail.com>
---
 .../sql/execution/streaming/ProgressReporter.scala | 24 +++++++++-
 .../spark/sql/streaming/StreamingQuerySuite.scala  | 52 ++++++++++++++++++++++
 2 files changed, 74 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index a4c975861c5..3d7b4df7259 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -25,7 +25,8 @@ import scala.collection.mutable
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Row, SparkSession}
-import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan}
+import org.apache.spark.sql.catalyst.optimizer.InlineCTE
+import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan, WithCTE}
 import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_SECOND
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.connector.catalog.Table
@@ -293,6 +294,19 @@ trait ProgressReporter extends Logging {
       tuples.groupBy(_._1).mapValues(_.map(_._2).sum).toMap // sum up rows for each source
     }
 
+    def unrollCTE(plan: LogicalPlan): LogicalPlan = {
+      val containsCTE = plan.exists {
+        case _: WithCTE => true
+        case _ => false
+      }
+
+      if (containsCTE) {
+        InlineCTE(alwaysInline = true).apply(plan)
+      } else {
+        plan
+      }
+    }
+
     val onlyDataSourceV2Sources = {
       // Check whether the streaming query's logical plan has only V2 micro-batch data sources
       val allStreamingLeaves = logicalPlan.collect {
@@ -341,7 +355,13 @@ trait ProgressReporter extends Logging {
       val logicalPlanLeafToSource = newData.flatMap { case (source, logicalPlan) =>
         logicalPlan.collectLeaves().map { leaf => leaf -> source }
       }
-      val allLogicalPlanLeaves = lastExecution.logical.collectLeaves() // includes non-streaming
+
+      // SPARK-41198: CTE is inlined in optimization phase, which ends up with having different
+      // number of leaf nodes between (analyzed) logical plan and executed plan. Here we apply
+      // inlining CTE against logical plan manually if there is a CTE node.
+      val finalLogicalPlan = unrollCTE(lastExecution.logical)
+
+      val allLogicalPlanLeaves = finalLogicalPlan.collectLeaves() // includes non-streaming
       val allExecPlanLeaves = lastExecution.executedPlan.collectLeaves()
       if (allLogicalPlanLeaves.size == allExecPlanLeaves.size) {
         val execLeafToSource = allLogicalPlanLeaves.zip(allExecPlanLeaves).flatMap {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 71eb4c15701..5c162835b12 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -657,6 +657,58 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     )
   }
 
+  test("SPARK-41198: input row calculation with CTE") {
+    withTable("parquet_tbl", "parquet_streaming_tbl") {
+      spark.range(0, 10).selectExpr("id AS col1", "id AS col2")
+        .write.format("parquet").saveAsTable("parquet_tbl")
+
+      val dfWithClause = spark.sql(
+        """
+          |with batch_tbl as (
+          |  SELECT col1, col2 FROM parquet_tbl
+          |)
+          |
+          |SELECT col1 AS key, col2 as value_batch FROM batch_tbl
+          |""".stripMargin)
+
+      spark.sql(
+        """
+          |CREATE TABLE parquet_streaming_tbl
+          |(
+          |  key integer,
+          |  value_stream integer
+          |)
+          |USING parquet
+          |""".stripMargin)
+
+      // NOTE: if we only have DSv2 streaming source(s) as all streaming sources in the query, it
+      // simply collects the corresponding physical nodes from executed plan and does not encounter
+      // the issue. Here we use DSv1 streaming source to reproduce the issue.
+      val streamDf = spark.readStream.table("parquet_streaming_tbl")
+      val joinedDf = streamDf.join(dfWithClause, Seq("key"), "inner")
+
+      val clock = new StreamManualClock()
+      testStream(joinedDf)(
+        StartStream(triggerClock = clock, trigger = Trigger.ProcessingTime(100)),
+        Execute { _ =>
+          spark.range(1, 5).selectExpr("id AS key", "id AS value_stream")
+            .write.format("parquet").mode(SaveMode.Append).saveAsTable("parquet_streaming_tbl")
+        },
+        AdvanceManualClock(150),
+        waitUntilBatchProcessed(clock),
+        CheckLastBatch((1, 1, 1), (2, 2, 2), (3, 3, 3), (4, 4, 4)),
+        AssertOnQuery { q =>
+          val lastProgress = getLastProgressWithData(q)
+          assert(lastProgress.nonEmpty)
+          assert(lastProgress.get.numInputRows == 4)
+          assert(lastProgress.get.sources.length == 1)
+          assert(lastProgress.get.sources(0).numInputRows == 4)
+          true
+        }
+      )
+    }
+  }
+
   test("SPARK-41199: input row calculation with mixed-up of DSv1 and DSv2 streaming sources") {
     withTable("parquet_streaming_tbl") {
       val streamInput = MemoryStream[Int]

