You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/07/27 00:00:37 UTC

git commit: [SPARK-2704] Name threads in ConnectionManager and mark them as daemon.

Repository: spark
Updated Branches:
  refs/heads/master c183b92c3 -> 12901643b


[SPARK-2704] Name threads in ConnectionManager and mark them as daemon.

The threads created by handleMessageExecutor, handleReadWriteExecutor, and handleConnectExecutor are neither named nor marked as daemon threads. I think there are conditions under which Spark programs won't terminate because of this.

Stack dump attached in https://issues.apache.org/jira/browse/SPARK-2704

Author: Reynold Xin <rx...@apache.org>

Closes #1604 from rxin/daemon and squashes the following commits:

98d6a6c [Reynold Xin] [SPARK-2704] Name threads in ConnectionManager and mark them as daemon.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/12901643
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/12901643
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/12901643

Branch: refs/heads/master
Commit: 12901643b7e808aa75cf0b19e2d0c3d40b1a978d
Parents: c183b92
Author: Reynold Xin <rx...@apache.org>
Authored: Sat Jul 26 15:00:32 2014 -0700
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Sat Jul 26 15:00:32 2014 -0700

----------------------------------------------------------------------
 .../spark/network/ConnectionManager.scala       |  9 ++++---
 .../scala/org/apache/spark/util/Utils.scala     | 27 ++++++++++++--------
 2 files changed, 23 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/12901643/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
index 8a1cdb8..566e8a4 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
@@ -62,13 +62,15 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
     conf.getInt("spark.core.connection.handler.threads.min", 20),
     conf.getInt("spark.core.connection.handler.threads.max", 60),
     conf.getInt("spark.core.connection.handler.threads.keepalive", 60), TimeUnit.SECONDS,
-    new LinkedBlockingDeque[Runnable]())
+    new LinkedBlockingDeque[Runnable](),
+    Utils.namedThreadFactory("handle-message-executor"))
 
   private val handleReadWriteExecutor = new ThreadPoolExecutor(
     conf.getInt("spark.core.connection.io.threads.min", 4),
     conf.getInt("spark.core.connection.io.threads.max", 32),
     conf.getInt("spark.core.connection.io.threads.keepalive", 60), TimeUnit.SECONDS,
-    new LinkedBlockingDeque[Runnable]())
+    new LinkedBlockingDeque[Runnable](),
+    Utils.namedThreadFactory("handle-read-write-executor"))
 
   // Use a different, yet smaller, thread pool - infrequently used with very short lived tasks :
   // which should be executed asap
@@ -76,7 +78,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
     conf.getInt("spark.core.connection.connect.threads.min", 1),
     conf.getInt("spark.core.connection.connect.threads.max", 8),
     conf.getInt("spark.core.connection.connect.threads.keepalive", 60), TimeUnit.SECONDS,
-    new LinkedBlockingDeque[Runnable]())
+    new LinkedBlockingDeque[Runnable](),
+    Utils.namedThreadFactory("handle-connect-executor"))
 
   private val serverChannel = ServerSocketChannel.open()
   // used to track the SendingConnections waiting to do SASL negotiation

http://git-wip-us.apache.org/repos/asf/spark/blob/12901643/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 1a4f4eb..8cbb905 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -21,7 +21,7 @@ import java.io._
 import java.net.{InetAddress, Inet4Address, NetworkInterface, URI, URL, URLConnection}
 import java.nio.ByteBuffer
 import java.util.{Locale, Random, UUID}
-import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor}
+import java.util.concurrent.{ThreadFactory, ConcurrentHashMap, Executors, ThreadPoolExecutor}
 
 import scala.collection.JavaConversions._
 import scala.collection.Map
@@ -553,19 +553,19 @@ private[spark] object Utils extends Logging {
     new ThreadFactoryBuilder().setDaemon(true)
 
   /**
-   * Wrapper over newCachedThreadPool. Thread names are formatted as prefix-ID, where ID is a
-   * unique, sequentially assigned integer.
+   * Create a thread factory that names threads with a prefix and also sets the threads to daemon.
    */
-  def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = {
-    val threadFactory = daemonThreadFactoryBuilder.setNameFormat(prefix + "-%d").build()
-    Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
+  def namedThreadFactory(prefix: String): ThreadFactory = {
+    daemonThreadFactoryBuilder.setNameFormat(prefix + "-%d").build()
   }
 
   /**
-   * Return the string to tell how long has passed in milliseconds.
+   * Wrapper over newCachedThreadPool. Thread names are formatted as prefix-ID, where ID is a
+   * unique, sequentially assigned integer.
    */
-  def getUsedTimeMs(startTimeMs: Long): String = {
-    " " + (System.currentTimeMillis - startTimeMs) + " ms"
+  def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = {
+    val threadFactory = namedThreadFactory(prefix)
+    Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
   }
 
   /**
@@ -573,10 +573,17 @@ private[spark] object Utils extends Logging {
    * unique, sequentially assigned integer.
    */
   def newDaemonFixedThreadPool(nThreads: Int, prefix: String): ThreadPoolExecutor = {
-    val threadFactory = daemonThreadFactoryBuilder.setNameFormat(prefix + "-%d").build()
+    val threadFactory = namedThreadFactory(prefix)
     Executors.newFixedThreadPool(nThreads, threadFactory).asInstanceOf[ThreadPoolExecutor]
   }
 
+  /**
+   * Return the string to tell how long has passed in milliseconds.
+   */
+  def getUsedTimeMs(startTimeMs: Long): String = {
+    " " + (System.currentTimeMillis - startTimeMs) + " ms"
+  }
+
   private def listFilesSafely(file: File): Seq[File] = {
     val files = file.listFiles()
     if (files == null) {