Posted to reviews@spark.apache.org by concretevitamin <gi...@git.apache.org> on 2014/03/25 07:16:18 UTC

[GitHub] spark pull request: Add getListenerBus() in SparkContext.

GitHub user concretevitamin opened a pull request:

    https://github.com/apache/spark/pull/221

    Add getListenerBus() in SparkContext.

    Motivation:
    
    We had a job that gathers and analyzes statistics through a custom SparkListener. We realized that our main job could finish while the listener event poster thread was still processing events. (In our particular case -- a small job consisting of linear algebra operations -- the main job needed to wait ~43ms on average.) A simple solution seems to be to expose the listener bus, so that the application can call something like `sc.getListenerBus().waitUntilEmpty(10000)`.
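
The race described in the motivation can be reproduced outside Spark. The following is a minimal sketch in plain Scala, assuming a toy bus with a single daemon worker thread. `ToyListenerBus`, `RaceDemo`, and their members are hypothetical names used only for illustration; only the name `waitUntilEmpty` is borrowed from the call quoted in the description. It is not Spark's implementation.

```scala
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicInteger

// Toy stand-in for an asynchronous listener bus (hypothetical, not Spark's class).
class ToyListenerBus(handler: String => Unit) {
  private val queue = new LinkedBlockingQueue[String]()
  // Number of events that were posted but whose handler has not returned yet.
  private val pending = new AtomicInteger(0)

  private val worker = new Thread(new Runnable {
    def run(): Unit = while (true) {
      val event = queue.take()
      try handler(event) finally pending.decrementAndGet()
    }
  })
  worker.setDaemon(true) // a daemon thread does not keep the JVM alive
  worker.start()

  def post(event: String): Unit = {
    pending.incrementAndGet()
    queue.put(event)
  }

  /** Polls until every posted event is handled; false if the timeout elapses first. */
  def waitUntilEmpty(timeoutMillis: Long): Boolean = {
    val deadline = System.currentTimeMillis() + timeoutMillis
    while (pending.get() > 0) {
      if (System.currentTimeMillis() > deadline) return false
      Thread.sleep(10)
    }
    true
  }
}

object RaceDemo {
  /** Returns (events handled right after posting, drained?, events handled after waiting). */
  def run(): (Int, Boolean, Int) = {
    val handled = new AtomicInteger(0)
    val bus = new ToyListenerBus(_ => { Thread.sleep(5); handled.incrementAndGet(); () })
    (1 to 20).foreach(i => bus.post(s"event-$i"))
    val handledEarly = handled.get() // may be fewer than 20: the poster ran ahead
    val drained = bus.waitUntilEmpty(10000)
    (handledEarly, drained, handled.get())
  }
}
```

Without the `waitUntilEmpty` call, the main thread could read its statistics (or exit) while the worker is still behind, which is the situation the pull request tries to address.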

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/concretevitamin/spark listener

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/221.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #221
    
----
commit a52de265966af39b67511597adb11e7a0b23b725
Author: Zongheng Yang <zo...@gmail.com>
Date:   2014-03-25T06:08:55Z

    Add getListenerBus() in SparkContext.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Add getListenerBus() in SparkContext.

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on the pull request:

    https://github.com/apache/spark/pull/221#issuecomment-38612256
  
    If we have the readyToStopFlag, is the idea that an application will just
    keep checking the flag until it's true?
    
    
    On Tue, Mar 25, 2014 at 12:35 PM, Zongheng Yang <no...@github.com> wrote:
    
    > Thanks for the input, @andrewor14 <https://github.com/andrewor14>. Yeah
    > exposing the listener bus enables users to post events to it. I think your
    > second workaround does the job and is simpler to implement than the first;
    > we could set the flag when the shut down message is being processed (it is
    > possible for the bus to be empty but there are events coming). If everyone
    > agrees I can close this PR & send in a new one.



[GitHub] spark pull request: Add getListenerBus() in SparkContext.

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on the pull request:

    https://github.com/apache/spark/pull/221#issuecomment-38617343
  
    because config parameters are evil - and this is one that < 1% of users are going to use.



[GitHub] spark pull request: Add getListenerBus() in SparkContext.

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on the pull request:

    https://github.com/apache/spark/pull/221#issuecomment-38621766
  
    Okay fair enough - what about moving `listenerBus.stop()` to the very end of `sc.stop()`? If the user is adding custom listeners that get delayed or are buggy/slow in processing events, then they will have to deal with it.



[GitHub] spark pull request: Add getListenerBus() in SparkContext.

Posted by shivaram <gi...@git.apache.org>.
Github user shivaram commented on the pull request:

    https://github.com/apache/spark/pull/221#issuecomment-38597584
  
    Jenkins, this is okay to test



[GitHub] spark pull request: Add getListenerBus() in SparkContext.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/221#issuecomment-38534016
  
    Can one of the admins verify this patch?



[GitHub] spark pull request: Add getListenerBus() in SparkContext.

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on the pull request:

    https://github.com/apache/spark/pull/221#issuecomment-38608812
  
    I think SparkListenerEvents should only be posted from within Spark, and exposing the LiveListenerBus to the user violates this. I can think of two workarounds for this:
    
    1) As Kay suggested, optionally drain all events in LiveListenerBus.stop() until a configurable timeout has elapsed.
    
    2) We could expose some sort of boolean flag instead of the entire listener bus, perhaps something like `sc.readyToStop`, which returns true only if there are no more events left in the LiveListenerBus.
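
The second workaround can be sketched as follows, assuming a toy context that keeps its bus private and exposes only a read-only flag. `FlagContext`, `runJob`, and `FlagPolling.awaitReady` are hypothetical names for illustration; only `readyToStop` comes from the suggestion above, and none of this is Spark's actual API.

```scala
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical context: the bus stays internal, only a Boolean flag is public.
class FlagContext(handler: String => Unit) {
  private val queue = new LinkedBlockingQueue[String]()
  // Number of events posted but not yet fully handled.
  private val pending = new AtomicInteger(0)

  private val worker = new Thread(new Runnable {
    def run(): Unit = while (true) {
      val event = queue.take()
      try handler(event) finally pending.decrementAndGet()
    }
  })
  worker.setDaemon(true)
  worker.start()

  /** Simulates a job: only the context itself posts events, one per task. */
  def runJob(tasks: Int): Unit =
    (1 to tasks).foreach { i =>
      pending.incrementAndGet()
      queue.put(s"task-$i-end")
    }

  /** True only when no posted event is still queued or being handled. */
  def readyToStop: Boolean = pending.get() == 0
}

object FlagPolling {
  /** Application-side wait: the caller picks a timeout, or None to wait indefinitely. */
  def awaitReady(ctx: FlagContext, timeoutMillis: Option[Long]): Boolean = {
    val start = System.currentTimeMillis()
    while (!ctx.readyToStop) {
      if (timeoutMillis.exists(t => System.currentTimeMillis() - start > t)) return false
      Thread.sleep(10)
    }
    true
  }
}
```

In this model the application cannot post events, which addresses the encapsulation concern. The flag is only meaningful once the job has stopped producing events, since an empty bus does not rule out events that are still on their way.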



[GitHub] spark pull request: Add getListenerBus() in SparkContext.

Posted by concretevitamin <gi...@git.apache.org>.
Github user concretevitamin commented on the pull request:

    https://github.com/apache/spark/pull/221#issuecomment-38614013
  
    Hard-coding 1sec into stop() might not be flexible enough for future uses (though it should suffice for us now). Making the applications themselves wait & poll offers more options: they can either wait indefinitely or have their own timeout.



[GitHub] spark pull request: Add getListenerBus() in SparkContext.

Posted by concretevitamin <gi...@git.apache.org>.
Github user concretevitamin closed the pull request at:

    https://github.com/apache/spark/pull/221



[GitHub] spark pull request: Add getListenerBus() in SparkContext.

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on the pull request:

    https://github.com/apache/spark/pull/221#issuecomment-38615793
  
    What if we expose a method called `flushListeners(timeout: Int)` in SparkContext that users can optionally call before calling sc.stop()? Inside of sc.stop() we'll call this with some default value, e.g. 1 second.
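
A rough sketch of this proposal, assuming a toy context and a timeout expressed in milliseconds (the comment does not state a unit). `FlushContext` and `post` are hypothetical; the `Boolean` return values and the warning printed on timeout are additions for illustration, not part of the proposal or of Spark.

```scala
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical context modelling the proposal; not Spark's SparkContext.
class FlushContext(handler: String => Unit) {
  private val queue = new LinkedBlockingQueue[String]()
  // Number of events posted but not yet fully handled.
  private val pending = new AtomicInteger(0)

  private val worker = new Thread(new Runnable {
    def run(): Unit = while (true) {
      val event = queue.take()
      try handler(event) finally pending.decrementAndGet()
    }
  })
  worker.setDaemon(true)
  worker.start()

  def post(event: String): Unit = {
    pending.incrementAndGet()
    queue.put(event)
  }

  /** Waits up to `timeout` milliseconds (assumed unit); true if all events were handled. */
  def flushListeners(timeout: Int): Boolean = {
    val deadline = System.currentTimeMillis() + timeout
    while (pending.get() > 0 && System.currentTimeMillis() < deadline) Thread.sleep(10)
    pending.get() == 0
  }

  /** stop() flushes with a default of 1 second and warns if events remain. */
  def stop(): Boolean = {
    val drained = flushListeners(1000)
    if (!drained) System.err.println("WARN: listener bus did not drain before stop()")
    drained
  }
}
```

An application with slow listeners would call `flushListeners` with a larger value before `stop()`, while everyone else gets the default without touching any configuration.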



[GitHub] spark pull request: Add getListenerBus() in SparkContext.

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on the pull request:

    https://github.com/apache/spark/pull/221#issuecomment-38616346
  
    Why is this better than just having a config parameter for the timeout when
    flushing listeners?
    
    
    On Tue, Mar 25, 2014 at 1:24 PM, Patrick Wendell
    <no...@github.com> wrote:
    
    > What about if we expose a method called flushListeners(timeout: Int) in
    > SparkContext that users can optionally call before calling sc.stop().
    > Inside of sc.stop() we'll call this with some default value, e.g. 1 second.



[GitHub] spark pull request: Add getListenerBus() in SparkContext.

Posted by concretevitamin <gi...@git.apache.org>.
Github user concretevitamin commented on the pull request:

    https://github.com/apache/spark/pull/221#issuecomment-38652864
  
    Thanks for the proposals everyone. For our case a Spark-enforced timeout might not be ideal, and only moving `listenerBus.stop()` to the end of `sc.stop()` probably does not guarantee draining. So how about exposing a binary flag from `SparkContext`, plus moving `listenerBus.stop()`? To recap, this makes sure other states / info get cleaned up in `sc.stop()` and do not get stuck because of the listener bus, and the user application can have the flexibility to poll the binary drained flag.



[GitHub] spark pull request: Add getListenerBus() in SparkContext.

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on the pull request:

    https://github.com/apache/spark/pull/221#issuecomment-38604656
  
    I think it would be better not to publicly expose the listener bus, since that's meant to be an internal Spark thing, but I agree that the SparkContext shouldn't stop before trying to drain the listener bus.  I think what we should do is drain the listener bus ourselves when SparkContext.stop() gets called -- so add waitUntilEmpty(some reasonable timeout) to LiveListenerBus.stop().  Does this seem right to you two, @andrewor14 and @pwendell ?



[GitHub] spark pull request: Add getListenerBus() in SparkContext.

Posted by concretevitamin <gi...@git.apache.org>.
Github user concretevitamin commented on the pull request:

    https://github.com/apache/spark/pull/221#issuecomment-38610336
  
    Thanks for the input, @andrewor14. Yeah exposing the listener bus enables users to post events to it. I think your second workaround does the job and is simpler to implement than the first; we could set the flag when the shut down message is being processed (it is possible for the bus to be empty but there are events coming). If everyone agrees I can close this PR & send in a new one.



[GitHub] spark pull request: Add getListenerBus() in SparkContext.

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on the pull request:

    https://github.com/apache/spark/pull/221#issuecomment-38653088
  
    Hey @concretevitamin sorry I think our proposal was the following:
    
    1. Modify the listenerbus.stop() method to wait until all events have been processed.
    2. Call listenerbus.stop() in sc.stop()
    
    This doesn't change anything visible to the user.
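
One possible way to realize the two steps above is a shutdown marker: `stop()` enqueues a sentinel and joins the worker thread, so everything posted earlier is handled first. This is a sketch under that assumption, with hypothetical names throughout (`DrainingBus`, `DrainingContext`, `Message`, `Shutdown`, `runJob`); the thread does not say how the wait should be implemented.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Events carried by the toy bus; Shutdown is the sentinel that ends the worker.
sealed trait BusEvent
final case class Message(text: String) extends BusEvent
case object Shutdown extends BusEvent

// Toy bus (hypothetical): stop() returns only after earlier events are handled.
class DrainingBus(handler: String => Unit) {
  private val queue = new LinkedBlockingQueue[BusEvent]()

  private val worker = new Thread(new Runnable {
    def run(): Unit = {
      var running = true
      while (running) {
        queue.take() match {
          case Message(text) => handler(text)
          case Shutdown      => running = false
        }
      }
    }
  })
  worker.setDaemon(true)
  worker.start()

  def post(text: String): Unit = queue.put(Message(text))

  /** The queue is FIFO, so the sentinel is reached only after all earlier events. */
  def stop(): Unit = {
    queue.put(Shutdown)
    worker.join()
  }
}

// Toy context (hypothetical): the bus is private and stop() gains no new parameters.
class DrainingContext(handler: String => Unit) {
  private val listenerBus = new DrainingBus(handler)

  /** Simulates a job that posts one event per task. */
  def runJob(tasks: Int): Unit =
    (1 to tasks).foreach(i => listenerBus.post(s"task-$i-end"))

  def stop(): Unit = listenerBus.stop()
}
```

In this sketch `join()` has no timeout, so a listener that never returns would block `stop()` indefinitely, which is the trade-off raised elsewhere in the thread.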



[GitHub] spark pull request: Add getListenerBus() in SparkContext.

Posted by concretevitamin <gi...@git.apache.org>.
Github user concretevitamin commented on the pull request:

    https://github.com/apache/spark/pull/221#issuecomment-38771149
  
    Oops I totally missed that. Thanks @pwendell. I am closing this and sending in another patch.



[GitHub] spark pull request: Add getListenerBus() in SparkContext.

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on the pull request:

    https://github.com/apache/spark/pull/221#issuecomment-38621306
  
    I'm still not too sure about doing this through a timeout. It just seems to me that people would only want to set this timeout to either 0 or infinity seconds. Setting it to anything in between is like trying to guess how long your listeners will take to finish processing an unknown number of events, which is quite hard. And if you guess it wrong, then the penalty is that you have to re-run your application and guess again.
    
    To ensure other SparkContext state gets cleaned up, we can move `listenerBus.stop()` to the end of `sc.stop()` in case there are undying listeners that keep the event bus thread from stopping.
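
The ordering argument can be illustrated with a small sketch, assuming a toy context whose `stop()` records each cleanup step. The component names ("ui", "scheduler", "env") and the classes `OrderedContext` and `OrderingDemo` are hypothetical and do not reflect what `sc.stop()` actually cleans up.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

// Toy context (hypothetical): the bus is stopped last, after all other cleanup.
class OrderedContext(stopListenerBus: () => Unit) {
  // Written only by the thread running stop(); volatile so other threads can read it.
  @volatile private var log: Vector[String] = Vector.empty
  def cleanupLog: Vector[String] = log

  private def cleanUp(name: String): Unit = { log = log :+ name }

  def stop(): Unit = {
    // Release everything that does not depend on the listeners first...
    cleanUp("ui")
    cleanUp("scheduler")
    cleanUp("env")
    // ...and only then block on the bus, which may never return if a listener hangs.
    stopListenerBus()
    cleanUp("listenerBus")
  }
}

object OrderingDemo {
  /** Runs stop() against a bus that never finishes and reports what was cleaned up. */
  def cleanedUpDespiteHang(): Vector[String] = {
    val reachedBus = new CountDownLatch(1)
    val never = new CountDownLatch(1) // never counted down: models an undying listener
    val ctx = new OrderedContext(() => { reachedBus.countDown(); never.await() })
    val stopper = new Thread(new Runnable { def run(): Unit = ctx.stop() })
    stopper.setDaemon(true)
    stopper.start()
    reachedBus.await(5, TimeUnit.SECONDS)
    ctx.cleanupLog
  }
}
```

Even though `stop()` never returns here, the other three steps have already run by the time the bus is reached. Had the bus been stopped first, none of them would have.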



[GitHub] spark pull request: Add getListenerBus() in SparkContext.

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on the pull request:

    https://github.com/apache/spark/pull/221#issuecomment-38617497
  
    anyways I'm being tongue-in-cheek here, we could also do that. I just feel these super obscure configs make things confusing for most users, whereas if it's in the code, then advanced users can just opt to use it.



[GitHub] spark pull request: Add getListenerBus() in SparkContext.

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on the pull request:

    https://github.com/apache/spark/pull/221#issuecomment-38617760
  
    But if <1% of users are going to use this, isn't it more damaging to change
    the public API for them?
    
    I was going to advocate for setting it to 1 second and waiting to add a
    config variable until at least one person says they actually care about
    setting this.
    
    
    On Tue, Mar 25, 2014 at 1:39 PM, Patrick Wendell
    <no...@github.com> wrote:
    
    > anyways I'm being tongue-in-cheek here, we could also do that. I just feel
    > these super obscure configs make things confusing for most users. whereas
    > if it's in the code, then advanced users can just opt to use it.



[GitHub] spark pull request: Add getListenerBus() in SparkContext.

Posted by concretevitamin <gi...@git.apache.org>.
Github user concretevitamin commented on the pull request:

    https://github.com/apache/spark/pull/221#issuecomment-38607328
  
    If we go down that path, I think it would be good to make this listener bus timeout configurable, to allow fine-grained control over the listeners -- this might be useful for the application I mentioned.



[GitHub] spark pull request: Add getListenerBus() in SparkContext.

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on the pull request:

    https://github.com/apache/spark/pull/221#issuecomment-38606759
  
    Yeah that's definitely a concern -- I was thinking we could timeout at 1 second or so and log a warning if the listener bus couldn't completely drain.



[GitHub] spark pull request: Add getListenerBus() in SparkContext.

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on the pull request:

    https://github.com/apache/spark/pull/221#issuecomment-38613043
  
    @kayousterhout Yes, something like that. We could even add this check as a configurable option in sc.stop() itself, so the applications don't need to do it themselves.



[GitHub] spark pull request: Add getListenerBus() in SparkContext.

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on the pull request:

    https://github.com/apache/spark/pull/221#issuecomment-38613782
  
    The drawback of waiting N seconds on the listener bus is that this is still a race condition. Because this happens at the end of the application, configuring N can be tricky because you may have to re-run the entire application if N is not sufficiently large for all events to be processed. I guess to work around that we could just make N arbitrarily large.
    
    A major plus of this approach is that it doesn't include changing the public API, which is always nice.



[GitHub] spark pull request: Add getListenerBus() in SparkContext.

Posted by concretevitamin <gi...@git.apache.org>.
Github user concretevitamin commented on the pull request:

    https://github.com/apache/spark/pull/221#issuecomment-38606654
  
    Should we be concerned that user jobs accidentally / intentionally use slow or computation-intensive listeners, so that SparkContext.stop() gets stuck?



[GitHub] spark pull request: Add getListenerBus() in SparkContext.

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on the pull request:

    https://github.com/apache/spark/pull/221#issuecomment-38613052
  
    Yeah I'm also in favor of this -- it seems simpler and easier for
    applications to deal with.
    
    
    On Tue, Mar 25, 2014 at 12:58 PM, Patrick Wendell
    <no...@github.com> wrote:
    
    > What if we just keep this simple? Have the listener bus wait 1 second to
    > shut down when the spark context is stopped. Is there a major drawback to
    > that?



[GitHub] spark pull request: Add getListenerBus() in SparkContext.

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on the pull request:

    https://github.com/apache/spark/pull/221#issuecomment-38612883
  
    What if we just keep this simple? Have the listener bus wait 1 second to shut down when the spark context is stopped. Is there a major drawback to that?

