You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/08/09 06:35:34 UTC
spark git commit: [SPARK-21176][WEB UI] Use a single ProxyServlet to
proxy all workers and applications
Repository: spark
Updated Branches:
refs/heads/master f016f5c8f -> ae8a2b149
[SPARK-21176][WEB UI] Use a single ProxyServlet to proxy all workers and applications
## What changes were proposed in this pull request?
Currently, each application and each worker creates their own proxy servlet. Each proxy servlet is backed by its own HTTP client and a relatively large number of selector threads. This is excessive but was fixed (to an extent) by https://github.com/apache/spark/pull/18437.
However, a single HTTP client (backed by a single selector thread) should be enough to handle all proxy requests. This PR creates a single proxy servlet no matter how many applications and workers there are.
## How was this patch tested?
The unit tests for rewriting proxied locations and headers were updated. I then spun up a 100-node cluster to ensure that proxying worked correctly.
jiangxb1987 Please let me know if there's anything else I can do to help push this through. Thanks!
Author: Anderson Osagie <os...@gmail.com>
Closes #18499 from aosagie/fix/minimize-proxy-threads.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ae8a2b14
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ae8a2b14
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ae8a2b14
Branch: refs/heads/master
Commit: ae8a2b14966b1dfa10e620bb24ca6560778c20e7
Parents: f016f5c
Author: Anderson Osagie <os...@gmail.com>
Authored: Wed Aug 9 14:35:27 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Wed Aug 9 14:35:27 2017 +0800
----------------------------------------------------------------------
.../org/apache/spark/deploy/master/Master.scala | 15 ++-----
.../spark/deploy/master/ui/MasterWebUI.scala | 21 +++++----
.../scala/org/apache/spark/ui/JettyUtils.scala | 45 +++++++++++---------
.../scala/org/apache/spark/ui/UISuite.scala | 20 ++++-----
4 files changed, 46 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/ae8a2b14/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 4cc580e..e030cac 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -133,6 +133,7 @@ private[deploy] class Master(
masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
if (reverseProxy) {
masterWebUiUrl = conf.get("spark.ui.reverseProxyUrl", masterWebUiUrl)
+ webUi.addProxy()
logInfo(s"Spark Master is acting as a reverse proxy. Master, Workers and " +
s"Applications UIs are available at $masterWebUiUrl")
}
@@ -769,9 +770,6 @@ private[deploy] class Master(
workers += worker
idToWorker(worker.id) = worker
addressToWorker(workerAddress) = worker
- if (reverseProxy) {
- webUi.addProxyTargets(worker.id, worker.webUiAddress)
- }
true
}
@@ -780,9 +778,7 @@ private[deploy] class Master(
worker.setState(WorkerState.DEAD)
idToWorker -= worker.id
addressToWorker -= worker.endpoint.address
- if (reverseProxy) {
- webUi.removeProxyTargets(worker.id)
- }
+
for (exec <- worker.executors.values) {
logInfo("Telling app of lost executor: " + exec.id)
exec.application.driver.send(ExecutorUpdated(
@@ -844,9 +840,6 @@ private[deploy] class Master(
endpointToApp(app.driver) = app
addressToApp(appAddress) = app
waitingApps += app
- if (reverseProxy) {
- webUi.addProxyTargets(app.id, app.desc.appUiUrl)
- }
}
private def finishApplication(app: ApplicationInfo) {
@@ -860,9 +853,7 @@ private[deploy] class Master(
idToApp -= app.id
endpointToApp -= app.driver
addressToApp -= app.driver.address
- if (reverseProxy) {
- webUi.removeProxyTargets(app.id)
- }
+
if (completedApps.size >= RETAINED_APPLICATIONS) {
val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
completedApps.take(toRemove).foreach { a =>
http://git-wip-us.apache.org/repos/asf/spark/blob/ae8a2b14/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
index e42f41b..35b7ddd 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -17,10 +17,7 @@
package org.apache.spark.deploy.master.ui
-import scala.collection.mutable.HashMap
-
-import org.eclipse.jetty.servlet.ServletContextHandler
-
+import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.master.Master
import org.apache.spark.internal.Logging
import org.apache.spark.ui.{SparkUI, WebUI}
@@ -38,7 +35,6 @@ class MasterWebUI(
val masterEndpointRef = master.self
val killEnabled = master.conf.getBoolean("spark.ui.killEnabled", true)
- private val proxyHandlers = new HashMap[String, ServletContextHandler]
initialize()
@@ -54,16 +50,19 @@ class MasterWebUI(
"/driver/kill", "/", masterPage.handleDriverKillRequest, httpMethods = Set("POST")))
}
- def addProxyTargets(id: String, target: String): Unit = {
- val endTarget = target.stripSuffix("/")
- val handler = createProxyHandler("/proxy/" + id, endTarget)
+ def addProxy(): Unit = {
+ val handler = createProxyHandler(idToUiAddress)
attachHandler(handler)
- proxyHandlers(id) = handler
}
- def removeProxyTargets(id: String): Unit = {
- proxyHandlers.remove(id).foreach(detachHandler)
+ def idToUiAddress(id: String): Option[String] = {
+ val state = masterEndpointRef.askSync[MasterStateResponse](RequestMasterState)
+ val maybeWorkerUiAddress = state.workers.find(_.id == id).map(_.webUiAddress)
+ val maybeAppUiAddress = state.activeApps.find(_.id == id).map(_.desc.appUiUrl)
+
+ maybeWorkerUiAddress.orElse(maybeAppUiAddress)
}
+
}
private[master] object MasterWebUI {
http://git-wip-us.apache.org/repos/asf/spark/blob/ae8a2b14/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index 0fa9671..880cf08 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -194,28 +194,32 @@ private[spark] object JettyUtils extends Logging {
}
/** Create a handler for proxying request to Workers and Application Drivers */
- def createProxyHandler(
- prefix: String,
- target: String): ServletContextHandler = {
+ def createProxyHandler(idToUiAddress: String => Option[String]): ServletContextHandler = {
val servlet = new ProxyServlet {
override def rewriteTarget(request: HttpServletRequest): String = {
- val rewrittenURI = createProxyURI(
- prefix, target, request.getRequestURI(), request.getQueryString())
- if (rewrittenURI == null) {
- return null
- }
- if (!validateDestination(rewrittenURI.getHost(), rewrittenURI.getPort())) {
- return null
+ val path = request.getPathInfo
+ if (path == null) return null
+
+ val prefixTrailingSlashIndex = path.indexOf('/', 1)
+ val prefix = if (prefixTrailingSlashIndex == -1) {
+ path
+ } else {
+ path.substring(0, prefixTrailingSlashIndex)
}
- rewrittenURI.toString()
+ val id = prefix.drop(1)
+
+ // Query master state for id's corresponding UI address
+ // If that address exists, turn it into a valid, target URI string or return null
+ idToUiAddress(id)
+ .map(createProxyURI(prefix, _, path, request.getQueryString))
+ .filter(uri => uri != null && validateDestination(uri.getHost, uri.getPort))
+ .map(_.toString)
+ .orNull
}
override def newHttpClient(): HttpClient = {
// SPARK-21176: Use the Jetty logic to calculate the number of selector threads (#CPUs/2),
// but limit it to 8 max.
- // Otherwise, it might happen that we exhaust the threadpool since in reverse proxy mode
- // a proxy is instantiated for each executor. If the head node has many processors, this
- // can quickly add up to an unreasonably high number of threads.
val numSelectors = math.max(1, math.min(8, Runtime.getRuntime().availableProcessors() / 2))
new HttpClient(new HttpClientTransportOverHTTP(numSelectors), null)
}
@@ -226,8 +230,8 @@ private[spark] object JettyUtils extends Logging {
headerName: String,
headerValue: String): String = {
if (headerName.equalsIgnoreCase("location")) {
- val newHeader = createProxyLocationHeader(
- prefix, headerValue, clientRequest, serverResponse.getRequest().getURI())
+ val newHeader = createProxyLocationHeader(headerValue, clientRequest,
+ serverResponse.getRequest().getURI())
if (newHeader != null) {
return newHeader
}
@@ -239,8 +243,8 @@ private[spark] object JettyUtils extends Logging {
val contextHandler = new ServletContextHandler
val holder = new ServletHolder(servlet)
- contextHandler.setContextPath(prefix)
- contextHandler.addServlet(holder, "/")
+ contextHandler.setContextPath("/proxy")
+ contextHandler.addServlet(holder, "/*")
contextHandler
}
@@ -438,7 +442,7 @@ private[spark] object JettyUtils extends Logging {
val rest = path.substring(prefix.length())
if (!rest.isEmpty()) {
- if (!rest.startsWith("/")) {
+ if (!rest.startsWith("/") && !uri.endsWith("/")) {
uri.append("/")
}
uri.append(rest)
@@ -458,14 +462,13 @@ private[spark] object JettyUtils extends Logging {
}
def createProxyLocationHeader(
- prefix: String,
headerValue: String,
clientRequest: HttpServletRequest,
targetUri: URI): String = {
val toReplace = targetUri.getScheme() + "://" + targetUri.getAuthority()
if (headerValue.startsWith(toReplace)) {
clientRequest.getScheme() + "://" + clientRequest.getHeader("host") +
- prefix + headerValue.substring(toReplace.length())
+ clientRequest.getPathInfo() + headerValue.substring(toReplace.length())
} else {
null
}
http://git-wip-us.apache.org/repos/asf/spark/blob/ae8a2b14/core/src/test/scala/org/apache/spark/ui/UISuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
index 0c3d4ca..0428903 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
@@ -200,36 +200,34 @@ class UISuite extends SparkFunSuite {
}
test("verify proxy rewrittenURI") {
- val prefix = "/proxy/worker-id"
+ val prefix = "/worker-id"
val target = "http://localhost:8081"
- val path = "/proxy/worker-id/json"
+ val path = "/worker-id/json"
var rewrittenURI = JettyUtils.createProxyURI(prefix, target, path, null)
assert(rewrittenURI.toString() === "http://localhost:8081/json")
rewrittenURI = JettyUtils.createProxyURI(prefix, target, path, "test=done")
assert(rewrittenURI.toString() === "http://localhost:8081/json?test=done")
- rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/proxy/worker-id", null)
+ rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/worker-id", null)
assert(rewrittenURI.toString() === "http://localhost:8081")
- rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/proxy/worker-id/test%2F", null)
+ rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/worker-id/test%2F", null)
assert(rewrittenURI.toString() === "http://localhost:8081/test%2F")
- rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/proxy/worker-id/%F0%9F%98%84", null)
+ rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/worker-id/%F0%9F%98%84", null)
assert(rewrittenURI.toString() === "http://localhost:8081/%F0%9F%98%84")
- rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/proxy/worker-noid/json", null)
+ rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/worker-noid/json", null)
assert(rewrittenURI === null)
}
test("verify rewriting location header for reverse proxy") {
val clientRequest = mock(classOf[HttpServletRequest])
var headerValue = "http://localhost:4040/jobs"
- val prefix = "/proxy/worker-id"
val targetUri = URI.create("http://localhost:4040")
when(clientRequest.getScheme()).thenReturn("http")
when(clientRequest.getHeader("host")).thenReturn("localhost:8080")
- var newHeader = JettyUtils.createProxyLocationHeader(
- prefix, headerValue, clientRequest, targetUri)
+ when(clientRequest.getPathInfo()).thenReturn("/proxy/worker-id")
+ var newHeader = JettyUtils.createProxyLocationHeader(headerValue, clientRequest, targetUri)
assert(newHeader.toString() === "http://localhost:8080/proxy/worker-id/jobs")
headerValue = "http://localhost:4041/jobs"
- newHeader = JettyUtils.createProxyLocationHeader(
- prefix, headerValue, clientRequest, targetUri)
+ newHeader = JettyUtils.createProxyLocationHeader(headerValue, clientRequest, targetUri)
assert(newHeader === null)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org