Posted to commits@spark.apache.org by br...@apache.org on 2020/04/08 00:19:17 UTC

[spark] branch branch-3.0 updated: [SPARK-31278][SS] Fix StreamingQuery output rows metric

This is an automated email from the ASF dual-hosted git repository.

brkyvz pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new a856eea  [SPARK-31278][SS] Fix StreamingQuery output rows metric
a856eea is described below

commit a856eea42949810f54c5f2f41b9c9abdd2da37c6
Author: Burak Yavuz <br...@gmail.com>
AuthorDate: Tue Apr 7 17:17:47 2020 -0700

    [SPARK-31278][SS] Fix StreamingQuery output rows metric
    
    ### What changes were proposed in this pull request?
    
    In Structured Streaming, we provide progress updates every 10 seconds (by default) when a stream doesn't have any new data upstream. When providing this progress, however, we zero out the input row counts but not the output row counts, so the sink metric repeats the previous batch's value. This PR fixes that bug.
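
    For illustration, a minimal sketch of how the stale metric surfaced in `recentProgress` (this is not part of the patch; the file-source path, the options, and the idleness setup are hypothetical):

    ```scala
    import org.apache.spark.sql.SparkSession

    // Hypothetical repro: an idle source exercises the periodic "no data" progress
    // path (every 10s by default, spark.sql.streaming.noDataProgressEventInterval).
    val spark = SparkSession.builder().master("local[2]").appName("repro").getOrCreate()

    val query = spark.readStream
      .format("text")
      .load("/tmp/stream-input")  // hypothetical path; stop adding files to go idle
      .writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/stream-checkpoint")  // hypothetical path
      .start()

    // Before this fix, progress entries emitted while idle kept the previous
    // batch's sink.numOutputRows; with the fix they report 0.
    query.recentProgress.foreach { p =>
      println(s"batch=${p.batchId} input=${p.numInputRows} sinkRows=${p.sink.numOutputRows}")
    }
    ```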
    
    ### Why are the changes needed?
    
    Fixes a bug that caused incorrect sink output-row metrics to be reported for triggers that did not execute a batch.
    
    ### Does this PR introduce any user-facing change?
    
    Yes, it fixes a bug in the reported metrics: sink.numOutputRows no longer repeats a stale value for triggers that did not execute a batch.
    
    ### How was this patch tested?
    
    A new regression test in StreamingAggregationSuite, plus tightened assertions in the Kafka sink, streaming deduplication, and query status/progress suites.
    
    Closes #28040 from brkyvz/sinkMetrics.
    
    Lead-authored-by: Burak Yavuz <br...@gmail.com>
    Co-authored-by: Burak Yavuz <bu...@databricks.com>
    Signed-off-by: Burak Yavuz <br...@gmail.com>
    (cherry picked from commit 8ab2a0c5f23a59c00a9b4191afd976af50d913ba)
    Signed-off-by: Burak Yavuz <br...@gmail.com>
---
 .../apache/spark/sql/kafka010/KafkaSinkSuite.scala |  2 +-
 .../execution/streaming/MicroBatchExecution.scala  |  3 +-
 .../sql/execution/streaming/ProgressReporter.scala | 32 ++++++----
 .../sql/streaming/StreamingAggregationSuite.scala  | 71 ++++++++++++++--------
 .../streaming/StreamingDeduplicationSuite.scala    |  3 +-
 .../StreamingQueryStatusAndProgressSuite.scala     |  1 +
 6 files changed, 73 insertions(+), 39 deletions(-)

diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
index 5c8c5b1..4e808a5 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
@@ -314,7 +314,7 @@ class KafkaSinkMicroBatchStreamingSuite extends KafkaSinkStreamingSuiteBase {
     try {
       input.addData("1", "2", "3")
       verifyResult(writer) {
-        assert(writer.lastProgress.sink.numOutputRows == 3L)
+        assert(writer.recentProgress.exists(_.sink.numOutputRows == 3L))
       }
     } finally {
       writer.stop()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 45a2ce1..e022bfb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -226,7 +226,8 @@ class MicroBatchExecution(
           }
         }
 
-        finishTrigger(currentBatchHasNewData)  // Must be outside reportTimeTaken so it is recorded
+        // Must be outside reportTimeTaken so it is recorded
+        finishTrigger(currentBatchHasNewData, isCurrentBatchConstructed)
 
         // Signal waiting threads. Note this must be after finishTrigger() to ensure all
         // activities (progress generation, etc.) have completed before signaling.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index feb151a..d1086cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -85,8 +85,8 @@ trait ProgressReporter extends Logging {
   private val noDataProgressEventInterval =
     sparkSession.sessionState.conf.streamingNoDataProgressEventInterval
 
-  // The timestamp we report an event that has no input data
-  private var lastNoDataProgressEventTime = Long.MinValue
+  // The timestamp we report an event that has not executed anything
+  private var lastNoExecutionProgressEventTime = Long.MinValue
 
   private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
   timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC"))
@@ -142,8 +142,15 @@ trait ProgressReporter extends Logging {
     logInfo(s"Streaming query made progress: $newProgress")
   }
 
-  /** Finalizes the query progress and adds it to list of recent status updates. */
-  protected def finishTrigger(hasNewData: Boolean): Unit = {
+  /**
+   * Finalizes the query progress and adds it to list of recent status updates.
+   *
+   * @param hasNewData Whether the sources of this stream had new data for this trigger.
+   * @param hasExecuted Whether any batch was executed during this trigger. Streaming queries that
+   *                    perform stateful aggregations with timeouts can still run batches even
+   *                    though the sources don't have any new data.
+   */
+  protected def finishTrigger(hasNewData: Boolean, hasExecuted: Boolean): Unit = {
     assert(currentTriggerStartOffsets != null && currentTriggerEndOffsets != null)
     currentTriggerEndTimestamp = triggerClock.getTimeMillis()
 
@@ -170,9 +177,12 @@ trait ProgressReporter extends Logging {
       )
     }
 
-    val sinkProgress = SinkProgress(
-      sink.toString,
-      sinkCommitProgress.map(_.numOutputRows))
+    val sinkOutput = if (hasExecuted) {
+      sinkCommitProgress.map(_.numOutputRows)
+    } else {
+      sinkCommitProgress.map(_ => 0L)
+    }
+    val sinkProgress = SinkProgress(sink.toString, sinkOutput)
     val observedMetrics = extractObservedMetrics(hasNewData, lastExecution)
 
     val newProgress = new StreamingQueryProgress(
@@ -189,14 +199,14 @@ trait ProgressReporter extends Logging {
       sink = sinkProgress,
       observedMetrics = new java.util.HashMap(observedMetrics.asJava))
 
-    if (hasNewData) {
+    if (hasExecuted) {
       // Reset noDataEventTimestamp if we processed any data
-      lastNoDataProgressEventTime = Long.MinValue
+      lastNoExecutionProgressEventTime = Long.MinValue
       updateProgress(newProgress)
     } else {
       val now = triggerClock.getTimeMillis()
-      if (now - noDataProgressEventInterval >= lastNoDataProgressEventTime) {
-        lastNoDataProgressEventTime = now
+      if (now - noDataProgressEventInterval >= lastNoExecutionProgressEventTime) {
+        lastNoExecutionProgressEventTime = now
         updateProgress(newProgress)
       }
     }
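
To make the new hasExecuted gate concrete outside the diff, a self-contained sketch (committedRows is a plain Option[Long] stand-in for the sinkCommitProgress.map(_.numOutputRows) value above; this restates the patched logic and is not additional library API):

    // hasExecuted can be true even when hasNewData is false, e.g. a watermark-only
    // batch that evicts expired state. Only a trigger that ran no batch at all
    // should zero out the sink count.
    def reportedSinkRows(hasExecuted: Boolean, committedRows: Option[Long]): Option[Long] =
      if (hasExecuted) committedRows     // a batch ran: report the real commit count
      else committedRows.map(_ => 0L)    // idle trigger: zero the stale count

    assert(reportedSinkRows(hasExecuted = true, committedRows = Some(3L)) == Some(3L))
    assert(reportedSinkRows(hasExecuted = false, committedRows = Some(3L)) == Some(0L))
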
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index 7413553..85e1b85 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -202,47 +202,68 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with Assertions {
         }
       }
 
-      def stateOperatorProgresses: Seq[StateOperatorProgress] = {
-        val operatorProgress = mutable.ArrayBuffer[StateOperatorProgress]()
-        var progress = query.recentProgress.last
-
-        operatorProgress ++= progress.stateOperators.map { op => op.copy(op.numRowsUpdated) }
-        if (progress.numInputRows == 0) {
-          // empty batch, merge metrics from previous batch as well
-          progress = query.recentProgress.takeRight(2).head
-          operatorProgress.zipWithIndex.foreach { case (sop, index) =>
-            // "numRowsUpdated" should be merged, as it could be updated in both batches.
-            // (for now it is only updated from previous batch, but things can be changed.)
-            // other metrics represent current status of state so picking up the latest values.
-            val newOperatorProgress = sop.copy(
-              sop.numRowsUpdated + progress.stateOperators(index).numRowsUpdated)
-            operatorProgress(index) = newOperatorProgress
-          }
-        }
+      // Pick the latest progress that actually ran a batch
+      def lastExecutedBatch: StreamingQueryProgress = {
+        query.recentProgress.filter(_.durationMs.containsKey("addBatch")).last
+      }
 
-        operatorProgress
+      def stateOperatorProgresses: Seq[StateOperatorProgress] = {
+        lastExecutedBatch.stateOperators
       }
     }
 
+    val clock = new StreamManualClock()
+
     testStream(aggWithWatermark)(
+      // batchId 0
       AddData(inputData, 15),
-      CheckAnswer(), // watermark = 5
+      StartStream(Trigger.ProcessingTime("interval 1 second"), clock),
+      CheckAnswer(), // watermark = 0
       AssertOnQuery { _.stateNodes.size === 1 },
       AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 0 },
       AssertOnQuery { _.stateOperatorProgresses.head.numRowsUpdated === 1 },
       AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 1 },
+      AssertOnQuery { _.lastExecutedBatch.sink.numOutputRows == 0 },
+
+      // batchId 1 without data
+      AdvanceManualClock(1000L), // watermark = 5
+      Execute { q =>             // wait for the no data batch to complete
+        eventually(timeout(streamingTimeout)) { assert(q.lastProgress.batchId === 1) }
+      },
+      CheckAnswer(),
+      AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 0 },
+      AssertOnQuery { _.stateOperatorProgresses.head.numRowsUpdated === 0 },
+      AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 1 },
+      AssertOnQuery { _.lastExecutedBatch.sink.numOutputRows == 0 },
+
+      // batchId 2 with data
       AddData(inputData, 10, 12, 14),
-      CheckAnswer(), // watermark = 5
-      AssertOnQuery { _.stateNodes.size === 1 },
+      AdvanceManualClock(1000L), // watermark = 5
+      CheckAnswer(),
       AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 0 },
       AssertOnQuery { _.stateOperatorProgresses.head.numRowsUpdated === 1 },
       AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 2 },
+      AssertOnQuery { _.lastExecutedBatch.sink.numOutputRows == 0 },
+
+      // batchId 3 with data
       AddData(inputData, 25),
-      CheckAnswer((10, 3)), // watermark = 15
-      AssertOnQuery { _.stateNodes.size === 1 },
-      AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 1 },
+      AdvanceManualClock(1000L), // watermark = 5
+      CheckAnswer(),
+      AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 0 },
       AssertOnQuery { _.stateOperatorProgresses.head.numRowsUpdated === 1 },
-      AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 2 }
+      AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 3 },
+      AssertOnQuery { _.lastExecutedBatch.sink.numOutputRows == 0 },
+
+      // batchId 4 without data
+      AdvanceManualClock(1000L), // watermark = 15
+      Execute { q =>             // wait for the no data batch to complete
+        eventually(timeout(streamingTimeout)) { assert(q.lastProgress.batchId === 4) }
+      },
+      CheckAnswer((10, 3)),
+      AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 1 },
+      AssertOnQuery { _.stateOperatorProgresses.head.numRowsUpdated === 0 },
+      AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 2 },
+      AssertOnQuery { _.lastExecutedBatch.sink.numOutputRows == 1 }
     )
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
index cfd7204..f63778a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
@@ -280,7 +280,8 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
         { // State should have been cleaned if flag is set, otherwise should not have been cleaned
           if (flag) assertNumStateRows(total = 1, updated = 1)
           else assertNumStateRows(total = 7, updated = 1)
-        }
+        },
+        AssertOnQuery(q => q.lastProgress.sink.numOutputRows == 0L)
       )
     }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
index 6f00b52..08b3644 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -241,6 +241,7 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
           assert(nextProgress.numInputRows === 0)
           assert(nextProgress.stateOperators.head.numRowsTotal === 2)
           assert(nextProgress.stateOperators.head.numRowsUpdated === 0)
+          assert(nextProgress.sink.numOutputRows === 0)
         }
       } finally {
         query.stop()
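
For downstream monitoring code, a hedged sketch of observing the corrected metric through the standard StreamingQueryListener API (the println and the ambient `spark` session are illustrative):

    import org.apache.spark.sql.streaming.StreamingQueryListener
    import org.apache.spark.sql.streaming.StreamingQueryListener._

    // With this fix, a QueryProgressEvent from a trigger that executed no batch
    // carries sink.numOutputRows == 0 instead of the previous batch's count.
    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit = ()
      override def onQueryProgress(event: QueryProgressEvent): Unit = {
        val p = event.progress
        println(s"batch=${p.batchId} input=${p.numInputRows} sinkRows=${p.sink.numOutputRows}")
      }
      override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
    })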


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org