Posted to commits@spark.apache.org by an...@apache.org on 2014/10/17 09:53:20 UTC

git commit: [SPARK-3606] [yarn] Correctly configure AmIpFilter for Yarn HA (1.1 version)

Repository: spark
Updated Branches:
  refs/heads/branch-1.1 35875e9ec -> 0d958f163


[SPARK-3606] [yarn] Correctly configure AmIpFilter for Yarn HA (1.1 version).

This is a backport of SPARK-3606 to branch-1.1. Some of the code had to be
duplicated since branch-1.1 doesn't have the cleanup work that was done to
the Yarn codebase.

I don't know whether the version issue in yarn/alpha/pom.xml was intentional,
but I couldn't compile the code without fixing it.

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #2497 from vanzin/SPARK-3606-1.1 and squashes the following commits:

4fd3c27 [Marcelo Vanzin] Remove unused imports.
75cde8c [Marcelo Vanzin] Scala is weird.
b27ebda [Marcelo Vanzin] Review feedback.
72ceafb [Marcelo Vanzin] Undelete needed import.
61162a6 [Marcelo Vanzin] Use separate config for each param instead of json.
3b7205f [Marcelo Vanzin] Review feedback.
b3b3e50 [Marcelo Vanzin] [SPARK-3606] [yarn] Correctly configure AmIpFilter for Yarn HA (1.1 version).


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0d958f16
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0d958f16
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0d958f16

Branch: refs/heads/branch-1.1
Commit: 0d958f163014e2b612ec445c80dfe69ff29d9f1a
Parents: 35875e9
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Fri Oct 17 00:53:15 2014 -0700
Committer: Andrew Or <an...@gmail.com>
Committed: Fri Oct 17 00:53:15 2014 -0700

----------------------------------------------------------------------
 .../cluster/CoarseGrainedClusterMessage.scala   |  2 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala | 12 +++--
 .../scala/org/apache/spark/ui/JettyUtils.scala  | 15 +++---
 .../spark/deploy/yarn/ExecutorLauncher.scala    |  2 +-
 .../spark/deploy/yarn/ApplicationMaster.scala   | 12 ++---
 .../spark/deploy/yarn/ExecutorLauncher.scala    |  6 +--
 .../spark/deploy/yarn/YarnStableUtils.scala     | 54 ++++++++++++++++++++
 7 files changed, 76 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0d958f16/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 6abf6d9..fb8160a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -66,7 +66,7 @@ private[spark] object CoarseGrainedClusterMessages {
 
   case class RemoveExecutor(executorId: String, reason: String) extends CoarseGrainedClusterMessage
 
-  case class AddWebUIFilter(filterName:String, filterParams: String, proxyBase :String)
+  case class AddWebUIFilter(filterName:String, filterParams: Map[String, String], proxyBase :String)
     extends CoarseGrainedClusterMessage
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0d958f16/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 04046e2..e8a3a3b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -51,12 +51,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
   val conf = scheduler.sc.conf
   private val timeout = AkkaUtils.askTimeout(conf)
   private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
-  // Submit tasks only after (registered resources / total expected resources) 
+  // Submit tasks only after (registered resources / total expected resources)
   // is equal to at least this value, that is double between 0 and 1.
   var minRegisteredRatio =
     math.min(1, conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0))
   // Submit tasks after maxRegisteredWaitingTime milliseconds
-  // if minRegisteredRatio has not yet been reached  
+  // if minRegisteredRatio has not yet been reached
   val maxRegisteredWaitingTime =
     conf.getInt("spark.scheduler.maxRegisteredResourcesWaitingTime", 30000)
   val createTime = System.currentTimeMillis()
@@ -283,15 +283,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
   }
 
   // Add filters to the SparkUI
-  def addWebUIFilter(filterName: String, filterParams: String, proxyBase: String) {
+  def addWebUIFilter(filterName: String, filterParams: Map[String, String], proxyBase: String) {
     if (proxyBase != null && proxyBase.nonEmpty) {
       System.setProperty("spark.ui.proxyBase", proxyBase)
     }
 
-    if (Seq(filterName, filterParams).forall(t => t != null && t.nonEmpty)) {
+    val hasFilter = (filterName != null && filterName.nonEmpty &&
+      filterParams != null && filterParams.nonEmpty)
+    if (hasFilter) {
       logInfo(s"Add WebUI Filter. $filterName, $filterParams, $proxyBase")
       conf.set("spark.ui.filters", filterName)
-      conf.set(s"spark.$filterName.params", filterParams)
+      filterParams.foreach { case (k, v) => conf.set(s"spark.$filterName.param.$k", v) }
       scheduler.sc.ui.foreach { ui => JettyUtils.addFilters(ui.getHandlers, conf) }
     }
   }
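
For illustration, here is a minimal, self-contained sketch of the per-key
convention used above: one SparkConf entry per filter parameter instead of a
single comma-separated string. The filter class name matches the patch; the
host and application id are made up:

    import org.apache.spark.SparkConf

    object FilterParamSketch {
      def main(args: Array[String]): Unit = {
        val filterName = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
        // Hypothetical values, for illustration only.
        val filterParams = Map(
          "PROXY_HOST" -> "rm-host",
          "PROXY_URI_BASE" -> "http://rm-host:8088/proxy/application_1")
        val conf = new SparkConf()
        conf.set("spark.ui.filters", filterName)
        // One conf entry per param, keyed as spark.<filter>.param.<name>.
        filterParams.foreach { case (k, v) => conf.set(s"spark.$filterName.param.$k", v) }
        conf.getAll.filter(_._1.contains(".param.")).foreach(println)
      }
    }

Encoding each parameter under its own key is what lets a value such as
PROXY_URI_BASES carry a comma-separated list of RM addresses in the HA case,
which the old "k1=v1,k2=v2" string format could not represent.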

http://git-wip-us.apache.org/repos/asf/spark/blob/0d958f16/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index 6b46892..6339012 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -21,9 +21,7 @@ import java.net.{InetSocketAddress, URL}
 import javax.servlet.DispatcherType
 import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
 
-import scala.annotation.tailrec
 import scala.language.implicitConversions
-import scala.util.{Failure, Success, Try}
 import scala.xml.Node
 
 import org.eclipse.jetty.server.Server
@@ -148,14 +146,19 @@ private[spark] object JettyUtils extends Logging {
           holder.setClassName(filter)
           // Get any parameters for each filter
           val paramName = "spark." + filter + ".params"
-          val params = conf.get(paramName, "").split(',').map(_.trim()).toSet
-          params.foreach {
-            case param : String =>
+          val params = conf.get(paramName, "").split(',').map(_.trim()).toSet.foreach {
+            param : String =>
               if (!param.isEmpty) {
                 val parts = param.split("=")
                 if (parts.length == 2) holder.setInitParameter(parts(0), parts(1))
-             }
+              }
           }
+
+          val prefix = s"spark.$filter.param."
+          conf.getAll
+            .filter { case (k, v) => k.length() > prefix.length() && k.startsWith(prefix) }
+            .foreach { case (k, v) => holder.setInitParameter(k.substring(prefix.length()), v) }
+
           val enumDispatcher = java.util.EnumSet.of(DispatcherType.ASYNC, DispatcherType.ERROR,
             DispatcherType.FORWARD, DispatcherType.INCLUDE, DispatcherType.REQUEST)
           handlers.foreach { case(handler) => handler.addFilter(holder, "/*", enumDispatcher) }
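
As a rough sketch of the new prefix scan added above, using a plain Map to
stand in for SparkConf.getAll (the values below are hypothetical, not from
the patch):

    object PrefixScanSketch {
      def main(args: Array[String]): Unit = {
        val filter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
        val conf = Map(
          "spark.ui.filters" -> filter,
          s"spark.$filter.param.PROXY_HOSTS" -> "rm1,rm2",
          s"spark.$filter.param.PROXY_URI_BASES" ->
            "http://rm1:8088/proxy/app,http://rm2:8088/proxy/app")
        val prefix = s"spark.$filter.param."
        // Keep only keys strictly longer than the prefix, then strip it off.
        val initParams = conf.collect {
          case (k, v) if k.length > prefix.length && k.startsWith(prefix) =>
            k.substring(prefix.length) -> v
        }
        initParams.foreach { case (k, v) => println(s"$k -> $v") }
      }
    }

Note that the old comma-separated "spark.<filter>.params" format is still
read above it, so existing configurations keep working.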

http://git-wip-us.apache.org/repos/asf/spark/blob/0d958f16/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index 155dd88..e9289aa 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -185,7 +185,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     val parts = proxy.split(":")
     val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
     val uriBase = "http://" + proxy + proxyBase
-    val amFilter = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
+    val amFilter = Map("PROXY_HOST" -> parts(0), "PROXY_URI_BASE" -> uriBase)
     val amFilterName = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
     actor ! AddWebUIFilter(amFilterName, amFilter, proxyBase)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/0d958f16/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index e4d60c6..378304f 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -33,7 +33,6 @@ import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.util.ConverterUtils
-import org.apache.hadoop.yarn.webapp.util.WebAppUtils
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -130,14 +129,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
   private def addAmIpFilter() {
     val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
     System.setProperty("spark.ui.filters", amFilter)
-    val proxy = WebAppUtils.getProxyHostAndPort(conf)
-    val parts : Array[String] = proxy.split(":")
-    val uriBase = "http://" + proxy +
-      System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
-
-    val params = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
-    System.setProperty(
-      "spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.params", params)
+    val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
+    val params = YarnStableUtils.getAmIpFilterParams(yarnConf, proxyBase)
+    params.foreach { case (k, v) => System.setProperty(s"spark.$amFilter.param.$k", v) }
   }
 
   private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
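
With the filter class above, the properties set here come out as, e.g.,
spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_HOSTS,
which is exactly the prefixed form the new JettyUtils scan picks up.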

http://git-wip-us.apache.org/repos/asf/spark/blob/0d958f16/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index e093fe4..38e9f9c 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -33,7 +33,6 @@ import org.apache.spark.scheduler.SplitInfo
 import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.hadoop.yarn.webapp.util.WebAppUtils
 
 /**
  * An application master that allocates executors on behalf of a driver that is running outside
@@ -144,11 +143,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
 
   // add the yarn amIpFilter that Yarn requires for properly securing the UI
   private def addAmIpFilter() {
-    val proxy = WebAppUtils.getProxyHostAndPort(conf)
-    val parts = proxy.split(":")
     val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
-    val uriBase = "http://" + proxy + proxyBase
-    val amFilter = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
+    val amFilter = YarnStableUtils.getAmIpFilterParams(yarnConf, proxyBase)
     val amFilterName = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
     actor ! AddWebUIFilter(amFilterName, amFilter, proxyBase)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/0d958f16/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnStableUtils.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnStableUtils.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnStableUtils.scala
new file mode 100644
index 0000000..ea81faf
--- /dev/null
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnStableUtils.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.util.{List => JList}
+
+import scala.collection.JavaConversions._
+import scala.util.Try
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils
+
+private[yarn] object YarnStableUtils {
+
+  def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String): Map[String, String] = {
+    // Figure out which scheme Yarn is using. Note the method seems to have been added after 2.2,
+    // so not all stable releases have it.
+    val prefix = Try(classOf[WebAppUtils].getMethod("getHttpSchemePrefix", classOf[Configuration])
+        .invoke(null, conf).asInstanceOf[String]).getOrElse("http://")
+
+    // If running a new enough Yarn, use the HA-aware API for retrieving the RM addresses.
+    try {
+      val method = classOf[WebAppUtils].getMethod("getProxyHostsAndPortsForAmFilter",
+        classOf[Configuration])
+      val proxies = method.invoke(null, conf).asInstanceOf[JList[String]]
+      val hosts = proxies.map { proxy => proxy.split(":")(0) }
+      val uriBases = proxies.map { proxy => prefix + proxy + proxyBase }
+      Map("PROXY_HOSTS" -> hosts.mkString(","), "PROXY_URI_BASES" -> uriBases.mkString(","))
+    } catch {
+      case e: NoSuchMethodException =>
+        val proxy = WebAppUtils.getProxyHostAndPort(conf)
+        val parts = proxy.split(":")
+        val uriBase = prefix + proxy + proxyBase
+        Map("PROXY_HOST" -> parts(0), "PROXY_URI_BASE" -> uriBase)
+    }
+  }
+
+}
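
The Try/getOrElse probe above is a general pattern for calling an API that
may not exist in the linked library version. A self-contained sketch of the
same pattern against a standard-library method (illustrative only, unrelated
to Yarn):

    import scala.util.Try

    object ReflectionFallbackSketch {
      def main(args: Array[String]): Unit = {
        val input = "  padded  "
        // Probe for String.strip (Java 11+); fall back to trim on older JVMs.
        val result = Try(classOf[String].getMethod("strip")
            .invoke(input).asInstanceOf[String])
          .getOrElse(input.trim)
        println(s"[$result]")
      }
    }

The second lookup in getAmIpFilterParams uses try/catch on
NoSuchMethodException instead, since its fallback is a multi-line branch
rather than a single default value.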

