Posted to commits@spark.apache.org by an...@apache.org on 2015/06/17 22:42:42 UTC

spark git commit: [SPARK-8372] History server shows incorrect information for application not started

Repository: spark
Updated Branches:
  refs/heads/master 7ad8c5d86 -> 2837e0670


[SPARK-8372] History server shows incorrect information for application not started

The history server may show an incorrect App ID, such as <App ID>.inprogress, for an incomplete application. This incorrect entry never disappears, even after the application completes.
![incorrectappinfo](https://cloud.githubusercontent.com/assets/9278199/8156147/2a10fdbe-137d-11e5-9620-c5b61d93e3c1.png)

The cause of the issue is that the log path name is used as the app ID when the app ID cannot be obtained during replay.
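For illustration, here is a minimal, self-contained Scala sketch of the fix's core idea. The names AttemptInfo, replayBefore, and replayAfter are hypothetical stand-ins, not the real FsHistoryProvider API: replay now returns an Option, and the caller drops logs that never recorded an application ID instead of substituting the log file name.

// Sketch only: illustrates the before/after behavior of replay(),
// not the actual FsHistoryProvider implementation.
case class AttemptInfo(appId: String, name: String)

// Before: a missing app ID silently falls back to the log file name,
// so "<App ID>.inprogress" leaks into the UI as the application ID.
def replayBefore(logName: String, recordedAppId: Option[String], appName: String): AttemptInfo =
  AttemptInfo(recordedAppId.getOrElse(logName), appName)

// After: return None when no ApplicationStart event was replayed;
// the caller skips such logs instead of listing a bogus entry.
def replayAfter(logName: String, recordedAppId: Option[String], appName: String): Option[AttemptInfo] =
  recordedAppId.map(id => AttemptInfo(id, appName))

// Caller side, mirroring the listing change: flatMap drops the Nones.
val loaded = Seq(("app-123.inprogress", Option.empty[String], "my-app"))
  .flatMap { case (log, id, name) => replayAfter(log, id, name) }
// loaded is empty: the not-yet-started app no longer appears in the listing.

Returning an Option pushes the "no app ID" case into the type, so the listing code cannot accidentally publish an entry built from a path name.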

Author: Carson Wang <ca...@intel.com>

Closes #6827 from carsonwang/SPARK-8372 and squashes the following commits:

cdbb089 [Carson Wang] Fix code style
3e46b35 [Carson Wang] Update code style
90f5dde [Carson Wang] Add a unit test
d8c9cd0 [Carson Wang] Replaying events only return information when app is started


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2837e067
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2837e067
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2837e067

Branch: refs/heads/master
Commit: 2837e067099921dd4ab6639ac5f6e89f789d4ff4
Parents: 7ad8c5d
Author: Carson Wang <ca...@intel.com>
Authored: Wed Jun 17 13:41:36 2015 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Wed Jun 17 13:42:36 2015 -0700

----------------------------------------------------------------------
 .../deploy/history/FsHistoryProvider.scala      | 38 ++++++++++-------
 .../deploy/history/FsHistoryProviderSuite.scala | 43 ++++++++++++++------
 2 files changed, 53 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2837e067/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 5427a88..db383b9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -160,7 +160,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
           replayBus.addListener(appListener)
           val appInfo = replay(fs.getFileStatus(new Path(logDir, attempt.logPath)), replayBus)
 
-          ui.setAppName(s"${appInfo.name} ($appId)")
+          appInfo.foreach { app => ui.setAppName(s"${app.name} ($appId)") }
 
           val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
           ui.getSecurityManager.setAcls(uiAclsEnabled)
@@ -282,8 +282,12 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     val newAttempts = logs.flatMap { fileStatus =>
       try {
         val res = replay(fileStatus, bus)
-        logInfo(s"Application log ${res.logPath} loaded successfully.")
-        Some(res)
+        res match {
+          case Some(r) => logDebug(s"Application log ${r.logPath} loaded successfully.")
+          case None => logWarning(s"Failed to load application log ${fileStatus.getPath}. " +
+            "The application may have not started.")
+        }
+        res
       } catch {
         case e: Exception =>
           logError(
@@ -429,9 +433,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 
   /**
    * Replays the events in the specified log file and returns information about the associated
-   * application.
+   * application. Return `None` if the application ID cannot be located.
    */
-  private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationAttemptInfo = {
+  private def replay(
+      eventLog: FileStatus,
+      bus: ReplayListenerBus): Option[FsApplicationAttemptInfo] = {
     val logPath = eventLog.getPath()
     logInfo(s"Replaying log path: $logPath")
     val logInput =
@@ -445,16 +451,18 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       val appCompleted = isApplicationCompleted(eventLog)
       bus.addListener(appListener)
       bus.replay(logInput, logPath.toString, !appCompleted)
-      new FsApplicationAttemptInfo(
-        logPath.getName(),
-        appListener.appName.getOrElse(NOT_STARTED),
-        appListener.appId.getOrElse(logPath.getName()),
-        appListener.appAttemptId,
-        appListener.startTime.getOrElse(-1L),
-        appListener.endTime.getOrElse(-1L),
-        getModificationTime(eventLog).get,
-        appListener.sparkUser.getOrElse(NOT_STARTED),
-        appCompleted)
+      appListener.appId.map { appId =>
+        new FsApplicationAttemptInfo(
+          logPath.getName(),
+          appListener.appName.getOrElse(NOT_STARTED),
+          appId,
+          appListener.appAttemptId,
+          appListener.startTime.getOrElse(-1L),
+          appListener.endTime.getOrElse(-1L),
+          getModificationTime(eventLog).get,
+          appListener.sparkUser.getOrElse(NOT_STARTED),
+          appCompleted)
+      }
     } finally {
       logInput.close()
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/2837e067/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 09075ee..d3a6db5 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -67,7 +67,8 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     // Write a new-style application log.
     val newAppComplete = newLogFile("new1", None, inProgress = false)
     writeFile(newAppComplete, true, None,
-      SparkListenerApplicationStart("new-app-complete", None, 1L, "test", None),
+      SparkListenerApplicationStart(
+        "new-app-complete", Some("new-app-complete"), 1L, "test", None),
       SparkListenerApplicationEnd(5L)
       )
 
@@ -75,13 +76,15 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     val newAppCompressedComplete = newLogFile("new1compressed", None, inProgress = false,
       Some("lzf"))
     writeFile(newAppCompressedComplete, true, None,
-      SparkListenerApplicationStart("new-app-compressed-complete", None, 1L, "test", None),
+      SparkListenerApplicationStart(
+        "new-app-compressed-complete", Some("new-app-compressed-complete"), 1L, "test", None),
       SparkListenerApplicationEnd(4L))
 
     // Write an unfinished app, new-style.
     val newAppIncomplete = newLogFile("new2", None, inProgress = true)
     writeFile(newAppIncomplete, true, None,
-      SparkListenerApplicationStart("new-app-incomplete", None, 1L, "test", None)
+      SparkListenerApplicationStart(
+        "new-app-incomplete", Some("new-app-incomplete"), 1L, "test", None)
       )
 
     // Write an old-style application log.
@@ -89,7 +92,8 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     oldAppComplete.mkdir()
     createEmptyFile(new File(oldAppComplete, provider.SPARK_VERSION_PREFIX + "1.0"))
     writeFile(new File(oldAppComplete, provider.LOG_PREFIX + "1"), false, None,
-      SparkListenerApplicationStart("old-app-complete", None, 2L, "test", None),
+      SparkListenerApplicationStart(
+        "old-app-complete", Some("old-app-complete"), 2L, "test", None),
       SparkListenerApplicationEnd(3L)
       )
     createEmptyFile(new File(oldAppComplete, provider.APPLICATION_COMPLETE))
@@ -103,7 +107,8 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     oldAppIncomplete.mkdir()
     createEmptyFile(new File(oldAppIncomplete, provider.SPARK_VERSION_PREFIX + "1.0"))
     writeFile(new File(oldAppIncomplete, provider.LOG_PREFIX + "1"), false, None,
-      SparkListenerApplicationStart("old-app-incomplete", None, 2L, "test", None)
+      SparkListenerApplicationStart(
+        "old-app-incomplete", Some("old-app-incomplete"), 2L, "test", None)
       )
 
     // Force a reload of data from the log directory, and check that both logs are loaded.
@@ -124,16 +129,16 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
           List(ApplicationAttemptInfo(None, start, end, lastMod, user, completed)))
       }
 
-      list(0) should be (makeAppInfo(newAppComplete.getName(), "new-app-complete", 1L, 5L,
+      list(0) should be (makeAppInfo("new-app-complete", "new-app-complete", 1L, 5L,
         newAppComplete.lastModified(), "test", true))
-      list(1) should be (makeAppInfo(newAppCompressedComplete.getName(),
+      list(1) should be (makeAppInfo("new-app-compressed-complete",
         "new-app-compressed-complete", 1L, 4L, newAppCompressedComplete.lastModified(), "test",
         true))
-      list(2) should be (makeAppInfo(oldAppComplete.getName(), "old-app-complete", 2L, 3L,
+      list(2) should be (makeAppInfo("old-app-complete", "old-app-complete", 2L, 3L,
         oldAppComplete.lastModified(), "test", true))
-      list(3) should be (makeAppInfo(oldAppIncomplete.getName(), "old-app-incomplete", 2L, -1L,
+      list(3) should be (makeAppInfo("old-app-incomplete", "old-app-incomplete", 2L, -1L,
         oldAppIncomplete.lastModified(), "test", false))
-      list(4) should be (makeAppInfo(newAppIncomplete.getName(), "new-app-incomplete", 1L, -1L,
+      list(4) should be (makeAppInfo("new-app-incomplete", "new-app-incomplete", 1L, -1L,
         newAppIncomplete.lastModified(), "test", false))
 
       // Make sure the UI can be rendered.
@@ -157,7 +162,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
       logDir.mkdir()
       createEmptyFile(new File(logDir, provider.SPARK_VERSION_PREFIX + "1.0"))
       writeFile(new File(logDir, provider.LOG_PREFIX + "1"), false, Option(codec),
-        SparkListenerApplicationStart("app2", None, 2L, "test", None),
+        SparkListenerApplicationStart("app2", Some("app2"), 2L, "test", None),
         SparkListenerApplicationEnd(3L)
         )
       createEmptyFile(new File(logDir, provider.COMPRESSION_CODEC_PREFIX + codecName))
@@ -180,12 +185,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
   test("SPARK-3697: ignore directories that cannot be read.") {
     val logFile1 = newLogFile("new1", None, inProgress = false)
     writeFile(logFile1, true, None,
-      SparkListenerApplicationStart("app1-1", None, 1L, "test", None),
+      SparkListenerApplicationStart("app1-1", Some("app1-1"), 1L, "test", None),
       SparkListenerApplicationEnd(2L)
       )
     val logFile2 = newLogFile("new2", None, inProgress = false)
     writeFile(logFile2, true, None,
-      SparkListenerApplicationStart("app1-2", None, 1L, "test", None),
+      SparkListenerApplicationStart("app1-2", Some("app1-2"), 1L, "test", None),
       SparkListenerApplicationEnd(2L)
       )
     logFile2.setReadable(false, false)
@@ -218,6 +223,18 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     }
   }
 
+  test("Parse logs that application is not started") {
+    val provider = new FsHistoryProvider((createTestConf()))
+
+    val logFile1 = newLogFile("app1", None, inProgress = true)
+    writeFile(logFile1, true, None,
+      SparkListenerLogStart("1.4")
+    )
+    updateAndCheck(provider) { list =>
+      list.size should be (0)
+    }
+  }
+
   test("SPARK-5582: empty log directory") {
     val provider = new FsHistoryProvider(createTestConf())
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org