You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2019/03/14 14:16:50 UTC

[spark] branch master updated: [SPARK-26152] Synchronize Worker Cleanup with Worker Shutdown

This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a04de5  [SPARK-26152] Synchronize Worker Cleanup with Worker Shutdown
2a04de5 is described below

commit 2a04de52dd7ea12ee6660ac1d385ba09617abf12
Author: Ajith <aj...@gmail.com>
AuthorDate: Thu Mar 14 09:16:29 2019 -0500

    [SPARK-26152] Synchronize Worker Cleanup with Worker Shutdown
    
    ## What changes were proposed in this pull request?
    
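    There is a race between the org.apache.spark.deploy.DeployMessages.WorkDirCleanup event and org.apache.spark.deploy.worker.Worker#onStop. It is possible that, while the WorkDirCleanup event is being processed, org.apache.spark.deploy.worker.Worker#cleanupThreadExecutor has already been shut down. Hence any task submitted to the ThreadPoolExecutor after that point results in a java.util.concurrent.RejectedExecutionException. This patch wraps the cleanup submissions in a try/catch that catches RejectedExecutionException when the executor is shut down and logs a warning instead.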
    
    ## How was this patch tested?
    
    Manually
    
    Closes #24056 from ajithme/workercleanup.
    
    Authored-by: Ajith <aj...@gmail.com>
    Signed-off-by: Sean Owen <se...@databricks.com>
---
 .../org/apache/spark/deploy/worker/Worker.scala    | 68 +++++++++++++---------
 1 file changed, 39 insertions(+), 29 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 4bd7aaa..52892c3 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -450,27 +450,32 @@ private[deploy] class Worker(
       // rpcEndpoint.
       // Copy ids so that it can be used in the cleanup thread.
       val appIds = (executors.values.map(_.appId) ++ drivers.values.map(_.driverId)).toSet
-      val cleanupFuture = concurrent.Future {
-        val appDirs = workDir.listFiles()
-        if (appDirs == null) {
-          throw new IOException("ERROR: Failed to list files in " + appDirs)
-        }
-        appDirs.filter { dir =>
-          // the directory is used by an application - check that the application is not running
-          // when cleaning up
-          val appIdFromDir = dir.getName
-          val isAppStillRunning = appIds.contains(appIdFromDir)
-          dir.isDirectory && !isAppStillRunning &&
-          !Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECONDS)
-        }.foreach { dir =>
-          logInfo(s"Removing directory: ${dir.getPath}")
-          Utils.deleteRecursively(dir)
-        }
-      }(cleanupThreadExecutor)
+      try {
+        val cleanupFuture: concurrent.Future[Unit] = concurrent.Future {
+          val appDirs = workDir.listFiles()
+          if (appDirs == null) {
+            throw new IOException("ERROR: Failed to list files in " + appDirs)
+          }
+          appDirs.filter { dir =>
+            // the directory is used by an application - check that the application is not running
+            // when cleaning up
+            val appIdFromDir = dir.getName
+            val isAppStillRunning = appIds.contains(appIdFromDir)
+            dir.isDirectory && !isAppStillRunning &&
+              !Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECONDS)
+          }.foreach { dir =>
+            logInfo(s"Removing directory: ${dir.getPath}")
+            Utils.deleteRecursively(dir)
+          }
+        }(cleanupThreadExecutor)
 
-      cleanupFuture.failed.foreach(e =>
-        logError("App dir cleanup failed: " + e.getMessage, e)
-      )(cleanupThreadExecutor)
+        cleanupFuture.failed.foreach(e =>
+          logError("App dir cleanup failed: " + e.getMessage, e)
+        )(cleanupThreadExecutor)
+      } catch {
+        case _: RejectedExecutionException if cleanupThreadExecutor.isShutdown =>
+          logWarning("Failed to cleanup work dir as executor pool was shutdown")
+      }
 
     case MasterChanged(masterRef, masterWebUiUrl) =>
       logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
@@ -634,15 +639,20 @@ private[deploy] class Worker(
     val shouldCleanup = finishedApps.contains(id) && !executors.values.exists(_.appId == id)
     if (shouldCleanup) {
       finishedApps -= id
-      appDirectories.remove(id).foreach { dirList =>
-        concurrent.Future {
-          logInfo(s"Cleaning up local directories for application $id")
-          dirList.foreach { dir =>
-            Utils.deleteRecursively(new File(dir))
-          }
-        }(cleanupThreadExecutor).failed.foreach(e =>
-          logError(s"Clean up app dir $dirList failed: ${e.getMessage}", e)
-        )(cleanupThreadExecutor)
+      try {
+        appDirectories.remove(id).foreach { dirList =>
+          concurrent.Future {
+            logInfo(s"Cleaning up local directories for application $id")
+            dirList.foreach { dir =>
+              Utils.deleteRecursively(new File(dir))
+            }
+          }(cleanupThreadExecutor).failed.foreach(e =>
+            logError(s"Clean up app dir $dirList failed: ${e.getMessage}", e)
+          )(cleanupThreadExecutor)
+        }
+      } catch {
+        case _: RejectedExecutionException if cleanupThreadExecutor.isShutdown =>
+          logWarning("Failed to cleanup application as executor pool was shutdown")
       }
       shuffleService.applicationRemoved(id)
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org