You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2020/06/14 05:54:33 UTC
[spark] branch branch-3.0 updated: [SPARK-31593][SS] Remove
unnecessary streaming query progress update
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 30637a8 [SPARK-31593][SS] Remove unnecessary streaming query progress update
30637a8 is described below
commit 30637a81fb50492e3c54759bb3fd7aac1cd0e326
Author: uncleGen <hu...@gmail.com>
AuthorDate: Sun Jun 14 14:49:01 2020 +0900
[SPARK-31593][SS] Remove unnecessary streaming query progress update
### What changes were proposed in this pull request?
The Structured Streaming progress reporter always reports an `empty` progress when there is no new data. By design, when there is no new data it should only report progress at the no-data progress interval (10s by default).
Before PR:
![20200428175008](https://user-images.githubusercontent.com/7402327/80474832-88a8ca00-897a-11ea-820b-d4be6127d2fe.jpg)
![20200428175037](https://user-images.githubusercontent.com/7402327/80474844-8ba3ba80-897a-11ea-873c-b7137bd4a804.jpg)
![20200428175102](https://user-images.githubusercontent.com/7402327/80474848-8e061480-897a-11ea-806e-28c6bbf1fe03.jpg)
After PR:
![image](https://user-images.githubusercontent.com/7402327/80475099-f35a0580-897a-11ea-8fb3-53f343df2c3f.png)
### Why are the changes needed?
Fixes a bug that caused incorrect (unnecessary) progress reports.
### Does this PR introduce any user-facing change?
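Yes. It fixes a bug that caused incorrect progress reports: an `empty` progress event is no longer reported right after a batch that processed data; no-data progress is only reported once the no-data progress interval has elapsed.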
### How was this patch tested?
Existing unit tests, a new unit test in StreamingQueryListenerSuite, and manual testing.
Closes #28391 from uncleGen/SPARK-31593.
Authored-by: uncleGen <hu...@gmail.com>
Signed-off-by: HyukjinKwon <gu...@apache.org>
(cherry picked from commit 1e40bccf447dccad9d31bccc75d21b8fca77ba52)
Signed-off-by: HyukjinKwon <gu...@apache.org>
---
.../sql/execution/streaming/ProgressReporter.scala | 2 +-
.../streaming/StreamingDeduplicationSuite.scala | 7 ++-
.../streaming/StreamingQueryListenerSuite.scala | 56 ++++++++++++++++++++--
3 files changed, 58 insertions(+), 7 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 0dff1c2..ea1f2ce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -201,7 +201,7 @@ trait ProgressReporter extends Logging {
if (hasExecuted) {
// Reset noDataEventTimestamp if we processed any data
- lastNoExecutionProgressEventTime = Long.MinValue
+ lastNoExecutionProgressEventTime = triggerClock.getTimeMillis()
updateProgress(newProgress)
} else {
val now = triggerClock.getTimeMillis()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
index f63778a..51ddc7b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
@@ -281,7 +281,12 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
if (flag) assertNumStateRows(total = 1, updated = 1)
else assertNumStateRows(total = 7, updated = 1)
},
- AssertOnQuery(q => q.lastProgress.sink.numOutputRows == 0L)
+ AssertOnQuery { q =>
+ eventually(timeout(streamingTimeout)) {
+ assert(q.lastProgress.sink.numOutputRows == 0L) // bare `==` result would be discarded
+ true
+ }
+ }
)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index e585b8a..6e08b88 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -389,7 +389,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
// Structured Streaming in Spark 2.0.0. Because we renamed the classes,
// SparkListenerApplicationEnd is the only valid event and it's the last event. We use it
// to verify that we can skip broken jsons generated by Structured Streaming.
- testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.0.txt", 1)
+ testReplayListenerBusWithBrokenEventJsons("query-event-logs-version-2.0.0.txt", 1)
}
testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2_0_1") {
@@ -397,14 +397,14 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
// Structured Streaming in Spark 2.0.1. Because we renamed the classes,
// SparkListenerApplicationEnd is the only valid event and it's the last event. We use it
// to verify that we can skip broken jsons generated by Structured Streaming.
- testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.1.txt", 1)
+ testReplayListenerBusWithBrokenEventJsons("query-event-logs-version-2.0.1.txt", 1)
}
testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2_0_2") {
// query-event-logs-version-2.0.2.txt has all types of events generated by
// Structured Streaming in Spark 2.0.2. SPARK-18516 refactored Structured Streaming query events
// in 2.1.0. This test is to verify we are able to load events generated by Spark 2.0.2.
- testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.2.txt", 5)
+ testReplayListenerBusWithBrokenEventJsons("query-event-logs-version-2.0.2.txt", 5)
}
test("listener propagates observable metrics") {
@@ -433,9 +433,13 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
}
try {
+ val noDataProgressIntervalKey = SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key
spark.streams.addListener(listener)
testStream(df, OutputMode.Append)(
- StartStream(Trigger.ProcessingTime(100), triggerClock = clock),
+ StartStream(
+ Trigger.ProcessingTime(100),
+ triggerClock = clock,
+ Map(noDataProgressIntervalKey -> "100")),
// Batch 1
AddData(inputData, 1, 2),
AdvanceManualClock(100),
@@ -464,7 +468,49 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
}
}
- private def testReplayListenerBusWithBorkenEventJsons(
+ test("SPARK-31593: remove unnecessary streaming query progress update") {
+ withSQLConf(SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "100") {
+ @volatile var numProgressEvent = 0
+ val listener = new StreamingQueryListener {
+ override def onQueryStarted(event: QueryStartedEvent): Unit = {}
+ override def onQueryProgress(event: QueryProgressEvent): Unit = {
+ numProgressEvent += 1
+ }
+ override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
+ }
+ spark.streams.addListener(listener)
+
+ def checkProgressEvent(count: Int): StreamAction = {
+ AssertOnQuery { _ =>
+ eventually(Timeout(streamingTimeout)) {
+ assert(numProgressEvent == count)
+ }
+ true
+ }
+ }
+
+ try {
+ val input = new MemoryStream[Int](0, sqlContext)
+ val clock = new StreamManualClock()
+ val result = input.toDF().select("value")
+ testStream(result)(
+ StartStream(trigger = Trigger.ProcessingTime(10), triggerClock = clock),
+ AddData(input, 10),
+ checkProgressEvent(1),
+ AdvanceManualClock(10),
+ checkProgressEvent(2),
+ AdvanceManualClock(90),
+ checkProgressEvent(2),
+ AdvanceManualClock(10),
+ checkProgressEvent(3)
+ )
+ } finally {
+ spark.streams.removeListener(listener)
+ }
+ }
+ }
+
+ private def testReplayListenerBusWithBrokenEventJsons(
fileName: String,
expectedEventSize: Int): Unit = {
val input = getClass.getResourceAsStream(s"/structured-streaming/$fileName")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org