You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2016/09/26 17:44:39 UTC
spark git commit: [SPARK-17649][CORE] Log how many Spark events got
dropped in LiveListenerBus
Repository: spark
Updated Branches:
refs/heads/master f234b7cd7 -> bde85f8b7
[SPARK-17649][CORE] Log how many Spark events got dropped in LiveListenerBus
## What changes were proposed in this pull request?
Log how many Spark events got dropped in LiveListenerBus so that the user can get insights on how to set a correct event queue size.
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <sh...@databricks.com>
Closes #15220 from zsxwing/SPARK-17649.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bde85f8b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bde85f8b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bde85f8b
Branch: refs/heads/master
Commit: bde85f8b70138a51052b613664facbc981378c38
Parents: f234b7c
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Mon Sep 26 10:44:35 2016 -0700
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Mon Sep 26 10:44:35 2016 -0700
----------------------------------------------------------------------
.../spark/scheduler/LiveListenerBus.scala | 26 +++++++++++++++++++-
1 file changed, 25 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/bde85f8b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index bfa3c40..5533f7b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -18,7 +18,7 @@
package org.apache.spark.scheduler
import java.util.concurrent._
-import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import scala.util.DynamicVariable
@@ -57,6 +57,12 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa
// Indicate if `stop()` is called
private val stopped = new AtomicBoolean(false)
+ /** A counter for dropped events. It will be reset every time we log it. */
+ private val droppedEventsCounter = new AtomicLong(0L)
+
+ /** When `droppedEventsCounter` was logged last time in milliseconds. */
+ @volatile private var lastReportTimestamp = 0L
+
// Indicate if we are processing some event
// Guarded by `self`
private var processingEvent = false
@@ -123,6 +129,24 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa
eventLock.release()
} else {
onDropEvent(event)
+ droppedEventsCounter.incrementAndGet()
+ }
+
+ val droppedEvents = droppedEventsCounter.get
+ if (droppedEvents > 0) {
+ // Don't log too frequently
+ if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
+ // There may be multiple threads trying to decrease droppedEventsCounter.
+ // Use "compareAndSet" to make sure only one thread can win.
+ // And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and
+ // then that thread will update it.
+ if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) {
+ val prevLastReportTimestamp = lastReportTimestamp
+ lastReportTimestamp = System.currentTimeMillis()
+ logWarning(s"Dropped $droppedEvents SparkListenerEvents since " +
+ new java.util.Date(prevLastReportTimestamp))
+ }
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org