Posted to commits@spark.apache.org by rx...@apache.org on 2014/12/18 21:53:49 UTC

spark git commit: [SPARK-3607] ConnectionManager threads.max configs on the thread pools don't work

Repository: spark
Updated Branches:
  refs/heads/master d9956f86a -> 3720057b8


[SPARK-3607] ConnectionManager threads.max configs on the thread pools don't work

Hi all - I cleaned up the code to get rid of the unused parameter and added a comment on the ThreadPoolExecutor parameters explaining why a single threadCount suffices instead of a separate min/max.
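
To make the rationale concrete, here is a minimal, self-contained sketch (not part of this patch; the thread count and the dummy tasks are invented for illustration) of the ThreadPoolExecutor behaviour the new comment relies on: with an unbounded queue such as LinkedBlockingDeque, the pool never creates more than corePoolSize threads, so passing the same value for the core and maximum sizes changes nothing.

    import java.util.concurrent.{LinkedBlockingDeque, ThreadPoolExecutor, TimeUnit}

    object FixedSizePoolSketch {
      def main(args: Array[String]): Unit = {
        // Stand-in for a value read from SparkConf, e.g. spark.core.connection.io.threads.min
        val threadCount = 4

        // With an unbounded queue, ThreadPoolExecutor only spawns threads beyond
        // corePoolSize when the queue rejects a task, which never happens here,
        // so maximumPoolSize is effectively irrelevant and min == max is safe.
        val pool = new ThreadPoolExecutor(
          threadCount,            // corePoolSize
          threadCount,            // maximumPoolSize (never exceeded with an unbounded queue)
          60L, TimeUnit.SECONDS,  // keep-alive for non-core threads (unused here)
          new LinkedBlockingDeque[Runnable]())

        (1 to 20).foreach { i =>
          pool.execute(new Runnable {
            def run(): Unit = println(s"task $i on ${Thread.currentThread().getName}")
          })
        }
        pool.shutdown()
      }
    }

Running the sketch shows all tasks served by at most threadCount worker threads, which is exactly why the patch can pass handlerThreadCount (and the io/connect counts) for both constructor arguments.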

Author: Ilya Ganelin <il...@capitalone.com>

Closes #3664 from ilganeli/SPARK-3607C and squashes the following commits:

3c05690 [Ilya Ganelin] Updated documentation and refactored code to extract shared variables


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3720057b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3720057b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3720057b

Branch: refs/heads/master
Commit: 3720057b8e7c15c2c0464b5bb7243bc22323f4e8
Parents: d9956f8
Author: Ilya Ganelin <il...@capitalone.com>
Authored: Thu Dec 18 12:53:18 2014 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Thu Dec 18 12:53:18 2014 -0800

----------------------------------------------------------------------
 .../spark/network/nio/ConnectionManager.scala   | 27 +++++++++++++-------
 1 file changed, 18 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3720057b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
index df4b085..243b71c 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
@@ -83,9 +83,21 @@ private[nio] class ConnectionManager(
 
   private val ackTimeout = conf.getInt("spark.core.connection.ack.wait.timeout", 60)
 
+  // Get the thread counts from the Spark Configuration.
+  // 
+  // Even though the ThreadPoolExecutor constructor takes both a minimum and maximum value,
+  // we only query for the minimum value because we are using LinkedBlockingDeque.
+  // 
+  // The JavaDoc for ThreadPoolExecutor points out that when using a LinkedBlockingDeque (which is 
+  // an unbounded queue) no more than corePoolSize threads will ever be created, so only the "min"
+  // parameter is necessary.
+  private val handlerThreadCount = conf.getInt("spark.core.connection.handler.threads.min", 20)
+  private val ioThreadCount = conf.getInt("spark.core.connection.io.threads.min", 4)
+  private val connectThreadCount = conf.getInt("spark.core.connection.connect.threads.min", 1)
+
   private val handleMessageExecutor = new ThreadPoolExecutor(
-    conf.getInt("spark.core.connection.handler.threads.min", 20),
-    conf.getInt("spark.core.connection.handler.threads.max", 60),
+    handlerThreadCount,
+    handlerThreadCount,
     conf.getInt("spark.core.connection.handler.threads.keepalive", 60), TimeUnit.SECONDS,
     new LinkedBlockingDeque[Runnable](),
     Utils.namedThreadFactory("handle-message-executor")) {
@@ -96,12 +108,11 @@ private[nio] class ConnectionManager(
         logError("Error in handleMessageExecutor is not handled properly", t)
       }
     }
-
   }
 
   private val handleReadWriteExecutor = new ThreadPoolExecutor(
-    conf.getInt("spark.core.connection.io.threads.min", 4),
-    conf.getInt("spark.core.connection.io.threads.max", 32),
+    ioThreadCount,
+    ioThreadCount,
     conf.getInt("spark.core.connection.io.threads.keepalive", 60), TimeUnit.SECONDS,
     new LinkedBlockingDeque[Runnable](),
     Utils.namedThreadFactory("handle-read-write-executor")) {
@@ -112,14 +123,13 @@ private[nio] class ConnectionManager(
         logError("Error in handleReadWriteExecutor is not handled properly", t)
       }
     }
-
   }
 
   // Use a different, yet smaller, thread pool - infrequently used with very short lived tasks :
   // which should be executed asap
   private val handleConnectExecutor = new ThreadPoolExecutor(
-    conf.getInt("spark.core.connection.connect.threads.min", 1),
-    conf.getInt("spark.core.connection.connect.threads.max", 8),
+    connectThreadCount,
+    connectThreadCount,
     conf.getInt("spark.core.connection.connect.threads.keepalive", 60), TimeUnit.SECONDS,
     new LinkedBlockingDeque[Runnable](),
     Utils.namedThreadFactory("handle-connect-executor")) {
@@ -130,7 +140,6 @@ private[nio] class ConnectionManager(
         logError("Error in handleConnectExecutor is not handled properly", t)
       }
     }
-
   }
 
   private val serverChannel = ServerSocketChannel.open()
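
As a usage note (a hedged sketch, not taken from the commit itself; the numbers are made up, only the key names come from the diff above): after this change each pool is fixed at the size given by the corresponding *.threads.min key, and the *.threads.max keys are no longer read, so tuning would look roughly like this:

    import org.apache.spark.SparkConf

    // Hypothetical tuning snippet: each NIO ConnectionManager pool now has a fixed size.
    val conf = new SparkConf()
      .set("spark.core.connection.handler.threads.min", "32")  // handle-message-executor size
      .set("spark.core.connection.io.threads.min", "8")        // handle-read-write-executor size
      .set("spark.core.connection.connect.threads.min", "2")   // handle-connect-executor size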

