Posted to commits@spark.apache.org by an...@apache.org on 2014/09/12 19:40:17 UTC

git commit: Revert "[Spark-3490] Disable SparkUI for tests"

Repository: spark
Updated Branches:
  refs/heads/branch-1.1 e69deb818 -> f17b7957a


Revert "[Spark-3490] Disable SparkUI for tests"

This reverts commit 2ffc7980c6818eec05e32141c52e335bc71daed9.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f17b7957
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f17b7957
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f17b7957

Branch: refs/heads/branch-1.1
Commit: f17b7957a4283952021d9e4106c5bd9994148128
Parents: e69deb8
Author: Andrew Or <an...@gmail.com>
Authored: Fri Sep 12 10:40:03 2014 -0700
Committer: Andrew Or <an...@gmail.com>
Committed: Fri Sep 12 10:40:03 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  12 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala |   2 +-
 .../cluster/SimrSchedulerBackend.scala          |   6 +-
 .../cluster/SparkDeploySchedulerBackend.scala   |   4 +-
 .../scala/org/apache/spark/ui/UISuite.scala     |  44 +-
 pom.xml                                         |   1 -
 project/SparkBuild.scala                        |   2 +-
 .../spark/streaming/StreamingContext.scala      |  11 +-
 .../spark/streaming/StreamingSource.scala       |   2 +-
 .../spark/streaming/ui/StreamingTab.scala       |  25 +-
 .../org/apache/spark/streaming/UISuite.scala    |  16 +-
 .../spark/deploy/yarn/ApplicationMaster.scala   | 443 -------------------
 .../cluster/YarnClientSchedulerBackend.scala    |   6 +-
 13 files changed, 37 insertions(+), 537 deletions(-)
----------------------------------------------------------------------
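
Both strategies for keeping the web UI out of the way during tests appear in the hunks below: the change being reverted gated the UI behind a spark.ui.enabled flag, while the restored behaviour keeps the UI and binds it to an ephemeral port instead. A minimal sketch of the two test configurations, assuming a Spark 1.1.x classpath (conf keys taken from the diff):

    import org.apache.spark.{SparkConf, SparkContext}

    // Reverted approach: never start the UI in tests.
    val noUiConf = new SparkConf()
      .setMaster("local")
      .setAppName("test")
      .set("spark.ui.enabled", "false")   // only honored by the code path being reverted

    // Restored approach: start the UI on an ephemeral port so concurrent
    // test JVMs do not collide on the default port 4040.
    val ephemeralUiConf = new SparkConf()
      .setMaster("local")
      .setAppName("test")
      .set("spark.ui.port", "0")

    val sc = new SparkContext(ephemeralUiConf)
    try {
      // ... test body ...
    } finally {
      sc.stop()
    }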


http://git-wip-us.apache.org/repos/asf/spark/blob/f17b7957/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index abb6a83..0470fbe 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -220,14 +220,8 @@ class SparkContext(config: SparkConf) extends Logging {
     new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, conf)
 
   // Initialize the Spark UI, registering all associated listeners
-  private[spark] val ui: Option[SparkUI] =
-    if (conf.getBoolean("spark.ui.enabled", true)) {
-      Some(new SparkUI(this))
-    } else {
-      // For tests, do not enable the UI
-      None
-    }
-  ui.foreach(_.bind())
+  private[spark] val ui = new SparkUI(this)
+  ui.bind()
 
   /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
   val hadoopConfiguration: Configuration = {
@@ -1014,7 +1008,7 @@ class SparkContext(config: SparkConf) extends Logging {
   /** Shut down the SparkContext. */
   def stop() {
     postApplicationEnd()
-    ui.foreach(_.stop())
+    ui.stop()
     // Do this only if not stopped already - best case effort.
     // prevent NPE if stopped more than once.
     val dagSchedulerCopy = dagScheduler
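
Taken on its own, the change above swaps an Option-guarded UI for an unconditional one, so downstream code reads the UI address without unwrapping. A small sketch of the post-revert usage (the package and object names here are illustrative only; the code has to sit under org.apache.spark because ui is spark-private):

    // sc.ui is private[spark], so this sketch lives under the org.apache.spark package.
    package org.apache.spark.uisketch

    import org.apache.spark.{SparkConf, SparkContext}

    object UiAddressSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("ui-sketch"))
        try {
          // Restored API: ui is a plain SparkUI, so the address is always available.
          println(sc.ui.appUIAddress)
          // Reverted API (Option[SparkUI]) required unwrapping instead:
          //   sc.ui.map(_.appUIAddress).getOrElse("")
        } finally {
          sc.stop()
        }
      }
    }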

http://git-wip-us.apache.org/repos/asf/spark/blob/f17b7957/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 04046e2..2a3711a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -292,7 +292,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
       logInfo(s"Add WebUI Filter. $filterName, $filterParams, $proxyBase")
       conf.set("spark.ui.filters", filterName)
       conf.set(s"spark.$filterName.params", filterParams)
-      scheduler.sc.ui.foreach { ui => JettyUtils.addFilters(ui.getHandlers, conf) }
+      JettyUtils.addFilters(scheduler.sc.ui.getHandlers, conf)
     }
   }
 }
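
The hunk above is the receiving end of the AddWebUIFilter message sent by the YARN ApplicationMaster (see addAmIpFilter in the deleted file further down): the AM builds a comma-separated key=value parameter string, and the driver records it in the conf and asks JettyUtils to install the filter on the UI's Jetty handlers. A sketch of the sequence using only calls that appear in this commit; the proxy values are placeholders, and conf and scheduler are assumed to be the backend's own fields:

    // Values the AM computes in addAmIpFilter (placeholders here).
    val amFilter  = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
    val proxyHost = "rm-proxy.example.com"
    val uriBase   = "http://" + proxyHost + ":8088" + "/proxy/application_0000_0001"
    val params    = "PROXY_HOST=" + proxyHost + "," + "PROXY_URI_BASE=" + uriBase

    // Driver side (this hunk): record the filter in the conf, then attach it to the UI handlers.
    conf.set("spark.ui.filters", amFilter)
    conf.set(s"spark.$amFilter.params", params)
    JettyUtils.addFilters(scheduler.sc.ui.getHandlers, conf)
    // Reverted, Option-guarded form:
    //   scheduler.sc.ui.foreach { ui => JettyUtils.addFilters(ui.getHandlers, conf) }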

http://git-wip-us.apache.org/repos/asf/spark/blob/f17b7957/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
index b781842..4f7133c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.scheduler.cluster
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, FileSystem}
 
 import org.apache.spark.{Logging, SparkContext, SparkEnv}
@@ -45,17 +46,16 @@ private[spark] class SimrSchedulerBackend(
 
     val conf = new Configuration()
     val fs = FileSystem.get(conf)
-    val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
 
     logInfo("Writing to HDFS file: "  + driverFilePath)
     logInfo("Writing Akka address: "  + driverUrl)
-    logInfo("Writing Spark UI Address: " + appUIAddress)
+    logInfo("Writing Spark UI Address: " + sc.ui.appUIAddress)
 
     // Create temporary file to prevent race condition where executors get empty driverUrl file
     val temp = fs.create(tmpPath, true)
     temp.writeUTF(driverUrl)
     temp.writeInt(maxCores)
-    temp.writeUTF(appUIAddress)
+    temp.writeUTF(sc.ui.appUIAddress)
     temp.close()
 
     // "Atomic" rename

http://git-wip-us.apache.org/repos/asf/spark/blob/f17b7957/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index c1d5ce0..32138e5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -63,10 +63,8 @@ private[spark] class SparkDeploySchedulerBackend(
     val javaOpts = sparkJavaOpts ++ extraJavaOpts
     val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
       args, sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts)
-    val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
-    val eventLogDir = sc.eventLogger.map(_.logDir)
     val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
-      appUIAddress, eventLogDir)
+      sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))
 
     client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
     client.start()

http://git-wip-us.apache.org/repos/asf/spark/blob/f17b7957/core/src/test/scala/org/apache/spark/ui/UISuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
index 2f56642..038746d 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
@@ -36,25 +36,11 @@ import scala.xml.Node
 
 class UISuite extends FunSuite {
 
-  /**
-   * Create a test SparkContext with the SparkUI enabled.
-   * It is safe to `get` the SparkUI directly from the SparkContext returned here.
-   */
-  private def newSparkContext(): SparkContext = {
-    val conf = new SparkConf()
-      .setMaster("local")
-      .setAppName("test")
-      .set("spark.ui.enabled", "true")
-    val sc = new SparkContext(conf)
-    assert(sc.ui.isDefined)
-    sc
-  }
-
   ignore("basic ui visibility") {
-    withSpark(newSparkContext()) { sc =>
+    withSpark(new SparkContext("local", "test")) { sc =>
       // test if the ui is visible, and all the expected tabs are visible
       eventually(timeout(10 seconds), interval(50 milliseconds)) {
-        val html = Source.fromURL(sc.ui.get.appUIAddress).mkString
+        val html = Source.fromURL(sc.ui.appUIAddress).mkString
         assert(!html.contains("random data that should not be present"))
         assert(html.toLowerCase.contains("stages"))
         assert(html.toLowerCase.contains("storage"))
@@ -65,7 +51,7 @@ class UISuite extends FunSuite {
   }
 
   ignore("visibility at localhost:4040") {
-    withSpark(newSparkContext()) { sc =>
+    withSpark(new SparkContext("local", "test")) { sc =>
       // test if visible from http://localhost:4040
       eventually(timeout(10 seconds), interval(50 milliseconds)) {
         val html = Source.fromURL("http://localhost:4040").mkString
@@ -75,8 +61,8 @@ class UISuite extends FunSuite {
   }
 
   ignore("attaching a new tab") {
-    withSpark(newSparkContext()) { sc =>
-      val sparkUI = sc.ui.get
+    withSpark(new SparkContext("local", "test")) { sc =>
+      val sparkUI = sc.ui
 
       val newTab = new WebUITab(sparkUI, "foo") {
         attachPage(new WebUIPage("") {
@@ -87,7 +73,7 @@ class UISuite extends FunSuite {
       }
       sparkUI.attachTab(newTab)
       eventually(timeout(10 seconds), interval(50 milliseconds)) {
-        val html = Source.fromURL(sparkUI.appUIAddress).mkString
+        val html = Source.fromURL(sc.ui.appUIAddress).mkString
         assert(!html.contains("random data that should not be present"))
 
         // check whether new page exists
@@ -101,7 +87,7 @@ class UISuite extends FunSuite {
       }
 
       eventually(timeout(10 seconds), interval(50 milliseconds)) {
-        val html = Source.fromURL(sparkUI.appUIAddress.stripSuffix("/") + "/foo").mkString
+        val html = Source.fromURL(sc.ui.appUIAddress.stripSuffix("/") + "/foo").mkString
         // check whether new page exists
         assert(html.contains("magic"))
       }
@@ -143,20 +129,16 @@ class UISuite extends FunSuite {
   }
 
   test("verify appUIAddress contains the scheme") {
-    withSpark(newSparkContext()) { sc =>
-      val ui = sc.ui.get
-      val uiAddress = ui.appUIAddress
-      val uiHostPort = ui.appUIHostPort
-      assert(uiAddress.equals("http://" + uiHostPort))
+    withSpark(new SparkContext("local", "test")) { sc =>
+      val uiAddress = sc.ui.appUIAddress
+      assert(uiAddress.equals("http://" + sc.ui.appUIHostPort))
     }
   }
 
   test("verify appUIAddress contains the port") {
-    withSpark(newSparkContext()) { sc =>
-      val ui = sc.ui.get
-      val splitUIAddress = ui.appUIAddress.split(':')
-      val boundPort = ui.boundPort
-      assert(splitUIAddress(2).toInt == boundPort)
+    withSpark(new SparkContext("local", "test")) { sc =>
+      val splitUIAddress = sc.ui.appUIAddress.split(':')
+      assert(splitUIAddress(2).toInt == sc.ui.boundPort)
     }
   }
 }
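
The deleted newSparkContext helper existed only to override the global test setting that this commit also removes (spark.ui.enabled=false in pom.xml and SparkBuild.scala); with the UI always on, a bare new SparkContext("local", "test") is enough. The polling pattern the suite keeps is eventually + Source.fromURL; condensed into a reusable helper it looks roughly like this (the helper name is illustrative and not part of the suite):

    // Intended to sit inside UISuite (package org.apache.spark.ui), which already has
    // these imports in scope and can see the spark-private sc.ui field.
    import scala.io.Source
    import org.scalatest.concurrent.Eventually._
    import org.scalatest.time.SpanSugar._
    import org.apache.spark.SparkContext

    /** Poll the running UI until the served page contains the given fragment. */
    def assertUiEventuallyContains(sc: SparkContext, fragment: String): Unit = {
      eventually(timeout(10 seconds), interval(50 milliseconds)) {
        val html = Source.fromURL(sc.ui.appUIAddress).mkString
        assert(html.toLowerCase.contains(fragment))
      }
    }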

http://git-wip-us.apache.org/repos/asf/spark/blob/f17b7957/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c33ea7d..66458e2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -885,7 +885,6 @@
               <java.awt.headless>true</java.awt.headless>
               <spark.test.home>${session.executionRootDirectory}</spark.test.home>
               <spark.testing>1</spark.testing>
-              <spark.ui.enabled>false</spark.ui.enabled>
             </systemProperties>
           </configuration>
           <executions>

http://git-wip-us.apache.org/repos/asf/spark/blob/f17b7957/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index c968a75..486de93 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -337,7 +337,7 @@ object TestSettings {
     javaOptions in Test += "-Dspark.test.home=" + sparkHome,
     javaOptions in Test += "-Dspark.testing=1",
     javaOptions in Test += "-Dspark.ports.maxRetries=100",
-    javaOptions in Test += "-Dspark.ui.enabled=false",
+    javaOptions in Test += "-Dspark.ui.port=0",
     javaOptions in Test += "-Dsun.io.serialization.extendedDebugInfo=true",
     javaOptions in Test ++= System.getProperties.filter(_._1 startsWith "spark")
       .map { case (k,v) => s"-D$k=$v" }.toSeq,
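
The same test-JVM property is managed in two places: the Maven surefire block above simply drops spark.ui.enabled, while the SBT TestSettings here swap it for spark.ui.port=0. In a standalone build.sbt the equivalent would look roughly like this (a sketch; note that javaOptions only reach tests when they run in a forked JVM):

    fork in Test := true                            // javaOptions apply to forked test JVMs only
    javaOptions in Test += "-Dspark.testing=1"
    javaOptions in Test += "-Dspark.ui.port=0"      // bind the SparkUI to an ephemeral port
    javaOptions in Test += "-Dspark.ports.maxRetries=100"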

http://git-wip-us.apache.org/repos/asf/spark/blob/f17b7957/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 4fc77bb..101cec1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -37,7 +37,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.receiver.{ActorSupervisorStrategy, ActorReceiver, Receiver}
 import org.apache.spark.streaming.scheduler._
-import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab}
+import org.apache.spark.streaming.ui.StreamingTab
 import org.apache.spark.util.MetadataCleaner
 
 /**
@@ -158,14 +158,7 @@ class StreamingContext private[streaming] (
 
   private[streaming] val waiter = new ContextWaiter
 
-  private[streaming] val progressListener = new StreamingJobProgressListener(this)
-
-  private[streaming] val uiTab: Option[StreamingTab] =
-    if (conf.getBoolean("spark.ui.enabled", true)) {
-      Some(new StreamingTab(this))
-    } else {
-      None
-    }
+  private[streaming] val uiTab = new StreamingTab(this)
 
   /** Register streaming source to metrics system */
   private val streamingSource = new StreamingSource(this)

http://git-wip-us.apache.org/repos/asf/spark/blob/f17b7957/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
index e35a568..75f0e87 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
@@ -26,7 +26,7 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
   override val metricRegistry = new MetricRegistry
   override val sourceName = "%s.StreamingMetrics".format(ssc.sparkContext.appName)
 
-  private val streamingListener = ssc.progressListener
+  private val streamingListener = ssc.uiTab.listener
 
   private def registerGauge[T](name: String, f: StreamingJobProgressListener => T,
       defaultValue: T) {

http://git-wip-us.apache.org/repos/asf/spark/blob/f17b7957/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
index d9d04cd..34ac254 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
@@ -17,31 +17,18 @@
 
 package org.apache.spark.streaming.ui
 
-import org.apache.spark.{Logging, SparkException}
+import org.apache.spark.Logging
 import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.ui.{SparkUI, SparkUITab}
+import org.apache.spark.ui.SparkUITab
 
-import StreamingTab._
-
-/**
- * Spark Web UI tab that shows statistics of a streaming job.
- * This assumes the given SparkContext has enabled its SparkUI.
- */
+/** Spark Web UI tab that shows statistics of a streaming job */
 private[spark] class StreamingTab(ssc: StreamingContext)
-  extends SparkUITab(getSparkUI(ssc), "streaming") with Logging {
+  extends SparkUITab(ssc.sc.ui, "streaming") with Logging {
 
-  val parent = getSparkUI(ssc)
-  val listener = ssc.progressListener
+  val parent = ssc.sc.ui
+  val listener = new StreamingJobProgressListener(ssc)
 
   ssc.addStreamingListener(listener)
   attachPage(new StreamingPage(this))
   parent.attachTab(this)
 }
-
-private object StreamingTab {
-  def getSparkUI(ssc: StreamingContext): SparkUI = {
-    ssc.sc.ui.getOrElse {
-      throw new SparkException("Parent SparkUI to attach this tab to not found!")
-    }
-  }
-}
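
The deleted companion helper is what made the tab safe to construct while the parent UI could be absent: it resolves the Option and fails fast with an explicit error instead of a bare NoSuchElementException. Reduced to its essentials (types and message taken from the hunk; it only type-checks against the pre-revert Option[SparkUI] API and inside the Spark packages):

    import org.apache.spark.SparkException
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.ui.SparkUI

    // Reverted helper: resolve the parent SparkUI or fail with a clear message.
    def getSparkUI(ssc: StreamingContext): SparkUI =
      ssc.sc.ui.getOrElse {
        throw new SparkException("Parent SparkUI to attach this tab to not found!")
      }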

http://git-wip-us.apache.org/repos/asf/spark/blob/f17b7957/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala
index 4c7e43c..2a0db75 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala
@@ -24,22 +24,13 @@ import org.scalatest.FunSuite
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.SparkConf
-
 class UISuite extends FunSuite {
 
   // Ignored: See SPARK-1530
   ignore("streaming tab in spark UI") {
-    val conf = new SparkConf()
-      .setMaster("local")
-      .setAppName("test")
-      .set("spark.ui.enabled", "true")
-    val ssc = new StreamingContext(conf, Seconds(1))
-    assert(ssc.sc.ui.isDefined, "Spark UI is not started!")
-    val ui = ssc.sc.ui.get
-
+    val ssc = new StreamingContext("local", "test", Seconds(1))
     eventually(timeout(10 seconds), interval(50 milliseconds)) {
-      val html = Source.fromURL(ui.appUIAddress).mkString
+      val html = Source.fromURL(ssc.sparkContext.ui.appUIAddress).mkString
       assert(!html.contains("random data that should not be present"))
       // test if streaming tab exist
       assert(html.toLowerCase.contains("streaming"))
@@ -48,7 +39,8 @@ class UISuite extends FunSuite {
     }
 
     eventually(timeout(10 seconds), interval(50 milliseconds)) {
-      val html = Source.fromURL(ui.appUIAddress.stripSuffix("/") + "/streaming").mkString
+      val html = Source.fromURL(
+        ssc.sparkContext.ui.appUIAddress.stripSuffix("/") + "/streaming").mkString
       assert(html.toLowerCase.contains("batch"))
       assert(html.toLowerCase.contains("network"))
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/f17b7957/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
deleted file mode 100644
index 878b6db..0000000
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ /dev/null
@@ -1,443 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.io.IOException
-import java.net.Socket
-import java.util.concurrent.atomic.AtomicReference
-
-import scala.collection.JavaConversions._
-import scala.util.Try
-
-import akka.actor._
-import akka.remote._
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.util.ShutdownHookManager
-import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv}
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.deploy.history.HistoryServer
-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
-import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
-
-/**
- * Common application master functionality for Spark on Yarn.
- */
-private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
-  client: YarnRMClient) extends Logging {
-  // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
-  // optimal as more containers are available. Might need to handle this better.
-
-  private val sparkConf = new SparkConf()
-  private val yarnConf: YarnConfiguration = SparkHadoopUtil.get.newConfiguration(sparkConf)
-    .asInstanceOf[YarnConfiguration]
-  private val isDriver = args.userClass != null
-
-  // Default to numExecutors * 2, with minimum of 3
-  private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
-    sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
-
-  @volatile private var finished = false
-  @volatile private var finalStatus = FinalApplicationStatus.UNDEFINED
-
-  private var reporterThread: Thread = _
-  private var allocator: YarnAllocator = _
-
-  // Fields used in client mode.
-  private var actorSystem: ActorSystem = null
-  private var actor: ActorRef = _
-
-  // Fields used in cluster mode.
-  private val sparkContextRef = new AtomicReference[SparkContext](null)
-
-  final def run(): Int = {
-    val appAttemptId = client.getAttemptId()
-
-    if (isDriver) {
-      // Set the web ui port to be ephemeral for yarn so we don't conflict with
-      // other spark processes running on the same box
-      System.setProperty("spark.ui.port", "0")
-
-      // Set the master property to match the requested mode.
-      System.setProperty("spark.master", "yarn-cluster")
-
-      // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
-      System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
-    }
-
-    logInfo("ApplicationAttemptId: " + appAttemptId)
-
-    val cleanupHook = new Runnable {
-      override def run() {
-        // If the SparkContext is still registered, shut it down as a best case effort in case
-        // users do not call sc.stop or do System.exit().
-        val sc = sparkContextRef.get()
-        if (sc != null) {
-          logInfo("Invoking sc stop from shutdown hook")
-          sc.stop()
-          finish(FinalApplicationStatus.SUCCEEDED)
-        }
-
-        // Cleanup the staging dir after the app is finished, or if it's the last attempt at
-        // running the AM.
-        val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
-        val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
-        if (finished || isLastAttempt) {
-          cleanupStagingDir()
-        }
-      }
-    }
-    // Use priority 30 as it's higher than HDFS. It's the same priority MapReduce is using.
-    ShutdownHookManager.get().addShutdownHook(cleanupHook, 30)
-
-    // Call this to force generation of secret so it gets populated into the
-    // Hadoop UGI. This has to happen before the startUserClass which does a
-    // doAs in order for the credentials to be passed on to the executor containers.
-    val securityMgr = new SecurityManager(sparkConf)
-
-    if (isDriver) {
-      runDriver(securityMgr)
-    } else {
-      runExecutorLauncher(securityMgr)
-    }
-
-    if (finalStatus != FinalApplicationStatus.UNDEFINED) {
-      finish(finalStatus)
-      0
-    } else {
-      1
-    }
-  }
-
-  final def finish(status: FinalApplicationStatus, diagnostics: String = null) = synchronized {
-    if (!finished) {
-      logInfo(s"Finishing ApplicationMaster with $status"  +
-        Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
-      finished = true
-      finalStatus = status
-      try {
-        if (Thread.currentThread() != reporterThread) {
-          reporterThread.interrupt()
-          reporterThread.join()
-        }
-      } finally {
-        client.shutdown(status, Option(diagnostics).getOrElse(""))
-      }
-    }
-  }
-
-  private def sparkContextInitialized(sc: SparkContext) = {
-    sparkContextRef.synchronized {
-      sparkContextRef.compareAndSet(null, sc)
-      sparkContextRef.notifyAll()
-    }
-  }
-
-  private def sparkContextStopped(sc: SparkContext) = {
-    sparkContextRef.compareAndSet(sc, null)
-  }
-
-  private def registerAM(uiAddress: String, securityMgr: SecurityManager) = {
-    val sc = sparkContextRef.get()
-
-    val appId = client.getAttemptId().getApplicationId().toString()
-    val historyAddress =
-      sparkConf.getOption("spark.yarn.historyServer.address")
-        .map { address => s"${address}${HistoryServer.UI_PATH_PREFIX}/${appId}" }
-        .getOrElse("")
-
-    allocator = client.register(yarnConf,
-      if (sc != null) sc.getConf else sparkConf,
-      if (sc != null) sc.preferredNodeLocationData else Map(),
-      uiAddress,
-      historyAddress,
-      securityMgr)
-
-    allocator.allocateResources()
-    reporterThread = launchReporterThread()
-  }
-
-  private def runDriver(securityMgr: SecurityManager): Unit = {
-    addAmIpFilter()
-    val userThread = startUserClass()
-
-    // This a bit hacky, but we need to wait until the spark.driver.port property has
-    // been set by the Thread executing the user class.
-    val sc = waitForSparkContextInitialized()
-
-    // If there is no SparkContext at this point, just fail the app.
-    if (sc == null) {
-      finish(FinalApplicationStatus.FAILED, "Timed out waiting for SparkContext.")
-    } else {
-      registerAM(sc.ui.map(_.appUIAddress).getOrElse(""), securityMgr)
-      try {
-        userThread.join()
-      } finally {
-        // In cluster mode, ask the reporter thread to stop since the user app is finished.
-        reporterThread.interrupt()
-      }
-    }
-  }
-
-  private def runExecutorLauncher(securityMgr: SecurityManager): Unit = {
-    actorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
-      conf = sparkConf, securityManager = securityMgr)._1
-    actor = waitForSparkDriver()
-    addAmIpFilter()
-    registerAM(sparkConf.get("spark.driver.appUIAddress", ""), securityMgr)
-
-    // In client mode the actor will stop the reporter thread.
-    reporterThread.join()
-    finalStatus = FinalApplicationStatus.SUCCEEDED
-  }
-
-  private def launchReporterThread(): Thread = {
-    // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
-    val expiryInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
-
-    // we want to be reasonably responsive without causing too many requests to RM.
-    val schedulerInterval =
-      sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000)
-
-    // must be <= expiryInterval / 2.
-    val interval = math.max(0, math.min(expiryInterval / 2, schedulerInterval))
-
-    val t = new Thread {
-      override def run() {
-        while (!finished) {
-          checkNumExecutorsFailed()
-          if (!finished) {
-            logDebug("Sending progress")
-            allocator.allocateResources()
-            try {
-              Thread.sleep(interval)
-            } catch {
-              case e: InterruptedException =>
-            }
-          }
-        }
-      }
-    }
-    // setting to daemon status, though this is usually not a good idea.
-    t.setDaemon(true)
-    t.setName("Reporter")
-    t.start()
-    logInfo("Started progress reporter thread - sleep time : " + interval)
-    t
-  }
-
-  /**
-   * Clean up the staging directory.
-   */
-  private def cleanupStagingDir() {
-    val fs = FileSystem.get(yarnConf)
-    var stagingDirPath: Path = null
-    try {
-      val preserveFiles = sparkConf.get("spark.yarn.preserve.staging.files", "false").toBoolean
-      if (!preserveFiles) {
-        stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
-        if (stagingDirPath == null) {
-          logError("Staging directory is null")
-          return
-        }
-        logInfo("Deleting staging directory " + stagingDirPath)
-        fs.delete(stagingDirPath, true)
-      }
-    } catch {
-      case ioe: IOException =>
-        logError("Failed to cleanup staging dir " + stagingDirPath, ioe)
-    }
-  }
-
-  private def waitForSparkContextInitialized(): SparkContext = {
-    logInfo("Waiting for spark context initialization")
-    try {
-      sparkContextRef.synchronized {
-        var count = 0
-        val waitTime = 10000L
-        val numTries = sparkConf.getInt("spark.yarn.ApplicationMaster.waitTries", 10)
-        while (sparkContextRef.get() == null && count < numTries && !finished) {
-          logInfo("Waiting for spark context initialization ... " + count)
-          count = count + 1
-          sparkContextRef.wait(waitTime)
-        }
-
-        val sparkContext = sparkContextRef.get()
-        assert(sparkContext != null || count >= numTries)
-        if (sparkContext == null) {
-          logError(
-            "Unable to retrieve sparkContext inspite of waiting for %d, numTries = %d".format(
-              count * waitTime, numTries))
-        }
-        sparkContext
-      }
-    }
-  }
-
-  private def waitForSparkDriver(): ActorRef = {
-    logInfo("Waiting for Spark driver to be reachable.")
-    var driverUp = false
-    val hostport = args.userArgs(0)
-    val (driverHost, driverPort) = Utils.parseHostPort(hostport)
-    while (!driverUp) {
-      try {
-        val socket = new Socket(driverHost, driverPort)
-        socket.close()
-        logInfo("Driver now available: %s:%s".format(driverHost, driverPort))
-        driverUp = true
-      } catch {
-        case e: Exception =>
-          logError("Failed to connect to driver at %s:%s, retrying ...".
-            format(driverHost, driverPort))
-          Thread.sleep(100)
-      }
-    }
-    sparkConf.set("spark.driver.host", driverHost)
-    sparkConf.set("spark.driver.port", driverPort.toString)
-
-    val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
-      SparkEnv.driverActorSystemName,
-      driverHost,
-      driverPort.toString,
-      CoarseGrainedSchedulerBackend.ACTOR_NAME)
-    actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
-  }
-
-  private def checkNumExecutorsFailed() = {
-    if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
-      finish(FinalApplicationStatus.FAILED, "Max number of executor failures reached.")
-
-      val sc = sparkContextRef.get()
-      if (sc != null) {
-        logInfo("Invoking sc stop from checkNumExecutorsFailed")
-        sc.stop()
-      }
-    }
-  }
-
-  /** Add the Yarn IP filter that is required for properly securing the UI. */
-  private def addAmIpFilter() = {
-    val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
-    val proxy = client.getProxyHostAndPort(yarnConf)
-    val parts = proxy.split(":")
-    val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
-    val uriBase = "http://" + proxy + proxyBase
-    val params = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
-
-    if (isDriver) {
-      System.setProperty("spark.ui.filters", amFilter)
-      System.setProperty(s"spark.$amFilter.params", params)
-    } else {
-      actor ! AddWebUIFilter(amFilter, params, proxyBase)
-    }
-  }
-
-  private def startUserClass(): Thread = {
-    logInfo("Starting the user JAR in a separate Thread")
-    System.setProperty("spark.executor.instances", args.numExecutors.toString)
-    val mainMethod = Class.forName(args.userClass, false,
-      Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
-
-    val t = new Thread {
-      override def run() {
-        var status = FinalApplicationStatus.FAILED
-        try {
-          // Copy
-          val mainArgs = new Array[String](args.userArgs.size)
-          args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size)
-          mainMethod.invoke(null, mainArgs)
-          // Some apps have "System.exit(0)" at the end.  The user thread will stop here unless
-          // it has an uncaught exception thrown out.  It needs a shutdown hook to set SUCCEEDED.
-          status = FinalApplicationStatus.SUCCEEDED
-        } finally {
-          logDebug("Finishing main")
-        }
-        finalStatus = status
-      }
-    }
-    t.setName("Driver")
-    t.start()
-    t
-  }
-
-  // Actor used to monitor the driver when running in client deploy mode.
-  private class MonitorActor(driverUrl: String) extends Actor {
-
-    var driver: ActorSelection = _
-
-    override def preStart() = {
-      logInfo("Listen to driver: " + driverUrl)
-      driver = context.actorSelection(driverUrl)
-      // Send a hello message to establish the connection, after which
-      // we can monitor Lifecycle Events.
-      driver ! "Hello"
-      context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
-    }
-
-    override def receive = {
-      case x: DisassociatedEvent =>
-        logInfo(s"Driver terminated or disconnected! Shutting down. $x")
-        finish(FinalApplicationStatus.SUCCEEDED)
-      case x: AddWebUIFilter =>
-        logInfo(s"Add WebUI Filter. $x")
-        driver ! x
-    }
-
-  }
-
-}
-
-object ApplicationMaster extends Logging {
-
-  private var master: ApplicationMaster = _
-
-  def main(args: Array[String]) = {
-    SignalLogger.register(log)
-    val amArgs = new ApplicationMasterArguments(args)
-    SparkHadoopUtil.get.runAsSparkUser { () =>
-      master = new ApplicationMaster(amArgs, new YarnRMClientImpl(amArgs))
-      System.exit(master.run())
-    }
-  }
-
-  private[spark] def sparkContextInitialized(sc: SparkContext) = {
-    master.sparkContextInitialized(sc)
-  }
-
-  private[spark] def sparkContextStopped(sc: SparkContext) = {
-    master.sparkContextStopped(sc)
-  }
-
-}
-
-/**
- * This object does not provide any special functionality. It exists so that it's easy to tell
- * apart the client-mode AM from the cluster-mode AM when using tools such as ps or jps.
- */
-object ExecutorLauncher {
-
-  def main(args: Array[String]) = {
-    ApplicationMaster.main(args)
-  }
-
-}
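
One piece of logic worth pulling out of the deleted file: the reporter thread's heartbeat interval is clamped to at most half of the ResourceManager's AM-expiry interval, so a slow configured heartbeat can never let the AM time out. With the defaults that appear above (120000 ms expiry, 5000 ms heartbeat) the expression works out as follows:

    val expiryInterval    = 120000L   // default used above for YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS
    val schedulerInterval = 5000L     // default for spark.yarn.scheduler.heartbeat.interval-ms
    val interval = math.max(0L, math.min(expiryInterval / 2, schedulerInterval))
    // math.min(60000, 5000) = 5000; math.max(0, 5000) = 5000 -> heartbeat every 5 seconds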

http://git-wip-us.apache.org/repos/asf/spark/blob/f17b7957/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 40d9bff..833e249 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -55,10 +55,8 @@ private[spark] class YarnClientSchedulerBackend(
     val driverHost = conf.get("spark.driver.host")
     val driverPort = conf.get("spark.driver.port")
     val hostport = driverHost + ":" + driverPort
-    sc.ui.foreach { ui =>
-      conf.set("spark.driver.appUIAddress", ui.appUIHostPort)
-      conf.set("spark.driver.appUIHistoryAddress", YarnSparkHadoopUtil.getUIHistoryAddress(sc, conf))
-    }
+    conf.set("spark.driver.appUIAddress", sc.ui.appUIHostPort)
+    conf.set("spark.driver.appUIHistoryAddress", YarnSparkHadoopUtil.getUIHistoryAddress(sc, conf))
 
     val argsArrayBuf = new ArrayBuffer[String]()
     argsArrayBuf += (
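
This hunk is the producer half of a small handshake: the client-mode backend publishes the driver's UI address into the SparkConf, and the ApplicationMaster deleted above read it back when registering with the ResourceManager. A sketch of the two halves, using only calls that appear in this commit (the reverted, Option-guarded producer is shown; the restored form sets the keys unconditionally):

    // Driver side, YarnClientSchedulerBackend (reverted form):
    sc.ui.foreach { ui =>
      conf.set("spark.driver.appUIAddress", ui.appUIHostPort)
      conf.set("spark.driver.appUIHistoryAddress", YarnSparkHadoopUtil.getUIHistoryAddress(sc, conf))
    }

    // ApplicationMaster side, client mode (from the deleted runExecutorLauncher above):
    //   registerAM(sparkConf.get("spark.driver.appUIAddress", ""), securityMgr)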


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org