Posted to commits@spark.apache.org by sr...@apache.org on 2015/04/23 23:20:24 UTC

spark git commit: [SPARK-6879] [HISTORYSERVER] check if app is completed before clean it up

Repository: spark
Updated Branches:
  refs/heads/master 3e91cc273 -> baa83a9a6


[SPARK-6879] [HISTORYSERVER] check if app is completed before clean it up

https://issues.apache.org/jira/browse/SPARK-6879

Use `applications` instead of the raw `FileStatus` listing, and check whether an app is completed before cleaning it up.
If an exception is thrown while deleting a log, keep the app in `appsToClean` so it is retried on the next loop.
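
In essence the patch turns the cleaner into a retain-or-retry pass over the in-memory application list. The following is a minimal, self-contained Scala sketch of that pattern; AppInfo, deleteLog and CleanerSketch are illustrative stand-ins for FsApplicationHistoryInfo, fs.delete and FsHistoryProvider, not the actual Spark code:

import scala.collection.mutable

// Illustrative stand-in for FsApplicationHistoryInfo (not the real Spark class).
case class AppInfo(id: String, logPath: String, lastUpdated: Long, completed: Boolean)

object CleanerSketch {
  private var applications = new mutable.LinkedHashMap[String, AppInfo]()
  private var appsToClean = new mutable.ListBuffer[AppInfo]

  // deleteLog stands in for fs.delete(new Path(logDir, info.logPath), true).
  def cleanLogs(now: Long, maxAgeMs: Long, deleteLog: AppInfo => Unit): Unit = {
    val appsToRetain = new mutable.LinkedHashMap[String, AppInfo]()

    // Keep apps that are recent or still incomplete; queue the rest for deletion.
    applications.values.foreach { info =>
      if (now - info.lastUpdated <= maxAgeMs || !info.completed) {
        appsToRetain += (info.id -> info)
      } else {
        appsToClean += info
      }
    }
    applications = appsToRetain

    // Try to delete each queued log. A failed deletion is carried over to the
    // next cleaner run instead of being dropped. (The real patch additionally
    // ignores AccessControlException without retrying.)
    val leftToClean = new mutable.ListBuffer[AppInfo]
    appsToClean.foreach { info =>
      try {
        deleteLog(info)
      } catch {
        case _: java.io.IOException => leftToClean += info
      }
    }
    appsToClean = leftToClean
  }
}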

Author: WangTaoTheTonic <wa...@huawei.com>

Closes #5491 from WangTaoTheTonic/SPARK-6879 and squashes the following commits:

4a533eb [WangTaoTheTonic] treat ACE specially
cb45105 [WangTaoTheTonic] rebase
d4d5251 [WangTaoTheTonic] per Marcelo's comments
d7455d8 [WangTaoTheTonic] slightly change when delete file
b0abca5 [WangTaoTheTonic] use global var to store apps to clean
94adfe1 [WangTaoTheTonic] leave expired apps alone to be deleted
9872a9d [WangTaoTheTonic] use the right path
fdef4d6 [WangTaoTheTonic] check if app is completed before clean it up


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/baa83a9a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/baa83a9a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/baa83a9a

Branch: refs/heads/master
Commit: baa83a9a6769c5e119438d65d7264dceb8d743d5
Parents: 3e91cc2
Author: WangTaoTheTonic <wa...@huawei.com>
Authored: Thu Apr 23 17:20:17 2015 -0400
Committer: Sean Owen <so...@cloudera.com>
Committed: Thu Apr 23 17:20:17 2015 -0400

----------------------------------------------------------------------
 .../deploy/history/FsHistoryProvider.scala      | 32 ++++++++++++--------
 1 file changed, 20 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/baa83a9a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 9847d59..a94ebf6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -35,7 +35,6 @@ import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{ThreadUtils, Utils}
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 
-
 /**
  * A class that provides application history from event logs stored in the file system.
  * This provider checks for new finished applications in the background periodically and
@@ -76,6 +75,9 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   @volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo]
     = new mutable.LinkedHashMap()
 
+  // List of applications to be deleted by event log cleaner.
+  private var appsToClean = new mutable.ListBuffer[FsApplicationHistoryInfo]
+
   // Constants used to parse Spark 1.0.0 log directories.
   private[history] val LOG_PREFIX = "EVENT_LOG_"
   private[history] val SPARK_VERSION_PREFIX = EventLoggingListener.SPARK_VERSION_KEY + "_"
@@ -266,34 +268,40 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
    */
   private def cleanLogs(): Unit = {
     try {
-      val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
-        .getOrElse(Seq[FileStatus]())
       val maxAge = conf.getTimeAsSeconds("spark.history.fs.cleaner.maxAge", "7d") * 1000
 
       val now = System.currentTimeMillis()
       val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
 
+      // Scan all logs from the log directory.
+      // Only completed applications older than the specified max age will be deleted.
       applications.values.foreach { info =>
-        if (now - info.lastUpdated <= maxAge) {
+        if (now - info.lastUpdated <= maxAge || !info.completed) {
           appsToRetain += (info.id -> info)
+        } else {
+          appsToClean += info
         }
       }
 
       applications = appsToRetain
 
-      // Scan all logs from the log directory.
-      // Only directories older than the specified max age will be deleted
-      statusList.foreach { dir =>
+      val leftToClean = new mutable.ListBuffer[FsApplicationHistoryInfo]
+      appsToClean.foreach { info =>
         try {
-          if (now - dir.getModificationTime() > maxAge) {
-            // if path is a directory and set to  true,
-            // the directory is deleted else throws an exception
-            fs.delete(dir.getPath, true)
+          val path = new Path(logDir, info.logPath)
+          if (fs.exists(path)) {
+            fs.delete(path, true)
           }
         } catch {
-          case t: IOException => logError(s"IOException in cleaning logs of $dir", t)
+          case e: AccessControlException =>
+            logInfo(s"No permission to delete ${info.logPath}, ignoring.")
+          case t: IOException =>
+            logError(s"IOException in cleaning logs of ${info.logPath}", t)
+            leftToClean += info
         }
       }
+
+      appsToClean = leftToClean
     } catch {
       case t: Exception => logError("Exception in cleaning logs", t)
     }
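
For context, this cleanLogs() pass only has an effect when log cleaning is enabled in the history server's configuration. A minimal spark-defaults.conf for exercising it might look like the lines below; spark.history.fs.cleaner.maxAge and its 7d default come straight from the diff above, while spark.history.fs.cleaner.enabled is assumed from the standard History Server configuration and is not part of this patch:

  spark.history.fs.cleaner.enabled   true
  spark.history.fs.cleaner.maxAge    7d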


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org