You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2017/09/22 04:55:11 UTC

spark git commit: [SPARK-22094][SS] processAllAvailable should check the query state

Repository: spark
Updated Branches:
  refs/heads/master f32a84250 -> fedf6961b


[SPARK-22094][SS] processAllAvailable should check the query state

## What changes were proposed in this pull request?

`processAllAvailable` should also check the query state: if the query has stopped, it should return instead of blocking forever.

## How was this patch tested?

The new unit test.

Author: Shixiong Zhu <zs...@gmail.com>

Closes #19314 from zsxwing/SPARK-22094.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fedf6961
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fedf6961
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fedf6961

Branch: refs/heads/master
Commit: fedf6961be4e99139eb7ab08d5e6e29187ea5ccf
Parents: f32a842
Author: Shixiong Zhu <zs...@gmail.com>
Authored: Thu Sep 21 21:55:07 2017 -0700
Committer: Shixiong Zhu <zs...@gmail.com>
Committed: Thu Sep 21 21:55:07 2017 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/streaming/StreamExecution.scala |  2 +-
 .../spark/sql/streaming/StreamingQuerySuite.scala       | 12 ++++++++++++
 2 files changed, 13 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fedf6961/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index b2d6c60..406560c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -840,7 +840,7 @@ class StreamExecution(
         if (streamDeathCause != null) {
           throw streamDeathCause
         }
-        if (noNewData) {
+        if (noNewData || !isActive) {
           return
         }
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/fedf6961/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 3823e33..ab35079 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -640,6 +640,18 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     }
   }
 
+  test("processAllAvailable should not block forever when a query is stopped") {
+    val input = MemoryStream[Int]
+    input.addData(1)
+    val query = input.toDF().writeStream
+      .trigger(Trigger.Once())
+      .format("console")
+      .start()
+    failAfter(streamingTimeout) {
+      query.processAllAvailable()
+    }
+  }
+
   /** Create a streaming DF that only execute one batch in which it returns the given static DF */
   private def createSingleTriggerStreamingDF(triggerDF: DataFrame): DataFrame = {
     require(!triggerDF.isStreaming)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org