Posted to reviews@spark.apache.org by zsxwing <gi...@git.apache.org> on 2014/12/16 06:40:34 UTC

[GitHub] spark pull request: [SPARK-4859][Streaming] Improve StreamingListe...

GitHub user zsxwing opened a pull request:

    https://github.com/apache/spark/pull/3710

    [SPARK-4859][Streaming] Improve StreamingListenerBus

    * Fix the race condition of `queueFullErrorMessageLogged`.
    * Log the error from listener rather than crashing `listenerThread`.
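
The first bullet refers to the check-then-set on `queueFullErrorMessageLogged` visible in the diff hunks quoted later in this thread: with a plain flag, two threads can both see `false` and both log. The following is a minimal sketch of the log-once pattern using `AtomicBoolean.compareAndSet`; the object name `LogOnceDemo`, the counter `messagesLogged`, and the `demo` helper are hypothetical and exist only for illustration, they are not part of the patch.

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

// Hypothetical stand-in for the listener bus; only the log-once logic is shown.
object LogOnceDemo {
  private val queueFullErrorMessageLogged = new AtomicBoolean(false)

  // Counts how often the "queue full" message would have been logged.
  val messagesLogged = new AtomicInteger(0)

  def reportQueueFull(): Unit = {
    // compareAndSet flips false -> true atomically, so exactly one caller
    // wins the race and performs the logging.
    if (queueFullErrorMessageLogged.compareAndSet(false, true)) {
      messagesLogged.incrementAndGet()
    }
  }

  // Calls reportQueueFull from several threads and returns the log count.
  def demo(): Int = {
    val threads = (1 to 8).map { _ =>
      new Thread(new Runnable {
        def run(): Unit = reportQueueFull()
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    messagesLogged.get
  }
}
```

However many threads report a full queue, only the one whose `compareAndSet` succeeds logs the message.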

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zsxwing/spark SPARK-4859

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/3710.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3710
    
----
commit d09066ead4bc61351886549756b655fca82a848e
Author: zsxwing <zs...@gmail.com>
Date:   2014-12-16T05:39:08Z

    Fix the race condition of `queueFullErrorMessageLogged` and log the error from listener rather than crashing `listenerThread`.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/3710#issuecomment-68032031
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24761/
    Test PASSed.




[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/3710#issuecomment-67924852
  
    > Hey @zsxwing can you update the title to make it more specific?
    
    Updated the title and description.




[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3710#discussion_r22266208
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
    @@ -35,8 +36,9 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
        * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
    --- End diff --
    
    Done




[GitHub] spark pull request: [SPARK-4859][Streaming] Improve StreamingListe...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3710#issuecomment-67127181
  
      [Test build #24490 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24490/consoleFull) for   PR 3710 at commit [`f5285d6`](https://github.com/apache/spark/commit/f5285d6628185c2bf21e3d1e3d7f72803cc515a4).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3710#discussion_r22263810
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala ---
    @@ -39,18 +45,19 @@ private[spark] class StreamingListenerBus() extends Logging {
             val event = eventQueue.take
    --- End diff --
    
    This does not use `stopped` like the `LiveListenerBus`. I know that introducing `eventLock` and using `eventQueue.poll` instead of `eventQueue.take` like the LiveListenerBus is too much for this PR. But at least we can eliminate the bug related to `StreamingListenerShutdown` by using `stopped` instead of `StreamingListenerShutdown`.




[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3710#discussion_r22245060
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala ---
    @@ -17,20 +17,25 @@
     
     package org.apache.spark.streaming.scheduler
     
    -import org.apache.spark.Logging
    -import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
    +import java.util.concurrent.atomic.AtomicBoolean
     import java.util.concurrent.LinkedBlockingQueue
    +import java.util.concurrent.CopyOnWriteArrayList
    +
    +import scala.collection.JavaConversions._
    +import scala.util.control.NonFatal
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.util.Utils
     
     /** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. */
     private[spark] class StreamingListenerBus() extends Logging {
    -  private val listeners = new ArrayBuffer[StreamingListener]()
    -    with SynchronizedBuffer[StreamingListener]
    +  private val listeners = new CopyOnWriteArrayList[StreamingListener]()
    --- End diff --
    
    There is an implicit conversion between Java's `CopyOnWriteArrayList` and Scala's Iterable/Iterator every time `foreach` is applied. I am not sure whether that conversion is efficient, or whether it copies everything from the underlying array. It might be better to use the Java API directly.
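
Using the Java API directly, as suggested above, can be sketched as follows. This is only an illustration under assumptions: `Listener`, `Bus`, and `demo` are hypothetical names standing in for `StreamingListener` and the bus. The iterator of a `CopyOnWriteArrayList` reads from a snapshot of the backing array taken when the iterator is created, so the loop needs no locking and no Scala wrapper.

```scala
import java.util.concurrent.CopyOnWriteArrayList
import scala.collection.mutable.ArrayBuffer

object ForeachListenerDemo {
  // Hypothetical listener type, standing in for StreamingListener.
  trait Listener {
    def onEvent(event: String): Unit
  }

  class Bus {
    private val listeners = new CopyOnWriteArrayList[Listener]()

    def addListener(listener: Listener): Unit = {
      listeners.add(listener)
    }

    // Walks the list with the plain Java iterator; no JavaConversions involved.
    def foreachListener(f: Listener => Unit): Unit = {
      val iter = listeners.iterator
      while (iter.hasNext) {
        f(iter.next())
      }
    }
  }

  // Registers two listeners and records the order in which they are invoked.
  def demo(): List[String] = {
    val seen = ArrayBuffer[String]()
    val bus = new Bus
    bus.addListener(new Listener {
      def onEvent(event: String): Unit = { seen += s"first:$event" }
    })
    bus.addListener(new Listener {
      def onEvent(event: String): Unit = { seen += s"second:$event" }
    })
    bus.foreachListener(_.onEvent("batchCompleted"))
    seen.toList
  }
}
```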




[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3710#discussion_r22786686
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
    @@ -49,7 +52,8 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
             // Atomically remove and process this event
             LiveListenerBus.this.synchronized {
               val event = eventQueue.poll
    -          if (event == SparkListenerShutdown) {
    +          if (event == null) {
    +            assert(stopped)
    --- End diff --
    
    Added it in #4006 




[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3710#discussion_r22245214
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
    @@ -75,6 +78,11 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
       }
     
       def post(event: SparkListenerEvent) {
    +    if (stopped) {
    --- End diff --
    
    Ahh, got it. That's cool then. This is worth it. 




[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/3710#issuecomment-68083944
  
    All right, looks good to me, but I will let @andrewor14 make the final call. He is on vacation, so it will probably be a few days before he responds.




[GitHub] spark pull request: [SPARK-4859][Streaming] Improve StreamingListe...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/3710#issuecomment-67266000
  
    > Maybe there's an opportunity to refactor the listener busses to share more code; this might be a more involved change, though, and could happen in a separate PR
    
    We need a common parent trait for `SparkListenerEvent` and `StreamingListenerEvent`, and also a common parent trait for `SparkListener` and `StreamingListener`, so that we can write an abstract ListenerBus for them. I prefer to refactor it in a separate PR.
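
One way such shared code could look is sketched below. Note that this sketch uses type parameters rather than the common parent traits described in the comment above, and it is not taken from any Spark patch: `ListenerBus[L, E]`, `postToAll`, `onPostEvent`, and all `Demo*` names are hypothetical and chosen only for illustration.

```scala
import java.util.concurrent.CopyOnWriteArrayList
import scala.collection.mutable.ArrayBuffer

object ListenerBusSketch {
  // Hypothetical generic bus: L is the listener type, E is the event type.
  trait ListenerBus[L, E] {
    private val listeners = new CopyOnWriteArrayList[L]()

    def addListener(listener: L): Unit = {
      listeners.add(listener)
    }

    // Delivers one event to every registered listener.
    def postToAll(event: E): Unit = {
      val iter = listeners.iterator
      while (iter.hasNext) {
        onPostEvent(iter.next(), event)
      }
    }

    // Each concrete bus decides how an event maps onto listener callbacks.
    protected def onPostEvent(listener: L, event: E): Unit
  }

  // Hypothetical events and listener used only for this demonstration.
  sealed trait DemoEvent
  case class BatchStarted(id: Int) extends DemoEvent
  case class BatchCompleted(id: Int) extends DemoEvent

  trait DemoListener {
    def onBatchStarted(e: BatchStarted): Unit = {}
    def onBatchCompleted(e: BatchCompleted): Unit = {}
  }

  class DemoListenerBus extends ListenerBus[DemoListener, DemoEvent] {
    protected def onPostEvent(listener: DemoListener, event: DemoEvent): Unit = {
      event match {
        case e: BatchStarted => listener.onBatchStarted(e)
        case e: BatchCompleted => listener.onBatchCompleted(e)
      }
    }
  }

  // Posts two events to a listener that only overrides onBatchCompleted.
  def demo(): List[String] = {
    val log = ArrayBuffer[String]()
    val bus = new DemoListenerBus
    bus.addListener(new DemoListener {
      override def onBatchCompleted(e: BatchCompleted): Unit = {
        log += s"completed ${e.id}"
      }
    })
    bus.postToAll(BatchStarted(1))
    bus.postToAll(BatchCompleted(1))
    log.toList
  }
}
```

With this shape, the queueing and threading logic would live once in the generic trait, and each concrete bus would supply only the event-to-callback dispatch.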




[GitHub] spark pull request: [SPARK-4859][Streaming] Improve StreamingListe...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/3710#issuecomment-67135315
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24490/
    Test PASSed.




[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3710#discussion_r22266216
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala ---
    @@ -64,36 +71,40 @@ private[spark] class StreamingListenerBus() extends Logging {
       }
     
       def addListener(listener: StreamingListener) {
    -    listeners += listener
    +    listeners.add(listener)
       }
     
       def post(event: StreamingListenerEvent) {
    +    if (stopped) {
    +      // Drop further events to make `StreamingListenerShutdown` be delivered ASAP
    +      logError("StreamingListenerBus has been stopped! Drop " + event)
    +      return
    +    }
         val eventAdded = eventQueue.offer(event)
    -    if (!eventAdded && !queueFullErrorMessageLogged) {
    +    if (!eventAdded && queueFullErrorMessageLogged.compareAndSet(false, true)) {
           logError("Dropping StreamingListenerEvent because no remaining room in event queue. " +
             "This likely means one of the StreamingListeners is too slow and cannot keep up with the " +
             "rate at which events are being started by the scheduler.")
    -      queueFullErrorMessageLogged = true
         }
       }
     
    -  /**
    -   * Waits until there are no more events in the queue, or until the specified time has elapsed.
    -   * Used for testing only. Returns true if the queue has emptied and false is the specified time
    -   * elapsed before the queue emptied.
    -   */
    -  def waitUntilEmpty(timeoutMillis: Int): Boolean = {
    -    val finishTime = System.currentTimeMillis + timeoutMillis
    -    while (!eventQueue.isEmpty) {
    -      if (System.currentTimeMillis > finishTime) {
    -        return false
    +  def stop(): Unit = {
    +    stopped = true
    +    // Should not call `post`, or `StreamingListenerShutdown` may be dropped.
    +    eventQueue.put(StreamingListenerShutdown)
    +    listenerThread.join()
    +  }
    +
    +  private def foreachListener(f: StreamingListener => Unit): Unit = {
    +    val iter = listeners.iterator
    --- End diff --
    
    Added the comment and removed `JavaConversions`.




[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3710#discussion_r22245204
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala ---
    @@ -17,20 +17,25 @@
     
     package org.apache.spark.streaming.scheduler
     
    -import org.apache.spark.Logging
    -import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
    +import java.util.concurrent.atomic.AtomicBoolean
     import java.util.concurrent.LinkedBlockingQueue
    +import java.util.concurrent.CopyOnWriteArrayList
    +
    +import scala.collection.JavaConversions._
    +import scala.util.control.NonFatal
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.util.Utils
     
     /** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. */
     private[spark] class StreamingListenerBus() extends Logging {
    -  private val listeners = new ArrayBuffer[StreamingListener]()
    -    with SynchronizedBuffer[StreamingListener]
    +  private val listeners = new CopyOnWriteArrayList[StreamingListener]()
    --- End diff --
    
    Good catch




[GitHub] spark pull request: [SPARK-4859][Streaming] Improve StreamingListe...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3710#issuecomment-67135310
  
      [Test build #24490 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24490/consoleFull) for   PR 3710 at commit [`f5285d6`](https://github.com/apache/spark/commit/f5285d6628185c2bf21e3d1e3d7f72803cc515a4).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-4859][Streaming] Improve StreamingListe...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3710#issuecomment-67120498
  
      [Test build #24484 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24484/consoleFull) for   PR 3710 at commit [`d09066e`](https://github.com/apache/spark/commit/d09066ead4bc61351886549756b655fca82a848e).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3710#discussion_r22266276
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala ---
    @@ -64,36 +71,40 @@ private[spark] class StreamingListenerBus() extends Logging {
       }
     
       def addListener(listener: StreamingListener) {
    -    listeners += listener
    +    listeners.add(listener)
       }
     
       def post(event: StreamingListenerEvent) {
    +    if (stopped) {
    +      // Drop further events to make `StreamingListenerShutdown` be delivered ASAP
    +      logError("StreamingListenerBus has been stopped! Drop " + event)
    +      return
    +    }
         val eventAdded = eventQueue.offer(event)
    -    if (!eventAdded && !queueFullErrorMessageLogged) {
    +    if (!eventAdded && queueFullErrorMessageLogged.compareAndSet(false, true)) {
           logError("Dropping StreamingListenerEvent because no remaining room in event queue. " +
             "This likely means one of the StreamingListeners is too slow and cannot keep up with the " +
             "rate at which events are being started by the scheduler.")
    -      queueFullErrorMessageLogged = true
         }
       }
     
    -  /**
    -   * Waits until there are no more events in the queue, or until the specified time has elapsed.
    -   * Used for testing only. Returns true if the queue has emptied and false is the specified time
    -   * elapsed before the queue emptied.
    -   */
    -  def waitUntilEmpty(timeoutMillis: Int): Boolean = {
    -    val finishTime = System.currentTimeMillis + timeoutMillis
    -    while (!eventQueue.isEmpty) {
    -      if (System.currentTimeMillis > finishTime) {
    -        return false
    +  def stop(): Unit = {
    +    stopped = true
    +    // Should not call `post`, or `StreamingListenerShutdown` may be dropped.
    +    eventQueue.put(StreamingListenerShutdown)
    --- End diff --
    
    I see, got it. This is a different approach from the LiveListenerBus. This is okay for this PR, but I would really like these to be merged in a later PR by using a common trait.
    





[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3710#discussion_r22618979
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
    @@ -49,7 +52,8 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
             // Atomically remove and process this event
             LiveListenerBus.this.synchronized {
               val event = eventQueue.poll
    -          if (event == SparkListenerShutdown) {
    +          if (event == null) {
    +            assert(stopped)
    --- End diff --
    
    Could we just check `if (stopped)` here before polling the event queue?




[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/3710#issuecomment-69561617
  
    @andrewor14 instead of making the behavior of StreamingListenerBus match that of the LiveListenerBus, I sent #4006 to make them share the same code in the parent class `ListenerBus`.




[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3710#discussion_r22263767
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala ---
    @@ -64,36 +71,40 @@ private[spark] class StreamingListenerBus() extends Logging {
       }
     
       def addListener(listener: StreamingListener) {
    -    listeners += listener
    +    listeners.add(listener)
       }
     
       def post(event: StreamingListenerEvent) {
    +    if (stopped) {
    +      // Drop further events to make `StreamingListenerShutdown` be delivered ASAP
    +      logError("StreamingListenerBus has been stopped! Drop " + event)
    +      return
    +    }
         val eventAdded = eventQueue.offer(event)
    -    if (!eventAdded && !queueFullErrorMessageLogged) {
    +    if (!eventAdded && queueFullErrorMessageLogged.compareAndSet(false, true)) {
           logError("Dropping StreamingListenerEvent because no remaining room in event queue. " +
             "This likely means one of the StreamingListeners is too slow and cannot keep up with the " +
             "rate at which events are being started by the scheduler.")
    -      queueFullErrorMessageLogged = true
         }
       }
     
    -  /**
    -   * Waits until there are no more events in the queue, or until the specified time has elapsed.
    -   * Used for testing only. Returns true if the queue has emptied and false is the specified time
    -   * elapsed before the queue emptied.
    -   */
    -  def waitUntilEmpty(timeoutMillis: Int): Boolean = {
    -    val finishTime = System.currentTimeMillis + timeoutMillis
    -    while (!eventQueue.isEmpty) {
    -      if (System.currentTimeMillis > finishTime) {
    -        return false
    +  def stop(): Unit = {
    +    stopped = true
    +    // Should not call `post`, or `StreamingListenerShutdown` may be dropped.
    +    eventQueue.put(StreamingListenerShutdown)
    --- End diff --
    
    Why is this `put` still there? Wouldn't this block or throw an error if the queue is full?
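
For reference on the question above: on a bounded `LinkedBlockingQueue`, `offer` returns `false` immediately when the queue is full, while `put` blocks until space becomes available (it throws only `InterruptedException` if the waiting thread is interrupted). The sketch below illustrates this with a capacity-1 queue; the object and method names are hypothetical, and the event strings are placeholders rather than Spark types.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

object PutVsOfferDemo {
  // offer never blocks: it reports failure when the queue is full.
  def offerOnFullQueue(): (Boolean, Boolean, Boolean) = {
    val queue = new LinkedBlockingQueue[String](1)
    val accepted = queue.offer("event-1")
    val rejected = queue.offer("event-2")
    // The timed variant waits up to the timeout, then gives up.
    val timedOut = queue.offer("event-3", 50, TimeUnit.MILLISECONDS)
    (accepted, rejected, timedOut)
  }

  // put waits for room: it returns once a consumer has drained an element.
  def putWaitsForRoom(): Int = {
    val queue = new LinkedBlockingQueue[String](1)
    queue.put("event-1")
    val consumer = new Thread(new Runnable {
      def run(): Unit = {
        Thread.sleep(100)
        queue.take()
      }
    })
    consumer.start()
    // The queue is full here, so this call blocks until the consumer takes.
    queue.put("shutdown")
    consumer.join()
    queue.size
  }
}
```

So a `put` of the shutdown marker is not dropped on a full queue, but it only returns while some thread is still draining the queue.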




[GitHub] spark pull request: [SPARK-4859][Streaming] Improve StreamingListe...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3710#issuecomment-67115194
  
      [Test build #24484 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24484/consoleFull) for   PR 3710 at commit [`d09066e`](https://github.com/apache/spark/commit/d09066ead4bc61351886549756b655fca82a848e).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3710#issuecomment-68083882
  
      [Test build #24803 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24803/consoleFull) for   PR 3710 at commit [`df7ee40`](https://github.com/apache/spark/commit/df7ee409bcc309fa3a969bdffadd1803315fbdf3).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3710#discussion_r22243823
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
    @@ -75,6 +78,11 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
       }
     
       def post(event: SparkListenerEvent) {
    +    if (stopped) {
    --- End diff --
    
    Is there a cost to using volatile variables here? There are various discussions around the web about the cost of volatile differing across architectures, and since this can be a high-volume bus, it's probably not a good idea to use a volatile variable. It also mixes two different concurrency models (single-threaded actor and thread-safe volatile), which is usually not a good idea.
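
For readers unfamiliar with the pattern under discussion, here is a minimal sketch of a `@volatile` stop flag read by a worker thread. `VolatileStopSketch`, `Bus`, and the sleep-based loop are hypothetical stand-ins, not the code in this PR; the sketch only illustrates the visibility guarantee, and says nothing about the per-read cost raised above.

```scala
object VolatileStopSketch {
  // Hypothetical stand-in for a listener bus; only the stop flag is modelled.
  class Bus {
    // @volatile makes the write in stop() visible to the worker's next read.
    @volatile private var stopped = false

    private val worker = new Thread(new Runnable {
      override def run(): Unit = {
        while (!stopped) Thread.sleep(1) // stand-in for draining an event queue
      }
    })

    def start(): Unit = worker.start()

    /** Returns true if the worker observed the flag and exited within the timeout. */
    def stop(timeoutMillis: Long): Boolean = {
      stopped = true
      worker.join(timeoutMillis)
      !worker.isAlive
    }
  }

  def demo(): Boolean = {
    val bus = new Bus
    bus.start()
    bus.stop(5000L)
  }
}
```

Without `@volatile`, the Java memory model does not guarantee that the worker ever sees the updated flag, which is the trade-off being weighed against read cost here.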




[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3710#issuecomment-68083137
  
      [Test build #24801 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24801/consoleFull) for   PR 3710 at commit [`7403edb`](https://github.com/apache/spark/commit/7403edbeff1cd80c69746c1e6a1beabe3da5dd0e).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3710#issuecomment-68083163
  
      [Test build #24801 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24801/consoleFull) for   PR 3710 at commit [`7403edb`](https://github.com/apache/spark/commit/7403edbeff1cd80c69746c1e6a1beabe3da5dd0e).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-4859][Streaming] Improve StreamingListe...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3710#discussion_r21932115
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala ---
    @@ -64,36 +69,32 @@ private[spark] class StreamingListenerBus() extends Logging {
       }
     
       def addListener(listener: StreamingListener) {
    -    listeners += listener
    +    listeners.add(listener)
       }
     
       def post(event: StreamingListenerEvent) {
         val eventAdded = eventQueue.offer(event)
    -    if (!eventAdded && !queueFullErrorMessageLogged) {
    +    if (!eventAdded && queueFullErrorMessageLogged.compareAndSet(false, true)) {
           logError("Dropping StreamingListenerEvent because no remaining room in event queue. " +
             "This likely means one of the StreamingListeners is too slow and cannot keep up with the " +
             "rate at which events are being started by the scheduler.")
    -      queueFullErrorMessageLogged = true
         }
       }
     
    -  /**
    -   * Waits until there are no more events in the queue, or until the specified time has elapsed.
    -   * Used for testing only. Returns true if the queue has emptied and false is the specified time
    -   * elapsed before the queue emptied.
    -   */
    -  def waitUntilEmpty(timeoutMillis: Int): Boolean = {
    -    val finishTime = System.currentTimeMillis + timeoutMillis
    -    while (!eventQueue.isEmpty) {
    -      if (System.currentTimeMillis > finishTime) {
    -        return false
    +  def stop(): Unit = {
    +    // Should not call `post`, or `StreamingListenerShutdown` may be dropped.
    +    eventQueue.offer(StreamingListenerShutdown)
    +    listenerThread.join()
    +  }
    +
    +  private def foreachListener(f: StreamingListener => Unit): Unit = {
    --- End diff --
    
    This looks like a good change to me, especially since we already use this error-handling strategy in [SparkListenerBus](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala) (added in #759).




[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3710#discussion_r22786638
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala ---
    @@ -17,20 +17,28 @@
     
     package org.apache.spark.streaming.scheduler
     
    +import java.util.concurrent.atomic.AtomicBoolean
    +import java.util.concurrent.{LinkedBlockingQueue, CopyOnWriteArrayList}
    +
    +import scala.util.control.NonFatal
    +
     import org.apache.spark.Logging
    -import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
    -import java.util.concurrent.LinkedBlockingQueue
    +import org.apache.spark.util.Utils
     
     /** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. */
     private[spark] class StreamingListenerBus() extends Logging {
    -  private val listeners = new ArrayBuffer[StreamingListener]()
    -    with SynchronizedBuffer[StreamingListener]
    +  // `listeners` will be set up during the initialization of the whole system and the number of
    +  // listeners is small, so the copying cost of CopyOnWriteArrayList will be little. With the help
    +  // of CopyOnWriteArrayList, we can eliminate a lock during processing every event comparing to
    +  // SynchronizedBuffer.
    +  private val listeners = new CopyOnWriteArrayList[StreamingListener]()
    --- End diff --
    
    It's usually not safe to hold a lock while posting events to external listeners. I eliminated all locks while posting events to listeners in #4006.
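
As a rough illustration of why `CopyOnWriteArrayList` needs no lock on the read path, the sketch below iterates over a snapshot while the list is modified. `CopyOnWriteSketch` and the string "listeners" are hypothetical; only the `java.util.concurrent.CopyOnWriteArrayList` behaviour is real.

```scala
import java.util.concurrent.CopyOnWriteArrayList

object CopyOnWriteSketch {
  /** Returns (elements seen by the iterator, final list size). */
  def demo(): (Int, Int) = {
    val listeners = new CopyOnWriteArrayList[String]()
    listeners.add("a")
    listeners.add("b")

    // The iterator works on a snapshot of the backing array taken here.
    val iter = listeners.iterator()

    // A later add copies the array; the snapshot above is unaffected and
    // no ConcurrentModificationException is thrown.
    listeners.add("c")

    var seen = 0
    while (iter.hasNext) {
      iter.next()
      seen += 1
    }
    (seen, listeners.size())
  }
}
```

Each write pays for a full array copy, so this only makes sense when, as the diff comment says, listeners are few and registered mostly at start-up.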




[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3710#discussion_r22619853
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala ---
    @@ -17,20 +17,28 @@
     
     package org.apache.spark.streaming.scheduler
     
    +import java.util.concurrent.atomic.AtomicBoolean
    +import java.util.concurrent.{LinkedBlockingQueue, CopyOnWriteArrayList}
    +
    +import scala.util.control.NonFatal
    +
     import org.apache.spark.Logging
    -import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
    -import java.util.concurrent.LinkedBlockingQueue
    +import org.apache.spark.util.Utils
     
     /** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. */
     private[spark] class StreamingListenerBus() extends Logging {
    -  private val listeners = new ArrayBuffer[StreamingListener]()
    -    with SynchronizedBuffer[StreamingListener]
    +  // `listeners` will be set up during the initialization of the whole system and the number of
    +  // listeners is small, so the copying cost of CopyOnWriteArrayList will be little. With the help
    +  // of CopyOnWriteArrayList, we can eliminate a lock during processing every event comparing to
    +  // SynchronizedBuffer.
    +  private val listeners = new CopyOnWriteArrayList[StreamingListener]()
    --- End diff --
    
    Is the use of copy-on-write here meant to speed up initialization, in the sense that events posted on startup will not be blocked as we add listeners? I may be missing something, but this speedup doesn't seem very significant to me.




[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/3710#issuecomment-67924843
  
    > This looks good to me; I'd say it's ready to merge. Left one really minor comment about maybe applying the same stop() fix to LiveListenerBus so that the implementations remain in sync.
    
    I pushed a commit to fix the `stop` issue. It's tricky in `LiveListenerBus`. @andrewor14 could you review it?




[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3710#discussion_r22619353
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
    @@ -118,7 +127,7 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
        * Log an error message to indicate that the event queue is full. Do this only once.
        */
       private def logQueueFullErrorMessage(): Unit = {
    -    if (!queueFullErrorMessageLogged) {
    +    if (queueFullErrorMessageLogged.compareAndSet(false, true)) {
    --- End diff --
    
    How bad is this race condition? The worst thing that can happen is that we log the same error message multiple times. Is my understanding correct?
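
To make the race concrete, here is a minimal sketch of the log-once pattern using `AtomicBoolean.compareAndSet`. `LogOnceSketch` and the `timesLogged` counter are hypothetical and stand in for the real `logError` call; with a plain boolean check-then-set, several threads could pass the check before any of them sets the flag, so the message could be logged more than once.

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

object LogOnceSketch {
  /** Starts `threads` threads that all try to log; returns how many actually logged. */
  def demo(threads: Int): Int = {
    val queueFullErrorMessageLogged = new AtomicBoolean(false)
    val timesLogged = new AtomicInteger(0) // hypothetical stand-in for logError

    def logQueueFullErrorMessage(): Unit = {
      // Exactly one caller wins the compare-and-set; the others see `true` and skip.
      if (queueFullErrorMessageLogged.compareAndSet(false, true)) {
        timesLogged.incrementAndGet()
      }
    }

    val workers = (1 to threads).map { _ =>
      new Thread(new Runnable {
        override def run(): Unit = logQueueFullErrorMessage()
      })
    }
    workers.foreach(_.start())
    workers.foreach(_.join())
    timesLogged.get()
  }
}
```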




[GitHub] spark pull request: [SPARK-4859][Streaming] Improve StreamingListe...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/3710#issuecomment-67232863
  
    I left a few comments.  This seems like a reasonable set of changes, but I think that we should also make them to SparkListenerBus, too.  Maybe there's an opportunity to refactor the listener busses to share more code; this might be a more involved change, though, and could happen in a separate PR (also, maybe there's a good reason why they don't share a common trait).




[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3710#discussion_r22263727
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala ---
    @@ -64,36 +71,40 @@ private[spark] class StreamingListenerBus() extends Logging {
       }
     
       def addListener(listener: StreamingListener) {
    -    listeners += listener
    +    listeners.add(listener)
       }
     
       def post(event: StreamingListenerEvent) {
    +    if (stopped) {
    +      // Drop further events to make `StreamingListenerShutdown` be delivered ASAP
    +      logError("StreamingListenerBus has been stopped! Drop " + event)
    +      return
    +    }
         val eventAdded = eventQueue.offer(event)
    -    if (!eventAdded && !queueFullErrorMessageLogged) {
    +    if (!eventAdded && queueFullErrorMessageLogged.compareAndSet(false, true)) {
           logError("Dropping StreamingListenerEvent because no remaining room in event queue. " +
             "This likely means one of the StreamingListeners is too slow and cannot keep up with the " +
             "rate at which events are being started by the scheduler.")
    -      queueFullErrorMessageLogged = true
         }
       }
     
    -  /**
    -   * Waits until there are no more events in the queue, or until the specified time has elapsed.
    -   * Used for testing only. Returns true if the queue has emptied and false is the specified time
    -   * elapsed before the queue emptied.
    -   */
    -  def waitUntilEmpty(timeoutMillis: Int): Boolean = {
    -    val finishTime = System.currentTimeMillis + timeoutMillis
    -    while (!eventQueue.isEmpty) {
    -      if (System.currentTimeMillis > finishTime) {
    -        return false
    +  def stop(): Unit = {
    +    stopped = true
    +    // Should not call `post`, or `StreamingListenerShutdown` may be dropped.
    +    eventQueue.put(StreamingListenerShutdown)
    +    listenerThread.join()
    +  }
    +
    +  private def foreachListener(f: StreamingListener => Unit): Unit = {
    +    val iter = listeners.iterator
    --- End diff --
    
    Nit: Can you add a comment mentioning why you used an iterator, so that this does not regress in the future? This is quite subtle.




[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3710#discussion_r22246575
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala ---
    @@ -17,20 +17,25 @@
     
     package org.apache.spark.streaming.scheduler
     
    -import org.apache.spark.Logging
    -import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
    +import java.util.concurrent.atomic.AtomicBoolean
     import java.util.concurrent.LinkedBlockingQueue
    +import java.util.concurrent.CopyOnWriteArrayList
    +
    +import scala.collection.JavaConversions._
    +import scala.util.control.NonFatal
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.util.Utils
     
     /** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. */
     private[spark] class StreamingListenerBus() extends Logging {
    -  private val listeners = new ArrayBuffer[StreamingListener]()
    -    with SynchronizedBuffer[StreamingListener]
    +  private val listeners = new CopyOnWriteArrayList[StreamingListener]()
    --- End diff --
    
    Scala uses JIterableWrapper to wrap a Java Iterable. It doesn't copy the data. However, using the Java Iterator directly avoids creating a JIterableWrapper every time, so I changed it to this approach.
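
A minimal sketch of the approach described here: walk the Java iterator directly and catch non-fatal listener errors so that one failing listener does not stop delivery to the rest. The `Listener` trait, `Bus` class, and `errors` buffer are hypothetical stand-ins (the real code logs via `logError`), so treat this as an illustration rather than the PR's implementation.

```scala
import java.util.concurrent.CopyOnWriteArrayList
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

object ForeachListenerSketch {
  // Hypothetical listener type, standing in for StreamingListener.
  trait Listener { def onEvent(event: String): Unit }

  class Bus {
    private val listeners = new CopyOnWriteArrayList[Listener]()
    val errors = ArrayBuffer.empty[String] // stand-in for logError

    def addListener(listener: Listener): Unit = { listeners.add(listener); () }

    def foreachListener(f: Listener => Unit): Unit = {
      // Use the Java iterator directly: no implicit wrapper is allocated per event.
      val iter = listeners.iterator()
      while (iter.hasNext) {
        val listener = iter.next()
        try f(listener) catch {
          // Record the failure and keep going instead of killing the thread.
          case NonFatal(e) => errors += e.getMessage
        }
      }
    }
  }

  /** Returns (events delivered to the healthy listener, errors recorded). */
  def demo(): (Int, Int) = {
    val bus = new Bus
    var delivered = 0
    bus.addListener(new Listener {
      def onEvent(event: String): Unit = throw new RuntimeException("boom")
    })
    bus.addListener(new Listener {
      def onEvent(event: String): Unit = delivered += 1
    })
    bus.foreachListener(_.onEvent("batch-completed"))
    (delivered, bus.errors.size)
  }
}
```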




[GitHub] spark pull request: [SPARK-4859][Streaming] Improve StreamingListe...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/3710#issuecomment-67120502
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24484/




[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3710#issuecomment-68032028
  
      [Test build #24761 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24761/consoleFull) for   PR 3710 at commit [`f947226`](https://github.com/apache/spark/commit/f947226b75df893f99c4af41305727e1585eb4b5).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-4859][Streaming] Improve StreamingListe...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3710#discussion_r21932180
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala ---
    @@ -17,20 +17,25 @@
     
     package org.apache.spark.streaming.scheduler
     
    -import org.apache.spark.Logging
    -import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
    +import java.util.concurrent.atomic.AtomicBoolean
     import java.util.concurrent.LinkedBlockingQueue
    +import java.util.concurrent.CopyOnWriteArrayList
    +
    +import scala.collection.JavaConversions._
    +import scala.util.control.NonFatal
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.util.Utils
     
     /** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. */
     private[spark] class StreamingListenerBus() extends Logging {
    -  private val listeners = new ArrayBuffer[StreamingListener]()
    -    with SynchronizedBuffer[StreamingListener]
    +  private val listeners = new CopyOnWriteArrayList[StreamingListener]()
    --- End diff --
    
    Why this change?
    
    If this change is necessary, we should probably do the same thing in the SparkListenerBus trait: https://github.com/apache/spark/blame/master/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala#L32




[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3710#discussion_r22617679
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
    @@ -49,7 +52,8 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
             // Atomically remove and process this event
             LiveListenerBus.this.synchronized {
               val event = eventQueue.poll
    -          if (event == SparkListenerShutdown) {
    +          if (event == null) {
    +            assert(stopped)
    --- End diff --
    
    Can you add an assertion message here so we get slightly more descriptive feedback in case something goes wrong?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3710#discussion_r22265994
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala ---
    @@ -17,20 +17,25 @@
     
     package org.apache.spark.streaming.scheduler
     
    -import org.apache.spark.Logging
    -import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
    +import java.util.concurrent.atomic.AtomicBoolean
     import java.util.concurrent.LinkedBlockingQueue
    +import java.util.concurrent.CopyOnWriteArrayList
    +
    +import scala.collection.JavaConversions._
    +import scala.util.control.NonFatal
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.util.Utils
     
     /** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. */
     private[spark] class StreamingListenerBus() extends Logging {
    -  private val listeners = new ArrayBuffer[StreamingListener]()
    -    with SynchronizedBuffer[StreamingListener]
    +  private val listeners = new CopyOnWriteArrayList[StreamingListener]()
    --- End diff --
    
    Would be good to add a comment here saying why you are using CopyOnWriteArrayList instead of something simpler. 




[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3710#discussion_r22245199
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
    @@ -75,6 +78,11 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
       }
     
       def post(event: SparkListenerEvent) {
    +    if (stopped) {
    --- End diff --
    
    `StreamingListenerShutdown` may be dropped if the eventQueue is full




[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3710#discussion_r22263834
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
    @@ -35,8 +36,9 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
        * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
    --- End diff --
    
    Please update the comment about `SparkListenerShutdown` in the documentation of this class. Also, we should probably remove the declaration of `SparkListenerShutdown` and any references to it. Use git-grep to check.




[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/3710#issuecomment-68074729
  
    I left a few more comments. There are clear inconsistencies between `LiveListenerBus` and `StreamingListenerBus`, which can only be solved by actually having `StreamingListenerBus` inherit `LiveListenerBus`. Since that is to be a different PR, I would suggest that we at least try to maintain feature parity between them, even with duplicate code. For example, the bug with posting the Shutdown event should be fixed for both classes.




[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3710#discussion_r22619236
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
    @@ -75,6 +79,11 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
       }
     
       def post(event: SparkListenerEvent) {
    +    if (stopped) {
    +      // Drop further events to make `listenerThread` exit ASAP
    +      logError("LiveListenerBus has been stopped! Drop " + event)
    --- End diff --
    
    `s"LiveListenerBus has already stopped! Dropping event $event"`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3710#discussion_r22266122
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala ---
    @@ -39,18 +45,19 @@ private[spark] class StreamingListenerBus() extends Logging {
             val event = eventQueue.take
    --- End diff --
    
    `eventQueue.take` will block until the queue is not empty. So if we don't use `eventQueue.poll` like `LiveListenerBus`, we still need to use `StreamingListenerShutdown`.
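
The difference between the queue operations mentioned in this thread can be sketched on a tiny bounded queue. `QueueSemanticsSketch`, the capacity of 1, and the string events are assumptions chosen for illustration; only the `LinkedBlockingQueue` methods are real.

```scala
import java.util.concurrent.LinkedBlockingQueue

object QueueSemanticsSketch {
  /** Returns (first offer result, second offer result, first poll, second poll). */
  def demo(): (Boolean, Boolean, String, String) = {
    // Capacity 1 is an illustrative choice so the queue fills immediately.
    val eventQueue = new LinkedBlockingQueue[String](1)

    val first = eventQueue.offer("event")     // true: there was room
    val second = eventQueue.offer("shutdown") // false: queue is full, element is dropped

    val head = eventQueue.poll()  // "event"
    val empty = eventQueue.poll() // null: poll never blocks on an empty queue

    (first, second, head, empty)
  }
}
```

By contrast, `put` blocks until space is available and `take` blocks until an element is available, which is why a consumer built on `take` needs a sentinel event to wake it up, and why that sentinel should be enqueued with `put` rather than `offer`.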


[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3710#issuecomment-68085413
  
      [Test build #24800 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24800/consoleFull) for   PR 3710 at commit [`e088c4b`](https://github.com/apache/spark/commit/e088c4b7c1e101f75550be16c96cb63cd4b9a360).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3710#issuecomment-68082967
  
      [Test build #24800 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24800/consoleFull) for   PR 3710 at commit [`e088c4b`](https://github.com/apache/spark/commit/e088c4b7c1e101f75550be16c96cb63cd4b9a360).
     * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3710#discussion_r22263766
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala ---
    @@ -64,36 +71,40 @@ private[spark] class StreamingListenerBus() extends Logging {
       }
     
       def addListener(listener: StreamingListener) {
    -    listeners += listener
    +    listeners.add(listener)
       }
     
       def post(event: StreamingListenerEvent) {
    +    if (stopped) {
    +      // Drop further events to make `StreamingListenerShutdown` be delivered ASAP
    +      logError("StreamingListenerBus has been stopped! Drop " + event)
    +      return
    +    }
         val eventAdded = eventQueue.offer(event)
    -    if (!eventAdded && !queueFullErrorMessageLogged) {
    +    if (!eventAdded && queueFullErrorMessageLogged.compareAndSet(false, true)) {
           logError("Dropping StreamingListenerEvent because no remaining room in event queue. " +
             "This likely means one of the StreamingListeners is too slow and cannot keep up with the " +
             "rate at which events are being started by the scheduler.")
    -      queueFullErrorMessageLogged = true
         }
       }
     
    -  /**
    -   * Waits until there are no more events in the queue, or until the specified time has elapsed.
    -   * Used for testing only. Returns true if the queue has emptied and false is the specified time
    -   * elapsed before the queue emptied.
    -   */
    -  def waitUntilEmpty(timeoutMillis: Int): Boolean = {
    -    val finishTime = System.currentTimeMillis + timeoutMillis
    -    while (!eventQueue.isEmpty) {
    -      if (System.currentTimeMillis > finishTime) {
    -        return false
    +  def stop(): Unit = {
    +    stopped = true
    +    // Should not call `post`, or `StreamingListenerShutdown` may be dropped.
    +    eventQueue.put(StreamingListenerShutdown)
    +    listenerThread.join()
    +  }
    +
    +  private def foreachListener(f: StreamingListener => Unit): Unit = {
    +    val iter = listeners.iterator
    --- End diff --
    
    If this change is to avoid an implicit Java -> Scala collections conversion, why not replace the `JavaConversions` implicits with the more explicit `JavaConverters` instead, so that you have to manually write `.asJava` or `.asScala`?  That, in addition to a comment, would make it more obvious if we're re-introducing those conversions.
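
A minimal sketch of what the explicit style could look like, assuming a hypothetical `Listener` trait and `Recorder` class (neither is part of Spark). It uses `scala.collection.JavaConverters`, as named in the comment above; newer Scala versions offer `scala.jdk.CollectionConverters` for the same purpose and deprecate the older import.

```scala
import java.util.concurrent.CopyOnWriteArrayList
import scala.collection.JavaConverters._

object ExplicitConversionSketch {
  // Hypothetical listener type for illustration only.
  trait Listener { def onEvent(event: String): Unit }

  // Records every event it receives so the behaviour can be inspected.
  final class Recorder extends Listener {
    @volatile var received: Vector[String] = Vector.empty
    def onEvent(event: String): Unit = received :+= event
  }

  val listeners = new CopyOnWriteArrayList[Listener]()

  def postToAll(event: String): Unit = {
    // `.asScala` makes the Java -> Scala wrapper visible at the call site,
    // unlike the implicit conversions from `JavaConversions`.
    listeners.asScala.foreach(_.onEvent(event))
  }
}
```

Because the conversion has to be spelled out, a reviewer can see exactly where a wrapper is created.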


[GitHub] spark pull request: [SPARK-4859][Streaming] Improve StreamingListe...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3710#discussion_r21932227
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala ---
    @@ -17,20 +17,25 @@
     
     package org.apache.spark.streaming.scheduler
     
    -import org.apache.spark.Logging
    -import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
    +import java.util.concurrent.atomic.AtomicBoolean
     import java.util.concurrent.LinkedBlockingQueue
    +import java.util.concurrent.CopyOnWriteArrayList
    +
    +import scala.collection.JavaConversions._
    +import scala.util.control.NonFatal
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.util.Utils
     
     /** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. */
     private[spark] class StreamingListenerBus() extends Logging {
    -  private val listeners = new ArrayBuffer[StreamingListener]()
    -    with SynchronizedBuffer[StreamingListener]
    +  private val listeners = new CopyOnWriteArrayList[StreamingListener]()
     
       /* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than
        * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
       private val EVENT_QUEUE_CAPACITY = 10000
       private val eventQueue = new LinkedBlockingQueue[StreamingListenerEvent](EVENT_QUEUE_CAPACITY)
    -  private var queueFullErrorMessageLogged = false
    +  private val queueFullErrorMessageLogged = new AtomicBoolean(false)
    --- End diff --
    
    This change seems fine to me, but we should probably apply the same fix to LiveListenerBus as well: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala#L38


[GitHub] spark pull request: [SPARK-4859][Streaming] Improve StreamingListe...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3710#discussion_r22193932
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala ---
    @@ -64,36 +69,32 @@ private[spark] class StreamingListenerBus() extends Logging {
       }
     
       def addListener(listener: StreamingListener) {
    -    listeners += listener
    +    listeners.add(listener)
       }
     
       def post(event: StreamingListenerEvent) {
         val eventAdded = eventQueue.offer(event)
    -    if (!eventAdded && !queueFullErrorMessageLogged) {
    +    if (!eventAdded && queueFullErrorMessageLogged.compareAndSet(false, true)) {
           logError("Dropping StreamingListenerEvent because no remaining room in event queue. " +
             "This likely means one of the StreamingListeners is too slow and cannot keep up with the " +
             "rate at which events are being started by the scheduler.")
    -      queueFullErrorMessageLogged = true
         }
       }
     
    -  /**
    -   * Waits until there are no more events in the queue, or until the specified time has elapsed.
    -   * Used for testing only. Returns true if the queue has emptied and false is the specified time
    -   * elapsed before the queue emptied.
    -   */
    -  def waitUntilEmpty(timeoutMillis: Int): Boolean = {
    -    val finishTime = System.currentTimeMillis + timeoutMillis
    -    while (!eventQueue.isEmpty) {
    -      if (System.currentTimeMillis > finishTime) {
    -        return false
    +  def stop(): Unit = {
    +    // Should not call `post`, or `StreamingListenerShutdown` may be dropped.
    --- End diff --
    
    We might also want to apply this change to Spark Core's `LiveListenerBus` so that they're in sync.
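
The reason `stop()` bypasses `post` can be sketched as follows. This is a simplified model, not the Spark implementation: `TinyBus`, `Update`, and `Shutdown` are hypothetical names, and the queue capacity is deliberately tiny so the drop is easy to reproduce.

```scala
import java.util.concurrent.LinkedBlockingQueue

object ShutdownSentinelSketch {
  // Hypothetical event types for illustration only.
  sealed trait Event
  final case class Update(id: Int) extends Event
  case object Shutdown extends Event

  final class TinyBus(capacity: Int) {
    private val queue = new LinkedBlockingQueue[Event](capacity)
    @volatile private var processed = Vector.empty[Int]

    private val worker = new Thread("tiny-bus-worker") {
      override def run(): Unit = {
        var running = true
        while (running) {
          queue.take() match {
            case Update(id) => processed :+= id
            case Shutdown => running = false
          }
        }
      }
    }
    worker.setDaemon(true)

    def start(): Unit = worker.start()

    // Non-blocking: returns false, and the event is dropped, when the queue is full.
    def post(event: Event): Boolean = queue.offer(event)

    // Blocking `put` waits for room, so the sentinel cannot be dropped the way a `post` could be.
    def stop(): Unit = {
      queue.put(Shutdown)
      worker.join()
    }

    def handled: Vector[Int] = processed
  }
}
```

If `stop()` went through `post` while the queue was full, the sentinel would be discarded and `join()` would wait indefinitely.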


[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3710#discussion_r22243970
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
    @@ -75,6 +78,11 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
       }
     
       def post(event: SparkListenerEvent) {
    +    if (stopped) {
    --- End diff --
    
    There is some cost, of course. But compared to the lock in `eventQueue.offer`, I would say it can be ignored.


[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3710#discussion_r22245148
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
    @@ -75,6 +78,11 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
       }
     
       def post(event: SparkListenerEvent) {
    +    if (stopped) {
    --- End diff --
    
    What was wrong with the previous mechanism?


[GitHub] spark pull request: [SPARK-4859][Streaming] Improve StreamingListe...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/3710#issuecomment-67271284
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24522/
    Test PASSed.


[GitHub] spark pull request: [SPARK-4859][Streaming] Improve StreamingListe...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/3710#issuecomment-67897335
  
    This looks good to me; I'd say it's ready to merge.  Left one really minor comment about maybe applying the same `stop()` fix to `LiveListenerBus` so that the implementations remain in sync.


[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3710#discussion_r22266417
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala ---
    @@ -64,36 +71,40 @@ private[spark] class StreamingListenerBus() extends Logging {
       }
     
       def addListener(listener: StreamingListener) {
    -    listeners += listener
    +    listeners.add(listener)
       }
     
       def post(event: StreamingListenerEvent) {
    +    if (stopped) {
    +      // Drop further events to make `StreamingListenerShutdown` be delivered ASAP
    +      logError("StreamingListenerBus has been stopped! Drop " + event)
    +      return
    +    }
         val eventAdded = eventQueue.offer(event)
    -    if (!eventAdded && !queueFullErrorMessageLogged) {
    +    if (!eventAdded && queueFullErrorMessageLogged.compareAndSet(false, true)) {
           logError("Dropping StreamingListenerEvent because no remaining room in event queue. " +
             "This likely means one of the StreamingListeners is too slow and cannot keep up with the " +
             "rate at which events are being started by the scheduler.")
    -      queueFullErrorMessageLogged = true
         }
       }
     
    -  /**
    -   * Waits until there are no more events in the queue, or until the specified time has elapsed.
    -   * Used for testing only. Returns true if the queue has emptied and false is the specified time
    -   * elapsed before the queue emptied.
    -   */
    -  def waitUntilEmpty(timeoutMillis: Int): Boolean = {
    -    val finishTime = System.currentTimeMillis + timeoutMillis
    -    while (!eventQueue.isEmpty) {
    -      if (System.currentTimeMillis > finishTime) {
    -        return false
    +  def stop(): Unit = {
    +    stopped = true
    +    // Should not call `post`, or `StreamingListenerShutdown` may be dropped.
    +    eventQueue.put(StreamingListenerShutdown)
    --- End diff --
    
    +1


[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/3710#issuecomment-68083165
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24801/
    Test FAILed.


[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3710#issuecomment-68028341
  
      [Test build #24761 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24761/consoleFull) for   PR 3710 at commit [`f947226`](https://github.com/apache/spark/commit/f947226b75df893f99c4af41305727e1585eb4b5).
     * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-4859][Streaming] Improve StreamingListe...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3710#issuecomment-67265519
  
      [Test build #24522 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24522/consoleFull) for   PR 3710 at commit [`4d96ce8`](https://github.com/apache/spark/commit/4d96ce87fff1d660ef9417c0b10b98a9a40a3b77).
     * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3710#discussion_r22620055
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala ---
    @@ -64,36 +71,40 @@ private[spark] class StreamingListenerBus() extends Logging {
       }
     
       def addListener(listener: StreamingListener) {
    -    listeners += listener
    +    listeners.add(listener)
       }
     
       def post(event: StreamingListenerEvent) {
    +    if (stopped) {
    +      // Drop further events to make `StreamingListenerShutdown` be delivered ASAP
    +      logError("StreamingListenerBus has been stopped! Drop " + event)
    +      return
    +    }
         val eventAdded = eventQueue.offer(event)
    -    if (!eventAdded && !queueFullErrorMessageLogged) {
    +    if (!eventAdded && queueFullErrorMessageLogged.compareAndSet(false, true)) {
           logError("Dropping StreamingListenerEvent because no remaining room in event queue. " +
             "This likely means one of the StreamingListeners is too slow and cannot keep up with the " +
             "rate at which events are being started by the scheduler.")
    -      queueFullErrorMessageLogged = true
         }
       }
     
    -  /**
    -   * Waits until there are no more events in the queue, or until the specified time has elapsed.
    -   * Used for testing only. Returns true if the queue has emptied and false is the specified time
    -   * elapsed before the queue emptied.
    -   */
    -  def waitUntilEmpty(timeoutMillis: Int): Boolean = {
    -    val finishTime = System.currentTimeMillis + timeoutMillis
    -    while (!eventQueue.isEmpty) {
    -      if (System.currentTimeMillis > finishTime) {
    -        return false
    +  def stop(): Unit = {
    +    stopped = true
    +    // Should not call `post`, or `StreamingListenerShutdown` may be dropped.
    +    eventQueue.put(StreamingListenerShutdown)
    --- End diff --
    
    I would prefer to make this bus use the new behavior in `LiveListenerBus` as well. It's a little confusing that this one keeps using a shutdown event and joins on the thread instead. I don't see a particular requirement in the `StreamingListenerBus` that prevents us from doing this.


[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3710#issuecomment-68086093
  
      [Test build #24803 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24803/consoleFull) for   PR 3710 at commit [`df7ee40`](https://github.com/apache/spark/commit/df7ee409bcc309fa3a969bdffadd1803315fbdf3).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/3710#issuecomment-68074746
  
    Finally, I would prefer @andrewor14 take a final look because he understands the nuances of this class.


[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3710#discussion_r22786673
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
    @@ -118,7 +127,7 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
        * Log an error message to indicate that the event queue is full. Do this only once.
        */
       private def logQueueFullErrorMessage(): Unit = {
    -    if (!queueFullErrorMessageLogged) {
    +    if (queueFullErrorMessageLogged.compareAndSet(false, true)) {
    --- End diff --
    
    Right, to avoid outputting multiple log messages.
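
To illustrate why `compareAndSet` gives a log-once guarantee where a plain `var` check does not, here is a minimal sketch. The `logCount` counter is a hypothetical stand-in for a real logger call; only the `AtomicBoolean` and `AtomicInteger` calls are real API.

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

object LogOnceSketch {
  private val alreadyLogged = new AtomicBoolean(false)

  // Stands in for a real logger so the number of emitted messages can be counted.
  val logCount = new AtomicInteger(0)

  def logQueueFullOnce(): Unit = {
    // The check and the update happen as one atomic step, so exactly one
    // caller flips false -> true, even when many threads race here.
    if (alreadyLogged.compareAndSet(false, true)) {
      logCount.incrementAndGet()
    }
  }
}
```

With a separate read and write on a non-atomic flag, two threads could both observe `false` before either sets it to `true`, and both would log.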


[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/3710#issuecomment-67927929
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24730/
    Test PASSed.


[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/3710#issuecomment-68085416
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24800/
    Test PASSed.


[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing closed the pull request at:

    https://github.com/apache/spark/pull/3710


[GitHub] spark pull request: [SPARK-4859][Streaming] Improve StreamingListe...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3710#issuecomment-67271281
  
      [Test build #24522 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24522/consoleFull) for   PR 3710 at commit [`4d96ce8`](https://github.com/apache/spark/commit/4d96ce87fff1d660ef9417c0b10b98a9a40a3b77).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-4859][Streaming] Improve StreamingListe...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on the pull request:

    https://github.com/apache/spark/pull/3710#issuecomment-67899859
  
    Hey @zsxwing can you update the title to make it more specific?


[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on the pull request:

    https://github.com/apache/spark/pull/3710#issuecomment-69100998
  
    Hi @zsxwing the changes look reasonable. My comments are mainly minor wording suggestions and clarifications. One thing is that we can save the refactoring of `StreamingListenerBus` to extend `SparkListenerBus` for a separate PR, but in this PR I would prefer the behavior of `StreamingListenerBus` to match that of the `LiveListenerBus`.




[GitHub] spark pull request: [SPARK-4859][Streaming] Improve StreamingListe...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3710#discussion_r21947337
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala ---
    @@ -17,20 +17,25 @@
     
     package org.apache.spark.streaming.scheduler
     
    -import org.apache.spark.Logging
    -import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
    +import java.util.concurrent.atomic.AtomicBoolean
     import java.util.concurrent.LinkedBlockingQueue
    +import java.util.concurrent.CopyOnWriteArrayList
    +
    +import scala.collection.JavaConversions._
    +import scala.util.control.NonFatal
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.util.Utils
     
     /** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. */
     private[spark] class StreamingListenerBus() extends Logging {
    -  private val listeners = new ArrayBuffer[StreamingListener]()
    -    with SynchronizedBuffer[StreamingListener]
    +  private val listeners = new CopyOnWriteArrayList[StreamingListener]()
    --- End diff --
    
    Not necessary. I just feel that eliminating a lock while handling events is better.
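A minimal sketch of the point above, assuming a hypothetical `DemoListener` trait and `CopyOnWriteDemo` object (neither is part of Spark): iterating a `CopyOnWriteArrayList` reads a snapshot of the backing array, so no lock is held while listeners run, and a listener can even register another listener during dispatch.

```scala
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical stand-in for StreamingListener; not Spark's real trait.
trait DemoListener {
  def onEvent(event: String): Unit
}

object CopyOnWriteDemo {
  private val listeners = new CopyOnWriteArrayList[DemoListener]()

  // add() copies the backing array; writes are expected to be rare.
  def addListener(listener: DemoListener): Unit = listeners.add(listener)

  // The iterator walks a snapshot taken when it was created, so dispatch
  // needs no lock and never throws ConcurrentModificationException.
  def postToAll(event: String): Unit = {
    val it = listeners.iterator()
    while (it.hasNext) {
      it.next().onEvent(event)
    }
  }

  def listenerCount: Int = listeners.size()
}
```

The trade-off is that every `add` copies the array, which is usually acceptable when listeners are registered rarely and events are dispatched often.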




[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/3710#issuecomment-68086095
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24803/




[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3710#issuecomment-67924795
  
      [Test build #24730 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24730/consoleFull) for PR 3710 at commit [`48a112c`](https://github.com/apache/spark/commit/48a112c32fa8eebcefaf271a6cea72ff6756706e).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3710#discussion_r22265975
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala ---
    @@ -64,36 +71,40 @@ private[spark] class StreamingListenerBus() extends Logging {
       }
     
       def addListener(listener: StreamingListener) {
    -    listeners += listener
    +    listeners.add(listener)
       }
     
       def post(event: StreamingListenerEvent) {
    +    if (stopped) {
    +      // Drop further events to make `StreamingListenerShutdown` be delivered ASAP
    +      logError("StreamingListenerBus has been stopped! Drop " + event)
    +      return
    +    }
         val eventAdded = eventQueue.offer(event)
    -    if (!eventAdded && !queueFullErrorMessageLogged) {
    +    if (!eventAdded && queueFullErrorMessageLogged.compareAndSet(false, true)) {
           logError("Dropping StreamingListenerEvent because no remaining room in event queue. " +
             "This likely means one of the StreamingListeners is too slow and cannot keep up with the " +
             "rate at which events are being started by the scheduler.")
    -      queueFullErrorMessageLogged = true
         }
       }
     
    -  /**
    -   * Waits until there are no more events in the queue, or until the specified time has elapsed.
    -   * Used for testing only. Returns true if the queue has emptied and false is the specified time
    -   * elapsed before the queue emptied.
    -   */
    -  def waitUntilEmpty(timeoutMillis: Int): Boolean = {
    -    val finishTime = System.currentTimeMillis + timeoutMillis
    -    while (!eventQueue.isEmpty) {
    -      if (System.currentTimeMillis > finishTime) {
    -        return false
    +  def stop(): Unit = {
    +    stopped = true
    +    // Should not call `post`, or `StreamingListenerShutdown` may be dropped.
    +    eventQueue.put(StreamingListenerShutdown)
    --- End diff --
    
    `put` will block the current thread if the queue is full. But since `stop()` calls `listenerThread.join()` afterwards anyway, blocking here is fine.
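The shutdown behaviour discussed in this diff can be sketched roughly as follows. `MiniListenerBus`, its string events, and the `loggedErrors` queue are hypothetical simplifications for illustration, not Spark's classes. The sketch shows three things from the diff: `post` drops events once the bus is stopped, `compareAndSet` ensures the queue-full error is recorded only once, and `stop` uses the blocking `put` so the shutdown marker cannot be dropped.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue}
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ArrayBuffer

// Hypothetical miniature bus; names are illustrative, not Spark's API.
class MiniListenerBus(capacity: Int) {
  // Sentinel event; assumed never to be posted by callers.
  private val Shutdown = "<shutdown>"
  private val eventQueue = new LinkedBlockingQueue[String](capacity)
  private val queueFullLogged = new AtomicBoolean(false)
  @volatile private var stopped = false

  // Written only by listenerThread; read by others after join().
  val delivered = ArrayBuffer.empty[String]
  // Stand-in for logError so the behaviour is observable.
  val loggedErrors = new ConcurrentLinkedQueue[String]()

  private val listenerThread = new Thread("mini-listener-bus") {
    setDaemon(true)
    override def run(): Unit = {
      var running = true
      while (running) {
        val event = eventQueue.take()
        if (event == Shutdown) running = false else delivered += event
      }
    }
  }

  def start(): Unit = listenerThread.start()

  def post(event: String): Unit = {
    if (stopped) {
      // Drop further events so the shutdown marker is reached quickly.
      loggedErrors.add("Bus has been stopped! Dropping " + event)
    } else if (!eventQueue.offer(event) && queueFullLogged.compareAndSet(false, true)) {
      // compareAndSet makes check-and-set atomic: only one caller logs.
      loggedErrors.add("Dropping event because the queue is full")
    }
  }

  def stop(): Unit = {
    stopped = true
    // put() waits for room; offer() could silently drop the marker.
    eventQueue.put(Shutdown)
    // The caller waits here in any case, so blocking in put() costs nothing extra.
    listenerThread.join()
  }
}
```

Events already queued before `stop()` are still delivered, because the shutdown marker is enqueued behind them.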




[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3710#issuecomment-67927925
  
      [Test build #24730 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24730/consoleFull) for PR 3710 at commit [`48a112c`](https://github.com/apache/spark/commit/48a112c32fa8eebcefaf271a6cea72ff6756706e).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3710#discussion_r22617594
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
    @@ -27,7 +28,8 @@ import org.apache.spark.util.Utils
      *
      * Until start() is called, all posted events are only buffered. Only after this listener bus
      * has started will events be actually propagated to all attached listeners. This listener bus
    - * is stopped when it receives a SparkListenerShutdown event, which is posted using stop().
    + * will be stopped when stop() is called. After `stop()` is called, it won't accept new events.
    + * However, for the events in the bufer, it will still process them before it exits.
    --- End diff --
    
    `However, it will still process events in the buffer before it exits.`




[GitHub] spark pull request: [SPARK-4859][Core][Streaming] Improve LiveList...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3710#discussion_r22619181
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
    @@ -49,7 +52,8 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
             // Atomically remove and process this event
             LiveListenerBus.this.synchronized {
               val event = eventQueue.poll
    -          if (event == SparkListenerShutdown) {
    +          if (event == null) {
    +            assert(stopped)
    --- End diff --
    
    Can you add an assertion failure message here in case something goes wrong?
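One way the request above might look, as a sketch only: Scala's `Predef.assert` has a two-argument form whose second argument is a by-name message. The helper `checkEmptyPoll` and the message wording below are assumptions for illustration, not the text that ended up in the PR.

```scala
object AssertMessageDemo {
  // Hypothetical helper: `stopped` stands in for the bus's stopped flag,
  // `event` for the result of eventQueue.poll.
  def checkEmptyPoll(event: AnyRef, stopped: Boolean): Unit = {
    if (event == null) {
      // The message is evaluated only if the assertion fails.
      assert(stopped, "eventQueue.poll returned null but the bus has not been stopped")
    }
  }
}
```

On failure this throws `java.lang.AssertionError` carrying the message, which makes an unexpected `null` from the queue much easier to diagnose than a bare assertion failure.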


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org