You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/12/18 21:53:49 UTC
spark git commit: [SPARK-3607] ConnectionManager threads.max configs
on the thread pools don't work
Repository: spark
Updated Branches:
refs/heads/master d9956f86a -> 3720057b8
[SPARK-3607] ConnectionManager threads.max configs on the thread pools don't work
Hi all - cleaned up the code to get rid of the unused parameter and added some discussion of the ThreadPoolExecutor parameters to explain why we can use a single threadCount instead of providing a min/max.
Author: Ilya Ganelin <il...@capitalone.com>
Closes #3664 from ilganeli/SPARK-3607C and squashes the following commits:
3c05690 [Ilya Ganelin] Updated documentation and refactored code to extract shared variables
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3720057b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3720057b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3720057b
Branch: refs/heads/master
Commit: 3720057b8e7c15c2c0464b5bb7243bc22323f4e8
Parents: d9956f8
Author: Ilya Ganelin <il...@capitalone.com>
Authored: Thu Dec 18 12:53:18 2014 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Thu Dec 18 12:53:18 2014 -0800
----------------------------------------------------------------------
.../spark/network/nio/ConnectionManager.scala | 27 +++++++++++++-------
1 file changed, 18 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/3720057b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
index df4b085..243b71c 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
@@ -83,9 +83,21 @@ private[nio] class ConnectionManager(
private val ackTimeout = conf.getInt("spark.core.connection.ack.wait.timeout", 60)
+ // Get the thread counts from the Spark Configuration.
+ //
+ // Even though the ThreadPoolExecutor constructor takes both a minimum and maximum value,
+ // we only query for the minimum value because we are using LinkedBlockingDeque.
+ //
+ // The JavaDoc for ThreadPoolExecutor points out that when using a LinkedBlockingDeque (which is
+ // an unbounded queue) no more than corePoolSize threads will ever be created, so only the "min"
+ // parameter is necessary.
+ private val handlerThreadCount = conf.getInt("spark.core.connection.handler.threads.min", 20)
+ private val ioThreadCount = conf.getInt("spark.core.connection.io.threads.min", 4)
+ private val connectThreadCount = conf.getInt("spark.core.connection.connect.threads.min", 1)
+
private val handleMessageExecutor = new ThreadPoolExecutor(
- conf.getInt("spark.core.connection.handler.threads.min", 20),
- conf.getInt("spark.core.connection.handler.threads.max", 60),
+ handlerThreadCount,
+ handlerThreadCount,
conf.getInt("spark.core.connection.handler.threads.keepalive", 60), TimeUnit.SECONDS,
new LinkedBlockingDeque[Runnable](),
Utils.namedThreadFactory("handle-message-executor")) {
@@ -96,12 +108,11 @@ private[nio] class ConnectionManager(
logError("Error in handleMessageExecutor is not handled properly", t)
}
}
-
}
private val handleReadWriteExecutor = new ThreadPoolExecutor(
- conf.getInt("spark.core.connection.io.threads.min", 4),
- conf.getInt("spark.core.connection.io.threads.max", 32),
+ ioThreadCount,
+ ioThreadCount,
conf.getInt("spark.core.connection.io.threads.keepalive", 60), TimeUnit.SECONDS,
new LinkedBlockingDeque[Runnable](),
Utils.namedThreadFactory("handle-read-write-executor")) {
@@ -112,14 +123,13 @@ private[nio] class ConnectionManager(
logError("Error in handleReadWriteExecutor is not handled properly", t)
}
}
-
}
// Use a different, yet smaller, thread pool - infrequently used with very short lived tasks :
// which should be executed asap
private val handleConnectExecutor = new ThreadPoolExecutor(
- conf.getInt("spark.core.connection.connect.threads.min", 1),
- conf.getInt("spark.core.connection.connect.threads.max", 8),
+ connectThreadCount,
+ connectThreadCount,
conf.getInt("spark.core.connection.connect.threads.keepalive", 60), TimeUnit.SECONDS,
new LinkedBlockingDeque[Runnable](),
Utils.namedThreadFactory("handle-connect-executor")) {
@@ -130,7 +140,6 @@ private[nio] class ConnectionManager(
logError("Error in handleConnectExecutor is not handled properly", t)
}
}
-
}
private val serverChannel = ServerSocketChannel.open()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org