You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/07/31 06:57:37 UTC

git commit: [SPARK-2340] Resolve event logging and History Server paths properly

Repository: spark
Updated Branches:
  refs/heads/master 118c1c422 -> a7c305b86


[SPARK-2340] Resolve event logging and History Server paths properly

We resolve relative paths to the local `file:/` filesystem for `--jars` and `--files` in spark-submit (#853). We should do the same for the History Server and for event logging.

Author: Andrew Or <an...@gmail.com>

Closes #1280 from andrewor14/hist-serv-fix and squashes the following commits:

13ff406 [Andrew Or] Merge branch 'master' of github.com:apache/spark into hist-serv-fix
b393e17 [Andrew Or] Strip trailing "/" from logging directory
622a471 [Andrew Or] Fix test in EventLoggingListenerSuite
0e20f71 [Andrew Or] Shift responsibility of resolving paths up one level
b037c0c [Andrew Or] Use resolved paths for everything in history server
c7e36ee [Andrew Or] Resolve paths for event logging too
40e3933 [Andrew Or] Resolve history server file paths


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a7c305b8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a7c305b8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a7c305b8

Branch: refs/heads/master
Commit: a7c305b86b3b83645ae5ff5d3dfeafc20c443204
Parents: 118c1c4
Author: Andrew Or <an...@gmail.com>
Authored: Wed Jul 30 21:57:32 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Wed Jul 30 21:57:32 2014 -0700

----------------------------------------------------------------------
 .../deploy/history/FsHistoryProvider.scala      | 34 +++++++++++---------
 .../spark/deploy/history/HistoryPage.scala      |  2 +-
 .../spark/deploy/history/HistoryServer.scala    |  6 ++--
 .../deploy/history/HistoryServerArguments.scala |  5 +--
 .../spark/scheduler/EventLoggingListener.scala  |  6 ++--
 .../org/apache/spark/util/FileLogger.scala      |  2 +-
 .../scheduler/EventLoggingListenerSuite.scala   |  2 +-
 7 files changed, 28 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a7c305b8/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 01e7065..6d2d4ce 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -36,11 +36,11 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
     conf.getInt("spark.history.updateInterval", 10)) * 1000
 
   private val logDir = conf.get("spark.history.fs.logDirectory", null)
-  if (logDir == null) {
-    throw new IllegalArgumentException("Logging directory must be specified.")
-  }
+  private val resolvedLogDir = Option(logDir)
+    .map { d => Utils.resolveURI(d) }
+    .getOrElse { throw new IllegalArgumentException("Logging directory must be specified.") }
 
-  private val fs = Utils.getHadoopFileSystem(logDir)
+  private val fs = Utils.getHadoopFileSystem(resolvedLogDir)
 
   // A timestamp of when the disk was last accessed to check for log updates
   private var lastLogCheckTimeMs = -1L
@@ -76,14 +76,14 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
 
   private def initialize() {
     // Validate the log directory.
-    val path = new Path(logDir)
+    val path = new Path(resolvedLogDir)
     if (!fs.exists(path)) {
       throw new IllegalArgumentException(
-        "Logging directory specified does not exist: %s".format(logDir))
+        "Logging directory specified does not exist: %s".format(resolvedLogDir))
     }
     if (!fs.getFileStatus(path).isDir) {
       throw new IllegalArgumentException(
-        "Logging directory specified is not a directory: %s".format(logDir))
+        "Logging directory specified is not a directory: %s".format(resolvedLogDir))
     }
 
     checkForLogs()
@@ -95,15 +95,16 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
 
   override def getAppUI(appId: String): SparkUI = {
     try {
-      val appLogDir = fs.getFileStatus(new Path(logDir, appId))
-      loadAppInfo(appLogDir, true)._2
+      val appLogDir = fs.getFileStatus(new Path(resolvedLogDir.toString, appId))
+      val (_, ui) = loadAppInfo(appLogDir, renderUI = true)
+      ui
     } catch {
       case e: FileNotFoundException => null
     }
   }
 
   override def getConfig(): Map[String, String] =
-    Map(("Event Log Location" -> logDir))
+    Map("Event Log Location" -> resolvedLogDir.toString)
 
   /**
    * Builds the application list based on the current contents of the log directory.
@@ -114,14 +115,14 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
     lastLogCheckTimeMs = getMonotonicTimeMs()
     logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTimeMs))
     try {
-      val logStatus = fs.listStatus(new Path(logDir))
+      val logStatus = fs.listStatus(new Path(resolvedLogDir))
       val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
-      val logInfos = logDirs.filter {
-        dir => fs.isFile(new Path(dir.getPath(), EventLoggingListener.APPLICATION_COMPLETE))
+      val logInfos = logDirs.filter { dir =>
+        fs.isFile(new Path(dir.getPath, EventLoggingListener.APPLICATION_COMPLETE))
       }
 
       val currentApps = Map[String, ApplicationHistoryInfo](
-        appList.map(app => (app.id -> app)):_*)
+        appList.map(app => app.id -> app):_*)
 
       // For any application that either (i) is not listed or (ii) has changed since the last time
       // the listing was created (defined by the log dir's modification time), load the app's info.
@@ -131,7 +132,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
         val curr = currentApps.getOrElse(dir.getPath().getName(), null)
         if (curr == null || curr.lastUpdated < getModificationTime(dir)) {
           try {
-            newApps += loadAppInfo(dir, false)._1
+            val (app, _) = loadAppInfo(dir, renderUI = false)
+            newApps += app
           } catch {
             case e: Exception => logError(s"Failed to load app info from directory $dir.")
           }
@@ -159,9 +161,9 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
    * @return A 2-tuple `(app info, ui)`. `ui` will be null if `renderUI` is false.
    */
   private def loadAppInfo(logDir: FileStatus, renderUI: Boolean) = {
-    val elogInfo = EventLoggingListener.parseLoggingInfo(logDir.getPath(), fs)
     val path = logDir.getPath
     val appId = path.getName
+    val elogInfo = EventLoggingListener.parseLoggingInfo(path, fs)
     val replayBus = new ReplayListenerBus(elogInfo.logPaths, fs, elogInfo.compressionCodec)
     val appListener = new ApplicationEventListener
     replayBus.addListener(appListener)

http://git-wip-us.apache.org/repos/asf/spark/blob/a7c305b8/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
index d7a3e3f..c4ef8b6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -45,7 +45,7 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
       <div class="row-fluid">
         <div class="span12">
           <ul class="unstyled">
-            { providerConfig.map(e => <li><strong>{e._1}:</strong> {e._2}</li>) }
+            {providerConfig.map { case (k, v) => <li><strong>{k}:</strong> {v}</li> }}
           </ul>
           {
             if (allApps.size > 0) {

http://git-wip-us.apache.org/repos/asf/spark/blob/a7c305b8/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index cacb9da..d1a64c1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -25,9 +25,9 @@ import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.ui.{WebUI, SparkUI, UIUtils}
+import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
 import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.util.{SignalLogger, Utils}
+import org.apache.spark.util.SignalLogger
 
 /**
  * A web server that renders SparkUIs of completed applications.
@@ -177,7 +177,7 @@ object HistoryServer extends Logging {
   def main(argStrings: Array[String]) {
     SignalLogger.register(log)
     initSecurity()
-    val args = new HistoryServerArguments(conf, argStrings)
+    new HistoryServerArguments(conf, argStrings)
     val securityManager = new SecurityManager(conf)
 
     val providerName = conf.getOption("spark.history.provider")

http://git-wip-us.apache.org/repos/asf/spark/blob/a7c305b8/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
index be9361b..25fc76c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.deploy.history
 
 import org.apache.spark.SparkConf
-import org.apache.spark.util.Utils
 
 /**
  * Command-line parser for the master.
@@ -32,6 +31,7 @@ private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]
     args match {
       case ("--dir" | "-d") :: value :: tail =>
         logDir = value
+        conf.set("spark.history.fs.logDirectory", value)
         parse(tail)
 
       case ("--help" | "-h") :: tail =>
@@ -42,9 +42,6 @@ private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]
       case _ =>
         printUsageAndExit(1)
     }
-    if (logDir != null) {
-      conf.set("spark.history.fs.logDirectory", logDir)
-    }
   }
 
   private def printUsageAndExit(exitCode: Int) {

http://git-wip-us.apache.org/repos/asf/spark/blob/a7c305b8/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index ae6ca9f..406147f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -29,7 +29,7 @@ import org.json4s.jackson.JsonMethods._
 import org.apache.spark.{Logging, SparkConf, SparkContext}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io.CompressionCodec
-import org.apache.spark.util.{FileLogger, JsonProtocol}
+import org.apache.spark.util.{FileLogger, JsonProtocol, Utils}
 
 /**
  * A SparkListener that logs events to persistent storage.
@@ -55,7 +55,7 @@ private[spark] class EventLoggingListener(
   private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
   private val logBaseDir = sparkConf.get("spark.eventLog.dir", DEFAULT_LOG_DIR).stripSuffix("/")
   private val name = appName.replaceAll("[ :/]", "-").toLowerCase + "-" + System.currentTimeMillis
-  val logDir = logBaseDir + "/" + name
+  val logDir = Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/")
 
   protected val logger = new FileLogger(logDir, sparkConf, hadoopConf, outputBufferSize,
     shouldCompress, shouldOverwrite, Some(LOG_FILE_PERMISSIONS))
@@ -215,7 +215,7 @@ private[spark] object EventLoggingListener extends Logging {
     } catch {
       case e: Exception =>
         logError("Exception in parsing logging info from directory %s".format(logDir), e)
-      EventLoggingInfo.empty
+        EventLoggingInfo.empty
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a7c305b8/core/src/main/scala/org/apache/spark/util/FileLogger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/FileLogger.scala b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
index 9dcdafd..2e8fbf5 100644
--- a/core/src/main/scala/org/apache/spark/util/FileLogger.scala
+++ b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
@@ -52,7 +52,7 @@ private[spark] class FileLogger(
     override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
   }
 
-  private val fileSystem = Utils.getHadoopFileSystem(new URI(logDir))
+  private val fileSystem = Utils.getHadoopFileSystem(logDir)
   var fileIndex = 0
 
   // Only used if compression is enabled

http://git-wip-us.apache.org/repos/asf/spark/blob/a7c305b8/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 21e3db3..10d8b29 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -259,7 +259,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
     assert(sc.eventLogger.isDefined)
     val eventLogger = sc.eventLogger.get
     val expectedLogDir = logDirPath.toString
-    assert(eventLogger.logDir.startsWith(expectedLogDir))
+    assert(eventLogger.logDir.contains(expectedLogDir))
 
     // Begin listening for events that trigger asserts
     val eventExistenceListener = new EventExistenceListener(eventLogger)