You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2014/12/20 03:23:49 UTC

[1/2] spark git commit: [SPARK-2261] Make event logger use a single file.

Repository: spark
Updated Branches:
  refs/heads/master c28083f46 -> 456451911


http://git-wip-us.apache.org/repos/asf/spark/blob/45645191/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala b/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala
deleted file mode 100644
index 72466a3..0000000
--- a/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import java.io.{File, IOException}
-
-import scala.io.Source
-
-import org.apache.hadoop.fs.Path
-import org.scalatest.{BeforeAndAfter, FunSuite}
-
-import org.apache.spark.SparkConf
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.io.CompressionCodec
-
-/**
- * Test writing files through the FileLogger.
- */
-class FileLoggerSuite extends FunSuite with BeforeAndAfter {
-  private val fileSystem = Utils.getHadoopFileSystem("/",
-    SparkHadoopUtil.get.newConfiguration(new SparkConf()))
-  private val allCompressionCodecs = Seq[String](
-    "org.apache.spark.io.LZFCompressionCodec",
-    "org.apache.spark.io.SnappyCompressionCodec"
-  )
-  private var testDir: File = _
-  private var logDirPath: Path = _
-  private var logDirPathString: String = _
-
-  before {
-    testDir = Utils.createTempDir()
-    logDirPath = Utils.getFilePath(testDir, "test-file-logger")
-    logDirPathString = logDirPath.toString
-  }
-
-  after {
-    Utils.deleteRecursively(testDir)
-  }
-
-  test("Simple logging") {
-    testSingleFile()
-  }
-
-  test ("Simple logging with compression") {
-    allCompressionCodecs.foreach { codec =>
-      testSingleFile(Some(codec))
-    }
-  }
-
-  test("Logging multiple files") {
-    testMultipleFiles()
-  }
-
-  test("Logging multiple files with compression") {
-    allCompressionCodecs.foreach { codec =>
-      testMultipleFiles(Some(codec))
-    }
-  }
-
-  test("Logging when directory already exists") {
-    // Create the logging directory multiple times
-    new FileLogger(logDirPathString, new SparkConf, compress = false, overwrite = true).start()
-    new FileLogger(logDirPathString, new SparkConf, compress = false, overwrite = true).start()
-    new FileLogger(logDirPathString, new SparkConf, compress = false, overwrite = true).start()
-
-    // If overwrite is not enabled, an exception should be thrown
-    intercept[IOException] {
-      new FileLogger(logDirPathString, new SparkConf, compress = false, overwrite = false).start()
-    }
-  }
-
-
-  /* ----------------- *
-   * Actual test logic *
-   * ----------------- */
-
-  /**
-   * Test logging to a single file.
-   */
-  private def testSingleFile(codecName: Option[String] = None) {
-    val conf = getLoggingConf(codecName)
-    val codec = codecName.map { c => CompressionCodec.createCodec(conf) }
-    val logger =
-      if (codecName.isDefined) {
-        new FileLogger(logDirPathString, conf, compress = true)
-      } else {
-        new FileLogger(logDirPathString, conf)
-      }
-    logger.start()
-    assert(fileSystem.exists(logDirPath))
-    assert(fileSystem.getFileStatus(logDirPath).isDir)
-    assert(fileSystem.listStatus(logDirPath).size === 0)
-
-    logger.newFile()
-    val files = fileSystem.listStatus(logDirPath)
-    assert(files.size === 1)
-    val firstFile = files.head
-    val firstFilePath = firstFile.getPath
-
-    logger.log("hello")
-    logger.flush()
-    assert(readFileContent(firstFilePath, codec) === "hello")
-
-    logger.log(" world")
-    logger.close()
-    assert(readFileContent(firstFilePath, codec) === "hello world")
-  }
-
-  /**
-   * Test logging to multiple files.
-   */
-  private def testMultipleFiles(codecName: Option[String] = None) {
-    val conf = getLoggingConf(codecName)
-    val codec = codecName.map { c => CompressionCodec.createCodec(conf) }
-    val logger =
-      if (codecName.isDefined) {
-        new FileLogger(logDirPathString, conf, compress = true)
-      } else {
-        new FileLogger(logDirPathString, conf)
-      }
-    logger.start()
-    logger.newFile("Jean_Valjean")
-    logger.logLine("Who am I?")
-    logger.logLine("Destiny?")
-    logger.newFile("John_Valjohn")
-    logger.logLine("One")
-    logger.logLine("Two three...")
-    logger.newFile("Wolverine")
-    logger.logLine("There was a time")
-    logger.logLine("A time when our enemies knew honor.")
-    logger.close()
-    assert(readFileContent(new Path(logDirPath, "Jean_Valjean"), codec) === "Who am I?\nDestiny?")
-    assert(readFileContent(new Path(logDirPath, "John_Valjohn"), codec) === "One\nTwo three...")
-    assert(readFileContent(new Path(logDirPath, "Wolverine"), codec) ===
-      "There was a time\nA time when our enemies knew honor.")
-  }
-
-  /**
-   * Read the content of the file specified by the given path.
-   * If a compression codec is specified, use it to read the file.
-   */
-  private def readFileContent(logPath: Path, codec: Option[CompressionCodec] = None): String = {
-    val fstream = fileSystem.open(logPath)
-    val cstream = codec.map(_.compressedInputStream(fstream)).getOrElse(fstream)
-    Source.fromInputStream(cstream).getLines().mkString("\n")
-  }
-
-  private def getLoggingConf(codecName: Option[String]) = {
-    val conf = new SparkConf
-    codecName.foreach { c => conf.set("spark.io.compression.codec", c) }
-    conf
-  }
-
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


[2/2] spark git commit: [SPARK-2261] Make event logger use a single file.

Posted by an...@apache.org.
[SPARK-2261] Make event logger use a single file.

Currently the event logger uses a directory and several files to
describe an app's event log, all but one of which are empty. This
is not very HDFS-friendly, since creating lots of nodes in HDFS
(especially when they don't contain any data) is frowned upon due
to the node metadata being kept in the NameNode's memory.

Instead, add a header section to the event log file that contains metadata
needed to read the events. This metadata includes things like the Spark
version (for future code that may need it for backwards compatibility) and
the compression codec used for the event data.

With the new approach, aside from reducing the load on the NN, there's
also a lot less remote calls needed when reading the log directory.

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #1222 from vanzin/hist-server-single-log and squashes the following commits:

cc8f5de [Marcelo Vanzin] Store header in plain text.
c7e6123 [Marcelo Vanzin] Update comment.
59c561c [Marcelo Vanzin] Review feedback.
216c5a3 [Marcelo Vanzin] Review comments.
dce28e9 [Marcelo Vanzin] Fix log overwrite test.
f91c13e [Marcelo Vanzin] Handle "spark.eventLog.overwrite", and add unit test.
346f0b4 [Marcelo Vanzin] Review feedback.
ed0023e [Marcelo Vanzin] Merge branch 'master' into hist-server-single-log
3f4500f [Marcelo Vanzin] Unit test for SPARK-3697.
45c7a1f [Marcelo Vanzin] Version of SPARK-3697 for this branch.
b3ee30b [Marcelo Vanzin] Merge branch 'master' into hist-server-single-log
a6d5c50 [Marcelo Vanzin] Merge branch 'master' into hist-server-single-log
16fd491 [Marcelo Vanzin] Use unique log directory for each codec.
0ef3f70 [Marcelo Vanzin] Merge branch 'master' into hist-server-single-log
d93c44a [Marcelo Vanzin] Add a newline to make the header more readable.
9e928ba [Marcelo Vanzin] Add types.
bd6ba8c [Marcelo Vanzin] Review feedback.
a624a89 [Marcelo Vanzin] Merge branch 'master' into hist-server-single-log
04364dc [Marcelo Vanzin] Merge branch 'master' into hist-server-single-log
bb7c2d3 [Marcelo Vanzin] Fix scalastyle warning.
16661a3 [Marcelo Vanzin] Simplify some internal code.
cc6bce4 [Marcelo Vanzin] Some review feedback.
a722184 [Marcelo Vanzin] Do not encode metadata in log file name.
3700586 [Marcelo Vanzin] Restore log flushing.
f677930 [Marcelo Vanzin] Fix botched rebase.
ae571fa [Marcelo Vanzin] Fix end-to-end event logger test.
9db0efd [Marcelo Vanzin] Show prettier name in UI.
8f42274 [Marcelo Vanzin] Make history server parse old-style log directories.
6251dd7 [Marcelo Vanzin] Make event logger use a single file.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/45645191
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/45645191
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/45645191

Branch: refs/heads/master
Commit: 456451911d11cc0b6738f31b1e17869b1fb51c87
Parents: c28083f
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Fri Dec 19 18:21:15 2014 -0800
Committer: Andrew Or <an...@databricks.com>
Committed: Fri Dec 19 18:23:42 2014 -0800

----------------------------------------------------------------------
 .../spark/deploy/ApplicationDescription.scala   |   2 +-
 .../deploy/history/FsHistoryProvider.scala      | 194 +++++++----
 .../org/apache/spark/deploy/master/Master.scala |  46 +--
 .../spark/scheduler/EventLoggingListener.scala  | 291 +++++++++++------
 .../spark/scheduler/ReplayListenerBus.scala     |  77 ++---
 .../org/apache/spark/util/FileLogger.scala      | 237 --------------
 .../deploy/history/FsHistoryProviderSuite.scala | 185 +++++++++++
 .../scheduler/EventLoggingListenerSuite.scala   | 321 +++++--------------
 .../spark/scheduler/ReplayListenerSuite.scala   |  82 +++--
 .../org/apache/spark/util/FileLoggerSuite.scala | 169 ----------
 10 files changed, 675 insertions(+), 929 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/45645191/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
index 65a1a8f..b10b7b8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -23,7 +23,7 @@ private[spark] class ApplicationDescription(
     val memoryPerSlave: Int,
     val command: Command,
     var appUiUrl: String,
-    val eventLogDir: Option[String] = None)
+    val eventLogFile: Option[String] = None)
   extends Serializable {
 
   val user = System.getProperty("user.name", "<unknown>")

http://git-wip-us.apache.org/repos/asf/spark/blob/45645191/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 82a54db..792d15b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -17,14 +17,16 @@
 
 package org.apache.spark.deploy.history
 
-import java.io.FileNotFoundException
+import java.io.{BufferedInputStream, FileNotFoundException, InputStream}
 
 import scala.collection.mutable
 
 import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.permission.AccessControlException
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.io.CompressionCodec
 import org.apache.spark.scheduler._
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.Utils
@@ -64,6 +66,12 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   @volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo]
     = new mutable.LinkedHashMap()
 
+  // Constants used to parse Spark 1.0.0 log directories.
+  private[history] val LOG_PREFIX = "EVENT_LOG_"
+  private[history] val SPARK_VERSION_PREFIX = "SPARK_VERSION_"
+  private[history] val COMPRESSION_CODEC_PREFIX = "COMPRESSION_CODEC_"
+  private[history] val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
+
   /**
    * A background thread that periodically checks for event log updates on disk.
    *
@@ -90,7 +98,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
 
   initialize()
 
-  private def initialize() {
+  private def initialize(): Unit = {
     // Validate the log directory.
     val path = new Path(logDir)
     if (!fs.exists(path)) {
@@ -106,8 +114,12 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
     }
 
     checkForLogs()
-    logCheckingThread.setDaemon(true)
-    logCheckingThread.start()
+
+    // Disable the background thread during tests.
+    if (!conf.contains("spark.testing")) {
+      logCheckingThread.setDaemon(true)
+      logCheckingThread.start()
+    }
   }
 
   override def getListing() = applications.values
@@ -115,8 +127,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   override def getAppUI(appId: String): Option[SparkUI] = {
     try {
       applications.get(appId).map { info =>
-        val (replayBus, appListener) = createReplayBus(fs.getFileStatus(
-          new Path(logDir, info.logDir)))
+        val replayBus = new ReplayListenerBus()
         val ui = {
           val conf = this.conf.clone()
           val appSecManager = new SecurityManager(conf)
@@ -125,15 +136,17 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
           // Do not call ui.bind() to avoid creating a new server for each application
         }
 
-        replayBus.replay()
+        val appListener = new ApplicationEventListener()
+        replayBus.addListener(appListener)
+        val appInfo = replay(fs.getFileStatus(new Path(logDir, info.logPath)), replayBus)
 
-        ui.setAppName(s"${appListener.appName.getOrElse(NOT_STARTED)} ($appId)")
+        ui.setAppName(s"${appInfo.name} ($appId)")
 
         val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
         ui.getSecurityManager.setAcls(uiAclsEnabled)
         // make sure to set admin acls before view acls so they are properly picked up
         ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
-        ui.getSecurityManager.setViewAcls(appListener.sparkUser.getOrElse(NOT_STARTED),
+        ui.getSecurityManager.setViewAcls(appInfo.sparkUser,
           appListener.viewAcls.getOrElse(""))
         ui
       }
@@ -149,41 +162,45 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
    * Tries to reuse as much of the data already in memory as possible, by not reading
    * applications that haven't been updated since last time the logs were checked.
    */
-  private def checkForLogs() = {
+  private[history] def checkForLogs(): Unit = {
     lastLogCheckTimeMs = getMonotonicTimeMs()
     logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTimeMs))
-    try {
-      val logStatus = fs.listStatus(new Path(logDir))
-      val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
 
-      // Load all new logs from the log directory. Only directories that have a modification time
-      // later than the last known log directory will be loaded.
+    try {
       var newLastModifiedTime = lastModifiedTime
-      val logInfos = logDirs
-        .filter { dir =>
-          if (fs.isFile(new Path(dir.getPath(), EventLoggingListener.APPLICATION_COMPLETE))) {
-            val modTime = getModificationTime(dir)
-            newLastModifiedTime = math.max(newLastModifiedTime, modTime)
-            modTime > lastModifiedTime
-          } else {
-            false
+      val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
+        .getOrElse(Seq[FileStatus]())
+      val logInfos = statusList
+        .filter { entry =>
+          try {
+            val isFinishedApplication =
+              if (isLegacyLogDirectory(entry)) {
+                fs.exists(new Path(entry.getPath(), APPLICATION_COMPLETE))
+              } else {
+                !entry.getPath().getName().endsWith(EventLoggingListener.IN_PROGRESS)
+              }
+
+            if (isFinishedApplication) {
+              val modTime = getModificationTime(entry)
+              newLastModifiedTime = math.max(newLastModifiedTime, modTime)
+              modTime >= lastModifiedTime
+            } else {
+              false
+            }
+          } catch {
+            case e: AccessControlException =>
+              // Do not use "logInfo" since these messages can get pretty noisy if printed on
+              // every poll.
+              logDebug(s"No permission to read $entry, ignoring.")
+              false
           }
         }
-        .flatMap { dir =>
+        .flatMap { entry =>
           try {
-            val (replayBus, appListener) = createReplayBus(dir)
-            replayBus.replay()
-            Some(new FsApplicationHistoryInfo(
-              dir.getPath().getName(),
-              appListener.appId.getOrElse(dir.getPath().getName()),
-              appListener.appName.getOrElse(NOT_STARTED),
-              appListener.startTime.getOrElse(-1L),
-              appListener.endTime.getOrElse(-1L),
-              getModificationTime(dir),
-              appListener.sparkUser.getOrElse(NOT_STARTED)))
+            Some(replay(entry, new ReplayListenerBus()))
           } catch {
             case e: Exception =>
-              logInfo(s"Failed to load application log data from $dir.", e)
+              logError(s"Failed to load application log data from $entry.", e)
               None
           }
         }
@@ -217,37 +234,100 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
         applications = newApps
       }
     } catch {
-      case t: Throwable => logError("Exception in checking for event log updates", t)
+      case e: Exception => logError("Exception in checking for event log updates", e)
     }
   }
 
-  private def createReplayBus(logDir: FileStatus): (ReplayListenerBus, ApplicationEventListener) = {
-    val path = logDir.getPath()
-    val elogInfo = EventLoggingListener.parseLoggingInfo(path, fs)
-    val replayBus = new ReplayListenerBus(elogInfo.logPaths, fs, elogInfo.compressionCodec)
-    val appListener = new ApplicationEventListener
-    replayBus.addListener(appListener)
-    (replayBus, appListener)
+  /**
+   * Replays the events in the specified log file and returns information about the associated
+   * application.
+   */
+  private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationHistoryInfo = {
+    val logPath = eventLog.getPath()
+    val (logInput, sparkVersion) =
+      if (isLegacyLogDirectory(eventLog)) {
+        openLegacyEventLog(logPath)
+      } else {
+        EventLoggingListener.openEventLog(logPath, fs)
+      }
+    try {
+      val appListener = new ApplicationEventListener
+      bus.addListener(appListener)
+      bus.replay(logInput, sparkVersion)
+      new FsApplicationHistoryInfo(
+        logPath.getName(),
+        appListener.appId.getOrElse(logPath.getName()),
+        appListener.appName.getOrElse(NOT_STARTED),
+        appListener.startTime.getOrElse(-1L),
+        appListener.endTime.getOrElse(-1L),
+        getModificationTime(eventLog),
+        appListener.sparkUser.getOrElse(NOT_STARTED))
+    } finally {
+      logInput.close()
+    }
   }
 
-  /** Return when this directory was last modified. */
-  private def getModificationTime(dir: FileStatus): Long = {
-    try {
-      val logFiles = fs.listStatus(dir.getPath)
-      if (logFiles != null && !logFiles.isEmpty) {
-        logFiles.map(_.getModificationTime).max
-      } else {
-        dir.getModificationTime
+  /**
+   * Loads a legacy log directory. This assumes that the log directory contains a single event
+   * log file (along with other metadata files), which is the case for directories generated by
+   * the code in previous releases.
+   *
+   * @return 2-tuple of (input stream of the events, version of Spark which wrote the log)
+   */
+  private[history] def openLegacyEventLog(dir: Path): (InputStream, String) = {
+    val children = fs.listStatus(dir)
+    var eventLogPath: Path = null
+    var codecName: Option[String] = None
+    var sparkVersion: String = null
+
+    children.foreach { child =>
+      child.getPath().getName() match {
+        case name if name.startsWith(LOG_PREFIX) =>
+          eventLogPath = child.getPath()
+
+        case codec if codec.startsWith(COMPRESSION_CODEC_PREFIX) =>
+          codecName = Some(codec.substring(COMPRESSION_CODEC_PREFIX.length()))
+
+        case version if version.startsWith(SPARK_VERSION_PREFIX) =>
+          sparkVersion = version.substring(SPARK_VERSION_PREFIX.length())
+
+        case _ =>
       }
-    } catch {
-      case t: Throwable =>
-        logError("Exception in accessing modification time of %s".format(dir.getPath), t)
-        -1L
+    }
+
+    if (eventLogPath == null || sparkVersion == null) {
+      throw new IllegalArgumentException(s"$dir is not a Spark application log directory.")
+    }
+
+    val codec = try {
+        codecName.map { c => CompressionCodec.createCodec(conf, c) }
+      } catch {
+        case e: Exception =>
+          throw new IllegalArgumentException(s"Unknown compression codec $codecName.")
+      }
+
+    val in = new BufferedInputStream(fs.open(eventLogPath))
+    (codec.map(_.compressedInputStream(in)).getOrElse(in), sparkVersion)
+  }
+
+  /**
+   * Return whether the specified event log path contains a old directory-based event log.
+   * Previously, the event log of an application comprises of multiple files in a directory.
+   * As of Spark 1.3, these files are consolidated into a single one that replaces the directory.
+   * See SPARK-2261 for more detail.
+   */
+  private def isLegacyLogDirectory(entry: FileStatus): Boolean = entry.isDir()
+
+  private def getModificationTime(fsEntry: FileStatus): Long = {
+    if (fsEntry.isDir) {
+      fs.listStatus(fsEntry.getPath).map(_.getModificationTime()).max
+    } else {
+      fsEntry.getModificationTime()
     }
   }
 
   /** Returns the system's mononotically increasing time. */
-  private def getMonotonicTimeMs() = System.nanoTime() / (1000 * 1000)
+  private def getMonotonicTimeMs(): Long = System.nanoTime() / (1000 * 1000)
 
 }
 
@@ -256,7 +336,7 @@ private object FsHistoryProvider {
 }
 
 private class FsApplicationHistoryInfo(
-    val logDir: String,
+    val logPath: String,
     id: String,
     name: String,
     startTime: Long,

http://git-wip-us.apache.org/repos/asf/spark/blob/45645191/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 1f9f35d..ed5eab9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.deploy.master
 
+import java.io.FileNotFoundException
 import java.net.URLEncoder
 import java.text.SimpleDateFormat
 import java.util.Date
@@ -32,6 +33,7 @@ import akka.pattern.ask
 import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
 import akka.serialization.Serialization
 import akka.serialization.SerializationExtension
+import org.apache.hadoop.fs.Path
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.{ApplicationDescription, DriverDescription,
@@ -56,6 +58,7 @@ private[spark] class Master(
   import context.dispatcher   // to use Akka's scheduler.schedule()
 
   val conf = new SparkConf
+  val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
 
   def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss")  // For application IDs
   val WORKER_TIMEOUT = conf.getLong("spark.worker.timeout", 60) * 1000
@@ -514,7 +517,7 @@ private[spark] class Master(
     val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
     val numWorkersAlive = shuffledAliveWorkers.size
     var curPos = 0
-    
+
     for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
       // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
       // start from the last worker that was assigned a driver, and continue onwards until we have
@@ -711,41 +714,38 @@ private[spark] class Master(
   def rebuildSparkUI(app: ApplicationInfo): Boolean = {
     val appName = app.desc.name
     val notFoundBasePath = HistoryServer.UI_PATH_PREFIX + "/not-found"
-    val eventLogDir = app.desc.eventLogDir.getOrElse {
+    val eventLogFile = app.desc.eventLogFile.getOrElse {
       // Event logging is not enabled for this application
       app.desc.appUiUrl = notFoundBasePath
       return false
     }
 
-    val appEventLogDir = EventLoggingListener.getLogDirPath(eventLogDir, app.id)
-    val fileSystem = Utils.getHadoopFileSystem(appEventLogDir,
-      SparkHadoopUtil.get.newConfiguration(conf))
-    val eventLogInfo = EventLoggingListener.parseLoggingInfo(appEventLogDir, fileSystem)
-    val eventLogPaths = eventLogInfo.logPaths
-    val compressionCodec = eventLogInfo.compressionCodec
-
-    if (eventLogPaths.isEmpty) {
-      // Event logging is enabled for this application, but no event logs are found
-      val title = s"Application history not found (${app.id})"
-      var msg = s"No event logs found for application $appName in $appEventLogDir."
-      logWarning(msg)
-      msg += " Did you specify the correct logging directory?"
-      msg = URLEncoder.encode(msg, "UTF-8")
-      app.desc.appUiUrl = notFoundBasePath + s"?msg=$msg&title=$title"
-      return false
-    }
-
     try {
-      val replayBus = new ReplayListenerBus(eventLogPaths, fileSystem, compressionCodec)
+      val fs = Utils.getHadoopFileSystem(eventLogFile, hadoopConf)
+      val (logInput, sparkVersion) = EventLoggingListener.openEventLog(new Path(eventLogFile), fs)
+      val replayBus = new ReplayListenerBus()
       val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf),
         appName + " (completed)", HistoryServer.UI_PATH_PREFIX + s"/${app.id}")
-      replayBus.replay()
+      try {
+        replayBus.replay(logInput, sparkVersion)
+      } finally {
+        logInput.close()
+      }
       appIdToUI(app.id) = ui
       webUi.attachSparkUI(ui)
       // Application UI is successfully rebuilt, so link the Master UI to it
-      app.desc.appUiUrl = ui.getBasePath
+      app.desc.appUiUrl = ui.basePath
       true
     } catch {
+      case fnf: FileNotFoundException =>
+        // Event logging is enabled for this application, but no event logs are found
+        val title = s"Application history not found (${app.id})"
+        var msg = s"No event logs found for application $appName in $eventLogFile."
+        logWarning(msg)
+        msg += " Did you specify the correct logging directory?"
+        msg = URLEncoder.encode(msg, "UTF-8")
+        app.desc.appUiUrl = notFoundBasePath + s"?msg=$msg&title=$title"
+        false
       case e: Exception =>
         // Relay exception message to application UI page
         val title = s"Application history load error (${app.id})"

http://git-wip-us.apache.org/repos/asf/spark/blob/45645191/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 597dbc8..27bf4f1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -17,20 +17,23 @@
 
 package org.apache.spark.scheduler
 
+import java.io._
+import java.net.URI
+
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
+import com.google.common.base.Charsets
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
 import org.apache.hadoop.fs.permission.FsPermission
 import org.json4s.JsonAST.JValue
 import org.json4s.jackson.JsonMethods._
 
-import org.apache.spark.{Logging, SparkConf, SparkContext}
+import org.apache.spark.{Logging, SparkConf, SPARK_VERSION}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io.CompressionCodec
-import org.apache.spark.SPARK_VERSION
-import org.apache.spark.util.{FileLogger, JsonProtocol, Utils}
+import org.apache.spark.util.{JsonProtocol, Utils}
 
 /**
  * A SparkListener that logs events to persistent storage.
@@ -58,36 +61,78 @@ private[spark] class EventLoggingListener(
   private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
   private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
   private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
-  val logDir = EventLoggingListener.getLogDirPath(logBaseDir, appId)
-  val logDirName: String = logDir.split("/").last
-  protected val logger = new FileLogger(logDir, sparkConf, hadoopConf, outputBufferSize,
-    shouldCompress, shouldOverwrite, Some(LOG_FILE_PERMISSIONS))
+  private val fileSystem = Utils.getHadoopFileSystem(new URI(logBaseDir), hadoopConf)
+
+  // Only defined if the file system scheme is not local
+  private var hadoopDataStream: Option[FSDataOutputStream] = None
+
+  // The Hadoop APIs have changed over time, so we use reflection to figure out
+  // the correct method to use to flush a hadoop data stream. See SPARK-1518
+  // for details.
+  private val hadoopFlushMethod = {
+    val cls = classOf[FSDataOutputStream]
+    scala.util.Try(cls.getMethod("hflush")).getOrElse(cls.getMethod("sync"))
+  }
+
+  private var writer: Option[PrintWriter] = None
 
   // For testing. Keep track of all JSON serialized events that have been logged.
   private[scheduler] val loggedEvents = new ArrayBuffer[JValue]
 
+  // Visible for tests only.
+  private[scheduler] val logPath = getLogPath(logBaseDir, appId)
+
   /**
-   * Begin logging events.
-   * If compression is used, log a file that indicates which compression library is used.
+   * Creates the log file in the configured log directory.
    */
   def start() {
-    logger.start()
-    logInfo("Logging events to %s".format(logDir))
-    if (shouldCompress) {
-      val codec =
-        sparkConf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC)
-      logger.newFile(COMPRESSION_CODEC_PREFIX + codec)
+    if (!fileSystem.isDirectory(new Path(logBaseDir))) {
+      throw new IllegalArgumentException(s"Log directory $logBaseDir does not exist.")
+    }
+
+    val workingPath = logPath + IN_PROGRESS
+    val uri = new URI(workingPath)
+    val path = new Path(workingPath)
+    val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
+    val isDefaultLocal = defaultFs == null || defaultFs == "file"
+
+    if (shouldOverwrite && fileSystem.exists(path)) {
+      logWarning(s"Event log $path already exists. Overwriting...")
+      fileSystem.delete(path, true)
     }
-    logger.newFile(SPARK_VERSION_PREFIX + SPARK_VERSION)
-    logger.newFile(LOG_PREFIX + logger.fileIndex)
+
+    /* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844).
+     * Therefore, for local files, use FileOutputStream instead. */
+    val dstream =
+      if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") {
+        new FileOutputStream(uri.getPath)
+      } else {
+        hadoopDataStream = Some(fileSystem.create(path))
+        hadoopDataStream.get
+      }
+
+    val compressionCodec =
+      if (shouldCompress) {
+        Some(CompressionCodec.createCodec(sparkConf))
+      } else {
+        None
+      }
+
+    fileSystem.setPermission(path, LOG_FILE_PERMISSIONS)
+    val logStream = initEventLog(new BufferedOutputStream(dstream, outputBufferSize),
+      compressionCodec)
+    writer = Some(new PrintWriter(logStream))
+
+    logInfo("Logging events to %s".format(logPath))
   }
 
   /** Log the event as JSON. */
   private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false) {
     val eventJson = JsonProtocol.sparkEventToJson(event)
-    logger.logLine(compact(render(eventJson)))
+    writer.foreach(_.println(compact(render(eventJson))))
     if (flushLogger) {
-      logger.flush()
+      writer.foreach(_.flush())
+      hadoopDataStream.foreach(hadoopFlushMethod.invoke(_))
     }
     if (testing) {
       loggedEvents += eventJson
@@ -123,130 +168,164 @@ private[spark] class EventLoggingListener(
     logEvent(event, flushLogger = true)
   override def onApplicationEnd(event: SparkListenerApplicationEnd) =
     logEvent(event, flushLogger = true)
+
   // No-op because logging every update would be overkill
   override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate) { }
 
   /**
-   * Stop logging events.
-   * In addition, create an empty special file to indicate application completion.
+   * Stop logging events. The event log file will be renamed so that it loses the
+   * ".inprogress" suffix.
    */
   def stop() = {
-    logger.newFile(APPLICATION_COMPLETE)
-    logger.stop()
+    writer.foreach(_.close())
+
+    val target = new Path(logPath)
+    if (fileSystem.exists(target)) {
+      if (shouldOverwrite) {
+        logWarning(s"Event log $target already exists. Overwriting...")
+        fileSystem.delete(target, true)
+      } else {
+        throw new IOException("Target log file already exists (%s)".format(logPath))
+      }
+    }
+    fileSystem.rename(new Path(logPath + IN_PROGRESS), target)
   }
+
 }
 
 private[spark] object EventLoggingListener extends Logging {
+  // Suffix applied to the names of files still being written by applications.
+  val IN_PROGRESS = ".inprogress"
   val DEFAULT_LOG_DIR = "/tmp/spark-events"
-  val LOG_PREFIX = "EVENT_LOG_"
-  val SPARK_VERSION_PREFIX = "SPARK_VERSION_"
-  val COMPRESSION_CODEC_PREFIX = "COMPRESSION_CODEC_"
-  val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
-  val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort)
 
-  // A cache for compression codecs to avoid creating the same codec many times
-  private val codecMap = new mutable.HashMap[String, CompressionCodec]
+  private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort)
 
-  def isEventLogFile(fileName: String): Boolean = {
-    fileName.startsWith(LOG_PREFIX)
-  }
+  // Marker for the end of header data in a log file. After this marker, log data, potentially
+  // compressed, will be found.
+  private val HEADER_END_MARKER = "=== LOG_HEADER_END ==="
 
-  def isSparkVersionFile(fileName: String): Boolean = {
-    fileName.startsWith(SPARK_VERSION_PREFIX)
-  }
+  // To avoid corrupted files causing the heap to fill up. Value is arbitrary.
+  private val MAX_HEADER_LINE_LENGTH = 4096
 
-  def isCompressionCodecFile(fileName: String): Boolean = {
-    fileName.startsWith(COMPRESSION_CODEC_PREFIX)
-  }
+  // A cache for compression codecs to avoid creating the same codec many times
+  private val codecMap = new mutable.HashMap[String, CompressionCodec]
 
-  def isApplicationCompleteFile(fileName: String): Boolean = {
-    fileName == APPLICATION_COMPLETE
-  }
+  /**
+   * Write metadata about the event log to the given stream.
+   *
+   * The header is a serialized version of a map, except it does not use Java serialization to
+   * avoid incompatibilities between different JDKs. It writes one map entry per line, in
+   * "key=value" format.
+   *
+   * The very last entry in the header is the `HEADER_END_MARKER` marker, so that the parsing code
+   * can know when to stop.
+   *
+   * The format needs to be kept in sync with the openEventLog() method below. Also, it cannot
+   * change in new Spark versions without some other way of detecting the change (like some
+   * metadata encoded in the file name).
+   *
+   * @param logStream Raw output stream to the even log file.
+   * @param compressionCodec Optional compression codec to use.
+   * @return A stream where to write event log data. This may be a wrapper around the original
+   *         stream (for example, when compression is enabled).
+   */
+  def initEventLog(
+      logStream: OutputStream,
+      compressionCodec: Option[CompressionCodec]): OutputStream = {
+    val meta = mutable.HashMap(("version" -> SPARK_VERSION))
+    compressionCodec.foreach { codec =>
+      meta += ("compressionCodec" -> codec.getClass().getName())
+    }
 
-  def parseSparkVersion(fileName: String): String = {
-    if (isSparkVersionFile(fileName)) {
-      fileName.replaceAll(SPARK_VERSION_PREFIX, "")
-    } else ""
-  }
+    def write(entry: String) = {
+      val bytes = entry.getBytes(Charsets.UTF_8)
+      if (bytes.length > MAX_HEADER_LINE_LENGTH) {
+        throw new IOException(s"Header entry too long: ${entry}")
+      }
+      logStream.write(bytes, 0, bytes.length)
+    }
 
-  def parseCompressionCodec(fileName: String): String = {
-    if (isCompressionCodecFile(fileName)) {
-      fileName.replaceAll(COMPRESSION_CODEC_PREFIX, "")
-    } else ""
+    meta.foreach { case (k, v) => write(s"$k=$v\n") }
+    write(s"$HEADER_END_MARKER\n")
+    compressionCodec.map(_.compressedOutputStream(logStream)).getOrElse(logStream)
   }
 
   /**
-   * Return a file-system-safe path to the log directory for the given application.
+   * Return a file-system-safe path to the log file for the given application.
    *
-   * @param logBaseDir A base directory for the path to the log directory for given application.
+   * @param logBaseDir Directory where the log file will be written.
    * @param appId A unique app ID.
    * @return A path which consists of file-system-safe characters.
    */
-  def getLogDirPath(logBaseDir: String, appId: String): String = {
+  def getLogPath(logBaseDir: String, appId: String): String = {
     val name = appId.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_").toLowerCase
     Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/")
   }
 
   /**
-   * Parse the event logging information associated with the logs in the given directory.
+   * Opens an event log file and returns an input stream to the event data.
    *
-   * Specifically, this looks for event log files, the Spark version file, the compression
-   * codec file (if event logs are compressed), and the application completion file (if the
-   * application has run to completion).
+   * @return 2-tuple (event input stream, Spark version of event data)
    */
-  def parseLoggingInfo(logDir: Path, fileSystem: FileSystem): EventLoggingInfo = {
+  def openEventLog(log: Path, fs: FileSystem): (InputStream, String) = {
+    // It's not clear whether FileSystem.open() throws FileNotFoundException or just plain
+    // IOException when a file does not exist, so try our best to throw a proper exception.
+    if (!fs.exists(log)) {
+      throw new FileNotFoundException(s"File $log does not exist.")
+    }
+
+    val in = new BufferedInputStream(fs.open(log))
+    // Read a single line from the input stream without buffering.
+    // We cannot use BufferedReader because we must avoid reading
+    // beyond the end of the header, after which the content of the
+    // file may be compressed.
+    def readLine(): String = {
+      val bytes = new ByteArrayOutputStream()
+      var next = in.read()
+      var count = 0
+      while (next != '\n') {
+        if (next == -1) {
+          throw new IOException("Unexpected end of file.")
+        }
+        bytes.write(next)
+        count = count + 1
+        if (count > MAX_HEADER_LINE_LENGTH) {
+          throw new IOException("Maximum header line length exceeded.")
+        }
+        next = in.read()
+      }
+      new String(bytes.toByteArray(), Charsets.UTF_8)
+    }
+
+    // Parse the header metadata in the form of k=v pairs
+    // This assumes that every line before the header end marker follows this format
     try {
-      val fileStatuses = fileSystem.listStatus(logDir)
-      val filePaths =
-        if (fileStatuses != null) {
-          fileStatuses.filter(!_.isDir).map(_.getPath).toSeq
-        } else {
-          Seq[Path]()
+      val meta = new mutable.HashMap[String, String]()
+      var foundEndMarker = false
+      while (!foundEndMarker) {
+        readLine() match {
+          case HEADER_END_MARKER =>
+            foundEndMarker = true
+          case entry =>
+            val prop = entry.split("=", 2)
+            if (prop.length != 2) {
+              throw new IllegalArgumentException("Invalid metadata in log file.")
+            }
+            meta += (prop(0) -> prop(1))
         }
-      if (filePaths.isEmpty) {
-        logWarning("No files found in logging directory %s".format(logDir))
       }
-      EventLoggingInfo(
-        logPaths = filePaths.filter { path => isEventLogFile(path.getName) },
-        sparkVersion = filePaths
-          .find { path => isSparkVersionFile(path.getName) }
-          .map { path => parseSparkVersion(path.getName) }
-          .getOrElse("<Unknown>"),
-        compressionCodec = filePaths
-          .find { path => isCompressionCodecFile(path.getName) }
-          .map { path =>
-            val codec = EventLoggingListener.parseCompressionCodec(path.getName)
-            val conf = new SparkConf
-            conf.set("spark.io.compression.codec", codec)
-            codecMap.getOrElseUpdate(codec, CompressionCodec.createCodec(conf))
-          },
-        applicationComplete = filePaths.exists { path => isApplicationCompleteFile(path.getName) }
-      )
+
+      val sparkVersion = meta.get("version").getOrElse(
+        throw new IllegalArgumentException("Missing Spark version in log metadata."))
+      val codec = meta.get("compressionCodec").map { codecName =>
+        codecMap.getOrElseUpdate(codecName, CompressionCodec.createCodec(new SparkConf, codecName))
+      }
+      (codec.map(_.compressedInputStream(in)).getOrElse(in), sparkVersion)
     } catch {
       case e: Exception =>
-        logError("Exception in parsing logging info from directory %s".format(logDir), e)
-        EventLoggingInfo.empty
+        in.close()
+        throw e
     }
   }
 
-  /**
-   * Parse the event logging information associated with the logs in the given directory.
-   */
-  def parseLoggingInfo(logDir: String, fileSystem: FileSystem): EventLoggingInfo = {
-    parseLoggingInfo(new Path(logDir), fileSystem)
-  }
-}
-
-
-/**
- * Information needed to process the event logs associated with an application.
- */
-private[spark] case class EventLoggingInfo(
-    logPaths: Seq[Path],
-    sparkVersion: String,
-    compressionCodec: Option[CompressionCodec],
-    applicationComplete: Boolean = false)
-
-private[spark] object EventLoggingInfo {
-  def empty = EventLoggingInfo(Seq[Path](), "<Unknown>", None, applicationComplete = false)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/45645191/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
index f89724d..584f4e7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
@@ -17,74 +17,45 @@
 
 package org.apache.spark.scheduler
 
-import java.io.{BufferedInputStream, InputStream}
+import java.io.{InputStream, IOException}
 
 import scala.io.Source
 
-import org.apache.hadoop.fs.{Path, FileSystem}
 import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.Logging
-import org.apache.spark.io.CompressionCodec
 import org.apache.spark.util.JsonProtocol
 
 /**
- * A SparkListenerBus that replays logged events from persisted storage.
- *
- * This assumes the given paths are valid log files, where each line can be deserialized into
- * exactly one SparkListenerEvent.
+ * A SparkListenerBus that can be used to replay events from serialized event data.
  */
-private[spark] class ReplayListenerBus(
-    logPaths: Seq[Path],
-    fileSystem: FileSystem,
-    compressionCodec: Option[CompressionCodec])
-  extends SparkListenerBus with Logging {
-
-  private var replayed = false
-
-  if (logPaths.length == 0) {
-    logWarning("Log path provided contains no log files.")
-  }
+private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
 
   /**
-   * Replay each event in the order maintained in the given logs.
-   * This should only be called exactly once.
+   * Replay each event in the order maintained in the given stream. The stream is expected to
+   * contain one JSON-encoded SparkListenerEvent per line.
+   *
+   * This method can be called multiple times, but the listener behavior is undefined after any
+   * error is thrown by this method.
+   *
+   * @param logData Stream containing event log data.
+   * @param version Spark version that generated the events.
    */
-  def replay() {
-    assert(!replayed, "ReplayListenerBus cannot replay events more than once")
-    logPaths.foreach { path =>
-      // Keep track of input streams at all levels to close them later
-      // This is necessary because an exception can occur in between stream initializations
-      var fileStream: Option[InputStream] = None
-      var bufferedStream: Option[InputStream] = None
-      var compressStream: Option[InputStream] = None
-      var currentLine = "<not started>"
-      try {
-        fileStream = Some(fileSystem.open(path))
-        bufferedStream = Some(new BufferedInputStream(fileStream.get))
-        compressStream = Some(wrapForCompression(bufferedStream.get))
-
-        // Parse each line as an event and post the event to all attached listeners
-        val lines = Source.fromInputStream(compressStream.get).getLines()
-        lines.foreach { line =>
-          currentLine = line
-          postToAll(JsonProtocol.sparkEventFromJson(parse(line)))
-        }
-      } catch {
-        case e: Exception =>
-          logError("Exception in parsing Spark event log %s".format(path), e)
-          logError("Malformed line: %s\n".format(currentLine))
-      } finally {
-        fileStream.foreach(_.close())
-        bufferedStream.foreach(_.close())
-        compressStream.foreach(_.close())
+  def replay(logData: InputStream, version: String) {
+    var currentLine: String = null
+    try {
+      val lines = Source.fromInputStream(logData).getLines()
+      lines.foreach { line =>
+        currentLine = line
+        postToAll(JsonProtocol.sparkEventFromJson(parse(line)))
       }
+    } catch {
+      case ioe: IOException =>
+        throw ioe
+      case e: Exception =>
+        logError("Exception in parsing Spark event log.", e)
+        logError("Malformed line: %s\n".format(currentLine))
     }
-    replayed = true
   }
 
-  /** If a compression codec is specified, wrap the given stream in a compression stream. */
-  private def wrapForCompression(stream: InputStream): InputStream = {
-    compressionCodec.map(_.compressedInputStream(stream)).getOrElse(stream)
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/45645191/core/src/main/scala/org/apache/spark/util/FileLogger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/FileLogger.scala b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
deleted file mode 100644
index fdc73f0..0000000
--- a/core/src/main/scala/org/apache/spark/util/FileLogger.scala
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import java.io.{BufferedOutputStream, FileOutputStream, IOException, PrintWriter}
-import java.net.URI
-import java.text.SimpleDateFormat
-import java.util.Date
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
-import org.apache.hadoop.fs.permission.FsPermission
-
-import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.io.CompressionCodec
-
-/**
- * A generic class for logging information to file.
- *
- * @param logDir Path to the directory in which files are logged
- * @param outputBufferSize The buffer size to use when writing to an output stream in bytes
- * @param compress Whether to compress output
- * @param overwrite Whether to overwrite existing files
- */
-private[spark] class FileLogger(
-    logDir: String,
-    sparkConf: SparkConf,
-    hadoopConf: Configuration,
-    outputBufferSize: Int = 8 * 1024, // 8 KB
-    compress: Boolean = false,
-    overwrite: Boolean = true,
-    dirPermissions: Option[FsPermission] = None)
-  extends Logging {
-
-  def this(
-      logDir: String,
-      sparkConf: SparkConf,
-      compress: Boolean,
-      overwrite: Boolean) = {
-    this(logDir, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf), compress = compress,
-      overwrite = overwrite)
-  }
-
-  def this(
-      logDir: String,
-      sparkConf: SparkConf,
-      compress: Boolean) = {
-    this(logDir, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf), compress = compress,
-      overwrite = true)
-  }
-
-  def this(
-      logDir: String,
-      sparkConf: SparkConf) = {
-    this(logDir, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf), compress = false,
-      overwrite = true)
-  }
-
-  private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
-    override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
-  }
-
-  /**
-   * To avoid effects of FileSystem#close or FileSystem.closeAll called from other modules,
-   * create unique FileSystem instance only for FileLogger
-   */
-  private val fileSystem = {
-    val conf = SparkHadoopUtil.get.newConfiguration(sparkConf)
-    val logUri = new URI(logDir)
-    val scheme = logUri.getScheme
-    if (scheme == "hdfs") {
-      conf.setBoolean("fs.hdfs.impl.disable.cache", true)
-    }
-    FileSystem.get(logUri, conf)
-  }
-
-  var fileIndex = 0
-
-  // Only used if compression is enabled
-  private lazy val compressionCodec = CompressionCodec.createCodec(sparkConf)
-
-  // Only defined if the file system scheme is not local
-  private var hadoopDataStream: Option[FSDataOutputStream] = None
-
-  // The Hadoop APIs have changed over time, so we use reflection to figure out
-  // the correct method to use to flush a hadoop data stream. See SPARK-1518
-  // for details.
-  private val hadoopFlushMethod = {
-    val cls = classOf[FSDataOutputStream]
-    scala.util.Try(cls.getMethod("hflush")).getOrElse(cls.getMethod("sync"))
-  }
-
-  private var writer: Option[PrintWriter] = None
-
-  /**
-   * Start this logger by creating the logging directory.
-   */
-  def start() {
-    createLogDir()
-  }
-
-  /**
-   * Create a logging directory with the given path.
-   */
-  private def createLogDir() {
-    val path = new Path(logDir)
-    if (fileSystem.exists(path)) {
-      if (overwrite) {
-        logWarning("Log directory %s already exists. Overwriting...".format(logDir))
-        // Second parameter is whether to delete recursively
-        fileSystem.delete(path, true)
-      } else {
-        throw new IOException("Log directory %s already exists!".format(logDir))
-      }
-    }
-    if (!fileSystem.mkdirs(path)) {
-      throw new IOException("Error in creating log directory: %s".format(logDir))
-    }
-    if (dirPermissions.isDefined) {
-      val fsStatus = fileSystem.getFileStatus(path)
-      if (fsStatus.getPermission.toShort != dirPermissions.get.toShort) {
-        fileSystem.setPermission(path, dirPermissions.get)
-      }
-    }
-  }
-
-  /**
-   * Create a new writer for the file identified by the given path.
-   * If the permissions are not passed in, it will default to use the permissions
-   * (dirPermissions) used when class was instantiated.
-   */
-  private def createWriter(fileName: String, perms: Option[FsPermission] = None): PrintWriter = {
-    val logPath = logDir + "/" + fileName
-    val uri = new URI(logPath)
-    val path = new Path(logPath)
-    val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
-    val isDefaultLocal = defaultFs == null || defaultFs == "file"
-
-    /* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844).
-     * Therefore, for local files, use FileOutputStream instead. */
-    val dstream =
-      if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") {
-        // Second parameter is whether to append
-        new FileOutputStream(uri.getPath, !overwrite)
-      } else {
-        hadoopDataStream = Some(fileSystem.create(path, overwrite))
-        hadoopDataStream.get
-      }
-
-    perms.orElse(dirPermissions).foreach { p => fileSystem.setPermission(path, p) }
-    val bstream = new BufferedOutputStream(dstream, outputBufferSize)
-    val cstream = if (compress) compressionCodec.compressedOutputStream(bstream) else bstream
-    new PrintWriter(cstream)
-  }
-
-  /**
-   * Log the message to the given writer.
-   * @param msg The message to be logged
-   * @param withTime Whether to prepend message with a timestamp
-   */
-  def log(msg: String, withTime: Boolean = false) {
-    val writeInfo = if (!withTime) {
-      msg
-    } else {
-      val date = new Date(System.currentTimeMillis)
-      dateFormat.get.format(date) + ": " + msg
-    }
-    writer.foreach(_.print(writeInfo))
-  }
-
-  /**
-   * Log the message to the given writer as a new line.
-   * @param msg The message to be logged
-   * @param withTime Whether to prepend message with a timestamp
-   */
-  def logLine(msg: String, withTime: Boolean = false) = log(msg + "\n", withTime)
-
-  /**
-   * Flush the writer to disk manually.
-   *
-   * When using a Hadoop filesystem, we need to invoke the hflush or sync
-   * method. In HDFS, hflush guarantees that the data gets to all the
-   * DataNodes.
-   */
-  def flush() {
-    writer.foreach(_.flush())
-    hadoopDataStream.foreach(hadoopFlushMethod.invoke(_))
-  }
-
-  /**
-   * Close the writer. Any subsequent calls to log or flush will have no effect.
-   */
-  def close() {
-    writer.foreach(_.close())
-    writer = None
-  }
-
-  /**
-   * Start a writer for a new file, closing the existing one if it exists.
-   * @param fileName Name of the new file, defaulting to the file index if not provided.
-   * @param perms Permissions to put on the new file.
-   */
-  def newFile(fileName: String = "", perms: Option[FsPermission] = None) {
-    fileIndex += 1
-    writer.foreach(_.close())
-    val name = fileName match {
-      case "" => fileIndex.toString
-      case _ => fileName
-    }
-    writer = Some(createWriter(name, perms))
-  }
-
-  /**
-   * Close all open writers, streams, and file systems. Any subsequent uses of this FileLogger
-   * instance will throw exceptions.
-   */
-  def stop() {
-    hadoopDataStream.foreach(_.close())
-    writer.foreach(_.close())
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/45645191/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
new file mode 100644
index 0000000..d719e93
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.{File, FileOutputStream, OutputStreamWriter}
+
+import scala.io.Source
+
+import com.google.common.io.Files
+import org.apache.hadoop.fs.Path
+import org.json4s.jackson.JsonMethods._
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.Matchers
+
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.io._
+import org.apache.spark.scheduler._
+import org.apache.spark.util.{JsonProtocol, Utils}
+
+class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers with Logging {
+
+  private var testDir: File = null
+
+  private var provider: FsHistoryProvider = null
+
+  before {
+    testDir = Utils.createTempDir()
+    provider = new FsHistoryProvider(new SparkConf()
+      .set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
+      .set("spark.history.fs.updateInterval", "0"))
+  }
+
+  after {
+    Utils.deleteRecursively(testDir)
+  }
+
+  test("Parse new and old application logs") {
+    val conf = new SparkConf()
+      .set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
+      .set("spark.history.fs.updateInterval", "0")
+    val provider = new FsHistoryProvider(conf)
+
+    // Write a new-style application log.
+    val logFile1 = new File(testDir, "new1")
+    writeFile(logFile1, true, None,
+      SparkListenerApplicationStart("app1-1", None, 1L, "test"),
+      SparkListenerApplicationEnd(2L)
+      )
+
+    // Write an unfinished app, new-style.
+    writeFile(new File(testDir, "new2" + EventLoggingListener.IN_PROGRESS), true, None,
+      SparkListenerApplicationStart("app2-2", None, 1L, "test")
+      )
+
+    // Write an old-style application log.
+    val oldLog = new File(testDir, "old1")
+    oldLog.mkdir()
+    createEmptyFile(new File(oldLog, provider.SPARK_VERSION_PREFIX + "1.0"))
+    writeFile(new File(oldLog, provider.LOG_PREFIX + "1"), false, None,
+      SparkListenerApplicationStart("app3", None, 2L, "test"),
+      SparkListenerApplicationEnd(3L)
+      )
+    createEmptyFile(new File(oldLog, provider.APPLICATION_COMPLETE))
+
+    // Write an unfinished app, old-style.
+    val oldLog2 = new File(testDir, "old2")
+    oldLog2.mkdir()
+    createEmptyFile(new File(oldLog2, provider.SPARK_VERSION_PREFIX + "1.0"))
+    writeFile(new File(oldLog2, provider.LOG_PREFIX + "1"), false, None,
+      SparkListenerApplicationStart("app4", None, 2L, "test")
+      )
+
+    // Force a reload of data from the log directory, and check that both logs are loaded.
+    // Take the opportunity to check that the offset checks work as expected.
+    provider.checkForLogs()
+
+    val list = provider.getListing().toSeq
+    list should not be (null)
+    list.size should be (2)
+
+    list(0) should be (ApplicationHistoryInfo(oldLog.getName(), "app3", 2L, 3L,
+      oldLog.lastModified(), "test"))
+    list(1) should be (ApplicationHistoryInfo(logFile1.getName(), "app1-1", 1L, 2L,
+      logFile1.lastModified(), "test"))
+
+    // Make sure the UI can be rendered.
+    list.foreach { case info =>
+      val appUi = provider.getAppUI(info.id)
+      appUi should not be null
+    }
+  }
+
+  test("Parse legacy logs with compression codec set") {
+    val testCodecs = List((classOf[LZFCompressionCodec].getName(), true),
+      (classOf[SnappyCompressionCodec].getName(), true),
+      ("invalid.codec", false))
+
+    testCodecs.foreach { case (codecName, valid) =>
+      val codec = if (valid) CompressionCodec.createCodec(new SparkConf(), codecName) else null
+      val logDir = new File(testDir, codecName)
+      logDir.mkdir()
+      createEmptyFile(new File(logDir, provider.SPARK_VERSION_PREFIX + "1.0"))
+      writeFile(new File(logDir, provider.LOG_PREFIX + "1"), false, Option(codec),
+        SparkListenerApplicationStart("app2", None, 2L, "test"),
+        SparkListenerApplicationEnd(3L)
+        )
+      createEmptyFile(new File(logDir, provider.COMPRESSION_CODEC_PREFIX + codecName))
+
+      val logPath = new Path(logDir.getAbsolutePath())
+      try {
+        val (logInput, sparkVersion) = provider.openLegacyEventLog(logPath)
+        try {
+          Source.fromInputStream(logInput).getLines().toSeq.size should be (2)
+        } finally {
+          logInput.close()
+        }
+      } catch {
+        case e: IllegalArgumentException =>
+          valid should be (false)
+      }
+    }
+  }
+
+  test("SPARK-3697: ignore directories that cannot be read.") {
+    val logFile1 = new File(testDir, "new1")
+    writeFile(logFile1, true, None,
+      SparkListenerApplicationStart("app1-1", None, 1L, "test"),
+      SparkListenerApplicationEnd(2L)
+      )
+    val logFile2 = new File(testDir, "new2")
+    writeFile(logFile2, true, None,
+      SparkListenerApplicationStart("app1-2", None, 1L, "test"),
+      SparkListenerApplicationEnd(2L)
+      )
+    logFile2.setReadable(false, false)
+
+    val conf = new SparkConf()
+      .set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
+      .set("spark.history.fs.updateInterval", "0")
+    val provider = new FsHistoryProvider(conf)
+    provider.checkForLogs()
+
+    val list = provider.getListing().toSeq
+    list should not be (null)
+    list.size should be (1)
+  }
+
+  private def writeFile(file: File, isNewFormat: Boolean, codec: Option[CompressionCodec],
+    events: SparkListenerEvent*) = {
+    val out =
+      if (isNewFormat) {
+        EventLoggingListener.initEventLog(new FileOutputStream(file), codec)
+      } else {
+        val fileStream = new FileOutputStream(file)
+        codec.map(_.compressedOutputStream(fileStream)).getOrElse(fileStream)
+      }
+    val writer = new OutputStreamWriter(out, "UTF-8")
+    try {
+      events.foreach(e => writer.write(compact(render(JsonProtocol.sparkEventToJson(e))) + "\n"))
+    } finally {
+      writer.close()
+    }
+  }
+
+  private def createEmptyFile(file: File) = {
+    new FileOutputStream(file).close()
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/45645191/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index abc300f..5909811 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -17,69 +17,59 @@
 
 package org.apache.spark.scheduler
 
+import java.io.{File, FileOutputStream, InputStream, IOException}
+
 import scala.collection.mutable
 import scala.io.Source
 
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.Path
 import org.json4s.jackson.JsonMethods._
 import org.scalatest.{BeforeAndAfter, FunSuite}
 
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.{Logging, SparkConf, SparkContext}
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.io.CompressionCodec
-import org.apache.spark.SPARK_VERSION
+import org.apache.spark.io._
 import org.apache.spark.util.{JsonProtocol, Utils}
 
-import java.io.File
-
 /**
  * Test whether EventLoggingListener logs events properly.
  *
- * This tests whether EventLoggingListener actually creates special files while logging events,
- * whether the parsing of these special files is correct, and whether the logged events can be
- * read and deserialized into actual SparkListenerEvents.
+ * This tests whether EventLoggingListener actually log files with expected name patterns while
+ * logging events, whether the parsing of the file names is correct, and whether the logged events
+ * can be read and deserialized into actual SparkListenerEvents.
  */
-class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
+class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Logging {
+  import EventLoggingListenerSuite._
+
   private val fileSystem = Utils.getHadoopFileSystem("/",
     SparkHadoopUtil.get.newConfiguration(new SparkConf()))
-  private val allCompressionCodecs = Seq[String](
-    "org.apache.spark.io.LZFCompressionCodec",
-    "org.apache.spark.io.SnappyCompressionCodec"
-  )
   private var testDir: File = _
-  private var logDirPath: Path = _
+  private var testDirPath: Path = _
 
   before {
     testDir = Utils.createTempDir()
-    logDirPath = Utils.getFilePath(testDir, "spark-events")
+    testDir.deleteOnExit()
+    testDirPath = new Path(testDir.getAbsolutePath())
   }
 
   after {
     Utils.deleteRecursively(testDir)
   }
 
-  test("Parse names of special files") {
-    testParsingFileName()
-  }
-
-  test("Verify special files exist") {
-    testSpecialFilesExist()
-  }
-
-  test("Verify special files exist with compression") {
-    allCompressionCodecs.foreach { codec =>
-      testSpecialFilesExist(compressionCodec = Some(codec))
-    }
-  }
+  test("Verify log file exist") {
+    // Verify logging directory exists
+    val conf = getLoggingConf(testDirPath)
+    val eventLogger = new EventLoggingListener("test", testDirPath.toUri().toString(), conf)
+    eventLogger.start()
 
-  test("Parse event logging info") {
-    testParsingLogInfo()
-  }
+    val logPath = new Path(eventLogger.logPath + EventLoggingListener.IN_PROGRESS)
+    assert(fileSystem.exists(logPath))
+    val logStatus = fileSystem.getFileStatus(logPath)
+    assert(logStatus.isFile)
 
-  test("Parse event logging info with compression") {
-    allCompressionCodecs.foreach { codec =>
-      testParsingLogInfo(compressionCodec = Some(codec))
-    }
+    // Verify log is renamed after stop()
+    eventLogger.stop()
+    assert(fileSystem.getFileStatus(new Path(eventLogger.logPath)).isFile())
   }
 
   test("Basic event logging") {
@@ -87,7 +77,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
   }
 
   test("Basic event logging with compression") {
-    allCompressionCodecs.foreach { codec =>
+    CompressionCodec.ALL_COMPRESSION_CODECS.foreach { codec =>
       testEventLogging(compressionCodec = Some(codec))
     }
   }
@@ -97,11 +87,25 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
   }
 
   test("End-to-end event logging with compression") {
-    allCompressionCodecs.foreach { codec =>
+    CompressionCodec.ALL_COMPRESSION_CODECS.foreach { codec =>
       testApplicationEventLogging(compressionCodec = Some(codec))
     }
   }
 
+  test("Log overwriting") {
+    val log = new FileOutputStream(new File(testDir, "test"))
+    log.close()
+    try {
+      testEventLogging()
+      assert(false)
+    } catch {
+      case e: IOException =>
+        // Expected, since we haven't enabled log overwrite.
+    }
+
+    // Try again, but enable overwriting.
+    testEventLogging(extraConf = Map("spark.eventLog.overwrite" -> "true"))
+  }
 
   /* ----------------- *
    * Actual test logic *
@@ -110,129 +114,18 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
   import EventLoggingListenerSuite._
 
   /**
-   * Test whether names of special files are correctly identified and parsed.
-   */
-  private def testParsingFileName() {
-    val logPrefix = EventLoggingListener.LOG_PREFIX
-    val sparkVersionPrefix = EventLoggingListener.SPARK_VERSION_PREFIX
-    val compressionCodecPrefix = EventLoggingListener.COMPRESSION_CODEC_PREFIX
-    val applicationComplete = EventLoggingListener.APPLICATION_COMPLETE
-    assert(EventLoggingListener.isEventLogFile(logPrefix + "0"))
-    assert(EventLoggingListener.isEventLogFile(logPrefix + "100"))
-    assert(EventLoggingListener.isEventLogFile(logPrefix + "ANYTHING"))
-    assert(EventLoggingListener.isSparkVersionFile(sparkVersionPrefix + "0.9.1"))
-    assert(EventLoggingListener.isSparkVersionFile(sparkVersionPrefix + "1.0.0"))
-    assert(EventLoggingListener.isSparkVersionFile(sparkVersionPrefix + "ANYTHING"))
-    assert(EventLoggingListener.isApplicationCompleteFile(applicationComplete))
-    allCompressionCodecs.foreach { codec =>
-      assert(EventLoggingListener.isCompressionCodecFile(compressionCodecPrefix + codec))
-    }
-
-    // Negatives
-    assert(!EventLoggingListener.isEventLogFile("The greatest man of all mankind"))
-    assert(!EventLoggingListener.isSparkVersionFile("Will never falter in the face of death!"))
-    assert(!EventLoggingListener.isCompressionCodecFile("Unless he chooses to leave behind"))
-    assert(!EventLoggingListener.isApplicationCompleteFile("The very treasure he calls Macbeth"))
-
-    // Verify that parsing is correct
-    assert(EventLoggingListener.parseSparkVersion(sparkVersionPrefix + "1.0.0") === "1.0.0")
-    allCompressionCodecs.foreach { codec =>
-      assert(EventLoggingListener.parseCompressionCodec(compressionCodecPrefix + codec) === codec)
-    }
-  }
-
-  /**
-   * Test whether the special files produced by EventLoggingListener exist.
-   *
-   * There should be exactly one event log and one spark version file throughout the entire
-   * execution. If a compression codec is specified, then the compression codec file should
-   * also exist. Only after the application has completed does the test expect the application
-   * completed file to be present.
-   */
-  private def testSpecialFilesExist(compressionCodec: Option[String] = None) {
-
-    def assertFilesExist(logFiles: Array[FileStatus], loggerStopped: Boolean) {
-      val numCompressionCodecFiles = if (compressionCodec.isDefined) 1 else 0
-      val numApplicationCompleteFiles = if (loggerStopped) 1 else 0
-      assert(logFiles.size === 2 + numCompressionCodecFiles + numApplicationCompleteFiles)
-      assert(eventLogsExist(logFiles))
-      assert(sparkVersionExists(logFiles))
-      assert(compressionCodecExists(logFiles) === compressionCodec.isDefined)
-      assert(applicationCompleteExists(logFiles) === loggerStopped)
-      assertSparkVersionIsValid(logFiles)
-      compressionCodec.foreach { codec =>
-        assertCompressionCodecIsValid(logFiles, codec)
-      }
-    }
-
-    // Verify logging directory exists
-    val conf = getLoggingConf(logDirPath, compressionCodec)
-    val logBaseDir = conf.get("spark.eventLog.dir")
-    val appId = EventLoggingListenerSuite.getUniqueApplicationId
-    val eventLogger = new EventLoggingListener(appId, logBaseDir, conf)
-    eventLogger.start()
-    val logPath = new Path(eventLogger.logDir)
-    assert(fileSystem.exists(logPath))
-    val logDir = fileSystem.getFileStatus(logPath)
-    assert(logDir.isDir)
-
-    // Verify special files are as expected before stop()
-    var logFiles = fileSystem.listStatus(logPath)
-    assert(logFiles != null)
-    assertFilesExist(logFiles, loggerStopped = false)
-
-    // Verify special files are as expected after stop()
-    eventLogger.stop()
-    logFiles = fileSystem.listStatus(logPath)
-    assertFilesExist(logFiles, loggerStopped = true)
-  }
-
-  /**
-   * Test whether EventLoggingListener correctly parses the correct information from the logs.
-   *
-   * This includes whether it returns the correct Spark version, compression codec (if any),
-   * and the application's completion status.
-   */
-  private def testParsingLogInfo(compressionCodec: Option[String] = None) {
-
-    def assertInfoCorrect(info: EventLoggingInfo, loggerStopped: Boolean) {
-      assert(info.logPaths.size > 0)
-      assert(info.sparkVersion === SPARK_VERSION)
-      assert(info.compressionCodec.isDefined === compressionCodec.isDefined)
-      info.compressionCodec.foreach { codec =>
-        assert(compressionCodec.isDefined)
-        val expectedCodec = compressionCodec.get.split('.').last
-        assert(codec.getClass.getSimpleName === expectedCodec)
-      }
-      assert(info.applicationComplete === loggerStopped)
-    }
-
-    // Verify that all information is correctly parsed before stop()
-    val conf = getLoggingConf(logDirPath, compressionCodec)
-    val logBaseDir = conf.get("spark.eventLog.dir")
-    val appId = EventLoggingListenerSuite.getUniqueApplicationId
-    val eventLogger = new EventLoggingListener(appId, logBaseDir, conf)
-    eventLogger.start()
-    var eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
-    assertInfoCorrect(eventLoggingInfo, loggerStopped = false)
-
-    // Verify that all information is correctly parsed after stop()
-    eventLogger.stop()
-    eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
-    assertInfoCorrect(eventLoggingInfo, loggerStopped = true)
-  }
-
-  /**
    * Test basic event logging functionality.
    *
    * This creates two simple events, posts them to the EventLoggingListener, and verifies that
    * exactly these two events are logged in the expected file.
    */
-  private def testEventLogging(compressionCodec: Option[String] = None) {
-    val conf = getLoggingConf(logDirPath, compressionCodec)
-    val logBaseDir = conf.get("spark.eventLog.dir")
-    val appId = EventLoggingListenerSuite.getUniqueApplicationId
-    val eventLogger = new EventLoggingListener(appId, logBaseDir, conf)
+  private def testEventLogging(
+      compressionCodec: Option[String] = None,
+      extraConf: Map[String, String] = Map()) {
+    val conf = getLoggingConf(testDirPath, compressionCodec)
+    extraConf.foreach { case (k, v) => conf.set(k, v) }
+    val logName = compressionCodec.map("test-" + _).getOrElse("test")
+    val eventLogger = new EventLoggingListener(logName, testDirPath.toUri().toString(), conf)
     val listenerBus = new LiveListenerBus
     val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
       125L, "Mickey")
@@ -244,17 +137,21 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
     listenerBus.addListener(eventLogger)
     listenerBus.postToAll(applicationStart)
     listenerBus.postToAll(applicationEnd)
+    eventLogger.stop()
 
     // Verify file contains exactly the two events logged
-    val eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
-    assert(eventLoggingInfo.logPaths.size > 0)
-    val lines = readFileLines(eventLoggingInfo.logPaths.head, eventLoggingInfo.compressionCodec)
-    assert(lines.size === 2)
-    assert(lines(0).contains("SparkListenerApplicationStart"))
-    assert(lines(1).contains("SparkListenerApplicationEnd"))
-    assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === applicationStart)
-    assert(JsonProtocol.sparkEventFromJson(parse(lines(1))) === applicationEnd)
-    eventLogger.stop()
+    val (logData, version) = EventLoggingListener.openEventLog(new Path(eventLogger.logPath),
+      fileSystem)
+    try {
+      val lines = readLines(logData)
+      assert(lines.size === 2)
+      assert(lines(0).contains("SparkListenerApplicationStart"))
+      assert(lines(1).contains("SparkListenerApplicationEnd"))
+      assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === applicationStart)
+      assert(JsonProtocol.sparkEventFromJson(parse(lines(1))) === applicationEnd)
+    } finally {
+      logData.close()
+    }
   }
 
   /**
@@ -262,12 +159,12 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
    * This runs a simple Spark job and asserts that the expected events are logged when expected.
    */
   private def testApplicationEventLogging(compressionCodec: Option[String] = None) {
-    val conf = getLoggingConf(logDirPath, compressionCodec)
+    val conf = getLoggingConf(testDirPath, compressionCodec)
     val sc = new SparkContext("local", "test", conf)
     assert(sc.eventLogger.isDefined)
     val eventLogger = sc.eventLogger.get
-    val expectedLogDir = logDirPath.toString
-    assert(eventLogger.logDir.contains(expectedLogDir))
+    val expectedLogDir = testDir.toURI().toString()
+    assert(eventLogger.logPath.startsWith(expectedLogDir + "/"))
 
     // Begin listening for events that trigger asserts
     val eventExistenceListener = new EventExistenceListener(eventLogger)
@@ -279,16 +176,22 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
 
     // Ensure all asserts have actually been triggered
     eventExistenceListener.assertAllCallbacksInvoked()
-  }
 
-  /**
-   * Assert that all of the specified events are logged by the given EventLoggingListener.
-   */
-  private def assertEventsExist(eventLogger: EventLoggingListener, events: Seq[String]) {
-    val eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
-    assert(eventLoggingInfo.logPaths.size > 0)
-    val lines = readFileLines(eventLoggingInfo.logPaths.head, eventLoggingInfo.compressionCodec)
-    val eventSet = mutable.Set(events: _*)
+    // Make sure expected events exist in the log file.
+    val (logData, version) = EventLoggingListener.openEventLog(new Path(eventLogger.logPath),
+      fileSystem)
+    val lines = readLines(logData)
+    val eventSet = mutable.Set(
+      SparkListenerApplicationStart,
+      SparkListenerBlockManagerAdded,
+      SparkListenerEnvironmentUpdate,
+      SparkListenerJobStart,
+      SparkListenerJobEnd,
+      SparkListenerStageSubmitted,
+      SparkListenerStageCompleted,
+      SparkListenerTaskStart,
+      SparkListenerTaskEnd,
+      SparkListenerApplicationEnd).map(Utils.getFormattedClassName)
     lines.foreach { line =>
       eventSet.foreach { event =>
         if (line.contains(event)) {
@@ -303,19 +206,8 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
     assert(eventSet.isEmpty, "The following events are missing: " + eventSet.toSeq)
   }
 
-  /**
-   * Read all lines from the file specified by the given path.
-   * If a compression codec is specified, use it to read the file.
-   */
-  private def readFileLines(
-      filePath: Path,
-      compressionCodec: Option[CompressionCodec]): Seq[String] = {
-    val fstream = fileSystem.open(filePath)
-    val cstream =
-      compressionCodec.map { codec =>
-        codec.compressedInputStream(fstream)
-      }.getOrElse(fstream)
-    Source.fromInputStream(cstream).getLines().toSeq
+  private def readLines(in: InputStream): Seq[String] = {
+    Source.fromInputStream(in).getLines().toSeq
   }
 
   /**
@@ -328,30 +220,14 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
     var appEnded = false
 
     override def onJobStart(jobStart: SparkListenerJobStart) {
-      assertEventsExist(eventLogger, Seq[String](
-        Utils.getFormattedClassName(SparkListenerApplicationStart),
-        Utils.getFormattedClassName(SparkListenerBlockManagerAdded),
-        Utils.getFormattedClassName(SparkListenerEnvironmentUpdate)
-      ))
       jobStarted = true
     }
 
     override def onJobEnd(jobEnd: SparkListenerJobEnd) {
-      assertEventsExist(eventLogger, Seq[String](
-        Utils.getFormattedClassName(SparkListenerJobStart),
-        Utils.getFormattedClassName(SparkListenerJobEnd),
-        Utils.getFormattedClassName(SparkListenerStageSubmitted),
-        Utils.getFormattedClassName(SparkListenerStageCompleted),
-        Utils.getFormattedClassName(SparkListenerTaskStart),
-        Utils.getFormattedClassName(SparkListenerTaskEnd)
-      ))
       jobEnded = true
     }
 
     override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
-      assertEventsExist(eventLogger, Seq[String](
-        Utils.getFormattedClassName(SparkListenerApplicationEnd)
-      ))
       appEnded = true
     }
 
@@ -362,39 +238,6 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
     }
   }
 
-
-  /* -------------------------------------------------------- *
-   * Helper methods for validating state of the special files *
-   * -------------------------------------------------------- */
-
-  private def eventLogsExist(logFiles: Array[FileStatus]): Boolean = {
-    logFiles.map(_.getPath.getName).exists(EventLoggingListener.isEventLogFile)
-  }
-
-  private def sparkVersionExists(logFiles: Array[FileStatus]): Boolean = {
-    logFiles.map(_.getPath.getName).exists(EventLoggingListener.isSparkVersionFile)
-  }
-
-  private def compressionCodecExists(logFiles: Array[FileStatus]): Boolean = {
-    logFiles.map(_.getPath.getName).exists(EventLoggingListener.isCompressionCodecFile)
-  }
-
-  private def applicationCompleteExists(logFiles: Array[FileStatus]): Boolean = {
-    logFiles.map(_.getPath.getName).exists(EventLoggingListener.isApplicationCompleteFile)
-  }
-
-  private def assertSparkVersionIsValid(logFiles: Array[FileStatus]) {
-    val file = logFiles.map(_.getPath.getName).find(EventLoggingListener.isSparkVersionFile)
-    assert(file.isDefined)
-    assert(EventLoggingListener.parseSparkVersion(file.get) === SPARK_VERSION)
-  }
-
-  private def assertCompressionCodecIsValid(logFiles: Array[FileStatus], compressionCodec: String) {
-    val file = logFiles.map(_.getPath.getName).find(EventLoggingListener.isCompressionCodecFile)
-    assert(file.isDefined)
-    assert(EventLoggingListener.parseCompressionCodec(file.get) === compressionCodec)
-  }
-
 }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/45645191/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index 90bdfe0..7e635cb 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -22,6 +22,7 @@ import java.io.{File, PrintWriter}
 import org.json4s.jackson.JsonMethods._
 import org.scalatest.{BeforeAndAfter, FunSuite}
 
+import org.apache.spark.{SparkConf, SparkContext, SPARK_VERSION}
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io.CompressionCodec
@@ -33,7 +34,6 @@ import org.apache.spark.util.{JsonProtocol, Utils}
 class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
   private val fileSystem = Utils.getHadoopFileSystem("/",
     SparkHadoopUtil.get.newConfiguration(new SparkConf()))
-  private val allCompressionCodecs = CompressionCodec.ALL_COMPRESSION_CODECS
   private var testDir: File = _
 
   before {
@@ -45,13 +45,29 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
   }
 
   test("Simple replay") {
-    testSimpleReplay()
-  }
+    val logFilePath = Utils.getFilePath(testDir, "events.txt")
+    val fstream = fileSystem.create(logFilePath)
+    val writer = new PrintWriter(fstream)
+    val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
+      125L, "Mickey")
+    val applicationEnd = SparkListenerApplicationEnd(1000L)
+    writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart))))
+    writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
+    writer.close()
 
-  test("Simple replay with compression") {
-    allCompressionCodecs.foreach { codec =>
-      testSimpleReplay(Some(codec))
+    val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath)
+    val logData = fileSystem.open(logFilePath)
+    val eventMonster = new EventMonster(conf)
+    try {
+      val replayer = new ReplayListenerBus()
+      replayer.addListener(eventMonster)
+      replayer.replay(logData, SPARK_VERSION)
+    } finally {
+      logData.close()
     }
+    assert(eventMonster.loggedEvents.size === 2)
+    assert(eventMonster.loggedEvents(0) === JsonProtocol.sparkEventToJson(applicationStart))
+    assert(eventMonster.loggedEvents(1) === JsonProtocol.sparkEventToJson(applicationEnd))
   }
 
   // This assumes the correctness of EventLoggingListener
@@ -61,7 +77,7 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
 
   // This assumes the correctness of EventLoggingListener
   test("End-to-end replay with compression") {
-    allCompressionCodecs.foreach { codec =>
+    CompressionCodec.ALL_COMPRESSION_CODECS.foreach { codec =>
       testApplicationReplay(Some(codec))
     }
   }
@@ -72,31 +88,6 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
    * ----------------- */
 
   /**
-   * Test simple replaying of events.
-   */
-  private def testSimpleReplay(codecName: Option[String] = None) {
-    val logFilePath = Utils.getFilePath(testDir, "events.txt")
-    val codec = codecName.map(getCompressionCodec)
-    val fstream = fileSystem.create(logFilePath)
-    val cstream = codec.map(_.compressedOutputStream(fstream)).getOrElse(fstream)
-    val writer = new PrintWriter(cstream)
-    val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
-      125L, "Mickey")
-    val applicationEnd = SparkListenerApplicationEnd(1000L)
-    writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart))))
-    writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
-    writer.close()
-    val replayer = new ReplayListenerBus(Seq(logFilePath), fileSystem, codec)
-    val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath, codecName)
-    val eventMonster = new EventMonster(conf)
-    replayer.addListener(eventMonster)
-    replayer.replay()
-    assert(eventMonster.loggedEvents.size === 2)
-    assert(eventMonster.loggedEvents(0) === JsonProtocol.sparkEventToJson(applicationStart))
-    assert(eventMonster.loggedEvents(1) === JsonProtocol.sparkEventToJson(applicationEnd))
-  }
-
-  /**
    * Test end-to-end replaying of events.
    *
    * This test runs a few simple jobs with event logging enabled, and compares each emitted
@@ -105,6 +96,8 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
    */
   private def testApplicationReplay(codecName: Option[String] = None) {
     val logDirPath = Utils.getFilePath(testDir, "test-replay")
+    fileSystem.mkdirs(logDirPath)
+
     val conf = EventLoggingListenerSuite.getLoggingConf(logDirPath, codecName)
     val sc = new SparkContext("local-cluster[2,1,512]", "Test replay", conf)
 
@@ -116,22 +109,21 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
     sc.stop()
 
     // Prepare information needed for replay
-    val codec = codecName.map(getCompressionCodec)
     val applications = fileSystem.listStatus(logDirPath)
     assert(applications != null && applications.size > 0)
-    val eventLogDir = applications.sortBy(_.getAccessTime).last
-    assert(eventLogDir.isDir)
-    val logFiles = fileSystem.listStatus(eventLogDir.getPath)
-    assert(logFiles != null && logFiles.size > 0)
-    val logFile = logFiles.find(_.getPath.getName.startsWith("EVENT_LOG_"))
-    assert(logFile.isDefined)
-    val logFilePath = logFile.get.getPath
+    val eventLog = applications.sortBy(_.getModificationTime).last
+    assert(eventLog.isFile)
 
     // Replay events
-    val replayer = new ReplayListenerBus(Seq(logFilePath), fileSystem, codec)
+    val (logData, version) = EventLoggingListener.openEventLog(eventLog.getPath(), fileSystem)
     val eventMonster = new EventMonster(conf)
-    replayer.addListener(eventMonster)
-    replayer.replay()
+    try {
+      val replayer = new ReplayListenerBus()
+      replayer.addListener(eventMonster)
+      replayer.replay(logData, version)
+    } finally {
+      logData.close()
+    }
 
     // Verify the same events are replayed in the same order
     assert(sc.eventLogger.isDefined)
@@ -154,7 +146,9 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
    */
   private class EventMonster(conf: SparkConf)
     extends EventLoggingListener("test", "testdir", conf) {
-    logger.close()
+
+    override def start() { }
+
   }
 
   private def getCompressionCodec(codecName: String) = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org