You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ir...@apache.org on 2018/01/19 19:26:55 UTC

spark git commit: [SPARK-20664][CORE] Delete stale application data from SHS.

Repository: spark
Updated Branches:
  refs/heads/master 07296a61c -> fed2139f0


[SPARK-20664][CORE] Delete stale application data from SHS.

Detect the deletion of event log files from storage, and remove
data about the related application attempt in the SHS.

Also contains code to fix SPARK-21571 based on code by ericvandenbergfb.

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #20138 from vanzin/SPARK-20664.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fed2139f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fed2139f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fed2139f

Branch: refs/heads/master
Commit: fed2139f053fac4a9a6952ff0ab1cc2a5f657bd0
Parents: 07296a6
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Fri Jan 19 13:26:37 2018 -0600
Committer: Imran Rashid <ir...@cloudera.com>
Committed: Fri Jan 19 13:26:37 2018 -0600

----------------------------------------------------------------------
 .../deploy/history/FsHistoryProvider.scala      | 297 ++++++++++++-------
 .../deploy/history/FsHistoryProviderSuite.scala | 117 +++++++-
 .../deploy/history/HistoryServerSuite.scala     |   4 +-
 3 files changed, 306 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fed2139f/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 94c80eb..f9d0b5e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.history
 
 import java.io.{File, FileNotFoundException, IOException}
 import java.util.{Date, ServiceLoader, UUID}
-import java.util.concurrent.{Executors, ExecutorService, Future, TimeUnit}
+import java.util.concurrent.{ExecutorService, TimeUnit}
 import java.util.zip.{ZipEntry, ZipOutputStream}
 
 import scala.collection.JavaConverters._
@@ -29,7 +29,7 @@ import scala.xml.Node
 
 import com.fasterxml.jackson.annotation.JsonIgnore
 import com.google.common.io.ByteStreams
-import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
+import com.google.common.util.concurrent.MoreExecutors
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.fs.permission.FsAction
 import org.apache.hadoop.hdfs.DistributedFileSystem
@@ -116,8 +116,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   // Used by check event thread and clean log thread.
   // Scheduled thread pool size must be one, otherwise it will have concurrent issues about fs
   // and applications between check task and clean task.
-  private val pool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
-    .setNameFormat("spark-history-task-%d").setDaemon(true).build())
+  private val pool = ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-history-task-%d")
 
   // The modification time of the newest log detected during the last scan.   Currently only
   // used for logging msgs (logs are re-scanned based on file size, rather than modtime)
@@ -174,7 +173,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
    * Fixed size thread pool to fetch and parse log files.
    */
   private val replayExecutor: ExecutorService = {
-    if (!conf.contains("spark.testing")) {
+    if (!Utils.isTesting) {
       ThreadUtils.newDaemonFixedThreadPool(NUM_PROCESSING_THREADS, "log-replay-executor")
     } else {
       MoreExecutors.sameThreadExecutor()
@@ -275,7 +274,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     try {
       Some(load(appId).toApplicationInfo())
     } catch {
-      case e: NoSuchElementException =>
+      case _: NoSuchElementException =>
         None
     }
   }
@@ -405,49 +404,70 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     try {
       val newLastScanTime = getNewLastScanTime()
       logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
-      // scan for modified applications, replay and merge them
-      val logInfos = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil)
+
+      val updated = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil)
         .filter { entry =>
           !entry.isDirectory() &&
             // FsHistoryProvider generates a hidden file which can't be read.  Accidentally
             // reading a garbage file is safe, but we would log an error which can be scary to
             // the end-user.
             !entry.getPath().getName().startsWith(".") &&
-            SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ) &&
-            recordedFileSize(entry.getPath()) < entry.getLen()
+            SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ)
+        }
+        .filter { entry =>
+          try {
+            val info = listing.read(classOf[LogInfo], entry.getPath().toString())
+            if (info.fileSize < entry.getLen()) {
+              // Log size has changed, it should be parsed.
+              true
+            } else {
+              // If the SHS view has a valid application, update the time the file was last seen so
+              // that the entry is not deleted from the SHS listing.
+              if (info.appId.isDefined) {
+                listing.write(info.copy(lastProcessed = newLastScanTime))
+              }
+              false
+            }
+          } catch {
+            case _: NoSuchElementException =>
+              // If the file is currently not being tracked by the SHS, add an entry for it and try
+              // to parse it. This will allow the cleaner code to detect the file as stale later on
+              // if it was not possible to parse it.
+              listing.write(LogInfo(entry.getPath().toString(), newLastScanTime, None, None,
+                entry.getLen()))
+              entry.getLen() > 0
+          }
         }
         .sortWith { case (entry1, entry2) =>
           entry1.getModificationTime() > entry2.getModificationTime()
         }
 
-      if (logInfos.nonEmpty) {
-        logDebug(s"New/updated attempts found: ${logInfos.size} ${logInfos.map(_.getPath)}")
+      if (updated.nonEmpty) {
+        logDebug(s"New/updated attempts found: ${updated.size} ${updated.map(_.getPath)}")
       }
 
-      var tasks = mutable.ListBuffer[Future[_]]()
-
-      try {
-        for (file <- logInfos) {
-          tasks += replayExecutor.submit(new Runnable {
-            override def run(): Unit = mergeApplicationListing(file)
+      val tasks = updated.map { entry =>
+        try {
+          replayExecutor.submit(new Runnable {
+            override def run(): Unit = mergeApplicationListing(entry, newLastScanTime)
           })
+        } catch {
+          // Do not let a failed submission abort the scan: an exception on
+          // replayExecutor.submit (..) indicates the ExecutorService is unable to take more
+          // submissions right now, so log it and skip the entry (nulls are filtered below).
+          case e: Exception =>
+            logError(s"Exception while submitting event log for replay", e)
+            null
         }
-      } catch {
-        // let the iteration over logInfos break, since an exception on
-        // replayExecutor.submit (..) indicates the ExecutorService is unable
-        // to take any more submissions at this time
-
-        case e: Exception =>
-          logError(s"Exception while submitting event log for replay", e)
-      }
+      }.filter(_ != null)
 
       pendingReplayTasksCount.addAndGet(tasks.size)
 
+      // Wait for all tasks to finish. This makes sure that checkForLogs
+      // is not scheduled again while some tasks are already running in
+      // the replayExecutor.
       tasks.foreach { task =>
         try {
-          // Wait for all tasks to finish. This makes sure that checkForLogs
-          // is not scheduled again while some tasks are already running in
-          // the replayExecutor.
           task.get()
         } catch {
           case e: InterruptedException =>
@@ -459,13 +479,70 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         }
       }
 
+      // Delete all information about applications whose log files disappeared from storage.
+      // This is done by identifying the event logs which were not touched by the current
+      // directory scan.
+      //
+      // Only entries with valid applications are cleaned up here. Cleaning up invalid log
+      // files is done by the periodic cleaner task.
+      val stale = listing.view(classOf[LogInfo])
+        .index("lastProcessed")
+        .last(newLastScanTime - 1)
+        .asScala
+        .toList
+      stale.foreach { log =>
+        log.appId.foreach { appId =>
+          cleanAppData(appId, log.attemptId, log.logPath)
+          listing.delete(classOf[LogInfo], log.logPath)
+        }
+      }
+
       lastScanTime.set(newLastScanTime)
     } catch {
       case e: Exception => logError("Exception in checking for event log updates", e)
     }
   }
 
-  private def getNewLastScanTime(): Long = {
+  private def cleanAppData(appId: String, attemptId: Option[String], logPath: String): Unit = {
+    try {
+      val app = load(appId)
+      val (attempt, others) = app.attempts.partition(_.info.attemptId == attemptId)
+
+      assert(attempt.isEmpty || attempt.size == 1)
+      val isStale = attempt.headOption.exists { a =>
+        if (a.logPath != new Path(logPath).getName()) {
+          // If the log file name does not match, then probably the old log file was from an
+          // in progress application. Just return that the app should be left alone.
+          false
+        } else {
+          val maybeUI = synchronized {
+            activeUIs.remove(appId -> attemptId)
+          }
+
+          maybeUI.foreach { ui =>
+            ui.invalidate()
+            ui.ui.store.close()
+          }
+
+          diskManager.foreach(_.release(appId, attemptId, delete = true))
+          true
+        }
+      }
+
+      if (isStale) {
+        if (others.nonEmpty) {
+          val newAppInfo = new ApplicationInfoWrapper(app.info, others)
+          listing.write(newAppInfo)
+        } else {
+          listing.delete(classOf[ApplicationInfoWrapper], appId)
+        }
+      }
+    } catch {
+      case _: NoSuchElementException =>
+    }
+  }
+
+  private[history] def getNewLastScanTime(): Long = {
     val fileName = "." + UUID.randomUUID().toString
     val path = new Path(logDir, fileName)
     val fos = fs.create(path)
@@ -530,7 +607,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   /**
    * Replay the given log file, saving the application in the listing db.
    */
-  protected def mergeApplicationListing(fileStatus: FileStatus): Unit = {
+  protected def mergeApplicationListing(fileStatus: FileStatus, scanTime: Long): Unit = {
     val eventsFilter: ReplayEventsFilter = { eventString =>
       eventString.startsWith(APPL_START_EVENT_PREFIX) ||
         eventString.startsWith(APPL_END_EVENT_PREFIX) ||
@@ -544,73 +621,78 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     bus.addListener(listener)
     replay(fileStatus, bus, eventsFilter = eventsFilter)
 
-    listener.applicationInfo.foreach { app =>
-      // Invalidate the existing UI for the reloaded app attempt, if any. See LoadedAppUI for a
-      // discussion on the UI lifecycle.
-      synchronized {
-        activeUIs.get((app.info.id, app.attempts.head.info.attemptId)).foreach { ui =>
-          ui.invalidate()
-          ui.ui.store.close()
+    val (appId, attemptId) = listener.applicationInfo match {
+      case Some(app) =>
+        // Invalidate the existing UI for the reloaded app attempt, if any. See LoadedAppUI for a
+        // discussion on the UI lifecycle.
+        synchronized {
+          activeUIs.get((app.info.id, app.attempts.head.info.attemptId)).foreach { ui =>
+            ui.invalidate()
+            ui.ui.store.close()
+          }
         }
-      }
 
-      addListing(app)
+        addListing(app)
+        (Some(app.info.id), app.attempts.head.info.attemptId)
+
+      case _ =>
+        // If the app hasn't written down its app ID to the logs, still record the entry in the
+        // listing db, with an empty ID. This will make the log eligible for deletion if the app
+        // does not make progress after the configured max log age.
+        (None, None)
     }
-    listing.write(new LogInfo(logPath.toString(), fileStatus.getLen()))
+    listing.write(LogInfo(logPath.toString(), scanTime, appId, attemptId, fileStatus.getLen()))
   }
 
   /**
    * Delete event logs from the log directory according to the clean policy defined by the user.
    */
-  private[history] def cleanLogs(): Unit = {
-    var iterator: Option[KVStoreIterator[ApplicationInfoWrapper]] = None
-    try {
-      val maxTime = clock.getTimeMillis() - conf.get(MAX_LOG_AGE_S) * 1000
-
-      // Iterate descending over all applications whose oldest attempt happened before maxTime.
-      iterator = Some(listing.view(classOf[ApplicationInfoWrapper])
-        .index("oldestAttempt")
-        .reverse()
-        .first(maxTime)
-        .closeableIterator())
-
-      iterator.get.asScala.foreach { app =>
-        // Applications may have multiple attempts, some of which may not need to be deleted yet.
-        val (remaining, toDelete) = app.attempts.partition { attempt =>
-          attempt.info.lastUpdated.getTime() >= maxTime
-        }
+  private[history] def cleanLogs(): Unit = Utils.tryLog {
+    val maxTime = clock.getTimeMillis() - conf.get(MAX_LOG_AGE_S) * 1000
 
-        if (remaining.nonEmpty) {
-          val newApp = new ApplicationInfoWrapper(app.info, remaining)
-          listing.write(newApp)
-        }
+    val expired = listing.view(classOf[ApplicationInfoWrapper])
+      .index("oldestAttempt")
+      .reverse()
+      .first(maxTime)
+      .asScala
+      .toList
+    expired.foreach { app =>
+      // Applications may have multiple attempts, some of which may not need to be deleted yet.
+      val (remaining, toDelete) = app.attempts.partition { attempt =>
+        attempt.info.lastUpdated.getTime() >= maxTime
+      }
 
-        toDelete.foreach { attempt =>
-          val logPath = new Path(logDir, attempt.logPath)
-          try {
-            listing.delete(classOf[LogInfo], logPath.toString())
-          } catch {
-            case _: NoSuchElementException =>
-              logDebug(s"Log info entry for $logPath not found.")
-          }
-          try {
-            fs.delete(logPath, true)
-          } catch {
-            case e: AccessControlException =>
-              logInfo(s"No permission to delete ${attempt.logPath}, ignoring.")
-            case t: IOException =>
-              logError(s"IOException in cleaning ${attempt.logPath}", t)
-          }
-        }
+      if (remaining.nonEmpty) {
+        val newApp = new ApplicationInfoWrapper(app.info, remaining)
+        listing.write(newApp)
+      }
 
-        if (remaining.isEmpty) {
-          listing.delete(app.getClass(), app.id)
-        }
+      toDelete.foreach { attempt =>
+        logInfo(s"Deleting expired event log for ${attempt.logPath}")
+        val logPath = new Path(logDir, attempt.logPath)
+        listing.delete(classOf[LogInfo], logPath.toString())
+        cleanAppData(app.id, attempt.info.attemptId, logPath.toString())
+        deleteLog(logPath)
+      }
+
+      if (remaining.isEmpty) {
+        listing.delete(app.getClass(), app.id)
+      }
+    }
+
+    // Delete log files that don't have a valid application and exceed the configured max age.
+    val stale = listing.view(classOf[LogInfo])
+      .index("lastProcessed")
+      .reverse()
+      .first(maxTime)
+      .asScala
+      .toList
+    stale.foreach { log =>
+      if (log.appId.isEmpty) {
+        logInfo(s"Deleting invalid / corrupt event log ${log.logPath}")
+        deleteLog(new Path(log.logPath))
+        listing.delete(classOf[LogInfo], log.logPath)
       }
-    } catch {
-      case t: Exception => logError("Exception while cleaning logs", t)
-    } finally {
-      iterator.foreach(_.close())
     }
   }
 
@@ -631,12 +713,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     // an error the other way -- if we report a size bigger (ie later) than the file that is
     // actually read, we may never refresh the app.  FileStatus is guaranteed to be static
     // after it's created, so we get a file size that is no bigger than what is actually read.
-    val logInput = EventLoggingListener.openEventLog(logPath, fs)
-    try {
-      bus.replay(logInput, logPath.toString, !isCompleted, eventsFilter)
+    Utils.tryWithResource(EventLoggingListener.openEventLog(logPath, fs)) { in =>
+      bus.replay(in, logPath.toString, !isCompleted, eventsFilter)
       logInfo(s"Finished parsing $logPath")
-    } finally {
-      logInput.close()
     }
   }
 
@@ -703,18 +782,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         |  application count=$count}""".stripMargin
   }
 
-  /**
-   * Return the last known size of the given event log, recorded the last time the file
-   * system scanner detected a change in the file.
-   */
-  private def recordedFileSize(log: Path): Long = {
-    try {
-      listing.read(classOf[LogInfo], log.toString()).fileSize
-    } catch {
-      case _: NoSuchElementException => 0L
-    }
-  }
-
   private def load(appId: String): ApplicationInfoWrapper = {
     listing.read(classOf[ApplicationInfoWrapper], appId)
   }
@@ -773,11 +840,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     logInfo(s"Leasing disk manager space for app $appId / ${attempt.info.attemptId}...")
     val lease = dm.lease(status.getLen(), isCompressed)
     val newStorePath = try {
-      val store = KVUtils.open(lease.tmpPath, metadata)
-      try {
+      Utils.tryWithResource(KVUtils.open(lease.tmpPath, metadata)) { store =>
         rebuildAppStore(store, status, attempt.info.lastUpdated.getTime())
-      } finally {
-        store.close()
       }
       lease.commit(appId, attempt.info.attemptId)
     } catch {
@@ -806,6 +870,17 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       throw new NoSuchElementException(s"Cannot find attempt $attemptId of $appId."))
   }
 
+  private def deleteLog(log: Path): Unit = {
+    try {
+      fs.delete(log, true)
+    } catch {
+      case _: AccessControlException =>
+        logInfo(s"No permission to delete $log, ignoring.")
+      case ioe: IOException =>
+        logError(s"IOException in cleaning $log", ioe)
+    }
+  }
+
 }
 
 private[history] object FsHistoryProvider {
@@ -832,8 +907,16 @@ private[history] case class FsHistoryProviderMetadata(
     uiVersion: Long,
     logDir: String)
 
+/**
+ * Tracking info for event logs detected in the configured log directory. Tracks both valid and
+ * invalid logs (e.g. unparseable logs, recorded as logs with no app ID) so that the cleaner
+ * can know what log files are safe to delete.
+ */
 private[history] case class LogInfo(
     @KVIndexParam logPath: String,
+    @KVIndexParam("lastProcessed") lastProcessed: Long,
+    appId: Option[String],
+    attemptId: Option[String],
     fileSize: Long)
 
 private[history] class AttemptInfoWrapper(

http://git-wip-us.apache.org/repos/asf/spark/blob/fed2139f/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 84ee01c..787de59 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -31,7 +31,7 @@ import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hdfs.DistributedFileSystem
 import org.json4s.jackson.JsonMethods._
 import org.mockito.Matchers.any
-import org.mockito.Mockito.{mock, spy, verify}
+import org.mockito.Mockito.{doReturn, mock, spy, verify}
 import org.scalatest.BeforeAndAfter
 import org.scalatest.Matchers
 import org.scalatest.concurrent.Eventually._
@@ -149,8 +149,10 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
 
     class TestFsHistoryProvider extends FsHistoryProvider(createTestConf()) {
       var mergeApplicationListingCall = 0
-      override protected def mergeApplicationListing(fileStatus: FileStatus): Unit = {
-        super.mergeApplicationListing(fileStatus)
+      override protected def mergeApplicationListing(
+          fileStatus: FileStatus,
+          lastSeen: Long): Unit = {
+        super.mergeApplicationListing(fileStatus, lastSeen)
         mergeApplicationListingCall += 1
       }
     }
@@ -663,6 +665,115 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     freshUI.get.ui.store.job(0)
   }
 
+  test("clean up stale app information") {
+    val storeDir = Utils.createTempDir()
+    val conf = createTestConf().set(LOCAL_STORE_DIR, storeDir.getAbsolutePath())
+    val provider = spy(new FsHistoryProvider(conf))
+    val appId = "new1"
+
+    // Write logs for two app attempts.
+    doReturn(1L).when(provider).getNewLastScanTime()
+    val attempt1 = newLogFile(appId, Some("1"), inProgress = false)
+    writeFile(attempt1, true, None,
+      SparkListenerApplicationStart(appId, Some(appId), 1L, "test", Some("1")),
+      SparkListenerJobStart(0, 1L, Nil, null),
+      SparkListenerApplicationEnd(5L)
+      )
+    val attempt2 = newLogFile(appId, Some("2"), inProgress = false)
+    writeFile(attempt2, true, None,
+      SparkListenerApplicationStart(appId, Some(appId), 1L, "test", Some("2")),
+      SparkListenerJobStart(0, 1L, Nil, null),
+      SparkListenerApplicationEnd(5L)
+      )
+    updateAndCheck(provider) { list =>
+      assert(list.size === 1)
+      assert(list(0).id === appId)
+      assert(list(0).attempts.size === 2)
+    }
+
+    // Load the app's UI.
+    val ui = provider.getAppUI(appId, Some("1"))
+    assert(ui.isDefined)
+
+    // Delete the underlying log file for attempt 1 and rescan. The UI should go away, but since
+    // attempt 2 still exists, listing data should be there.
+    doReturn(2L).when(provider).getNewLastScanTime()
+    attempt1.delete()
+    updateAndCheck(provider) { list =>
+      assert(list.size === 1)
+      assert(list(0).id === appId)
+      assert(list(0).attempts.size === 1)
+    }
+    assert(!ui.get.valid)
+    assert(provider.getAppUI(appId, None) === None)
+
+    // Delete the second attempt's log file. Now everything should go away.
+    doReturn(3L).when(provider).getNewLastScanTime()
+    attempt2.delete()
+    updateAndCheck(provider) { list =>
+      assert(list.isEmpty)
+    }
+  }
+
+  test("SPARK-21571: clean up removes invalid history files") {
+    // TODO: "maxTime" becoming negative in cleanLogs() causes this test to fail, so avoid that
+    // until we figure out what's causing the problem.
+    val clock = new ManualClock(TimeUnit.DAYS.toMillis(120))
+    val conf = createTestConf().set(MAX_LOG_AGE_S.key, s"2d")
+    val provider = new FsHistoryProvider(conf, clock) {
+      override def getNewLastScanTime(): Long = clock.getTimeMillis()
+    }
+
+    // Create 0-byte size inprogress and complete files
+    var logCount = 0
+    var validLogCount = 0
+
+    val emptyInProgress = newLogFile("emptyInprogressLogFile", None, inProgress = true)
+    emptyInProgress.createNewFile()
+    emptyInProgress.setLastModified(clock.getTimeMillis())
+    logCount += 1
+
+    val slowApp = newLogFile("slowApp", None, inProgress = true)
+    slowApp.createNewFile()
+    slowApp.setLastModified(clock.getTimeMillis())
+    logCount += 1
+
+    val emptyFinished = newLogFile("emptyFinishedLogFile", None, inProgress = false)
+    emptyFinished.createNewFile()
+    emptyFinished.setLastModified(clock.getTimeMillis())
+    logCount += 1
+
+    // Create an incomplete log file, has an end record but no start record.
+    val corrupt = newLogFile("nonEmptyCorruptLogFile", None, inProgress = false)
+    writeFile(corrupt, true, None, SparkListenerApplicationEnd(0))
+    corrupt.setLastModified(clock.getTimeMillis())
+    logCount += 1
+
+    provider.checkForLogs()
+    provider.cleanLogs()
+    assert(new File(testDir.toURI).listFiles().size === logCount)
+
+    // Move the clock forward 1 day and scan the files again. They should still be there.
+    clock.advance(TimeUnit.DAYS.toMillis(1))
+    provider.checkForLogs()
+    provider.cleanLogs()
+    assert(new File(testDir.toURI).listFiles().size === logCount)
+
+    // Update the slow app to contain valid info. Code should detect the change and not clean
+    // it up.
+    writeFile(slowApp, true, None,
+      SparkListenerApplicationStart(slowApp.getName(), Some(slowApp.getName()), 1L, "test", None))
+    slowApp.setLastModified(clock.getTimeMillis())
+    validLogCount += 1
+
+    // Move the clock forward another 2 days and scan the files again. This time the cleaner should
+    // pick up the invalid files and get rid of them.
+    clock.advance(TimeUnit.DAYS.toMillis(2))
+    provider.checkForLogs()
+    provider.cleanLogs()
+    assert(new File(testDir.toURI).listFiles().size === validLogCount)
+  }
+
   /**
    * Asks the provider to check for logs and calls a function to perform checks on the updated
    * app list. Example:

http://git-wip-us.apache.org/repos/asf/spark/blob/fed2139f/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index 87778dd..7aa60f2 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -48,7 +48,7 @@ import org.apache.spark.deploy.history.config._
 import org.apache.spark.status.api.v1.ApplicationInfo
 import org.apache.spark.status.api.v1.JobData
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{ResetSystemProperties, Utils}
+import org.apache.spark.util.{ResetSystemProperties, ShutdownHookManager, Utils}
 
 /**
  * A collection of tests against the historyserver, including comparing responses from the json
@@ -564,7 +564,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
     assert(jobcount === getNumJobs("/jobs"))
 
     // no need to retain the test dir now the tests complete
-    logDir.deleteOnExit()
+    ShutdownHookManager.registerShutdownDeleteDir(logDir)
   }
 
   test("ui and api authorization checks") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org