You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2016/09/23 21:59:56 UTC
spark git commit: [SPARK-15703][SCHEDULER][CORE][WEBUI] Make
ListenerBus event queue size configurable (branch 2.0)
Repository: spark
Updated Branches:
refs/heads/branch-2.0 5bc5b49fa -> 9e91a1009
[SPARK-15703][SCHEDULER][CORE][WEBUI] Make ListenerBus event queue size configurable (branch 2.0)
## What changes were proposed in this pull request?
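Backport #14269 to branch-2.0: make the LiveListenerBus event queue capacity configurable via `spark.scheduler.listenerbus.eventqueue.size` (default 10000, must be > 0) instead of the previously hard-coded 10000.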
## How was this patch tested?
Jenkins.
Author: Dhruve Ashar <dh...@gmail.com>
Closes #15222 from zsxwing/SPARK-15703-2.0.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9e91a100
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9e91a100
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9e91a100
Branch: refs/heads/branch-2.0
Commit: 9e91a1009e6f916245b4d4018de1664ea3decfe7
Parents: 5bc5b49
Author: Dhruve Ashar <dh...@gmail.com>
Authored: Fri Sep 23 14:59:53 2016 -0700
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Fri Sep 23 14:59:53 2016 -0700
----------------------------------------------------------------------
.../scala/org/apache/spark/SparkContext.scala | 4 +--
.../apache/spark/internal/config/package.scala | 5 ++++
.../spark/scheduler/LiveListenerBus.scala | 23 +++++++++------
.../scheduler/EventLoggingListenerSuite.scala | 4 +--
.../spark/scheduler/SparkListenerSuite.scala | 30 +++++++++++---------
.../storage/BlockManagerReplicationSuite.scala | 9 ++++--
.../spark/storage/BlockManagerSuite.scala | 6 ++--
.../spark/ui/storage/StorageTabSuite.scala | 11 +++----
.../streaming/ReceivedBlockHandlerSuite.scala | 5 +++-
9 files changed, 60 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/9e91a100/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 251c16f..ffd1227 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -249,7 +249,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
def isStopped: Boolean = stopped.get()
// An asynchronous listener bus for Spark events
- private[spark] val listenerBus = new LiveListenerBus
+ private[spark] val listenerBus = new LiveListenerBus(this)
// This function allows components created by SparkEnv to be mocked in unit tests:
private[spark] def createSparkEnv(
@@ -2154,7 +2154,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}
}
- listenerBus.start(this)
+ listenerBus.start()
_listenerBusStarted = true
}
http://git-wip-us.apache.org/repos/asf/spark/blob/9e91a100/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index f28a9a5..29f812a 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -112,4 +112,9 @@ package object config {
// To limit how many applications are shown in the History Server summary ui
private[spark] val HISTORY_UI_MAX_APPS =
ConfigBuilder("spark.history.ui.maxApplications").intConf.createWithDefault(Integer.MAX_VALUE)
+
+ private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE =
+ ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size")
+ .intConf
+ .createWithDefault(10000)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/9e91a100/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index 1c21313..bfa3c40 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -22,7 +22,8 @@ import java.util.concurrent.atomic.AtomicBoolean
import scala.util.DynamicVariable
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.internal.config._
import org.apache.spark.util.Utils
/**
@@ -32,18 +33,24 @@ import org.apache.spark.util.Utils
* has started will events be actually propagated to all attached listeners. This listener bus
* is stopped when `stop()` is called, and it will drop further events after stopping.
*/
-private[spark] class LiveListenerBus extends SparkListenerBus {
+private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends SparkListenerBus {
self =>
import LiveListenerBus._
- private var sparkContext: SparkContext = null
-
// Cap the capacity of the event queue so we get an explicit error (rather than
// an OOM exception) if it's perpetually being added to more quickly than it's being drained.
- private val EVENT_QUEUE_CAPACITY = 10000
- private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
+ private lazy val EVENT_QUEUE_CAPACITY = validateAndGetQueueSize()
+ private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
+
+ private def validateAndGetQueueSize(): Int = {
+ val queueSize = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE)
+ if (queueSize <= 0) {
+ throw new SparkException("spark.scheduler.listenerbus.eventqueue.size must be > 0!")
+ }
+ queueSize
+ }
// Indicate if `start()` is called
private val started = new AtomicBoolean(false)
@@ -96,11 +103,9 @@ private[spark] class LiveListenerBus extends SparkListenerBus {
* listens for any additional events asynchronously while the listener bus is still running.
* This should only be called once.
*
- * @param sc Used to stop the SparkContext in case the listener thread dies.
*/
- def start(sc: SparkContext): Unit = {
+ def start(): Unit = {
if (started.compareAndSet(false, true)) {
- sparkContext = sc
listenerThread.start()
} else {
throw new IllegalStateException(s"$name already started!")
http://git-wip-us.apache.org/repos/asf/spark/blob/9e91a100/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index c4c80b5..7f48592 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -142,14 +142,14 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
extraConf.foreach { case (k, v) => conf.set(k, v) }
val logName = compressionCodec.map("test-" + _).getOrElse("test")
val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf)
- val listenerBus = new LiveListenerBus
+ val listenerBus = new LiveListenerBus(sc)
val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
125L, "Mickey", None)
val applicationEnd = SparkListenerApplicationEnd(1000L)
// A comprehensive test on JSON de/serialization of all events is in JsonProtocolSuite
eventLogger.start()
- listenerBus.start(sc)
+ listenerBus.start()
listenerBus.addListener(eventLogger)
listenerBus.postToAll(applicationStart)
listenerBus.postToAll(applicationEnd)
http://git-wip-us.apache.org/repos/asf/spark/blob/9e91a100/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 5ba67af..e8a88d4 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -37,13 +37,13 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
val jobCompletionTime = 1421191296660L
test("don't call sc.stop in listener") {
- sc = new SparkContext("local", "SparkListenerSuite")
+ sc = new SparkContext("local", "SparkListenerSuite", new SparkConf())
val listener = new SparkContextStoppingListener(sc)
- val bus = new LiveListenerBus
+ val bus = new LiveListenerBus(sc)
bus.addListener(listener)
// Starting listener bus should flush all buffered events
- bus.start(sc)
+ bus.start()
bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
@@ -52,8 +52,9 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
}
test("basic creation and shutdown of LiveListenerBus") {
+ sc = new SparkContext("local", "SparkListenerSuite", new SparkConf())
val counter = new BasicJobCounter
- val bus = new LiveListenerBus
+ val bus = new LiveListenerBus(sc)
bus.addListener(counter)
// Listener bus hasn't started yet, so posting events should not increment counter
@@ -61,7 +62,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
assert(counter.count === 0)
// Starting listener bus should flush all buffered events
- bus.start(sc)
+ bus.start()
bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(counter.count === 5)
@@ -72,14 +73,14 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
// Listener bus must not be started twice
intercept[IllegalStateException] {
- val bus = new LiveListenerBus
- bus.start(sc)
- bus.start(sc)
+ val bus = new LiveListenerBus(sc)
+ bus.start()
+ bus.start()
}
// ... or stopped before starting
intercept[IllegalStateException] {
- val bus = new LiveListenerBus
+ val bus = new LiveListenerBus(sc)
bus.stop()
}
}
@@ -106,12 +107,12 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
drained = true
}
}
-
- val bus = new LiveListenerBus
+ sc = new SparkContext("local", "SparkListenerSuite", new SparkConf())
+ val bus = new LiveListenerBus(sc)
val blockingListener = new BlockingListener
bus.addListener(blockingListener)
- bus.start(sc)
+ bus.start()
bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
listenerStarted.acquire()
@@ -353,13 +354,14 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
val badListener = new BadListener
val jobCounter1 = new BasicJobCounter
val jobCounter2 = new BasicJobCounter
- val bus = new LiveListenerBus
+ sc = new SparkContext("local", "SparkListenerSuite", new SparkConf())
+ val bus = new LiveListenerBus(sc)
// Propagate events to bad listener first
bus.addListener(badListener)
bus.addListener(jobCounter1)
bus.addListener(jobCounter2)
- bus.start(sc)
+ bus.start()
// Post events to all listeners, and wait until the queue is drained
(1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
http://git-wip-us.apache.org/repos/asf/spark/blob/9e91a100/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index 31687e6..b9e3a36 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -38,7 +38,10 @@ import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.storage.StorageLevel._
/** Testsuite that tests block replication in BlockManager */
-class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
+class BlockManagerReplicationSuite extends SparkFunSuite
+ with Matchers
+ with BeforeAndAfter
+ with LocalSparkContext {
private val conf = new SparkConf(false).set("spark.app.id", "test")
private var rpcEnv: RpcEnv = null
@@ -91,8 +94,10 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
// to make cached peers refresh frequently
conf.set("spark.storage.cachedPeersTtl", "10")
+ sc = new SparkContext("local", "test", conf)
master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
- new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true)
+ new BlockManagerMasterEndpoint(rpcEnv, true, conf,
+ new LiveListenerBus(sc))), conf, true)
allStores.clear()
}
http://git-wip-us.apache.org/repos/asf/spark/blob/9e91a100/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 6194d23..e93eee2 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -49,7 +49,7 @@ import org.apache.spark.util._
import org.apache.spark.util.io.ChunkedByteBuffer
class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach
- with PrivateMethodTester with ResetSystemProperties {
+ with PrivateMethodTester with LocalSparkContext with ResetSystemProperties {
import BlockManagerSuite._
@@ -107,8 +107,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
conf.set("spark.driver.port", rpcEnv.address.port.toString)
+ sc = new SparkContext("local", "test", conf)
master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
- new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true)
+ new BlockManagerMasterEndpoint(rpcEnv, true, conf,
+ new LiveListenerBus(sc))), conf, true)
val initialize = PrivateMethod[Unit]('initialize)
SizeEstimator invokePrivate initialize()
http://git-wip-us.apache.org/repos/asf/spark/blob/9e91a100/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
index 411a0dd..f6c8418 100644
--- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
@@ -19,15 +19,14 @@ package org.apache.spark.ui.storage
import org.scalatest.BeforeAndAfter
-import org.apache.spark.{SparkConf, SparkFunSuite, Success}
-import org.apache.spark.executor.TaskMetrics
+import org.apache.spark._
import org.apache.spark.scheduler._
import org.apache.spark.storage._
/**
* Test various functionality in the StorageListener that supports the StorageTab.
*/
-class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
+class StorageTabSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfter {
private var bus: LiveListenerBus = _
private var storageStatusListener: StorageStatusListener = _
private var storageListener: StorageListener = _
@@ -43,8 +42,10 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
private val bm1 = BlockManagerId("big", "dog", 1)
before {
- bus = new LiveListenerBus
- storageStatusListener = new StorageStatusListener(new SparkConf())
+ val conf = new SparkConf()
+ sc = new SparkContext("local", "test", conf)
+ bus = new LiveListenerBus(sc)
+ storageStatusListener = new StorageStatusListener(conf)
storageListener = new StorageListener(storageStatusListener)
bus.addListener(storageStatusListener)
bus.addListener(storageListener)
http://git-wip-us.apache.org/repos/asf/spark/blob/9e91a100/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index 5d36292..7e66545 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -48,6 +48,7 @@ class ReceivedBlockHandlerSuite
extends SparkFunSuite
with BeforeAndAfter
with Matchers
+ with LocalSparkContext
with Logging {
import WriteAheadLogBasedBlockHandler._
@@ -78,8 +79,10 @@ class ReceivedBlockHandlerSuite
rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
conf.set("spark.driver.port", rpcEnv.address.port.toString)
+ sc = new SparkContext("local", "test", conf)
blockManagerMaster = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
- new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true)
+ new BlockManagerMasterEndpoint(rpcEnv, true, conf,
+ new LiveListenerBus(sc))), conf, true)
storageLevel = StorageLevel.MEMORY_ONLY_SER
blockManager = createBlockManager(blockManagerSize, conf)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org