You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2016/02/02 01:55:33 UTC
[9/9] spark git commit: [SPARK-12790][CORE] Remove HistoryServer old multiple files format
[SPARK-12790][CORE] Remove HistoryServer old multiple files format
Removed isLegacyLogDirectory code path and updated tests
andrewor14
Author: felixcheung <fe...@hotmail.com>
Closes #10860 from felixcheung/historyserverformat.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0df3cfb8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0df3cfb8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0df3cfb8
Branch: refs/heads/master
Commit: 0df3cfb8ab4d584c95db6c340694e199d7b59e9e
Parents: 715a19d
Author: felixcheung <fe...@hotmail.com>
Authored: Mon Feb 1 16:55:21 2016 -0800
Committer: Andrew Or <an...@databricks.com>
Committed: Mon Feb 1 16:55:21 2016 -0800
----------------------------------------------------------------------
.rat-excludes | 12 +-
.../deploy/history/FsHistoryProvider.scala | 124 ++-----------------
.../spark/scheduler/EventLoggingListener.scala | 2 -
.../resources/spark-events/local-1422981759269 | 88 +++++++++++++
.../local-1422981759269/APPLICATION_COMPLETE | 0
.../local-1422981759269/EVENT_LOG_1 | 88 -------------
.../local-1422981759269/SPARK_VERSION_1.2.0 | 0
.../resources/spark-events/local-1422981780767 | 82 ++++++++++++
.../local-1422981780767/APPLICATION_COMPLETE | 0
.../local-1422981780767/EVENT_LOG_1 | 82 ------------
.../local-1422981780767/SPARK_VERSION_1.2.0 | 0
.../resources/spark-events/local-1425081759269 | 88 +++++++++++++
.../local-1425081759269/APPLICATION_COMPLETE | 0
.../local-1425081759269/EVENT_LOG_1 | 88 -------------
.../local-1425081759269/SPARK_VERSION_1.2.0 | 0
.../resources/spark-events/local-1426533911241 | 24 ++++
.../local-1426533911241/APPLICATION_COMPLETE | 0
.../local-1426533911241/EVENT_LOG_1 | 24 ----
.../local-1426533911241/SPARK_VERSION_1.2.0 | 0
.../resources/spark-events/local-1426633911242 | 24 ++++
.../local-1426633911242/APPLICATION_COMPLETE | 0
.../local-1426633911242/EVENT_LOG_1 | 24 ----
.../local-1426633911242/SPARK_VERSION_1.2.0 | 0
.../deploy/history/FsHistoryProviderSuite.scala | 95 +-------------
.../deploy/history/HistoryServerSuite.scala | 25 +---
25 files changed, 329 insertions(+), 541 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/0df3cfb8/.rat-excludes
----------------------------------------------------------------------
diff --git a/.rat-excludes b/.rat-excludes
index 874a6ee..8b50614 100644
--- a/.rat-excludes
+++ b/.rat-excludes
@@ -73,12 +73,12 @@ logs
.*dependency-reduced-pom.xml
known_translations
json_expectation
-local-1422981759269/*
-local-1422981780767/*
-local-1425081759269/*
-local-1426533911241/*
-local-1426633911242/*
-local-1430917381534/*
+local-1422981759269
+local-1422981780767
+local-1425081759269
+local-1426533911241
+local-1426633911242
+local-1430917381534
local-1430917381535_1
local-1430917381535_2
DESCRIPTION
http://git-wip-us.apache.org/repos/asf/spark/blob/0df3cfb8/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 22e4155..9648959 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -248,9 +248,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val logInfos: Seq[FileStatus] = statusList
.filter { entry =>
try {
- getModificationTime(entry).map { time =>
- time >= lastScanTime
- }.getOrElse(false)
+ !entry.isDirectory() && (entry.getModificationTime() >= lastScanTime)
} catch {
case e: AccessControlException =>
// Do not use "logInfo" since these messages can get pretty noisy if printed on
@@ -261,9 +259,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
.flatMap { entry => Some(entry) }
.sortWith { case (entry1, entry2) =>
- val mod1 = getModificationTime(entry1).getOrElse(-1L)
- val mod2 = getModificationTime(entry2).getOrElse(-1L)
- mod1 >= mod2
+ entry1.getModificationTime() >= entry2.getModificationTime()
}
logInfos.grouped(20)
@@ -341,19 +337,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
attempt.attemptId.isEmpty || attemptId.isEmpty || attempt.attemptId.get == attemptId.get
}.foreach { attempt =>
val logPath = new Path(logDir, attempt.logPath)
- // If this is a legacy directory, then add the directory to the zipStream and add
- // each file to that directory.
- if (isLegacyLogDirectory(fs.getFileStatus(logPath))) {
- val files = fs.listStatus(logPath)
- zipStream.putNextEntry(new ZipEntry(attempt.logPath + "/"))
- zipStream.closeEntry()
- files.foreach { file =>
- val path = file.getPath
- zipFileToStream(path, attempt.logPath + Path.SEPARATOR + path.getName, zipStream)
- }
- } else {
- zipFileToStream(new Path(logDir, attempt.logPath), attempt.logPath, zipStream)
- }
+ zipFileToStream(new Path(logDir, attempt.logPath), attempt.logPath, zipStream)
}
} finally {
zipStream.close()
@@ -527,12 +511,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
bus: ReplayListenerBus): Option[FsApplicationAttemptInfo] = {
val logPath = eventLog.getPath()
logInfo(s"Replaying log path: $logPath")
- val logInput =
- if (isLegacyLogDirectory(eventLog)) {
- openLegacyEventLog(logPath)
- } else {
- EventLoggingListener.openEventLog(logPath, fs)
- }
+ val logInput = EventLoggingListener.openEventLog(logPath, fs)
try {
val appListener = new ApplicationEventListener
val appCompleted = isApplicationCompleted(eventLog)
@@ -540,9 +519,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
bus.replay(logInput, logPath.toString, !appCompleted)
// Without an app ID, new logs will render incorrectly in the listing page, so do not list or
- // try to show their UI. Some old versions of Spark generate logs without an app ID, so let
- // logs generated by those versions go through.
- if (appListener.appId.isDefined || !sparkVersionHasAppId(eventLog)) {
+ // try to show their UI.
+ if (appListener.appId.isDefined) {
Some(new FsApplicationAttemptInfo(
logPath.getName(),
appListener.appName.getOrElse(NOT_STARTED),
@@ -550,7 +528,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
appListener.appAttemptId,
appListener.startTime.getOrElse(-1L),
appListener.endTime.getOrElse(-1L),
- getModificationTime(eventLog).get,
+ eventLog.getModificationTime(),
appListener.sparkUser.getOrElse(NOT_STARTED),
appCompleted))
} else {
@@ -562,90 +540,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
/**
- * Loads a legacy log directory. This assumes that the log directory contains a single event
- * log file (along with other metadata files), which is the case for directories generated by
- * the code in previous releases.
- *
- * @return input stream that holds one JSON record per line.
- */
- private[history] def openLegacyEventLog(dir: Path): InputStream = {
- val children = fs.listStatus(dir)
- var eventLogPath: Path = null
- var codecName: Option[String] = None
-
- children.foreach { child =>
- child.getPath().getName() match {
- case name if name.startsWith(LOG_PREFIX) =>
- eventLogPath = child.getPath()
- case codec if codec.startsWith(COMPRESSION_CODEC_PREFIX) =>
- codecName = Some(codec.substring(COMPRESSION_CODEC_PREFIX.length()))
- case _ =>
- }
- }
-
- if (eventLogPath == null) {
- throw new IllegalArgumentException(s"$dir is not a Spark application log directory.")
- }
-
- val codec = try {
- codecName.map { c => CompressionCodec.createCodec(conf, c) }
- } catch {
- case e: Exception =>
- throw new IllegalArgumentException(s"Unknown compression codec $codecName.")
- }
-
- val in = new BufferedInputStream(fs.open(eventLogPath))
- codec.map(_.compressedInputStream(in)).getOrElse(in)
- }
-
- /**
- * Return whether the specified event log path contains a old directory-based event log.
- * Previously, the event log of an application comprises of multiple files in a directory.
- * As of Spark 1.3, these files are consolidated into a single one that replaces the directory.
- * See SPARK-2261 for more detail.
- */
- private def isLegacyLogDirectory(entry: FileStatus): Boolean = entry.isDirectory
-
- /**
- * Returns the modification time of the given event log. If the status points at an empty
- * directory, `None` is returned, indicating that there isn't an event log at that location.
- */
- private def getModificationTime(fsEntry: FileStatus): Option[Long] = {
- if (isLegacyLogDirectory(fsEntry)) {
- val statusList = fs.listStatus(fsEntry.getPath)
- if (!statusList.isEmpty) Some(statusList.map(_.getModificationTime()).max) else None
- } else {
- Some(fsEntry.getModificationTime())
- }
- }
-
- /**
* Return true when the application has completed.
*/
private def isApplicationCompleted(entry: FileStatus): Boolean = {
- if (isLegacyLogDirectory(entry)) {
- fs.exists(new Path(entry.getPath(), APPLICATION_COMPLETE))
- } else {
- !entry.getPath().getName().endsWith(EventLoggingListener.IN_PROGRESS)
- }
- }
-
- /**
- * Returns whether the version of Spark that generated logs records app IDs. App IDs were added
- * in Spark 1.1.
- */
- private def sparkVersionHasAppId(entry: FileStatus): Boolean = {
- if (isLegacyLogDirectory(entry)) {
- fs.listStatus(entry.getPath())
- .find { status => status.getPath().getName().startsWith(SPARK_VERSION_PREFIX) }
- .map { status =>
- val version = status.getPath().getName().substring(SPARK_VERSION_PREFIX.length())
- version != "1.0" && version != "1.1"
- }
- .getOrElse(true)
- } else {
- true
- }
+ !entry.getPath().getName().endsWith(EventLoggingListener.IN_PROGRESS)
}
/**
@@ -670,12 +568,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
private[history] object FsHistoryProvider {
val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
-
- // Constants used to parse Spark 1.0.0 log directories.
- val LOG_PREFIX = "EVENT_LOG_"
- val SPARK_VERSION_PREFIX = EventLoggingListener.SPARK_VERSION_KEY + "_"
- val COMPRESSION_CODEC_PREFIX = EventLoggingListener.COMPRESSION_CODEC_KEY + "_"
- val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
}
private class FsApplicationAttemptInfo(
http://git-wip-us.apache.org/repos/asf/spark/blob/0df3cfb8/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 36f2b74..01fee46 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -232,8 +232,6 @@ private[spark] object EventLoggingListener extends Logging {
// Suffix applied to the names of files still being written by applications.
val IN_PROGRESS = ".inprogress"
val DEFAULT_LOG_DIR = "/tmp/spark-events"
- val SPARK_VERSION_KEY = "SPARK_VERSION"
- val COMPRESSION_CODEC_KEY = "COMPRESSION_CODEC"
private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org