Posted to commits@spark.apache.org by pw...@apache.org on 2014/11/26 05:57:28 UTC

spark git commit: [SPARK-4516] Cap default number of Netty threads at 8

Repository: spark
Updated Branches:
  refs/heads/master b5fb1410c -> f5f2d2738


[SPARK-4516] Cap default number of Netty threads at 8

In practice, only 2-4 cores should be required to transfer roughly 10 Gb/s, and each core that we use will have an initial overhead of roughly 32 MB of off-heap memory, which comes at a premium.

Thus, this value should still retain maximum throughput and reduce wasted off-heap memory allocation. It can be overridden by setting the number of serverThreads and clientThreads manually in Spark's configuration.
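
To make the savings concrete: at roughly 32 MB of off-heap overhead per thread, letting the pools scale to all 32 cores of a large machine would reserve on the order of 1 GB per pool, whereas the cap of 8 holds this to about 256 MB. A minimal sketch of an explicit override (the property names are the ones this patch reads; the thread counts are illustrative):

    import org.apache.spark.SparkConf

    // Illustrative values only: pin both Netty pools to 16 threads,
    // taking precedence over the capped default of 8.
    val conf = new SparkConf()
      .set("spark.shuffle.io.serverThreads", "16")
      .set("spark.shuffle.io.clientThreads", "16")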

Author: Aaron Davidson <aa...@databricks.com>

Closes #3469 from aarondav/fewer-pools2 and squashes the following commits:

087c59f [Aaron Davidson] [SPARK-4516] Cap default number of Netty threads at 8


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f5f2d273
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f5f2d273
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f5f2d273

Branch: refs/heads/master
Commit: f5f2d27385c243959f03a9d78a149d5f405b2f50
Parents: b5fb141
Author: Aaron Davidson <aa...@databricks.com>
Authored: Tue Nov 25 23:57:04 2014 -0500
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Tue Nov 25 23:57:04 2014 -0500

----------------------------------------------------------------------
 .../network/netty/SparkTransportConf.scala      | 44 ++++++++++++++++----
 1 file changed, 37 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f5f2d273/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala b/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
index ce4225c..cef2030 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
@@ -20,8 +20,25 @@ package org.apache.spark.network.netty
 import org.apache.spark.SparkConf
 import org.apache.spark.network.util.{TransportConf, ConfigProvider}
 
+/**
+ * Provides a utility for transforming a SparkConf inside a Spark JVM (e.g., Executor,
+ * Driver, or a standalone shuffle service) into a TransportConf with details on our environment
+ * like the number of cores that are allocated to this JVM.
+ */
 object SparkTransportConf {
   /**
+   * Specifies an upper bound on the number of Netty threads that Spark requires by default.
+   * In practice, only 2-4 cores should be required to transfer roughly 10 Gb/s, and each core
+   * that we use will have an initial overhead of roughly 32 MB of off-heap memory, which comes
+   * at a premium.
+   *
+   * Thus, this value should still retain maximum throughput and reduce wasted off-heap memory
+   * allocation. It can be overridden by setting the number of serverThreads and clientThreads
+   * manually in Spark's configuration.
+   */
+  private val MAX_DEFAULT_NETTY_THREADS = 8
+
+  /**
    * Utility for creating a [[TransportConf]] from a [[SparkConf]].
    * @param numUsableCores if nonzero, this will restrict the server and client threads to only
    *                       use the given number of cores, rather than all of the machine's cores.
@@ -29,15 +46,28 @@ object SparkTransportConf {
    */
   def fromSparkConf(_conf: SparkConf, numUsableCores: Int = 0): TransportConf = {
     val conf = _conf.clone
-    if (numUsableCores > 0) {
-      // Only set if serverThreads/clientThreads not already set.
-      conf.set("spark.shuffle.io.serverThreads",
-        conf.get("spark.shuffle.io.serverThreads", numUsableCores.toString))
-      conf.set("spark.shuffle.io.clientThreads",
-        conf.get("spark.shuffle.io.clientThreads", numUsableCores.toString))
-    }
+
+    // Specify thread configuration based on our JVM's allocation of cores (rather than necessarily
+    // assuming we have all the machine's cores).
+    // NB: Only set if serverThreads/clientThreads not already set.
+    val numThreads = defaultNumThreads(numUsableCores)
+    conf.set("spark.shuffle.io.serverThreads",
+      conf.get("spark.shuffle.io.serverThreads", numThreads.toString))
+    conf.set("spark.shuffle.io.clientThreads",
+      conf.get("spark.shuffle.io.clientThreads", numThreads.toString))
+
     new TransportConf(new ConfigProvider {
       override def get(name: String): String = conf.get(name)
     })
   }
+
+  /**
+   * Returns the default number of threads for both the Netty client and server thread pools.
+   * If numUsableCores is 0, we will use the Runtime to get an approximate number of
+   * available cores.
+   */
+  private def defaultNumThreads(numUsableCores: Int): Int = {
+    val availableCores =
+      if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors()
+    math.min(availableCores, MAX_DEFAULT_NETTY_THREADS)
+  }
 }
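
User-provided settings still take precedence because the patch reads each property with the capped value only as a fallback before writing it back. A hypothetical standalone illustration of that precedence (the values are examples, not defaults):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
    conf.set("spark.shuffle.io.serverThreads", "24")  // explicit user setting

    // Mirrors the patch's pattern: the second argument to get() is used
    // only when the key is absent, so the explicit "24" is preserved.
    val resolved = conf.get("spark.shuffle.io.serverThreads", "8")
    // resolved == "24"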


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org