You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2014/10/17 09:53:20 UTC
git commit: [SPARK-3606] [yarn] Correctly configure AmIpFilter for
Yarn HA (1.1 vers...
Repository: spark
Updated Branches:
refs/heads/branch-1.1 35875e9ec -> 0d958f163
[SPARK-3606] [yarn] Correctly configure AmIpFilter for Yarn HA (1.1 vers...
...ion).
This is a backport of SPARK-3606 to branch-1.1. Some of the code had to be
duplicated since branch-1.1 doesn't have the cleanup work that was done to
the Yarn codebase.
I don't know whether the version issue in yarn/alpha/pom.xml was intentional,
but I couldn't compile the code without fixing it.
Author: Marcelo Vanzin <va...@cloudera.com>
Closes #2497 from vanzin/SPARK-3606-1.1 and squashes the following commits:
4fd3c27 [Marcelo Vanzin] Remove unused imports.
75cde8c [Marcelo Vanzin] Scala is weird.
b27ebda [Marcelo Vanzin] Review feedback.
72ceafb [Marcelo Vanzin] Undelete needed import.
61162a6 [Marcelo Vanzin] Use separate config for each param instead of json.
3b7205f [Marcelo Vanzin] Review feedback.
b3b3e50 [Marcelo Vanzin] [SPARK-3606] [yarn] Correctly configure AmIpFilter for Yarn HA (1.1 version).
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0d958f16
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0d958f16
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0d958f16
Branch: refs/heads/branch-1.1
Commit: 0d958f163014e2b612ec445c80dfe69ff29d9f1a
Parents: 35875e9
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Fri Oct 17 00:53:15 2014 -0700
Committer: Andrew Or <an...@gmail.com>
Committed: Fri Oct 17 00:53:15 2014 -0700
----------------------------------------------------------------------
.../cluster/CoarseGrainedClusterMessage.scala | 2 +-
.../cluster/CoarseGrainedSchedulerBackend.scala | 12 +++--
.../scala/org/apache/spark/ui/JettyUtils.scala | 15 +++---
.../spark/deploy/yarn/ExecutorLauncher.scala | 2 +-
.../spark/deploy/yarn/ApplicationMaster.scala | 12 ++---
.../spark/deploy/yarn/ExecutorLauncher.scala | 6 +--
.../spark/deploy/yarn/YarnStableUtils.scala | 54 ++++++++++++++++++++
7 files changed, 76 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/0d958f16/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 6abf6d9..fb8160a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -66,7 +66,7 @@ private[spark] object CoarseGrainedClusterMessages {
case class RemoveExecutor(executorId: String, reason: String) extends CoarseGrainedClusterMessage
- case class AddWebUIFilter(filterName:String, filterParams: String, proxyBase :String)
+ case class AddWebUIFilter(filterName:String, filterParams: Map[String, String], proxyBase :String)
extends CoarseGrainedClusterMessage
}
http://git-wip-us.apache.org/repos/asf/spark/blob/0d958f16/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 04046e2..e8a3a3b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -51,12 +51,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
val conf = scheduler.sc.conf
private val timeout = AkkaUtils.askTimeout(conf)
private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
- // Submit tasks only after (registered resources / total expected resources)
+ // Submit tasks only after (registered resources / total expected resources)
// is equal to at least this value, that is double between 0 and 1.
var minRegisteredRatio =
math.min(1, conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0))
// Submit tasks after maxRegisteredWaitingTime milliseconds
- // if minRegisteredRatio has not yet been reached
+ // if minRegisteredRatio has not yet been reached
val maxRegisteredWaitingTime =
conf.getInt("spark.scheduler.maxRegisteredResourcesWaitingTime", 30000)
val createTime = System.currentTimeMillis()
@@ -283,15 +283,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
}
// Add filters to the SparkUI
- def addWebUIFilter(filterName: String, filterParams: String, proxyBase: String) {
+ def addWebUIFilter(filterName: String, filterParams: Map[String, String], proxyBase: String) {
if (proxyBase != null && proxyBase.nonEmpty) {
System.setProperty("spark.ui.proxyBase", proxyBase)
}
- if (Seq(filterName, filterParams).forall(t => t != null && t.nonEmpty)) {
+ val hasFilter = (filterName != null && filterName.nonEmpty &&
+ filterParams != null && filterParams.nonEmpty)
+ if (hasFilter) {
logInfo(s"Add WebUI Filter. $filterName, $filterParams, $proxyBase")
conf.set("spark.ui.filters", filterName)
- conf.set(s"spark.$filterName.params", filterParams)
+ filterParams.foreach { case (k, v) => conf.set(s"spark.$filterName.param.$k", v) }
scheduler.sc.ui.foreach { ui => JettyUtils.addFilters(ui.getHandlers, conf) }
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/0d958f16/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index 6b46892..6339012 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -21,9 +21,7 @@ import java.net.{InetSocketAddress, URL}
import javax.servlet.DispatcherType
import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
-import scala.annotation.tailrec
import scala.language.implicitConversions
-import scala.util.{Failure, Success, Try}
import scala.xml.Node
import org.eclipse.jetty.server.Server
@@ -148,14 +146,19 @@ private[spark] object JettyUtils extends Logging {
holder.setClassName(filter)
// Get any parameters for each filter
val paramName = "spark." + filter + ".params"
- val params = conf.get(paramName, "").split(',').map(_.trim()).toSet
- params.foreach {
- case param : String =>
+ val params = conf.get(paramName, "").split(',').map(_.trim()).toSet.foreach {
+ param : String =>
if (!param.isEmpty) {
val parts = param.split("=")
if (parts.length == 2) holder.setInitParameter(parts(0), parts(1))
- }
+ }
}
+
+ val prefix = s"spark.$filter.param."
+ conf.getAll
+ .filter { case (k, v) => k.length() > prefix.length() && k.startsWith(prefix) }
+ .foreach { case (k, v) => holder.setInitParameter(k.substring(prefix.length()), v) }
+
val enumDispatcher = java.util.EnumSet.of(DispatcherType.ASYNC, DispatcherType.ERROR,
DispatcherType.FORWARD, DispatcherType.INCLUDE, DispatcherType.REQUEST)
handlers.foreach { case(handler) => handler.addFilter(holder, "/*", enumDispatcher) }
http://git-wip-us.apache.org/repos/asf/spark/blob/0d958f16/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index 155dd88..e9289aa 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -185,7 +185,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
val parts = proxy.split(":")
val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
val uriBase = "http://" + proxy + proxyBase
- val amFilter = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
+ val amFilter = Map("PROXY_HOST" -> parts(0), "PROXY_URI_BASE" -> uriBase)
val amFilterName = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
actor ! AddWebUIFilter(amFilterName, amFilter, proxyBase)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/0d958f16/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index e4d60c6..378304f 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -33,7 +33,6 @@ import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.ConverterUtils
-import org.apache.hadoop.yarn.webapp.util.WebAppUtils
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
import org.apache.spark.deploy.SparkHadoopUtil
@@ -130,14 +129,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
private def addAmIpFilter() {
val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
System.setProperty("spark.ui.filters", amFilter)
- val proxy = WebAppUtils.getProxyHostAndPort(conf)
- val parts : Array[String] = proxy.split(":")
- val uriBase = "http://" + proxy +
- System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
-
- val params = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
- System.setProperty(
- "spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.params", params)
+ val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
+ val params = YarnStableUtils.getAmIpFilterParams(yarnConf, proxyBase)
+ params.foreach { case (k, v) => System.setProperty(s"spark.$amFilter.param.$k", v) }
}
private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
http://git-wip-us.apache.org/repos/asf/spark/blob/0d958f16/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index e093fe4..38e9f9c 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -33,7 +33,6 @@ import org.apache.spark.scheduler.SplitInfo
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.hadoop.yarn.webapp.util.WebAppUtils
/**
* An application master that allocates executors on behalf of a driver that is running outside
@@ -144,11 +143,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
// add the yarn amIpFilter that Yarn requires for properly securing the UI
private def addAmIpFilter() {
- val proxy = WebAppUtils.getProxyHostAndPort(conf)
- val parts = proxy.split(":")
val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
- val uriBase = "http://" + proxy + proxyBase
- val amFilter = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
+ val amFilter = YarnStableUtils.getAmIpFilterParams(yarnConf, proxyBase)
val amFilterName = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
actor ! AddWebUIFilter(amFilterName, amFilter, proxyBase)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/0d958f16/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnStableUtils.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnStableUtils.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnStableUtils.scala
new file mode 100644
index 0000000..ea81faf
--- /dev/null
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnStableUtils.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.util.{List => JList}
+
+import scala.collection.JavaConversions._
+import scala.util.Try
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils
+
+private[yarn] object YarnStableUtils {
+
+ def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String): Map[String, String] = {
+ // Figure out which scheme Yarn is using. Note the method seems to have been added after 2.2,
+ // so not all stable releases have it.
+ val prefix = Try(classOf[WebAppUtils].getMethod("getHttpSchemePrefix", classOf[Configuration])
+ .invoke(null, conf).asInstanceOf[String]).getOrElse("http://")
+
+ // If running a new enough Yarn, use the HA-aware API for retrieving the RM addresses.
+ try {
+ val method = classOf[WebAppUtils].getMethod("getProxyHostsAndPortsForAmFilter",
+ classOf[Configuration])
+ val proxies = method.invoke(null, conf).asInstanceOf[JList[String]]
+ val hosts = proxies.map { proxy => proxy.split(":")(0) }
+ val uriBases = proxies.map { proxy => prefix + proxy + proxyBase }
+ Map("PROXY_HOSTS" -> hosts.mkString(","), "PROXY_URI_BASES" -> uriBases.mkString(","))
+ } catch {
+ case e: NoSuchMethodException =>
+ val proxy = WebAppUtils.getProxyHostAndPort(conf)
+ val parts = proxy.split(":")
+ val uriBase = prefix + proxy + proxyBase
+ Map("PROXY_HOST" -> parts(0), "PROXY_URI_BASE" -> uriBase)
+ }
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org