You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2019/03/14 14:16:50 UTC
[spark] branch master updated: [SPARK-26152] Synchronize Worker
Cleanup with Worker Shutdown
This is an automated email from the ASF dual-hosted git repository.
srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 2a04de5 [SPARK-26152] Synchronize Worker Cleanup with Worker Shutdown
2a04de5 is described below
commit 2a04de52dd7ea12ee6660ac1d385ba09617abf12
Author: Ajith <aj...@gmail.com>
AuthorDate: Thu Mar 14 09:16:29 2019 -0500
[SPARK-26152] Synchronize Worker Cleanup with Worker Shutdown
## What changes were proposed in this pull request?
There is a race between the org.apache.spark.deploy.DeployMessages.WorkDirCleanup event and org.apache.spark.deploy.worker.Worker#onStop. It is possible that while the WorkDirCleanup event is being processed, org.apache.spark.deploy.worker.Worker#cleanupThreadExecutor has already been shut down; hence any task submitted to the ThreadPoolExecutor after shutdown will result in a java.util.concurrent.RejectedExecutionException.
## How was this patch tested?
Manually
Closes #24056 from ajithme/workercleanup.
Authored-by: Ajith <aj...@gmail.com>
Signed-off-by: Sean Owen <se...@databricks.com>
---
.../org/apache/spark/deploy/worker/Worker.scala | 68 +++++++++++++---------
1 file changed, 39 insertions(+), 29 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 4bd7aaa..52892c3 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -450,27 +450,32 @@ private[deploy] class Worker(
// rpcEndpoint.
// Copy ids so that it can be used in the cleanup thread.
val appIds = (executors.values.map(_.appId) ++ drivers.values.map(_.driverId)).toSet
- val cleanupFuture = concurrent.Future {
- val appDirs = workDir.listFiles()
- if (appDirs == null) {
- throw new IOException("ERROR: Failed to list files in " + appDirs)
- }
- appDirs.filter { dir =>
- // the directory is used by an application - check that the application is not running
- // when cleaning up
- val appIdFromDir = dir.getName
- val isAppStillRunning = appIds.contains(appIdFromDir)
- dir.isDirectory && !isAppStillRunning &&
- !Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECONDS)
- }.foreach { dir =>
- logInfo(s"Removing directory: ${dir.getPath}")
- Utils.deleteRecursively(dir)
- }
- }(cleanupThreadExecutor)
+ try {
+ val cleanupFuture: concurrent.Future[Unit] = concurrent.Future {
+ val appDirs = workDir.listFiles()
+ if (appDirs == null) {
+ throw new IOException("ERROR: Failed to list files in " + appDirs)
+ }
+ appDirs.filter { dir =>
+ // the directory is used by an application - check that the application is not running
+ // when cleaning up
+ val appIdFromDir = dir.getName
+ val isAppStillRunning = appIds.contains(appIdFromDir)
+ dir.isDirectory && !isAppStillRunning &&
+ !Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECONDS)
+ }.foreach { dir =>
+ logInfo(s"Removing directory: ${dir.getPath}")
+ Utils.deleteRecursively(dir)
+ }
+ }(cleanupThreadExecutor)
- cleanupFuture.failed.foreach(e =>
- logError("App dir cleanup failed: " + e.getMessage, e)
- )(cleanupThreadExecutor)
+ cleanupFuture.failed.foreach(e =>
+ logError("App dir cleanup failed: " + e.getMessage, e)
+ )(cleanupThreadExecutor)
+ } catch {
+ case _: RejectedExecutionException if cleanupThreadExecutor.isShutdown =>
+ logWarning("Failed to cleanup work dir as executor pool was shutdown")
+ }
case MasterChanged(masterRef, masterWebUiUrl) =>
logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
@@ -634,15 +639,20 @@ private[deploy] class Worker(
val shouldCleanup = finishedApps.contains(id) && !executors.values.exists(_.appId == id)
if (shouldCleanup) {
finishedApps -= id
- appDirectories.remove(id).foreach { dirList =>
- concurrent.Future {
- logInfo(s"Cleaning up local directories for application $id")
- dirList.foreach { dir =>
- Utils.deleteRecursively(new File(dir))
- }
- }(cleanupThreadExecutor).failed.foreach(e =>
- logError(s"Clean up app dir $dirList failed: ${e.getMessage}", e)
- )(cleanupThreadExecutor)
+ try {
+ appDirectories.remove(id).foreach { dirList =>
+ concurrent.Future {
+ logInfo(s"Cleaning up local directories for application $id")
+ dirList.foreach { dir =>
+ Utils.deleteRecursively(new File(dir))
+ }
+ }(cleanupThreadExecutor).failed.foreach(e =>
+ logError(s"Clean up app dir $dirList failed: ${e.getMessage}", e)
+ )(cleanupThreadExecutor)
+ }
+ } catch {
+ case _: RejectedExecutionException if cleanupThreadExecutor.isShutdown =>
+ logWarning("Failed to cleanup application as executor pool was shutdown")
}
shuffleService.applicationRemoved(id)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org