You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ka...@apache.org on 2022/11/19 13:42:57 UTC

[spark] branch master updated: [SPARK-41199][SS] Fix metrics issue when DSv1 streaming source and DSv2 streaming source are co-used

This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 25849684b78 [SPARK-41199][SS] Fix metrics issue when DSv1 streaming source and DSv2 streaming source are co-used
25849684b78 is described below

commit 25849684b78cca6651e25d6efc9644a576e7e20f
Author: Jungtaek Lim <ka...@gmail.com>
AuthorDate: Sat Nov 19 22:42:26 2022 +0900

    [SPARK-41199][SS] Fix metrics issue when DSv1 streaming source and DSv2 streaming source are co-used
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to fix a metrics issue for streaming queries in which a DSv1 streaming source and a DSv2 streaming source are co-used. If a streaming query has both a DSv1 streaming source and a DSv2 streaming source, only the DSv1 streaming source produces correct metrics.
    
    There is a bug in ProgressReporter: it tries to match the logical node for a DSv2 streaming source with OffsetHolder (the association map has OffsetHolder instances for DSv2 streaming sources), which will never match. Given that the physical node for a DSv2 streaming source contains both the source information and the metrics, we can simply deduce all the necessary information from the physical node rather than trying to find the source in the association map.
    
    ### Why are the changes needed?
    
    The logic of collecting metrics does not collect metrics for DSv2 streaming sources properly.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    New test case.
    
    Closes #38719 from HeartSaVioR/SPARK-41999.
    
    Authored-by: Jungtaek Lim <ka...@gmail.com>
    Signed-off-by: Jungtaek Lim <ka...@gmail.com>
---
 .../sql/execution/streaming/ProgressReporter.scala |  9 +++-
 .../spark/sql/streaming/StreamingQuerySuite.scala  | 56 +++++++++++++++++++++-
 2 files changed, 63 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 8a89ca7b85d..a4c975861c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -345,7 +345,14 @@ trait ProgressReporter extends Logging {
       val allExecPlanLeaves = lastExecution.executedPlan.collectLeaves()
       if (allLogicalPlanLeaves.size == allExecPlanLeaves.size) {
         val execLeafToSource = allLogicalPlanLeaves.zip(allExecPlanLeaves).flatMap {
-          case (lp, ep) => logicalPlanLeafToSource.get(lp).map { source => ep -> source }
+          case (_, ep: MicroBatchScanExec) =>
+            // SPARK-41199: `logicalPlanLeafToSource` contains OffsetHolder instance for DSv2
+            // streaming source, hence we cannot lookup the actual source from the map.
+            // The physical node for DSv2 streaming source contains the information of the source
+            // by itself, so leverage it.
+            Some(ep -> ep.stream)
+          case (lp, ep) =>
+            logicalPlanLeafToSource.get(lp).map { source => ep -> source }
         }
         val sourceToInputRowsTuples = execLeafToSource.map { case (execLeaf, source) =>
           val numRows = execLeaf.metrics.get("numOutputRows").map(_.value).getOrElse(0L)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 090a2081219..71eb4c15701 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -36,7 +36,7 @@ import org.scalatestplus.mockito.MockitoSugar
 
 import org.apache.spark.{SparkException, TestUtils}
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, Row}
+import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, Row, SaveMode}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Literal, Rand, Randn, Shuffle, Uuid}
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
@@ -657,6 +657,60 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     )
   }
 
+  test("SPARK-41199: input row calculation with mixed-up of DSv1 and DSv2 streaming sources") {
+    withTable("parquet_streaming_tbl") {
+      val streamInput = MemoryStream[Int]
+      val streamDf = streamInput.toDF().selectExpr("value AS key", "value AS value_stream")
+
+      spark.sql(
+        """
+          |CREATE TABLE parquet_streaming_tbl
+          |(
+          |  key integer,
+          |  value_stream integer
+          |)
+          |USING parquet
+          |""".stripMargin)
+
+      val streamDf2 = spark.readStream.table("parquet_streaming_tbl")
+      val unionedDf = streamDf.union(streamDf2)
+
+      val clock = new StreamManualClock()
+      testStream(unionedDf)(
+        StartStream(triggerClock = clock, trigger = Trigger.ProcessingTime(100)),
+        AddData(streamInput, 1, 2, 3),
+        Execute { _ =>
+          spark.range(4, 6).selectExpr("id AS key", "id AS value_stream")
+            .write.format("parquet").mode(SaveMode.Append).saveAsTable("parquet_streaming_tbl")
+        },
+        AdvanceManualClock(150),
+        waitUntilBatchProcessed(clock),
+        CheckLastBatch((1, 1), (2, 2), (3, 3), (4, 4), (5, 5)),
+        AssertOnQuery { q =>
+          val lastProgress = getLastProgressWithData(q)
+          assert(lastProgress.nonEmpty)
+          assert(lastProgress.get.numInputRows == 5)
+          assert(lastProgress.get.sources.length == 2)
+          assert(lastProgress.get.sources(0).numInputRows == 3)
+          assert(lastProgress.get.sources(1).numInputRows == 2)
+          true
+        }
+      )
+    }
+  }
+
+  private def waitUntilBatchProcessed(clock: StreamManualClock) = AssertOnQuery { q =>
+    eventually(Timeout(streamingTimeout)) {
+      if (!q.exception.isDefined) {
+        assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+      }
+    }
+    if (q.exception.isDefined) {
+      throw q.exception.get
+    }
+    true
+  }
+
   testQuietly("StreamExecution metadata garbage collection") {
     val inputData = MemoryStream[Int]
     val mapped = inputData.toDS().map(6 / _)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org