Posted to commits@spark.apache.org by jo...@apache.org on 2015/02/06 20:13:11 UTC

spark git commit: SPARK-2450 Adds executor log links to Web UI

Repository: spark
Updated Branches:
  refs/heads/master 4cdb26c17 -> 32e964c41


SPARK-2450 Adds executor log links to Web UI

Adds links to stderr/stdout in the executor tab of the web UI for:
1) Standalone
2) Yarn client
3) Yarn cluster

This adds log url support in a general way, to make it easy to extend to the remaining
cluster managers. The log urls are passed to the executor through environment variables
using the SPARK_LOG_URL_ prefix, so additional logs besides stderr/stdout can also be added.

To propagate this information to the UI, we use the onExecutorAdded Spark listener event.
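
As an illustrative sketch (not part of this commit), any SparkListener can observe the
propagated urls through that same event; this is also the pattern the test suites below use:

    // Minimal sketch, assuming only what the diff below shows:
    // SparkListenerExecutorAdded carries an executorId and an ExecutorInfo
    // whose logUrlMap maps log names ("stderr", "stdout", ...) to urls.
    import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded}

    class LogUrlPrinter extends SparkListener {
      override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded) {
        executorAdded.executorInfo.logUrlMap.foreach { case (name, url) =>
          println(s"executor ${executorAdded.executorId}: $name -> $url")
        }
      }
    }

Registering it with sc.addSparkListener(new LogUrlPrinter) would print each executor's
log links as executors register.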

Although this commit doesn't add log urls when running on a Mesos cluster, it should be possible to add them using the same mechanism.

Author: Kostas Sakellis <ko...@cloudera.com>
Author: Josh Rosen <jo...@databricks.com>

Closes #3486 from ksakellis/kostas-spark-2450 and squashes the following commits:

d190936 [Josh Rosen] Fix a few minor style / formatting nits. Reset listener after each test. Don't null listener out at end of main().
8673fe1 [Kostas Sakellis] CR feedback. Hide the log column if there are no logs available
5bf6952 [Kostas Sakellis] [SPARK-2450] [CORE] Adds executor log links to Web UI


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/32e964c4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/32e964c4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/32e964c4

Branch: refs/heads/master
Commit: 32e964c410e7083b43264c46291e93cd206a8038
Parents: 4cdb26c
Author: Kostas Sakellis <ko...@cloudera.com>
Authored: Fri Feb 6 11:13:00 2015 -0800
Committer: Josh Rosen <jo...@databricks.com>
Committed: Fri Feb 6 11:13:00 2015 -0800

----------------------------------------------------------------------
 .../spark/deploy/worker/ExecutorRunner.scala    |  7 +++
 .../org/apache/spark/deploy/worker/Worker.scala |  1 +
 .../executor/CoarseGrainedExecutorBackend.scala |  8 ++-
 .../cluster/CoarseGrainedClusterMessage.scala   |  6 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala |  4 +-
 .../spark/scheduler/cluster/ExecutorData.scala  |  5 +-
 .../spark/scheduler/cluster/ExecutorInfo.scala  |  9 +--
 .../cluster/mesos/MesosSchedulerBackend.scala   |  3 +-
 .../apache/spark/ui/exec/ExecutorsPage.scala    | 31 ++++++++--
 .../org/apache/spark/ui/exec/ExecutorsTab.scala |  6 ++
 .../org/apache/spark/util/JsonProtocol.scala    |  6 +-
 .../apache/spark/deploy/JsonProtocolSuite.scala |  2 +-
 .../spark/deploy/LogUrlsStandaloneSuite.scala   | 59 ++++++++++++++++++++
 .../deploy/worker/ExecutorRunnerTest.scala      |  2 +-
 .../mesos/MesosSchedulerBackendSuite.scala      |  4 +-
 .../apache/spark/util/JsonProtocolSuite.scala   | 12 +++-
 .../spark/deploy/yarn/ExecutorRunnable.scala    | 12 +++-
 .../spark/deploy/yarn/YarnClusterSuite.scala    | 31 +++++++++-
 18 files changed, 178 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/32e964c4/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index bc9f78b..0add306 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -43,6 +43,7 @@ private[spark] class ExecutorRunner(
     val worker: ActorRef,
     val workerId: String,
     val host: String,
+    val webUiPort: Int,
     val sparkHome: File,
     val executorDir: File,
     val workerUrl: String,
@@ -134,6 +135,12 @@ private[spark] class ExecutorRunner(
       // In case we are running this from within the Spark Shell, avoid creating a "scala"
       // parent process for the executor command
       builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")
+
+      // Add webUI log urls
+      val baseUrl = s"http://$host:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
+      builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
+      builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")
+
       process = builder.start()
       val header = "Spark Executor Command: %s\n%s\n\n".format(
         command.mkString("\"", "\" \"", "\""), "=" * 40)
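
For a concrete (made-up) example of the urls built here: with host 192.168.1.10, webUiPort
8081, appId app-20150206-0001, and execId 0, the executor's environment would contain

    SPARK_LOG_URL_STDERR=http://192.168.1.10:8081/logPage/?appId=app-20150206-0001&executorId=0&logType=stderr
    SPARK_LOG_URL_STDOUT=http://192.168.1.10:8081/logPage/?appId=app-20150206-0001&executorId=0&logType=stdout

i.e. links into the standalone worker's own log page.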

http://git-wip-us.apache.org/repos/asf/spark/blob/32e964c4/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index b20f5c0..10929eb 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -362,6 +362,7 @@ private[spark] class Worker(
             self,
             workerId,
             host,
+            webUiPort,
             sparkHome,
             executorDir,
             akkaUrl,

http://git-wip-us.apache.org/repos/asf/spark/blob/32e964c4/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index bc72c89..3a42f8b 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -49,10 +49,16 @@ private[spark] class CoarseGrainedExecutorBackend(
   override def preStart() {
     logInfo("Connecting to driver: " + driverUrl)
     driver = context.actorSelection(driverUrl)
-    driver ! RegisterExecutor(executorId, hostPort, cores)
+    driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls)
     context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
   }
 
+  def extractLogUrls: Map[String, String] = {
+    val prefix = "SPARK_LOG_URL_"
+    sys.env.filterKeys(_.startsWith(prefix))
+      .map(e => (e._1.substring(prefix.length).toLowerCase, e._2))
+  }
+
   override def receiveWithLogging = {
     case RegisteredExecutor =>
       logInfo("Successfully registered with driver")

http://git-wip-us.apache.org/repos/asf/spark/blob/32e964c4/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 1da6fe9..9bf74f4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -39,7 +39,11 @@ private[spark] object CoarseGrainedClusterMessages {
   case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage
 
   // Executors to driver
-  case class RegisterExecutor(executorId: String, hostPort: String, cores: Int)
+  case class RegisterExecutor(
+      executorId: String,
+      hostPort: String,
+      cores: Int,
+      logUrls: Map[String, String])
     extends CoarseGrainedClusterMessage {
     Utils.checkHostPort(hostPort, "Expected host port")
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/32e964c4/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 103a5c0..9d2fb4f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -86,7 +86,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
     }
 
     def receiveWithLogging = {
-      case RegisterExecutor(executorId, hostPort, cores) =>
+      case RegisterExecutor(executorId, hostPort, cores, logUrls) =>
         Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
         if (executorDataMap.contains(executorId)) {
           sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
@@ -98,7 +98,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
           totalCoreCount.addAndGet(cores)
           totalRegisteredExecutors.addAndGet(1)
           val (host, _) = Utils.parseHostPort(hostPort)
-          val data = new ExecutorData(sender, sender.path.address, host, cores, cores)
+          val data = new ExecutorData(sender, sender.path.address, host, cores, cores, logUrls)
           // This must be synchronized because variables mutated
           // in this block are read when requesting executors
           CoarseGrainedSchedulerBackend.this.synchronized {

http://git-wip-us.apache.org/repos/asf/spark/blob/32e964c4/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
index eb52ddf..5e571ef 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
@@ -33,5 +33,6 @@ private[cluster] class ExecutorData(
    val executorAddress: Address,
    override val executorHost: String,
    var freeCores: Int,
-   override val totalCores: Int
-) extends ExecutorInfo(executorHost, totalCores)
+   override val totalCores: Int,
+   override val logUrlMap: Map[String, String]
+) extends ExecutorInfo(executorHost, totalCores, logUrlMap)

http://git-wip-us.apache.org/repos/asf/spark/blob/32e964c4/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala
index b4738e6..7f21856 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala
@@ -25,8 +25,8 @@ import org.apache.spark.annotation.DeveloperApi
 @DeveloperApi
 class ExecutorInfo(
    val executorHost: String,
-   val totalCores: Int
-) {
+   val totalCores: Int,
+   val logUrlMap: Map[String, String]) {
 
   def canEqual(other: Any): Boolean = other.isInstanceOf[ExecutorInfo]
 
@@ -34,12 +34,13 @@ class ExecutorInfo(
     case that: ExecutorInfo =>
       (that canEqual this) &&
         executorHost == that.executorHost &&
-        totalCores == that.totalCores
+        totalCores == that.totalCores &&
+        logUrlMap == that.logUrlMap
     case _ => false
   }
 
   override def hashCode(): Int = {
-    val state = Seq(executorHost, totalCores)
+    val state = Seq(executorHost, totalCores, logUrlMap)
     state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
   }
 }
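
A quick illustration, with hypothetical values, of the updated equality semantics: the log
url map now participates in both equals and hashCode.

    val a = new ExecutorInfo("host1", 4, Map("stderr" -> "urlA"))
    val b = new ExecutorInfo("host1", 4, Map("stderr" -> "urlA"))
    val c = new ExecutorInfo("host1", 4, Map.empty)
    assert(a == b && a.hashCode == b.hashCode)  // all three fields match
    assert(a != c)                              // differing logUrlMap breaks equality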

http://git-wip-us.apache.org/repos/asf/spark/blob/32e964c4/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index c3c546b..cfb6592 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -270,7 +270,8 @@ private[spark] class MesosSchedulerBackend(
       mesosTasks.foreach { case (slaveId, tasks) =>
         slaveIdToWorkerOffer.get(slaveId).foreach(o =>
           listenerBus.post(SparkListenerExecutorAdded(System.currentTimeMillis(), slaveId,
-            new ExecutorInfo(o.host, o.cores)))
+            // TODO: Add support for log urls for Mesos
+            new ExecutorInfo(o.host, o.cores, Map.empty)))
         )
         d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/32e964c4/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index 363cb96..956608d 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -26,7 +26,8 @@ import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage}
 import org.apache.spark.util.Utils
 
 /** Summary information about an executor to display in the UI. */
-private case class ExecutorSummaryInfo(
+// Needs to be private[ui] because of a false positive MiMa failure.
+private[ui] case class ExecutorSummaryInfo(
     id: String,
     hostPort: String,
     rddBlocks: Int,
@@ -40,7 +41,8 @@ private case class ExecutorSummaryInfo(
     totalInputBytes: Long,
     totalShuffleRead: Long,
     totalShuffleWrite: Long,
-    maxMemory: Long)
+    maxMemory: Long,
+    executorLogs: Map[String, String])
 
 private[ui] class ExecutorsPage(
     parent: ExecutorsTab,
@@ -55,6 +57,7 @@ private[ui] class ExecutorsPage(
     val diskUsed = storageStatusList.map(_.diskUsed).sum
     val execInfo = for (statusId <- 0 until storageStatusList.size) yield getExecInfo(statusId)
     val execInfoSorted = execInfo.sortBy(_.id)
+    val logsExist = execInfo.filter(_.executorLogs.nonEmpty).nonEmpty
 
     val execTable =
       <table class={UIUtils.TABLE_CLASS_STRIPED}>
@@ -79,10 +82,11 @@ private[ui] class ExecutorsPage(
               Shuffle Write
             </span>
           </th>
+          {if (logsExist) <th class="sorttable_nosort">Logs</th> else Seq.empty}
           {if (threadDumpEnabled) <th class="sorttable_nosort">Thread Dump</th> else Seq.empty}
         </thead>
         <tbody>
-          {execInfoSorted.map(execRow)}
+          {execInfoSorted.map(execRow(_, logsExist))}
         </tbody>
       </table>
 
@@ -107,7 +111,7 @@ private[ui] class ExecutorsPage(
   }
 
   /** Render an HTML row representing an executor */
-  private def execRow(info: ExecutorSummaryInfo): Seq[Node] = {
+  private def execRow(info: ExecutorSummaryInfo, logsExist: Boolean): Seq[Node] = {
     val maximumMemory = info.maxMemory
     val memoryUsed = info.memoryUsed
     val diskUsed = info.diskUsed
@@ -139,6 +143,21 @@ private[ui] class ExecutorsPage(
         {Utils.bytesToString(info.totalShuffleWrite)}
       </td>
       {
+        if (logsExist) {
+          <td>
+            {
+              info.executorLogs.map { case (logName, logUrl) =>
+                <div>
+                  <a href={logUrl}>
+                    {logName}
+                  </a>
+                </div>
+              }
+            }
+          </td>
+        }
+      }
+      {
         if (threadDumpEnabled) {
           val encodedId = URLEncoder.encode(info.id, "UTF-8")
           <td>
@@ -168,6 +187,7 @@ private[ui] class ExecutorsPage(
     val totalInputBytes = listener.executorToInputBytes.getOrElse(execId, 0L)
     val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0L)
     val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0L)
+    val executorLogs = listener.executorToLogUrls.getOrElse(execId, Map.empty)
 
     new ExecutorSummaryInfo(
       execId,
@@ -183,7 +203,8 @@ private[ui] class ExecutorsPage(
       totalInputBytes,
       totalShuffleRead,
       totalShuffleWrite,
-      maxMem
+      maxMem,
+      executorLogs
     )
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/32e964c4/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index dd1c2b7..a38cb75 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -51,9 +51,15 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp
   val executorToOutputBytes = HashMap[String, Long]()
   val executorToShuffleRead = HashMap[String, Long]()
   val executorToShuffleWrite = HashMap[String, Long]()
+  val executorToLogUrls = HashMap[String, Map[String, String]]()
 
   def storageStatusList = storageStatusListener.storageStatusList
 
+  override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded) = synchronized {
+    val eid = executorAdded.executorId
+    executorToLogUrls(eid) = executorAdded.executorInfo.logUrlMap
+  }
+
   override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
     val eid = taskStart.taskInfo.executorId
     executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1

http://git-wip-us.apache.org/repos/asf/spark/blob/32e964c4/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 8e0e41a..c8407bb 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -383,7 +383,8 @@ private[spark] object JsonProtocol {
 
   def executorInfoToJson(executorInfo: ExecutorInfo): JValue = {
     ("Host" -> executorInfo.executorHost) ~
-    ("Total Cores" -> executorInfo.totalCores)
+    ("Total Cores" -> executorInfo.totalCores) ~
+    ("Log Urls" -> mapToJson(executorInfo.logUrlMap))
   }
 
   /** ------------------------------ *
@@ -792,7 +793,8 @@ private[spark] object JsonProtocol {
   def executorInfoFromJson(json: JValue): ExecutorInfo = {
     val executorHost = (json \ "Host").extract[String]
     val totalCores = (json \ "Total Cores").extract[Int]
-    new ExecutorInfo(executorHost, totalCores)
+    val logUrls = mapFromJson(json \ "Log Urls").toMap
+    new ExecutorInfo(executorHost, totalCores, logUrls)
   }
 
   /** -------------------------------- *
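
For reference, executorInfoToJson for an executor with two log urls produces JSON of this
shape (illustrative values; the exact layout is asserted in JsonProtocolSuite below):

    {
      "Host": "host1",
      "Total Cores": 4,
      "Log Urls": {
        "stderr": "http://worker:8081/logPage/?logType=stderr",
        "stdout": "http://worker:8081/logPage/?logType=stdout"
      }
    }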

http://git-wip-us.apache.org/repos/asf/spark/blob/32e964c4/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index aa65f7e..ed02ca8 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -117,7 +117,7 @@ class JsonProtocolSuite extends FunSuite {
   }
 
   def createExecutorRunner(): ExecutorRunner = {
-    new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host",
+    new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host", 123,
       new File("sparkHome"), new File("workDir"), "akka://worker",
       new SparkConf, Seq("localDir"), ExecutorState.RUNNING)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/32e964c4/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
new file mode 100644
index 0000000..f33bdc7
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import scala.collection.mutable
+
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+import org.apache.spark.scheduler.cluster.ExecutorInfo
+import org.apache.spark.scheduler.{SparkListenerExecutorAdded, SparkListener}
+import org.apache.spark.{SparkContext, LocalSparkContext}
+
+class LogUrlsStandaloneSuite extends FunSuite with LocalSparkContext with BeforeAndAfter {
+
+  /** Length of time to wait while draining listener events. */
+  val WAIT_TIMEOUT_MILLIS = 10000
+
+  before {
+    sc = new SparkContext("local-cluster[2,1,512]", "test")
+  }
+
+  test("verify log urls get propagated from workers") {
+    val listener = new SaveExecutorInfo
+    sc.addSparkListener(listener)
+
+    val rdd1 = sc.parallelize(1 to 100, 4)
+    val rdd2 = rdd1.map(_.toString)
+    rdd2.setName("Target RDD")
+    rdd2.count()
+
+    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    listener.addedExecutorInfos.values.foreach { info =>
+      assert(info.logUrlMap.nonEmpty)
+    }
+  }
+
+  private class SaveExecutorInfo extends SparkListener {
+    val addedExecutorInfos = mutable.Map[String, ExecutorInfo]()
+
+    override def onExecutorAdded(executor: SparkListenerExecutorAdded) {
+      addedExecutorInfos(executor.executorId) = executor.executorInfo
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/32e964c4/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
index 6f233d7..7651169 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
@@ -32,7 +32,7 @@ class ExecutorRunnerTest extends FunSuite {
     val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
     val appDesc = new ApplicationDescription("app name", Some(8), 500,
       Command("foo", Seq(appId), Map(), Seq(), Seq(), Seq()), "appUiUrl")
-    val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321",
+    val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", 123,
       new File(sparkHome), new File("ooga"), "blah", new SparkConf, Seq("localDir"),
       ExecutorState.RUNNING)
     val builder = CommandUtils.buildProcessBuilder(appDesc.command, 512, sparkHome, er.substituteVariables)

http://git-wip-us.apache.org/repos/asf/spark/blob/32e964c4/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
index f2ff98e..46ab02b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
@@ -43,7 +43,7 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
     conf.set("spark.mesos.executor.home" , "/mesos-home")
 
     val listenerBus = EasyMock.createMock(classOf[LiveListenerBus])
-    listenerBus.post(SparkListenerExecutorAdded(EasyMock.anyLong, "s1", new ExecutorInfo("host1", 2)))
+    listenerBus.post(SparkListenerExecutorAdded(EasyMock.anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
     EasyMock.replay(listenerBus)
 
     val sc = EasyMock.createMock(classOf[SparkContext])
@@ -88,7 +88,7 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
     val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl])
 
     val listenerBus = EasyMock.createMock(classOf[LiveListenerBus])
-    listenerBus.post(SparkListenerExecutorAdded(EasyMock.anyLong, "s1", new ExecutorInfo("host1", 2)))
+    listenerBus.post(SparkListenerExecutorAdded(EasyMock.anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
     EasyMock.replay(listenerBus)
 
     val sc = EasyMock.createMock(classOf[SparkContext])

http://git-wip-us.apache.org/repos/asf/spark/blob/32e964c4/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 6577eba..842f545 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -76,8 +76,9 @@ class JsonProtocolSuite extends FunSuite {
     val unpersistRdd = SparkListenerUnpersistRDD(12345)
     val applicationStart = SparkListenerApplicationStart("The winner of all", None, 42L, "Garfield")
     val applicationEnd = SparkListenerApplicationEnd(42L)
+    val logUrlMap = Map("stderr" -> "mystderr", "stdout" -> "mystdout").toMap
     val executorAdded = SparkListenerExecutorAdded(executorAddedTime, "exec1",
-      new ExecutorInfo("Hostee.awesome.com", 11))
+      new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap))
     val executorRemoved = SparkListenerExecutorRemoved(executorRemovedTime, "exec2", "test reason")
 
     testEvent(stageSubmitted, stageSubmittedJsonString)
@@ -100,13 +101,14 @@ class JsonProtocolSuite extends FunSuite {
   }
 
   test("Dependent Classes") {
+    val logUrlMap = Map("stderr" -> "mystderr", "stdout" -> "mystdout").toMap
     testRDDInfo(makeRddInfo(2, 3, 4, 5L, 6L))
     testStageInfo(makeStageInfo(10, 20, 30, 40L, 50L))
     testTaskInfo(makeTaskInfo(999L, 888, 55, 777L, false))
     testTaskMetrics(makeTaskMetrics(
       33333L, 44444L, 55555L, 66666L, 7, 8, hasHadoopInput = false, hasOutput = false))
     testBlockManagerId(BlockManagerId("Hong", "Kong", 500))
-    testExecutorInfo(new ExecutorInfo("host", 43))
+    testExecutorInfo(new ExecutorInfo("host", 43, logUrlMap))
 
     // StorageLevel
     testStorageLevel(StorageLevel.NONE)
@@ -1463,7 +1465,11 @@ class JsonProtocolSuite extends FunSuite {
       |  "Executor ID": "exec1",
       |  "Executor Info": {
       |    "Host": "Hostee.awesome.com",
-      |    "Total Cores": 11
+      |    "Total Cores": 11,
+      |    "Log Urls" : {
+      |      "stderr" : "mystderr",
+      |      "stdout" : "mystdout"
+      |    }
       |  }
       |}
     """

http://git-wip-us.apache.org/repos/asf/spark/blob/32e964c4/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index ee2002a..408cf09 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -56,7 +56,7 @@ class ExecutorRunnable(
   var rpc: YarnRPC = YarnRPC.create(conf)
   var nmClient: NMClient = _
   val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
-  lazy val env = prepareEnvironment
+  lazy val env = prepareEnvironment(container)
   
   def run = {
     logInfo("Starting Executor Container")
@@ -254,7 +254,7 @@ class ExecutorRunnable(
     localResources
   }
 
-  private def prepareEnvironment: HashMap[String, String] = {
+  private def prepareEnvironment(container: Container): HashMap[String, String] = {
     val env = new HashMap[String, String]()
     val extraCp = sparkConf.getOption("spark.executor.extraClassPath")
     Client.populateClasspath(null, yarnConf, sparkConf, env, extraCp)
@@ -270,6 +270,14 @@ class ExecutorRunnable(
       YarnSparkHadoopUtil.setEnvFromInputString(env, userEnvs)
     }
 
+    // Add log urls
+    sys.env.get("SPARK_USER").foreach { user =>
+      val baseUrl = "http://%s/node/containerlogs/%s/%s"
+        .format(container.getNodeHttpAddress, ConverterUtils.toString(container.getId), user)
+      env("SPARK_LOG_URL_STDERR") = s"$baseUrl/stderr?start=0"
+      env("SPARK_LOG_URL_STDOUT") = s"$baseUrl/stdout?start=0"
+    }
+
     System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k, v) => env(k) = v }
     env
   }
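
A concrete, hypothetical example of what this yields on YARN: with node HTTP address
nm-host:8042, container ID container_1423252000000_0001_01_000002, and SPARK_USER alice,
the executor's environment would contain

    SPARK_LOG_URL_STDERR=http://nm-host:8042/node/containerlogs/container_1423252000000_0001_01_000002/alice/stderr?start=0
    SPARK_LOG_URL_STDOUT=http://nm-host:8042/node/containerlogs/container_1423252000000_0001_01_000002/alice/stdout?start=0

pointing at the NodeManager's container log pages.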

http://git-wip-us.apache.org/repos/asf/spark/blob/32e964c4/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 7165918..eda40ef 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -21,16 +21,17 @@ import java.io.File
 import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConversions._
+import scala.collection.mutable
 
 import com.google.common.base.Charsets
 import com.google.common.io.Files
-import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
-
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.server.MiniYARNCluster
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
 
 import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException}
-import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.scheduler.cluster.ExecutorInfo
+import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded}
 import org.apache.spark.util.Utils
 
 class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers with Logging {
@@ -143,6 +144,11 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
     var result = File.createTempFile("result", null, tempDir)
     YarnClusterDriver.main(Array("yarn-client", result.getAbsolutePath()))
     checkResult(result)
+
+    // verify log urls are present
+    YarnClusterDriver.listener.addedExecutorInfos.values.foreach { info =>
+      assert(info.logUrlMap.nonEmpty)
+    }
   }
 
   test("run Spark in yarn-cluster mode") {
@@ -156,6 +162,11 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
       "--num-executors", "1")
     Client.main(args)
     checkResult(result)
+
+    // verify log urls are present.
+    YarnClusterDriver.listener.addedExecutorInfos.values.foreach { info =>
+      assert(info.logUrlMap.nonEmpty)
+    }
   }
 
   test("run Spark in yarn-cluster mode unsuccessfully") {
@@ -203,8 +214,19 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
 
 }
 
+private class SaveExecutorInfo extends SparkListener {
+  val addedExecutorInfos = mutable.Map[String, ExecutorInfo]()
+
+  override def onExecutorAdded(executor : SparkListenerExecutorAdded) {
+    addedExecutorInfos(executor.executorId) = executor.executorInfo
+  }
+}
+
 private object YarnClusterDriver extends Logging with Matchers {
 
+  val WAIT_TIMEOUT_MILLIS = 10000
+  var listener: SaveExecutorInfo = null
+
   def main(args: Array[String]) = {
     if (args.length != 2) {
       System.err.println(
@@ -216,12 +238,15 @@ private object YarnClusterDriver extends Logging with Matchers {
       System.exit(1)
     }
 
+    listener = new SaveExecutorInfo
     val sc = new SparkContext(new SparkConf().setMaster(args(0))
       .setAppName("yarn \"test app\" 'with quotes' and \\back\\slashes and $dollarSigns"))
+    sc.addSparkListener(listener)
     val status = new File(args(1))
     var result = "failure"
     try {
       val data = sc.parallelize(1 to 4, 4).collect().toSet
+      assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
       data should be (Set(1, 2, 3, 4))
       result = "success"
     } finally {

