Posted to commits@spark.apache.org by jo...@apache.org on 2022/08/29 23:48:22 UTC
[spark] branch master updated: [SPARK-40235][CORE] Use interruptible lock instead of synchronized in Executor.updateDependencies()
This is an automated email from the ASF dual-hosted git repository.
joshrosen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 295dd57c13c [SPARK-40235][CORE] Use interruptible lock instead of synchronized in Executor.updateDependencies()
295dd57c13c is described below
commit 295dd57c13caaa9f9e78cd46dfda4e17ced7c449
Author: Josh Rosen <jo...@databricks.com>
AuthorDate: Mon Aug 29 16:47:38 2022 -0700
[SPARK-40235][CORE] Use interruptible lock instead of synchronized in Executor.updateDependencies()
### What changes were proposed in this pull request?
This patch modifies the synchronization in `Executor.updateDependencies()` in order to allow tasks to be interrupted while they are blocked and waiting on other tasks to finish downloading dependencies.
This synchronization was added years ago in https://github.com/mesos/spark/commit/7b9e96c99206c0679d9925e0161fde738a5c7c3a in order to prevent concurrently-launching tasks from performing concurrent dependency updates. If one task is downloading dependencies, all other newly-launched tasks will block until the original dependency download is complete.
Let's say that a Spark task launches, becomes blocked on an `updateDependencies()` call, then is canceled while it is blocked. Although Spark will send a `Thread.interrupt()` to the canceled task, the task will continue waiting: threads blocked waiting to enter a `synchronized` block do not throw an `InterruptedException` in response to an interrupt (the interrupt flag is merely set). As a result, the blocked thread will continue to wait until the other thread exits the `synchronized` block.
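For illustration only (a hypothetical standalone sketch, not part of this commit), the JVM behavior in question: interrupting a thread parked on monitor entry just sets its interrupt flag, and the thread stays `BLOCKED` until the monitor is released:

```scala
// Hypothetical sketch: a thread blocked entering a `synchronized` block
// keeps waiting even after being interrupted.
object SynchronizedIgnoresInterrupt {
  private val monitor = new Object

  def main(args: Array[String]): Unit = {
    monitor.synchronized {
      val blocked = new Thread(() => {
        monitor.synchronized {
          // Reached only after main releases the monitor; the interrupt
          // delivered while blocked is still pending on the flag.
          println(s"entered; interrupt pending = ${Thread.currentThread().isInterrupted}")
        }
      })
      blocked.start()
      Thread.sleep(500)         // let the thread park on monitor entry
      blocked.interrupt()       // no InterruptedException is thrown...
      Thread.sleep(500)
      println(blocked.getState) // ...the thread is still BLOCKED
    }
  }
}
```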
This PR aims to fix this problem by replacing the `synchronized` block with a `ReentrantLock`, whose `lockInterruptibly()` method throws `InterruptedException` if the waiting thread is interrupted.
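Condensed from the diff below, the resulting locking follows the standard interruptible-lock idiom (the dependency-fetching body is elided here):

```scala
import java.util.concurrent.locks.ReentrantLock

private val updateDependenciesLock = new ReentrantLock()

private[executor] def updateDependencies(/* ... */): Unit = {
  // Unlike monitor entry via `synchronized`, this throws InterruptedException
  // immediately if the waiting thread is interrupted, so a canceled task can exit.
  updateDependenciesLock.lockInterruptibly()
  try {
    // ... fetch missing files, JARs, and archives ...
  } finally {
    updateDependenciesLock.unlock()
  }
}
```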
### Why are the changes needed?
In a real-world scenario, we hit a case where a task was canceled right after being launched while another task was blocked in a slow library download. The slow library download took so long that the TaskReaper killed the executor because the canceled task could not exit in a timely fashion. This patch's fix prevents this issue.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New unit test case.
Closes #37681 from JoshRosen/SPARK-40235-update-dependencies-lock.
Authored-by: Josh Rosen <jo...@databricks.com>
Signed-off-by: Josh Rosen <jo...@databricks.com>
---
.../scala/org/apache/spark/executor/Executor.scala | 22 +++++++--
.../org/apache/spark/executor/ExecutorSuite.scala | 53 ++++++++++++++++++++++
2 files changed, 72 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index ab2bd1b7801..db507bd176b 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -25,6 +25,7 @@ import java.nio.ByteBuffer
import java.util.{Locale, Properties}
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.locks.ReentrantLock
import javax.annotation.concurrent.GuardedBy
import javax.ws.rs.core.UriBuilder
@@ -85,6 +86,11 @@ private[spark] class Executor(
private[executor] val conf = env.conf
+ // SPARK-40235: updateDependencies() uses a ReentrantLock instead of the `synchronized` keyword
+ // so that tasks can exit quickly if they are interrupted while waiting on another task to
+ // finish downloading dependencies.
+ private val updateDependenciesLock = new ReentrantLock()
+
// No ip or host:port - just hostname
Utils.checkHost(executorHostname)
// must not have port specified.
@@ -978,13 +984,19 @@ private[spark] class Executor(
/**
* Download any missing dependencies if we receive a new set of files and JARs from the
* SparkContext. Also adds any new JARs we fetched to the class loader.
+ * Visible for testing.
*/
- private def updateDependencies(
+ private[executor] def updateDependencies(
newFiles: Map[String, Long],
newJars: Map[String, Long],
- newArchives: Map[String, Long]): Unit = {
+ newArchives: Map[String, Long],
+ testStartLatch: Option[CountDownLatch] = None,
+ testEndLatch: Option[CountDownLatch] = None): Unit = {
lazy val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
- synchronized {
+ updateDependenciesLock.lockInterruptibly()
+ try {
+ // For testing, so we can simulate a slow file download:
+ testStartLatch.foreach(_.countDown())
// Fetch missing dependencies
for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
logInfo(s"Fetching $name with timestamp $timestamp")
@@ -1027,6 +1039,10 @@ private[spark] class Executor(
}
}
}
+ // For testing, so we can simulate a slow file download:
+ testEndLatch.foreach(_.await())
+ } finally {
+ updateDependenciesLock.unlock()
}
}
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index 14871efac5b..bef36d08e8a 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -514,6 +514,59 @@ class ExecutorSuite extends SparkFunSuite
}
}
+ test("SPARK-40235: updateDependencies is interruptible when waiting on lock") {
+ val conf = new SparkConf
+ val serializer = new JavaSerializer(conf)
+ val env = createMockEnv(conf, serializer)
+ withExecutor("id", "localhost", env) { executor =>
+ val startLatch = new CountDownLatch(1)
+ val endLatch = new CountDownLatch(1)
+
+ // Start a thread to simulate a task that begins executing updateDependencies()
+ // and takes a long time to finish because file download is slow:
+ val slowLibraryDownloadThread = new Thread(() => {
+ executor.updateDependencies(
+ Map.empty,
+ Map.empty,
+ Map.empty,
+ Some(startLatch),
+ Some(endLatch))
+ })
+ slowLibraryDownloadThread.start()
+
+ // Wait for that thread to acquire the lock:
+ startLatch.await()
+
+ // Start a second thread to simulate a task that blocks on the other task's
+ // dependency update:
+ val blockedLibraryDownloadThread = new Thread(() => {
+ executor.updateDependencies(
+ Map.empty,
+ Map.empty,
+ Map.empty)
+ })
+ blockedLibraryDownloadThread.start()
+ eventually(timeout(10.seconds), interval(100.millis)) {
+ val threadState = blockedLibraryDownloadThread.getState
+ assert(Set(Thread.State.BLOCKED, Thread.State.WAITING).contains(threadState))
+ }
+
+ // Interrupt the blocked thread:
+ blockedLibraryDownloadThread.interrupt()
+
+ // The thread should exit:
+ eventually(timeout(10.seconds), interval(100.millis)) {
+ assert(blockedLibraryDownloadThread.getState == Thread.State.TERMINATED)
+ }
+
+ // Allow the first thread to finish and exit:
+ endLatch.countDown()
+ eventually(timeout(10.seconds), interval(100.millis)) {
+ assert(slowLibraryDownloadThread.getState == Thread.State.TERMINATED)
+ }
+ }
+ }
+
private def createMockEnv(conf: SparkConf, serializer: JavaSerializer): SparkEnv = {
val mockEnv = mock[SparkEnv]
val mockRpcEnv = mock[RpcEnv]