Posted to reviews@spark.apache.org by JoshRosen <gi...@git.apache.org> on 2017/05/24 01:40:00 UTC

[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

GitHub user JoshRosen opened a pull request:

    https://github.com/apache/spark/pull/18083

    [SPARK-20863] Add metrics/instrumentation to LiveListenerBus

    ## What changes were proposed in this pull request?
    
    This patch adds Coda Hale metrics for instrumenting the `LiveListenerBus` in order to track the number of events received, dropped, and processed, as well as a timer to track the processing time per event. See the new `LiveListenerBusMetrics` for a complete description of the new metrics.
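For readers unfamiliar with these metric types, here is a rough approximation that uses only the standard library. It is not the patch's code: `AtomicLong` stands in for Coda Hale's `Counter`, and the hypothetical `SimpleTimer` (which only sums `System.nanoTime` deltas) stands in for its `Timer`, which additionally tracks rates and a latency histogram. `BusMetricsSketch` is likewise a made-up name.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical stand-in for a Coda Hale Timer: counts timed calls and sums their durations.
final class SimpleTimer {
  private val calls = new AtomicLong(0)
  private val totalNanos = new AtomicLong(0)

  /** Runs `body`, recording one call and its elapsed time, even if `body` throws. */
  def time[T](body: => T): T = {
    val start = System.nanoTime()
    try body
    finally {
      calls.incrementAndGet()
      totalNanos.addAndGet(System.nanoTime() - start)
    }
  }

  def count: Long = calls.get
  def totalTimeNanos: Long = totalNanos.get
}

// Hypothetical holder mirroring the three kinds of metrics the description lists.
final class BusMetricsSketch {
  val numEventsReceived = new AtomicLong(0) // incremented on every post()
  val numEventsDropped = new AtomicLong(0)  // incremented when the bounded queue rejects an event
  val eventProcessingTime = new SimpleTimer // wraps delivery of one event to all listeners
}
```

In a bus built this way, `post()` would bump `numEventsReceived`, a failed `offer` on the queue would bump `numEventsDropped`, and the dispatch thread would wrap each delivery in `eventProcessingTime.time { ... }`.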
    
    ## How was this patch tested?
    
    New tests in SparkListenerSuite, including a test to ensure proper counting of dropped listener events.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/JoshRosen/spark listener-bus-metrics

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/18083.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #18083
    
----
commit a1fb5a8f2e58fe774aabc76e9d1a6859cfa99370
Author: Josh Rosen <jo...@databricks.com>
Date:   2017-05-23T21:49:35Z

    WIP

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18083#discussion_r118434907
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
    @@ -226,3 +240,34 @@ private[spark] object LiveListenerBus {
       val name = "SparkListenerBus"
     }
     
    +private[spark] class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source {
    +  override val sourceName: String = "LiveListenerBus"
    +  override val metricRegistry: MetricRegistry = new MetricRegistry
    +
    +  /**
    +   * The total number of events posted to the LiveListenerBus. This counts the number of times
    +   * that `post()` is called, which might be less than the total number of events processed in
    +   * case events are dropped.
    +   */
    +  val numEventsReceived: Counter = metricRegistry.counter(MetricRegistry.name("numEventsReceived"))
    +
    +  /**
    +   * The total number of events that were dropped without being delivered to listeners.
    +   */
    +  val numDroppedEvents: Counter = metricRegistry.counter(MetricRegistry.name("numEventsDropped"))
    +
    +  /**
    +   * The amount of time taken to post a single event to all listeners.
    +   */
    +  val eventProcessingTime: Timer = metricRegistry.timer(MetricRegistry.name("eventProcessingTime"))
    +
    +  /**
    +   * The number of messages waiting in the queue.
    +   */
    +  val queueSize: Gauge[Int] = {
    --- End diff --
    
    ah i see


[GitHub] spark issue #18083: [SPARK-20863] Add metrics/instrumentation to LiveListene...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18083
  
    Merged build finished. Test PASSed.


[GitHub] spark issue #18083: [SPARK-20863] Add metrics/instrumentation to LiveListene...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18083
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/77758/


[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/18083


[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18083#discussion_r118413115
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
    @@ -226,3 +240,34 @@ private[spark] object LiveListenerBus {
       val name = "SparkListenerBus"
     }
     
    +private[spark] class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source {
    +  override val sourceName: String = "LiveListenerBus"
    +  override val metricRegistry: MetricRegistry = new MetricRegistry
    +
    +  /**
    +   * The total number of events posted to the LiveListenerBus. This counts the number of times
    +   * that `post()` is called, which might be less than the total number of events processed in
    +   * case events are dropped.
    +   */
    +  val numEventsReceived: Counter = metricRegistry.counter(MetricRegistry.name("numEventsReceived"))
    +
    +  /**
    +   * The total number of events that were dropped without being delivered to listeners.
    +   */
    +  val numDroppedEvents: Counter = metricRegistry.counter(MetricRegistry.name("numEventsDropped"))
    +
    +  /**
    +   * The amount of time taken to post a single event to all listeners.
    +   */
    +  val eventProcessingTime: Timer = metricRegistry.timer(MetricRegistry.name("eventProcessingTime"))
    +
    +  /**
    +   * The number of messages waiting in the queue.
    +   */
    +  val queueSize: Gauge[Int] = {
    --- End diff --
    
    do we need this metric? Users can easily get it by looking at the `spark.scheduler.listenerbus.eventqueue.size` config.


[GitHub] spark issue #18083: [SPARK-20863] Add metrics/instrumentation to LiveListene...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18083
  
    **[Test build #77821 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77821/testReport)** for PR 18083 at commit [`76b669c`](https://github.com/apache/spark/commit/76b669ca6eb35a0cce4291702baa5d1f60adb467).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18083#discussion_r118432135
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
    @@ -226,3 +240,34 @@ private[spark] object LiveListenerBus {
       val name = "SparkListenerBus"
     }
     
    +private[spark] class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source {
    +  override val sourceName: String = "LiveListenerBus"
    +  override val metricRegistry: MetricRegistry = new MetricRegistry
    +
    +  /**
    +   * The total number of events posted to the LiveListenerBus. This counts the number of times
    +   * that `post()` is called, which might be less than the total number of events processed in
    +   * case events are dropped.
    +   */
    +  val numEventsReceived: Counter = metricRegistry.counter(MetricRegistry.name("numEventsReceived"))
    +
    +  /**
    +   * The total number of events that were dropped without being delivered to listeners.
    +   */
    +  val numDroppedEvents: Counter = metricRegistry.counter(MetricRegistry.name("numEventsDropped"))
    +
    +  /**
    +   * The amount of time taken to post a single event to all listeners.
    +   */
    +  val eventProcessingTime: Timer = metricRegistry.timer(MetricRegistry.name("eventProcessingTime"))
    +
    +  /**
    +   * The number of messages waiting in the queue.
    +   */
    +  val queueSize: Gauge[Int] = {
    --- End diff --
    
    It's the queue's _capacity_ that's fixed. In the `LinkedBlockingQueue` Javadoc (https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/LinkedBlockingQueue.html), _size_ refers to the number of items currently in the queue, whereas _capacity_ refers to the maximum number of items the queue can hold. I think the `spark.scheduler.listenerbus.eventqueue.size` configuration is confusingly named.
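To make the size/capacity distinction concrete, here is a small sketch using only `java.util.concurrent.LinkedBlockingQueue`. The config value fixes the capacity passed to the constructor, while a `queueSize` gauge would sample `size()`, which moves as events are enqueued and drained. `QueueSizeVsCapacity` is a hypothetical helper for illustration only.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical helper contrasting a bounded queue's fixed capacity with its current size.
object QueueSizeVsCapacity {
  /** Offers `enqueued` items to a queue of the given capacity; returns (size, remainingCapacity). */
  def snapshot(capacity: Int, enqueued: Int): (Int, Int) = {
    val queue = new LinkedBlockingQueue[String](capacity)   // capacity is fixed at construction
    (1 to enqueued).foreach(i => queue.offer(s"event-$i")) // offer() returns false once full
    (queue.size(), queue.remainingCapacity())
  }
}
```

For a bounded queue with no concurrent activity, `size() + remainingCapacity()` equals the capacity, so a gauge over `size()` reports how full the queue is, which the config value alone cannot tell you.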


[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18083#discussion_r120973638
  
    --- Diff: core/src/main/scala/org/apache/spark/util/ListenerBus.scala ---
    @@ -23,29 +23,41 @@ import scala.collection.JavaConverters._
     import scala.reflect.ClassTag
     import scala.util.control.NonFatal
     
    +import com.codahale.metrics.Timer
    +
     import org.apache.spark.internal.Logging
     
     /**
      * An event bus which posts events to its listeners.
      */
     private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
     
    +  private[this] val listenersPlusTimers = new CopyOnWriteArrayList[(L, Timer)]
    +
       // Marked `private[spark]` for access in tests.
    -  private[spark] val listeners = new CopyOnWriteArrayList[L]
    +  private[spark] def listeners = listenersPlusTimers.asScala.map(_._1).asJava
    +
    +  /**
    +   * Returns a CodaHale metrics Timer for measuring the listener's event processing time.
    +   * This method is intended to be overridden by subclasses.
    +   */
    +  protected def getTimer(listener: L): Option[Timer] = None
     
       /**
        * Add a listener to listen events. This method is thread-safe and can be called in any thread.
        */
       final def addListener(listener: L): Unit = {
    -    listeners.add(listener)
    +    listenersPlusTimers.add((listener, getTimer(listener).orNull))
       }
     
       /**
        * Remove a listener and it won't receive any events. This method is thread-safe and can be called
        * in any thread.
        */
       final def removeListener(listener: L): Unit = {
    -    listeners.remove(listener)
    +    listenersPlusTimers.asScala.find(_._1 eq listener).foreach { listenerAndTimer =>
    +      listenersPlusTimers.remove(listenerAndTimer)
    --- End diff --
    
    since this is a `CopyOnWriteArrayList`, shall we just do a filter and create a new array?
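One way to act on this suggestion without replacing the list is `CopyOnWriteArrayList.removeIf`, which filters under the list's internal lock and rebuilds the backing array once. The sketch below is an assumption, not the patch's code: `PairedListeners` is a hypothetical generic registry, and `T` stands in for the timer type.

```scala
import java.util.concurrent.CopyOnWriteArrayList
import java.util.function.Predicate

// Hypothetical registry pairing each listener with a timer-like value of type T.
final class PairedListeners[L <: AnyRef, T] {
  private val listenersPlusTimers = new CopyOnWriteArrayList[(L, T)]

  def add(listener: L, timer: T): Unit = listenersPlusTimers.add((listener, timer))

  // Filters out every entry whose listener is the same instance (reference equality),
  // in a single copy-on-write step rather than a separate find followed by remove.
  def remove(listener: L): Boolean =
    listenersPlusTimers.removeIf(new Predicate[(L, T)] {
      override def test(entry: (L, T)): Boolean = entry._1 eq listener
    })

  def size: Int = listenersPlusTimers.size()
}
```

Note one behavioral difference: `removeIf` drops all entries for that listener instance, whereas `find` followed by `remove` drops only the first one.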


[GitHub] spark issue #18083: [SPARK-20863] Add metrics/instrumentation to LiveListene...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/18083
  
    LGTM except for one question


[GitHub] spark issue #18083: [SPARK-20863] Add metrics/instrumentation to LiveListene...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18083
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/77821/


[GitHub] spark issue #18083: [SPARK-20863] Add metrics/instrumentation to LiveListene...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/18083
  
    thanks, merging to master!


[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18083#discussion_r118431597
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
    @@ -33,25 +37,24 @@ import org.apache.spark.util.Utils
      * has started will events be actually propagated to all attached listeners. This listener bus
      * is stopped when `stop()` is called, and it will drop further events after stopping.
      */
    -private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends SparkListenerBus {
    +private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus {
     
       self =>
     
       import LiveListenerBus._
     
    +  private var sparkContext: SparkContext = _
    +
       // Cap the capacity of the event queue so we get an explicit error (rather than
       // an OOM exception) if it's perpetually being added to more quickly than it's being drained.
    -  private lazy val EVENT_QUEUE_CAPACITY = validateAndGetQueueSize()
    -  private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
    -
    -  private def validateAndGetQueueSize(): Int = {
    -    val queueSize = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE)
    -    if (queueSize <= 0) {
    -      throw new SparkException("spark.scheduler.listenerbus.eventqueue.size must be > 0!")
    -    }
    -    queueSize
    +  private val eventQueue = {
    +    val capacity = conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE)
    +    require(capacity > 0, s"${LISTENER_BUS_EVENT_QUEUE_SIZE.key} must be > 0!")
    --- End diff --
    
    Nice, I didn't know about that. I'll move it in my next update.


[GitHub] spark issue #18083: [SPARK-20863] Add metrics/instrumentation to LiveListene...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18083
  
    Merged build finished. Test PASSed.


[GitHub] spark issue #18083: [SPARK-20863] Add metrics/instrumentation to LiveListene...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18083
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/77756/


[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18083#discussion_r118412901
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
    @@ -33,25 +37,24 @@ import org.apache.spark.util.Utils
      * has started will events be actually propagated to all attached listeners. This listener bus
      * is stopped when `stop()` is called, and it will drop further events after stopping.
      */
    -private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends SparkListenerBus {
    +private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus {
     
       self =>
     
       import LiveListenerBus._
     
    +  private var sparkContext: SparkContext = _
    +
       // Cap the capacity of the event queue so we get an explicit error (rather than
       // an OOM exception) if it's perpetually being added to more quickly than it's being drained.
    -  private lazy val EVENT_QUEUE_CAPACITY = validateAndGetQueueSize()
    -  private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
    -
    -  private def validateAndGetQueueSize(): Int = {
    -    val queueSize = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE)
    -    if (queueSize <= 0) {
    -      throw new SparkException("spark.scheduler.listenerbus.eventqueue.size must be > 0!")
    -    }
    -    queueSize
    +  private val eventQueue = {
    +    val capacity = conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE)
    +    require(capacity > 0, s"${LISTENER_BUS_EVENT_QUEUE_SIZE.key} must be > 0!")
    --- End diff --
    
    this constraint can be put in `LISTENER_BUS_EVENT_QUEUE_SIZE` with `TypedConfigBuilder.checkValue`
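The idea is to declare the constraint once, next to the config definition, so every reader of the entry is validated automatically instead of repeating a `require` at each call site. Spark's internal config builders aren't usable standalone, so here is a hypothetical, heavily simplified imitation of that idea. `IntConfigEntry`, `readFrom`, and the default of 10000 are assumptions for illustration, not Spark's API.

```scala
// Hypothetical typed config entry that carries its own validation rule.
final case class IntConfigEntry(
    key: String,
    default: Int,
    validator: Int => Boolean,
    errorMsg: String) {

  /** Reads the value (or the default) and fails fast with IllegalArgumentException if invalid. */
  def readFrom(settings: Map[String, String]): Int = {
    val value = settings.get(key).map(_.toInt).getOrElse(default)
    require(validator(value), s"'$value' in $key is invalid. $errorMsg")
    value
  }
}

object ConfigSketch {
  // Key taken from this thread; the default value is an assumption for the sketch.
  val ListenerBusEventQueueCapacity = IntConfigEntry(
    key = "spark.scheduler.listenerbus.eventqueue.size",
    default = 10000,
    validator = _ > 0,
    errorMsg = "The capacity of the listener bus event queue must be positive.")
}
```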


[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18083#discussion_r121031573
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala ---
    @@ -138,6 +162,44 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
         assert(drained)
       }
     
    +  test("metrics for dropped listener events") {
    +    val bus = new LiveListenerBus(new SparkConf().set(LISTENER_BUS_EVENT_QUEUE_CAPACITY, 1))
    +
    +    val listenerStarted = new Semaphore(0)
    +    val listenerWait = new Semaphore(0)
    +
    +    bus.addListener(new SparkListener {
    +      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    +        listenerStarted.release()
    +        listenerWait.acquire()
    +      }
    +    })
    +
    +    bus.start(mockSparkContext, mockMetricsSystem)
    +
    +    // Post a message to the listener bus and wait for processing to begin:
    +    bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
    +    listenerStarted.acquire()
    --- End diff --
    
    This should be fine:
    
    - If this code runs before `listenerStarted.release()` then it will block until `listenerStarted.release()` is hit.
    - The listener will block in `listenerWait.acquire()` until we call `listenerWait.release()` further down in this method.
    
    This synchronization pattern is already used elsewhere in this suite in https://github.com/JoshRosen/spark/blob/76b669ca6eb35a0cce4291702baa5d1f60adb467/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala#L113
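The handshake can be reproduced outside Spark to see why the drop count is deterministic. This is a hypothetical miniature, not the suite's code: `DroppedEventsHandshake`, integer events, and the stop marker are assumptions. Once `listenerStarted.acquire()` returns, the dispatcher has already dequeued event 0 and cannot take anything else until `listenerWait` is released, so with capacity 1 the next post fits and the one after it must be dropped.

```scala
import java.util.concurrent.{LinkedBlockingQueue, Semaphore}
import java.util.concurrent.atomic.AtomicLong

// Hypothetical miniature of the test: one dispatch thread, a queue of capacity 1,
// and two semaphores used as a handshake between the test thread and the listener.
object DroppedEventsHandshake {
  private val StopMarker = -1

  /** Returns (eventsPosted, eventsDropped). */
  def simulate(): (Long, Long) = {
    val queue = new LinkedBlockingQueue[Int](1)
    val listenerStarted = new Semaphore(0)
    val listenerWait = new Semaphore(0)
    val posted = new AtomicLong(0)
    val dropped = new AtomicLong(0)

    val dispatcher = new Thread(new Runnable {
      override def run(): Unit = {
        var event = queue.take()
        while (event != StopMarker) {
          listenerStarted.release() // tell the test thread that processing has begun
          listenerWait.acquire()    // block until the test thread lets us finish
          event = queue.take()
        }
      }
    })
    dispatcher.start()

    def post(event: Int): Unit = {
      posted.incrementAndGet()
      if (!queue.offer(event)) dropped.incrementAndGet()
    }

    post(0)
    listenerStarted.acquire() // event 0 has been dequeued; the listener is now blocked
    post(1)                   // occupies the single free slot
    post(2)                   // queue is full, so this event is dropped
    listenerWait.release(2)   // let the listener finish events 0 and 1
    queue.put(StopMarker)     // blocking put, so the stop marker itself is never dropped
    dispatcher.join()
    (posted.get, dropped.get)
  }
}
```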



[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18083#discussion_r118413353
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
    @@ -124,11 +136,13 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa
           logError(s"$name has already stopped! Dropping event $event")
           return
         }
    +    metrics.numEventsReceived.inc()
         val eventAdded = eventQueue.offer(event)
         if (eventAdded) {
           eventLock.release()
         } else {
           onDropEvent(event)
    +      metrics.numDroppedEvents.inc()
    --- End diff --
    
    is it better to move this to `onDropEvent`?
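Moving the increment into the drop hook keeps the counter correct for every code path that drops an event, since callers no longer have to remember it. A minimal sketch of that arrangement, assuming a hypothetical `DropCountingBus` with `AtomicLong` counters in place of Coda Hale ones and no dispatch thread:

```scala
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicLong

// Hypothetical bus skeleton: the drop counter lives inside onDropEvent,
// so the increment is not repeated at each call site that may drop an event.
final class DropCountingBus(capacity: Int) {
  private val eventQueue = new LinkedBlockingQueue[String](capacity)
  val numEventsPosted = new AtomicLong(0)
  val numEventsDropped = new AtomicLong(0)

  def post(event: String): Unit = {
    numEventsPosted.incrementAndGet() // counts every post, including ones dropped below
    if (!eventQueue.offer(event)) onDropEvent(event)
  }

  private def onDropEvent(event: String): Unit = {
    numEventsDropped.incrementAndGet()
    // A real implementation might also log here, e.g. only for the first dropped event.
  }

  def queued: Int = eventQueue.size()
}
```

With nothing draining the queue, posting five events into a capacity-2 bus leaves two queued and three dropped, so the posted counter equals queued plus dropped.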


[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18083#discussion_r118413314
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
    @@ -124,11 +136,13 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa
           logError(s"$name has already stopped! Dropping event $event")
           return
         }
    +    metrics.numEventsReceived.inc()
    --- End diff --
    
    here we also count dropped events?


[GitHub] spark issue #18083: [SPARK-20863] Add metrics/instrumentation to LiveListene...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18083
  
    Merged build finished. Test PASSed.


[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18083#discussion_r120972548
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
    @@ -217,3 +243,61 @@ private[spark] object LiveListenerBus {
       val name = "SparkListenerBus"
     }
     
    +private[spark]
    +class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source with Logging {
    +  override val sourceName: String = "LiveListenerBus"
    +  override val metricRegistry: MetricRegistry = new MetricRegistry
    +
    +  /**
    +   * The total number of events posted to the LiveListenerBus. This is a count of the total number
    +   * of events which have been produced by the application and sent to the listener bus, NOT a
    +   * count of the number of events which have been processed and delivered to listeners (or dropped
    +   * without being delivered).
    +   */
    +  val numEventsPosted: Counter = metricRegistry.counter(MetricRegistry.name("numEventsPosted"))
    +
    +  /**
    +   * The total number of events that were dropped without being delivered to listeners.
    +   */
    +  val numDroppedEvents: Counter = metricRegistry.counter(MetricRegistry.name("numEventsDropped"))
    +
    +  /**
    +   * The amount of time taken to post a single event to all listeners.
    +   */
    +  val eventProcessingTime: Timer = metricRegistry.timer(MetricRegistry.name("eventProcessingTime"))
    +
    +  /**
    +   * The number of messages waiting in the queue.
    +   */
    +  val queueSize: Gauge[Int] = {
    +    metricRegistry.register(MetricRegistry.name("queueSize"), new Gauge[Int]{
    +      override def getValue: Int = queue.size()
    +    })
    +  }
    +
    +  // Guarded by synchronization.
    +  private val perListenerClassTimers = mutable.Map[String, Timer]()
    +
    +  /**
    +   * Returns a timer tracking the processing time of events handled by listeners of the
    +   * given class. This method is thread-safe.
    +   */
    +  def getTimerForListenerClass(cls: Class[_ <: SparkListenerInterface]): Option[Timer] = {
    --- End diff --
    
    Why not just pass the class name as a parameter?
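A minimal sketch of the suggested signature, assuming Codahale (Dropwizard) Metrics 3.x is on the classpath. The lookup takes a plain `String`, so this class no longer depends on `SparkListenerInterface` and the caller decides how listeners are grouped. `ListenerClassTimers` and the `listenerProcessingTime` name prefix are hypothetical, and the 128-class cap from the diff is left out to keep the focus on the parameter type.

```scala
import scala.collection.mutable

import com.codahale.metrics.{MetricRegistry, Timer}

// Hypothetical, trimmed-down version of the per-class timer lookup.
class ListenerClassTimers {
  val metricRegistry = new MetricRegistry

  // Guarded by synchronization on `this`.
  private val perListenerClassTimers = mutable.Map[String, Timer]()

  /**
   * Returns the timer for listeners whose class is named `className`, creating it on first
   * use. Taking a String keeps this class independent of any listener interface.
   */
  def getTimerForListenerClass(className: String): Timer = synchronized {
    // The second argument is by-name, so a timer is only registered on a cache miss.
    perListenerClassTimers.getOrElseUpdate(className,
      metricRegistry.timer(MetricRegistry.name("listenerProcessingTime", className)))
  }
}
```

A caller would pass `listener.getClass.getName`, which keeps the choice of grouping key at the call site.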




[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18083#discussion_r120194864
  
    --- Diff: core/src/main/scala/org/apache/spark/util/ListenerBus.scala ---
    @@ -56,14 +68,25 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
         // JavaConverters can create a JIterableWrapper if we use asScala.
         // However, this method will be called frequently. To avoid the wrapper cost, here we use
         // Java Iterator directly.
    -    val iter = listeners.iterator
    +    val iter = listenersPlusTimers.iterator
         while (iter.hasNext) {
    -      val listener = iter.next()
    +      val listenerAndMaybeTimer = iter.next()
    +      val listener = listenerAndMaybeTimer._1
    +      val maybeTimer = listenerAndMaybeTimer._2
    +      var maybeTimerContext = if (maybeTimer != null) {
    --- End diff --
    
    Yeah, this is just premature optimization. I'll undo.




[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

Posted by bOOm-X <gi...@git.apache.org>.
Github user bOOm-X commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18083#discussion_r119584011
  
    --- Diff: core/src/main/scala/org/apache/spark/util/ListenerBus.scala ---
    @@ -56,14 +68,25 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
         // JavaConverters can create a JIterableWrapper if we use asScala.
         // However, this method will be called frequently. To avoid the wrapper cost, here we use
         // Java Iterator directly.
    -    val iter = listeners.iterator
    +    val iter = listenersPlusTimers.iterator
         while (iter.hasNext) {
    -      val listener = iter.next()
    +      val listenerAndMaybeTimer = iter.next()
    +      val listener = listenerAndMaybeTimer._1
    +      val maybeTimer = listenerAndMaybeTimer._2
    +      var maybeTimerContext = if (maybeTimer != null) {
    +        maybeTimer.time()
    +      } else {
    +        null
    +      }
           try {
             doPostEvent(listener, event)
           } catch {
             case NonFatal(e) =>
               logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
    +      } finally {
    +        if (maybeTimerContext != null) {
    --- End diff --
    
    Same here: this would be simpler with an `Option`.
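A minimal sketch of the loop with the timer held as an `Option`, assuming Codahale Metrics 3.x. `Listener`, `SimpleBus` and `onEvent` are hypothetical simplifications of `SparkListenerInterface`, `ListenerBus` and `doPostEvent`. `Option.map` starts a context only when a timer exists, and `foreach` in the `finally` block replaces the null check.

```scala
import java.util.concurrent.CopyOnWriteArrayList

import scala.util.control.NonFatal

import com.codahale.metrics.Timer

// Hypothetical listener interface standing in for SparkListenerInterface.
trait Listener {
  def onEvent(event: String): Unit
}

// Hypothetical, simplified ListenerBus: each listener is stored with an Option[Timer].
class SimpleBus {
  private[this] val listenersPlusTimers =
    new CopyOnWriteArrayList[(Listener, Option[Timer])]

  def addListener(listener: Listener, timer: Option[Timer]): Unit = {
    listenersPlusTimers.add((listener, timer))
  }

  def postToAll(event: String): Unit = {
    // Iterate with the Java iterator directly, as ListenerBus does, to avoid a wrapper.
    val iter = listenersPlusTimers.iterator
    while (iter.hasNext) {
      val (listener, maybeTimer) = iter.next()
      // Start a timer context only for listeners that have a timer.
      val maybeTimerContext: Option[Timer.Context] = maybeTimer.map(_.time())
      try {
        listener.onEvent(event)
      } catch {
        // The real code logs the exception; this sketch just ignores it.
        case NonFatal(_) =>
      } finally {
        // Record the elapsed time even if the listener threw.
        maybeTimerContext.foreach(_.stop())
      }
    }
  }
}
```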




[GitHub] spark issue #18083: [SPARK-20863] Add metrics/instrumentation to LiveListene...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18083
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/77395/




[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18083#discussion_r120972760
  
    --- Diff: core/src/main/scala/org/apache/spark/util/ListenerBus.scala ---
    @@ -23,29 +23,41 @@ import scala.collection.JavaConverters._
     import scala.reflect.ClassTag
     import scala.util.control.NonFatal
     
    +import com.codahale.metrics.Timer
    +
     import org.apache.spark.internal.Logging
     
     /**
      * An event bus which posts events to its listeners.
      */
     private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
     
    +  private[this] val listenersPlusTimers = new CopyOnWriteArrayList[(L, Timer)]
    --- End diff --
    
    Shall we use `Option[Timer]` as the value type?




[GitHub] spark issue #18083: [SPARK-20863] Add metrics/instrumentation to LiveListene...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18083
  
    **[Test build #77277 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77277/testReport)** for PR 18083 at commit [`a1fb5a8`](https://github.com/apache/spark/commit/a1fb5a8f2e58fe774aabc76e9d1a6859cfa99370).




[GitHub] spark issue #18083: [SPARK-20863] Add metrics/instrumentation to LiveListene...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18083
  
    **[Test build #77756 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77756/testReport)** for PR 18083 at commit [`4a083de`](https://github.com/apache/spark/commit/4a083decb7e817fab49f25f4f0fe119352525aa7).




[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18083#discussion_r121003615
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
    @@ -217,3 +243,61 @@ private[spark] object LiveListenerBus {
       val name = "SparkListenerBus"
     }
     
    +private[spark]
    +class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source with Logging {
    +  override val sourceName: String = "LiveListenerBus"
    +  override val metricRegistry: MetricRegistry = new MetricRegistry
    +
    +  /**
    +   * The total number of events posted to the LiveListenerBus. This is a count of the total number
    +   * of events which have been produced by the application and sent to the listener bus, NOT a
    +   * count of the number of events which have been processed and delivered to listeners (or dropped
    +   * without being delivered).
    +   */
    +  val numEventsPosted: Counter = metricRegistry.counter(MetricRegistry.name("numEventsPosted"))
    +
    +  /**
    +   * The total number of events that were dropped without being delivered to listeners.
    +   */
    +  val numDroppedEvents: Counter = metricRegistry.counter(MetricRegistry.name("numEventsDropped"))
    +
    +  /**
    +   * The amount of time taken to post a single event to all listeners.
    +   */
    +  val eventProcessingTime: Timer = metricRegistry.timer(MetricRegistry.name("eventProcessingTime"))
    +
    +  /**
    +   * The number of messages waiting in the queue.
    +   */
    +  val queueSize: Gauge[Int] = {
    +    metricRegistry.register(MetricRegistry.name("queueSize"), new Gauge[Int]{
    +      override def getValue: Int = queue.size()
    +    })
    +  }
    +
    +  // Guarded by synchronization.
    +  private val perListenerClassTimers = mutable.Map[String, Timer]()
    +
    +  /**
    +   * Returns a timer tracking the processing time of events handled by listeners of the
    +   * given class. This method is thread-safe.
    +   */
    +  def getTimerForListenerClass(cls: Class[_ <: SparkListenerInterface]): Option[Timer] = {
    +    synchronized {
    +      val className = cls.getName
    +      val maxTimed = 128
    --- End diff --
    
    *Shrug*. Maybe, but note that this would be 128 _separate listener classes_. Let me put in an undocumented configuration.
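A minimal sketch of a cap read from configuration, using only the Scala standard library. The key `example.listenerbus.maxListenerClassesTimed`, the `Map[String, String]` standing in for `SparkConf`, and the `TimerCap`/`CappedRegistry` names are hypothetical; the default of 128 mirrors the diff. Once the cap is reached, new classes simply get `None`, which bounds the number of metrics without failing listener registration.

```scala
import scala.collection.mutable

// Hypothetical configuration helper; the key name is made up for this sketch.
object TimerCap {
  val MaxTimedKey = "example.listenerbus.maxListenerClassesTimed"
  // Default mirrors the hard-coded `maxTimed = 128` in the diff.
  val DefaultMaxTimed = 128

  def maxTimedFrom(conf: Map[String, String]): Int =
    conf.get(MaxTimedKey).map(_.toInt).getOrElse(DefaultMaxTimed)
}

/**
 * Hypothetical per-class metric cache that creates at most `maxTimed` entries.
 * Classes seen after the cap is reached get None. Thread-safe.
 */
class CappedRegistry[T](maxTimed: Int, create: String => T) {
  private val byClass = mutable.Map[String, T]()

  def getOrCreate(className: String): Option[T] = synchronized {
    byClass.get(className).orElse {
      if (byClass.size >= maxTimed) {
        None // A real implementation might log that this class is not being timed.
      } else {
        val metric = create(className)
        byClass(className) = metric
        Some(metric)
      }
    }
  }

  def size: Int = synchronized { byClass.size }
}
```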




[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18083#discussion_r118413299
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
    @@ -226,3 +240,34 @@ private[spark] object LiveListenerBus {
       val name = "SparkListenerBus"
     }
     
    +private[spark] class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source {
    +  override val sourceName: String = "LiveListenerBus"
    +  override val metricRegistry: MetricRegistry = new MetricRegistry
    +
    +  /**
    +   * The total number of events posted to the LiveListenerBus. This counts the number of times
    +   * that `post()` is called, which might be less than the total number of events processed in
    +   * case events are dropped.
    --- End diff --
    
    According to the code, we also count dropped events, don't we?
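The reviewer's reading matches the `post()` diff in this PR: the counter is incremented before `offer`, so dropped events are included. A minimal JDK-only sketch of that accounting; `CountingBus`, its capacity, and the use of `AtomicLong` in place of Codahale `Counter`s are assumptions. With nothing draining the queue, every posted event is either queued or dropped, so posted = queued + dropped.

```scala
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicLong

// Hypothetical, simplified producer side of a bounded event bus.
class CountingBus[E](capacity: Int) {
  private val eventQueue = new LinkedBlockingQueue[E](capacity)
  val numEventsPosted = new AtomicLong(0)
  val numEventsDropped = new AtomicLong(0)

  def post(event: E): Unit = {
    // Counted before the offer, so this total includes events that end up dropped.
    numEventsPosted.incrementAndGet()
    if (!eventQueue.offer(event)) {
      // Queue full: the event is dropped without reaching any listener.
      numEventsDropped.incrementAndGet()
    }
  }

  def queueSize: Int = eventQueue.size()
}
```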




[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

Posted by bOOm-X <gi...@git.apache.org>.
Github user bOOm-X commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18083#discussion_r119583821
  
    --- Diff: core/src/main/scala/org/apache/spark/util/ListenerBus.scala ---
    @@ -23,29 +23,41 @@ import scala.collection.JavaConverters._
     import scala.reflect.ClassTag
     import scala.util.control.NonFatal
     
    +import com.codahale.metrics.Timer
    +
     import org.apache.spark.internal.Logging
     
     /**
      * An event bus which posts events to its listeners.
      */
     private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
     
    +  private[this] val listenersPlusTimers = new CopyOnWriteArrayList[(L, Timer)]
    +
       // Marked `private[spark]` for access in tests.
    -  private[spark] val listeners = new CopyOnWriteArrayList[L]
    +  private[spark] def listeners = listenersPlusTimers.asScala.map(_._1).asJava
    +
    +  /**
    +   * Returns a CodaHale metrics Timer for measuring the listener's event processing time.
    +   * This method is intended to be overridden by subclasses.
    +   */
    +  protected def createTimer(listener: L): Option[Timer] = None
     
       /**
        * Add a listener to listen events. This method is thread-safe and can be called in any thread.
        */
       final def addListener(listener: L): Unit = {
    -    listeners.add(listener)
    +    listenersPlusTimers.add((listener, createTimer(listener).orNull))
    --- End diff --
    
    Why not keep the `Option` in the collection instead of putting `null`?
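A minimal sketch of storing `Option[Timer]` directly, assuming Codahale Metrics 3.x. `TimedListenerBus`, `listeners` and `timers` are hypothetical simplifications of the trait in the diff (here `listeners` returns a Scala `List` snapshot rather than a Java list). The `orNull` call on insert and the null checks on read both disappear.

```scala
import java.util.concurrent.CopyOnWriteArrayList

import scala.collection.JavaConverters._

import com.codahale.metrics.Timer

// Hypothetical, simplified version of the ListenerBus trait from the diff.
trait TimedListenerBus[L <: AnyRef] {
  // Each listener is paired with an optional timer; None means "not timed", never null.
  private[this] val listenersPlusTimers = new CopyOnWriteArrayList[(L, Option[Timer])]

  /** Hook for subclasses; by default no listener is timed. */
  protected def createTimer(listener: L): Option[Timer] = None

  final def addListener(listener: L): Unit = {
    listenersPlusTimers.add((listener, createTimer(listener)))
  }

  /** Snapshot of the registered listeners, in registration order. */
  def listeners: List[L] = listenersPlusTimers.asScala.map(_._1).toList

  /** Snapshot of the existing timers, skipping untimed listeners. */
  def timers: List[Timer] = listenersPlusTimers.asScala.flatMap(_._2).toList
}
```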




[GitHub] spark issue #18083: [SPARK-20863] Add metrics/instrumentation to LiveListene...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18083
  
    **[Test build #77289 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77289/testReport)** for PR 18083 at commit [`a46c247`](https://github.com/apache/spark/commit/a46c24766fc2d533be82cc709948b37383e68121).




[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18083#discussion_r118413024
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
    @@ -226,3 +240,34 @@ private[spark] object LiveListenerBus {
       val name = "SparkListenerBus"
     }
     
    +private[spark] class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source {
    +  override val sourceName: String = "LiveListenerBus"
    +  override val metricRegistry: MetricRegistry = new MetricRegistry
    +
    +  /**
    +   * The total number of events posted to the LiveListenerBus. This counts the number of times
    +   * that `post()` is called, which might be less than the total number of events processed in
    +   * case events are dropped.
    +   */
    +  val numEventsReceived: Counter = metricRegistry.counter(MetricRegistry.name("numEventsReceived"))
    +
    +  /**
    +   * The total number of events that were dropped without being delivered to listeners.
    +   */
    +  val numDroppedEvents: Counter = metricRegistry.counter(MetricRegistry.name("numEventsDropped"))
    +
    +  /**
    +   * The amount of time taken to post a single event to all listeners.
    +   */
    +  val eventProcessingTime: Timer = metricRegistry.timer(MetricRegistry.name("eventProcessingTime"))
    +
    +  /**
    +   * The number of of messages waiting in the queue.
    --- End diff --
    
    nit: double `of` here




[GitHub] spark issue #18083: [SPARK-20863] Add metrics/instrumentation to LiveListene...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18083
  
    **[Test build #77756 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77756/testReport)** for PR 18083 at commit [`4a083de`](https://github.com/apache/spark/commit/4a083decb7e817fab49f25f4f0fe119352525aa7).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18083#discussion_r120197916
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
    @@ -111,6 +112,15 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus {
         }
       }
     
    +  override protected def createTimer(listener: SparkListenerInterface): Option[Timer] = {
    +    if (listener.getClass.getName.startsWith("org.apache.spark")) {
    --- End diff --
    
    This is accounted for in a later commit. All listeners are now captured.




[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18083#discussion_r118628985
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
    @@ -226,3 +246,61 @@ private[spark] object LiveListenerBus {
       val name = "SparkListenerBus"
     }
     
    +private[spark]
    +class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source with Logging {
    +  override val sourceName: String = "LiveListenerBus"
    +  override val metricRegistry: MetricRegistry = new MetricRegistry
    +
    +  /**
    +   * The total number of events posted to the LiveListenerBus. This is a count of the total number
    +   * of events which have been produced by the application and sent to the listener bus, NOT a
    +   * count of the number of events which have been processed and delivered to listeners (or dropped
    +   * without being delivered).
    +   */
    +  val numEventsPosted: Counter = metricRegistry.counter(MetricRegistry.name("numEventsPosted"))
    +
    +  /**
    +   * The total number of events that were dropped without being delivered to listeners.
    +   */
    +  val numDroppedEvents: Counter = metricRegistry.counter(MetricRegistry.name("numEventsDropped"))
    +
    +  /**
    +   * The amount of time taken to post a single event to all listeners.
    +   */
    +  val eventProcessingTime: Timer = metricRegistry.timer(MetricRegistry.name("eventProcessingTime"))
    +
    +  /**
    +   * The number of messages waiting in the queue.
    +   */
    +  val queueSize: Gauge[Int] = {
    +    metricRegistry.register(MetricRegistry.name("queueSize"), new Gauge[Int]{
    +      override def getValue: Int = queue.size()
    +    })
    +  }
    +
    +  // Guarded by synchronization.
    +  private val perListenerClassTimers = mutable.Map[String, Timer]()
    +
    +  /**
    +   * Returns a timer tracking the processing time of events handled by listeners of the
    +   * given class. This method is thread-safe.
    +   */
    +  def getTimerForListener(listener: SparkListenerInterface): Option[Timer] = {
    +    synchronized {
    +      val className = listener.getClass.getName
    --- End diff --
    
    Yes, but my goal with these metrics is to identify which listeners are causing performance problems, and for that purpose it's more useful to group listeners by class than to instrument individual listener instances. Most (all?) of Spark's internal listeners have one instance per driver / SparkContext, so in typical cases tracking stats per instance wouldn't make a meaningful difference.
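A minimal sketch of per-class grouping, assuming Codahale Metrics 3.x: `MetricRegistry.timer(name)` returns the already registered timer when the name exists, so all instances of a class record into one timer. `AuditListener`, `PerClassTiming` and the `listenerProcessingTime` prefix are hypothetical.

```scala
import com.codahale.metrics.{MetricRegistry, Timer}

// Hypothetical listener class; two instances will share one timer.
class AuditListener

object PerClassTiming {
  val registry = new MetricRegistry

  // registry.timer(name) is get-or-create, so the same class name yields the same Timer.
  def timerFor(listener: AnyRef): Timer =
    registry.timer(MetricRegistry.name("listenerProcessingTime", listener.getClass.getName))

  // Times `body` against the timer of the listener's class.
  def timed[T](listener: AnyRef)(body: => T): T = {
    val ctx = timerFor(listener).time()
    try body finally ctx.stop()
  }
}
```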




[GitHub] spark issue #18083: [SPARK-20863] Add metrics/instrumentation to LiveListene...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the issue:

    https://github.com/apache/spark/pull/18083
  
    Context for review: a large portion of the diff in this patch undoes changes to the `LiveListenerBus` constructor and `start()` method that were introduced in #14269. That patch introduced a bunch of weird implicit initialization-order constraints in the form of `lazy val`s, which made it harder to use those values in metrics gauges. See my comments on that other PR for more details.
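A generic illustration of this kind of constraint, not a reconstruction of #14269: when a gauge reads a `lazy val`, the first metrics poll decides when that value (and any configuration it reads) is initialized. A JDK-only sketch with hypothetical `LazyQueueBus` and `EagerQueueBus` classes; the eager version builds the queue in the constructor, so polling a gauge never triggers initialization.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical bus whose queue is a lazy val: whoever reads it first decides when it is built.
class LazyQueueBus(capacityFromConf: () => Int) {
  var queueInitializations = 0

  lazy val eventQueue: LinkedBlockingQueue[String] = {
    queueInitializations += 1
    new LinkedBlockingQueue[String](capacityFromConf())
  }

  // Gauge-style callback: polling it forces `eventQueue`, and its config read, to happen now.
  val queueSizeGauge: () => Int = () => eventQueue.size()
}

// Eager alternative: the queue is built in the constructor, before any gauge can be polled.
class EagerQueueBus(capacity: Int) {
  val eventQueue = new LinkedBlockingQueue[String](capacity)
  val queueSizeGauge: () => Int = () => eventQueue.size()
}
```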




[GitHub] spark issue #18083: [SPARK-20863] Add metrics/instrumentation to LiveListene...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18083
  
    **[Test build #77758 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77758/testReport)** for PR 18083 at commit [`d1a5e99`](https://github.com/apache/spark/commit/d1a5e991fb7fc3e7f93090c23d8088be8b650f61).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #18083: [SPARK-20863] Add metrics/instrumentation to LiveListene...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18083
  
    **[Test build #77277 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77277/testReport)** for PR 18083 at commit [`a1fb5a8`](https://github.com/apache/spark/commit/a1fb5a8f2e58fe774aabc76e9d1a6859cfa99370).
     * This patch **fails to build**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18083#discussion_r118623202
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
    @@ -226,3 +246,61 @@ private[spark] object LiveListenerBus {
       val name = "SparkListenerBus"
     }
     
    +private[spark]
    +class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source with Logging {
    +  override val sourceName: String = "LiveListenerBus"
    +  override val metricRegistry: MetricRegistry = new MetricRegistry
    +
    +  /**
    +   * The total number of events posted to the LiveListenerBus. This is a count of the total number
    +   * of events which have been produced by the application and sent to the listener bus, NOT a
    +   * count of the number of events which have been processed and delivered to listeners (or dropped
    +   * without being delivered).
    +   */
    +  val numEventsPosted: Counter = metricRegistry.counter(MetricRegistry.name("numEventsPosted"))
    +
    +  /**
    +   * The total number of events that were dropped without being delivered to listeners.
    +   */
    +  val numDroppedEvents: Counter = metricRegistry.counter(MetricRegistry.name("numEventsDropped"))
    +
    +  /**
    +   * The amount of time taken to post a single event to all listeners.
    +   */
    +  val eventProcessingTime: Timer = metricRegistry.timer(MetricRegistry.name("eventProcessingTime"))
    +
    +  /**
    +   * The number of messages waiting in the queue.
    +   */
    +  val queueSize: Gauge[Int] = {
    +    metricRegistry.register(MetricRegistry.name("queueSize"), new Gauge[Int]{
    +      override def getValue: Int = queue.size()
    +    })
    +  }
    +
    +  // Guarded by synchronization.
    +  private val perListenerClassTimers = mutable.Map[String, Timer]()
    +
    +  /**
    +   * Returns a timer tracking the processing time of events handled by listeners of the
    +   * given class. This method is thread-safe.
    +   */
    +  def getTimerForListener(listener: SparkListenerInterface): Option[Timer] = {
    +    synchronized {
    +      val className = listener.getClass.getName
    --- End diff --
    
    Is it possible that users register the same listener twice? If so, the class name may not be a good identifier for listeners. I think this is the main problem with per-listener metrics: how do we identify each listener?
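To make the identification question concrete, here is a minimal JDK-only sketch comparing two keys. The class name merges distinct instances of one class. Object identity separates them and also detects the same instance registered twice; here it is implemented with `java.util.IdentityHashMap`, which is an illustrative choice and not what the PR uses. `MyListener` and `ListenerKeys` are hypothetical.

```scala
import java.util.IdentityHashMap

// Hypothetical listener class.
class MyListener

object ListenerKeys {
  // Counts registrations per class name: one key for all instances of a class.
  def countByClassName(registered: Seq[AnyRef]): Map[String, Int] =
    registered.groupBy(_.getClass.getName).map { case (name, ls) => name -> ls.size }

  // Counts registrations per object identity: one key per instance.
  def countByIdentity(registered: Seq[AnyRef]): IdentityHashMap[AnyRef, Int] = {
    val counts = new IdentityHashMap[AnyRef, Int]()
    registered.foreach { l =>
      counts.put(l, counts.getOrDefault(l, 0) + 1)
    }
    counts
  }
}
```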




[GitHub] spark issue #18083: [SPARK-20863] Add metrics/instrumentation to LiveListene...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18083
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18083#discussion_r118432570
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
    @@ -124,11 +136,13 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa
           logError(s"$name has already stopped! Dropping event $event")
           return
         }
    +    metrics.numEventsReceived.inc()
         val eventAdded = eventQueue.offer(event)
         if (eventAdded) {
           eventLock.release()
         } else {
           onDropEvent(event)
    +      metrics.numDroppedEvents.inc()
    --- End diff --
    
    Sure, will do.




[GitHub] spark issue #18083: [SPARK-20863] Add metrics/instrumentation to LiveListene...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18083
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/77305/
    Test PASSed.




[GitHub] spark issue #18083: [SPARK-20863] Add metrics/instrumentation to LiveListene...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18083
  
    **[Test build #77395 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77395/testReport)** for PR 18083 at commit [`60c7448`](https://github.com/apache/spark/commit/60c7448d2cff7dd809f9d75ff48b31e21b88a915).




[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18083#discussion_r120972483
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
    @@ -217,3 +243,61 @@ private[spark] object LiveListenerBus {
       val name = "SparkListenerBus"
     }
     
    +private[spark]
    +class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source with Logging {
    +  override val sourceName: String = "LiveListenerBus"
    +  override val metricRegistry: MetricRegistry = new MetricRegistry
    +
    +  /**
    +   * The total number of events posted to the LiveListenerBus. This is a count of the total number
    +   * of events which have been produced by the application and sent to the listener bus, NOT a
    +   * count of the number of events which have been processed and delivered to listeners (or dropped
    +   * without being delivered).
    +   */
    +  val numEventsPosted: Counter = metricRegistry.counter(MetricRegistry.name("numEventsPosted"))
    +
    +  /**
    +   * The total number of events that were dropped without being delivered to listeners.
    +   */
    +  val numDroppedEvents: Counter = metricRegistry.counter(MetricRegistry.name("numEventsDropped"))
    +
    +  /**
    +   * The amount of time taken to post a single event to all listeners.
    +   */
    +  val eventProcessingTime: Timer = metricRegistry.timer(MetricRegistry.name("eventProcessingTime"))
    +
    +  /**
    +   * The number of messages waiting in the queue.
    +   */
    +  val queueSize: Gauge[Int] = {
    +    metricRegistry.register(MetricRegistry.name("queueSize"), new Gauge[Int]{
    +      override def getValue: Int = queue.size()
    +    })
    +  }
    +
    +  // Guarded by synchronization.
    +  private val perListenerClassTimers = mutable.Map[String, Timer]()
    +
    +  /**
    +   * Returns a timer tracking the processing time of the given listener class.
    +   * events processed by that listener. This method is thread-safe.
    +   */
    +  def getTimerForListenerClass(cls: Class[_ <: SparkListenerInterface]): Option[Timer] = {
    +    synchronized {
    +      val className = cls.getName
    +      val maxTimed = 128
    --- End diff --
    
    Should this be configurable?
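One way to act on this question is to pass the limit in rather than hard-coding `maxTimed = 128`. The sketch below is hypothetical: `CappedTimerRegistry` and its `maxTimed` constructor parameter are illustrative names, and an `AtomicLong` stands in for a Coda Hale `Timer`. Once the cap is reached, new classes get `None`, matching the `Option[Timer]` return type in the diff.

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable

// Hypothetical registry that stops creating per-class timers once `maxTimed` is reached.
final class CappedTimerRegistry(maxTimed: Int) {
  private val timers = mutable.Map[String, AtomicLong]()

  def timerForClass(cls: Class[_]): Option[AtomicLong] = synchronized {
    val className = cls.getName
    timers.get(className) match {
      case existing @ Some(_) =>
        existing // already timed: reuse the same timer
      case None if timers.size < maxTimed =>
        val timer = new AtomicLong(0L)
        timers(className) = timer
        Some(timer)
      case None =>
        None // cap reached: callers skip timing for this class
    }
  }
}

val registry = new CappedTimerRegistry(maxTimed = 2)
val first = registry.timerForClass(classOf[String])
val second = registry.timerForClass(classOf[Integer])
val third = registry.timerForClass(classOf[java.lang.Long])
val again = registry.timerForClass(classOf[String])
```

Wiring the limit to an actual Spark configuration entry is left open here.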




[GitHub] spark issue #18083: [SPARK-20863] Add metrics/instrumentation to LiveListene...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the issue:

    https://github.com/apache/spark/pull/18083
  
    > I am not sure that monitoring the number of dropped events (with real metrics) is really worth it. You just want to know if messages have been dropped (and having the number in the log is fine).
    
    Even if the absolute number of dropped events doesn't matter that much, I would still like to have this metric: it's simple to implement, and being able to use my existing metrics-based monitoring infrastructure to correlate dropped events with other signals can be helpful.
    
    > The execution time of message processing is very interesting, but without per-listener or per-event-type breakdowns (just the global timing) we cannot do a fine-grained analysis, and so cannot make improvements.
    
    For now my timing is capturing the total time to process each message, counting the time to dequeue plus the aggregate time across all of the listeners. Given the current single-threaded processing strategy this is still a useful signal, even if not quite as useful as per-listener metrics. I agree that per-listener metrics would be more useful, though, so let me see if there's a clean refactoring to get the metrics at the per-listener level.
    
    > So putting the counters in ListenerBus seems more appropriate to me. This would allow monitoring not only the LiveListenerBus but the other buses too (like StreamingQueryListenerBus, StreamingListenerBus, ...).
    
    I considered this and I'll look into it, but it's a lower priority for me given that I'm mostly concerned about performance bottlenecks in LiveListenerBus event delivery. The other listener buses don't queue or drop events, and the two that you mentioned actually wrap `LiveListenerBus`: each is both a listener bus implementation and a listener itself. Thus my cop-out suggestion is to deal with those in a follow-up PR.
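The whole-event timing described in this reply (dequeue plus all listeners in one measurement) can be sketched as below. This is a simplified single-threaded model, not Spark's dispatch loop; `AggregateTimedBus` and `processOne` are hypothetical names. Each event yields one timing sample, so a slow listener inflates the sample without being identified.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ArrayBuffer

// Hypothetical single-threaded dispatcher: one timing sample per event, covering
// the dequeue plus delivery to every listener in turn.
final class AggregateTimedBus[E](listeners: Seq[E => Unit]) {
  val queue = new LinkedBlockingQueue[E]()
  var eventsProcessed: Long = 0L
  var totalProcessingNanos: Long = 0L

  /** Handles at most one queued event; returns false if the queue was empty. */
  def processOne(): Boolean = {
    val start = System.nanoTime()
    Option(queue.poll()) match {
      case Some(event) =>
        listeners.foreach(listener => listener(event)) // a slow listener inflates the whole sample
        totalProcessingNanos += System.nanoTime() - start
        eventsProcessed += 1
        true
      case None =>
        false
    }
  }
}

val seen = ArrayBuffer[String]()
val listenerA: String => Unit = e => seen += s"a:$e"
val listenerB: String => Unit = e => seen += s"b:$e"
val bus = new AggregateTimedBus[String](Seq(listenerA, listenerB))
bus.queue.offer("jobStart")
bus.queue.offer("jobEnd")

// Drain the queue; each successful call records one aggregate timing sample.
var handled = 0
while (bus.processOne()) handled += 1
```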




[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18083#discussion_r121031515
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala ---
    @@ -138,6 +162,44 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
         assert(drained)
       }
     
    +  test("metrics for dropped listener events") {
    +    val bus = new LiveListenerBus(new SparkConf().set(LISTENER_BUS_EVENT_QUEUE_CAPACITY, 1))
    +
    +    val listenerStarted = new Semaphore(0)
    +    val listenerWait = new Semaphore(0)
    +
    +    bus.addListener(new SparkListener {
    +      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    +        listenerStarted.release()
    +        listenerWait.acquire()
    +      }
    +    })
    +
    +    bus.start(mockSparkContext, mockMetricsSystem)
    +
    +    // Post a message to the listener bus and wait for processing to begin:
    +    bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
    +    listenerStarted.acquire()
    --- End diff --
    
    Actually, the order doesn't matter: if `release` is called first, `acquire` won't block.
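A quick check of this point with `java.util.concurrent.Semaphore`: a permit released before anyone calls `acquire` is kept by the semaphore, so the later acquisition succeeds immediately. The sketch uses `tryAcquire` with a timeout so it fails fast instead of hanging if the reasoning were wrong.

```scala
import java.util.concurrent.{Semaphore, TimeUnit}

// Start with zero permits, as `listenerStarted` does in the test.
val listenerStarted = new Semaphore(0)

// The "listener thread" releases before the "test thread" acquires.
listenerStarted.release()
val permitsAfterRelease = listenerStarted.availablePermits()

// acquire() would not block here; tryAcquire with a timeout makes that observable.
val acquiredWithoutBlocking = listenerStarted.tryAcquire(100, TimeUnit.MILLISECONDS)
val permitsAfterAcquire = listenerStarted.availablePermits()
```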




[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18083#discussion_r118629022
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
    @@ -226,3 +246,61 @@ private[spark] object LiveListenerBus {
       val name = "SparkListenerBus"
     }
     
    +private[spark]
    +class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source with Logging {
    +  override val sourceName: String = "LiveListenerBus"
    +  override val metricRegistry: MetricRegistry = new MetricRegistry
    +
    +  /**
    +   * The total number of events posted to the LiveListenerBus. This is a count of the total number
    +   * of events which have been produced by the application and sent to the listener bus, NOT a
    +   * count of the number of events which have been processed and delivered to listeners (or dropped
    +   * without being delivered).
    +   */
    +  val numEventsPosted: Counter = metricRegistry.counter(MetricRegistry.name("numEventsPosted"))
    +
    +  /**
    +   * The total number of events that were dropped without being delivered to listeners.
    +   */
    +  val numDroppedEvents: Counter = metricRegistry.counter(MetricRegistry.name("numEventsDropped"))
    +
    +  /**
    +   * The amount of time taken to post a single event to all listeners.
    +   */
    +  val eventProcessingTime: Timer = metricRegistry.timer(MetricRegistry.name("eventProcessingTime"))
    +
    +  /**
    +   * The number of messages waiting in the queue.
    +   */
    +  val queueSize: Gauge[Int] = {
    +    metricRegistry.register(MetricRegistry.name("queueSize"), new Gauge[Int]{
    +      override def getValue: Int = queue.size()
    +    })
    +  }
    +
    +  // Guarded by synchronization.
    +  private val perListenerClassTimers = mutable.Map[String, Timer]()
    +
    +  /**
    +   * Returns a timer tracking the processing time of the given listener class.
    +   * events processed by that listener. This method is thread-safe.
    +   */
    +  def getTimerForListener(listener: SparkListenerInterface): Option[Timer] = {
    +    synchronized {
    +      val className = listener.getClass.getName
    --- End diff --
    
    I'll update the PR description to discuss this per-listener metric.




[GitHub] spark issue #18083: [SPARK-20863] Add metrics/instrumentation to LiveListene...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18083
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18083#discussion_r118431688
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
    @@ -124,11 +136,13 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa
           logError(s"$name has already stopped! Dropping event $event")
           return
         }
    +    metrics.numEventsReceived.inc()
    --- End diff --
    
    Yes. My idea was to have a counter which is incremented whenever an event is received, regardless of how it ends up being processed.
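The `post()` path quoted in this thread can be modelled as below: the received counter is incremented before `offer`, so it counts every event handed to the bus whatever happens next, and a separate counter records events that the bounded queue rejected. `BoundedBus` is hypothetical, and `AtomicLong` stands in for the Coda Hale `Counter`s in the patch.

```scala
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicLong

// Hypothetical model of the post() path: count every event, then count the drops.
final class BoundedBus[E](capacity: Int) {
  private val eventQueue = new LinkedBlockingQueue[E](capacity)
  val numEventsReceived = new AtomicLong(0L)
  val numDroppedEvents = new AtomicLong(0L)

  def post(event: E): Unit = {
    numEventsReceived.incrementAndGet()      // incremented regardless of outcome
    val eventAdded = eventQueue.offer(event) // non-blocking; false when the queue is full
    if (!eventAdded) numDroppedEvents.incrementAndGet()
  }

  def queueSize: Int = eventQueue.size()
}

// Nothing drains the queue here, so with capacity 1 the second and third posts are dropped.
val bus = new BoundedBus[String](capacity = 1)
Seq("jobStart", "jobEnd", "appEnd").foreach(bus.post)
```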




[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18083#discussion_r120999648
  
    --- Diff: core/src/main/scala/org/apache/spark/util/ListenerBus.scala ---
    @@ -23,29 +23,41 @@ import scala.collection.JavaConverters._
     import scala.reflect.ClassTag
     import scala.util.control.NonFatal
     
    +import com.codahale.metrics.Timer
    +
     import org.apache.spark.internal.Logging
     
     /**
      * An event bus which posts events to its listeners.
      */
     private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
     
    +  private[this] val listenersPlusTimers = new CopyOnWriteArrayList[(L, Timer)]
    +
       // Marked `private[spark]` for access in tests.
    -  private[spark] val listeners = new CopyOnWriteArrayList[L]
    +  private[spark] def listeners = listenersPlusTimers.asScala.map(_._1).asJava
    +
    +  /**
    +   * Returns a CodaHale metrics Timer for measuring the listener's event processing time.
    +   * This method is intended to be overridden by subclasses.
    +   */
    +  protected def getTimer(listener: L): Option[Timer] = None
     
       /**
        * Add a listener to listen events. This method is thread-safe and can be called in any thread.
        */
       final def addListener(listener: L): Unit = {
    -    listeners.add(listener)
    +    listenersPlusTimers.add((listener, getTimer(listener).orNull))
       }
     
       /**
        * Remove a listener and it won't receive any events. This method is thread-safe and can be called
        * in any thread.
        */
       final def removeListener(listener: L): Unit = {
    -    listeners.remove(listener)
    +    listenersPlusTimers.asScala.find(_._1 eq listener).foreach { listenerAndTimer =>
    +      listenersPlusTimers.remove(listenerAndTimer)
    --- End diff --
    
    I think the only reason that `CopyOnWriteArrayList` was used was for thread-safety and fast performance for readers interleaved with very rare mutations / writes. If we were to replace the array list then we'd need to add a `synchronized` to guard the `listenersPlusTimers` field itself.
    
    Given the workload and access patterns here, I'm not sure that it's worth it to attempt to optimize this `removeListener()` method any further.
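For context on this trade-off: `CopyOnWriteArrayList` copies its backing array on each `add` or `remove`, so those operations are O(n), while its iterators work on a snapshot and need no locking, which suits a list that is read on every event and changed rarely. The sketch below models the diff's `(listener, timer)` pairs and its identity-based `removeListener`; `PairedListeners` is a hypothetical name, and a `String` stands in for a real timer.

```scala
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical model of ListenerBus storage: each listener is paired with its timer
// (null when the listener is not timed), like `listenersPlusTimers` in the diff.
final class PairedListeners[L <: AnyRef, T <: AnyRef] {
  private val listenersPlusTimers = new CopyOnWriteArrayList[(L, T)]()

  def add(listener: L, timerOrNull: T): Unit =
    listenersPlusTimers.add((listener, timerOrNull))

  /** Finds the first pair whose listener is the same object (`eq`), then removes that pair. */
  def remove(listener: L): Unit = {
    val iter = listenersPlusTimers.iterator() // snapshot iterator: no lock needed
    var found: (L, T) = null
    while (found == null && iter.hasNext) {
      val pair = iter.next()
      if (pair._1 eq listener) found = pair
    }
    if (found != null) listenersPlusTimers.remove(found) // copies the backing array
  }

  def size: Int = listenersPlusTimers.size()
}

val registry = new PairedListeners[AnyRef, String]
val timedListener = new Object
val untimedListener = new Object
registry.add(timedListener, "timer-for-timedListener")
registry.add(untimedListener, null) // untimed listener: the timer slot holds null
registry.remove(timedListener)
registry.remove(new Object)         // unknown listener: no-op
```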




[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18083#discussion_r120197363
  
    --- Diff: core/src/main/scala/org/apache/spark/util/ListenerBus.scala ---
    @@ -56,14 +68,25 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
         // JavaConverters can create a JIterableWrapper if we use asScala.
         // However, this method will be called frequently. To avoid the wrapper cost, here we use
         // Java Iterator directly.
    -    val iter = listeners.iterator
    +    val iter = listenersPlusTimers.iterator
         while (iter.hasNext) {
    -      val listener = iter.next()
    +      val listenerAndMaybeTimer = iter.next()
    +      val listener = listenerAndMaybeTimer._1
    +      val maybeTimer = listenerAndMaybeTimer._2
    +      var maybeTimerContext = if (maybeTimer != null) {
    --- End diff --
    
    Actually, there is a cost here: allocating a new Option on every `postToAll` is going to create more allocations and method calls. Thus I'm going to leave this unchanged.




[GitHub] spark issue #18083: [SPARK-20863] Add metrics/instrumentation to LiveListene...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18083
  
    **[Test build #77305 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77305/testReport)** for PR 18083 at commit [`378206e`](https://github.com/apache/spark/commit/378206efb9f5c9628a678ba7defb536252f5cbcb).




[GitHub] spark issue #18083: [SPARK-20863] Add metrics/instrumentation to LiveListene...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18083
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/77277/
    Test FAILed.




[GitHub] spark issue #18083: [SPARK-20863] Add metrics/instrumentation to LiveListene...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18083
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/77289/
    Test FAILed.




[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

Posted by bOOm-X <gi...@git.apache.org>.
Github user bOOm-X commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18083#discussion_r120619672
  
    --- Diff: core/src/main/scala/org/apache/spark/util/ListenerBus.scala ---
    @@ -56,14 +68,25 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
         // JavaConverters can create a JIterableWrapper if we use asScala.
         // However, this method will be called frequently. To avoid the wrapper cost, here we use
         // Java Iterator directly.
    -    val iter = listeners.iterator
    +    val iter = listenersPlusTimers.iterator
         while (iter.hasNext) {
    -      val listener = iter.next()
    +      val listenerAndMaybeTimer = iter.next()
    +      val listener = listenerAndMaybeTimer._1
    +      val maybeTimer = listenerAndMaybeTimer._2
    +      var maybeTimerContext = if (maybeTimer != null) {
    --- End diff --
    
    Indeed! But you could store the `Option` in the `listenersPlusTimers` collection (instead of calling `orNull` when you create the timer), so you can use it without having to recreate one on each `postToAll` call.
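A sketch of this suggestion, under simplifying assumptions: the `Option` is built once when the listener is registered and stored alongside it, so `postToAll` only matches on an existing `Some` or `None` and allocates no `Option` per event. `OptionTimedBus` and `NanoTimer` are hypothetical; `NanoTimer.record` stands in for a Coda Hale `Timer`.

```scala
import java.util.concurrent.CopyOnWriteArrayList
import scala.collection.mutable.ArrayBuffer

// Hypothetical timer: accumulates a sample count and total elapsed nanoseconds.
final class NanoTimer {
  var count: Long = 0L
  var totalNanos: Long = 0L
  def record(nanos: Long): Unit = { count += 1; totalNanos += nanos }
}

// Hypothetical bus that stores Option[NanoTimer] next to each listener.
final class OptionTimedBus[E] {
  private val listenersPlusTimers = new CopyOnWriteArrayList[(E => Unit, Option[NanoTimer])]()

  // The Option is created once here, not on every postToAll call.
  def addListener(listener: E => Unit, timer: Option[NanoTimer]): Unit =
    listenersPlusTimers.add((listener, timer))

  def postToAll(event: E): Unit = {
    val iter = listenersPlusTimers.iterator() // Java iterator, avoiding a Scala wrapper
    while (iter.hasNext) {
      val pair = iter.next()
      val listener = pair._1
      pair._2 match {
        case Some(timer) =>
          val start = System.nanoTime()
          try listener(event) finally timer.record(System.nanoTime() - start)
        case None =>
          listener(event) // untimed listener: no timing overhead
      }
    }
  }
}

val timer = new NanoTimer
val received = ArrayBuffer[String]()
val timedListener: String => Unit = e => received += s"timed:$e"
val untimedListener: String => Unit = e => received += s"untimed:$e"
val bus = new OptionTimedBus[String]
bus.addListener(timedListener, Some(timer))
bus.addListener(untimedListener, None)
bus.postToAll("jobEnd")
bus.postToAll("appEnd")
```

Both this version and the diff's null check avoid a per-event `Option` allocation; which reads better is the design question being discussed.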




[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

Posted by bOOm-X <gi...@git.apache.org>.
Github user bOOm-X commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18083#discussion_r119585097
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
    @@ -111,6 +112,15 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus {
         }
       }
     
    +  override protected def createTimer(listener: SparkListenerInterface): Option[Timer] = {
    +    if (listener.getClass.getName.startsWith("org.apache.spark")) {
    --- End diff --
    
    Why create timers only for Spark's own listeners? We may want timings for third-party listeners too. In my mind it is even more important for those listeners, because they can be much less optimized and so impose a huge performance penalty.
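To illustrate the concern: a filter on the class-name prefix, as in the `createTimer` override above, silently leaves third-party listeners untimed. The sketch contrasts it with a policy that times every listener; both predicates and the `com.example.AuditListener` class name are hypothetical.

```scala
// Hypothetical timing policies, keyed on a listener's fully qualified class name.
def sparkOnlyPolicy(className: String): Boolean = className.startsWith("org.apache.spark")
def timeAllPolicy(className: String): Boolean = true

val listenerClasses = Seq(
  "org.apache.spark.scheduler.EventLoggingListener", // a built-in Spark listener
  "com.example.AuditListener"                        // hypothetical third-party listener
)

val timedUnderSparkOnly = listenerClasses.filter(sparkOnlyPolicy)
val timedUnderTimeAll = listenerClasses.filter(timeAllPolicy)
```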




[GitHub] spark issue #18083: [SPARK-20863] Add metrics/instrumentation to LiveListene...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18083
  
    Merged build finished. Test FAILed.




[GitHub] spark issue #18083: [SPARK-20863] Add metrics/instrumentation to LiveListene...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18083
  
    **[Test build #77305 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77305/testReport)** for PR 18083 at commit [`378206e`](https://github.com/apache/spark/commit/378206efb9f5c9628a678ba7defb536252f5cbcb).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

Posted by bOOm-X <gi...@git.apache.org>.
Github user bOOm-X commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18083#discussion_r119583956
  
    --- Diff: core/src/main/scala/org/apache/spark/util/ListenerBus.scala ---
    @@ -56,14 +68,25 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
         // JavaConverters can create a JIterableWrapper if we use asScala.
         // However, this method will be called frequently. To avoid the wrapper cost, here we use
         // Java Iterator directly.
    -    val iter = listeners.iterator
    +    val iter = listenersPlusTimers.iterator
         while (iter.hasNext) {
    -      val listener = iter.next()
    +      val listenerAndMaybeTimer = iter.next()
    +      val listener = listenerAndMaybeTimer._1
    +      val maybeTimer = listenerAndMaybeTimer._2
    +      var maybeTimerContext = if (maybeTimer != null) {
    --- End diff --
    
    With an `Option` (instead of a null value) it would be much simpler.




[GitHub] spark issue #18083: [SPARK-20863] Add metrics/instrumentation to LiveListene...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18083
  
    **[Test build #77821 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77821/testReport)** for PR 18083 at commit [`76b669c`](https://github.com/apache/spark/commit/76b669ca6eb35a0cce4291702baa5d1f60adb467).




[GitHub] spark issue #18083: [SPARK-20863] Add metrics/instrumentation to LiveListene...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18083
  
    **[Test build #77395 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77395/testReport)** for PR 18083 at commit [`60c7448`](https://github.com/apache/spark/commit/60c7448d2cff7dd809f9d75ff48b31e21b88a915).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source with Logging `
      * `          logError(s\"Not measuring processing time for listener class $className because a \" +`




[GitHub] spark issue #18083: [SPARK-20863] Add metrics/instrumentation to LiveListene...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the issue:

    https://github.com/apache/spark/pull/18083
  
    Okay, I took a shot at adding timing metrics on a per-listener-class basis. I'm not sure my way of integrating these timers is the best, though, so let's hold off on merging this until we've had time to discuss it and do a round of revisions.
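The per-listener-class idea can be sketched roughly as follows, assuming timers are keyed by the listener's runtime class name and created the first time a class is seen. `SimpleTimer` and `PerListenerClassTimers` are hypothetical names, not code from this PR, and `SimpleTimer` only approximates a Coda Hale `Timer` (a call count and total elapsed time, no latency histogram).

```scala
import java.util.concurrent.atomic.LongAdder
import scala.collection.concurrent.TrieMap

// Hypothetical stand-in for a Coda Hale Timer: tracks only the number of timed calls
// and the total elapsed nanoseconds.
final class SimpleTimer {
  private val calls = new LongAdder
  private val totalNanos = new LongAdder

  // Runs `body`, recording its elapsed time even if it throws.
  def time[T](body: => T): T = {
    val start = System.nanoTime()
    try body
    finally {
      totalNanos.add(System.nanoTime() - start)
      calls.increment()
    }
  }

  def count: Long = calls.sum()
  def totalTimeNanos: Long = totalNanos.sum()
}

// One timer per listener class, created lazily on first use. Keying by class name means
// all instances of the same listener class share a single timer.
final class PerListenerClassTimers {
  private val timers = TrieMap.empty[String, SimpleTimer]

  def timerFor(listener: AnyRef): SimpleTimer =
    timers.getOrElseUpdate(listener.getClass.getName, new SimpleTimer)

  def classNames: Set[String] = timers.keySet.toSet
}
```

A dispatch loop would wrap each listener call as `timers.timerFor(listener).time { /* deliver event */ }`. Keying by class rather than by instance keeps the number of metrics bounded by the number of distinct listener classes.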




[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18083#discussion_r118431897
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
    @@ -226,3 +240,34 @@ private[spark] object LiveListenerBus {
       val name = "SparkListenerBus"
     }
     
    +private[spark] class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source {
    +  override val sourceName: String = "LiveListenerBus"
    +  override val metricRegistry: MetricRegistry = new MetricRegistry
    +
    +  /**
    +   * The total number of events posted to the LiveListenerBus. This counts the number of times
    +   * that `post()` is called, which might be less than the total number of events processed in
    +   * case events are dropped.
    --- End diff --
    
    Yes. I perhaps wasn't explicit enough in the comment, so I'll reword it or just drop the confusing second half.
    
    Is it clearer if I say:
    
    > This counts the number of times that `post()` has been called, not the total number of events that have completed processing.
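To make the posted-versus-processed distinction concrete, here is a minimal sketch, assuming a bounded queue where `post()` uses a non-blocking `offer()` and counts the event as dropped when the queue is full. The class `SketchBusCounters` and its fields are hypothetical, and plain `AtomicLong`s stand in for the Coda Hale counters the patch registers. A dropped event is counted as posted but never reaches a listener, so the posted count can only be greater than or equal to the processed count.

```scala
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicLong

// Hypothetical stand-in for the bus counters; AtomicLongs replace Coda Hale counters
// so the sketch has no external dependencies.
class SketchBusCounters[E <: AnyRef](capacity: Int) {
  private val queue = new LinkedBlockingQueue[E](capacity)

  val numEventsPosted = new AtomicLong(0)    // incremented on every post() call
  val numEventsDropped = new AtomicLong(0)   // post() calls that found the queue full
  val numEventsProcessed = new AtomicLong(0) // events actually handed to a listener

  // Gauge-style read of the current queue depth.
  def queueSize: Int = queue.size()

  def post(event: E): Unit = {
    numEventsPosted.incrementAndGet()
    // offer() returns false instead of blocking when the queue is at capacity.
    if (!queue.offer(event)) numEventsDropped.incrementAndGet()
  }

  // Dequeues everything currently queued, passing each event to `handle`.
  // Returns the number of events handled by this call.
  def drain(handle: E => Unit): Int = {
    var handled = 0
    var next = queue.poll()
    while (next != null) {
      handle(next)
      numEventsProcessed.incrementAndGet()
      handled += 1
      next = queue.poll()
    }
    handled
  }
}
```

With `capacity = 1` and no consumer running, three `post()` calls leave posted at 3, dropped at 2, and processed at 0 until the queue is drained.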




[GitHub] spark issue #18083: [SPARK-20863] Add metrics/instrumentation to LiveListene...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18083
  
    **[Test build #77758 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77758/testReport)** for PR 18083 at commit [`d1a5e99`](https://github.com/apache/spark/commit/d1a5e991fb7fc3e7f93090c23d8088be8b650f61).




[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18083#discussion_r121021258
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala ---
    @@ -138,6 +162,44 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
         assert(drained)
       }
     
    +  test("metrics for dropped listener events") {
    +    val bus = new LiveListenerBus(new SparkConf().set(LISTENER_BUS_EVENT_QUEUE_CAPACITY, 1))
    +
    +    val listenerStarted = new Semaphore(0)
    +    val listenerWait = new Semaphore(0)
    +
    +    bus.addListener(new SparkListener {
    +      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    +        listenerStarted.release()
    +        listenerWait.acquire()
    +      }
    +    })
    +
    +    bus.start(mockSparkContext, mockMetricsSystem)
    +
    +    // Post a message to the listener bus and wait for processing to begin:
    +    bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
    +    listenerStarted.acquire()
    --- End diff --
    
    Is it deterministic that this line will run before `listenerStarted.release()` in `onJobEnd`?
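For reference on the ordering question: a `java.util.concurrent.Semaphore` created with zero permits does not lose a `release()` that happens before anyone is waiting; the permit is held until the next `acquire()`. The standalone sketch below (the `SemaphoreHandshake` object is hypothetical and not part of `SparkListenerSuite`) exercises both orderings of the handshake.

```scala
import java.util.concurrent.{Semaphore, TimeUnit}

// Demonstrates that a zero-permit Semaphore handshake works regardless of which
// side reaches its line first.
object SemaphoreHandshake {
  // The "listener" releases before the "test" acquires: the permit is banked,
  // so the later acquire succeeds immediately.
  def releaseThenAcquire(): Boolean = {
    val started = new Semaphore(0)
    started.release()
    started.tryAcquire(1L, TimeUnit.SECONDS)
  }

  // The "test" thread most likely blocks in tryAcquire first; the release from the
  // other thread then wakes it up.
  def acquireThenRelease(): Boolean = {
    val started = new Semaphore(0)
    val listener = new Thread(new Runnable {
      def run(): Unit = {
        Thread.sleep(50) // delay so the waiting side usually gets there first
        started.release()
      }
    })
    listener.start()
    val ok = started.tryAcquire(5L, TimeUnit.SECONDS)
    listener.join()
    ok
  }
}
```

Because the permit count carries state between the two threads, `listenerStarted.acquire()` returns in either interleaving; it only blocks until the listener has started handling the event.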




[GitHub] spark issue #18083: [SPARK-20863] Add metrics/instrumentation to LiveListene...

Posted by bOOm-X <gi...@git.apache.org>.
Github user bOOm-X commented on the issue:

    https://github.com/apache/spark/pull/18083
  
    I think it is quite interesting to have performance counters on the dequeuing process in the LiveListenerBus.
    
    I am not sure that monitoring the number of dropped events with real metrics is really worth it. You just want to know whether messages have been dropped, and having the number in the log is fine.
    
    I agree that having the number of messages in the queue is important.
    
    As with the number of dropped events, I don't think the number of processed events is that important.
    
    The execution time of message processing is very interesting, but without per-listener or per-event-type breakdowns (just the global timing), we can't do a fine-grained analysis and therefore can't identify improvements.
    I think it would be better to have these timings for each listener individually: the performance improvements will come at the listener level more than from the dequeuing process itself.
    So putting the counters in ListenerBus seems more appropriate to me. This would allow us to monitor not only the LiveListenerBus but the other buses too (like StreamingQueryListenerBus, StreamingListenerBus, ...).
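One way to read this suggestion: put the timing in the shared dispatch loop of a base bus trait, keyed by both listener class and event type, so every bus built on that trait gets the breakdown. A minimal sketch, assuming events are dispatched from a single thread; `TimedBus`, `doPostEvent`, `postToAll`, and the map names are chosen for illustration and are not taken from Spark's `ListenerBus`.

```scala
import scala.collection.mutable

// Hypothetical base trait: timing lives in the shared dispatch loop, so any bus that
// mixes it in gets per-listener and per-event-type numbers. Not thread-safe; the
// sketch assumes a single dispatch thread.
trait TimedBus[L <: AnyRef, E <: AnyRef] {
  private val listeners = mutable.ArrayBuffer.empty[L]

  // Both maps are keyed by (listener class name, event class name).
  val callsByListenerAndEvent = mutable.Map.empty[(String, String), Long].withDefaultValue(0L)
  val nanosByListenerAndEvent = mutable.Map.empty[(String, String), Long].withDefaultValue(0L)

  def addListener(listener: L): Unit = listeners += listener

  // Concrete buses decide how an event is delivered to a listener.
  protected def doPostEvent(listener: L, event: E): Unit

  // Delivers `event` to every listener, recording call counts and elapsed time
  // even when a listener throws.
  final def postToAll(event: E): Unit =
    listeners.foreach { listener =>
      val key = (listener.getClass.getName, event.getClass.getName)
      val start = System.nanoTime()
      try doPostEvent(listener, event)
      finally {
        nanosByListenerAndEvent(key) += System.nanoTime() - start
        callsByListenerAndEvent(key) += 1L
      }
    }
}
```

The trade-off is cardinality: a (listener class, event type) key can produce many more metrics than a single global timer, which matters if they are all exported through a metrics system.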
    
        





[GitHub] spark issue #18083: [SPARK-20863] Add metrics/instrumentation to LiveListene...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18083
  
    Merged build finished. Test FAILed.

