You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2015/06/30 23:01:56 UTC

spark git commit: [SPARK-8372] Do not show applications that haven't recorded their app ID yet.

Repository: spark
Updated Branches:
  refs/heads/master 7dda0844e -> 4bb8375fc


[SPARK-8372] Do not show applications that haven't recorded their app ID yet.

Showing these applications may lead to weird behavior in the History Server. For old logs, if
the app ID is recorded later, you may end up with a duplicate entry. For new logs, the app might
be listed with a ".inprogress" suffix.

So ignore those, but still allow old applications that don't record app IDs at all (1.0 and 1.1) to be shown.

Author: Marcelo Vanzin <va...@cloudera.com>
Author: Carson Wang <ca...@intel.com>

Closes #7097 from vanzin/SPARK-8372 and squashes the following commits:

a24eab2 [Marcelo Vanzin] Feedback.
112ae8f [Marcelo Vanzin] Merge branch 'master' into SPARK-8372
7b91b74 [Marcelo Vanzin] Handle logs generated by 1.0 and 1.1.
1eca3fe [Carson Wang] [SPARK-8372] History server shows incorrect information for application not started


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4bb8375f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4bb8375f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4bb8375f

Branch: refs/heads/master
Commit: 4bb8375fc2c6aa8342df03c3617aa97e7d01de3f
Parents: 7dda084
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Tue Jun 30 14:01:52 2015 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Tue Jun 30 14:01:52 2015 -0700

----------------------------------------------------------------------
 .../deploy/history/FsHistoryProvider.scala      |  98 +++++++++++------
 .../deploy/history/FsHistoryProviderSuite.scala | 109 ++++++++++++++-----
 2 files changed, 147 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4bb8375f/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 5427a88..2cc465e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -83,12 +83,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   // List of application logs to be deleted by event log cleaner.
   private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]
 
-  // Constants used to parse Spark 1.0.0 log directories.
-  private[history] val LOG_PREFIX = "EVENT_LOG_"
-  private[history] val SPARK_VERSION_PREFIX = EventLoggingListener.SPARK_VERSION_KEY + "_"
-  private[history] val COMPRESSION_CODEC_PREFIX = EventLoggingListener.COMPRESSION_CODEC_KEY + "_"
-  private[history] val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
-
   /**
    * Return a runnable that performs the given operation on the event logs.
    * This operation is expected to be executed periodically.
@@ -146,7 +140,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   override def getAppUI(appId: String, attemptId: Option[String]): Option[SparkUI] = {
     try {
       applications.get(appId).flatMap { appInfo =>
-        appInfo.attempts.find(_.attemptId == attemptId).map { attempt =>
+        appInfo.attempts.find(_.attemptId == attemptId).flatMap { attempt =>
           val replayBus = new ReplayListenerBus()
           val ui = {
             val conf = this.conf.clone()
@@ -155,20 +149,20 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
               HistoryServer.getAttemptURI(appId, attempt.attemptId), attempt.startTime)
             // Do not call ui.bind() to avoid creating a new server for each application
           }
-
           val appListener = new ApplicationEventListener()
           replayBus.addListener(appListener)
           val appInfo = replay(fs.getFileStatus(new Path(logDir, attempt.logPath)), replayBus)
-
-          ui.setAppName(s"${appInfo.name} ($appId)")
-
-          val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
-          ui.getSecurityManager.setAcls(uiAclsEnabled)
-          // make sure to set admin acls before view acls so they are properly picked up
-          ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
-          ui.getSecurityManager.setViewAcls(attempt.sparkUser,
-            appListener.viewAcls.getOrElse(""))
-          ui
+          appInfo.map { info =>
+            ui.setAppName(s"${info.name} ($appId)")
+
+            val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
+            ui.getSecurityManager.setAcls(uiAclsEnabled)
+            // make sure to set admin acls before view acls so they are properly picked up
+            ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
+            ui.getSecurityManager.setViewAcls(attempt.sparkUser,
+              appListener.viewAcls.getOrElse(""))
+            ui
+          }
         }
       }
     } catch {
@@ -282,8 +276,12 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     val newAttempts = logs.flatMap { fileStatus =>
       try {
         val res = replay(fileStatus, bus)
-        logInfo(s"Application log ${res.logPath} loaded successfully.")
-        Some(res)
+        res match {
+          case Some(r) => logDebug(s"Application log ${r.logPath} loaded successfully.")
+          case None => logWarning(s"Failed to load application log ${fileStatus.getPath}. " +
+            "The application may have not started.")
+        }
+        res
       } catch {
         case e: Exception =>
           logError(
@@ -429,9 +427,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 
   /**
    * Replays the events in the specified log file and returns information about the associated
-   * application.
+   * application. Return `None` if the application ID cannot be located.
    */
-  private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationAttemptInfo = {
+  private def replay(
+      eventLog: FileStatus,
+      bus: ReplayListenerBus): Option[FsApplicationAttemptInfo] = {
     val logPath = eventLog.getPath()
     logInfo(s"Replaying log path: $logPath")
     val logInput =
@@ -445,16 +445,24 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       val appCompleted = isApplicationCompleted(eventLog)
       bus.addListener(appListener)
       bus.replay(logInput, logPath.toString, !appCompleted)
-      new FsApplicationAttemptInfo(
-        logPath.getName(),
-        appListener.appName.getOrElse(NOT_STARTED),
-        appListener.appId.getOrElse(logPath.getName()),
-        appListener.appAttemptId,
-        appListener.startTime.getOrElse(-1L),
-        appListener.endTime.getOrElse(-1L),
-        getModificationTime(eventLog).get,
-        appListener.sparkUser.getOrElse(NOT_STARTED),
-        appCompleted)
+
+      // Without an app ID, new logs will render incorrectly in the listing page, so do not list or
+      // try to show their UI. Some old versions of Spark generate logs without an app ID, so let
+      // logs generated by those versions go through.
+      if (appListener.appId.isDefined || !sparkVersionHasAppId(eventLog)) {
+        Some(new FsApplicationAttemptInfo(
+          logPath.getName(),
+          appListener.appName.getOrElse(NOT_STARTED),
+          appListener.appId.getOrElse(logPath.getName()),
+          appListener.appAttemptId,
+          appListener.startTime.getOrElse(-1L),
+          appListener.endTime.getOrElse(-1L),
+          getModificationTime(eventLog).get,
+          appListener.sparkUser.getOrElse(NOT_STARTED),
+          appCompleted))
+      } else {
+        None
+      }
     } finally {
       logInput.close()
     }
@@ -529,10 +537,34 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
   }
 
+  /**
+   * Returns whether the version of Spark that generated logs records app IDs. App IDs were added
+   * in Spark 1.1.
+   */
+  private def sparkVersionHasAppId(entry: FileStatus): Boolean = {
+    if (isLegacyLogDirectory(entry)) {
+      fs.listStatus(entry.getPath())
+        .find { status => status.getPath().getName().startsWith(SPARK_VERSION_PREFIX) }
+        .map { status =>
+          val version = status.getPath().getName().substring(SPARK_VERSION_PREFIX.length())
+          version != "1.0" && version != "1.1"
+        }
+        .getOrElse(true)
+    } else {
+      true
+    }
+  }
+
 }
 
-private object FsHistoryProvider {
+private[history] object FsHistoryProvider {
   val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
+
+  // Constants used to parse Spark 1.0.0 log directories.
+  val LOG_PREFIX = "EVENT_LOG_"
+  val SPARK_VERSION_PREFIX = EventLoggingListener.SPARK_VERSION_KEY + "_"
+  val COMPRESSION_CODEC_PREFIX = EventLoggingListener.COMPRESSION_CODEC_KEY + "_"
+  val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
 }
 
 private class FsApplicationAttemptInfo(

http://git-wip-us.apache.org/repos/asf/spark/blob/4bb8375f/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 09075ee..2a62450 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -39,6 +39,8 @@ import org.apache.spark.util.{JsonProtocol, ManualClock, Utils}
 
 class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers with Logging {
 
+  import FsHistoryProvider._
+
   private var testDir: File = null
 
   before {
@@ -67,7 +69,8 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     // Write a new-style application log.
     val newAppComplete = newLogFile("new1", None, inProgress = false)
     writeFile(newAppComplete, true, None,
-      SparkListenerApplicationStart("new-app-complete", None, 1L, "test", None),
+      SparkListenerApplicationStart(newAppComplete.getName(), Some("new-app-complete"), 1L, "test",
+        None),
       SparkListenerApplicationEnd(5L)
       )
 
@@ -75,35 +78,30 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     val newAppCompressedComplete = newLogFile("new1compressed", None, inProgress = false,
       Some("lzf"))
     writeFile(newAppCompressedComplete, true, None,
-      SparkListenerApplicationStart("new-app-compressed-complete", None, 1L, "test", None),
+      SparkListenerApplicationStart(newAppCompressedComplete.getName(), Some("new-complete-lzf"),
+        1L, "test", None),
       SparkListenerApplicationEnd(4L))
 
     // Write an unfinished app, new-style.
     val newAppIncomplete = newLogFile("new2", None, inProgress = true)
     writeFile(newAppIncomplete, true, None,
-      SparkListenerApplicationStart("new-app-incomplete", None, 1L, "test", None)
+      SparkListenerApplicationStart(newAppIncomplete.getName(), Some("new-incomplete"), 1L, "test",
+        None)
       )
 
     // Write an old-style application log.
-    val oldAppComplete = new File(testDir, "old1")
-    oldAppComplete.mkdir()
-    createEmptyFile(new File(oldAppComplete, provider.SPARK_VERSION_PREFIX + "1.0"))
-    writeFile(new File(oldAppComplete, provider.LOG_PREFIX + "1"), false, None,
-      SparkListenerApplicationStart("old-app-complete", None, 2L, "test", None),
+    val oldAppComplete = writeOldLog("old1", "1.0", None, true,
+      SparkListenerApplicationStart("old1", Some("old-app-complete"), 2L, "test", None),
       SparkListenerApplicationEnd(3L)
       )
-    createEmptyFile(new File(oldAppComplete, provider.APPLICATION_COMPLETE))
 
     // Check for logs so that we force the older unfinished app to be loaded, to make
     // sure unfinished apps are also sorted correctly.
     provider.checkForLogs()
 
     // Write an unfinished app, old-style.
-    val oldAppIncomplete = new File(testDir, "old2")
-    oldAppIncomplete.mkdir()
-    createEmptyFile(new File(oldAppIncomplete, provider.SPARK_VERSION_PREFIX + "1.0"))
-    writeFile(new File(oldAppIncomplete, provider.LOG_PREFIX + "1"), false, None,
-      SparkListenerApplicationStart("old-app-incomplete", None, 2L, "test", None)
+    val oldAppIncomplete = writeOldLog("old2", "1.0", None, false,
+      SparkListenerApplicationStart("old2", None, 2L, "test", None)
       )
 
     // Force a reload of data from the log directory, and check that both logs are loaded.
@@ -124,16 +122,15 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
           List(ApplicationAttemptInfo(None, start, end, lastMod, user, completed)))
       }
 
-      list(0) should be (makeAppInfo(newAppComplete.getName(), "new-app-complete", 1L, 5L,
+      list(0) should be (makeAppInfo("new-app-complete", newAppComplete.getName(), 1L, 5L,
         newAppComplete.lastModified(), "test", true))
-      list(1) should be (makeAppInfo(newAppCompressedComplete.getName(),
-        "new-app-compressed-complete", 1L, 4L, newAppCompressedComplete.lastModified(), "test",
-        true))
-      list(2) should be (makeAppInfo(oldAppComplete.getName(), "old-app-complete", 2L, 3L,
+      list(1) should be (makeAppInfo("new-complete-lzf", newAppCompressedComplete.getName(),
+        1L, 4L, newAppCompressedComplete.lastModified(), "test", true))
+      list(2) should be (makeAppInfo("old-app-complete", oldAppComplete.getName(), 2L, 3L,
         oldAppComplete.lastModified(), "test", true))
-      list(3) should be (makeAppInfo(oldAppIncomplete.getName(), "old-app-incomplete", 2L, -1L,
-        oldAppIncomplete.lastModified(), "test", false))
-      list(4) should be (makeAppInfo(newAppIncomplete.getName(), "new-app-incomplete", 1L, -1L,
+      list(3) should be (makeAppInfo(oldAppIncomplete.getName(), oldAppIncomplete.getName(), 2L,
+        -1L, oldAppIncomplete.lastModified(), "test", false))
+      list(4) should be (makeAppInfo("new-incomplete", newAppIncomplete.getName(), 1L, -1L,
         newAppIncomplete.lastModified(), "test", false))
 
       // Make sure the UI can be rendered.
@@ -155,12 +152,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
       val codec = if (valid) CompressionCodec.createCodec(new SparkConf(), codecName) else null
       val logDir = new File(testDir, codecName)
       logDir.mkdir()
-      createEmptyFile(new File(logDir, provider.SPARK_VERSION_PREFIX + "1.0"))
-      writeFile(new File(logDir, provider.LOG_PREFIX + "1"), false, Option(codec),
+      createEmptyFile(new File(logDir, SPARK_VERSION_PREFIX + "1.0"))
+      writeFile(new File(logDir, LOG_PREFIX + "1"), false, Option(codec),
         SparkListenerApplicationStart("app2", None, 2L, "test", None),
         SparkListenerApplicationEnd(3L)
         )
-      createEmptyFile(new File(logDir, provider.COMPRESSION_CODEC_PREFIX + codecName))
+      createEmptyFile(new File(logDir, COMPRESSION_CODEC_PREFIX + codecName))
 
       val logPath = new Path(logDir.getAbsolutePath())
       try {
@@ -180,12 +177,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
   test("SPARK-3697: ignore directories that cannot be read.") {
     val logFile1 = newLogFile("new1", None, inProgress = false)
     writeFile(logFile1, true, None,
-      SparkListenerApplicationStart("app1-1", None, 1L, "test", None),
+      SparkListenerApplicationStart("app1-1", Some("app1-1"), 1L, "test", None),
       SparkListenerApplicationEnd(2L)
       )
     val logFile2 = newLogFile("new2", None, inProgress = false)
     writeFile(logFile2, true, None,
-      SparkListenerApplicationStart("app1-2", None, 1L, "test", None),
+      SparkListenerApplicationStart("app1-2", Some("app1-2"), 1L, "test", None),
       SparkListenerApplicationEnd(2L)
       )
     logFile2.setReadable(false, false)
@@ -218,6 +215,18 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     }
   }
 
+  test("Parse logs that application is not started") {
+    val provider = new FsHistoryProvider((createTestConf()))
+
+    val logFile1 = newLogFile("app1", None, inProgress = true)
+    writeFile(logFile1, true, None,
+      SparkListenerLogStart("1.4")
+    )
+    updateAndCheck(provider) { list =>
+      list.size should be (0)
+    }
+  }
+
   test("SPARK-5582: empty log directory") {
     val provider = new FsHistoryProvider(createTestConf())
 
@@ -373,6 +382,33 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     }
   }
 
+  test("SPARK-8372: new logs with no app ID are ignored") {
+    val provider = new FsHistoryProvider(createTestConf())
+
+    // Write a new log file without an app id, to make sure it's ignored.
+    val logFile1 = newLogFile("app1", None, inProgress = true)
+    writeFile(logFile1, true, None,
+      SparkListenerLogStart("1.4")
+    )
+
+    // Write a 1.2 log file with no start event (= no app id), it should be ignored.
+    writeOldLog("v12Log", "1.2", None, false)
+
+    // Write 1.0 and 1.1 logs, which don't have app ids.
+    writeOldLog("v11Log", "1.1", None, true,
+      SparkListenerApplicationStart("v11Log", None, 2L, "test", None),
+      SparkListenerApplicationEnd(3L))
+    writeOldLog("v10Log", "1.0", None, true,
+      SparkListenerApplicationStart("v10Log", None, 2L, "test", None),
+      SparkListenerApplicationEnd(4L))
+
+    updateAndCheck(provider) { list =>
+      list.size should be (2)
+      list(0).id should be ("v10Log")
+      list(1).id should be ("v11Log")
+    }
+  }
+
   /**
    * Asks the provider to check for logs and calls a function to perform checks on the updated
    * app list. Example:
@@ -412,4 +448,23 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     new SparkConf().set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
   }
 
+  private def writeOldLog(
+      fname: String,
+      sparkVersion: String,
+      codec: Option[CompressionCodec],
+      completed: Boolean,
+      events: SparkListenerEvent*): File = {
+    val log = new File(testDir, fname)
+    log.mkdir()
+
+    val oldEventLog = new File(log, LOG_PREFIX + "1")
+    createEmptyFile(new File(log, SPARK_VERSION_PREFIX + sparkVersion))
+    writeFile(new File(log, LOG_PREFIX + "1"), false, codec, events: _*)
+    if (completed) {
+      createEmptyFile(new File(log, APPLICATION_COMPLETE))
+    }
+
+    log
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org