You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by br...@apache.org on 2020/04/08 00:19:17 UTC
[spark] branch branch-3.0 updated: [SPARK-31278][SS] Fix StreamingQuery output rows metric
This is an automated email from the ASF dual-hosted git repository.
brkyvz pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new a856eea [SPARK-31278][SS] Fix StreamingQuery output rows metric
a856eea is described below
commit a856eea42949810f54c5f2f41b9c9abdd2da37c6
Author: Burak Yavuz <br...@gmail.com>
AuthorDate: Tue Apr 7 17:17:47 2020 -0700
[SPARK-31278][SS] Fix StreamingQuery output rows metric
### What changes were proposed in this pull request?
In Structured Streaming, we provide progress updates every 10 seconds when a stream doesn't have any new data upstream. When providing this progress though, we zero out the input information but not the output information. This PR fixes that bug.
### Why are the changes needed?
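Fixes incorrect metrics: progress updates reported for a trigger in which no batch was executed did not zero out the sink's output row count (`sink.numOutputRows`), even though the input information was zeroed out.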
### Does this PR introduce any user-facing change?
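Yes, as a bug fix in the reported metrics: `sink.numOutputRows` in the streaming query progress is now 0 for progress updates in which no batch was executed.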
### How was this patch tested?
New regression test
Closes #28040 from brkyvz/sinkMetrics.
Lead-authored-by: Burak Yavuz <br...@gmail.com>
Co-authored-by: Burak Yavuz <bu...@databricks.com>
Signed-off-by: Burak Yavuz <br...@gmail.com>
(cherry picked from commit 8ab2a0c5f23a59c00a9b4191afd976af50d913ba)
Signed-off-by: Burak Yavuz <br...@gmail.com>
---
.../apache/spark/sql/kafka010/KafkaSinkSuite.scala | 2 +-
.../execution/streaming/MicroBatchExecution.scala | 3 +-
.../sql/execution/streaming/ProgressReporter.scala | 32 ++++++----
.../sql/streaming/StreamingAggregationSuite.scala | 71 ++++++++++++++--------
.../streaming/StreamingDeduplicationSuite.scala | 3 +-
.../StreamingQueryStatusAndProgressSuite.scala | 1 +
6 files changed, 73 insertions(+), 39 deletions(-)
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
index 5c8c5b1..4e808a5 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
@@ -314,7 +314,7 @@ class KafkaSinkMicroBatchStreamingSuite extends KafkaSinkStreamingSuiteBase {
try {
input.addData("1", "2", "3")
verifyResult(writer) {
- assert(writer.lastProgress.sink.numOutputRows == 3L)
+ assert(writer.recentProgress.exists(_.sink.numOutputRows == 3L))
}
} finally {
writer.stop()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 45a2ce1..e022bfb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -226,7 +226,8 @@ class MicroBatchExecution(
}
}
- finishTrigger(currentBatchHasNewData) // Must be outside reportTimeTaken so it is recorded
+ // Must be outside reportTimeTaken so it is recorded
+ finishTrigger(currentBatchHasNewData, isCurrentBatchConstructed)
// Signal waiting threads. Note this must be after finishTrigger() to ensure all
// activities (progress generation, etc.) have completed before signaling.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index feb151a..d1086cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -85,8 +85,8 @@ trait ProgressReporter extends Logging {
private val noDataProgressEventInterval =
sparkSession.sessionState.conf.streamingNoDataProgressEventInterval
- // The timestamp we report an event that has no input data
- private var lastNoDataProgressEventTime = Long.MinValue
+ // The timestamp we report an event that has not executed anything
+ private var lastNoExecutionProgressEventTime = Long.MinValue
private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC"))
@@ -142,8 +142,15 @@ trait ProgressReporter extends Logging {
logInfo(s"Streaming query made progress: $newProgress")
}
- /** Finalizes the query progress and adds it to list of recent status updates. */
- protected def finishTrigger(hasNewData: Boolean): Unit = {
+ /**
+ * Finalizes the query progress and adds it to list of recent status updates.
+ *
+ * @param hasNewData Whether the sources of this stream had new data for this trigger.
+ * @param hasExecuted Whether any batch was executed during this trigger. Streaming queries that
+ * perform stateful aggregations with timeouts can still run batches even
+ * though the sources don't have any new data.
+ */
+ protected def finishTrigger(hasNewData: Boolean, hasExecuted: Boolean): Unit = {
assert(currentTriggerStartOffsets != null && currentTriggerEndOffsets != null)
currentTriggerEndTimestamp = triggerClock.getTimeMillis()
@@ -170,9 +177,12 @@ trait ProgressReporter extends Logging {
)
}
- val sinkProgress = SinkProgress(
- sink.toString,
- sinkCommitProgress.map(_.numOutputRows))
+ val sinkOutput = if (hasExecuted) {
+ sinkCommitProgress.map(_.numOutputRows)
+ } else {
+ sinkCommitProgress.map(_ => 0L)
+ }
+ val sinkProgress = SinkProgress(sink.toString, sinkOutput)
val observedMetrics = extractObservedMetrics(hasNewData, lastExecution)
val newProgress = new StreamingQueryProgress(
@@ -189,14 +199,14 @@ trait ProgressReporter extends Logging {
sink = sinkProgress,
observedMetrics = new java.util.HashMap(observedMetrics.asJava))
- if (hasNewData) {
+ if (hasExecuted) {
// Reset noDataEventTimestamp if we processed any data
- lastNoDataProgressEventTime = Long.MinValue
+ lastNoExecutionProgressEventTime = Long.MinValue
updateProgress(newProgress)
} else {
val now = triggerClock.getTimeMillis()
- if (now - noDataProgressEventInterval >= lastNoDataProgressEventTime) {
- lastNoDataProgressEventTime = now
+ if (now - noDataProgressEventInterval >= lastNoExecutionProgressEventTime) {
+ lastNoExecutionProgressEventTime = now
updateProgress(newProgress)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index 7413553..85e1b85 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -202,47 +202,68 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with Assertions {
}
}
- def stateOperatorProgresses: Seq[StateOperatorProgress] = {
- val operatorProgress = mutable.ArrayBuffer[StateOperatorProgress]()
- var progress = query.recentProgress.last
-
- operatorProgress ++= progress.stateOperators.map { op => op.copy(op.numRowsUpdated) }
- if (progress.numInputRows == 0) {
- // empty batch, merge metrics from previous batch as well
- progress = query.recentProgress.takeRight(2).head
- operatorProgress.zipWithIndex.foreach { case (sop, index) =>
- // "numRowsUpdated" should be merged, as it could be updated in both batches.
- // (for now it is only updated from previous batch, but things can be changed.)
- // other metrics represent current status of state so picking up the latest values.
- val newOperatorProgress = sop.copy(
- sop.numRowsUpdated + progress.stateOperators(index).numRowsUpdated)
- operatorProgress(index) = newOperatorProgress
- }
- }
+ // Pick the latest progress that actually ran a batch
+ def lastExecutedBatch: StreamingQueryProgress = {
+ query.recentProgress.filter(_.durationMs.containsKey("addBatch")).last
+ }
- operatorProgress
+ def stateOperatorProgresses: Seq[StateOperatorProgress] = {
+ lastExecutedBatch.stateOperators
}
}
+ val clock = new StreamManualClock()
+
testStream(aggWithWatermark)(
+ // batchId 0
AddData(inputData, 15),
- CheckAnswer(), // watermark = 5
+ StartStream(Trigger.ProcessingTime("interval 1 second"), clock),
+ CheckAnswer(), // watermark = 0
AssertOnQuery { _.stateNodes.size === 1 },
AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 0 },
AssertOnQuery { _.stateOperatorProgresses.head.numRowsUpdated === 1 },
AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 1 },
+ AssertOnQuery { _.lastExecutedBatch.sink.numOutputRows == 0 },
+
+ // batchId 1 without data
+ AdvanceManualClock(1000L), // watermark = 5
+ Execute { q => // wait for the no data batch to complete
+ eventually(timeout(streamingTimeout)) { assert(q.lastProgress.batchId === 1) }
+ },
+ CheckAnswer(),
+ AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 0 },
+ AssertOnQuery { _.stateOperatorProgresses.head.numRowsUpdated === 0 },
+ AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 1 },
+ AssertOnQuery { _.lastExecutedBatch.sink.numOutputRows == 0 },
+
+ // batchId 2 with data
AddData(inputData, 10, 12, 14),
- CheckAnswer(), // watermark = 5
- AssertOnQuery { _.stateNodes.size === 1 },
+ AdvanceManualClock(1000L), // watermark = 5
+ CheckAnswer(),
AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 0 },
AssertOnQuery { _.stateOperatorProgresses.head.numRowsUpdated === 1 },
AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 2 },
+ AssertOnQuery { _.lastExecutedBatch.sink.numOutputRows == 0 },
+
+ // batchId 3 with data
AddData(inputData, 25),
- CheckAnswer((10, 3)), // watermark = 15
- AssertOnQuery { _.stateNodes.size === 1 },
- AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 1 },
+ AdvanceManualClock(1000L), // watermark = 5
+ CheckAnswer(),
+ AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 0 },
AssertOnQuery { _.stateOperatorProgresses.head.numRowsUpdated === 1 },
- AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 2 }
+ AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 3 },
+ AssertOnQuery { _.lastExecutedBatch.sink.numOutputRows == 0 },
+
+ // batchId 4 without data
+ AdvanceManualClock(1000L), // watermark = 15
+ Execute { q => // wait for the no data batch to complete
+ eventually(timeout(streamingTimeout)) { assert(q.lastProgress.batchId === 4) }
+ },
+ CheckAnswer((10, 3)),
+ AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 1 },
+ AssertOnQuery { _.stateOperatorProgresses.head.numRowsUpdated === 0 },
+ AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 2 },
+ AssertOnQuery { _.lastExecutedBatch.sink.numOutputRows == 1 }
)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
index cfd7204..f63778a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
@@ -280,7 +280,8 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
{ // State should have been cleaned if flag is set, otherwise should not have been cleaned
if (flag) assertNumStateRows(total = 1, updated = 1)
else assertNumStateRows(total = 7, updated = 1)
- }
+ },
+ AssertOnQuery(q => q.lastProgress.sink.numOutputRows == 0L)
)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
index 6f00b52..08b3644 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -241,6 +241,7 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
assert(nextProgress.numInputRows === 0)
assert(nextProgress.stateOperators.head.numRowsTotal === 2)
assert(nextProgress.stateOperators.head.numRowsUpdated === 0)
+ assert(nextProgress.sink.numOutputRows === 0)
}
} finally {
query.stop()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org