Posted to reviews@spark.apache.org by eyalzit <gi...@git.apache.org> on 2017/02/19 11:35:34 UTC

[GitHub] spark pull request #16991: [SPARK-19594][Structured Streaming] StreamingQuer...

GitHub user eyalzit opened a pull request:

    https://github.com/apache/spark/pull/16991

    [SPARK-19594][Structured Streaming] StreamingQueryListener fails to handle QueryTerminatedEvent if more then one listeners exists

    ## What changes were proposed in this pull request?
    
    Currently, if multiple streaming query listeners exist, only one of them is invoked when a QueryTerminatedEvent is triggered; the rest ignore the event.
    This happens because the streaming query listener bus holds a set of running query IDs, and when a termination event is triggered, the terminated query ID is removed from the set as soon as the first listener has handled the event. The remaining listeners then no longer see the query as running and skip the event.
    In this PR, the query ID is removed from the set only after all the listeners have handled the event.
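    
    A stripped-down model of the failure described above, as a sketch only: the names (`BugSketch`, `BuggyBus`, `Terminated`, `activeRunIds`) are hypothetical and this is not Spark's `StreamingQueryListenerBus` code. The run ID is removed inside the per-listener dispatch, so only the first registered listener ever sees the termination event.
    
    ```scala
    import scala.collection.mutable
    
    // Hypothetical, simplified model of the pre-fix behaviour; not Spark's actual classes.
    object BugSketch {
      sealed trait Event { def runId: String }
      final case class Progress(runId: String) extends Event
      final case class Terminated(runId: String) extends Event
    
      class BuggyBus {
        private val activeRunIds = mutable.HashSet.empty[String]
        private val listeners = mutable.ArrayBuffer.empty[Event => Unit]
    
        def addListener(listener: Event => Unit): Unit = listeners += listener
        def start(runId: String): Unit = activeRunIds += runId
    
        // Per-listener dispatch: a listener only sees events for runs still marked active.
        private def doPostEvent(listener: Event => Unit, event: Event): Unit =
          if (activeRunIds.contains(event.runId)) {
            listener(event)
            event match {
              // Bug: the run ID is dropped right after the *first* listener handles it.
              case Terminated(id) => activeRunIds -= id
              case _              =>
            }
          }
    
        def postToAll(event: Event): Unit = listeners.foreach(l => doPostEvent(l, event))
      }
    
      // Registers three listeners and counts how many Terminated events each one receives.
      def receivedCounts(): List[Int] = {
        val bus = new BuggyBus
        val counts = Array.fill(3)(0)
        (0 until 3).foreach { i =>
          bus.addListener {
            case Terminated(_) => counts(i) += 1
            case _             =>
          }
        }
        bus.start("run-1")
        bus.postToAll(Terminated("run-1"))
        counts.toList
      }
    }
    ```
    
    With three listeners registered, `receivedCounts()` returns `List(1, 0, 0)`: only the first listener is invoked.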
    
    ## How was this patch tested?
    
    A test with multiple listeners has been added to StreamingQueryListenerSuite.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/eyalzit/spark master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/16991.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #16991
    
----
commit ad75ec0eb336f265d638ce97567984a464bae41d
Author: Eyal Zituny <ey...@equalum.io>
Date:   2017-02-19T10:55:30Z

    [SPARK-19594] StreamingQueryListener fails to handle QueryTerminatedEvent if more then one listeners exists

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16991: [SPARK-19594][Structured Streaming] StreamingQueryListen...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16991
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73496/




[GitHub] spark pull request #16991: [SPARK-19594][Structured Streaming] StreamingQuer...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16991#discussion_r102324213
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala ---
    @@ -70,6 +70,10 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
             sparkListenerBus.post(s)
             // post to local listeners to trigger callbacks
             postToAll(s)
    +      case t: QueryTerminatedEvent =>
    +        // run all the listeners synchronized before removing the id from the list
    +        postToAll(t)
    --- End diff --
    
    It will post to all listeners directly in the current thread. Hence, the listener may see QueryProgressEvent after QueryTerminatedEvent.
    
    You can override `postToAll`. It's fine to remove `final` from the `postToAll` method.
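    
    A minimal sketch of the ordering hazard, assuming a single-threaded executor as a stand-in for the asynchronous listener bus (the names here are hypothetical, not Spark APIs): a progress event queued on the bus thread can be delivered after a terminated event that is posted directly from the caller's thread.
    
    ```scala
    import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
    import scala.collection.mutable.ArrayBuffer
    
    // Hypothetical illustration; the executor only stands in for an async listener bus.
    object OrderingSketch {
      // Returns the events in the order a single listener observed them.
      def observedOrder(): List[String] = {
        val seen = ArrayBuffer.empty[String]
        def listener(event: String): Unit = seen.synchronized { seen += event }
    
        val busThread = Executors.newSingleThreadExecutor()
        val gate = new CountDownLatch(1)
        // Keep the bus thread busy so queued events are not drained yet.
        busThread.submit(new Runnable { def run(): Unit = gate.await() })
        // The progress event is queued and delivered later on the bus thread.
        busThread.submit(new Runnable { def run(): Unit = listener("QueryProgressEvent") })
        // The terminated event is delivered directly in the caller's thread.
        listener("QueryTerminatedEvent")
    
        gate.countDown()
        busThread.shutdown()
        busThread.awaitTermination(10, TimeUnit.SECONDS)
        seen.synchronized { seen.toList }
      }
    }
    ```
    
    `observedOrder()` returns `List("QueryTerminatedEvent", "QueryProgressEvent")`: the listener sees the termination first, because the queued progress event is held until the bus thread is released.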




[GitHub] spark issue #16991: [SPARK-19594][Structured Streaming] StreamingQueryListen...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16991
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request #16991: [SPARK-19594][Structured Streaming] StreamingQuer...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16991#discussion_r102551511
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala ---
    @@ -133,6 +133,93 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
         }
       }
     
    +  testQuietly("multiple listeners, check trigger events are generated correctly") {
    --- End diff --
    
    It's better to add a regression test than to copy the test above. Most of this test checks the same thing as `"single listener, check trigger events are generated correctly"`, so duplicating it doesn't make sense. How about this:
    ```
      test("SPARK-19594: all of listeners should receive QueryTerminatedEvent") {
        val df = MemoryStream[Int].toDS().as[Long]
        val listeners = (1 to 5).map(_ => new EventCollector)
        try {
          testStream(df, OutputMode.Append)(
            StartStream(),
            StopStream,
            AssertOnQuery { query =>
              eventually(Timeout(streamingTimeout)) {
                listeners.foreach(listener => assert(listener.terminationEvent !== null))
                listeners.foreach(listener => assert(listener.terminationEvent.id === query.id))
                listeners.foreach(listener => assert(listener.terminationEvent.runId === query.runId))
                listeners.foreach(listener => assert(listener.terminationEvent.exception === None))
              }
              listeners.foreach(listener => listener.checkAsyncErrors())
              listeners.foreach(listener => listener.reset())
              true
            }
          )
        } finally {
          listeners.foreach(spark.streams.removeListener)
        }
      }
    ```




[GitHub] spark pull request #16991: [SPARK-19594][Structured Streaming] StreamingQuer...

Posted by eyalzit <gi...@git.apache.org>.
Github user eyalzit commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16991#discussion_r103097765
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala ---
    @@ -133,6 +133,93 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
         }
       }
     
    +  testQuietly("multiple listeners, check trigger events are generated correctly") {
    --- End diff --
    
    I'm OK with that, with one small fix: the test also needs to register the listeners:
    `listeners.foreach(listener => spark.streams.addListener(listener))`





[GitHub] spark pull request #16991: [SPARK-19594][Structured Streaming] StreamingQuer...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16991#discussion_r102547991
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala ---
    @@ -75,6 +75,20 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
         }
       }
     
    +  /**
    +   * Post the event to all registered listeners. The `postToAll` caller should guarantee calling
    +   * `postToAll` in the same thread for all events. also remove the query id after all listeners
    +   * process the QueryTerminatedEvent
    --- End diff --
    
    Don't copy the comments from the parent class. You can just document it as:
    ```
    Override the parent `postToAll` to remove the query from `activeQueryRunIds` after all listeners process `QueryTerminatedEvent`. (SPARK-19594)
    ```
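    
    A sketch of the suggested override, using hypothetical simplified types rather than Spark's `ListenerBus`: the parent `postToAll` is left non-`final`, and the subclass calls `super.postToAll` before dropping the run ID, so every listener sees `Terminated`.
    
    ```scala
    import scala.collection.mutable
    
    // Hypothetical, simplified stand-ins for a listener bus; not Spark's real ListenerBus API.
    object FixSketch {
      sealed trait Event { def runId: String }
      final case class Progress(runId: String) extends Event
      final case class Terminated(runId: String) extends Event
    
      trait SimpleListenerBus {
        protected val listeners: mutable.ArrayBuffer[Event => Unit] = mutable.ArrayBuffer.empty
        def addListener(listener: Event => Unit): Unit = listeners += listener
        protected def doPostEvent(listener: Event => Unit, event: Event): Unit
        // Deliberately not `final`, so a subclass can act once every listener has run.
        def postToAll(event: Event): Unit = listeners.foreach(l => doPostEvent(l, event))
      }
    
      class FixedBus extends SimpleListenerBus {
        private val activeRunIds = mutable.HashSet.empty[String]
        def start(runId: String): Unit = activeRunIds += runId
    
        // Filter only; removal of the run ID is deferred to postToAll.
        override protected def doPostEvent(listener: Event => Unit, event: Event): Unit =
          if (activeRunIds.contains(event.runId)) listener(event)
    
        /**
         * Override the parent `postToAll` to remove the run from `activeRunIds` after all
         * listeners process `Terminated`.
         */
        override def postToAll(event: Event): Unit = {
          super.postToAll(event)
          event match {
            case Terminated(id) => activeRunIds -= id
            case _              =>
          }
        }
      }
    
      // Three listeners; each should receive exactly one Terminated event.
      def receivedCounts(): List[Int] = {
        val bus = new FixedBus
        val counts = Array.fill(3)(0)
        (0 until 3).foreach { i =>
          bus.addListener {
            case Terminated(_) => counts(i) += 1
            case _             =>
          }
        }
        bus.start("run-1")
        bus.postToAll(Terminated("run-1"))
        // A late duplicate for the same run is now filtered out for every listener.
        bus.postToAll(Terminated("run-1"))
        counts.toList
      }
    }
    ```
    
    Here `receivedCounts()` returns `List(1, 1, 1)`; the repeated `Terminated` for the same run is ignored by everyone because the run ID has already been removed.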




[GitHub] spark issue #16991: [SPARK-19594][Structured Streaming] StreamingQueryListen...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the issue:

    https://github.com/apache/spark/pull/16991
  
    LGTM. Merging to master and 2.1. Thanks!




[GitHub] spark issue #16991: [SPARK-19594][Structured Streaming] StreamingQueryListen...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the issue:

    https://github.com/apache/spark/pull/16991
  
    ok to test




[GitHub] spark issue #16991: [SPARK-19594][Structured Streaming] StreamingQueryListen...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16991
  
    Can one of the admins verify this patch?




[GitHub] spark issue #16991: [SPARK-19594][Structured Streaming] StreamingQueryListen...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16991
  
    **[Test build #73496 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73496/testReport)** for PR 16991 at commit [`896f5b3`](https://github.com/apache/spark/commit/896f5b33d01fa51db9d8ee53a8cc95274654f9e8).




[GitHub] spark issue #16991: [SPARK-19594][Structured Streaming] StreamingQueryListen...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16991
  
    **[Test build #73496 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73496/testReport)** for PR 16991 at commit [`896f5b3`](https://github.com/apache/spark/commit/896f5b33d01fa51db9d8ee53a8cc95274654f9e8).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request #16991: [SPARK-19594][Structured Streaming] StreamingQuer...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/16991

