Posted to reviews@spark.apache.org by concretevitamin <gi...@git.apache.org> on 2014/03/27 06:53:32 UTC

[GitHub] spark pull request: Blockingly drain all events in LiveListenerBus...

GitHub user concretevitamin opened a pull request:

    https://github.com/apache/spark/pull/251

    Blockingly drain all events in LiveListenerBus#stop().

    This PR attempts to implement the proposal discussed in #221.
    
    Some notes:
    - Move the bus.stop() call to the end of sc.stop(), so that slow
      listeners do not delay the cleanup of other state.
    - LiveListenerBus.scala: newline changes from CRLF to LF.
    - Add a test to track the draining behavior. I've (1) used the patch to
      test my application mentioned in the old PR, and it worked as expected,
      and (2) run the new test without this patch, and it fails as expected.
    
    @kayousterhout @pwendell @andrewor14: do you guys mind reviewing this?

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/concretevitamin/spark listenerbus

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/251.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #251
    
----
commit a49dc74802738c318279e988bd699c8211d50670
Author: Zongheng Yang <zo...@gmail.com>
Date:   2014-03-27T05:36:45Z

    Blockingly drain all events in LiveListenerBus#stop().
    
    Also, move the bus.stop() call to the end of sc.stop(), so that slow
    listeners do not delay the cleanup of other state.

commit 36bcc4beccaa18691f76222c6cfe9a3526595a86
Author: Zongheng Yang <zo...@gmail.com>
Date:   2014-03-27T05:47:36Z

    LiveListenerBus.scala: newline changes from CRLF to LF.

----



[GitHub] spark pull request: Blockingly drain all events in LiveListenerBus...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on the pull request:

    https://github.com/apache/spark/pull/251#issuecomment-38772966
  
    This commit hopefully contains all the non-CRLF changes, for easier diffing: https://github.com/concretevitamin/spark/commit/a49dc74802738c318279e988bd699c8211d50670



[GitHub] spark pull request: Blockingly drain all events in LiveListenerBus...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on the pull request:

    https://github.com/apache/spark/pull/251#issuecomment-38890757
  
    Hey, I just looked at this. Why not just have `stop` call `waitUntilEmpty(infinity)` and be done with it? I don't see a big advantage of the current approach over that.
    
    I can only think of two differences. One is that the current approach is slightly more efficient, but I don't think that matters for this use case. Another is that the current approach blocks the addition of new events after `stop` has been called. But I think it's fine to just say that the semantics of events triggered after a `stop` are undefined.
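
    For reference, a minimal sketch of that alternative (hypothetical, not the code in this PR; `Int.MaxValue` stands in for "infinity" here, since `waitUntilEmpty` takes an `Int` timeout in milliseconds):

        // Sketch only: reuse the existing polling helper inside LiveListenerBus
        // instead of introducing a condition variable.
        def stop() {
          if (!started) {
            throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
          }
          post(SparkListenerShutdown)
          waitUntilEmpty(Int.MaxValue)  // block until the queue drains
        }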



[GitHub] spark pull request: Blockingly drain all events in LiveListenerBus...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on the pull request:

    https://github.com/apache/spark/pull/251#issuecomment-38980845
  
    @andrewor14 Yep, maybe we can bump this to 100 milliseconds to make it more efficient. Waking/sleeping 10 times per second won't impose any measurable load on the system, so I'm not too worried about the busy-wait.
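
    In code terms that would just be a change to the poll interval in the quoted `waitUntilEmpty` loop (sketch; the 100 is the assumed new value):

        /* Sleep rather than using wait/notify; at 100ms this wakes up only
         * ~10 times per second while waiting. */
        Thread.sleep(100)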



[GitHub] spark pull request: Blockingly drain all events in LiveListenerBus...

Posted by concretevitamin <gi...@git.apache.org>.
Github user concretevitamin commented on the pull request:

    https://github.com/apache/spark/pull/251#issuecomment-39004007
  
    Hey @andrewor14 @pwendell -- what I have been thinking, and what Andrew correctly pointed out, is that the event **"the listener bus event queue is currently empty"** and the event **"the shutdown message has been received by the listener bus thread"** *are two different events*. We want `stop()` to do the right thing, which is to wait for the second event to happen. However, as far as I can tell, `waitUntilEmpty()` *only waits for the first event*. In the current uses of `waitUntilEmpty()` in the unit tests, the two happen to coincide because the tests manually control the events posted to the listener bus.
    
    In the general case, I think it's crucial that we distinguish these two. During the execution of a job, the event queue may well become empty momentarily (e.g., events are consumed faster than they are posted) -- but that doesn't mean a call to `stop()` at such a moment should succeed; rather, we want the call to return only once the end-of-work `SparkListenerShutdown` message is processed. This is where the condition variable comes in, as sketched below.
    
    Let me know what you guys think ;)
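
    A condensed sketch of the distinction (the timeline is illustrative only; the wait loop is the shape of this patch, not new behavior):

        // Why queue-emptiness is the wrong signal: the bus thread removes an
        // event from the queue *before* delivering it, so the queue can be
        // empty while a listener is still running.
        //
        //   bus thread                         caller thread
        //   val event = eventQueue.take        // queue is now empty ...
        //   postToAll(event)  // still busy    waitUntilEmpty(...)  // true, too early
        //
        // The condition variable instead waits for the second event, "the
        // shutdown message has been received by the listener bus thread":
        drainedLock.synchronized {
          while (!drained) {       // `drained` is set by the bus thread only
            drainedLock.wait()     // when it takes SparkListenerShutdown
          }
        }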



[GitHub] spark pull request: Blockingly drain all events in LiveListenerBus...

Posted by concretevitamin <gi...@git.apache.org>.
Github user concretevitamin commented on the pull request:

    https://github.com/apache/spark/pull/251#issuecomment-38875434
  
    Jenkins not working?
    



[GitHub] spark pull request: Blockingly drain all events in LiveListenerBus...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/251#discussion_r11087320
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
    @@ -1,101 +1,119 @@
    -/*
    - * Licensed to the Apache Software Foundation (ASF) under one or more
    - * contributor license agreements.  See the NOTICE file distributed with
    - * this work for additional information regarding copyright ownership.
    - * The ASF licenses this file to You under the Apache License, Version 2.0
    - * (the "License"); you may not use this file except in compliance with
    - * the License.  You may obtain a copy of the License at
    - *
    - *    http://www.apache.org/licenses/LICENSE-2.0
    - *
    - * Unless required by applicable law or agreed to in writing, software
    - * distributed under the License is distributed on an "AS IS" BASIS,
    - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    - * See the License for the specific language governing permissions and
    - * limitations under the License.
    - */
    -
    -package org.apache.spark.scheduler
    -
    -import java.util.concurrent.LinkedBlockingQueue
    -
    -import org.apache.spark.Logging
    -
    -/**
    - * Asynchronously passes SparkListenerEvents to registered SparkListeners.
    - *
    - * Until start() is called, all posted events are only buffered. Only after this listener bus
    - * has started will events be actually propagated to all attached listeners. This listener bus
    - * is stopped when it receives a SparkListenerShutdown event, which is posted using stop().
    - */
    -private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
    -
    -  /* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than
    -   * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
    -  private val EVENT_QUEUE_CAPACITY = 10000
    -  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
    -  private var queueFullErrorMessageLogged = false
    -  private var started = false
    -
    -  /**
    -   * Start sending events to attached listeners.
    -   *
    -   * This first sends out all buffered events posted before this listener bus has started, then
    -   * listens for any additional events asynchronously while the listener bus is still running.
    -   * This should only be called once.
    -   */
    -  def start() {
    -    if (started) {
    -      throw new IllegalStateException("Listener bus already started!")
    -    }
    -    started = true
    -    new Thread("SparkListenerBus") {
    -      setDaemon(true)
    -      override def run() {
    -        while (true) {
    -          val event = eventQueue.take
    -          if (event == SparkListenerShutdown) {
    -            // Get out of the while loop and shutdown the daemon thread
    -            return
    -          }
    -          postToAll(event)
    -        }
    -      }
    -    }.start()
    -  }
    -
    -  def post(event: SparkListenerEvent) {
    -    val eventAdded = eventQueue.offer(event)
    -    if (!eventAdded && !queueFullErrorMessageLogged) {
    -      logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
    -        "This likely means one of the SparkListeners is too slow and cannot keep up with the " +
    -        "rate at which tasks are being started by the scheduler.")
    -      queueFullErrorMessageLogged = true
    -    }
    -  }
    -
    -  /**
    -   * Waits until there are no more events in the queue, or until the specified time has elapsed.
    -   * Used for testing only. Returns true if the queue has emptied and false if the specified time
    -   * elapsed before the queue emptied.
    -   */
    -  def waitUntilEmpty(timeoutMillis: Int): Boolean = {
    -    val finishTime = System.currentTimeMillis + timeoutMillis
    -    while (!eventQueue.isEmpty) {
    -      if (System.currentTimeMillis > finishTime) {
    -        return false
    -      }
    -      /* Sleep rather than using wait/notify, because this is used only for testing and wait/notify
    -       * add overhead in the general case. */
    -      Thread.sleep(10)
    -    }
    -    true
    -  }
    -
    -  def stop() {
    -    if (!started) {
    -      throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
    -    }
    -    post(SparkListenerShutdown)
    -  }
    -}
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler
    +
    +import java.util.concurrent.LinkedBlockingQueue
    +
    +import org.apache.spark.Logging
    +
    +/**
    + * Asynchronously passes SparkListenerEvents to registered SparkListeners.
    + *
    + * Until start() is called, all posted events are only buffered. Only after this listener bus
    + * has started will events be actually propagated to all attached listeners. This listener bus
    + * is stopped when it receives a SparkListenerShutdown event, which is posted using stop().
    + */
    +private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
    +
    +  /* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than
    +   * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
    +  private val EVENT_QUEUE_CAPACITY = 10000
    +  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
    +  private var queueFullErrorMessageLogged = false
    +  private var started = false
    +
    +  private var drained = false
    +  private val drainedLock = new Object()
    +
    +  /**
    +   * Start sending events to attached listeners.
    +   *
    +   * This first sends out all buffered events posted before this listener bus has started, then
    +   * listens for any additional events asynchronously while the listener bus is still running.
    +   * This should only be called once.
    +   */
    +  def start() {
    +    if (started) {
    +      throw new IllegalStateException("Listener bus already started!")
    +    }
    +    started = true
    +    new Thread("SparkListenerBus") {
    +      setDaemon(true)
    +      override def run() {
    +        while (true) {
    +          val event = eventQueue.take
    +          if (event == SparkListenerShutdown) {
    +            drainedLock.synchronized {
    +              drained = true
    +              drainedLock.notify()
    +            }
    +            // Get out of the while loop and shutdown the daemon thread
    +            return
    +          }
    +          postToAll(event)
    +        }
    +      }
    +    }.start()
    +  }
    +
    +  def post(event: SparkListenerEvent) {
    +    val eventAdded = eventQueue.offer(event)
    +    if (!eventAdded && !queueFullErrorMessageLogged) {
    +      logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
    +        "This likely means one of the SparkListeners is too slow and cannot keep up with the " +
    +        "rate at which tasks are being started by the scheduler.")
    +      queueFullErrorMessageLogged = true
    +    }
    +  }
    +
    +  /**
    +   * Waits until there are no more events in the queue, or until the specified time has elapsed.
    +   * Used for testing only. Returns true if the queue has emptied and false if the specified time
    +   * elapsed before the queue emptied.
    +   */
    +  def waitUntilEmpty(timeoutMillis: Int): Boolean = {
    +    val finishTime = System.currentTimeMillis + timeoutMillis
    +    while (!eventQueue.isEmpty) {
    +      if (System.currentTimeMillis > finishTime) {
    +        return false
    +      }
    +      /* Sleep rather than using wait/notify, because this is used only for testing and wait/notify
    +       * add overhead in the general case. */
    +      Thread.sleep(10)
    +    }
    +    true
    +  }
    +
    +  /**
    +   * Stop the listener bus; wait until all listener events are processed by the listener bus
    +   * thread. The user has to make sure the listeners finish in a reasonable amount of time.
    +   */
    +  def stop() {
    +    if (!started) {
    +      throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
    +    }
    +    drainedLock.synchronized {
    +      // put post() and wait() in the same synchronized block to ensure wait() happens before
    +      // notify()
    +      post(SparkListenerShutdown)
    +      while (!drained) {
    +        drainedLock.wait()
    +      }
    --- End diff --
    
    Can't you just check `eventQueue.isEmpty`? That would get rid of the `drained` variable. Also, if you synchronize on an existing object (e.g. `eventQueue`) then you don't even need `drainedLock`.
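
    A minimal sketch of this suggestion (hypothetical, not code from the PR; the bus thread would also have to call `eventQueue.synchronized { eventQueue.notify() }` once it takes the shutdown event):

        def stop() {
          if (!started) {
            throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
          }
          eventQueue.synchronized {
            post(SparkListenerShutdown)    // offer() uses the queue's internal
            while (!eventQueue.isEmpty) {  // locks, so no deadlock with this
              eventQueue.wait()            // monitor; wait() releases it for
            }                              // the bus thread's notify()
          }
        }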



[GitHub] spark pull request: Blockingly drain all events in LiveListenerBus...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/251#issuecomment-38772127
  
    Can one of the admins verify this patch?



[GitHub] spark pull request: Blockingly drain all events in LiveListenerBus...

Posted by concretevitamin <gi...@git.apache.org>.
Github user concretevitamin commented on the pull request:

    https://github.com/apache/spark/pull/251#issuecomment-39004597
  
    Patrick: `SparkListenerShutdown` exists in the current code; it was not introduced by this patch.



[GitHub] spark pull request: Blockingly drain all events in LiveListenerBus...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/251#issuecomment-38891805
  
    Merged build finished. All automated tests passed.



[GitHub] spark pull request: Blockingly drain all events in LiveListenerBus...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/251#discussion_r11090566
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
    +  /**
    +   * Stop the listener bus; wait until all listener events are processed by the listener bus
    +   * thread. The user has to make sure the listeners finish in a reasonable amount of time.
    +   */
    +  def stop() {
    +    if (!started) {
    +      throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
    +    }
    +    drainedLock.synchronized {
    +      // put post() and wait() in the same synchronized block to ensure wait() happens before
    +      // notify()
    +      post(SparkListenerShutdown)
    +      while (!drained) {
    +        drainedLock.wait()
    +      }
    --- End diff --
    
    Hm, yeah, these are good points. It's just a little strange to equate "the event queue being drained" with "the shutdown event has been received." These are not inherently the same thing, but we're treating them as if they were.



[GitHub] spark pull request: Blockingly drain all events in LiveListenerBus...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on the pull request:

    https://github.com/apache/spark/pull/251#issuecomment-39004282
  
    I don't understand what you are saying. `SparkListenerShutdown` is a thing you invented for this pull request, right? I'm not suggesting that just calling `waitUntilEmpty` is identical to the `SparkListenerShutdown` approach. There are some differences. In particular, if an event is posted after `stop()` is called, it will just be ignored in my proposal. I'm fine with those semantics.
    
    Could you describe a concrete problem with doing `waitUntilEmpty` inside of `stop()`, defined in terms of the current code and not the code added here?



[GitHub] spark pull request: Blockingly drain all events in LiveListenerBus...

Posted by concretevitamin <gi...@git.apache.org>.
Github user concretevitamin commented on the pull request:

    https://github.com/apache/spark/pull/251#issuecomment-38892043
  
    @pwendell yeah, I used the current approach mainly because it should be more efficient than continuously sleeping and polling. The task at hand seems very well handled by a condition variable. Additionally, the current approach isn't any more complicated, is it?
    
    As for your second point: I thought it was safe to treat `SparkListenerShutdown` (and hence `stop()`) as the special end-of-work / poison message, so I agree with the semantics you proposed.
    
    I will update the test shortly. 



[GitHub] spark pull request: Blockingly drain all events in LiveListenerBus...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/251#issuecomment-38889879
  
     Merged build triggered. Some tests failed or tests have not started running yet.



[GitHub] spark pull request: Blockingly drain all events in LiveListenerBus...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on the pull request:

    https://github.com/apache/spark/pull/251#issuecomment-38893776
  
    Hey @concretevitamin, even though it might not be inherently more complicated, when we look at a patch like this it's a big win if it can use an existing, well-tested piece of code. So in this case there is a bias towards using the existing mechanism if possible.



[GitHub] spark pull request: Blockingly drain all events in LiveListenerBus...

Posted by concretevitamin <gi...@git.apache.org>.
Github user concretevitamin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/251#discussion_r11098581
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala ---
    @@ -72,6 +72,20 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
         }
       }
     
    +  test("bus.stop() waits for the event queue to completely drain") {
    +    val sleepyListener = new SleepyListener(1000)
    +    val bus = new LiveListenerBus
    +    bus.addListener(sleepyListener)
    +    (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, JobSucceeded)) }
    +
    +    bus.start()
    +    // since the handler is just thread sleep, the queue should not drain immediately
    +    assert(!bus.waitUntilEmpty(0))
    +    bus.stop()
    +    // bus.stop() should wait until the event queue is drained, ensuring no events are lost
    +    assert(bus.waitUntilEmpty(0))
    --- End diff --
    
    Good call. I added an `isDrained()` method.
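
    A sketch of what such an accessor could look like (an assumption; the updated method body is not quoted in this thread):

        def isDrained: Boolean = drainedLock.synchronized { drained }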



[GitHub] spark pull request: Blockingly drain all events in LiveListenerBus...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/251#discussion_r11055710
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala ---
    @@ -72,6 +72,20 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
         }
       }
     
    +  test("bus.stop() waits for the event queue to completely drain") {
    +    val sleepyListener = new SleepyListener(1000)
    --- End diff --
    
    Relying on sleeps like this is a little brittle, and it also means this test will take 5 seconds, which is a long time. What about instead using a synchronization mechanism and creating a `BlockingListener` that just blocks inside of `onJobEnd` until it's signaled to continue?
    
    Also - 
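
    A sketch of such a listener for the suite, gated on a `Semaphore` (the revised PR reports doing exactly this, but the exact shape here is an illustration, not the committed test code):

        import java.util.concurrent.Semaphore

        class BlockingListener extends SparkListener {
          val proceed = new Semaphore(0)   // zero permits: onJobEnd blocks immediately
          override def onJobEnd(jobEnd: SparkListenerJobEnd) {
            proceed.acquire()              // block until the test calls proceed.release()
          }
        }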



[GitHub] spark pull request: Blockingly drain all events in LiveListenerBus...

Posted by concretevitamin <gi...@git.apache.org>.
Github user concretevitamin commented on the pull request:

    https://github.com/apache/spark/pull/251#issuecomment-39008194
  
    It was introduced by this commit: https://github.com/apache/spark/commit/2180c8718895f1773a9906ea0ddd49171c468e0b



[GitHub] spark pull request: Blockingly drain all events in LiveListenerBus...

Posted by concretevitamin <gi...@git.apache.org>.
Github user concretevitamin commented on the pull request:

    https://github.com/apache/spark/pull/251#issuecomment-39172164
  
    Is this good to go? I can see the argument that `SparkListenerShutdown` is probably not doing anything now, but removing it should happen in another PR. To stick with the current semantics, isn't it more correct to treat the two events as distinct?



[GitHub] spark pull request: Blockingly drain all events in LiveListenerBus...

Posted by concretevitamin <gi...@git.apache.org>.
Github user concretevitamin commented on the pull request:

    https://github.com/apache/spark/pull/251#issuecomment-38773741
  
    Hey Aaron,
    
    That's correct -- I specifically made a CRLF-only commit. I believe
    `git diff -w` will work.
    
    



[GitHub] spark pull request: Blockingly drain all events in LiveListenerBus...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on the pull request:

    https://github.com/apache/spark/pull/251#issuecomment-38772581
  
    Jenkins, test this please
    
    



[GitHub] spark pull request: Blockingly drain all events in LiveListenerBus...

Posted by concretevitamin <gi...@git.apache.org>.
Github user concretevitamin closed the pull request at:

    https://github.com/apache/spark/pull/251



[GitHub] spark pull request: Blockingly drain all events in LiveListenerBus...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/251#discussion_r11087358
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala ---
    @@ -72,6 +72,20 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
         }
       }
     
    +  test("bus.stop() waits for the event queue to completely drain") {
    +    val sleepyListener = new SleepyListener(1000)
    +    val bus = new LiveListenerBus
    +    bus.addListener(sleepyListener)
    +    (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, JobSucceeded)) }
    +
    +    bus.start()
    +    // since the handler is just thread sleep, the queue should not drain immediately
    +    assert(!bus.waitUntilEmpty(0))
    +    bus.stop()
    +    // bus.stop() should wait until the event queue is drained, ensuring no events are lost
    +    assert(bus.waitUntilEmpty(0))
    --- End diff --
    
    I think it makes sense to expose something like `bus.isEmpty`. I find `waitUntilEmpty` with a 0ms timeout a little confusing.



[GitHub] spark pull request: Blockingly drain all events in LiveListenerBus...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on the pull request:

    https://github.com/apache/spark/pull/251#issuecomment-38973103
  
    To add to the existing discussion, I think the difference between the two proposed approaches is in the mechanism, not in the semantics. The semantics are actually very similar in that neither approach allows another event to be posted after the Shutdown event.
    
    My only concern with busy waiting is that if the listener code does take forever to finish, then the SparkContext will hang, and in the meantime use up a lot of resources. Since multiple applications may run on the same cluster, this may end up being costly. (In the broader context, however, this case is relevant to like <0.1% of all Spark applications, so IDK how important this difference actually is.)



[GitHub] spark pull request: Blockingly drain all events in LiveListenerBus...

Posted by concretevitamin <gi...@git.apache.org>.
Github user concretevitamin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/251#discussion_r11098601
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala ---
    @@ -72,6 +72,20 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
         }
       }
     
    +  test("bus.stop() waits for the event queue to completely drain") {
    +    val sleepyListener = new SleepyListener(1000)
    --- End diff --
    
    Also, the test now runs in ~200ms on my laptop.



[GitHub] spark pull request: Blockingly drain all events in LiveListenerBus...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on the pull request:

    https://github.com/apache/spark/pull/251#issuecomment-39013687
  
    In the meantime, can you fix the unit test?



[GitHub] spark pull request: Blockingly drain all events in LiveListenerBus...

Posted by concretevitamin <gi...@git.apache.org>.
Github user concretevitamin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/251#discussion_r11098595
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala ---
    @@ -72,6 +72,20 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
         }
       }
     
    +  test("bus.stop() waits for the event queue to completely drain") {
    +    val sleepyListener = new SleepyListener(1000)
    --- End diff --
    
    Fixed: I use a `Semaphore` instead and create a `BlockingListener`, as you suggested.



[GitHub] spark pull request: Blockingly drain all events in LiveListenerBus...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on the pull request:

    https://github.com/apache/spark/pull/251#issuecomment-38980531
  
    Though arguably, if the application introduces an undying listener, then it's the user's fault. On second thought, I'm also inclined towards using the existing code, as this becomes a very simple one-line change.



[GitHub] spark pull request: Blockingly drain all events in LiveListenerBus...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on the pull request:

    https://github.com/apache/spark/pull/251#issuecomment-39007461
  
    Oh, I see now - I'm not sure the `SparkListenerShutdown` event is super necessary, because the bus runs inside of a daemon thread. By definition a daemon thread doesn't need to be explicitly terminated.
    




[GitHub] spark pull request: Blockingly drain all events in LiveListenerBus...

Posted by concretevitamin <gi...@git.apache.org>.
Github user concretevitamin commented on the pull request:

    https://github.com/apache/spark/pull/251#issuecomment-39028474
  
    I have updated the test. Since `bus.stop()` blocks until the listener is signaled to proceed, I call the stop method from a new thread.
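
    A sketch of that shape (names follow the earlier test snippet; the stop-thread detail is illustrative, not the exact committed code):

        val listener = new BlockingListener
        val bus = new LiveListenerBus
        bus.addListener(listener)
        bus.post(SparkListenerJobEnd(0, JobSucceeded))
        bus.start()

        val stopper = new Thread {
          override def run() { bus.stop() }  // blocks until the bus is drained
        }
        stopper.start()
        listener.proceed.release()           // unblock the listener's onJobEnd
        stopper.join()                       // stop() returns only after draining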



[GitHub] spark pull request: Blockingly drain all events in LiveListenerBus...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/251#issuecomment-38891806
  
    All automated tests passed.
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13539/



[GitHub] spark pull request: Blockingly drain all events in LiveListenerBus...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/251#issuecomment-38889887
  
    Merged build started. Some tests failed or tests have not started running yet.



[GitHub] spark pull request: Blockingly drain all events in LiveListenerBus...

Posted by concretevitamin <gi...@git.apache.org>.
Github user concretevitamin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/251#discussion_r11088724
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
    +  /**
    +   * Stop the listener bus; wait until all listener events are processed by the listener bus
    +   * thread. The user has to make sure the listeners finish in a reasonable amount of time.
    +   */
    +  def stop() {
    +    if (!started) {
    +      throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
    +    }
    +    drainedLock.synchronized {
    +      // put post() and wait() in the same synchronized block to ensure wait() happens before
    +      // notify()
    +      post(SparkListenerShutdown)
    +      while (!drained) {
    +        drainedLock.wait()
    +      }
    --- End diff --
    
    I used `drained` instead of checking `isEmpty` because I wasn't absolutely sure whether it's possible (currently in the codebase, or in the future) for new events to be posted after this stop method is called. If it is possible, then checking `isEmpty` is not enough to imply that the end-of-work / drain signal has been reached; although we enforce semantics that ignore such events, we can't really prevent them from being posted.
    
    As for the 2nd point: after a bit of googling, it seems synchronizing on a thread-safe queue can cause issues when using the queue, as sketched below.
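
    To make that concrete (an illustrative sketch of the pitfall, not PR code): `LinkedBlockingQueue` guards `put`/`take` with internal locks rather than the object's monitor, so an external `synchronized` block neither excludes concurrent queue operations nor gets woken by them.

        import java.util.concurrent.LinkedBlockingQueue

        val q = new LinkedBlockingQueue[Int]()
        q.synchronized {
          q.put(1)              // fine: put uses the queue's internal putLock
          while (!q.isEmpty) {  // a consumer can still drain q concurrently,
            q.wait()            // but nothing in the queue ever calls q.notify(),
          }                     // so this wait can block forever
        }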



[GitHub] spark pull request: Blockingly drain all events in LiveListenerBus...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on the pull request:

    https://github.com/apache/spark/pull/251#issuecomment-38889844
  
    Jenkins, test this please?

