You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2020/01/03 00:46:50 UTC

[spark] branch branch-2.4 updated: [SPARK-30285][CORE] Fix deadlock between LiveListenerBus#stop and AsyncEventQueue#removeListenerOnError

This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 16f8fae  [SPARK-30285][CORE] Fix deadlock between LiveListenerBus#stop and AsyncEventQueue#removeListenerOnError
16f8fae is described below

commit 16f8fae01f329d4ba5786176c3c8dc4e648a8c22
Author: Wang Shuo <wa...@gmail.com>
AuthorDate: Thu Jan 2 16:40:22 2020 -0800

    [SPARK-30285][CORE] Fix deadlock between LiveListenerBus#stop and AsyncEventQueue#removeListenerOnError
    
    There is a deadlock between `LiveListenerBus#stop` and `AsyncEventQueue#removeListenerOnError`.
    
    We can reproduce as follows:
    
    1. Post some events to `LiveListenerBus`
    2. Call `LiveListenerBus#stop`, which holds the synchronized lock of `bus` (https://github.com/apache/spark/blob/5e92301723464d0876b5a7eec59c15fed0c5b98c/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala#L229) while waiting until all the events are processed by listeners, and then removes all the queues
    3. The event queue drains its events by posting them to its listeners. If a listener is interrupted, the queue calls `AsyncEventQueue#removeListenerOnError`, which in turn calls `bus.removeListener` (https://github.com/apache/spark/blob/7b1b60c7583faca70aeab2659f06d4e491efa5c0/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala#L207). That call tries to acquire the synchronized lock of `bus`, resulting in a deadlock
    
    This PR removes the `synchronized` block from `LiveListenerBus.stop` because the underlying data structures are themselves thread-safe.
    
    This change is needed to fix the deadlock described above.
    
    No user-facing change.
    
    Tested with a new unit test.
    
    Closes #26924 from wangshuo128/event-queue-race-condition.
    
    Authored-by: Wang Shuo <wa...@gmail.com>
    Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
    (cherry picked from commit 10cae04108c375a7f5ca7685fea593bd7f49f7a6)
    Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
---
 .../apache/spark/scheduler/LiveListenerBus.scala   |  6 +-
 .../spark/scheduler/SparkListenerSuite.scala       | 70 ++++++++++++++++++++++
 2 files changed, 72 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index d135190..1f42f09 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -215,10 +215,8 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
       return
     }
 
-    synchronized {
-      queues.asScala.foreach(_.stop())
-      queues.clear()
-    }
+    queues.asScala.foreach(_.stop())
+    queues.clear()
   }
 
   // For testing only.
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 6ffd1e8..0b843be 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -531,6 +531,47 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     }
   }
 
+  Seq(true, false).foreach { throwInterruptedException =>
+    val suffix = if (throwInterruptedException) "throw interrupt" else "set Thread interrupted"
+    test(s"SPARK-30285: Fix deadlock in AsyncEventQueue.removeListenerOnError: $suffix") {
+      val LISTENER_BUS_STOP_WAITING_TIMEOUT_MILLIS = 10 * 1000L // 10 seconds
+      val bus = new LiveListenerBus(new SparkConf(false))
+      val counter1 = new BasicJobCounter()
+      val counter2 = new BasicJobCounter()
+      val interruptingListener = new DelayInterruptingJobCounter(throwInterruptedException, 3)
+      bus.addToSharedQueue(counter1)
+      bus.addToSharedQueue(interruptingListener)
+      bus.addToEventLogQueue(counter2)
+      assert(bus.activeQueues() === Set(SHARED_QUEUE, EVENT_LOG_QUEUE))
+      assert(bus.findListenersByClass[BasicJobCounter]().size === 2)
+      assert(bus.findListenersByClass[DelayInterruptingJobCounter]().size === 1)
+
+      bus.start(mockSparkContext, mockMetricsSystem)
+
+      (0 until 5).foreach { jobId =>
+        bus.post(SparkListenerJobEnd(jobId, jobCompletionTime, JobSucceeded))
+      }
+
+      // Call bus.stop in a separate thread, otherwise we will block here until bus is stopped
+      val stoppingThread = new Thread(new Runnable() {
+        override def run(): Unit = bus.stop()
+      })
+      stoppingThread.start()
+      // Notify interrupting listener starts to work
+      interruptingListener.sleep = false
+      // Wait for bus to stop
+      stoppingThread.join(LISTENER_BUS_STOP_WAITING_TIMEOUT_MILLIS)
+
+      // Stopping has been finished
+      assert(stoppingThread.isAlive === false)
+      // All queues are removed
+      assert(bus.activeQueues() === Set.empty)
+      assert(counter1.count === 5)
+      assert(counter2.count === 5)
+      assert(interruptingListener.count === 3)
+    }
+  }
+
   /**
    * Assert that the given list of numbers has an average that is greater than zero.
    */
@@ -601,6 +642,35 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
       }
     }
   }
+
+  /**
+   * A simple listener that works as follows:
+   * 1. sleep and wait when `sleep` is true
+   * 2. when `sleep` is false, start to work:
+   *    if it is interruptOnJobId, interrupt
+   *    else count SparkListenerJobEnd numbers
+   */
+  private class DelayInterruptingJobCounter(
+      val throwInterruptedException: Boolean,
+      val interruptOnJobId: Int) extends SparkListener {
+    @volatile var sleep = true
+    var count = 0
+
+    override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+      while (sleep) {
+        Thread.sleep(10)
+      }
+      if (interruptOnJobId == jobEnd.jobId) {
+        if (throwInterruptedException) {
+          throw new InterruptedException("got interrupted")
+        } else {
+          Thread.currentThread().interrupt()
+        }
+      } else {
+        count += 1
+      }
+    }
+  }
 }
 
 // These classes can't be declared inside of the SparkListenerSuite class because we don't want


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org