You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/05/02 06:42:12 UTC

git commit: Add tests for FileLogger, EventLoggingListener, and ReplayListenerBus

Repository: spark
Updated Branches:
  refs/heads/master 40cf6d310 -> 394d8cb1c


Add tests for FileLogger, EventLoggingListener, and ReplayListenerBus

Modifications to Spark core are limited to exposing functionality to test files + minor style fixes.
(728 / 769 lines are from tests)

Author: Andrew Or <an...@gmail.com>

Closes #591 from andrewor14/event-log-tests and squashes the following commits:

2883837 [Andrew Or] Merge branch 'master' of github.com:apache/spark into event-log-tests
c3afcea [Andrew Or] Compromise
2d5daf8 [Andrew Or] Use temp directory provided by the OS rather than /tmp
2b52151 [Andrew Or] Remove unnecessary file delete + add a comment
62010fd [Andrew Or] More cleanup (renaming variables, updating comments etc)
ad2beff [Andrew Or] Clean up EventLoggingListenerSuite + modify a few comments
862e752 [Andrew Or] Merge branch 'master' of github.com:apache/spark into event-log-tests
e0ba2f8 [Andrew Or] Fix test failures caused by race condition in processing/mutating events
b990453 [Andrew Or] ReplayListenerBus suite - tests do not all pass yet
ab66a84 [Andrew Or] Tests for FileLogger + delete file after tests
187bb25 [Andrew Or] Formatting and renaming variables
769336f [Andrew Or] Merge branch 'master' of github.com:apache/spark into event-log-tests
5d38ffe [Andrew Or] Clean up EventLoggingListenerSuite + add comments
e12f4b1 [Andrew Or] Preliminary tests for EventLoggingListener (need major cleanup)


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/394d8cb1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/394d8cb1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/394d8cb1

Branch: refs/heads/master
Commit: 394d8cb1c4dfd1e496562009e716b8fc06be22cd
Parents: 40cf6d3
Author: Andrew Or <an...@gmail.com>
Authored: Thu May 1 21:42:06 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Thu May 1 21:42:06 2014 -0700

----------------------------------------------------------------------
 .../spark/scheduler/EventLoggingListener.scala  |  40 +-
 .../spark/scheduler/SparkListenerBus.scala      |   2 +-
 .../org/apache/spark/util/FileLogger.scala      |  28 +-
 .../org/apache/spark/util/JsonProtocol.scala    |  10 +-
 .../scala/org/apache/spark/util/Utils.scala     |  18 +-
 .../scheduler/EventLoggingListenerSuite.scala   | 400 +++++++++++++++++++
 .../spark/scheduler/ReplayListenerSuite.scala   | 166 ++++++++
 .../org/apache/spark/util/FileLoggerSuite.scala | 163 ++++++++
 8 files changed, 791 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/394d8cb1/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index d822a8e..7968a06 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -18,13 +18,16 @@
 package org.apache.spark.scheduler
 
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.fs.permission.FsPermission
+import org.json4s.JsonAST.JValue
 import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.{Logging, SparkConf, SparkContext}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.util.{FileLogger, JsonProtocol}
 
@@ -40,31 +43,36 @@ import org.apache.spark.util.{FileLogger, JsonProtocol}
  */
 private[spark] class EventLoggingListener(
     appName: String,
-    conf: SparkConf,
-    hadoopConfiguration: Configuration)
+    sparkConf: SparkConf,
+    hadoopConf: Configuration = SparkHadoopUtil.get.newConfiguration())
   extends SparkListener with Logging {
 
   import EventLoggingListener._
 
-  private val shouldCompress = conf.getBoolean("spark.eventLog.compress", false)
-  private val shouldOverwrite = conf.getBoolean("spark.eventLog.overwrite", false)
-  private val outputBufferSize = conf.getInt("spark.eventLog.buffer.kb", 100) * 1024
-  private val logBaseDir = conf.get("spark.eventLog.dir", "/tmp/spark-events").stripSuffix("/")
+  private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false)
+  private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
+  private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
+  private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
+  private val logBaseDir = sparkConf.get("spark.eventLog.dir", DEFAULT_LOG_DIR).stripSuffix("/")
   private val name = appName.replaceAll("[ :/]", "-").toLowerCase + "-" + System.currentTimeMillis
   val logDir = logBaseDir + "/" + name
 
-  private val logger =
-    new FileLogger(logDir, conf, hadoopConfiguration, outputBufferSize, shouldCompress,
-      shouldOverwrite, Some(LOG_FILE_PERMISSIONS))
+  protected val logger = new FileLogger(logDir, sparkConf, hadoopConf, outputBufferSize,
+    shouldCompress, shouldOverwrite, Some(LOG_FILE_PERMISSIONS))
+
+  // For testing. Keep track of all JSON serialized events that have been logged.
+  private[scheduler] val loggedEvents = new ArrayBuffer[JValue]
 
   /**
    * Begin logging events.
    * If compression is used, log a file that indicates which compression library is used.
    */
   def start() {
+    logger.start()
     logInfo("Logging events to %s".format(logDir))
     if (shouldCompress) {
-      val codec = conf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC)
+      val codec =
+        sparkConf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC)
       logger.newFile(COMPRESSION_CODEC_PREFIX + codec)
     }
     logger.newFile(SPARK_VERSION_PREFIX + SparkContext.SPARK_VERSION)
@@ -73,11 +81,14 @@ private[spark] class EventLoggingListener(
 
   /** Log the event as JSON. */
   private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false) {
-    val eventJson = compact(render(JsonProtocol.sparkEventToJson(event)))
-    logger.logLine(eventJson)
+    val eventJson = JsonProtocol.sparkEventToJson(event)
+    logger.logLine(compact(render(eventJson)))
     if (flushLogger) {
       logger.flush()
     }
+    if (testing) {
+      loggedEvents += eventJson
+    }
   }
 
   // Events that do not trigger a flush
@@ -121,13 +132,12 @@ private[spark] class EventLoggingListener(
 }
 
 private[spark] object EventLoggingListener extends Logging {
+  val DEFAULT_LOG_DIR = "/tmp/spark-events"
   val LOG_PREFIX = "EVENT_LOG_"
   val SPARK_VERSION_PREFIX = "SPARK_VERSION_"
   val COMPRESSION_CODEC_PREFIX = "COMPRESSION_CODEC_"
   val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
-  val LOG_FILE_PERMISSIONS: FsPermission =
-    FsPermission.createImmutable(Integer.parseInt("770", 8).toShort)
-
+  val LOG_FILE_PERMISSIONS = FsPermission.createImmutable(Integer.parseInt("770", 8).toShort)
 
   // A cache for compression codecs to avoid creating the same codec many times
   private val codecMap = new mutable.HashMap[String, CompressionCodec]

http://git-wip-us.apache.org/repos/asf/spark/blob/394d8cb1/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index d6df193..0286aac 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -37,7 +37,7 @@ private[spark] trait SparkListenerBus {
    * Post an event to all attached listeners. This does nothing if the event is
    * SparkListenerShutdown.
    */
-  protected def postToAll(event: SparkListenerEvent) {
+  def postToAll(event: SparkListenerEvent) {
     event match {
       case stageSubmitted: SparkListenerStageSubmitted =>
         sparkListeners.foreach(_.onStageSubmitted(stageSubmitted))

http://git-wip-us.apache.org/repos/asf/spark/blob/394d8cb1/core/src/main/scala/org/apache/spark/util/FileLogger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/FileLogger.scala b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
index 0965e0f..0e6d21b 100644
--- a/core/src/main/scala/org/apache/spark/util/FileLogger.scala
+++ b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.util
 
-import java.io.{FileOutputStream, BufferedOutputStream, PrintWriter, IOException}
+import java.io.{BufferedOutputStream, FileOutputStream, IOException, PrintWriter}
 import java.net.URI
 import java.text.SimpleDateFormat
 import java.util.Date
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
 import org.apache.hadoop.fs.permission.FsPermission
 
 import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io.CompressionCodec
 
 /**
@@ -39,8 +40,8 @@ import org.apache.spark.io.CompressionCodec
  */
 private[spark] class FileLogger(
     logDir: String,
-    conf: SparkConf,
-    hadoopConfiguration: Configuration,
+    sparkConf: SparkConf,
+    hadoopConf: Configuration = SparkHadoopUtil.get.newConfiguration(),
     outputBufferSize: Int = 8 * 1024, // 8 KB
     compress: Boolean = false,
     overwrite: Boolean = true,
@@ -55,14 +56,19 @@ private[spark] class FileLogger(
   var fileIndex = 0
 
   // Only used if compression is enabled
-  private lazy val compressionCodec = CompressionCodec.createCodec(conf)
+  private lazy val compressionCodec = CompressionCodec.createCodec(sparkConf)
 
   // Only defined if the file system scheme is not local
   private var hadoopDataStream: Option[FSDataOutputStream] = None
 
   private var writer: Option[PrintWriter] = None
 
-  createLogDir()
+  /**
+   * Start this logger by creating the logging directory.
+   */
+  def start() {
+    createLogDir()
+  }
 
   /**
    * Create a logging directory with the given path.
@@ -83,7 +89,7 @@ private[spark] class FileLogger(
     }
     if (dirPermissions.isDefined) {
       val fsStatus = fileSystem.getFileStatus(path)
-      if (fsStatus.getPermission().toShort() != dirPermissions.get.toShort) {
+      if (fsStatus.getPermission.toShort != dirPermissions.get.toShort) {
         fileSystem.setPermission(path, dirPermissions.get)
       }
     }
@@ -92,14 +98,14 @@ private[spark] class FileLogger(
   /**
    * Create a new writer for the file identified by the given path.
    * If the permissions are not passed in, it will default to use the permissions
-   * (dirpermissions) used when class was instantiated.
+   * (dirPermissions) used when class was instantiated.
    */
   private def createWriter(fileName: String, perms: Option[FsPermission] = None): PrintWriter = {
     val logPath = logDir + "/" + fileName
     val uri = new URI(logPath)
-    val defaultFs = FileSystem.getDefaultUri(hadoopConfiguration).getScheme
-    val isDefaultLocal = (defaultFs == null || defaultFs == "file")
     val path = new Path(logPath)
+    val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
+    val isDefaultLocal = defaultFs == null || defaultFs == "file"
 
     /* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844).
      * Therefore, for local files, use FileOutputStream instead. */
@@ -112,7 +118,7 @@ private[spark] class FileLogger(
         hadoopDataStream.get
       }
 
-    perms.orElse(dirPermissions).foreach {p => fileSystem.setPermission(path, p)}
+    perms.orElse(dirPermissions).foreach { p => fileSystem.setPermission(path, p) }
     val bstream = new BufferedOutputStream(dstream, outputBufferSize)
     val cstream = if (compress) compressionCodec.compressedOutputStream(bstream) else bstream
     new PrintWriter(cstream)
@@ -127,7 +133,7 @@ private[spark] class FileLogger(
     val writeInfo = if (!withTime) {
       msg
     } else {
-      val date = new Date(System.currentTimeMillis())
+      val date = new Date(System.currentTimeMillis)
       dateFormat.get.format(date) + ": " + msg
     }
     writer.foreach(_.print(writeInfo))

http://git-wip-us.apache.org/repos/asf/spark/blob/394d8cb1/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 9aed3e0..0982508 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -646,11 +646,11 @@ private[spark] object JsonProtocol {
   }
 
   def propertiesFromJson(json: JValue): Properties = {
-    val properties = new Properties()
-    if (json != JNothing) {
-      mapFromJson(json).map { case (k, v) => properties.setProperty(k, v) }
-    }
-    properties
+    Utils.jsonOption(json).map { value =>
+      val properties = new Properties
+      mapFromJson(json).foreach { case (k, v) => properties.setProperty(k, v) }
+      properties
+    }.getOrElse(null)
   }
 
   def UUIDFromJson(json: JValue): UUID = {

http://git-wip-us.apache.org/repos/asf/spark/blob/394d8cb1/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 2c934a4..536a740 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1062,15 +1062,25 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * return true if this is Windows.
+   * Return the absolute path of a file in the given directory.
    */
-  def isWindows = Option(System.getProperty("os.name")).
-    map(_.startsWith("Windows")).getOrElse(false)
+  def getFilePath(dir: File, fileName: String): Path = {
+    assert(dir.isDirectory)
+    val path = new File(dir, fileName).getAbsolutePath
+    new Path(path)
+  }
+
+  /**
+   * Return true if this is Windows.
+   */
+  def isWindows = {
+    Option(System.getProperty("os.name")).exists(_.startsWith("Windows"))
+  }
 
   /**
    * Indicates whether Spark is currently running unit tests.
    */
-  private[spark] def isTesting = {
+  def isTesting = {
     sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/394d8cb1/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
new file mode 100644
index 0000000..95f5bcd
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable
+import scala.io.Source
+import scala.util.Try
+
+import com.google.common.io.Files
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.json4s.jackson.JsonMethods._
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.{JsonProtocol, Utils}
+
+/**
+ * Test whether EventLoggingListener logs events properly.
+ *
+ * This tests whether EventLoggingListener actually creates special files while logging events,
+ * whether the parsing of these special files is correct, and whether the logged events can be
+ * read and deserialized into actual SparkListenerEvents.
+ */
+class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
+  private val fileSystem = Utils.getHadoopFileSystem("/")
+  private val allCompressionCodecs = Seq[String](
+    "org.apache.spark.io.LZFCompressionCodec",
+    "org.apache.spark.io.SnappyCompressionCodec"
+  )
+  private val testDir = Files.createTempDir()
+  private val logDirPath = Utils.getFilePath(testDir, "spark-events")
+
+  after {
+    Try { fileSystem.delete(logDirPath, true) }
+  }
+
+  test("Parse names of special files") {
+    testParsingFileName()
+  }
+
+  test("Verify special files exist") {
+    testSpecialFilesExist()
+  }
+
+  test("Verify special files exist with compression") {
+    allCompressionCodecs.foreach { codec =>
+      testSpecialFilesExist(compressionCodec = Some(codec))
+    }
+  }
+
+  test("Parse event logging info") {
+    testParsingLogInfo()
+  }
+
+  test("Parse event logging info with compression") {
+    allCompressionCodecs.foreach { codec =>
+      testParsingLogInfo(compressionCodec = Some(codec))
+    }
+  }
+
+  test("Basic event logging") {
+    testEventLogging()
+  }
+
+  test("Basic event logging with compression") {
+    allCompressionCodecs.foreach { codec =>
+      testEventLogging(compressionCodec = Some(codec))
+    }
+  }
+
+  test("End-to-end event logging") {
+    testApplicationEventLogging()
+  }
+
+  test("End-to-end event logging with compression") {
+    allCompressionCodecs.foreach { codec =>
+      testApplicationEventLogging(compressionCodec = Some(codec))
+    }
+  }
+
+
+  /* ----------------- *
+   * Actual test logic *
+   * ----------------- */
+
+  import EventLoggingListenerSuite._
+
+  /**
+   * Test whether names of special files are correctly identified and parsed.
+   */
+  private def testParsingFileName() {
+    val logPrefix = EventLoggingListener.LOG_PREFIX
+    val sparkVersionPrefix = EventLoggingListener.SPARK_VERSION_PREFIX
+    val compressionCodecPrefix = EventLoggingListener.COMPRESSION_CODEC_PREFIX
+    val applicationComplete = EventLoggingListener.APPLICATION_COMPLETE
+    assert(EventLoggingListener.isEventLogFile(logPrefix + "0"))
+    assert(EventLoggingListener.isEventLogFile(logPrefix + "100"))
+    assert(EventLoggingListener.isEventLogFile(logPrefix + "ANYTHING"))
+    assert(EventLoggingListener.isSparkVersionFile(sparkVersionPrefix + "0.9.1"))
+    assert(EventLoggingListener.isSparkVersionFile(sparkVersionPrefix + "1.0.0"))
+    assert(EventLoggingListener.isSparkVersionFile(sparkVersionPrefix + "ANYTHING"))
+    assert(EventLoggingListener.isApplicationCompleteFile(applicationComplete))
+    allCompressionCodecs.foreach { codec =>
+      assert(EventLoggingListener.isCompressionCodecFile(compressionCodecPrefix + codec))
+    }
+
+    // Negatives
+    assert(!EventLoggingListener.isEventLogFile("The greatest man of all mankind"))
+    assert(!EventLoggingListener.isSparkVersionFile("Will never falter in the face of death!"))
+    assert(!EventLoggingListener.isCompressionCodecFile("Unless he chooses to leave behind"))
+    assert(!EventLoggingListener.isApplicationCompleteFile("The very treasure he calls Macbeth"))
+
+    // Verify that parsing is correct
+    assert(EventLoggingListener.parseSparkVersion(sparkVersionPrefix + "1.0.0") === "1.0.0")
+    allCompressionCodecs.foreach { codec =>
+      assert(EventLoggingListener.parseCompressionCodec(compressionCodecPrefix + codec) === codec)
+    }
+  }
+
+  /**
+   * Test whether the special files produced by EventLoggingListener exist.
+   *
+   * There should be exactly one event log and one spark version file throughout the entire
+   * execution. If a compression codec is specified, then the compression codec file should
+   * also exist. Only after the application has completed does the test expect the application
+   * completed file to be present.
+   */
+  private def testSpecialFilesExist(compressionCodec: Option[String] = None) {
+
+    def assertFilesExist(logFiles: Array[FileStatus], loggerStopped: Boolean) {
+      val numCompressionCodecFiles = if (compressionCodec.isDefined) 1 else 0
+      val numApplicationCompleteFiles = if (loggerStopped) 1 else 0
+      assert(logFiles.size === 2 + numCompressionCodecFiles + numApplicationCompleteFiles)
+      assert(eventLogsExist(logFiles))
+      assert(sparkVersionExists(logFiles))
+      assert(compressionCodecExists(logFiles) === compressionCodec.isDefined)
+      assert(applicationCompleteExists(logFiles) === loggerStopped)
+      assertSparkVersionIsValid(logFiles)
+      compressionCodec.foreach { codec =>
+        assertCompressionCodecIsValid(logFiles, codec)
+      }
+    }
+
+    // Verify logging directory exists
+    val conf = getLoggingConf(logDirPath, compressionCodec)
+    val eventLogger = new EventLoggingListener("test", conf)
+    eventLogger.start()
+    val logPath = new Path(eventLogger.logDir)
+    assert(fileSystem.exists(logPath))
+    val logDir = fileSystem.getFileStatus(logPath)
+    assert(logDir.isDir)
+
+    // Verify special files are as expected before stop()
+    var logFiles = fileSystem.listStatus(logPath)
+    assert(logFiles != null)
+    assertFilesExist(logFiles, loggerStopped = false)
+
+    // Verify special files are as expected after stop()
+    eventLogger.stop()
+    logFiles = fileSystem.listStatus(logPath)
+    assertFilesExist(logFiles, loggerStopped = true)
+  }
+
+  /**
+   * Test whether EventLoggingListener correctly parses the correct information from the logs.
+   *
+   * This includes whether it returns the correct Spark version, compression codec (if any),
+   * and the application's completion status.
+   */
+  private def testParsingLogInfo(compressionCodec: Option[String] = None) {
+
+    def assertInfoCorrect(info: EventLoggingInfo, loggerStopped: Boolean) {
+      assert(info.logPaths.size > 0)
+      assert(info.sparkVersion === SparkContext.SPARK_VERSION)
+      assert(info.compressionCodec.isDefined === compressionCodec.isDefined)
+      info.compressionCodec.foreach { codec =>
+        assert(compressionCodec.isDefined)
+        val expectedCodec = compressionCodec.get.split('.').last
+        assert(codec.getClass.getSimpleName === expectedCodec)
+      }
+      assert(info.applicationComplete === loggerStopped)
+    }
+
+    // Verify that all information is correctly parsed before stop()
+    val conf = getLoggingConf(logDirPath, compressionCodec)
+    val eventLogger = new EventLoggingListener("test", conf)
+    eventLogger.start()
+    var eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
+    assertInfoCorrect(eventLoggingInfo, loggerStopped = false)
+
+    // Verify that all information is correctly parsed after stop()
+    eventLogger.stop()
+    eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
+    assertInfoCorrect(eventLoggingInfo, loggerStopped = true)
+  }
+
+  /**
+   * Test basic event logging functionality.
+   *
+   * This creates two simple events, posts them to the EventLoggingListener, and verifies that
+   * exactly these two events are logged in the expected file.
+   */
+  private def testEventLogging(compressionCodec: Option[String] = None) {
+    val conf = getLoggingConf(logDirPath, compressionCodec)
+    val eventLogger = new EventLoggingListener("test", conf)
+    val listenerBus = new LiveListenerBus
+    val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", 125L, "Mickey")
+    val applicationEnd = SparkListenerApplicationEnd(1000L)
+
+    // A comprehensive test on JSON de/serialization of all events is in JsonProtocolSuite
+    eventLogger.start()
+    listenerBus.start()
+    listenerBus.addListener(eventLogger)
+    listenerBus.postToAll(applicationStart)
+    listenerBus.postToAll(applicationEnd)
+
+    // Verify file contains exactly the two events logged
+    val eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
+    assert(eventLoggingInfo.logPaths.size > 0)
+    val lines = readFileLines(eventLoggingInfo.logPaths.head, eventLoggingInfo.compressionCodec)
+    assert(lines.size === 2)
+    assert(lines(0).contains("SparkListenerApplicationStart"))
+    assert(lines(1).contains("SparkListenerApplicationEnd"))
+    assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === applicationStart)
+    assert(JsonProtocol.sparkEventFromJson(parse(lines(1))) === applicationEnd)
+    eventLogger.stop()
+  }
+
+  /**
+   * Test end-to-end event logging functionality in an application.
+   * This runs a simple Spark job and asserts that the expected events are logged when expected.
+   */
+  private def testApplicationEventLogging(compressionCodec: Option[String] = None) {
+    val conf = getLoggingConf(logDirPath, compressionCodec)
+    val sc = new SparkContext("local", "test", conf)
+    assert(sc.eventLogger.isDefined)
+    val eventLogger = sc.eventLogger.get
+    val expectedLogDir = logDirPath.toString
+    assert(eventLogger.logDir.startsWith(expectedLogDir))
+
+    // Begin listening for events that trigger asserts
+    val eventExistenceListener = new EventExistenceListener(eventLogger)
+    sc.addSparkListener(eventExistenceListener)
+
+    // Trigger asserts for whether the expected events are actually logged
+    sc.parallelize(1 to 10000).count()
+    sc.stop()
+
+    // Ensure all asserts have actually been triggered
+    eventExistenceListener.assertAllCallbacksInvoked()
+  }
+
+  /**
+   * Assert that all of the specified events are logged by the given EventLoggingListener.
+   */
+  private def assertEventsExist(eventLogger: EventLoggingListener, events: Seq[String]) {
+    val eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
+    assert(eventLoggingInfo.logPaths.size > 0)
+    val lines = readFileLines(eventLoggingInfo.logPaths.head, eventLoggingInfo.compressionCodec)
+    val eventSet = mutable.Set(events: _*)
+    lines.foreach { line =>
+      eventSet.foreach { event =>
+        if (line.contains(event)) {
+          val parsedEvent = JsonProtocol.sparkEventFromJson(parse(line))
+          val eventType = Utils.getFormattedClassName(parsedEvent)
+          if (eventType == event) {
+            eventSet.remove(event)
+          }
+        }
+      }
+    }
+    assert(eventSet.isEmpty, "The following events are missing: " + eventSet.toSeq)
+  }
+
+  /**
+   * Read all lines from the file specified by the given path.
+   * If a compression codec is specified, use it to read the file.
+   */
+  private def readFileLines(
+      filePath: Path,
+      compressionCodec: Option[CompressionCodec]): Seq[String] = {
+    val fstream = fileSystem.open(filePath)
+    val cstream =
+      compressionCodec.map { codec =>
+        codec.compressedInputStream(fstream)
+      }.getOrElse(fstream)
+    Source.fromInputStream(cstream).getLines().toSeq
+  }
+
+  /**
+   * A listener that asserts certain events are logged by the given EventLoggingListener.
+   * This is necessary because events are posted asynchronously in a different thread.
+   */
+  private class EventExistenceListener(eventLogger: EventLoggingListener) extends SparkListener {
+    var jobStarted = false
+    var jobEnded = false
+    var appEnded = false
+
+    override def onJobStart(jobStart: SparkListenerJobStart) {
+      assertEventsExist(eventLogger, Seq[String](
+        Utils.getFormattedClassName(SparkListenerApplicationStart),
+        Utils.getFormattedClassName(SparkListenerBlockManagerAdded),
+        Utils.getFormattedClassName(SparkListenerEnvironmentUpdate)
+      ))
+      jobStarted = true
+    }
+
+    override def onJobEnd(jobEnd: SparkListenerJobEnd) {
+      assertEventsExist(eventLogger, Seq[String](
+        Utils.getFormattedClassName(SparkListenerJobStart),
+        Utils.getFormattedClassName(SparkListenerJobEnd),
+        Utils.getFormattedClassName(SparkListenerStageSubmitted),
+        Utils.getFormattedClassName(SparkListenerStageCompleted),
+        Utils.getFormattedClassName(SparkListenerTaskStart),
+        Utils.getFormattedClassName(SparkListenerTaskEnd)
+      ))
+      jobEnded = true
+    }
+
+    override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
+      assertEventsExist(eventLogger, Seq[String](
+        Utils.getFormattedClassName(SparkListenerApplicationEnd)
+      ))
+      appEnded = true
+    }
+
+    def assertAllCallbacksInvoked() {
+      assert(jobStarted, "JobStart callback not invoked!")
+      assert(jobEnded, "JobEnd callback not invoked!")
+      assert(appEnded, "ApplicationEnd callback not invoked!")
+    }
+  }
+
+
+  /* -------------------------------------------------------- *
+   * Helper methods for validating state of the special files *
+   * -------------------------------------------------------- */
+
+  private def eventLogsExist(logFiles: Array[FileStatus]): Boolean = {
+    logFiles.map(_.getPath.getName).exists(EventLoggingListener.isEventLogFile)
+  }
+
+  private def sparkVersionExists(logFiles: Array[FileStatus]): Boolean = {
+    logFiles.map(_.getPath.getName).exists(EventLoggingListener.isSparkVersionFile)
+  }
+
+  private def compressionCodecExists(logFiles: Array[FileStatus]): Boolean = {
+    logFiles.map(_.getPath.getName).exists(EventLoggingListener.isCompressionCodecFile)
+  }
+
+  private def applicationCompleteExists(logFiles: Array[FileStatus]): Boolean = {
+    logFiles.map(_.getPath.getName).exists(EventLoggingListener.isApplicationCompleteFile)
+  }
+
+  private def assertSparkVersionIsValid(logFiles: Array[FileStatus]) {
+    val file = logFiles.map(_.getPath.getName).find(EventLoggingListener.isSparkVersionFile)
+    assert(file.isDefined)
+    assert(EventLoggingListener.parseSparkVersion(file.get) === SparkContext.SPARK_VERSION)
+  }
+
+  private def assertCompressionCodecIsValid(logFiles: Array[FileStatus], compressionCodec: String) {
+    val file = logFiles.map(_.getPath.getName).find(EventLoggingListener.isCompressionCodecFile)
+    assert(file.isDefined)
+    assert(EventLoggingListener.parseCompressionCodec(file.get) === compressionCodec)
+  }
+
+}
+
+
+object EventLoggingListenerSuite {
+
+  /** Get a SparkConf with event logging enabled. */
+  def getLoggingConf(logDir: Path, compressionCodec: Option[String] = None) = {
+    val conf = new SparkConf
+    conf.set("spark.eventLog.enabled", "true")
+    conf.set("spark.eventLog.testing", "true")
+    conf.set("spark.eventLog.dir", logDir.toString)
+    compressionCodec.foreach { codec =>
+      conf.set("spark.eventLog.compress", "true")
+      conf.set("spark.io.compression.codec", codec)
+    }
+    conf
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/394d8cb1/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
new file mode 100644
index 0000000..d1fe1fc
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.io.PrintWriter
+
+import scala.util.Try
+
+import com.google.common.io.Files
+import org.json4s.jackson.JsonMethods._
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+import org.apache.spark.SparkContext._
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.{JsonProtocol, Utils}
+
+/**
+ * Test whether ReplayListenerBus replays events from logs correctly.
+ */
+class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
+  private val fileSystem = Utils.getHadoopFileSystem("/")
+  private val allCompressionCodecs = Seq[String](
+    "org.apache.spark.io.LZFCompressionCodec",
+    "org.apache.spark.io.SnappyCompressionCodec"
+  )
+  private val testDir = Files.createTempDir()
+
+  after {
+    Try { fileSystem.delete(Utils.getFilePath(testDir, "events.txt"), true) }
+    Try { fileSystem.delete(Utils.getFilePath(testDir, "test-replay"), true) }
+  }
+
+  test("Simple replay") {
+    testSimpleReplay()
+  }
+
+  test("Simple replay with compression") {
+    allCompressionCodecs.foreach { codec =>
+      testSimpleReplay(Some(codec))
+    }
+  }
+
+  // This assumes the correctness of EventLoggingListener
+  test("End-to-end replay") {
+    testApplicationReplay()
+  }
+
+  // This assumes the correctness of EventLoggingListener
+  test("End-to-end replay with compression") {
+    allCompressionCodecs.foreach { codec =>
+      testApplicationReplay(Some(codec))
+    }
+  }
+
+
+  /* ----------------- *
+   * Actual test logic *
+   * ----------------- */
+
+  /**
+   * Test simple replaying of events.
+   */
+  private def testSimpleReplay(codecName: Option[String] = None) {
+    val logFilePath = Utils.getFilePath(testDir, "events.txt")
+    val codec = codecName.map(getCompressionCodec)
+    val fstream = fileSystem.create(logFilePath)
+    val cstream = codec.map(_.compressedOutputStream(fstream)).getOrElse(fstream)
+    val writer = new PrintWriter(cstream)
+    val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", 125L, "Mickey")
+    val applicationEnd = SparkListenerApplicationEnd(1000L)
+    writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart))))
+    writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
+    writer.close()
+    val replayer = new ReplayListenerBus(Seq(logFilePath), fileSystem, codec)
+    val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath, codecName)
+    val eventMonster = new EventMonster(conf)
+    replayer.addListener(eventMonster)
+    replayer.replay()
+    assert(eventMonster.loggedEvents.size === 2)
+    assert(eventMonster.loggedEvents(0) === JsonProtocol.sparkEventToJson(applicationStart))
+    assert(eventMonster.loggedEvents(1) === JsonProtocol.sparkEventToJson(applicationEnd))
+  }
+
+  /**
+   * Test end-to-end replaying of events.
+   *
+   * This test runs a few simple jobs with event logging enabled, and compares each emitted
+   * event to the corresponding event replayed from the event logs. This test makes the
+   * assumption that the event logging behavior is correct (tested in a separate suite).
+   */
+  private def testApplicationReplay(codecName: Option[String] = None) {
+    val logDirPath = Utils.getFilePath(testDir, "test-replay")
+    val conf = EventLoggingListenerSuite.getLoggingConf(logDirPath, codecName)
+    val sc = new SparkContext("local-cluster[2,1,512]", "Test replay", conf)
+
+    // Run a few jobs
+    sc.parallelize(1 to 100, 1).count()
+    sc.parallelize(1 to 100, 2).map(i => (i, i)).count()
+    sc.parallelize(1 to 100, 3).map(i => (i, i)).groupByKey().count()
+    sc.parallelize(1 to 100, 4).map(i => (i, i)).groupByKey().persist().count()
+    sc.stop()
+
+    // Prepare information needed for replay
+    val codec = codecName.map(getCompressionCodec)
+    val applications = fileSystem.listStatus(logDirPath)
+    assert(applications != null && applications.size > 0)
+    val eventLogDir = applications.sortBy(_.getAccessTime).last
+    assert(eventLogDir.isDir)
+    val logFiles = fileSystem.listStatus(eventLogDir.getPath)
+    assert(logFiles != null && logFiles.size > 0)
+    val logFile = logFiles.find(_.getPath.getName.startsWith("EVENT_LOG_"))
+    assert(logFile.isDefined)
+    val logFilePath = logFile.get.getPath
+
+    // Replay events
+    val replayer = new ReplayListenerBus(Seq(logFilePath), fileSystem, codec)
+    val eventMonster = new EventMonster(conf)
+    replayer.addListener(eventMonster)
+    replayer.replay()
+
+    // Verify the same events are replayed in the same order
+    assert(sc.eventLogger.isDefined)
+    val originalEvents = sc.eventLogger.get.loggedEvents
+    val replayedEvents = eventMonster.loggedEvents
+    originalEvents.zip(replayedEvents).foreach { case (e1, e2) => assert(e1 === e2) }
+  }
+
+  /**
+   * A simple listener that buffers all the events it receives.
+   *
+   * The event buffering functionality must be implemented within EventLoggingListener itself.
+   * This is because of the following race condition: the event may be mutated between being
+   * processed by one listener and being processed by another. Thus, in order to establish
+   * a fair comparison between the original events and the replayed events, both functionalities
+   * must be implemented within one listener (i.e. the EventLoggingListener).
+   *
+   * This child listener inherits only the event buffering functionality, but does not actually
+   * log the events.
+   */
+  private class EventMonster(conf: SparkConf) extends EventLoggingListener("test", conf) {
+    logger.close()
+  }
+
+  private def getCompressionCodec(codecName: String) = {
+    val conf = new SparkConf
+    conf.set("spark.io.compression.codec", codecName)
+    CompressionCodec.createCodec(conf)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/394d8cb1/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala b/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala
new file mode 100644
index 0000000..f675e1e
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io.IOException
+
+import scala.io.Source
+import scala.util.Try
+
+import com.google.common.io.Files
+import org.apache.hadoop.fs.Path
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.io.CompressionCodec
+
+/**
+ * Test writing files through the FileLogger.
+ */
+class FileLoggerSuite extends FunSuite with BeforeAndAfter {
+  private val fileSystem = Utils.getHadoopFileSystem("/")
+  private val allCompressionCodecs = Seq[String](
+    "org.apache.spark.io.LZFCompressionCodec",
+    "org.apache.spark.io.SnappyCompressionCodec"
+  )
+  private val testDir = Files.createTempDir()
+  private val logDirPath = Utils.getFilePath(testDir, "test-file-logger")
+  private val logDirPathString = logDirPath.toString
+
+  after {
+    Try { fileSystem.delete(logDirPath, true) }
+  }
+
+  test("Simple logging") {
+    testSingleFile()
+  }
+
+  test ("Simple logging with compression") {
+    allCompressionCodecs.foreach { codec =>
+      testSingleFile(Some(codec))
+    }
+  }
+
+  test("Logging multiple files") {
+    testMultipleFiles()
+  }
+
+  test("Logging multiple files with compression") {
+    allCompressionCodecs.foreach { codec =>
+      testMultipleFiles(Some(codec))
+    }
+  }
+
+  test("Logging when directory already exists") {
+    // Create the logging directory multiple times
+    new FileLogger(logDirPathString, new SparkConf, overwrite = true).start()
+    new FileLogger(logDirPathString, new SparkConf, overwrite = true).start()
+    new FileLogger(logDirPathString, new SparkConf, overwrite = true).start()
+
+    // If overwrite is not enabled, an exception should be thrown
+    intercept[IOException] {
+      new FileLogger(logDirPathString, new SparkConf, overwrite = false).start()
+    }
+  }
+
+
+  /* ----------------- *
+   * Actual test logic *
+   * ----------------- */
+
+  /**
+   * Test logging to a single file.
+   */
+  private def testSingleFile(codecName: Option[String] = None) {
+    val conf = getLoggingConf(codecName)
+    val codec = codecName.map { c => CompressionCodec.createCodec(conf) }
+    val logger =
+      if (codecName.isDefined) {
+        new FileLogger(logDirPathString, conf, compress = true)
+      } else {
+        new FileLogger(logDirPathString, conf)
+      }
+    logger.start()
+    assert(fileSystem.exists(logDirPath))
+    assert(fileSystem.getFileStatus(logDirPath).isDir)
+    assert(fileSystem.listStatus(logDirPath).size === 0)
+
+    logger.newFile()
+    val files = fileSystem.listStatus(logDirPath)
+    assert(files.size === 1)
+    val firstFile = files.head
+    val firstFilePath = firstFile.getPath
+
+    logger.log("hello")
+    logger.flush()
+    assert(readFileContent(firstFilePath, codec) === "hello")
+
+    logger.log(" world")
+    logger.close()
+    assert(readFileContent(firstFilePath, codec) === "hello world")
+  }
+
+  /**
+   * Test logging to multiple files.
+   */
+  private def testMultipleFiles(codecName: Option[String] = None) {
+    val conf = getLoggingConf(codecName)
+    val codec = codecName.map { c => CompressionCodec.createCodec(conf) }
+    val logger =
+      if (codecName.isDefined) {
+        new FileLogger(logDirPathString, conf, compress = true)
+      } else {
+        new FileLogger(logDirPathString, conf)
+      }
+    logger.start()
+    logger.newFile("Jean_Valjean")
+    logger.logLine("Who am I?")
+    logger.logLine("Destiny?")
+    logger.newFile("John_Valjohn")
+    logger.logLine("One")
+    logger.logLine("Two three...")
+    logger.newFile("Wolverine")
+    logger.logLine("There was a time")
+    logger.logLine("A time when our enemies knew honor.")
+    logger.close()
+    assert(readFileContent(new Path(logDirPath, "Jean_Valjean"), codec) === "Who am I?\nDestiny?")
+    assert(readFileContent(new Path(logDirPath, "John_Valjohn"), codec) === "One\nTwo three...")
+    assert(readFileContent(new Path(logDirPath, "Wolverine"), codec) ===
+      "There was a time\nA time when our enemies knew honor.")
+  }
+
+  /**
+   * Read the content of the file specified by the given path.
+   * If a compression codec is specified, use it to read the file.
+   */
+  private def readFileContent(logPath: Path, codec: Option[CompressionCodec] = None): String = {
+    val fstream = fileSystem.open(logPath)
+    val cstream = codec.map(_.compressedInputStream(fstream)).getOrElse(fstream)
+    Source.fromInputStream(cstream).getLines().mkString("\n")
+  }
+
+  private def getLoggingConf(codecName: Option[String]) = {
+    val conf = new SparkConf
+    codecName.foreach { c => conf.set("spark.io.compression.codec", c) }
+    conf
+  }
+
+}