You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/04/12 22:46:42 UTC
spark git commit: [SPARK-14513][CORE] Fix threads left behind after
stopping SparkContext
Repository: spark
Updated Branches:
refs/heads/master bcd207627 -> 3e53de4bd
[SPARK-14513][CORE] Fix threads left behind after stopping SparkContext
## What changes were proposed in this pull request?
Shut down the `QueuedThreadPool` used by the Jetty `Server` to avoid leaking threads after the SparkContext is stopped.
Note: If this fix is to be applied to `branch-1.6`, one more patch to the `NettyRpcEnv` class is needed so that `NettyRpcEnv._fileServer.shutdown` is called in the `NettyRpcEnv.cleanup` method. This applies only to `branch-1.6` because the `_fileServer` field has been removed from the `NettyRpcEnv` class in the master branch. Please advise if a second PR is necessary to bring this fix back to `branch-1.6`.
## How was this patch tested?
Ran `./dev/run-tests` locally.
Author: Terence Yim <te...@cask.co>
Closes #12318 from chtyim/fixes/SPARK-14513-thread-leak.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3e53de4b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3e53de4b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3e53de4b
Branch: refs/heads/master
Commit: 3e53de4bdd6d7b6de1fe3e5bfbdc53180aa9a737
Parents: bcd2076
Author: Terence Yim <te...@cask.co>
Authored: Tue Apr 12 13:46:39 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Tue Apr 12 13:46:39 2016 -0700
----------------------------------------------------------------------
core/src/main/scala/org/apache/spark/HttpServer.scala | 7 +++++++
.../main/scala/org/apache/spark/ui/JettyUtils.scala | 14 +++++++++++++-
core/src/main/scala/org/apache/spark/ui/WebUI.scala | 2 +-
3 files changed, 21 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/3e53de4b/core/src/main/scala/org/apache/spark/HttpServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala
index 9fad1f6..982b6d6 100644
--- a/core/src/main/scala/org/apache/spark/HttpServer.scala
+++ b/core/src/main/scala/org/apache/spark/HttpServer.scala
@@ -25,6 +25,7 @@ import org.eclipse.jetty.server.Server
import org.eclipse.jetty.server.bio.SocketConnector
import org.eclipse.jetty.server.ssl.SslSocketConnector
import org.eclipse.jetty.servlet.{DefaultServlet, ServletContextHandler, ServletHolder}
+import org.eclipse.jetty.util.component.LifeCycle
import org.eclipse.jetty.util.security.{Constraint, Password}
import org.eclipse.jetty.util.thread.QueuedThreadPool
@@ -155,6 +156,12 @@ private[spark] class HttpServer(
throw new ServerStateException("Server is already stopped")
} else {
server.stop()
+ // Stop the ThreadPool if it supports stop() method (through LifeCycle).
+ // It is needed because stopping the Server won't stop the ThreadPool it uses.
+ val threadPool = server.getThreadPool
+ if (threadPool != null && threadPool.isInstanceOf[LifeCycle]) {
+ threadPool.asInstanceOf[LifeCycle].stop
+ }
port = -1
server = null
}
http://git-wip-us.apache.org/repos/asf/spark/blob/3e53de4b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index c3c59f8..119165f 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -30,6 +30,7 @@ import org.eclipse.jetty.server.handler._
import org.eclipse.jetty.server.nio.SelectChannelConnector
import org.eclipse.jetty.server.ssl.SslSelectChannelConnector
import org.eclipse.jetty.servlet._
+import org.eclipse.jetty.util.component.LifeCycle
import org.eclipse.jetty.util.thread.QueuedThreadPool
import org.json4s.JValue
import org.json4s.jackson.JsonMethods.{pretty, render}
@@ -350,4 +351,15 @@ private[spark] object JettyUtils extends Logging {
private[spark] case class ServerInfo(
server: Server,
boundPort: Int,
- rootHandler: ContextHandlerCollection)
+ rootHandler: ContextHandlerCollection) {
+
+ def stop(): Unit = {
+ server.stop()
+ // Stop the ThreadPool if it supports stop() method (through LifeCycle).
+ // It is needed because stopping the Server won't stop the ThreadPool it uses.
+ val threadPool = server.getThreadPool
+ if (threadPool != null && threadPool.isInstanceOf[LifeCycle]) {
+ threadPool.asInstanceOf[LifeCycle].stop
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/3e53de4b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
index 250b7f2..3939b11 100644
--- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -153,7 +153,7 @@ private[spark] abstract class WebUI(
def stop() {
assert(serverInfo.isDefined,
"Attempted to stop %s before binding to a server!".format(className))
- serverInfo.get.server.stop()
+ serverInfo.get.stop()
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org