You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2017/09/22 04:55:11 UTC
spark git commit: [SPARK-22094][SS] processAllAvailable should check
the query state
Repository: spark
Updated Branches:
refs/heads/master f32a84250 -> fedf6961b
[SPARK-22094][SS] processAllAvailable should check the query state
## What changes were proposed in this pull request?
`processAllAvailable` should also check the query state and if the query is stopped, it should return.
## How was this patch tested?
The new unit test.
Author: Shixiong Zhu <zs...@gmail.com>
Closes #19314 from zsxwing/SPARK-22094.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fedf6961
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fedf6961
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fedf6961
Branch: refs/heads/master
Commit: fedf6961be4e99139eb7ab08d5e6e29187ea5ccf
Parents: f32a842
Author: Shixiong Zhu <zs...@gmail.com>
Authored: Thu Sep 21 21:55:07 2017 -0700
Committer: Shixiong Zhu <zs...@gmail.com>
Committed: Thu Sep 21 21:55:07 2017 -0700
----------------------------------------------------------------------
.../spark/sql/execution/streaming/StreamExecution.scala | 2 +-
.../spark/sql/streaming/StreamingQuerySuite.scala | 12 ++++++++++++
2 files changed, 13 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/fedf6961/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index b2d6c60..406560c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -840,7 +840,7 @@ class StreamExecution(
if (streamDeathCause != null) {
throw streamDeathCause
}
- if (noNewData) {
+ if (noNewData || !isActive) {
return
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/fedf6961/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 3823e33..ab35079 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -640,6 +640,18 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
}
}
+ test("processAllAvailable should not block forever when a query is stopped") {
+ val input = MemoryStream[Int]
+ input.addData(1)
+ val query = input.toDF().writeStream
+ .trigger(Trigger.Once())
+ .format("console")
+ .start()
+ failAfter(streamingTimeout) {
+ query.processAllAvailable()
+ }
+ }
+
/** Create a streaming DF that only execute one batch in which it returns the given static DF */
private def createSingleTriggerStreamingDF(triggerDF: DataFrame): DataFrame = {
require(!triggerDF.isStreaming)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org