Posted to commits@spark.apache.org by va...@apache.org on 2017/02/22 22:38:05 UTC

spark git commit: [SPARK-19554][UI, YARN] Allow SHS URL to be used for tracking in YARN RM.

Repository: spark
Updated Branches:
  refs/heads/master 37112fcfc -> 4661d30b9


[SPARK-19554][UI,YARN] Allow SHS URL to be used for tracking in YARN RM.

Allow an application to use the History Server URL as the tracking
URL in the YARN RM, so there is still a link to the web UI somewhere
in YARN even when the driver's UI is disabled. This is useful, for
example, when an admin wants to disable the driver UI by default for
applications, since securing it is harder (it involves non-trivial
SSL certificate and auth management that admins may not want to
expose to user apps).

This needs to be opt-in because of the way the YARN proxy works, so
a new configuration option was added to enable it.
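
A minimal sketch of the application-side setup (the allowTracking
option is the one added by this commit; the other two properties are
pre-existing Spark configs, and the history server address shown is
just a placeholder):

    # spark-defaults.conf for the application (sketch; the SHS
    # address below is illustrative, not a real endpoint)
    spark.ui.enabled                         false
    spark.yarn.historyServer.address         shs-host:18080
    spark.yarn.historyServer.allowTracking   true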

The YARN RM will proxy requests to live AMs instead of redirecting
the client, so pages in the SHS UI will not render correctly since
they'll reference invalid paths in the RM UI. The proxy base support
in the SHS cannot be used since that would prevent direct access to
the SHS.

To solve this problem and make the feature work end-to-end, a new
YARN-specific filter was added that detects whether a request comes
from the proxy and redirects the client appropriately. The SHS admin
has to add this filter manually (see the sketch below) for the
feature to work.
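
On the SHS side, a minimal sketch of the required configuration (the
filter class is the one added by this commit; per its empty init()
it takes no init parameters):

    # Spark History Server configuration (sketch)
    spark.ui.filters   org.apache.spark.deploy.yarn.YarnProxyRedirectFilter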

Tested with a new unit test, and by running with the documented
configuration set in a test cluster. Also verified that the driver
UI is used when it's enabled.

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #16946 from vanzin/SPARK-19554.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4661d30b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4661d30b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4661d30b

Branch: refs/heads/master
Commit: 4661d30b988bf773ab45a15b143efb2908d33743
Parents: 37112fc
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Wed Feb 22 14:37:53 2017 -0800
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Wed Feb 22 14:37:53 2017 -0800

----------------------------------------------------------------------
 docs/running-on-yarn.md                         | 15 ++++
 .../spark/deploy/yarn/ApplicationMaster.scala   |  7 +-
 .../deploy/yarn/YarnProxyRedirectFilter.scala   | 81 ++++++++++++++++++++
 .../apache/spark/deploy/yarn/YarnRMClient.scala |  8 +-
 .../org/apache/spark/deploy/yarn/config.scala   |  7 ++
 .../yarn/YarnProxyRedirectFilterSuite.scala     | 55 +++++++++++++
 6 files changed, 167 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4661d30b/docs/running-on-yarn.md
----------------------------------------------------------------------
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index cf95b95..e9ddaa7 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -604,3 +604,18 @@ spark.yarn.am.extraJavaOptions -Dsun.security.krb5.debug=true -Dsun.security.spn
 
 Finally, if the log level for `org.apache.spark.deploy.yarn.Client` is set to `DEBUG`, the log
 will include a list of all tokens obtained, and their expiry details
+
+## Using the Spark History Server to replace the Spark Web UI
+
+It is possible to use the Spark History Server application page as the tracking URL for running
+applications when the application UI is disabled. This may be desirable on secure clusters, or to
+reduce the memory usage of the Spark driver. To set up tracking through the Spark History Server,
+do the following:
+
+- On the application side, set <code>spark.yarn.historyServer.allowTracking=true</code> in Spark's
+  configuration. This will tell Spark to use the history server's URL as the tracking URL if
+  the application's UI is disabled.
+- On the Spark History Server, add <code>org.apache.spark.deploy.yarn.YarnProxyRedirectFilter</code>
+  to the list of filters in the <code>spark.ui.filters</code> configuration.
+
+Be aware that the history server information may not be up-to-date with the application's state.

http://git-wip-us.apache.org/repos/asf/spark/blob/4661d30b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 9df43ae..864c834 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -332,7 +332,7 @@ private[spark] class ApplicationMaster(
       _sparkConf: SparkConf,
       _rpcEnv: RpcEnv,
       driverRef: RpcEndpointRef,
-      uiAddress: String,
+      uiAddress: Option[String],
       securityMgr: SecurityManager) = {
     val appId = client.getAttemptId().getApplicationId().toString()
     val attemptId = client.getAttemptId().getAttemptId().toString()
@@ -408,8 +408,7 @@ private[spark] class ApplicationMaster(
           sc.getConf.get("spark.driver.host"),
           sc.getConf.get("spark.driver.port"),
           isClusterMode = true)
-        registerAM(sc.getConf, rpcEnv, driverRef, sc.ui.map(_.webUrl).getOrElse(""),
-          securityMgr)
+        registerAM(sc.getConf, rpcEnv, driverRef, sc.ui.map(_.webUrl), securityMgr)
       } else {
         // Sanity check; should never happen in normal operation, since sc should only be null
         // if the user app did not create a SparkContext.
@@ -435,7 +434,7 @@ private[spark] class ApplicationMaster(
       clientMode = true)
     val driverRef = waitForSparkDriver()
     addAmIpFilter()
-    registerAM(sparkConf, rpcEnv, driverRef, sparkConf.get("spark.driver.appUIAddress", ""),
+    registerAM(sparkConf, rpcEnv, driverRef, sparkConf.getOption("spark.driver.appUIAddress"),
       securityMgr)
 
     // In client mode the actor will stop the reporter thread.

http://git-wip-us.apache.org/repos/asf/spark/blob/4661d30b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnProxyRedirectFilter.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnProxyRedirectFilter.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnProxyRedirectFilter.scala
new file mode 100644
index 0000000..ae625df
--- /dev/null
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnProxyRedirectFilter.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import javax.servlet._
+import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+
+import org.apache.spark.internal.Logging
+
+/**
+ * A filter to be used in the Spark History Server for redirecting YARN proxy requests to the
+ * main SHS address. This is useful for applications that are using the history server as the
+ * tracking URL, since the SHS-generated pages cannot be rendered in that case without extra
+ * configuration to set up a proxy base URI (meaning the SHS cannot be ever used directly).
+ */
+class YarnProxyRedirectFilter extends Filter with Logging {
+
+  import YarnProxyRedirectFilter._
+
+  override def destroy(): Unit = { }
+
+  override def init(config: FilterConfig): Unit = { }
+
+  override def doFilter(req: ServletRequest, res: ServletResponse, chain: FilterChain): Unit = {
+    val hreq = req.asInstanceOf[HttpServletRequest]
+
+    // The YARN proxy will send a request with the "proxy-user" cookie set to the YARN's client
+    // user name. We don't expect any other clients to set this cookie, since the SHS does not
+    // use cookies for anything.
+    Option(hreq.getCookies()).flatMap(_.find(_.getName() == COOKIE_NAME)) match {
+      case Some(_) =>
+        doRedirect(hreq, res.asInstanceOf[HttpServletResponse])
+
+      case _ =>
+        chain.doFilter(req, res)
+    }
+  }
+
+  private def doRedirect(req: HttpServletRequest, res: HttpServletResponse): Unit = {
+    val redirect = req.getRequestURL().toString()
+
+    // Need a client-side redirect instead of an HTTP one, otherwise the YARN proxy itself
+    // will handle the redirect and get into an infinite loop.
+    val content = s"""
+      |<html xmlns="http://www.w3.org/1999/xhtml">
+      |<head>
+      |  <title>Spark History Server Redirect</title>
+      |  <meta http-equiv="refresh" content="0;URL='$redirect'" />
+      |</head>
+      |<body>
+      |  <p>The requested page can be found at: <a href="$redirect">$redirect</a>.</p>
+      |</body>
+      |</html>
+      """.stripMargin
+
+    logDebug(s"Redirecting YARN proxy request to $redirect.")
+    res.setStatus(HttpServletResponse.SC_OK)
+    res.setContentType("text/html")
+    res.getWriter().write(content)
+  }
+
+}
+
+private[spark] object YarnProxyRedirectFilter {
+  val COOKIE_NAME = "proxy-user"
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/4661d30b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index 163dfb5..53fb467 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -55,7 +55,7 @@ private[spark] class YarnRMClient extends Logging {
       driverRef: RpcEndpointRef,
       conf: YarnConfiguration,
       sparkConf: SparkConf,
-      uiAddress: String,
+      uiAddress: Option[String],
       uiHistoryAddress: String,
       securityMgr: SecurityManager,
       localResources: Map[String, LocalResource]
@@ -65,9 +65,13 @@ private[spark] class YarnRMClient extends Logging {
     amClient.start()
     this.uiHistoryAddress = uiHistoryAddress
 
+    val trackingUrl = uiAddress.getOrElse {
+      if (sparkConf.get(ALLOW_HISTORY_SERVER_TRACKING_URL)) uiHistoryAddress else ""
+    }
+
     logInfo("Registering the ApplicationMaster")
     synchronized {
-      amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
+      amClient.registerApplicationMaster(Utils.localHostName(), 0, trackingUrl)
       registered = true
     }
     new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), securityMgr,

http://git-wip-us.apache.org/repos/asf/spark/blob/4661d30b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index f19a5b2..d8c96c3 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -82,6 +82,13 @@ package object config {
     .stringConf
     .createOptional
 
+  private[spark] val ALLOW_HISTORY_SERVER_TRACKING_URL =
+    ConfigBuilder("spark.yarn.historyServer.allowTracking")
+      .doc("Allow using the History Server URL for the application as the tracking URL for the " +
+        "application when the Web UI is not enabled.")
+      .booleanConf
+      .createWithDefault(false)
+
   /* File distribution. */
 
   private[spark] val SPARK_ARCHIVE = ConfigBuilder("spark.yarn.archive")

http://git-wip-us.apache.org/repos/asf/spark/blob/4661d30b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnProxyRedirectFilterSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnProxyRedirectFilterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnProxyRedirectFilterSuite.scala
new file mode 100644
index 0000000..54dbe9d
--- /dev/null
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnProxyRedirectFilterSuite.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.io.{PrintWriter, StringWriter}
+import javax.servlet.FilterChain
+import javax.servlet.http.{Cookie, HttpServletRequest, HttpServletResponse}
+
+import org.mockito.Mockito._
+
+import org.apache.spark.SparkFunSuite
+
+class YarnProxyRedirectFilterSuite extends SparkFunSuite {
+
+  test("redirect proxied requests, pass-through others") {
+    val requestURL = "http://example.com:1234/foo?"
+    val filter = new YarnProxyRedirectFilter()
+    val cookies = Array(new Cookie(YarnProxyRedirectFilter.COOKIE_NAME, "dr.who"))
+
+    val req = mock(classOf[HttpServletRequest])
+
+    // First request mocks a YARN proxy request (with the cookie set), second one has no cookies.
+    when(req.getCookies()).thenReturn(cookies, null)
+    when(req.getRequestURL()).thenReturn(new StringBuffer(requestURL))
+
+    val res = mock(classOf[HttpServletResponse])
+    when(res.getWriter()).thenReturn(new PrintWriter(new StringWriter()))
+
+    val chain = mock(classOf[FilterChain])
+
+    // First request is proxied.
+    filter.doFilter(req, res, chain)
+    verify(chain, never()).doFilter(req, res)
+
+    // Second request is not, so should invoke the filter chain.
+    filter.doFilter(req, res, chain)
+    verify(chain, times(1)).doFilter(req, res)
+  }
+
+}

