You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2015/02/06 23:23:11 UTC

spark git commit: [SPARK-5600] [core] Clean up FsHistoryProvider test, fix app sort order.

Repository: spark
Updated Branches:
  refs/heads/master ca66159a4 -> 5687bab8f


[SPARK-5600] [core] Clean up FsHistoryProvider test, fix app sort order.

Clean up some test setup code to remove duplicate instantiation of the
provider. Also make sure unfinished apps are sorted correctly.

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #4370 from vanzin/SPARK-5600 and squashes the following commits:

0d048d5 [Marcelo Vanzin] Cleanup test code a bit.
2585119 [Marcelo Vanzin] Review feedback.
8b97544 [Marcelo Vanzin] Merge branch 'master' into SPARK-5600
be979e9 [Marcelo Vanzin] Merge branch 'master' into SPARK-5600
298371c [Marcelo Vanzin] [SPARK-5600] [core] Clean up FsHistoryProvider test, fix app sort order.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5687bab8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5687bab8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5687bab8

Branch: refs/heads/master
Commit: 5687bab8fdfdc5345b8c5b9be8d4595299005fc8
Parents: ca66159
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Fri Feb 6 14:23:09 2015 -0800
Committer: Andrew Or <an...@databricks.com>
Committed: Fri Feb 6 14:23:09 2015 -0800

----------------------------------------------------------------------
 .../deploy/history/FsHistoryProvider.scala      | 15 +++-
 .../deploy/history/FsHistoryProviderSuite.scala | 85 +++++++++-----------
 2 files changed, 52 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5687bab8/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 92125f2..868c63d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -194,7 +194,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
               None
           }
         }
-        .sortBy { info => (-info.endTime, -info.startTime) }
+        .sortWith(compareAppInfo)
 
       lastModifiedTime = newLastModifiedTime
 
@@ -214,7 +214,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
         val newIterator = logInfos.iterator.buffered
         val oldIterator = applications.values.iterator.buffered
         while (newIterator.hasNext && oldIterator.hasNext) {
-          if (newIterator.head.endTime > oldIterator.head.endTime) {
+          if (compareAppInfo(newIterator.head, oldIterator.head)) {
             addIfAbsent(newIterator.next)
           } else {
             addIfAbsent(oldIterator.next)
@@ -231,6 +231,17 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   }
 
   /**
+   * Orders the application listing by descending end time, then by descending start time.
+   *
+   * @return Whether `i1` should precede `i2`.
+   */
+  private def compareAppInfo(
+      i1: FsApplicationHistoryInfo,
+      i2: FsApplicationHistoryInfo): Boolean = {
+    if (i1.endTime != i2.endTime) i1.endTime >= i2.endTime else i1.startTime >= i2.startTime
+  }
+
+  /**
    * Replays the events in the specified log file and returns information about the associated
    * application.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/5687bab8/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 1d95432..85939ea 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -37,13 +37,8 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
 
   private var testDir: File = null
 
-  private var provider: FsHistoryProvider = null
-
   before {
     testDir = Utils.createTempDir()
-    provider = new FsHistoryProvider(new SparkConf()
-      .set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
-      .set("spark.history.fs.updateInterval", "0"))
   }
 
   after {
@@ -51,40 +46,41 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
   }
 
   test("Parse new and old application logs") {
-    val conf = new SparkConf()
-      .set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
-      .set("spark.history.fs.updateInterval", "0")
-    val provider = new FsHistoryProvider(conf)
+    val provider = new FsHistoryProvider(createTestConf())
 
     // Write a new-style application log.
-    val logFile1 = new File(testDir, "new1")
-    writeFile(logFile1, true, None,
-      SparkListenerApplicationStart("app1-1", None, 1L, "test"),
-      SparkListenerApplicationEnd(2L)
+    val newAppComplete = new File(testDir, "new1")
+    writeFile(newAppComplete, true, None,
+      SparkListenerApplicationStart("new-app-complete", None, 1L, "test"),
+      SparkListenerApplicationEnd(4L)
       )
 
     // Write an unfinished app, new-style.
-    val logFile2 = new File(testDir, "new2" + EventLoggingListener.IN_PROGRESS)
-    writeFile(logFile2, true, None,
-      SparkListenerApplicationStart("app2-2", None, 1L, "test")
+    val newAppIncomplete = new File(testDir, "new2" + EventLoggingListener.IN_PROGRESS)
+    writeFile(newAppIncomplete, true, None,
+      SparkListenerApplicationStart("new-app-incomplete", None, 1L, "test")
       )
 
     // Write an old-style application log.
-    val oldLog = new File(testDir, "old1")
-    oldLog.mkdir()
-    createEmptyFile(new File(oldLog, provider.SPARK_VERSION_PREFIX + "1.0"))
-    writeFile(new File(oldLog, provider.LOG_PREFIX + "1"), false, None,
-      SparkListenerApplicationStart("app3", None, 2L, "test"),
+    val oldAppComplete = new File(testDir, "old1")
+    oldAppComplete.mkdir()
+    createEmptyFile(new File(oldAppComplete, provider.SPARK_VERSION_PREFIX + "1.0"))
+    writeFile(new File(oldAppComplete, provider.LOG_PREFIX + "1"), false, None,
+      SparkListenerApplicationStart("old-app-complete", None, 2L, "test"),
       SparkListenerApplicationEnd(3L)
       )
-    createEmptyFile(new File(oldLog, provider.APPLICATION_COMPLETE))
+    createEmptyFile(new File(oldAppComplete, provider.APPLICATION_COMPLETE))
+
+    // Check for logs so that we force the older unfinished app to be loaded, to make
+    // sure unfinished apps are also sorted correctly.
+    provider.checkForLogs()
 
     // Write an unfinished app, old-style.
-    val oldLog2 = new File(testDir, "old2")
-    oldLog2.mkdir()
-    createEmptyFile(new File(oldLog2, provider.SPARK_VERSION_PREFIX + "1.0"))
-    writeFile(new File(oldLog2, provider.LOG_PREFIX + "1"), false, None,
-      SparkListenerApplicationStart("app4", None, 2L, "test")
+    val oldAppIncomplete = new File(testDir, "old2")
+    oldAppIncomplete.mkdir()
+    createEmptyFile(new File(oldAppIncomplete, provider.SPARK_VERSION_PREFIX + "1.0"))
+    writeFile(new File(oldAppIncomplete, provider.LOG_PREFIX + "1"), false, None,
+      SparkListenerApplicationStart("old-app-incomplete", None, 2L, "test")
       )
 
     // Force a reload of data from the log directory, and check that all four logs are loaded.
@@ -96,14 +92,14 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
     list.size should be (4)
     list.count(e => e.completed) should be (2)
 
-    list(0) should be (ApplicationHistoryInfo(oldLog.getName(), "app3", 2L, 3L,
-      oldLog.lastModified(), "test", true))
-    list(1) should be (ApplicationHistoryInfo(logFile1.getName(), "app1-1", 1L, 2L,
-      logFile1.lastModified(), "test", true))
-    list(2) should be (ApplicationHistoryInfo(oldLog2.getName(), "app4", 2L, -1L,
-      oldLog2.lastModified(), "test", false))
-     list(3) should be (ApplicationHistoryInfo(logFile2.getName(), "app2-2", 1L, -1L,
-      logFile2.lastModified(), "test", false))
+    list(0) should be (ApplicationHistoryInfo(newAppComplete.getName(), "new-app-complete", 1L, 4L,
+      newAppComplete.lastModified(), "test", true))
+    list(1) should be (ApplicationHistoryInfo(oldAppComplete.getName(), "old-app-complete", 2L, 3L,
+      oldAppComplete.lastModified(), "test", true))
+    list(2) should be (ApplicationHistoryInfo(oldAppIncomplete.getName(), "old-app-incomplete", 2L,
+      -1L, oldAppIncomplete.lastModified(), "test", false))
+    list(3) should be (ApplicationHistoryInfo(newAppIncomplete.getName(), "new-app-incomplete", 1L,
+      -1L, newAppIncomplete.lastModified(), "test", false))
 
     // Make sure the UI can be rendered.
     list.foreach { case info =>
@@ -113,6 +109,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
   }
 
   test("Parse legacy logs with compression codec set") {
+    val provider = new FsHistoryProvider(createTestConf())
     val testCodecs = List((classOf[LZFCompressionCodec].getName(), true),
       (classOf[SnappyCompressionCodec].getName(), true),
       ("invalid.codec", false))
@@ -156,10 +153,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
       )
     logFile2.setReadable(false, false)
 
-    val conf = new SparkConf()
-      .set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
-      .set("spark.history.fs.updateInterval", "0")
-    val provider = new FsHistoryProvider(conf)
+    val provider = new FsHistoryProvider(createTestConf())
     provider.checkForLogs()
 
     val list = provider.getListing().toSeq
@@ -168,10 +162,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
   }
 
   test("history file is renamed from inprogress to completed") {
-    val conf = new SparkConf()
-      .set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
-      .set("spark.testing", "true")
-    val provider = new FsHistoryProvider(conf)
+    val provider = new FsHistoryProvider(createTestConf())
 
     val logFile1 = new File(testDir, "app1" + EventLoggingListener.IN_PROGRESS)
     writeFile(logFile1, true, None,
@@ -191,9 +182,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
   }
 
   test("SPARK-5582: empty log directory") {
-    val conf = new SparkConf()
-      .set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
-    val provider = new FsHistoryProvider(conf)
+    val provider = new FsHistoryProvider(createTestConf())
 
     val logFile1 = new File(testDir, "app1" + EventLoggingListener.IN_PROGRESS)
     writeFile(logFile1, true, None,
@@ -229,4 +218,8 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
     new FileOutputStream(file).close()
   }
 
+  private def createTestConf(): SparkConf = {
+    new SparkConf().set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org