Posted to reviews@spark.apache.org by zsxwing <gi...@git.apache.org> on 2016/01/16 00:56:36 UTC

[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

GitHub user zsxwing opened a pull request:

    https://github.com/apache/spark/pull/10779

    [SPARK-12847][Core][Streaming]Remove StreamingListenerBus and post all Streaming events to the same thread as Spark events

    Including the following changes:
    
    1. Add StreamingSparkListenerAdapter to process events in `onOtherEvent` to StreamingListener
    2. Remove StreamingListenerBus
    3. Merge AsynchronousListenerBus and LiveListenerBus to the same class LiveListenerBus
    4. Remove unnecessary generics
    5. Add `logEvent` method to SparkListenerEvent so that EventLoggingListener can use it to ignore StreamingListenerEvents
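
Item 5 above can be sketched as follows. This is a minimal, self-contained illustration under stated assumptions, not Spark's code: `Event`, `JobStarted`, `WrappedStreamingEvent`, and `RecordingEventLogger` are hypothetical stand-ins for `SparkListenerEvent`, an ordinary Spark event, a wrapped streaming event, and `EventLoggingListener`.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for SparkListenerEvent.
trait Event {
  // Whether an event logger should persist this event; defaults to true.
  def logEvent: Boolean = true
}

// An ordinary event that keeps the default and is therefore logged.
case class JobStarted(jobId: Int) extends Event

// A streaming event wrapped as a core event; it opts out of event logging.
case class WrappedStreamingEvent(payload: String) extends Event {
  override def logEvent: Boolean = false
}

// Hypothetical stand-in for EventLoggingListener: records instead of writing JSON.
class RecordingEventLogger {
  private val buffer = ArrayBuffer.empty[Event]

  def onOtherEvent(event: Event): Unit = {
    // Skip events that have asked not to be logged.
    if (event.logEvent) buffer += event
  }

  def logged: List[Event] = buffer.toList
}

object LogEventDemo {
  def run(): List[Event] = {
    val logger = new RecordingEventLogger
    logger.onOtherEvent(JobStarted(1))
    logger.onOtherEvent(WrappedStreamingEvent("batch completed"))
    logger.logged
  }
}
```

In this sketch only `JobStarted(1)` ends up in the log; the wrapped streaming event reaches the listener but is filtered out by its own flag.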

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zsxwing/spark streaming-listener

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/10779.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #10779
    
----
commit 2e13d5ecb867d644c11718eda7dd358d137d60a0
Author: Shixiong Zhu <sh...@databricks.com>
Date:   2016-01-15T23:34:28Z

    Remove StreamingListenerBus and post all Streaming events to the same thread as Spark events
    
    Including the following changes:
    
    1. Add StreamingSparkListenerAdapter to process events in `onOtherEvent` to StreamingListener
    2. Remove StreamingListenerBus
    3. Merge AsynchronousListenerBus and LiveListenerBus to the same class LiveListenerBus
    4. Remove unnecessary generics
    5. Add `logEvent` method to SparkListenerEvent so that EventLoggingListener can use it to ignore StreamingListenerEvents

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on the pull request:

    https://github.com/apache/spark/pull/10779#issuecomment-173031418
  
    as discussed offline, LGTM



[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10779#discussion_r50049199
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
    @@ -42,5 +187,14 @@ private[spark] class LiveListenerBus
             "the rate at which tasks are being started by the scheduler.")
         }
       }
    +}
    +
    +private[spark] object LiveListenerBus {
    +  /* Allows for Context to check whether stop() call is made within listener thread
    +  */
    --- End diff --
    
    just do `//`



[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/10779#issuecomment-173054175
  
    retest this please



[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10779#issuecomment-172137250
  
    **[Test build #49501 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/49501/consoleFull)** for PR 10779 at commit [`2e13d5e`](https://github.com/apache/spark/commit/2e13d5ecb867d644c11718eda7dd358d137d60a0).
     * This patch **fails MiMa tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.



[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10779#issuecomment-172956140
  
    Merged build finished. Test PASSed.



[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10779#issuecomment-172970291
  
    **[Test build #49699 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/49699/consoleFull)** for PR 10779 at commit [`07c3679`](https://github.com/apache/spark/commit/07c3679c3d91426abd4844709f91a5cb2bc02949).



[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10779#issuecomment-172673400
  
    **[Test build #49630 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/49630/consoleFull)** for PR 10779 at commit [`26374f5`](https://github.com/apache/spark/commit/26374f504cce42de99105830b8ff625c7139c81d).



[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10779#discussion_r49920775
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
    @@ -200,7 +200,9 @@ private[spark] class EventLoggingListener(
       override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { }
     
       override def onOtherEvent(event: SparkListenerEvent): Unit = {
    -    logEvent(event, flushLogger = true)
    +    if (event.logEvent) {
    --- End diff --
    
    @vanzin Looks like JsonProtocol already supports `StreamingListenerEvent`s?



[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10779#discussion_r50205975
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala ---
    @@ -34,8 +34,6 @@ class ReceiverTrackerSuite extends TestSuiteBase {
     
       test("send rate update to receivers") {
         withStreamingContext(new StreamingContext(conf, Milliseconds(100))) { ssc =>
    -      ssc.scheduler.listenerBus.start(ssc.sc)
    --- End diff --
    
    Does this not need to go back in there because start() has been added back?




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10779#issuecomment-173052959
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/49725/
    Test FAILed.



[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10779#discussion_r50165313
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerForwardingBus.scala ---
    @@ -17,17 +17,38 @@
     
     package org.apache.spark.streaming.scheduler
     
    -import java.util.concurrent.atomic.AtomicBoolean
    +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerEvent}
    +import org.apache.spark.util.ListenerBus
     
    -import org.apache.spark.Logging
    -import org.apache.spark.util.AsynchronousListenerBus
    +/**
    + * Wrap StreamingListenerEvent as SparkListenerEvent so that it can be posted to Spark listener bus.
    + */
    +private[streaming] case class WrappedStreamingListenerEvent(
    +    streamingListenerEvent: StreamingListenerEvent) extends SparkListenerEvent {
    +
    +  // TODO once SPARK-12140 is resolved this will be true as well
    +  protected[spark] override def logEvent: Boolean = false
    +}
     
    -/** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. */
    -private[spark] class StreamingListenerBus
    -  extends AsynchronousListenerBus[StreamingListener, StreamingListenerEvent]("StreamingListenerBus")
    -  with Logging {
    +/**
    + * A Streaming listener bus to forward events in WrappedStreamingListenerEvent to StreamingListeners
    + */
    +private[streaming] class StreamingListenerForwardingBus(sparkListenerBus: LiveListenerBus)
    +  extends SparkListener with ListenerBus[StreamingListener, StreamingListenerEvent] {
    --- End diff --
    
    Spark's `LiveListenerBus` and the old `StreamingListenerBus` also extend `ListenerBus` and have the `postToAll` method. Since nobody has complained about it, I think it's fine.



[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10779#discussion_r50158458
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerForwardingBus.scala ---
    @@ -17,17 +17,38 @@
     
     package org.apache.spark.streaming.scheduler
     
    -import java.util.concurrent.atomic.AtomicBoolean
    +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerEvent}
    +import org.apache.spark.util.ListenerBus
     
    -import org.apache.spark.Logging
    -import org.apache.spark.util.AsynchronousListenerBus
    +/**
    + * Wrap StreamingListenerEvent as SparkListenerEvent so that it can be posted to Spark listener bus.
    + */
    +private[streaming] case class WrappedStreamingListenerEvent(
    +    streamingListenerEvent: StreamingListenerEvent) extends SparkListenerEvent {
    +
    +  // TODO once SPARK-12140 is resolved this will be true as well
    --- End diff --
    
    but once we do SPARK-12140 we can just remove this flag, no?



[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10779#issuecomment-172137270
  
    Merged build finished. Test FAILed.



[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10779#issuecomment-172692990
  
    Merged build finished. Test PASSed.



[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10779#issuecomment-173075935
  
    **[Test build #49741 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/49741/consoleFull)** for PR 10779 at commit [`8eb545c`](https://github.com/apache/spark/commit/8eb545c7b6c8fbb2c67f0aadd8ec070f9b10ba9e).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.



[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10779#discussion_r50051890
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingSparkListenerAdapter.scala ---
    @@ -17,19 +17,16 @@
     
     package org.apache.spark.streaming.scheduler
     
    -import java.util.concurrent.atomic.AtomicBoolean
    +import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
     
    -import org.apache.spark.Logging
    -import org.apache.spark.util.AsynchronousListenerBus
    -
    -/** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. */
    -private[spark] class StreamingListenerBus
    -  extends AsynchronousListenerBus[StreamingListener, StreamingListenerEvent]("StreamingListenerBus")
    -  with Logging {
    -
    -  private val logDroppedEvent = new AtomicBoolean(false)
    +/**
    + * A SparkListener adapter for StreamingListener that will dispatch the Streaming events to
    + * the underlying StreamingListener.
    + */
    +private[spark] class StreamingSparkListenerAdapter(listener: StreamingListener)
    --- End diff --
    
    Making StreamingListener extend SparkListener has a tricky issue. 
    
    Imagine someone implements both StreamingListener and SparkListener in the current code. They then need to register this listener with both SparkContext and StreamingContext. If we make StreamingListener extend SparkListener, the user usually won't notice the change because their code still compiles; however, the listener then receives each SparkListenerEvent twice, which causes subtle bugs whose real cause is hard to find.
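
The double-delivery concern can be reproduced with a small, self-contained sketch. Everything here is a hypothetical stand-in rather than Spark's API: `CoreListener` plays the role of `SparkListener`, `StreamingListenerV2` is the rejected design in which the streaming listener extends the core one, and `Bus` is a trivial synchronous bus.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for SparkListener.
trait CoreListener {
  def onCoreEvent(name: String): Unit = ()
}

// The rejected design: the streaming listener type extends the core listener type.
trait StreamingListenerV2 extends CoreListener {
  def onBatchCompleted(id: Int): Unit = ()
}

// A trivial synchronous bus that delivers each event to every registration.
class Bus {
  private val listeners = ArrayBuffer.empty[CoreListener]
  def addListener(l: CoreListener): Unit = listeners += l
  def post(name: String): Unit = listeners.foreach(_.onCoreEvent(name))
}

// A user listener that handles both kinds of events and counts core events.
class CountingListener extends StreamingListenerV2 {
  var coreEvents = 0
  override def onCoreEvent(name: String): Unit = coreEvents += 1
}

object DoubleDeliveryDemo {
  def run(): Int = {
    val bus = new Bus
    val listener = new CountingListener
    bus.addListener(listener) // registration the user already performs for core events
    bus.addListener(listener) // registration for streaming events, now on the same bus
    bus.post("jobStart")      // a single core event
    listener.coreEvents       // evaluates to 2: the one event was delivered twice
  }
}
```

The code compiles without any warning about the duplicate registration, which is the point of the objection: the problem only shows up at runtime as doubled callbacks.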



[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10779#discussion_r50048973
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
    @@ -17,24 +17,169 @@
     
     package org.apache.spark.scheduler
     
    +import java.util.concurrent._
     import java.util.concurrent.atomic.AtomicBoolean
     
    -import org.apache.spark.util.AsynchronousListenerBus
    +import scala.util.DynamicVariable
    +
    +import org.apache.spark.SparkContext
    +import org.apache.spark.util.Utils
     
     /**
      * Asynchronously passes SparkListenerEvents to registered SparkListeners.
      *
    - * Until start() is called, all posted events are only buffered. Only after this listener bus
    + * Until `start()` is called, all posted events are only buffered. Only after this listener bus
      * has started will events be actually propagated to all attached listeners. This listener bus
    - * is stopped when it receives a SparkListenerShutdown event, which is posted using stop().
    + * is stopped when `stop()` is called, and it will drop further events after stopping.
      */
    -private[spark] class LiveListenerBus
    -  extends AsynchronousListenerBus[SparkListener, SparkListenerEvent]("SparkListenerBus")
    -  with SparkListenerBus {
    +private[spark] class LiveListenerBus extends SparkListenerBus {
    +
    +  self =>
    +
    +  import LiveListenerBus._
    +
    +  private var sparkContext: SparkContext = null
    +
    +  /* Cap the capacity of the event queue so we get an explicit error (rather than
    +   * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
    +  private val EVENT_QUEUE_CAPACITY = 10000
    +  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
    +
    +  // Indicate if `start()` is called
    +  private val started = new AtomicBoolean(false)
    +  // Indicate if `stop()` is called
    +  private val stopped = new AtomicBoolean(false)
    +
    +  // Indicate if we are processing some event
    +  // Guarded by `self`
    +  private var processingEvent = false
     
       private val logDroppedEvent = new AtomicBoolean(false)
     
    -  override def onDropEvent(event: SparkListenerEvent): Unit = {
    +  // A counter that represents the number of events produced and consumed in the queue
    +  private val eventLock = new Semaphore(0)
    +
    +  private val listenerThread = new Thread(name) {
    +    setDaemon(true)
    +    override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
    +      LiveListenerBus.withinListenerThread.withValue(true) {
    +        while (true) {
    +          eventLock.acquire()
    +          self.synchronized {
    +            processingEvent = true
    +          }
    +          try {
    +            val event = eventQueue.poll
    +            if (event == null) {
    +              // Get out of the while loop and shutdown the daemon thread
    +              if (!stopped.get) {
    +                throw new IllegalStateException("Polling `null` from eventQueue means" +
    +                  " the listener bus has been stopped. So `stopped` must be true")
    +              }
    +              return
    +            }
    +            postToAll(event)
    +          } finally {
    +            self.synchronized {
    +              processingEvent = false
    +            }
    +          }
    +        }
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Start sending events to attached listeners.
    +   *
    +   * This first sends out all buffered events posted before this listener bus has started, then
    +   * listens for any additional events asynchronously while the listener bus is still running.
    +   * This should only be called once.
    +   *
    +   * @param sc Used to stop the SparkContext in case the listener thread dies.
    +   */
    +  def start(sc: SparkContext) {
    --- End diff --
    
    `Unit` return type
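
For context on this style comment: procedure syntax (`def start(sc: SparkContext) { ... }`) omits the result type and the `=`, whereas the requested form spells out `: Unit =`. Procedure syntax is deprecated in Scala 2.13 and not accepted by Scala 3. A minimal sketch of the explicit form, using a hypothetical `StartableBus` class that is not part of Spark:

```scala
// Hypothetical stand-in for the bus in the diff; only the method signature matters here.
class StartableBus {
  private var started = false

  // Explicit `: Unit =` instead of procedure syntax `def start() { ... }`.
  def start(): Unit = {
    // Mirrors the "should only be called once" note in the doc comment above.
    if (started) throw new IllegalStateException("already started")
    started = true
  }

  def isStarted: Boolean = started
}

object UnitReturnDemo {
  def run(): Boolean = {
    val bus = new StartableBus
    bus.start()
    bus.isStarted
  }
}
```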



[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10779#issuecomment-173076058
  
    Merged build finished. Test PASSed.



[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10779#discussion_r50159846
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerForwardingBus.scala ---
    @@ -17,17 +17,38 @@
     
     package org.apache.spark.streaming.scheduler
     
    -import java.util.concurrent.atomic.AtomicBoolean
    +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerEvent}
    +import org.apache.spark.util.ListenerBus
     
    -import org.apache.spark.Logging
    -import org.apache.spark.util.AsynchronousListenerBus
    +/**
    + * Wrap StreamingListenerEvent as SparkListenerEvent so that it can be posted to Spark listener bus.
    + */
    +private[streaming] case class WrappedStreamingListenerEvent(
    +    streamingListenerEvent: StreamingListenerEvent) extends SparkListenerEvent {
    +
    +  // TODO once SPARK-12140 is resolved this will be true as well
    +  protected[spark] override def logEvent: Boolean = false
    +}
     
    -/** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. */
    -private[spark] class StreamingListenerBus
    -  extends AsynchronousListenerBus[StreamingListener, StreamingListenerEvent]("StreamingListenerBus")
    -  with Logging {
    +/**
    + * A Streaming listener bus to forward events in WrappedStreamingListenerEvent to StreamingListeners
    + */
    +private[streaming] class StreamingListenerForwardingBus(sparkListenerBus: LiveListenerBus)
    +  extends SparkListener with ListenerBus[StreamingListener, StreamingListenerEvent] {
    --- End diff --
    
    I think it's fine that it extends `ListenerBus` since it does in fact share the same method signatures. I don't see how the threading assumptions are violated if we call `postToAll`. There's only 1 thread (the core listener bus), and that thread calls this synchronously.
    
    Also it's not just the 3 lines. We also want to inherit the `postToAll` method because we want to post to all streaming listeners, right? The implementation also has some try catch logic in there so I think it's better this way.
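
The arrangement described in this comment can be sketched roughly as below. This is an illustration under assumptions, not the PR's implementation: `SimpleListenerBus`, `ForwardingBus`, `Wrapped`, `StreamListener`, and the event types are hypothetical simplifications of `ListenerBus`, `StreamingListenerForwardingBus`, `WrappedStreamingListenerEvent`, `StreamingListener`, and Spark's event classes.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

// Hypothetical event hierarchy: core events and streaming events are separate types.
trait CoreEvent
trait StreamEvent
case class BatchCompleted(id: Int) extends StreamEvent
case class Wrapped(event: StreamEvent) extends CoreEvent
case class OtherCoreEvent(name: String) extends CoreEvent

trait StreamListener {
  def onStreamEvent(event: StreamEvent): Unit
}

// Generic bus: postToAll isolates failures so one bad listener cannot block the rest.
trait SimpleListenerBus[L, E] {
  private val listeners = ArrayBuffer.empty[L]
  def addListener(listener: L): Unit = listeners += listener
  protected def doPostEvent(listener: L, event: E): Unit
  def postToAll(event: E): Unit = listeners.foreach { listener =>
    try doPostEvent(listener, event)
    catch { case NonFatal(_) => () } // a real bus would log the failure here
  }
}

// Acts as a listener on the core bus and as a bus for streaming listeners.
class ForwardingBus extends SimpleListenerBus[StreamListener, StreamEvent] {
  // Intended to be called synchronously from the core bus's single dispatch thread.
  def onOtherEvent(event: CoreEvent): Unit = event match {
    case Wrapped(inner) => postToAll(inner) // unwrap and fan out
    case _ => ()                            // not a streaming event; ignore
  }

  override protected def doPostEvent(listener: StreamListener, event: StreamEvent): Unit =
    listener.onStreamEvent(event)
}

object ForwardingDemo {
  def run(): List[StreamEvent] = {
    val seen = ArrayBuffer.empty[StreamEvent]
    val bus = new ForwardingBus
    bus.addListener(new StreamListener {
      def onStreamEvent(event: StreamEvent): Unit = throw new RuntimeException("boom")
    })
    bus.addListener(new StreamListener {
      def onStreamEvent(event: StreamEvent): Unit = seen += event
    })
    bus.onOtherEvent(OtherCoreEvent("jobStart"))
    bus.onOtherEvent(Wrapped(BatchCompleted(7)))
    seen.toList
  }
}
```

Inheriting `postToAll` means the forwarding class gets the fan-out and the per-listener error isolation for free; in the sketch the second listener still receives `BatchCompleted(7)` even though the first one throws.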



[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10779#discussion_r50049695
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingSparkListenerAdapter.scala ---
    @@ -17,19 +17,16 @@
     
     package org.apache.spark.streaming.scheduler
     
    -import java.util.concurrent.atomic.AtomicBoolean
    +import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
     
    -import org.apache.spark.Logging
    -import org.apache.spark.util.AsynchronousListenerBus
    -
    -/** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. */
    -private[spark] class StreamingListenerBus
    -  extends AsynchronousListenerBus[StreamingListener, StreamingListenerEvent]("StreamingListenerBus")
    -  with Logging {
    -
    -  private val logDroppedEvent = new AtomicBoolean(false)
    +/**
    + * A SparkListener adapter for StreamingListener that will dispatch the Streaming events to
    + * the underlying StreamingListener.
    + */
    +private[spark] class StreamingSparkListenerAdapter(listener: StreamingListener)
    --- End diff --
    
    do we need this class? What if we made `StreamingListener` extend `SparkListener` and override `onOtherEvent` to delegate the streaming events?



[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10779#issuecomment-172134268
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/10779#issuecomment-172737916
  
    Sounds great. Let me update it.




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10779#discussion_r50197743
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala ---
    @@ -51,12 +68,31 @@ private[spark] class StreamingListenerBus
         }
       }
     
    -  override def onDropEvent(event: StreamingListenerEvent): Unit = {
    -    if (logDroppedEvent.compareAndSet(false, true)) {
    -      // Only log the following message once to avoid duplicated annoying logs.
    -      logError("Dropping StreamingListenerEvent because no remaining room in event queue. " +
    -        "This likely means one of the StreamingListeners is too slow and cannot keep up with the " +
    -        "rate at which events are being started by the scheduler.")
    -    }
    +  /**
    +   * Register this one with the Spark listener bus so that it can receive Streaming events and
    +   * forward them to StreamingListeners.
    +   */
    +  def start(): Unit = {
    +    sparkListenerBus.addListener(this) // for getting callbacks on spark events
    +  }
    +
    +  /**
    +   * Unregister this one with the Spark listener bus and all StreamingListeners won't receive any
    +   * events after that.
    +   */
    +  def stop(): Unit = {
    +    sparkListenerBus.removeListener(this)
    +  }
    +
    +  /**
    +   * Wrapper for StreamingListenerEvent as SparkListenerEvent so that it can be posted to Spark
    +   * listener bus.
    +   */
    +  case class WrappedStreamingListenerEvent(
    +    streamingListenerEvent: StreamingListenerEvent) extends SparkListenerEvent {
    --- End diff --
    
    prefer style:
    ```
    private case class WrappedStreamingListenerEvent(event: StreamingListenerEvent)
      extends SparkListenerEvent {
      ...
    }
    ```




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/10779#issuecomment-173341695
  
    LGTM. Merging this to master! Thank you very much! @zsxwing 




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10779#issuecomment-173052647
  
    **[Test build #49725 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/49725/consoleFull)** for PR 10779 at commit [`8d53369`](https://github.com/apache/spark/commit/8d53369b54b39dd98f21398557c6260bba4caad3).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10779#discussion_r50165546
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerForwardingBus.scala ---
    @@ -17,17 +17,38 @@
     
     package org.apache.spark.streaming.scheduler
     
    -import java.util.concurrent.atomic.AtomicBoolean
    +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerEvent}
    +import org.apache.spark.util.ListenerBus
     
    -import org.apache.spark.Logging
    -import org.apache.spark.util.AsynchronousListenerBus
    +/**
    + * Wrap StreamingListenerEvent as SparkListenerEvent so that it can be posted to Spark listener bus.
    + */
    +private[streaming] case class WrappedStreamingListenerEvent(
    --- End diff --
    
    Moved `WrappedStreamingListenerEvent` into `StreamingListenerBus`




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10779#discussion_r50194731
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala ---
    @@ -17,17 +17,28 @@
     
     package org.apache.spark.streaming.scheduler
     
    -import java.util.concurrent.atomic.AtomicBoolean
    +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerEvent}
    +import org.apache.spark.util.ListenerBus
     
    -import org.apache.spark.Logging
    -import org.apache.spark.util.AsynchronousListenerBus
    +/**
    + * A Streaming listener bus to forward events in WrappedStreamingListenerEvent to StreamingListeners
    + */
    +private[streaming] class StreamingListenerBus(sparkListenerBus: LiveListenerBus)
    +  extends SparkListener with ListenerBus[StreamingListener, StreamingListenerEvent] {
     
    -/** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. */
    -private[spark] class StreamingListenerBus
    -  extends AsynchronousListenerBus[StreamingListener, StreamingListenerEvent]("StreamingListenerBus")
    -  with Logging {
    +  sparkListenerBus.addListener(this)    // for getting callbacks on spark events
     
    -  private val logDroppedEvent = new AtomicBoolean(false)
    +  def post(event: StreamingListenerEvent) {
    --- End diff --
    
    Added `stop` and a test. I also added a `start` method because `StreamingContext.stop` just returns if `StreamingContext.start` has not been called.
    
    Right now, StreamingListenerBus is registered with the Spark listener bus in `StreamingContext.start` and unregistered in `StreamingContext.stop`.
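
The start/stop lifecycle described in this comment can be illustrated with a minimal sketch. It is not the Spark implementation: `ToySparkBus` and `LifecycleStreamingBus` are hypothetical stand-ins, events are plain strings, and delivery is synchronous rather than on a separate listener thread. It only shows that events are received between `start()` and `stop()` and not outside that window.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the Spark listener bus (synchronous for simplicity).
class ToySparkBus {
  private val listeners = ArrayBuffer.empty[String => Unit]
  def addListener(l: String => Unit): Unit = { listeners += l; () }
  def removeListener(l: String => Unit): Unit = { listeners -= l; () }
  // Deliver to a snapshot so listeners may be removed while posting.
  def post(event: String): Unit = listeners.toList.foreach(l => l(event))
}

// Hypothetical stand-in for the streaming bus: it only receives events
// while it is registered with the underlying bus.
class LifecycleStreamingBus(sparkBus: ToySparkBus) {
  val received = ArrayBuffer.empty[String]
  // One stable function value, so removeListener finds the same instance.
  private val callback: String => Unit = e => { received += e; () }
  def start(): Unit = sparkBus.addListener(callback)
  def stop(): Unit = sparkBus.removeListener(callback)
}
```

In this sketch an event posted before `start()` or after `stop()` never reaches the streaming side, which mirrors the register-on-start, unregister-on-stop behavior described above.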




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on the pull request:

    https://github.com/apache/spark/pull/10779#issuecomment-172668709
  
    @zsxwing looks pretty good. I think it's pretty close to being merged.




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10779#discussion_r50066434
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingSparkListenerAdapter.scala ---
    @@ -17,19 +17,16 @@
     
     package org.apache.spark.streaming.scheduler
     
    -import java.util.concurrent.atomic.AtomicBoolean
    +import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
     
    -import org.apache.spark.Logging
    -import org.apache.spark.util.AsynchronousListenerBus
    -
    -/** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. */
    -private[spark] class StreamingListenerBus
    -  extends AsynchronousListenerBus[StreamingListener, StreamingListenerEvent]("StreamingListenerBus")
    -  with Logging {
    -
    -  private val logDroppedEvent = new AtomicBoolean(false)
    +/**
    + * A SparkListener adapter for StreamingListener that will dispatch the Streaming events to
    + * the underlying StreamingListener.
    + */
    +private[spark] class StreamingSparkListenerAdapter(listener: StreamingListener)
    --- End diff --
    
    nit: The name is very long. How about `WrappedStreamingListener`?




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10779#discussion_r50049310
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala ---
    @@ -34,7 +34,10 @@ import org.apache.spark.util.{Distribution, Utils}
     
     @DeveloperApi
     @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "Event")
    -trait SparkListenerEvent
    +trait SparkListenerEvent {
    +  /* Whether output this event to the event log */
    +  def logEvent: Boolean = true
    --- End diff --
    
    can you make this `protected[spark]`?
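
For context, the purpose of the `logEvent` flag in the diff above (per the PR description, letting the event logger skip streaming events) can be sketched as follows. This is only an illustration under assumptions: `Event`, `JobStarted`, `WrappedStreamingEvent` and `ToyEventLogger` are hypothetical names, not Spark classes, and the sketch leaves the method public instead of applying the `protected[spark]` visibility requested in this comment.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the event trait in the diff.
trait Event {
  // Whether this event should be written to the event log; defaults to true.
  def logEvent: Boolean = true
}

case class JobStarted(jobId: Int) extends Event

// A wrapped streaming event opts out of event logging.
case class WrappedStreamingEvent(payload: String) extends Event {
  override def logEvent: Boolean = false
}

// Toy logger that records only events whose logEvent flag is true.
class ToyEventLogger {
  private val buffer = ArrayBuffer.empty[Event]
  def onEvent(event: Event): Unit = if (event.logEvent) { buffer += event; () }
  def logged: List[Event] = buffer.toList
}
```

Putting the decision on the event itself means the logger needs no knowledge of streaming types; it simply checks the flag.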




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10779#issuecomment-173037386
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/49734/
    Test FAILed.




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10779#issuecomment-172135951
  
    **[Test build #49501 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/49501/consoleFull)** for PR 10779 at commit [`2e13d5e`](https://github.com/apache/spark/commit/2e13d5ecb867d644c11718eda7dd358d137d60a0).




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10779#issuecomment-172137271
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/49501/
    Test FAILed.




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10779#issuecomment-172134271
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/49497/
    Test FAILed.




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10779#discussion_r50298208
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala ---
    @@ -34,8 +34,6 @@ class ReceiverTrackerSuite extends TestSuiteBase {
     
       test("send rate update to receivers") {
         withStreamingContext(new StreamingContext(conf, Milliseconds(100))) { ssc =>
    -      ssc.scheduler.listenerBus.start(ssc.sc)
    --- End diff --
    
    I see.
    
    RateController only monitors `StreamingListenerBatchCompleted`. But in this test, no StreamingListenerBatchCompleted is emitted. So this line is not necessary. 




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10779#issuecomment-172151660
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10779#discussion_r49919804
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
    @@ -17,24 +17,169 @@
     
     package org.apache.spark.scheduler
     
    +import java.util.concurrent._
     import java.util.concurrent.atomic.AtomicBoolean
     
    -import org.apache.spark.util.AsynchronousListenerBus
    +import scala.util.DynamicVariable
    +
    +import org.apache.spark.SparkContext
    +import org.apache.spark.util.Utils
     
     /**
      * Asynchronously passes SparkListenerEvents to registered SparkListeners.
      *
    - * Until start() is called, all posted events are only buffered. Only after this listener bus
    + * Until `start()` is called, all posted events are only buffered. Only after this listener bus
      * has started will events be actually propagated to all attached listeners. This listener bus
    - * is stopped when it receives a SparkListenerShutdown event, which is posted using stop().
    + * is stopped when `stop()` is called, and it will drop further events after stopping.
      */
    -private[spark] class LiveListenerBus
    -  extends AsynchronousListenerBus[SparkListener, SparkListenerEvent]("SparkListenerBus")
    -  with SparkListenerBus {
    +private[spark] class LiveListenerBus extends SparkListenerBus {
    --- End diff --
    
    Most of this file is the same as the removed `AsynchronousListenerBus.scala`.




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/10779#issuecomment-173059535
  
    It's odd if a core feature like removing a listener does not have any test in core.




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10779#discussion_r50174005
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala ---
    @@ -51,12 +62,15 @@ private[spark] class StreamingListenerBus
         }
       }
     
    -  override def onDropEvent(event: StreamingListenerEvent): Unit = {
    -    if (logDroppedEvent.compareAndSet(false, true)) {
    -      // Only log the following message once to avoid duplicated annoying logs.
    -      logError("Dropping StreamingListenerEvent because no remaining room in event queue. " +
    -        "This likely means one of the StreamingListeners is too slow and cannot keep up with the " +
    -        "rate at which events are being started by the scheduler.")
    -    }
    +  /**
    +   * Wrap StreamingListenerEvent as SparkListenerEvent so that it can be posted to Spark listener
    --- End diff --
    
    nit: Wrapper for StreamingListenerEvent as SparkListenerEvent...
    
    Wraps --> verb, used for methods that do something. A class should be a noun.




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10779#issuecomment-172692991
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/49630/
    Test PASSed.




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10779#discussion_r50197399
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala ---
    @@ -17,19 +17,36 @@
     
     package org.apache.spark.streaming.scheduler
     
    -import java.util.concurrent.atomic.AtomicBoolean
    +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerEvent}
    +import org.apache.spark.util.ListenerBus
     
    -import org.apache.spark.Logging
    -import org.apache.spark.util.AsynchronousListenerBus
    +/**
    + * A Streaming listener bus to forward events to StreamingListeners. This one will wrap received
    + * Streaming events as WrappedStreamingListenerEvent and send them to Spark listener bus. It also
    + * registers itself with Spark listener bus, so that it can receive WrappedStreamingListenerEvents,
    + * unwrap them as StreamingListenerEvent and dispatch them to StreamingListeners.
    + */
    +private[streaming] class StreamingListenerBus(sparkListenerBus: LiveListenerBus)
    +  extends SparkListener with ListenerBus[StreamingListener, StreamingListenerEvent] {
     
    -/** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. */
    -private[spark] class StreamingListenerBus
    -  extends AsynchronousListenerBus[StreamingListener, StreamingListenerEvent]("StreamingListenerBus")
    -  with Logging {
    +  /**
    +   * Post a StreamingListenerEvent to the Spark listener bus asynchronously. This event will be
    +   * dispatched to all StreamingListeners in the thread of the Spark listener bus.
    +   */
    +  def post(event: StreamingListenerEvent) {
    +    sparkListenerBus.post(new WrappedStreamingListenerEvent(event))
    +  }
     
    -  private val logDroppedEvent = new AtomicBoolean(false)
    +  override def onOtherEvent(event: SparkListenerEvent): Unit = {
    +    event match {
    +      case WrappedStreamingListenerEvent(e) =>
    +        postToAll(e)
    +      case _ =>
    +    }
    +  }
     
    -  override def onPostEvent(listener: StreamingListener, event: StreamingListenerEvent): Unit = {
    +  protected override def doPostEvent(
    +      listener: StreamingListener, event: StreamingListenerEvent): Unit = {
    --- End diff --
    
    style:
    ```
    protected override def doPostEvent(
        listener: StreamingListener,
        event: StreamingListenerEvent): Unit = {
    }
    ```
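
The wrap, post, unwrap, and dispatch flow described in the class comment of the diff above can be sketched in isolation. This is a simplified model under assumptions, not the Spark code: `CoreEvent`, `CoreListener`, `ToyCoreBus`, `StreamEvent`, `StreamListener` and `ForwardingStreamingBus` are hypothetical stand-ins, and the toy core bus delivers synchronously, whereas the real listener bus dispatches on its own thread.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins for the core-side event and listener types.
trait CoreEvent
trait CoreListener {
  def onOtherEvent(event: CoreEvent): Unit = ()
}

// Toy core bus with synchronous delivery.
class ToyCoreBus {
  private val listeners = ArrayBuffer.empty[CoreListener]
  def addListener(l: CoreListener): Unit = { listeners += l; () }
  def post(event: CoreEvent): Unit = listeners.toList.foreach(_.onOtherEvent(event))
}

// Hypothetical streaming-side types.
sealed trait StreamEvent
case class BatchCompleted(batchId: Long) extends StreamEvent
trait StreamListener {
  def onBatchCompleted(e: BatchCompleted): Unit
}

// Wrapper that lets a streaming event travel on the core bus.
case class WrappedStreamEvent(event: StreamEvent) extends CoreEvent

class ForwardingStreamingBus(coreBus: ToyCoreBus) extends CoreListener {
  private val streamListeners = ArrayBuffer.empty[StreamListener]
  def addStreamListener(l: StreamListener): Unit = { streamListeners += l; () }

  // Wrap the streaming event and hand it to the core bus.
  def post(event: StreamEvent): Unit = coreBus.post(WrappedStreamEvent(event))

  // Unwrap events coming back from the core bus; ignore everything else.
  override def onOtherEvent(event: CoreEvent): Unit = event match {
    case WrappedStreamEvent(e) => streamListeners.toList.foreach(l => dispatch(l, e))
    case _ => ()
  }

  // Route each streaming event to the matching listener callback.
  private def dispatch(listener: StreamListener, event: StreamEvent): Unit = event match {
    case b: BatchCompleted => listener.onBatchCompleted(b)
  }
}
```

Because the forwarding bus is itself a listener on the core bus, streaming listeners run wherever the core bus delivers events, and unrelated core events fall through the `case _` branch untouched.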




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10779#discussion_r50297063
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala ---
    @@ -34,8 +34,6 @@ class ReceiverTrackerSuite extends TestSuiteBase {
     
       test("send rate update to receivers") {
         withStreamingContext(new StreamingContext(conf, Milliseconds(100))) { ssc =>
    -      ssc.scheduler.listenerBus.start(ssc.sc)
    --- End diff --
    
    Interesting. The test still passed without this line. Let me investigate it.




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/10779




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10779#discussion_r50158532
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerForwardingBus.scala ---
    @@ -17,17 +17,38 @@
     
     package org.apache.spark.streaming.scheduler
     
    -import java.util.concurrent.atomic.AtomicBoolean
    +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerEvent}
    +import org.apache.spark.util.ListenerBus
     
    -import org.apache.spark.Logging
    -import org.apache.spark.util.AsynchronousListenerBus
    +/**
    + * Wrap StreamingListenerEvent as SparkListenerEvent so that it can be posted to Spark listener bus.
    + */
    +private[streaming] case class WrappedStreamingListenerEvent(
    --- End diff --
    
    +1




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10779#issuecomment-172151628
  
    **[Test build #49507 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/49507/consoleFull)** for PR 10779 at commit [`dd33a86`](https://github.com/apache/spark/commit/dd33a86d038faf083e29924e94e7111f18dbbc44).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10779#issuecomment-172998992
  
    **[Test build #49699 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/49699/consoleFull)** for PR 10779 at commit [`07c3679`](https://github.com/apache/spark/commit/07c3679c3d91426abd4844709f91a5cb2bc02949).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/10779#issuecomment-173316759
  
    > Its really odd if a core feature like removing listener does not have any test in the core.
    
    I'm not concerned about that, because addListener and removeListener are both one-line methods and CopyOnWriteArrayList in the JDK should already be well tested.
    
    So I think we only need to test that `removeListener` is called in `StreamingContext.stop`.
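    
    A minimal sketch of that reasoning, assuming a bus whose add/remove methods are one-line delegations to `CopyOnWriteArrayList`. `SimpleBus`, `Listener`, `CountingListener`, and `Context` are hypothetical names for illustration, not Spark classes; the only behaviour worth testing here is that `stop()` unregisters the listener.
    
    ```scala
    import java.util.concurrent.CopyOnWriteArrayList
    
    // Hypothetical stand-ins for illustration; these are not Spark classes.
    trait Listener { def onEvent(event: String): Unit }
    
    class SimpleBus {
      private val listeners = new CopyOnWriteArrayList[Listener]()
    
      // Both methods are one-line delegations to the JDK collection.
      def addListener(l: Listener): Unit = listeners.add(l)
      def removeListener(l: Listener): Unit = listeners.remove(l)
    
      // Deliver the event to every listener registered at the time of the call.
      def post(event: String): Unit = {
        val it = listeners.iterator()
        while (it.hasNext) it.next().onEvent(event)
      }
    }
    
    class CountingListener extends Listener {
      var count = 0
      def onEvent(event: String): Unit = count += 1
    }
    
    // A context that registers its listener on start and must unregister it on stop.
    class Context(bus: SimpleBus) {
      val listener = new CountingListener
      def start(): Unit = bus.addListener(listener)
      def stop(): Unit = bus.removeListener(listener)
    }
    ```
    
    A test on the stop path would post one event before `stop()` and one after, then check that the listener only saw the first.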




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10779#issuecomment-172956142
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/49689/
    Test PASSed.




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10779#issuecomment-172999301
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/49699/
    Test PASSed.




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10779#discussion_r50165599
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerForwardingBus.scala ---
    @@ -17,17 +17,38 @@
     
     package org.apache.spark.streaming.scheduler
     
    -import java.util.concurrent.atomic.AtomicBoolean
    +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerEvent}
    +import org.apache.spark.util.ListenerBus
     
    -import org.apache.spark.Logging
    -import org.apache.spark.util.AsynchronousListenerBus
    +/**
    + * Wrap StreamingListenerEvent as SparkListenerEvent so that it can be posted to Spark listener bus.
    + */
    +private[streaming] case class WrappedStreamingListenerEvent(
    +    streamingListenerEvent: StreamingListenerEvent) extends SparkListenerEvent {
    +
    +  // TODO once SPARK-12140 is resolved this will be true as well
    --- End diff --
    
    Updated the comment to mention both the history server and the TODO.




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/10779#issuecomment-172727012
  
    On the streaming side, @andrewor14 and I talked offline, and there can be a cleaner design with better abstractions.
    
    The current design basically stores the list of StreamingListeners in the SparkListenerBus (using the adapter), and makes each StreamingListenerEvent extend SparkListenerEvent. Since there is no StreamingListenerBus anymore, it is hard to tell from the abstraction what gets posted where and who is calling the callbacks. The public API is also being changed in an awkward way: StreamingListener does not extend SparkListener, but StreamingListenerEvent extends SparkListenerEvent.
    
    I think a better design is the following. The goal is simply for the existing StreamingListenerBus to stop maintaining its own thread and to use the SparkListenerBus's thread to post everything. All that requires is for the StreamingListenerBus to forward its events into the SparkListenerBus, which can be done as follows.
    
    
    ```
    class StreamingListenerForwardingBus(sparkListenerBus: SparkListenerBus) extends SparkListener {
    
       // Wrapper that lets a streaming event travel on the Spark listener bus
       case class WrappedStreamingListenerEvent(streamingListenerEvent: StreamingListenerEvent)
            extends SparkListenerEvent {
            protected[spark] override def logEvent: Boolean = false
       }
    
       private val listeners = new ArrayBuffer[StreamingListener]()
    
       sparkListenerBus.addListener(this)    // for getting callbacks on spark events
    
       def addListener(listener: StreamingListener) { listeners += listener }
    
       // Streaming events are wrapped and forwarded to the Spark bus
       def post(event: StreamingListenerEvent) {
            sparkListenerBus.post(new WrappedStreamingListenerEvent(event))
       }
    
       // Called on the Spark bus's thread; unwrap and dispatch to streaming listeners
       override def onOtherEvent(event: SparkListenerEvent) {
            event match {
                case WrappedStreamingListenerEvent(sle) => sle match {  ....
                       // call listeners
                }
                case _ =>
            }
       }
    }
    
    
    class JobScheduler {
       ...
       val listenerBus = new StreamingListenerForwardingBus(sparkContext.listenerBus)
    }
    ```
    
    This maintains the clean abstraction that streaming events get posted to the streaming bus (and are internally forwarded to the Spark bus), AND it does not require public API changes (streaming events do not have to extend Spark events).
    
    What do you think?
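    
    For readers who want to try the forwarding idea outside Spark, here is a self-contained sketch under stated assumptions. `MiniSparkBus`, `SparkEvent`, `SparkListenerLike`, `StreamingEvent`, `StreamingListenerLike`, `BatchCompleted`, and `ForwardingBus` are all hypothetical miniatures, not the real Spark API, and the mini bus dispatches inline rather than on a dedicated thread.
    
    ```scala
    import scala.collection.mutable.ArrayBuffer
    
    // Hypothetical miniature of the Spark side; not the real Spark API.
    trait SparkEvent { def logEvent: Boolean = true }
    trait SparkListenerLike { def onOtherEvent(event: SparkEvent): Unit }
    
    class MiniSparkBus {
      private val listeners = ArrayBuffer.empty[SparkListenerLike]
      def addListener(l: SparkListenerLike): Unit = listeners += l
      // Assumption: a real bus would queue events for one dispatch thread;
      // this sketch dispatches inline to stay short.
      def post(event: SparkEvent): Unit = listeners.foreach(_.onOtherEvent(event))
    }
    
    // Streaming side: these events do NOT extend SparkEvent.
    sealed trait StreamingEvent
    case class BatchCompleted(batchId: Long) extends StreamingEvent
    
    trait StreamingListenerLike { def onBatchCompleted(e: BatchCompleted): Unit }
    
    class ForwardingBus(sparkBus: MiniSparkBus) extends SparkListenerLike {
      // Private wrapper so streaming events can travel on the Spark bus.
      private case class Wrapped(event: StreamingEvent) extends SparkEvent {
        override def logEvent: Boolean = false
      }
    
      private val listeners = ArrayBuffer.empty[StreamingListenerLike]
      sparkBus.addListener(this) // receive callbacks from the Spark bus
    
      def addListener(l: StreamingListenerLike): Unit = listeners += l
      def post(event: StreamingEvent): Unit = sparkBus.post(Wrapped(event))
    
      override def onOtherEvent(event: SparkEvent): Unit = event match {
        case Wrapped(e: BatchCompleted) => listeners.foreach(_.onBatchCompleted(e))
        case _ => // not a wrapped streaming event; ignore
      }
    }
    ```
    
    Streaming code only ever sees `ForwardingBus.post` and `addListener`, while delivery happens wherever the underlying bus invokes `onOtherEvent`.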
    
    





[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10779#discussion_r50164960
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerForwardingBus.scala ---
    @@ -17,17 +17,38 @@
     
     package org.apache.spark.streaming.scheduler
     
    -import java.util.concurrent.atomic.AtomicBoolean
    +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerEvent}
    +import org.apache.spark.util.ListenerBus
     
    -import org.apache.spark.Logging
    -import org.apache.spark.util.AsynchronousListenerBus
    +/**
    + * Wrap StreamingListenerEvent as SparkListenerEvent so that it can be posted to Spark listener bus.
    + */
    +private[streaming] case class WrappedStreamingListenerEvent(
    +    streamingListenerEvent: StreamingListenerEvent) extends SparkListenerEvent {
    +
    +  // TODO once SPARK-12140 is resolved this will be true as well
    +  protected[spark] override def logEvent: Boolean = false
    +}
     
    -/** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. */
    -private[spark] class StreamingListenerBus
    -  extends AsynchronousListenerBus[StreamingListener, StreamingListenerEvent]("StreamingListenerBus")
    -  with Logging {
    +/**
    + * A Streaming listener bus to forward events in WrappedStreamingListenerEvent to StreamingListeners
    + */
    +private[streaming] class StreamingListenerForwardingBus(sparkListenerBus: LiveListenerBus)
    --- End diff --
    
    Looks like StreamingListenerBus is better. I just renamed it to StreamingListenerBus.




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10779#issuecomment-172692885
  
    **[Test build #49630 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/49630/consoleFull)** for PR 10779 at commit [`26374f5`](https://github.com/apache/spark/commit/26374f504cce42de99105830b8ff625c7139c81d).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10779#issuecomment-172999298
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10779#issuecomment-173057872
  
    **[Test build #49741 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/49741/consoleFull)** for PR 10779 at commit [`8eb545c`](https://github.com/apache/spark/commit/8eb545c7b6c8fbb2c67f0aadd8ec070f9b10ba9e).




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10779#issuecomment-172143982
  
    **[Test build #49507 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/49507/consoleFull)** for PR 10779 at commit [`dd33a86`](https://github.com/apache/spark/commit/dd33a86d038faf083e29924e94e7111f18dbbc44).




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10779#issuecomment-173052955
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10779#issuecomment-172955846
  
    **[Test build #49689 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/49689/consoleFull)** for PR 10779 at commit [`27ccb05`](https://github.com/apache/spark/commit/27ccb050c84bab642c6ca5aeba35ef806244f997).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10779#issuecomment-173037383
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10779#discussion_r50154743
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerForwardingBus.scala ---
    @@ -17,17 +17,38 @@
     
     package org.apache.spark.streaming.scheduler
     
    -import java.util.concurrent.atomic.AtomicBoolean
    +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerEvent}
    +import org.apache.spark.util.ListenerBus
     
    -import org.apache.spark.Logging
    -import org.apache.spark.util.AsynchronousListenerBus
    +/**
    + * Wrap StreamingListenerEvent as SparkListenerEvent so that it can be posted to Spark listener bus.
    + */
    +private[streaming] case class WrappedStreamingListenerEvent(
    +    streamingListenerEvent: StreamingListenerEvent) extends SparkListenerEvent {
    +
    +  // TODO once SPARK-12140 is resolved this will be true as well
    --- End diff --
    
    No need to add a todo here.




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/10779#issuecomment-172134356
  
    retest this please




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10779#discussion_r50050013
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala ---
    @@ -27,7 +28,9 @@ import org.apache.spark.util.Distribution
      * Base trait for events related to StreamingListener
      */
     @DeveloperApi
    -sealed trait StreamingListenerEvent
    +sealed trait StreamingListenerEvent extends SparkListenerEvent {
    +  override def logEvent: Boolean = false
    --- End diff --
    
    Make this private. Also, maybe add a TODO here explaining that once SPARK-12140 is resolved this will be true as well.
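    
    As an illustration of what the `logEvent` flag is for, here is a small sketch of an event logger that skips events whose flag is false. `Event`, `JobStart`, `StreamingBatch`, and `EventLogger` are hypothetical names, and the flag is left public here for brevity, unlike the narrower visibility discussed in this review.
    
    ```scala
    import scala.collection.mutable.ArrayBuffer
    
    // Hypothetical event hierarchy; events are logged unless they opt out.
    trait Event { def logEvent: Boolean = true }
    
    case class JobStart(id: Int) extends Event
    
    // Streaming-style event that opts out of event logging.
    case class StreamingBatch(id: Int) extends Event {
      override def logEvent: Boolean = false
    }
    
    // Hypothetical logger that records only events whose flag is true.
    class EventLogger {
      val logged = ArrayBuffer.empty[Event]
      def onEvent(e: Event): Unit = if (e.logEvent) logged += e
    }
    ```
    
    Flipping the override to `true` later would make the same logger pick up those events with no change to the logger itself.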




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10779#issuecomment-172926400
  
    **[Test build #49689 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/49689/consoleFull)** for PR 10779 at commit [`27ccb05`](https://github.com/apache/spark/commit/27ccb050c84bab642c6ca5aeba35ef806244f997).




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10779#discussion_r50155771
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerForwardingBus.scala ---
    @@ -17,17 +17,38 @@
     
     package org.apache.spark.streaming.scheduler
     
    -import java.util.concurrent.atomic.AtomicBoolean
    +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerEvent}
    +import org.apache.spark.util.ListenerBus
     
    -import org.apache.spark.Logging
    -import org.apache.spark.util.AsynchronousListenerBus
    +/**
    + * Wrap StreamingListenerEvent as SparkListenerEvent so that it can be posted to Spark listener bus.
    + */
    +private[streaming] case class WrappedStreamingListenerEvent(
    +    streamingListenerEvent: StreamingListenerEvent) extends SparkListenerEvent {
    +
    +  // TODO once SPARK-12140 is resolved this will be true as well
    +  protected[spark] override def logEvent: Boolean = false
    +}
     
    -/** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. */
    -private[spark] class StreamingListenerBus
    -  extends AsynchronousListenerBus[StreamingListener, StreamingListenerEvent]("StreamingListenerBus")
    -  with Logging {
    +/**
    + * A Streaming listener bus to forward events in WrappedStreamingListenerEvent to StreamingListeners
    + */
    +private[streaming] class StreamingListenerForwardingBus(sparkListenerBus: LiveListenerBus)
    +  extends SparkListener with ListenerBus[StreamingListener, StreamingListenerEvent] {
    --- End diff --
    
    I am not sure whether this should extend ListenerBus. It again becomes confusing whether events are posted on the same thread, on a different async thread, or forwarded to a different async thread. It also inherits the `postAll` method, which someone could accidentally call and violate all the threading assumptions.
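    
    A rough sketch of the concern, using hypothetical classes rather than Spark's own: a bus that inherits a public `postAll` delivers immediately on whatever thread calls it, whereas a bus that wraps one by composition exposes only `post`, so delivery happens only when the dispatcher drains the queue.
    
    ```scala
    import java.util.concurrent.ConcurrentLinkedQueue
    import scala.collection.mutable.ArrayBuffer
    
    // Hypothetical base trait; `postAll` is public to every subclass user.
    trait Bus[L, E] {
      protected val listeners = ArrayBuffer.empty[L]
      def addListener(l: L): Unit = listeners += l
      // Delivers synchronously on the caller's thread.
      def postAll(event: E): Unit = listeners.foreach(doPost(_, event))
      protected def doPost(listener: L, event: E): Unit
    }
    
    // Inheritance: anyone holding this bus can call postAll directly.
    class InheritingBus extends Bus[String => Unit, String] {
      protected def doPost(listener: String => Unit, event: String): Unit =
        listener(event)
    }
    
    // Composition: callers can only enqueue; postAll is not reachable.
    class ComposedBus {
      private val inner = new InheritingBus
      private val queue = new ConcurrentLinkedQueue[String]()
    
      def addListener(l: String => Unit): Unit = inner.addListener(l)
      def post(event: String): Unit = queue.add(event)
    
      // Intended to be called only by the single dispatch thread.
      def drain(): Unit = {
        var e = queue.poll()
        while (e != null) { inner.postAll(e); e = queue.poll() }
      }
    }
    ```
    
    With `ComposedBus`, nothing is delivered until `drain()` runs, which keeps the threading assumption in one place.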




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/10779#issuecomment-172132667
  
    cc @andrewor14 




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10779#issuecomment-172926094
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/49688/
    Test FAILed.




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10779#issuecomment-173076060
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/49741/
    Test PASSed.




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10779#issuecomment-172926088
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10779#issuecomment-173027447
  
    **[Test build #49725 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/49725/consoleFull)** for PR 10779 at commit [`8d53369`](https://github.com/apache/spark/commit/8d53369b54b39dd98f21398557c6260bba4caad3).




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10779#discussion_r50160152
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerForwardingBus.scala ---
    @@ -17,17 +17,38 @@
     
     package org.apache.spark.streaming.scheduler
     
    -import java.util.concurrent.atomic.AtomicBoolean
    +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerEvent}
    +import org.apache.spark.util.ListenerBus
     
    -import org.apache.spark.Logging
    -import org.apache.spark.util.AsynchronousListenerBus
    +/**
    + * Wrap StreamingListenerEvent as SparkListenerEvent so that it can be posted to Spark listener bus.
    + */
    +private[streaming] case class WrappedStreamingListenerEvent(
    +    streamingListenerEvent: StreamingListenerEvent) extends SparkListenerEvent {
    +
    +  // TODO once SPARK-12140 is resolved this will be true as well
    +  protected[spark] override def logEvent: Boolean = false
    +}
     
    -/** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. */
    -private[spark] class StreamingListenerBus
    -  extends AsynchronousListenerBus[StreamingListener, StreamingListenerEvent]("StreamingListenerBus")
    -  with Logging {
    +/**
    + * A Streaming listener bus to forward events in WrappedStreamingListenerEvent to StreamingListeners
    + */
    +private[streaming] class StreamingListenerForwardingBus(sparkListenerBus: LiveListenerBus)
    --- End diff --
    
    Can we please call this `StreamingListenerBus`? It really *is* a listener bus from the perspective of streaming, right? The streaming scheduler has a reference to the bus and posts events to it. Listeners register to the bus and receive events from it. I think `*Manager` has been kind of like a catch-all default name for things we don't know what to call, like `BlockManager` or `ConnectionManager`.
    
    In this case it's literally just a bus whose details are hidden from the caller.




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10779#discussion_r50173881
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala ---
    @@ -17,17 +17,28 @@
     
     package org.apache.spark.streaming.scheduler
     
    -import java.util.concurrent.atomic.AtomicBoolean
    +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerEvent}
    +import org.apache.spark.util.ListenerBus
     
    -import org.apache.spark.Logging
    -import org.apache.spark.util.AsynchronousListenerBus
    +/**
    + * A Streaming listener bus to forward events in WrappedStreamingListenerEvent to StreamingListeners
    --- End diff --
    
    Add some docs to explain how this class works - why is it a listener, how does it forward, etc. 
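
As an illustration of the wrap-and-forward idea this comment asks to have documented, here is a minimal sketch. It does not use Spark's real classes: `CoreBus`, `CoreListener`, `CoreEvent`, `StreamEvent`, `BatchDone`, `Wrapped`, and `ForwardingBus` are hypothetical stand-ins, and the core bus here delivers synchronously, whereas the real `LiveListenerBus` delivers events asynchronously.

```scala
// Hypothetical stand-ins for the core (Spark) side.
trait CoreEvent
trait CoreListener { def onOtherEvent(event: CoreEvent): Unit }

// Hypothetical stand-ins for the streaming side.
trait StreamEvent
case class BatchDone(id: Int) extends StreamEvent
trait StreamListener { def onStreamEvent(event: StreamEvent): Unit }

// Simplified core bus: delivers each event to every registered listener.
class CoreBus {
  private var listeners = Vector.empty[CoreListener]
  def addListener(l: CoreListener): Unit = { listeners :+= l }
  def post(e: CoreEvent): Unit = listeners.foreach(_.onOtherEvent(e))
}

// Wrapper that lets a streaming event travel on the core bus.
case class Wrapped(event: StreamEvent) extends CoreEvent

// The forwarding bus is a listener on the core bus (so it gets callbacks)
// and a bus for streaming listeners (so they can register with it).
class ForwardingBus(core: CoreBus) extends CoreListener {
  private var streamListeners = Vector.empty[StreamListener]

  core.addListener(this) // receive callbacks for core events

  def addListener(l: StreamListener): Unit = { streamListeners :+= l }

  // Posting wraps the streaming event and hands it to the core bus.
  def post(event: StreamEvent): Unit = core.post(Wrapped(event))

  // Only wrapped streaming events are unwrapped and forwarded.
  override def onOtherEvent(event: CoreEvent): Unit = event match {
    case Wrapped(inner) => streamListeners.foreach(_.onStreamEvent(inner))
    case _ => () // unrelated core event: ignore
  }
}
```

The class is a listener so that it can receive events from the shared bus, and a bus so that streaming listeners have a single place to register. Events that are not wrapped streaming events are ignored.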





[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10779#discussion_r50155144
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerForwardingBus.scala ---
    @@ -17,17 +17,38 @@
     
     package org.apache.spark.streaming.scheduler
     
    -import java.util.concurrent.atomic.AtomicBoolean
    +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerEvent}
    +import org.apache.spark.util.ListenerBus
     
    -import org.apache.spark.Logging
    -import org.apache.spark.util.AsynchronousListenerBus
    +/**
    + * Wrap StreamingListenerEvent as SparkListenerEvent so that it can be posted to Spark listener bus.
    + */
    +private[streaming] case class WrappedStreamingListenerEvent(
    --- End diff --
    
    I am inclined to make this an inner class of `StreamingListenerForwardingBus`, because wrapping streaming events as Spark events is an internal implementation detail.
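
One way to read this suggestion, sketched with hypothetical names only (`CoreEvent`, `StreamEvent`, `BatchDone`, `ForwardingBus`, `Wrapped`, `wrap`, and `unwrap` are not Spark's API): the wrapper lives in the companion object with `private` visibility, so only the forwarding bus can construct or match on it.

```scala
// Hypothetical stand-ins for the two event hierarchies.
trait CoreEvent
trait StreamEvent
case class BatchDone(id: Int) extends StreamEvent

class ForwardingBus {
  import ForwardingBus.Wrapped

  // Callers only ever see CoreEvent; the wrapper type never escapes.
  def wrap(event: StreamEvent): CoreEvent = Wrapped(event)

  def unwrap(event: CoreEvent): Option[StreamEvent] = event match {
    case Wrapped(inner) => Some(inner)
    case _ => None
  }
}

object ForwardingBus {
  // Private to the companion pair: code outside cannot reference Wrapped.
  private case class Wrapped(event: StreamEvent) extends CoreEvent
}
```

Code outside `ForwardingBus` can pass the wrapped value around as a `CoreEvent` but cannot name or pattern-match on `Wrapped`, which keeps the wrapping an implementation detail.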




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10779#issuecomment-172151662
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/49507/
    Test PASSed.




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10779#discussion_r50174582
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala ---
    @@ -17,17 +17,28 @@
     
     package org.apache.spark.streaming.scheduler
     
    -import java.util.concurrent.atomic.AtomicBoolean
    +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerEvent}
    +import org.apache.spark.util.ListenerBus
     
    -import org.apache.spark.Logging
    -import org.apache.spark.util.AsynchronousListenerBus
    +/**
    + * A Streaming listener bus to forward events in WrappedStreamingListenerEvent to StreamingListeners
    + */
    +private[streaming] class StreamingListenerBus(sparkListenerBus: LiveListenerBus)
    +  extends SparkListener with ListenerBus[StreamingListener, StreamingListenerEvent] {
     
    -/** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. */
    -private[spark] class StreamingListenerBus
    -  extends AsynchronousListenerBus[StreamingListener, StreamingListenerEvent]("StreamingListenerBus")
    -  with Logging {
    +  sparkListenerBus.addListener(this)    // for getting callbacks on spark events
     
    -  private val logDroppedEvent = new AtomicBoolean(false)
    +  def post(event: StreamingListenerEvent) {
    --- End diff --
    
    Shouldn't there be a way to remove `this` when the StreamingContext is shut down? All the added StreamingListeners are useless after the StreamingContext has been stopped. Otherwise, if someone starts and stops StreamingContexts multiple times, all the previous listeners will keep getting the streaming events of the new one.
    
    Please add a test for this to make sure that this does not happen. 
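
The concern above, and the kind of test it asks for, can be sketched as follows. This is a simplified model, not Spark code: `SharedBus`, `Listener`, `Event`, `StreamingEvent`, and `StreamingSession` are hypothetical, and it assumes the shared bus offers some way to remove a listener (the `removeListener` below is part of the sketch, not a claim about Spark's API).

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical shared bus that outlives any single streaming session.
trait Event
trait Listener { def onEvent(e: Event): Unit }

class SharedBus {
  private var listeners = Vector.empty[Listener]
  def addListener(l: Listener): Unit = { listeners :+= l }
  def removeListener(l: Listener): Unit = { listeners = listeners.filterNot(_ eq l) }
  def post(e: Event): Unit = listeners.foreach(_.onEvent(e))
}

case class StreamingEvent(source: String) extends Event

// Hypothetical stand-in for one start/stop cycle of a streaming context.
class StreamingSession(name: String, bus: SharedBus) {
  // Names of the sessions whose events this session's listener has seen.
  val received = ArrayBuffer.empty[String]

  private val forwarder = new Listener {
    def onEvent(e: Event): Unit = e match {
      case StreamingEvent(src) => received += src
      case _ => ()
    }
  }

  bus.addListener(forwarder)

  def emit(): Unit = bus.post(StreamingEvent(name))

  // Without this removal, the forwarder would keep receiving events
  // posted by later sessions that share the same bus.
  def stop(): Unit = bus.removeListener(forwarder)
}
```

A test along these lines would start one session, emit an event, stop it, start a second session on the same bus, emit again, and then check that the first session recorded only its own event.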




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10779#discussion_r50049129
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
    @@ -17,24 +17,169 @@
     
     package org.apache.spark.scheduler
     
    +import java.util.concurrent._
     import java.util.concurrent.atomic.AtomicBoolean
     
    -import org.apache.spark.util.AsynchronousListenerBus
    +import scala.util.DynamicVariable
    +
    +import org.apache.spark.SparkContext
    +import org.apache.spark.util.Utils
     
     /**
      * Asynchronously passes SparkListenerEvents to registered SparkListeners.
      *
    - * Until start() is called, all posted events are only buffered. Only after this listener bus
    + * Until `start()` is called, all posted events are only buffered. Only after this listener bus
      * has started will events be actually propagated to all attached listeners. This listener bus
    - * is stopped when it receives a SparkListenerShutdown event, which is posted using stop().
    + * is stopped when `stop()` is called, and it will drop further events after stopping.
      */
    -private[spark] class LiveListenerBus
    -  extends AsynchronousListenerBus[SparkListener, SparkListenerEvent]("SparkListenerBus")
    -  with SparkListenerBus {
    +private[spark] class LiveListenerBus extends SparkListenerBus {
    +
    +  self =>
    +
    +  import LiveListenerBus._
    +
    +  private var sparkContext: SparkContext = null
    +
    +  /* Cap the capacity of the event queue so we get an explicit error (rather than
    +   * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
    --- End diff --
    
    not your code, but can you just use `//` comments here?




[GitHub] spark pull request: [SPARK-12847][Core][Streaming]Remove Streaming...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10779#discussion_r50154637
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerForwardingBus.scala ---
    @@ -17,17 +17,38 @@
     
     package org.apache.spark.streaming.scheduler
     
    -import java.util.concurrent.atomic.AtomicBoolean
    +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerEvent}
    +import org.apache.spark.util.ListenerBus
     
    -import org.apache.spark.Logging
    -import org.apache.spark.util.AsynchronousListenerBus
    +/**
    + * Wrap StreamingListenerEvent as SparkListenerEvent so that it can be posted to Spark listener bus.
    + */
    +private[streaming] case class WrappedStreamingListenerEvent(
    +    streamingListenerEvent: StreamingListenerEvent) extends SparkListenerEvent {
    +
    +  // TODO once SPARK-12140 is resolved this will be true as well
    +  protected[spark] override def logEvent: Boolean = false
    +}
     
    -/** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. */
    -private[spark] class StreamingListenerBus
    -  extends AsynchronousListenerBus[StreamingListener, StreamingListenerEvent]("StreamingListenerBus")
    -  with Logging {
    +/**
    + * A Streaming listener bus to forward events in WrappedStreamingListenerEvent to StreamingListeners
    + */
    +private[streaming] class StreamingListenerForwardingBus(sparkListenerBus: LiveListenerBus)
    --- End diff --
    
    This could also be named `StreamingListenerManager`. Semantically, that still says that this is something that manages listeners, and it cannot be mistaken for a bus.

