You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/05/22 19:04:58 UTC

[GitHub] [spark] dydeve commented on a change in pull request #24676: [SPARK-27804] LiveListenerBus may create multiple AsyncEventQueues with same name in concurrent scene

dydeve commented on a change in pull request #24676: [SPARK-27804] LiveListenerBus may create multiple AsyncEventQueues with same name in concurrent scene
URL: https://github.com/apache/spark/pull/24676#discussion_r286612817
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
 ##########
 @@ -86,44 +87,61 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
   }
 
   /**
-   * Add a listener to a specific queue, creating a new queue if needed. Queues are independent
-   * of each other (each one uses a separate thread for delivering events), allowing slower
-   * listeners to be somewhat isolated from others.
-   */
+    * Add a listener to a specific queue, creating a new queue if needed. Queues are independent
+    * of each other (each one uses a separate thread for delivering events), allowing slower
+    * listeners to be somewhat isolated from others.
+    *
+    * @param listener
+    * @param name name of AsyncEventQueue, @see [[AsyncEventQueue.name]] for more
+    */
   private[spark] def addToQueue(
       listener: SparkListenerInterface,
-      queue: String): Unit = synchronized {
 
 Review comment:
   Thak you. I ignored the word `synchronized`,I am careless.
   And I know the reason of using CopyOnWriteArrayList instead of ArrayList is to avoid `ConcurrentModificationException`.
   
   How about improving the usage of synchronized ?
   ```scala
     private[spark] def addToQueue(
         listener: SparkListenerInterface,
         queue: String): Unit = {
       if (stopped.get()) {//read barrier
         throw new IllegalStateException("LiveListenerBus is stopped.")
       }
   
       queues.asScala.find(_.name == queue) match {
         case Some(queue) =>
           queue.addListener(listener)
   
         case None =>
           this.synchronized {
   
             queues.asScala.find(_.name == queue) match {
               case Some(queue) =>
                 queue.addListener(listener)
   
               case None =>
                 val newQueue = new AsyncEventQueue(queue, conf, metrics, this)
                 newQueue.addListener(listener)
                 if (started.get()) {
                   newQueue.start(sparkContext)
                 }
                 queues.add(newQueue)
             }
           }
   
       }
     }
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org