You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ka...@apache.org on 2023/09/04 02:42:30 UTC

[spark] branch branch-3.5 updated: [SPARK-45045][SS] Revert back the behavior of idle progress for StreamingQuery API from SPARK-43183

This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 44ab0fc0068 [SPARK-45045][SS] Revert back the behavior of idle progress for StreamingQuery API from SPARK-43183
44ab0fc0068 is described below

commit 44ab0fc0068f815c7eddcd34ae4343bbfd97b64d
Author: Jungtaek Lim <ka...@gmail.com>
AuthorDate: Mon Sep 4 11:41:48 2023 +0900

    [SPARK-45045][SS] Revert back the behavior of idle progress for StreamingQuery API from SPARK-43183
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to revert the idle-progress behavior of the StreamingQuery API that was changed in [SPARK-43183](https://issues.apache.org/jira/browse/SPARK-43183), to avoid breaking tests of 3rd party data sources.
    
    ### Why are the changes needed?
    
    We found that the behavioral change from SPARK-43183 broke many tests in 3rd party data sources.
    (Short summary of SPARK-43183: we changed the idle-progress behavior to provide only an idle event callback, instead of both invoking the progress update callback and adding the progress to the StreamingQuery API, where it is exposed as recent progresses / last progress.)
    
    The main rationale of SPARK-43183 was to avoid invoking the progress update callback for idle events, which had confused users. That concern is about the streaming query listener; it did not require changing the behavior of the StreamingQuery API as well.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, but it technically reduces the user-facing change made before this PR: we partially revert the behavioral change from SPARK-43183, which has not been released yet.
    
    ### How was this patch tested?
    
    Modified tests. Also manually ran the 3rd party data source tests that were broken with the Spark 3.5.0 RC; they pass with this change.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #42773 from HeartSaVioR/SPARK-45045.
    
    Authored-by: Jungtaek Lim <ka...@gmail.com>
    Signed-off-by: Jungtaek Lim <ka...@gmail.com>
    (cherry picked from commit cf0a5cb472efebb4350e48bd82a4f834e8607333)
    Signed-off-by: Jungtaek Lim <ka...@gmail.com>
---
 .../sql/execution/streaming/ProgressReporter.scala | 164 ++++++++++++---------
 .../streaming/StreamingQueryListenerSuite.scala    |   5 +-
 .../StreamingQueryStatusAndProgressSuite.scala     |  41 +++++-
 3 files changed, 135 insertions(+), 75 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 6dbecd186dc..c0bd94e7d6c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -89,7 +89,7 @@ trait ProgressReporter extends Logging {
     sparkSession.sessionState.conf.streamingNoDataProgressEventInterval
 
   // The timestamp we report an event that has not executed anything
-  private var lastNoExecutionProgressEventTime = triggerClock.getTimeMillis()
+  private var lastNoExecutionProgressEventTime = Long.MinValue
 
   private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
   timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC"))
@@ -142,21 +142,37 @@ trait ProgressReporter extends Logging {
     latestStreamProgress = to
   }
 
-  private def updateProgress(newProgress: StreamingQueryProgress): Unit = {
+  private def addNewProgress(newProgress: StreamingQueryProgress): Unit = {
     progressBuffer.synchronized {
       progressBuffer += newProgress
       while (progressBuffer.length >= sparkSession.sqlContext.conf.streamingProgressRetention) {
         progressBuffer.dequeue()
       }
     }
+  }
+
+  private def updateProgress(newProgress: StreamingQueryProgress): Unit = {
+    // Reset noDataEventTimestamp if we processed any data
+    lastNoExecutionProgressEventTime = triggerClock.getTimeMillis()
+
+    addNewProgress(newProgress)
     postEvent(new QueryProgressEvent(newProgress))
     logInfo(s"Streaming query made progress: $newProgress")
   }
 
-  private def postIdleness(): Unit = {
-    postEvent(new QueryIdleEvent(id, runId, formatTimestamp(currentTriggerStartTimestamp)))
-    logInfo(s"Streaming query has been idle and waiting for new data more than " +
-      s"${noDataProgressEventInterval} ms.")
+  private def updateIdleness(newProgress: StreamingQueryProgress): Unit = {
+    val now = triggerClock.getTimeMillis()
+    if (now - noDataProgressEventInterval >= lastNoExecutionProgressEventTime) {
+      addNewProgress(newProgress)
+      if (lastNoExecutionProgressEventTime > Long.MinValue) {
+        postEvent(new QueryIdleEvent(newProgress.id, newProgress.runId,
+          formatTimestamp(currentTriggerStartTimestamp)))
+        logInfo(s"Streaming query has been idle and waiting for new data more than " +
+          s"$noDataProgressEventInterval ms.")
+      }
+
+      lastNoExecutionProgressEventTime = now
+    }
   }
 
   /**
@@ -172,96 +188,102 @@ trait ProgressReporter extends Logging {
       currentTriggerLatestOffsets != null)
     currentTriggerEndTimestamp = triggerClock.getTimeMillis()
 
-    if (hasExecuted) {
-      val executionStats = extractExecutionStats(hasNewData)
-      val processingTimeMills = currentTriggerEndTimestamp - currentTriggerStartTimestamp
-      val processingTimeSec = Math.max(1L, processingTimeMills).toDouble / MILLIS_PER_SECOND
+    val executionStats = extractExecutionStats(hasNewData, hasExecuted)
+    val processingTimeMills = currentTriggerEndTimestamp - currentTriggerStartTimestamp
+    val processingTimeSec = Math.max(1L, processingTimeMills).toDouble / MILLIS_PER_SECOND
 
-      val inputTimeSec = if (lastTriggerStartTimestamp >= 0) {
-        (currentTriggerStartTimestamp - lastTriggerStartTimestamp).toDouble / MILLIS_PER_SECOND
-      } else {
-        Double.PositiveInfinity
-      }
-      logDebug(s"Execution stats: $executionStats")
-
-      val sourceProgress = sources.distinct.map { source =>
-        val numRecords = executionStats.inputRows.getOrElse(source, 0L)
-        val sourceMetrics = source match {
-          case withMetrics: ReportsSourceMetrics =>
-            withMetrics.metrics(Optional.ofNullable(latestStreamProgress.get(source).orNull))
-          case _ => Map[String, String]().asJava
-        }
-        new SourceProgress(
-          description = source.toString,
-          startOffset = currentTriggerStartOffsets.get(source).orNull,
-          endOffset = currentTriggerEndOffsets.get(source).orNull,
-          latestOffset = currentTriggerLatestOffsets.get(source).orNull,
-          numInputRows = numRecords,
-          inputRowsPerSecond = numRecords / inputTimeSec,
-          processedRowsPerSecond = numRecords / processingTimeSec,
-          metrics = sourceMetrics
-        )
-      }
+    val inputTimeSec = if (lastTriggerStartTimestamp >= 0) {
+      (currentTriggerStartTimestamp - lastTriggerStartTimestamp).toDouble / MILLIS_PER_SECOND
+    } else {
+      Double.PositiveInfinity
+    }
+    logDebug(s"Execution stats: $executionStats")
 
-      val sinkOutput = sinkCommitProgress.map(_.numOutputRows)
-      val sinkMetrics = sink match {
-        case withMetrics: ReportsSinkMetrics =>
-          withMetrics.metrics()
+    val sourceProgress = sources.distinct.map { source =>
+      val numRecords = executionStats.inputRows.getOrElse(source, 0L)
+      val sourceMetrics = source match {
+        case withMetrics: ReportsSourceMetrics =>
+          withMetrics.metrics(Optional.ofNullable(latestStreamProgress.get(source).orNull))
         case _ => Map[String, String]().asJava
       }
+      new SourceProgress(
+        description = source.toString,
+        startOffset = currentTriggerStartOffsets.get(source).orNull,
+        endOffset = currentTriggerEndOffsets.get(source).orNull,
+        latestOffset = currentTriggerLatestOffsets.get(source).orNull,
+        numInputRows = numRecords,
+        inputRowsPerSecond = numRecords / inputTimeSec,
+        processedRowsPerSecond = numRecords / processingTimeSec,
+        metrics = sourceMetrics
+      )
+    }
+
+    val sinkOutput = if (hasExecuted) {
+      sinkCommitProgress.map(_.numOutputRows)
+    } else {
+      sinkCommitProgress.map(_ => 0L)
+    }
 
-      val sinkProgress = SinkProgress(
-        sink.toString, sinkOutput, sinkMetrics)
-
-      val observedMetrics = extractObservedMetrics(hasNewData, lastExecution)
-
-      val newProgress = new StreamingQueryProgress(
-        id = id,
-        runId = runId,
-        name = name,
-        timestamp = formatTimestamp(currentTriggerStartTimestamp),
-        batchId = currentBatchId,
-        batchDuration = processingTimeMills,
-        durationMs =
-          new java.util.HashMap(currentDurationsMs.toMap.mapValues(long2Long).toMap.asJava),
-        eventTime = new java.util.HashMap(executionStats.eventTimeStats.asJava),
-        stateOperators = executionStats.stateOperators.toArray,
-        sources = sourceProgress.toArray,
-        sink = sinkProgress,
-        observedMetrics = new java.util.HashMap(observedMetrics.asJava))
-
-      // Reset noDataEventTimestamp if we processed any data
-      lastNoExecutionProgressEventTime = triggerClock.getTimeMillis()
+    val sinkMetrics = sink match {
+      case withMetrics: ReportsSinkMetrics =>
+        withMetrics.metrics()
+      case _ => Map[String, String]().asJava
+    }
+
+    val sinkProgress = SinkProgress(
+      sink.toString, sinkOutput, sinkMetrics)
+
+    val observedMetrics = extractObservedMetrics(hasNewData, lastExecution)
+
+    val newProgress = new StreamingQueryProgress(
+      id = id,
+      runId = runId,
+      name = name,
+      timestamp = formatTimestamp(currentTriggerStartTimestamp),
+      batchId = currentBatchId,
+      batchDuration = processingTimeMills,
+      durationMs =
+        new java.util.HashMap(currentDurationsMs.toMap.mapValues(long2Long).toMap.asJava),
+      eventTime = new java.util.HashMap(executionStats.eventTimeStats.asJava),
+      stateOperators = executionStats.stateOperators.toArray,
+      sources = sourceProgress.toArray,
+      sink = sinkProgress,
+      observedMetrics = new java.util.HashMap(observedMetrics.asJava))
+
+    if (hasExecuted) {
       updateProgress(newProgress)
     } else {
-      val now = triggerClock.getTimeMillis()
-      if (now - noDataProgressEventInterval >= lastNoExecutionProgressEventTime) {
-        lastNoExecutionProgressEventTime = now
-        postIdleness()
-      }
+      updateIdleness(newProgress)
     }
 
     currentStatus = currentStatus.copy(isTriggerActive = false)
   }
 
   /** Extract statistics about stateful operators from the executed query plan. */
-  private def extractStateOperatorMetrics(): Seq[StateOperatorProgress] = {
-    assert(lastExecution != null, "lastExecution is not available")
+  private def extractStateOperatorMetrics(hasExecuted: Boolean): Seq[StateOperatorProgress] = {
+    if (lastExecution == null) return Nil
+    // lastExecution could belong to one of the previous triggers if `!hasExecuted`.
+    // Walking the plan again should be inexpensive.
     lastExecution.executedPlan.collect {
       case p if p.isInstanceOf[StateStoreWriter] =>
-        p.asInstanceOf[StateStoreWriter].getProgress()
+        val progress = p.asInstanceOf[StateStoreWriter].getProgress()
+        if (hasExecuted) {
+          progress
+        } else {
+          progress.copy(newNumRowsUpdated = 0, newNumRowsDroppedByWatermark = 0)
+        }
     }
   }
 
   /** Extracts statistics from the most recent query execution. */
-  private def extractExecutionStats(hasNewData: Boolean): ExecutionStats = {
+  private def extractExecutionStats(hasNewData: Boolean, hasExecuted: Boolean): ExecutionStats = {
     val hasEventTime = logicalPlan.collect { case e: EventTimeWatermark => e }.nonEmpty
     val watermarkTimestamp =
       if (hasEventTime) Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
       else Map.empty[String, String]
 
     // SPARK-19378: Still report metrics even though no data was processed while reporting progress.
-    val stateOperators = extractStateOperatorMetrics()
+    val stateOperators = extractStateOperatorMetrics(hasExecuted)
 
     if (!hasNewData) {
       return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 5b5e8732e0d..52b740bc5c3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -331,9 +331,9 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
           }
           true
         }
-        // `recentProgress` should not receive any events
+        // `recentProgress` should not receive too many no data events
         actions += AssertOnQuery { q =>
-          q.recentProgress.isEmpty
+          q.recentProgress.size > 1 && q.recentProgress.size <= 11
         }
         testStream(input.toDS)(actions.toSeq: _*)
         spark.sparkContext.listenerBus.waitUntilEmpty()
@@ -524,7 +524,6 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
         testStream(result)(
           StartStream(trigger = Trigger.ProcessingTime(10), triggerClock = clock),
           AddData(input, 10),
-          // checkProgressEvent(1),
           AdvanceManualClock(10),
           checkProgressEvent(1),
           AdvanceManualClock(90),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
index d016b334627..fa7a3803d05 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -24,11 +24,13 @@ import scala.collection.JavaConverters._
 import org.json4s.jackson.JsonMethods._
 import org.scalatest.concurrent.Eventually
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
+import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.StreamingQueryStatusAndProgressSuite._
 import org.apache.spark.sql.streaming.StreamingQuerySuite.clock
 import org.apache.spark.sql.streaming.util.StreamManualClock
@@ -286,6 +288,42 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
     }
   }
 
+  test("SPARK-19378: Continue reporting stateOp metrics even if there is no active trigger") {
+    import testImplicits._
+
+    withSQLConf(SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10") {
+      val inputData = MemoryStream[Int]
+
+      val query = inputData.toDS().toDF("value")
+        .select($"value")
+        .groupBy($"value")
+        .agg(count("*"))
+        .writeStream
+        .queryName("metric_continuity")
+        .format("memory")
+        .outputMode("complete")
+        .start()
+      try {
+        inputData.addData(1, 2)
+        query.processAllAvailable()
+
+        val progress = query.lastProgress
+        assert(progress.stateOperators.length > 0)
+        // Should emit new progresses every 10 ms, but we could be facing a slow Jenkins
+        eventually(timeout(1.minute)) {
+          val nextProgress = query.lastProgress
+          assert(nextProgress.timestamp !== progress.timestamp)
+          assert(nextProgress.numInputRows === 0)
+          assert(nextProgress.stateOperators.head.numRowsTotal === 2)
+          assert(nextProgress.stateOperators.head.numRowsUpdated === 0)
+          assert(nextProgress.sink.numOutputRows === 0)
+        }
+      } finally {
+        query.stop()
+      }
+    }
+  }
+
   test("SPARK-29973: Make `processedRowsPerSecond` calculated more accurately and meaningfully") {
     import testImplicits._
 
@@ -298,7 +336,8 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
       AdvanceManualClock(1000),
       waitUntilBatchProcessed,
       AssertOnQuery(query => {
-        assert(query.lastProgress == null)
+        assert(query.lastProgress.numInputRows == 0)
+        assert(query.lastProgress.processedRowsPerSecond == 0.0d)
         true
       }),
       AddData(inputData, 1, 2),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org