You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2015/07/17 23:47:17 UTC

spark git commit: [SPARK-8593] [CORE] Sort app attempts by start time.

Repository: spark
Updated Branches:
  refs/heads/master 8b8be1f5d -> 42d8a012f


[SPARK-8593] [CORE] Sort app attempts by start time.

This makes sure attempts are listed in the order they were executed, and that the
app's state matches the state of the most current attempt.

Author: Joshi <re...@gmail.com>
Author: Rekha Joshi <re...@gmail.com>

Closes #7253 from rekhajoshm/SPARK-8593 and squashes the following commits:

874dd80 [Joshi] History Server: updated order for multiple attempts(logcleaner)
716e0b1 [Joshi] History Server: updated order for multiple attempts(descending start time works everytime)
548c753 [Joshi] History Server: updated order for multiple attempts(descending start time works everytime)
83306a8 [Joshi] History Server: updated order for multiple attempts(descending start time)
b0fc922 [Joshi] History Server: updated order for multiple attempts(updated comment)
cc0fda7 [Joshi] History Server: updated order for multiple attempts(updated test)
304cb0b [Joshi] History Server: updated order for multiple attempts(reverted HistoryPage)
85024e8 [Joshi] History Server: updated order for multiple attempts
a41ac4b [Joshi] History Server: updated order for multiple attempts
ab65fa1 [Joshi] History Server: some attempt completed to work with showIncomplete
0be142d [Rekha Joshi] Merge pull request #3 from apache/master
106fd8e [Rekha Joshi] Merge pull request #2 from apache/master
e3677c9 [Rekha Joshi] Merge pull request #1 from apache/master


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/42d8a012
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/42d8a012
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/42d8a012

Branch: refs/heads/master
Commit: 42d8a012f6652df1fa3f560f87c53731ea070640
Parents: 8b8be1f
Author: Joshi <re...@gmail.com>
Authored: Fri Jul 17 22:47:28 2015 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Fri Jul 17 22:47:28 2015 +0100

----------------------------------------------------------------------
 .../deploy/history/FsHistoryProvider.scala      | 10 +++-----
 .../deploy/history/FsHistoryProviderSuite.scala | 24 +++++++++-----------
 2 files changed, 14 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/42d8a012/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 2cc465e..e3060ac 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -407,8 +407,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 
   /**
    * Comparison function that defines the sort order for application attempts within the same
-   * application. Order is: running attempts before complete attempts, running attempts sorted
-   * by start time, completed attempts sorted by end time.
+   * application. Order is: attempts are sorted by descending start time.
+   * Most recent attempt state matches with current state of the app.
    *
    * Normally applications should have a single running attempt; but failure to call sc.stop()
    * may cause multiple running attempts to show up.
@@ -418,11 +418,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   private def compareAttemptInfo(
       a1: FsApplicationAttemptInfo,
       a2: FsApplicationAttemptInfo): Boolean = {
-    if (a1.completed == a2.completed) {
-      if (a1.completed) a1.endTime >= a2.endTime else a1.startTime >= a2.startTime
-    } else {
-      !a1.completed
-    }
+    a1.startTime >= a2.startTime
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/42d8a012/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 2a62450..73cff89 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -243,13 +243,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     appListAfterRename.size should be (1)
   }
 
-  test("apps with multiple attempts") {
+  test("apps with multiple attempts with order") {
     val provider = new FsHistoryProvider(createTestConf())
 
-    val attempt1 = newLogFile("app1", Some("attempt1"), inProgress = false)
+    val attempt1 = newLogFile("app1", Some("attempt1"), inProgress = true)
     writeFile(attempt1, true, None,
-      SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1")),
-      SparkListenerApplicationEnd(2L)
+      SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1"))
       )
 
     updateAndCheck(provider) { list =>
@@ -259,7 +258,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
 
     val attempt2 = newLogFile("app1", Some("attempt2"), inProgress = true)
     writeFile(attempt2, true, None,
-      SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt2"))
+      SparkListenerApplicationStart("app1", Some("app1"), 2L, "test", Some("attempt2"))
       )
 
     updateAndCheck(provider) { list =>
@@ -268,22 +267,21 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
       list.head.attempts.head.attemptId should be (Some("attempt2"))
     }
 
-    val completedAttempt2 = newLogFile("app1", Some("attempt2"), inProgress = false)
-    attempt2.delete()
-    writeFile(attempt2, true, None,
-      SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt2")),
+    val attempt3 = newLogFile("app1", Some("attempt3"), inProgress = false)
+    writeFile(attempt3, true, None,
+      SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt3")),
       SparkListenerApplicationEnd(4L)
       )
 
     updateAndCheck(provider) { list =>
       list should not be (null)
       list.size should be (1)
-      list.head.attempts.size should be (2)
-      list.head.attempts.head.attemptId should be (Some("attempt2"))
+      list.head.attempts.size should be (3)
+      list.head.attempts.head.attemptId should be (Some("attempt3"))
     }
 
     val app2Attempt1 = newLogFile("app2", Some("attempt1"), inProgress = false)
-    writeFile(attempt2, true, None,
+    writeFile(attempt1, true, None,
       SparkListenerApplicationStart("app2", Some("app2"), 5L, "test", Some("attempt1")),
       SparkListenerApplicationEnd(6L)
       )
@@ -291,7 +289,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     updateAndCheck(provider) { list =>
       list.size should be (2)
       list.head.attempts.size should be (1)
-      list.last.attempts.size should be (2)
+      list.last.attempts.size should be (3)
       list.head.attempts.head.attemptId should be (Some("attempt1"))
 
       list.foreach { case app =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org