Posted to commits@spark.apache.org by an...@apache.org on 2016/02/02 01:55:33 UTC

[9/9] spark git commit: [SPARK-12790][CORE] Remove HistoryServer old multiple files format

[SPARK-12790][CORE] Remove HistoryServer old multiple files format

Removed the isLegacyLogDirectory code path and updated the tests.

cc andrewor14

Author: felixcheung <fe...@hotmail.com>

Closes #10860 from felixcheung/historyserverformat.
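
For context (pieced together from the removed scaladoc and the test fixtures
in this diff; the timestamps are fixture names, not real applications): before
SPARK-2261, every application wrote its event log as a directory of files,
with marker files recording the Spark version, the compression codec, and
whether the application completed. Since Spark 1.3 the log is a single file
whose name alone carries that state (an ".inprogress" suffix while the
application is still running). This commit removes the reader for the old
layout, and the test fixtures collapse accordingly:

    old layout (directory):           new layout (single file):
    local-1422981759269/              local-1422981759269
      EVENT_LOG_1                     (one JSON-encoded event per line)
      SPARK_VERSION_1.2.0
      APPLICATION_COMPLETE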


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0df3cfb8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0df3cfb8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0df3cfb8

Branch: refs/heads/master
Commit: 0df3cfb8ab4d584c95db6c340694e199d7b59e9e
Parents: 715a19d
Author: felixcheung <fe...@hotmail.com>
Authored: Mon Feb 1 16:55:21 2016 -0800
Committer: Andrew Or <an...@databricks.com>
Committed: Mon Feb 1 16:55:21 2016 -0800

----------------------------------------------------------------------
 .rat-excludes                                   |  12 +-
 .../deploy/history/FsHistoryProvider.scala      | 124 ++-----------------
 .../spark/scheduler/EventLoggingListener.scala  |   2 -
 .../resources/spark-events/local-1422981759269  |  88 +++++++++++++
 .../local-1422981759269/APPLICATION_COMPLETE    |   0
 .../local-1422981759269/EVENT_LOG_1             |  88 -------------
 .../local-1422981759269/SPARK_VERSION_1.2.0     |   0
 .../resources/spark-events/local-1422981780767  |  82 ++++++++++++
 .../local-1422981780767/APPLICATION_COMPLETE    |   0
 .../local-1422981780767/EVENT_LOG_1             |  82 ------------
 .../local-1422981780767/SPARK_VERSION_1.2.0     |   0
 .../resources/spark-events/local-1425081759269  |  88 +++++++++++++
 .../local-1425081759269/APPLICATION_COMPLETE    |   0
 .../local-1425081759269/EVENT_LOG_1             |  88 -------------
 .../local-1425081759269/SPARK_VERSION_1.2.0     |   0
 .../resources/spark-events/local-1426533911241  |  24 ++++
 .../local-1426533911241/APPLICATION_COMPLETE    |   0
 .../local-1426533911241/EVENT_LOG_1             |  24 ----
 .../local-1426533911241/SPARK_VERSION_1.2.0     |   0
 .../resources/spark-events/local-1426633911242  |  24 ++++
 .../local-1426633911242/APPLICATION_COMPLETE    |   0
 .../local-1426633911242/EVENT_LOG_1             |  24 ----
 .../local-1426633911242/SPARK_VERSION_1.2.0     |   0
 .../deploy/history/FsHistoryProviderSuite.scala |  95 +-------------
 .../deploy/history/HistoryServerSuite.scala     |  25 +---
 25 files changed, 329 insertions(+), 541 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0df3cfb8/.rat-excludes
----------------------------------------------------------------------
diff --git a/.rat-excludes b/.rat-excludes
index 874a6ee..8b50614 100644
--- a/.rat-excludes
+++ b/.rat-excludes
@@ -73,12 +73,12 @@ logs
 .*dependency-reduced-pom.xml
 known_translations
 json_expectation
-local-1422981759269/*
-local-1422981780767/*
-local-1425081759269/*
-local-1426533911241/*
-local-1426633911242/*
-local-1430917381534/*
+local-1422981759269
+local-1422981780767
+local-1425081759269
+local-1426533911241
+local-1426633911242
+local-1430917381534
 local-1430917381535_1
 local-1430917381535_2
 DESCRIPTION
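
Note: the entries above lose their "/*" glob because each fixture is now a
single event log file rather than a directory of files, as the fixture
renames further down show.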

http://git-wip-us.apache.org/repos/asf/spark/blob/0df3cfb8/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 22e4155..9648959 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -248,9 +248,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       val logInfos: Seq[FileStatus] = statusList
         .filter { entry =>
           try {
-            getModificationTime(entry).map { time =>
-              time >= lastScanTime
-            }.getOrElse(false)
+            !entry.isDirectory() && (entry.getModificationTime() >= lastScanTime)
           } catch {
             case e: AccessControlException =>
               // Do not use "logInfo" since these messages can get pretty noisy if printed on
@@ -261,9 +259,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         }
         .flatMap { entry => Some(entry) }
         .sortWith { case (entry1, entry2) =>
-          val mod1 = getModificationTime(entry1).getOrElse(-1L)
-          val mod2 = getModificationTime(entry2).getOrElse(-1L)
-          mod1 >= mod2
+          entry1.getModificationTime() >= entry2.getModificationTime()
       }
 
       logInfos.grouped(20)
@@ -341,19 +337,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
             attempt.attemptId.isEmpty || attemptId.isEmpty || attempt.attemptId.get == attemptId.get
           }.foreach { attempt =>
             val logPath = new Path(logDir, attempt.logPath)
-            // If this is a legacy directory, then add the directory to the zipStream and add
-            // each file to that directory.
-            if (isLegacyLogDirectory(fs.getFileStatus(logPath))) {
-              val files = fs.listStatus(logPath)
-              zipStream.putNextEntry(new ZipEntry(attempt.logPath + "/"))
-              zipStream.closeEntry()
-              files.foreach { file =>
-                val path = file.getPath
-                zipFileToStream(path, attempt.logPath + Path.SEPARATOR + path.getName, zipStream)
-              }
-            } else {
-              zipFileToStream(new Path(logDir, attempt.logPath), attempt.logPath, zipStream)
-            }
+            zipFileToStream(new Path(logDir, attempt.logPath), attempt.logPath, zipStream)
           }
         } finally {
           zipStream.close()
@@ -527,12 +511,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       bus: ReplayListenerBus): Option[FsApplicationAttemptInfo] = {
     val logPath = eventLog.getPath()
     logInfo(s"Replaying log path: $logPath")
-    val logInput =
-      if (isLegacyLogDirectory(eventLog)) {
-        openLegacyEventLog(logPath)
-      } else {
-        EventLoggingListener.openEventLog(logPath, fs)
-      }
+    val logInput = EventLoggingListener.openEventLog(logPath, fs)
     try {
       val appListener = new ApplicationEventListener
       val appCompleted = isApplicationCompleted(eventLog)
@@ -540,9 +519,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       bus.replay(logInput, logPath.toString, !appCompleted)
 
       // Without an app ID, new logs will render incorrectly in the listing page, so do not list or
-      // try to show their UI. Some old versions of Spark generate logs without an app ID, so let
-      // logs generated by those versions go through.
-      if (appListener.appId.isDefined || !sparkVersionHasAppId(eventLog)) {
+      // try to show their UI.
+      if (appListener.appId.isDefined) {
         Some(new FsApplicationAttemptInfo(
           logPath.getName(),
           appListener.appName.getOrElse(NOT_STARTED),
@@ -550,7 +528,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
           appListener.appAttemptId,
           appListener.startTime.getOrElse(-1L),
           appListener.endTime.getOrElse(-1L),
-          getModificationTime(eventLog).get,
+          eventLog.getModificationTime(),
           appListener.sparkUser.getOrElse(NOT_STARTED),
           appCompleted))
       } else {
@@ -562,90 +540,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   }
 
   /**
-   * Loads a legacy log directory. This assumes that the log directory contains a single event
-   * log file (along with other metadata files), which is the case for directories generated by
-   * the code in previous releases.
-   *
-   * @return input stream that holds one JSON record per line.
-   */
-  private[history] def openLegacyEventLog(dir: Path): InputStream = {
-    val children = fs.listStatus(dir)
-    var eventLogPath: Path = null
-    var codecName: Option[String] = None
-
-    children.foreach { child =>
-      child.getPath().getName() match {
-        case name if name.startsWith(LOG_PREFIX) =>
-          eventLogPath = child.getPath()
-        case codec if codec.startsWith(COMPRESSION_CODEC_PREFIX) =>
-          codecName = Some(codec.substring(COMPRESSION_CODEC_PREFIX.length()))
-        case _ =>
-      }
-    }
-
-    if (eventLogPath == null) {
-      throw new IllegalArgumentException(s"$dir is not a Spark application log directory.")
-    }
-
-    val codec = try {
-        codecName.map { c => CompressionCodec.createCodec(conf, c) }
-      } catch {
-        case e: Exception =>
-          throw new IllegalArgumentException(s"Unknown compression codec $codecName.")
-      }
-
-    val in = new BufferedInputStream(fs.open(eventLogPath))
-    codec.map(_.compressedInputStream(in)).getOrElse(in)
-  }
-
-  /**
-   * Return whether the specified event log path contains an old directory-based event log.
-   * Previously, an application's event log consisted of multiple files in a directory.
-   * As of Spark 1.3, these files are consolidated into a single one that replaces the directory.
-   * See SPARK-2261 for more detail.
-   */
-  private def isLegacyLogDirectory(entry: FileStatus): Boolean = entry.isDirectory
-
-  /**
-   * Returns the modification time of the given event log. If the status points at an empty
-   * directory, `None` is returned, indicating that there isn't an event log at that location.
-   */
-  private def getModificationTime(fsEntry: FileStatus): Option[Long] = {
-    if (isLegacyLogDirectory(fsEntry)) {
-      val statusList = fs.listStatus(fsEntry.getPath)
-      if (!statusList.isEmpty) Some(statusList.map(_.getModificationTime()).max) else None
-    } else {
-      Some(fsEntry.getModificationTime())
-    }
-  }
-
-  /**
    * Return true when the application has completed.
    */
   private def isApplicationCompleted(entry: FileStatus): Boolean = {
-    if (isLegacyLogDirectory(entry)) {
-      fs.exists(new Path(entry.getPath(), APPLICATION_COMPLETE))
-    } else {
-      !entry.getPath().getName().endsWith(EventLoggingListener.IN_PROGRESS)
-    }
-  }
-
-  /**
-   * Returns whether the version of Spark that generated logs records app IDs. App IDs were added
-   * in Spark 1.1.
-   */
-  private def sparkVersionHasAppId(entry: FileStatus): Boolean = {
-    if (isLegacyLogDirectory(entry)) {
-      fs.listStatus(entry.getPath())
-        .find { status => status.getPath().getName().startsWith(SPARK_VERSION_PREFIX) }
-        .map { status =>
-          val version = status.getPath().getName().substring(SPARK_VERSION_PREFIX.length())
-          version != "1.0" && version != "1.1"
-        }
-        .getOrElse(true)
-    } else {
-      true
-    }
+    !entry.getPath().getName().endsWith(EventLoggingListener.IN_PROGRESS)
   }
 
   /**
@@ -670,12 +568,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 
 private[history] object FsHistoryProvider {
   val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
-
-  // Constants used to parse Spark 1.0.0 log directories.
-  val LOG_PREFIX = "EVENT_LOG_"
-  val SPARK_VERSION_PREFIX = EventLoggingListener.SPARK_VERSION_KEY + "_"
-  val COMPRESSION_CODEC_PREFIX = EventLoggingListener.COMPRESSION_CODEC_KEY + "_"
-  val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
 }
 
 private class FsApplicationAttemptInfo(
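
With the legacy branch gone, replaying a log reduces to the single-file path.
A minimal sketch of that path in Scala (not the committed code; it assumes a
Hadoop FileSystem fs and a Path logPath are in scope, and since these classes
are private[spark] it only compiles inside Spark's own sources):

    import org.apache.spark.scheduler.{ApplicationEventListener, EventLoggingListener, ReplayListenerBus}

    // Open the single event log file, transparently decompressing it if it
    // was written with a compression codec.
    val logInput = EventLoggingListener.openEventLog(logPath, fs)
    try {
      // Replay every JSON-encoded event through a bus; the listener collects
      // the app ID, name, attempt ID, user, and start/end times.
      val bus = new ReplayListenerBus()
      val appListener = new ApplicationEventListener
      bus.addListener(appListener)
      // Pass maybeTruncated = true for logs still marked ".inprogress".
      bus.replay(logInput, logPath.toString, maybeTruncated = false)
    } finally {
      logInput.close()
    }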

http://git-wip-us.apache.org/repos/asf/spark/blob/0df3cfb8/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 36f2b74..01fee46 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -232,8 +232,6 @@ private[spark] object EventLoggingListener extends Logging {
   // Suffix applied to the names of files still being written by applications.
   val IN_PROGRESS = ".inprogress"
   val DEFAULT_LOG_DIR = "/tmp/spark-events"
-  val SPARK_VERSION_KEY = "SPARK_VERSION"
-  val COMPRESSION_CODEC_KEY = "COMPRESSION_CODEC"
 
   private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort)
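
The two removed keys only ever named the per-directory marker files of the
old format (SPARK_VERSION_<version> and COMPRESSION_CODEC_<codec>, built from
the SPARK_VERSION_PREFIX and COMPRESSION_CODEC_PREFIX constants deleted from
FsHistoryProvider above), so they have no remaining users.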
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org