You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by tg...@apache.org on 2014/07/15 20:53:03 UTC
git commit: SPARK-1291: Link the spark UI to RM ui in yarn-client mode
Repository: spark
Updated Branches:
refs/heads/master 9dd635eb5 -> 72ea56da8
SPARK-1291: Link the spark UI to RM ui in yarn-client mode
Author: witgo <wi...@qq.com>
Closes #1112 from witgo/SPARK-1291 and squashes the following commits:
6022bcd [witgo] review commit
1fbb925 [witgo] add addAmIpFilter to yarn alpha
210299c [witgo] review commit
1b92a07 [witgo] review commit
6896586 [witgo] Add comments to addWebUIFilter
3e9630b [witgo] review commit
142ee29 [witgo] review commit
1fe7710 [witgo] Link the spark UI to RM ui in yarn-client mode
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/72ea56da
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/72ea56da
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/72ea56da
Branch: refs/heads/master
Commit: 72ea56da8e383c61c6f18eeefef03b9af00f5158
Parents: 9dd635e
Author: witgo <wi...@qq.com>
Authored: Tue Jul 15 13:52:56 2014 -0500
Committer: Thomas Graves <tg...@apache.org>
Committed: Tue Jul 15 13:52:56 2014 -0500
----------------------------------------------------------------------
.../cluster/CoarseGrainedClusterMessage.scala | 3 +++
.../cluster/CoarseGrainedSchedulerBackend.scala | 18 +++++++++++++++
.../scala/org/apache/spark/ui/UIUtils.scala | 11 +++++++++-
.../spark/deploy/yarn/ExecutorLauncher.scala | 22 ++++++++++++++++---
.../cluster/YarnClientSchedulerBackend.scala | 1 +
.../spark/deploy/yarn/ExecutorLauncher.scala | 23 +++++++++++++++++---
6 files changed, 71 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/72ea56da/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 318e165..6abf6d9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -66,4 +66,7 @@ private[spark] object CoarseGrainedClusterMessages {
case class RemoveExecutor(executorId: String, reason: String) extends CoarseGrainedClusterMessage
+ case class AddWebUIFilter(filterName:String, filterParams: String, proxyBase :String)
+ extends CoarseGrainedClusterMessage
+
}
http://git-wip-us.apache.org/repos/asf/spark/blob/72ea56da/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 0f5545e..9f085ee 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -31,6 +31,7 @@ import org.apache.spark.{SparkEnv, Logging, SparkException, TaskState}
import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util.{SerializableBuffer, AkkaUtils, Utils}
+import org.apache.spark.ui.JettyUtils
/**
* A scheduler backend that waits for coarse grained executors to connect to it through Akka.
@@ -136,6 +137,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
removeExecutor(executorId, reason)
sender ! true
+ case AddWebUIFilter(filterName, filterParams, proxyBase) =>
+ addWebUIFilter(filterName, filterParams, proxyBase)
+ sender ! true
case DisassociatedEvent(_, address, _) =>
addressToExecutorId.get(address).foreach(removeExecutor(_,
"remote Akka client disassociated"))
@@ -276,6 +280,20 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
}
false
}
+
+ // Add filters to the SparkUI
+ def addWebUIFilter(filterName: String, filterParams: String, proxyBase: String) {
+ if (proxyBase != null && proxyBase.nonEmpty) {
+ System.setProperty("spark.ui.proxyBase", proxyBase)
+ }
+
+ if (Seq(filterName, filterParams).forall(t => t != null && t.nonEmpty)) {
+ logInfo(s"Add WebUI Filter. $filterName, $filterParams, $proxyBase")
+ conf.set("spark.ui.filters", filterName)
+ conf.set(s"spark.$filterName.params", filterParams)
+ JettyUtils.addFilters(scheduler.sc.ui.getHandlers, conf)
+ }
+ }
}
private[spark] object CoarseGrainedSchedulerBackend {
http://git-wip-us.apache.org/repos/asf/spark/blob/72ea56da/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index 9cb50d9..e07aa2e 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -136,7 +136,16 @@ private[spark] object UIUtils extends Logging {
}
// Yarn has to go through a proxy so the base uri is provided and has to be on all links
- val uiRoot : String = Option(System.getenv("APPLICATION_WEB_PROXY_BASE")).getOrElse("")
+ def uiRoot: String = {
+ if (System.getenv("APPLICATION_WEB_PROXY_BASE") != null) {
+ System.getenv("APPLICATION_WEB_PROXY_BASE")
+ } else if (System.getProperty("spark.ui.proxyBase") != null) {
+ System.getProperty("spark.ui.proxyBase")
+ }
+ else {
+ ""
+ }
+ }
def prependBaseUri(basePath: String = "", resource: String = "") = uiRoot + basePath + resource
http://git-wip-us.apache.org/repos/asf/spark/blob/72ea56da/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index bfdb623..a86ad25 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -32,6 +32,7 @@ import akka.actor.Terminated
import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
import org.apache.spark.scheduler.SplitInfo
import org.apache.spark.deploy.SparkHadoopUtil
@@ -81,6 +82,9 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
case x: DisassociatedEvent =>
logInfo(s"Driver terminated or disconnected! Shutting down. $x")
driverClosed = true
+ case x: AddWebUIFilter =>
+ logInfo(s"Add WebUI Filter. $x")
+ driver ! x
}
}
@@ -111,7 +115,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
}
waitForSparkMaster()
-
+ addAmIpFilter()
// Allocate all containers
allocateExecutors()
@@ -171,7 +175,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
}
private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
- logInfo("Registering the ApplicationMaster")
+ val appUIAddress = sparkConf.get("spark.driver.appUIAddress", "")
+ logInfo(s"Registering the ApplicationMaster with appUIAddress: $appUIAddress")
val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest])
.asInstanceOf[RegisterApplicationMasterRequest]
appMasterRequest.setApplicationAttemptId(appAttemptId)
@@ -180,10 +185,21 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
appMasterRequest.setHost(Utils.localHostName())
appMasterRequest.setRpcPort(0)
// What do we provide here ? Might make sense to expose something sensible later ?
- appMasterRequest.setTrackingUrl("")
+ appMasterRequest.setTrackingUrl(appUIAddress)
resourceManager.registerApplicationMaster(appMasterRequest)
}
+ // add the yarn amIpFilter that Yarn requires for properly securing the UI
+ private def addAmIpFilter() {
+ val proxy = YarnConfiguration.getProxyHostAndPort(conf)
+ val parts = proxy.split(":")
+ val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
+ val uriBase = "http://" + proxy + proxyBase
+ val amFilter = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
+ val amFilterName = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
+ actor ! AddWebUIFilter(amFilterName, amFilter, proxyBase)
+ }
+
private def waitForSparkMaster() {
logInfo("Waiting for spark driver to be reachable.")
var driverUp = false
http://git-wip-us.apache.org/repos/asf/spark/blob/72ea56da/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 0f9fdcf..1b37c4b 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -48,6 +48,7 @@ private[spark] class YarnClientSchedulerBackend(
val driverHost = conf.get("spark.driver.host")
val driverPort = conf.get("spark.driver.port")
val hostport = driverHost + ":" + driverPort
+ conf.set("spark.driver.appUIAddress", sc.ui.appUIHostPort)
val argsArrayBuf = new ArrayBuffer[String]()
argsArrayBuf += (
http://git-wip-us.apache.org/repos/asf/spark/blob/72ea56da/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index f71ad03..5ac95f3 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -31,10 +31,12 @@ import akka.actor.Terminated
import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
import org.apache.spark.scheduler.SplitInfo
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils
/**
* An application master that allocates executors on behalf of a driver that is running outside
@@ -82,6 +84,9 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
case x: DisassociatedEvent =>
logInfo(s"Driver terminated or disconnected! Shutting down. $x")
driverClosed = true
+ case x: AddWebUIFilter =>
+ logInfo(s"Add WebUI Filter. $x")
+ driver ! x
}
}
@@ -99,6 +104,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
registerApplicationMaster()
waitForSparkMaster()
+ addAmIpFilter()
// Allocate all containers
allocateExecutors()
@@ -142,9 +148,20 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
}
private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
- logInfo("Registering the ApplicationMaster")
- // TODO: Find out client's Spark UI address and fill in here?
- amClient.registerApplicationMaster(Utils.localHostName(), 0, "")
+ val appUIAddress = sparkConf.get("spark.driver.appUIAddress", "")
+ logInfo(s"Registering the ApplicationMaster with appUIAddress: $appUIAddress")
+ amClient.registerApplicationMaster(Utils.localHostName(), 0, appUIAddress)
+ }
+
+ // add the yarn amIpFilter that Yarn requires for properly securing the UI
+ private def addAmIpFilter() {
+ val proxy = WebAppUtils.getProxyHostAndPort(conf)
+ val parts = proxy.split(":")
+ val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
+ val uriBase = "http://" + proxy + proxyBase
+ val amFilter = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
+ val amFilterName = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
+ actor ! AddWebUIFilter(amFilterName, amFilter, proxyBase)
}
private def waitForSparkMaster() {