Posted to commits@spark.apache.org by an...@apache.org on 2015/02/02 02:48:43 UTC

spark git commit: [SPARK-4859][Core][Streaming] Refactor LiveListenerBus and StreamingListenerBus

Repository: spark
Updated Branches:
  refs/heads/master 4a171225b -> 883bc88d5


[SPARK-4859][Core][Streaming] Refactor LiveListenerBus and StreamingListenerBus

This PR refactors LiveListenerBus and StreamingListenerBus, extracting their common code into a parent class, `ListenerBus`.

It also includes the bug fixes from #3710:
1. Fix the race condition on `queueFullErrorMessageLogged` in LiveListenerBus and StreamingListenerBus so that the `queue-full-error` log cannot be output multiple times (a minimal sketch of this log-once pattern follows the list).
2. Make sure the SHUTDOWN message is delivered to `listenerThread`, so that `listenerThread` can always exit.
3. Log errors thrown by a listener rather than crashing `listenerThread` in StreamingListenerBus.

While fixing the above bugs, we found it was better for LiveListenerBus and StreamingListenerBus to have the same behavior, which would otherwise leave a lot of duplicated code between the two classes.

Therefore, I extracted their common code into a parent class, `ListenerBus`: LiveListenerBus and StreamingListenerBus only need to extend `ListenerBus` and implement `onPostEvent` (how to process an event) and `onDropEvent` (what to do when an event is dropped).
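
As a hedged illustration of the resulting extension points (`MyEvent`, `MyListener`, and `MyListenerBus` are hypothetical; `AsynchronousListenerBus` is `private[spark]`, so a real subclass has to live inside Spark's own packages):

package org.apache.spark.util

// Hypothetical subclass: only the two extension points need implementing.
sealed trait MyEvent
case class Tick(n: Int) extends MyEvent

trait MyListener { def onTick(tick: Tick): Unit }

private[spark] class MyListenerBus
  extends AsynchronousListenerBus[MyListener, MyEvent]("MyListenerBus") {

  // How to deliver one event to one listener (called on the listener thread).
  override def onPostEvent(listener: MyListener, event: MyEvent): Unit = event match {
    case tick: Tick => listener.onTick(tick)
  }

  // What to do when the bounded queue is full (may be called from any thread).
  override def onDropEvent(event: MyEvent): Unit =
    logError(s"Dropping $event: queue full")
}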

Author: zsxwing <zs...@gmail.com>

Closes #4006 from zsxwing/SPARK-4859-refactor and squashes the following commits:

c8dade2 [zsxwing] Fix the code style after renaming
5715061 [zsxwing] Rename ListenerHelper to ListenerBus and the original ListenerBus to AsynchronousListenerBus
f0ef647 [zsxwing] Fix the code style
4e85ffc [zsxwing] Merge branch 'master' into SPARK-4859-refactor
d2ef990 [zsxwing] Add private[spark]
4539f91 [zsxwing] Remove final to pass MiMa tests
a9dccd3 [zsxwing] Remove SparkListenerShutdown
7cc04c3 [zsxwing] Refactor LiveListenerBus and StreamingListenerBus and make them share same code base


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/883bc88d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/883bc88d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/883bc88d

Branch: refs/heads/master
Commit: 883bc88d520b27bdeb74a1837b45ef0b59753568
Parents: 4a17122
Author: zsxwing <zs...@gmail.com>
Authored: Sun Feb 1 17:47:51 2015 -0800
Committer: Andrew Or <an...@databricks.com>
Committed: Sun Feb 1 17:48:41 2015 -0800

----------------------------------------------------------------------
 .../spark/scheduler/LiveListenerBus.scala       | 123 ++-----------
 .../apache/spark/scheduler/SparkListener.scala  |   3 -
 .../spark/scheduler/SparkListenerBus.scala      |  71 +++-----
 .../spark/util/AsynchronousListenerBus.scala    | 173 +++++++++++++++++++
 .../org/apache/spark/util/JsonProtocol.scala    |   1 -
 .../org/apache/spark/util/ListenerBus.scala     |  66 +++++++
 .../streaming/scheduler/StreamingListener.scala |   3 -
 .../scheduler/StreamingListenerBus.scala        |  95 +++-------
 8 files changed, 300 insertions(+), 235 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/883bc88d/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index 36a6e63..be23056 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -17,10 +17,9 @@
 
 package org.apache.spark.scheduler
 
-import java.util.concurrent.{LinkedBlockingQueue, Semaphore}
+import java.util.concurrent.atomic.AtomicBoolean
 
-import org.apache.spark.Logging
-import org.apache.spark.util.Utils
+import org.apache.spark.util.AsynchronousListenerBus
 
 /**
  * Asynchronously passes SparkListenerEvents to registered SparkListeners.
@@ -29,113 +28,19 @@ import org.apache.spark.util.Utils
  * has started will events be actually propagated to all attached listeners. This listener bus
  * is stopped when it receives a SparkListenerShutdown event, which is posted using stop().
  */
-private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
-
-  /* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than
-   * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
-  private val EVENT_QUEUE_CAPACITY = 10000
-  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
-  private var queueFullErrorMessageLogged = false
-  private var started = false
-
-  // A counter that represents the number of events produced and consumed in the queue
-  private val eventLock = new Semaphore(0)
-
-  private val listenerThread = new Thread("SparkListenerBus") {
-    setDaemon(true)
-    override def run(): Unit = Utils.logUncaughtExceptions {
-      while (true) {
-        eventLock.acquire()
-        // Atomically remove and process this event
-        LiveListenerBus.this.synchronized {
-          val event = eventQueue.poll
-          if (event == SparkListenerShutdown) {
-            // Get out of the while loop and shutdown the daemon thread
-            return
-          }
-          Option(event).foreach(postToAll)
-        }
-      }
-    }
-  }
-
-  /**
-   * Start sending events to attached listeners.
-   *
-   * This first sends out all buffered events posted before this listener bus has started, then
-   * listens for any additional events asynchronously while the listener bus is still running.
-   * This should only be called once.
-   */
-  def start() {
-    if (started) {
-      throw new IllegalStateException("Listener bus already started!")
+private[spark] class LiveListenerBus
+  extends AsynchronousListenerBus[SparkListener, SparkListenerEvent]("SparkListenerBus")
+  with SparkListenerBus {
+
+  private val logDroppedEvent = new AtomicBoolean(false)
+
+  override def onDropEvent(event: SparkListenerEvent): Unit = {
+    if (logDroppedEvent.compareAndSet(false, true)) {
+      // Only log the following message once, to avoid noisy duplicate logs.
+      logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
+        "This likely means one of the SparkListeners is too slow and cannot keep up with " +
+        "the rate at which tasks are being started by the scheduler.")
     }
-    listenerThread.start()
-    started = true
   }
 
-  def post(event: SparkListenerEvent) {
-    val eventAdded = eventQueue.offer(event)
-    if (eventAdded) {
-      eventLock.release()
-    } else {
-      logQueueFullErrorMessage()
-    }
-  }
-
-  /**
-   * For testing only. Wait until there are no more events in the queue, or until the specified
-   * time has elapsed. Return true if the queue has emptied and false is the specified time
-   * elapsed before the queue emptied.
-   */
-  def waitUntilEmpty(timeoutMillis: Int): Boolean = {
-    val finishTime = System.currentTimeMillis + timeoutMillis
-    while (!queueIsEmpty) {
-      if (System.currentTimeMillis > finishTime) {
-        return false
-      }
-      /* Sleep rather than using wait/notify, because this is used only for testing and
-       * wait/notify add overhead in the general case. */
-      Thread.sleep(10)
-    }
-    true
-  }
-
-  /**
-   * For testing only. Return whether the listener daemon thread is still alive.
-   */
-  def listenerThreadIsAlive: Boolean = synchronized { listenerThread.isAlive }
-
-  /**
-   * Return whether the event queue is empty.
-   *
-   * The use of synchronized here guarantees that all events that once belonged to this queue
-   * have already been processed by all attached listeners, if this returns true.
-   */
-  def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty }
-
-  /**
-   * Log an error message to indicate that the event queue is full. Do this only once.
-   */
-  private def logQueueFullErrorMessage(): Unit = {
-    if (!queueFullErrorMessageLogged) {
-      if (listenerThread.isAlive) {
-        logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
-          "This likely means one of the SparkListeners is too slow and cannot keep up with" +
-          "the rate at which tasks are being started by the scheduler.")
-      } else {
-        logError("SparkListenerBus thread is dead! This means SparkListenerEvents have not" +
-          "been (and will no longer be) propagated to listeners for some time.")
-      }
-      queueFullErrorMessageLogged = true
-    }
-  }
-
-  def stop() {
-    if (!started) {
-      throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
-    }
-    post(SparkListenerShutdown)
-    listenerThread.join()
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/883bc88d/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 8f5ceaa..dd28ddb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -116,9 +116,6 @@ case class SparkListenerApplicationStart(appName: String, appId: Option[String],
 @DeveloperApi
 case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
 
-/** An event used in the listener to shutdown the listener daemon thread. */
-private[spark] case object SparkListenerShutdown extends SparkListenerEvent
-
 
 /**
  * :: DeveloperApi ::

http://git-wip-us.apache.org/repos/asf/spark/blob/883bc88d/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index e700c6a..fe8a19a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -17,78 +17,47 @@
 
 package org.apache.spark.scheduler
 
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.spark.Logging
-import org.apache.spark.util.Utils
+import org.apache.spark.util.ListenerBus
 
 /**
- * A SparkListenerEvent bus that relays events to its listeners
+ * A [[SparkListenerEvent]] bus that relays [[SparkListenerEvent]]s to its listeners
  */
-private[spark] trait SparkListenerBus extends Logging {
-
-  // SparkListeners attached to this event bus
-  protected val sparkListeners = new ArrayBuffer[SparkListener]
-    with mutable.SynchronizedBuffer[SparkListener]
-
-  def addListener(listener: SparkListener) {
-    sparkListeners += listener
-  }
+private[spark] trait SparkListenerBus extends ListenerBus[SparkListener, SparkListenerEvent] {
 
-  /**
-   * Post an event to all attached listeners.
-   * This does nothing if the event is SparkListenerShutdown.
-   */
-  def postToAll(event: SparkListenerEvent) {
+  override def onPostEvent(listener: SparkListener, event: SparkListenerEvent): Unit = {
     event match {
       case stageSubmitted: SparkListenerStageSubmitted =>
-        foreachListener(_.onStageSubmitted(stageSubmitted))
+        listener.onStageSubmitted(stageSubmitted)
       case stageCompleted: SparkListenerStageCompleted =>
-        foreachListener(_.onStageCompleted(stageCompleted))
+        listener.onStageCompleted(stageCompleted)
       case jobStart: SparkListenerJobStart =>
-        foreachListener(_.onJobStart(jobStart))
+        listener.onJobStart(jobStart)
       case jobEnd: SparkListenerJobEnd =>
-        foreachListener(_.onJobEnd(jobEnd))
+        listener.onJobEnd(jobEnd)
       case taskStart: SparkListenerTaskStart =>
-        foreachListener(_.onTaskStart(taskStart))
+        listener.onTaskStart(taskStart)
       case taskGettingResult: SparkListenerTaskGettingResult =>
-        foreachListener(_.onTaskGettingResult(taskGettingResult))
+        listener.onTaskGettingResult(taskGettingResult)
       case taskEnd: SparkListenerTaskEnd =>
-        foreachListener(_.onTaskEnd(taskEnd))
+        listener.onTaskEnd(taskEnd)
       case environmentUpdate: SparkListenerEnvironmentUpdate =>
-        foreachListener(_.onEnvironmentUpdate(environmentUpdate))
+        listener.onEnvironmentUpdate(environmentUpdate)
       case blockManagerAdded: SparkListenerBlockManagerAdded =>
-        foreachListener(_.onBlockManagerAdded(blockManagerAdded))
+        listener.onBlockManagerAdded(blockManagerAdded)
       case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
-        foreachListener(_.onBlockManagerRemoved(blockManagerRemoved))
+        listener.onBlockManagerRemoved(blockManagerRemoved)
       case unpersistRDD: SparkListenerUnpersistRDD =>
-        foreachListener(_.onUnpersistRDD(unpersistRDD))
+        listener.onUnpersistRDD(unpersistRDD)
       case applicationStart: SparkListenerApplicationStart =>
-        foreachListener(_.onApplicationStart(applicationStart))
+        listener.onApplicationStart(applicationStart)
       case applicationEnd: SparkListenerApplicationEnd =>
-        foreachListener(_.onApplicationEnd(applicationEnd))
+        listener.onApplicationEnd(applicationEnd)
       case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
-        foreachListener(_.onExecutorMetricsUpdate(metricsUpdate))
+        listener.onExecutorMetricsUpdate(metricsUpdate)
       case executorAdded: SparkListenerExecutorAdded =>
-        foreachListener(_.onExecutorAdded(executorAdded))
+        listener.onExecutorAdded(executorAdded)
       case executorRemoved: SparkListenerExecutorRemoved =>
-        foreachListener(_.onExecutorRemoved(executorRemoved))
-      case SparkListenerShutdown =>
-    }
-  }
-
-  /**
-   * Apply the given function to all attached listeners, catching and logging any exception.
-   */
-  private def foreachListener(f: SparkListener => Unit): Unit = {
-    sparkListeners.foreach { listener =>
-      try {
-        f(listener)
-      } catch {
-        case e: Exception =>
-          logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
-      }
+        listener.onExecutorRemoved(executorRemoved)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/883bc88d/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
new file mode 100644
index 0000000..18c627e
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicBoolean
+
+import com.google.common.annotations.VisibleForTesting
+
+/**
+ * Asynchronously passes events to registered listeners.
+ *
+ * Until `start()` is called, all posted events are only buffered. Only after this listener bus
+ * has started will events be actually propagated to all attached listeners. This listener bus
+ * is stopped when `stop()` is called, and it will drop further events after stopping.
+ *
+ * @param name name of the listener bus, will be the name of the listener thread.
+ * @tparam L type of listener
+ * @tparam E type of event
+ */
+private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: String)
+  extends ListenerBus[L, E] {
+
+  self =>
+
+  /* Cap the capacity of the event queue so we get an explicit error (rather than
+   * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
+  private val EVENT_QUEUE_CAPACITY = 10000
+  private val eventQueue = new LinkedBlockingQueue[E](EVENT_QUEUE_CAPACITY)
+
+  // Indicate if `start()` is called
+  private val started = new AtomicBoolean(false)
+  // Indicate if `stop()` is called
+  private val stopped = new AtomicBoolean(false)
+
+  // Indicate if we are processing some event
+  // Guarded by `self`
+  private var processingEvent = false
+
+  // A counter that represents the number of events produced and consumed in the queue
+  private val eventLock = new Semaphore(0)
+
+  private val listenerThread = new Thread(name) {
+    setDaemon(true)
+    override def run(): Unit = Utils.logUncaughtExceptions {
+      while (true) {
+        eventLock.acquire()
+        self.synchronized {
+          processingEvent = true
+        }
+        try {
+          val event = eventQueue.poll
+          if (event == null) {
+            // Get out of the while loop and shutdown the daemon thread
+            if (!stopped.get) {
+              throw new IllegalStateException("Polling `null` from eventQueue means" +
+                " the listener bus has been stopped. So `stopped` must be true")
+            }
+            return
+          }
+          postToAll(event)
+        } finally {
+          self.synchronized {
+            processingEvent = false
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Start sending events to attached listeners.
+   *
+   * This first sends out all buffered events posted before this listener bus has started, then
+   * listens for any additional events asynchronously while the listener bus is still running.
+   * This should only be called once.
+   */
+  def start() {
+    if (started.compareAndSet(false, true)) {
+      listenerThread.start()
+    } else {
+      throw new IllegalStateException(s"$name already started!")
+    }
+  }
+
+  def post(event: E) {
+    if (stopped.get) {
+      // Drop further events to make `listenerThread` exit ASAP
+      logError(s"$name has already stopped! Dropping event $event")
+      return
+    }
+    val eventAdded = eventQueue.offer(event)
+    if (eventAdded) {
+      eventLock.release()
+    } else {
+      onDropEvent(event)
+    }
+  }
+
+  /**
+   * For testing only. Wait until there are no more events in the queue, or until the specified
+   * time has elapsed. Return true if the queue has emptied and false if the specified time
+   * elapsed before the queue emptied.
+   */
+  @VisibleForTesting
+  def waitUntilEmpty(timeoutMillis: Int): Boolean = {
+    val finishTime = System.currentTimeMillis + timeoutMillis
+    while (!queueIsEmpty) {
+      if (System.currentTimeMillis > finishTime) {
+        return false
+      }
+      /* Sleep rather than using wait/notify, because this is used only for testing and
+       * wait/notify add overhead in the general case. */
+      Thread.sleep(10)
+    }
+    true
+  }
+
+  /**
+   * For testing only. Return whether the listener daemon thread is still alive.
+   */
+  @VisibleForTesting
+  def listenerThreadIsAlive: Boolean = listenerThread.isAlive
+
+  /**
+   * Return whether the event queue is empty.
+   *
+   * The use of synchronized here guarantees that all events that once belonged to this queue
+   * have already been processed by all attached listeners, if this returns true.
+   */
+  private def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty && !processingEvent }
+
+  /**
+   * Stop the listener bus. It will wait until the queued events have been processed, but drop the
+   * new events after stopping.
+   */
+  def stop() {
+    if (!started.get()) {
+      throw new IllegalStateException(s"Attempted to stop $name that has not yet started!")
+    }
+    if (stopped.compareAndSet(false, true)) {
+      // Call eventLock.release() so that listenerThread will poll `null` from `eventQueue` and know
+      // `stop` is called.
+      eventLock.release()
+      listenerThread.join()
+    } else {
+      // Keep quiet
+    }
+  }
+
+  /**
+   * If the event queue exceeds its capacity, new events will be dropped. Subclasses will be
+   * notified of the dropped events.
+   *
+   * Note: `onDropEvent` can be called in any thread.
+   */
+  def onDropEvent(event: E): Unit
+}
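
The shutdown handshake above is worth calling out: `stop()` releases one semaphore permit without enqueueing an event, so the listener thread's `acquire()` succeeds, `poll` returns `null`, and the thread exits. A minimal sketch of the same handshake, independent of Spark (all names hypothetical):

import java.util.concurrent.{LinkedBlockingQueue, Semaphore}

object ShutdownHandshakeSketch {
  private val queue   = new LinkedBlockingQueue[String](10)
  private val permits = new Semaphore(0)

  private val consumer = new Thread("consumer") {
    override def run(): Unit = {
      while (true) {
        permits.acquire()
        val event = queue.poll()
        if (event == null) return // extra permit from stop(): time to exit
        println(s"processed $event")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    consumer.start()
    queue.offer("event-1"); permits.release() // post(): enqueue, then release
    permits.release()                         // stop(): release with no event
    consumer.join()                           // consumer polls null and exits
  }
}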

http://git-wip-us.apache.org/repos/asf/spark/blob/883bc88d/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index b5f736d..414bc49 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -91,7 +91,6 @@ private[spark] object JsonProtocol {
       case executorRemoved: SparkListenerExecutorRemoved =>
         executorRemovedToJson(executorRemoved)
       // These aren't used, but keeps compiler happy
-      case SparkListenerShutdown => JNothing
       case SparkListenerExecutorMetricsUpdate(_, _) => JNothing
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/883bc88d/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
new file mode 100644
index 0000000..bd0aa4d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.util.concurrent.CopyOnWriteArrayList
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.Logging
+
+/**
+ * An event bus which posts events to its listeners.
+ */
+private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
+
+  private val listeners = new CopyOnWriteArrayList[L]
+
+  /**
+   * Add a listener to listen to events. This method is thread-safe and can be called from any thread.
+   */
+  final def addListener(listener: L) {
+    listeners.add(listener)
+  }
+
+  /**
+   * Post the event to all registered listeners. Callers should guarantee that `postToAll` is
+   * invoked from the same thread for all events.
+   */
+  final def postToAll(event: E): Unit = {
+    // JavaConversions will create a JIterableWrapper if we use some Scala collection functions.
+    // However, this method will be called frequently. To avoid the wrapper cost, here we use
+    // Java Iterator directly.
+    val iter = listeners.iterator
+    while (iter.hasNext) {
+      val listener = iter.next()
+      try {
+        onPostEvent(listener, event)
+      } catch {
+        case NonFatal(e) =>
+          logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
+      }
+    }
+  }
+
+  /**
+   * Post an event to the specified listener. `onPostEvent` is guaranteed to be called in the same
+   * thread.
+   */
+  def onPostEvent(listener: L, event: E): Unit
+
+}
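
Two details here carry the thread-safety story: `CopyOnWriteArrayList` lets `addListener` run concurrently with iteration, and the `NonFatal` catch keeps one misbehaving listener from starving the rest. A standalone sketch of that isolation (names hypothetical):

import java.util.concurrent.CopyOnWriteArrayList
import scala.util.control.NonFatal

object ListenerIsolationSketch {
  private val listeners = new CopyOnWriteArrayList[String => Unit]

  def addListener(l: String => Unit): Unit = listeners.add(l)

  def postToAll(event: String): Unit = {
    val iter = listeners.iterator // raw Java iterator, no Scala wrapper cost
    while (iter.hasNext) {
      val listener = iter.next()
      try listener(event)
      catch { case NonFatal(e) => System.err.println(s"Listener failed: $e") }
    }
  }

  def main(args: Array[String]): Unit = {
    addListener(_ => throw new RuntimeException("boom"))
    addListener(e => println(s"got $e"))
    postToAll("hello") // the second listener still receives the event
  }
}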

http://git-wip-us.apache.org/repos/asf/spark/blob/883bc88d/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
index ed1aa11..74dbba4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
@@ -50,9 +50,6 @@ case class StreamingListenerReceiverError(receiverInfo: ReceiverInfo)
 case class StreamingListenerReceiverStopped(receiverInfo: ReceiverInfo)
   extends StreamingListenerEvent
 
-/** An event used in the listener to shutdown the listener daemon thread. */
-private[scheduler] case object StreamingListenerShutdown extends StreamingListenerEvent
-
 /**
  * :: DeveloperApi ::
  * A listener interface for receiving information about an ongoing streaming

http://git-wip-us.apache.org/repos/asf/spark/blob/883bc88d/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
index 398724d..b07d6cf 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
@@ -17,83 +17,42 @@
 
 package org.apache.spark.streaming.scheduler
 
+import java.util.concurrent.atomic.AtomicBoolean
+
 import org.apache.spark.Logging
-import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
-import java.util.concurrent.LinkedBlockingQueue
+import org.apache.spark.util.AsynchronousListenerBus
 
 /** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. */
-private[spark] class StreamingListenerBus() extends Logging {
-  private val listeners = new ArrayBuffer[StreamingListener]()
-    with SynchronizedBuffer[StreamingListener]
-
-  /* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than
-   * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
-  private val EVENT_QUEUE_CAPACITY = 10000
-  private val eventQueue = new LinkedBlockingQueue[StreamingListenerEvent](EVENT_QUEUE_CAPACITY)
-  private var queueFullErrorMessageLogged = false
-
-  val listenerThread = new Thread("StreamingListenerBus") {
-    setDaemon(true)
-    override def run() {
-      while (true) {
-        val event = eventQueue.take
-        event match {
-          case receiverStarted: StreamingListenerReceiverStarted =>
-            listeners.foreach(_.onReceiverStarted(receiverStarted))
-          case receiverError: StreamingListenerReceiverError =>
-            listeners.foreach(_.onReceiverError(receiverError))
-          case receiverStopped: StreamingListenerReceiverStopped =>
-            listeners.foreach(_.onReceiverStopped(receiverStopped))
-          case batchSubmitted: StreamingListenerBatchSubmitted =>
-            listeners.foreach(_.onBatchSubmitted(batchSubmitted))
-          case batchStarted: StreamingListenerBatchStarted =>
-            listeners.foreach(_.onBatchStarted(batchStarted))
-          case batchCompleted: StreamingListenerBatchCompleted =>
-            listeners.foreach(_.onBatchCompleted(batchCompleted))
-          case StreamingListenerShutdown =>
-            // Get out of the while loop and shutdown the daemon thread
-            return
-          case _ =>
-        }
-      }
+private[spark] class StreamingListenerBus
+  extends AsynchronousListenerBus[StreamingListener, StreamingListenerEvent]("StreamingListenerBus")
+  with Logging {
+
+  private val logDroppedEvent = new AtomicBoolean(false)
+
+  override def onPostEvent(listener: StreamingListener, event: StreamingListenerEvent): Unit = {
+    event match {
+      case receiverStarted: StreamingListenerReceiverStarted =>
+        listener.onReceiverStarted(receiverStarted)
+      case receiverError: StreamingListenerReceiverError =>
+        listener.onReceiverError(receiverError)
+      case receiverStopped: StreamingListenerReceiverStopped =>
+        listener.onReceiverStopped(receiverStopped)
+      case batchSubmitted: StreamingListenerBatchSubmitted =>
+        listener.onBatchSubmitted(batchSubmitted)
+      case batchStarted: StreamingListenerBatchStarted =>
+        listener.onBatchStarted(batchStarted)
+      case batchCompleted: StreamingListenerBatchCompleted =>
+        listener.onBatchCompleted(batchCompleted)
+      case _ =>
     }
   }
 
-  def start() {
-    listenerThread.start()
-  }
-
-  def addListener(listener: StreamingListener) {
-    listeners += listener
-  }
-
-  def post(event: StreamingListenerEvent) {
-    val eventAdded = eventQueue.offer(event)
-    if (!eventAdded && !queueFullErrorMessageLogged) {
+  override def onDropEvent(event: StreamingListenerEvent): Unit = {
+    if (logDroppedEvent.compareAndSet(false, true)) {
+      // Only log the following message once, to avoid noisy duplicate logs.
       logError("Dropping StreamingListenerEvent because no remaining room in event queue. " +
         "This likely means one of the StreamingListeners is too slow and cannot keep up with the " +
         "rate at which events are being started by the scheduler.")
-      queueFullErrorMessageLogged = true
     }
   }
-
-  /**
-   * Waits until there are no more events in the queue, or until the specified time has elapsed.
-   * Used for testing only. Returns true if the queue has emptied and false is the specified time
-   * elapsed before the queue emptied.
-   */
-  def waitUntilEmpty(timeoutMillis: Int): Boolean = {
-    val finishTime = System.currentTimeMillis + timeoutMillis
-    while (!eventQueue.isEmpty) {
-      if (System.currentTimeMillis > finishTime) {
-        return false
-      }
-      /* Sleep rather than using wait/notify, because this is used only for testing and wait/notify
-       * add overhead in the general case. */
-      Thread.sleep(10)
-    }
-    true
-  }
-
-  def stop(): Unit = post(StreamingListenerShutdown)
 }
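
From a user's perspective the refactoring changes nothing: listeners are still attached the same way and driven through the dispatch above. A hedged usage sketch (assuming an active StreamingContext `ssc`; `BatchLogger` is a hypothetical name):

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Hypothetical user-side listener: the bus routes each BatchCompleted event
// to this callback via the onPostEvent dispatch shown in the diff above.
class BatchLogger extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    println(s"Batch completed: ${batchCompleted.batchInfo}")
  }
}

// ssc.addStreamingListener(new BatchLogger)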

