Posted to commits@spark.apache.org by zs...@apache.org on 2016/09/23 21:59:56 UTC

spark git commit: [SPARK-15703][SCHEDULER][CORE][WEBUI] Make ListenerBus event queue size configurable (branch 2.0)

Repository: spark
Updated Branches:
  refs/heads/branch-2.0 5bc5b49fa -> 9e91a1009


[SPARK-15703][SCHEDULER][CORE][WEBUI] Make ListenerBus event queue size configurable (branch 2.0)

## What changes were proposed in this pull request?

Backport #14269 to 2.0: the hard-coded `LiveListenerBus` event queue capacity (10000) becomes configurable through a new key, `spark.scheduler.listenerbus.eventqueue.size`, which keeps 10000 as its default and must be set to a positive value.

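With this change, applications that drop events under load can raise the queue capacity through `SparkConf`. A minimal sketch of the user-facing knob (the key and its default of 10000 come from the diff below; the value 20000 and the app name are illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Double the listener bus event queue capacity from the default of 10000.
// LiveListenerBus rejects values <= 0 with a SparkException.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("listener-bus-queue-demo")
  .set("spark.scheduler.listenerbus.eventqueue.size", "20000")

val sc = new SparkContext(conf)
```
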
## How was this patch tested?

Jenkins.

Author: Dhruve Ashar <dh...@gmail.com>

Closes #15222 from zsxwing/SPARK-15703-2.0.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9e91a100
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9e91a100
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9e91a100

Branch: refs/heads/branch-2.0
Commit: 9e91a1009e6f916245b4d4018de1664ea3decfe7
Parents: 5bc5b49
Author: Dhruve Ashar <dh...@gmail.com>
Authored: Fri Sep 23 14:59:53 2016 -0700
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Fri Sep 23 14:59:53 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  4 +--
 .../apache/spark/internal/config/package.scala  |  5 ++++
 .../spark/scheduler/LiveListenerBus.scala       | 23 +++++++++------
 .../scheduler/EventLoggingListenerSuite.scala   |  4 +--
 .../spark/scheduler/SparkListenerSuite.scala    | 30 +++++++++++---------
 .../storage/BlockManagerReplicationSuite.scala  |  9 ++++--
 .../spark/storage/BlockManagerSuite.scala       |  6 ++--
 .../spark/ui/storage/StorageTabSuite.scala      | 11 +++----
 .../streaming/ReceivedBlockHandlerSuite.scala   |  5 +++-
 9 files changed, 60 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9e91a100/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 251c16f..ffd1227 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -249,7 +249,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   def isStopped: Boolean = stopped.get()
 
   // An asynchronous listener bus for Spark events
-  private[spark] val listenerBus = new LiveListenerBus
+  private[spark] val listenerBus = new LiveListenerBus(this)
 
   // This function allows components created by SparkEnv to be mocked in unit tests:
   private[spark] def createSparkEnv(
@@ -2154,7 +2154,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
         }
     }
 
-    listenerBus.start(this)
+    listenerBus.start()
     _listenerBusStarted = true
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9e91a100/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index f28a9a5..29f812a 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -112,4 +112,9 @@ package object config {
   // To limit how many applications are shown in the History Server summary ui
   private[spark] val HISTORY_UI_MAX_APPS =
     ConfigBuilder("spark.history.ui.maxApplications").intConf.createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE =
+    ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size")
+      .intConf
+      .createWithDefault(10000)
 }

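For reference, `ConfigBuilder` produces a typed entry that can be read back with its declared default. A minimal sketch of the read path, assuming code compiled under the `org.apache.spark` package (the entry is `private[spark]`, which is why `LiveListenerBus` in the next file can use it directly; the demo object itself is hypothetical):

```scala
// Hypothetical demo object; any subpackage of org.apache.spark can see the entry.
package org.apache.spark.demo

import org.apache.spark.SparkConf
import org.apache.spark.internal.config._

object QueueSizeDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    // Typed read: returns 10000 unless the key was set explicitly.
    val queueSize: Int = conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE)
    println(s"listener bus queue capacity = $queueSize")
  }
}
```
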
http://git-wip-us.apache.org/repos/asf/spark/blob/9e91a100/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index 1c21313..bfa3c40 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -22,7 +22,8 @@ import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.util.DynamicVariable
 
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.internal.config._
 import org.apache.spark.util.Utils
 
 /**
@@ -32,18 +33,24 @@ import org.apache.spark.util.Utils
  * has started will events be actually propagated to all attached listeners. This listener bus
  * is stopped when `stop()` is called, and it will drop further events after stopping.
  */
-private[spark] class LiveListenerBus extends SparkListenerBus {
+private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends SparkListenerBus {
 
   self =>
 
   import LiveListenerBus._
 
-  private var sparkContext: SparkContext = null
-
   // Cap the capacity of the event queue so we get an explicit error (rather than
   // an OOM exception) if it's perpetually being added to more quickly than it's being drained.
-  private val EVENT_QUEUE_CAPACITY = 10000
-  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
+  private lazy val EVENT_QUEUE_CAPACITY = validateAndGetQueueSize()
+  private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
+
+  private def validateAndGetQueueSize(): Int = {
+    val queueSize = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE)
+    if (queueSize <= 0) {
+      throw new SparkException("spark.scheduler.listenerbus.eventqueue.size must be > 0!")
+    }
+    queueSize
+  }
 
   // Indicate if `start()` is called
   private val started = new AtomicBoolean(false)
@@ -96,11 +103,9 @@ private[spark] class LiveListenerBus extends SparkListenerBus {
    * listens for any additional events asynchronously while the listener bus is still running.
    * This should only be called once.
    *
-   * @param sc Used to stop the SparkContext in case the listener thread dies.
    */
-  def start(sc: SparkContext): Unit = {
+  def start(): Unit = {
     if (started.compareAndSet(false, true)) {
-      sparkContext = sc
       listenerThread.start()
     } else {
       throw new IllegalStateException(s"$name already started!")

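The capacity is what turns runaway event production into an explicit signal: `LinkedBlockingQueue` is bounded, so non-blocking inserts fail fast once it fills instead of growing the heap until an OOM. A small standalone sketch of that behavior (plain JDK classes, no Spark needed):

```scala
import java.util.concurrent.LinkedBlockingQueue

object BoundedQueueDemo {
  def main(args: Array[String]): Unit = {
    // A capacity of 2 stands in for EVENT_QUEUE_CAPACITY.
    val queue = new LinkedBlockingQueue[String](2)
    assert(queue.offer("event-1"))
    assert(queue.offer("event-2"))
    // offer() returns false on a full queue, so the producer learns
    // about the overflow immediately instead of via a delayed OOM.
    assert(!queue.offer("event-3"))
  }
}
```
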
http://git-wip-us.apache.org/repos/asf/spark/blob/9e91a100/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index c4c80b5..7f48592 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -142,14 +142,14 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
     extraConf.foreach { case (k, v) => conf.set(k, v) }
     val logName = compressionCodec.map("test-" + _).getOrElse("test")
     val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf)
-    val listenerBus = new LiveListenerBus
+    val listenerBus = new LiveListenerBus(sc)
     val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
       125L, "Mickey", None)
     val applicationEnd = SparkListenerApplicationEnd(1000L)
 
     // A comprehensive test on JSON de/serialization of all events is in JsonProtocolSuite
     eventLogger.start()
-    listenerBus.start(sc)
+    listenerBus.start()
     listenerBus.addListener(eventLogger)
     listenerBus.postToAll(applicationStart)
     listenerBus.postToAll(applicationEnd)

http://git-wip-us.apache.org/repos/asf/spark/blob/9e91a100/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 5ba67af..e8a88d4 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -37,13 +37,13 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
   val jobCompletionTime = 1421191296660L
 
   test("don't call sc.stop in listener") {
-    sc = new SparkContext("local", "SparkListenerSuite")
+    sc = new SparkContext("local", "SparkListenerSuite", new SparkConf())
     val listener = new SparkContextStoppingListener(sc)
-    val bus = new LiveListenerBus
+    val bus = new LiveListenerBus(sc)
     bus.addListener(listener)
 
     // Starting listener bus should flush all buffered events
-    bus.start(sc)
+    bus.start()
     bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
     bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
 
@@ -52,8 +52,9 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
   }
 
   test("basic creation and shutdown of LiveListenerBus") {
+    sc = new SparkContext("local", "SparkListenerSuite", new SparkConf())
     val counter = new BasicJobCounter
-    val bus = new LiveListenerBus
+    val bus = new LiveListenerBus(sc)
     bus.addListener(counter)
 
     // Listener bus hasn't started yet, so posting events should not increment counter
@@ -61,7 +62,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     assert(counter.count === 0)
 
     // Starting listener bus should flush all buffered events
-    bus.start(sc)
+    bus.start()
     bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(counter.count === 5)
 
@@ -72,14 +73,14 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
 
     // Listener bus must not be started twice
     intercept[IllegalStateException] {
-      val bus = new LiveListenerBus
-      bus.start(sc)
-      bus.start(sc)
+      val bus = new LiveListenerBus(sc)
+      bus.start()
+      bus.start()
     }
 
     // ... or stopped before starting
     intercept[IllegalStateException] {
-      val bus = new LiveListenerBus
+      val bus = new LiveListenerBus(sc)
       bus.stop()
     }
   }
@@ -106,12 +107,12 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
         drained = true
       }
     }
-
-    val bus = new LiveListenerBus
+    sc = new SparkContext("local", "SparkListenerSuite", new SparkConf())
+    val bus = new LiveListenerBus(sc)
     val blockingListener = new BlockingListener
 
     bus.addListener(blockingListener)
-    bus.start(sc)
+    bus.start()
     bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
 
     listenerStarted.acquire()
@@ -353,13 +354,14 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     val badListener = new BadListener
     val jobCounter1 = new BasicJobCounter
     val jobCounter2 = new BasicJobCounter
-    val bus = new LiveListenerBus
+    sc = new SparkContext("local", "SparkListenerSuite", new SparkConf())
+    val bus = new LiveListenerBus(sc)
 
     // Propagate events to bad listener first
     bus.addListener(badListener)
     bus.addListener(jobCounter1)
     bus.addListener(jobCounter2)
-    bus.start(sc)
+    bus.start()
 
     // Post events to all listeners, and wait until the queue is drained
     (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }

http://git-wip-us.apache.org/repos/asf/spark/blob/9e91a100/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index 31687e6..b9e3a36 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -38,7 +38,10 @@ import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.storage.StorageLevel._
 
 /** Testsuite that tests block replication in BlockManager */
-class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
+class BlockManagerReplicationSuite extends SparkFunSuite
+    with Matchers
+    with BeforeAndAfter
+    with LocalSparkContext {
 
   private val conf = new SparkConf(false).set("spark.app.id", "test")
   private var rpcEnv: RpcEnv = null
@@ -91,8 +94,10 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
     // to make cached peers refresh frequently
     conf.set("spark.storage.cachedPeersTtl", "10")
 
+    sc = new SparkContext("local", "test", conf)
     master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
-      new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true)
+      new BlockManagerMasterEndpoint(rpcEnv, true, conf,
+        new LiveListenerBus(sc))), conf, true)
     allStores.clear()
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9e91a100/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 6194d23..e93eee2 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -49,7 +49,7 @@ import org.apache.spark.util._
 import org.apache.spark.util.io.ChunkedByteBuffer
 
 class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach
-  with PrivateMethodTester with ResetSystemProperties {
+  with PrivateMethodTester with LocalSparkContext with ResetSystemProperties {
 
   import BlockManagerSuite._
 
@@ -107,8 +107,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
     conf.set("spark.driver.port", rpcEnv.address.port.toString)
 
+    sc = new SparkContext("local", "test", conf)
     master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
-      new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true)
+      new BlockManagerMasterEndpoint(rpcEnv, true, conf,
+        new LiveListenerBus(sc))), conf, true)
 
     val initialize = PrivateMethod[Unit]('initialize)
     SizeEstimator invokePrivate initialize()

http://git-wip-us.apache.org/repos/asf/spark/blob/9e91a100/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
index 411a0dd..f6c8418 100644
--- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
@@ -19,15 +19,14 @@ package org.apache.spark.ui.storage
 
 import org.scalatest.BeforeAndAfter
 
-import org.apache.spark.{SparkConf, SparkFunSuite, Success}
-import org.apache.spark.executor.TaskMetrics
+import org.apache.spark._
 import org.apache.spark.scheduler._
 import org.apache.spark.storage._
 
 /**
  * Test various functionality in the StorageListener that supports the StorageTab.
  */
-class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
+class StorageTabSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfter {
   private var bus: LiveListenerBus = _
   private var storageStatusListener: StorageStatusListener = _
   private var storageListener: StorageListener = _
@@ -43,8 +42,10 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
   private val bm1 = BlockManagerId("big", "dog", 1)
 
   before {
-    bus = new LiveListenerBus
-    storageStatusListener = new StorageStatusListener(new SparkConf())
+    val conf = new SparkConf()
+    sc = new SparkContext("local", "test", conf)
+    bus = new LiveListenerBus(sc)
+    storageStatusListener = new StorageStatusListener(conf)
     storageListener = new StorageListener(storageStatusListener)
     bus.addListener(storageStatusListener)
     bus.addListener(storageListener)

http://git-wip-us.apache.org/repos/asf/spark/blob/9e91a100/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index 5d36292..7e66545 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -48,6 +48,7 @@ class ReceivedBlockHandlerSuite
   extends SparkFunSuite
   with BeforeAndAfter
   with Matchers
+  with LocalSparkContext
   with Logging {
 
   import WriteAheadLogBasedBlockHandler._
@@ -78,8 +79,10 @@ class ReceivedBlockHandlerSuite
     rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
     conf.set("spark.driver.port", rpcEnv.address.port.toString)
 
+    sc = new SparkContext("local", "test", conf)
     blockManagerMaster = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
-      new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true)
+      new BlockManagerMasterEndpoint(rpcEnv, true, conf,
+        new LiveListenerBus(sc))), conf, true)
 
     storageLevel = StorageLevel.MEMORY_ONLY_SER
     blockManager = createBlockManager(blockManagerSize, conf)

