Posted to commits@spark.apache.org by va...@apache.org on 2017/07/18 20:36:15 UTC

spark git commit: [SPARK-21408][CORE] Better default number of RPC dispatch threads.

Repository: spark
Updated Branches:
  refs/heads/master cde64add1 -> 264b0f36c


[SPARK-21408][CORE] Better default number of RPC dispatch threads.

Instead of using the host's CPU count, use the number of cores allocated
for the Spark process when sizing the RPC dispatch thread pool. This avoids
creating large thread pools on large machines when the number of allocated
cores is small.
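
In essence, the new sizing logic resolves like this (a minimal sketch of
the Dispatcher change in the diff below, not the literal patch):

    import org.apache.spark.SparkConf

    // Sketch: numUsableCores == 0 means "unknown", so fall back to the
    // host's CPU count; the explicit config still wins over either default.
    def dispatcherThreads(conf: SparkConf, numUsableCores: Int): Int = {
      val availableCores =
        if (numUsableCores > 0) numUsableCores
        else Runtime.getRuntime.availableProcessors()
      conf.getInt("spark.rpc.netty.dispatcher.numThreads",
        math.max(2, availableCores))
    }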

Tested by verifying the number of dispatcher threads with spark.executor.cores
set to 1 and 4; did the same for the YARN AM.
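
One hypothetical way to spot-check the count from inside the JVM (not part
of this patch) is to count live threads by the pool's name prefix, which
Dispatcher.scala sets to "dispatcher-event-loop":

    import scala.collection.JavaConverters._

    // Count the dispatcher threads currently alive in this JVM.
    val dispatcherThreads = Thread.getAllStackTraces.keySet.asScala
      .count(_.getName.startsWith("dispatcher-event-loop"))
    println(s"dispatcher threads: $dispatcherThreads")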

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #18639 from vanzin/SPARK-21408.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/264b0f36
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/264b0f36
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/264b0f36

Branch: refs/heads/master
Commit: 264b0f36cedacd9a22b45a3e14b2186230432be6
Parents: cde64ad
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Tue Jul 18 13:36:10 2017 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Tue Jul 18 13:36:10 2017 -0700

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/SparkEnv.scala         | 2 +-
 core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala       | 6 ++++--
 .../main/scala/org/apache/spark/rpc/netty/Dispatcher.scala  | 9 +++++++--
 .../main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala | 7 ++++---
 .../scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala | 4 ++--
 .../org/apache/spark/deploy/yarn/ApplicationMaster.scala    | 6 ++++--
 6 files changed, 22 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/264b0f36/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 45ed986..2492815 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -243,7 +243,7 @@ object SparkEnv extends Logging {
 
     val systemName = if (isDriver) driverSystemName else executorSystemName
     val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
-      securityManager, clientMode = !isDriver)
+      securityManager, numUsableCores, !isDriver)
 
     // Figure out which port RpcEnv actually bound to in case the original port is 0 or occupied.
     if (isDriver) {

http://git-wip-us.apache.org/repos/asf/spark/blob/264b0f36/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
index 530743c..de2cc56 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
@@ -40,7 +40,7 @@ private[spark] object RpcEnv {
       conf: SparkConf,
       securityManager: SecurityManager,
       clientMode: Boolean = false): RpcEnv = {
-    create(name, host, host, port, conf, securityManager, clientMode)
+    create(name, host, host, port, conf, securityManager, 0, clientMode)
   }
 
   def create(
@@ -50,9 +50,10 @@ private[spark] object RpcEnv {
       port: Int,
       conf: SparkConf,
       securityManager: SecurityManager,
+      numUsableCores: Int,
       clientMode: Boolean): RpcEnv = {
     val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,
-      clientMode)
+      numUsableCores, clientMode)
     new NettyRpcEnvFactory().create(config)
   }
 }
@@ -201,4 +202,5 @@ private[spark] case class RpcEnvConfig(
     advertiseAddress: String,
     port: Int,
     securityManager: SecurityManager,
+    numUsableCores: Int,
     clientMode: Boolean)

http://git-wip-us.apache.org/repos/asf/spark/blob/264b0f36/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
index e94babb..904c4d0 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
@@ -32,8 +32,11 @@ import org.apache.spark.util.ThreadUtils
 
 /**
  * A message dispatcher, responsible for routing RPC messages to the appropriate endpoint(s).
+ *
+ * @param numUsableCores Number of CPU cores allocated to the process, for sizing the thread pool.
+ *                       If 0, will consider the available CPUs on the host.
  */
-private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
+private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) extends Logging {
 
   private class EndpointData(
       val name: String,
@@ -192,8 +195,10 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
 
   /** Thread pool used for dispatching messages. */
   private val threadpool: ThreadPoolExecutor = {
+    val availableCores =
+      if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors()
     val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads",
-      math.max(2, Runtime.getRuntime.availableProcessors()))
+      math.max(2, availableCores))
     val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
     for (i <- 0 until numThreads) {
       pool.execute(new MessageLoop)
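
To make the effect of this hunk concrete: on a 64-core host running an
executor with spark.executor.cores=1, the old default sized this pool at
max(2, 64) = 64 threads, while the new default yields max(2, 1) = 2;
setting spark.rpc.netty.dispatcher.numThreads still overrides either default.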

http://git-wip-us.apache.org/repos/asf/spark/blob/264b0f36/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index 6489849..1777e7a 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -44,14 +44,15 @@ private[netty] class NettyRpcEnv(
     val conf: SparkConf,
     javaSerializerInstance: JavaSerializerInstance,
     host: String,
-    securityManager: SecurityManager) extends RpcEnv(conf) with Logging {
+    securityManager: SecurityManager,
+    numUsableCores: Int) extends RpcEnv(conf) with Logging {
 
   private[netty] val transportConf = SparkTransportConf.fromSparkConf(
     conf.clone.set("spark.rpc.io.numConnectionsPerPeer", "1"),
     "rpc",
     conf.getInt("spark.rpc.io.threads", 0))
 
-  private val dispatcher: Dispatcher = new Dispatcher(this)
+  private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)
 
   private val streamManager = new NettyStreamManager(this)
 
@@ -451,7 +452,7 @@ private[rpc] class NettyRpcEnvFactory extends RpcEnvFactory with Logging {
       new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]
     val nettyEnv =
       new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
-        config.securityManager)
+        config.securityManager, config.numUsableCores)
     if (!config.clientMode) {
       val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
         nettyEnv.startServer(config.bindAddress, actualPort)

http://git-wip-us.apache.org/repos/asf/spark/blob/264b0f36/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
index 2b1bce4..7771637 100644
--- a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
@@ -31,7 +31,7 @@ class NettyRpcEnvSuite extends RpcEnvSuite with MockitoSugar {
       port: Int,
       clientMode: Boolean = false): RpcEnv = {
     val config = RpcEnvConfig(conf, "test", "localhost", "localhost", port,
-      new SecurityManager(conf), clientMode)
+      new SecurityManager(conf), 0, clientMode)
     new NettyRpcEnvFactory().create(config)
   }
 
@@ -47,7 +47,7 @@ class NettyRpcEnvSuite extends RpcEnvSuite with MockitoSugar {
   test("advertise address different from bind address") {
     val sparkConf = new SparkConf()
     val config = RpcEnvConfig(sparkConf, "test", "localhost", "example.com", 0,
-      new SecurityManager(sparkConf), false)
+      new SecurityManager(sparkConf), 0, false)
     val env = new NettyRpcEnvFactory().create(config)
     try {
       assert(env.address.hostPort.startsWith("example.com:"))

http://git-wip-us.apache.org/repos/asf/spark/blob/264b0f36/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 6ff210a..fc92502 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -459,8 +459,10 @@ private[spark] class ApplicationMaster(
   }
 
   private def runExecutorLauncher(securityMgr: SecurityManager): Unit = {
-    rpcEnv = RpcEnv.create("sparkYarnAM", Utils.localHostName, -1, sparkConf, securityMgr,
-      clientMode = true)
+    val hostname = Utils.localHostName
+    val amCores = sparkConf.get(AM_CORES)
+    rpcEnv = RpcEnv.create("sparkYarnAM", hostname, hostname, -1, sparkConf, securityMgr,
+      amCores, true)
     val driverRef = waitForSparkDriver()
     addAmIpFilter()
     registerAM(sparkConf, rpcEnv, driverRef, sparkConf.getOption("spark.driver.appUIAddress"),

