Posted to commits@spark.apache.org by sr...@apache.org on 2019/01/08 19:11:30 UTC

[spark] branch master updated: [SPARK-24920][CORE] Allow sharing Netty's memory pool allocators

This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e103c4a  [SPARK-24920][CORE] Allow sharing Netty's memory pool allocators
e103c4a is described below

commit e103c4a5e72bab8862ff49d6d4c1e62e642fc412
Author: “attilapiros” <pi...@gmail.com>
AuthorDate: Tue Jan 8 13:11:11 2019 -0600

    [SPARK-24920][CORE] Allow sharing Netty's memory pool allocators
    
    ## What changes were proposed in this pull request?
    
    Introducing shared pooled ByteBuf allocators.
    This feature can be enabled via the "spark.network.sharedByteBufAllocators.enabled" configuration.
    
    When it is enabled, only two pooled ByteBuf allocators are created:
    - one for transport servers where caching is allowed and
    - one for transport clients where caching is disabled
    
    This way the caching behaviour remains the same as before.
    Both shared pools are created with the numCores parameter set to 0 (which falls back to the number of available processors), because conf.serverThreads() and conf.clientThreads() are module dependent and lazy creation of these allocators would lead to unpredictable behaviour.
    
    When "spark.network.sharedByteBufAllocators.enabled" is false then a new allocator is created for every transport client and server separately as was before this PR.
    
    ## How was this patch tested?
    
    Existing unit tests.
    
    Closes #23278 from attilapiros/SPARK-24920.
    
    Authored-by: “attilapiros” <pi...@gmail.com>
    Signed-off-by: Sean Owen <se...@databricks.com>
---
 .../network/client/TransportClientFactory.java     | 11 +++--
 .../spark/network/server/TransportServer.java      | 17 +++++---
 .../org/apache/spark/network/util/NettyUtils.java  | 48 ++++++++++++++++++++++
 .../apache/spark/network/util/TransportConf.java   | 18 ++++++++
 .../spark/network/netty/SparkTransportConf.scala   | 25 +----------
 docs/configuration.md                              | 10 +++++
 6 files changed, 97 insertions(+), 32 deletions(-)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
index 16d242d..a8e2715 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -84,7 +84,7 @@ public class TransportClientFactory implements Closeable {
 
   private final Class<? extends Channel> socketChannelClass;
   private EventLoopGroup workerGroup;
-  private PooledByteBufAllocator pooledAllocator;
+  private final PooledByteBufAllocator pooledAllocator;
   private final NettyMemoryMetrics metrics;
 
   public TransportClientFactory(
@@ -103,8 +103,13 @@ public class TransportClientFactory implements Closeable {
         ioMode,
         conf.clientThreads(),
         conf.getModuleName() + "-client");
-    this.pooledAllocator = NettyUtils.createPooledByteBufAllocator(
-      conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads());
+    if (conf.sharedByteBufAllocators()) {
+      this.pooledAllocator = NettyUtils.getSharedPooledByteBufAllocator(
+          conf.preferDirectBufsForSharedByteBufAllocators(), false /* allowCache */);
+    } else {
+      this.pooledAllocator = NettyUtils.createPooledByteBufAllocator(
+          conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads());
+    }
     this.metrics = new NettyMemoryMetrics(
       this.pooledAllocator, conf.getModuleName() + "-client", conf);
   }
diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java
index eb5f10a..a0ecde2 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java
@@ -54,6 +54,7 @@ public class TransportServer implements Closeable {
   private ServerBootstrap bootstrap;
   private ChannelFuture channelFuture;
   private int port = -1;
+  private final PooledByteBufAllocator pooledAllocator;
   private NettyMemoryMetrics metrics;
 
   /**
@@ -69,6 +70,13 @@ public class TransportServer implements Closeable {
     this.context = context;
     this.conf = context.getConf();
     this.appRpcHandler = appRpcHandler;
+    if (conf.sharedByteBufAllocators()) {
+      this.pooledAllocator = NettyUtils.getSharedPooledByteBufAllocator(
+          conf.preferDirectBufsForSharedByteBufAllocators(), true /* allowCache */);
+    } else {
+      this.pooledAllocator = NettyUtils.createPooledByteBufAllocator(
+          conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());
+    }
     this.bootstraps = Lists.newArrayList(Preconditions.checkNotNull(bootstraps));
 
     boolean shouldClose = true;
@@ -96,18 +104,15 @@ public class TransportServer implements Closeable {
       NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server");
     EventLoopGroup workerGroup = bossGroup;
 
-    PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(
-      conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());
-
     bootstrap = new ServerBootstrap()
       .group(bossGroup, workerGroup)
       .channel(NettyUtils.getServerChannelClass(ioMode))
-      .option(ChannelOption.ALLOCATOR, allocator)
+      .option(ChannelOption.ALLOCATOR, pooledAllocator)
       .option(ChannelOption.SO_REUSEADDR, !SystemUtils.IS_OS_WINDOWS)
-      .childOption(ChannelOption.ALLOCATOR, allocator);
+      .childOption(ChannelOption.ALLOCATOR, pooledAllocator);
 
     this.metrics = new NettyMemoryMetrics(
-      allocator, conf.getModuleName() + "-server", conf);
+      pooledAllocator, conf.getModuleName() + "-server", conf);
 
     if (conf.backLog() > 0) {
       bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog());
diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java b/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java
index 33d6eb4..423cc0c 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java
@@ -36,6 +36,22 @@ import io.netty.util.internal.PlatformDependent;
  * Utilities for creating various Netty constructs based on whether we're using EPOLL or NIO.
  */
 public class NettyUtils {
+
+  /**
+   * Specifies an upper bound on the number of Netty threads that Spark requires by default.
+   * In practice, only 2-4 cores should be required to transfer roughly 10 Gb/s, and each core
+   * that we use will have an initial overhead of roughly 32 MB of off-heap memory, which comes
+   * at a premium.
+   *
+   * Thus, this value should still retain maximum throughput and reduce wasted off-heap memory
+   * allocation. It can be overridden by setting the number of serverThreads and clientThreads
+   * manually in Spark's configuration.
+   */
+  private static int MAX_DEFAULT_NETTY_THREADS = 8;
+
+  private static final PooledByteBufAllocator[] _sharedPooledByteBufAllocator =
+      new PooledByteBufAllocator[2];
+
   /** Creates a new ThreadFactory which prefixes each thread with the given name. */
   public static ThreadFactory createThreadFactory(String threadPoolPrefix) {
     return new DefaultThreadFactory(threadPoolPrefix, true);
@@ -96,6 +112,38 @@ public class NettyUtils {
   }
 
   /**
+   * Returns the default number of threads for both the Netty client and server thread pools.
+   * If numUsableCores is 0, we will use Runtime to get an approximate number of available cores.
+   */
+  public static int defaultNumThreads(int numUsableCores) {
+    final int availableCores;
+    if (numUsableCores > 0) {
+      availableCores = numUsableCores;
+    } else {
+      availableCores = Runtime.getRuntime().availableProcessors();
+    }
+    return Math.min(availableCores, MAX_DEFAULT_NETTY_THREADS);
+  }
+
+  /**
+   * Returns the lazily created shared pooled ByteBuf allocator for the specified allowCache
+   * parameter value.
+   */
+  public static synchronized PooledByteBufAllocator getSharedPooledByteBufAllocator(
+      boolean allowDirectBufs,
+      boolean allowCache) {
+    final int index = allowCache ? 0 : 1;
+    if (_sharedPooledByteBufAllocator[index] == null) {
+      _sharedPooledByteBufAllocator[index] =
+        createPooledByteBufAllocator(
+          allowDirectBufs,
+          allowCache,
+          defaultNumThreads(0));
+    }
+    return _sharedPooledByteBufAllocator[index];
+  }
+
+  /**
    * Create a pooled ByteBuf allocator but disables the thread-local cache. Thread-local caches
    * are disabled for TransportClients because the ByteBufs are allocated by the event loop thread,
    * but released by the executor thread rather than the event loop thread. Those thread-local
diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
index 201628b..89ee5ee 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -266,6 +266,23 @@ public class TransportConf {
   }
 
   /**
+   * Flag indicating whether to share the pooled ByteBuf allocators between the different Netty
+   * channels. If enabled then only two pooled ByteBuf allocators are created: one where caching
+   * is allowed (for transport servers) and one where it is not (for transport clients).
+   * When disabled, a new allocator is created for each transport server and client.
+   */
+  public boolean sharedByteBufAllocators() {
+    return conf.getBoolean("spark.network.sharedByteBufAllocators.enabled", true);
+  }
+
+  /**
+   * If enabled then off-heap byte buffers will be preferred for the shared ByteBuf allocators.
+   */
+  public boolean preferDirectBufsForSharedByteBufAllocators() {
+    return conf.getBoolean("spark.network.io.preferDirectBufs", true);
+  }
+
+  /**
    * The commons-crypto configuration for the module.
    */
   public Properties cryptoConf() {
@@ -313,4 +330,5 @@ public class TransportConf {
       this.serverThreads() > 0 ? this.serverThreads() : 2 * NettyRuntime.availableProcessors();
     return (int) Math.ceil(threads * (chunkFetchHandlerThreadsPercent / 100.0));
   }
+
 }
diff --git a/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala b/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
index 25f7bcb..3ba0a0a 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
@@ -20,7 +20,7 @@ package org.apache.spark.network.netty
 import scala.collection.JavaConverters._
 
 import org.apache.spark.SparkConf
-import org.apache.spark.network.util.{ConfigProvider, TransportConf}
+import org.apache.spark.network.util.{ConfigProvider, NettyUtils, TransportConf}
 
 /**
  * Provides a utility for transforming from a SparkConf inside a Spark JVM (e.g., Executor,
@@ -28,17 +28,6 @@ import org.apache.spark.network.util.{ConfigProvider, TransportConf}
  * like the number of cores that are allocated to this JVM.
  */
 object SparkTransportConf {
-  /**
-   * Specifies an upper bound on the number of Netty threads that Spark requires by default.
-   * In practice, only 2-4 cores should be required to transfer roughly 10 Gb/s, and each core
-   * that we use will have an initial overhead of roughly 32 MB of off-heap memory, which comes
-   * at a premium.
-   *
-   * Thus, this value should still retain maximum throughput and reduce wasted off-heap memory
-   * allocation. It can be overridden by setting the number of serverThreads and clientThreads
-   * manually in Spark's configuration.
-   */
-  private val MAX_DEFAULT_NETTY_THREADS = 8
 
   /**
    * Utility for creating a [[TransportConf]] from a [[SparkConf]].
@@ -54,7 +43,7 @@ object SparkTransportConf {
     // Specify thread configuration based on our JVM's allocation of cores (rather than necessarily
     // assuming we have all the machine's cores).
     // NB: Only set if serverThreads/clientThreads not already set.
-    val numThreads = defaultNumThreads(numUsableCores)
+    val numThreads = NettyUtils.defaultNumThreads(numUsableCores)
     conf.setIfMissing(s"spark.$module.io.serverThreads", numThreads.toString)
     conf.setIfMissing(s"spark.$module.io.clientThreads", numThreads.toString)
 
@@ -66,14 +55,4 @@ object SparkTransportConf {
       }
     })
   }
-
-  /**
-   * Returns the default number of threads for both the Netty client and server thread pools.
-   * If numUsableCores is 0, we will use Runtime get an approximate number of available cores.
-   */
-  private def defaultNumThreads(numUsableCores: Int): Int = {
-    val availableCores =
-      if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors()
-    math.min(availableCores, MAX_DEFAULT_NETTY_THREADS)
-  }
 }
diff --git a/docs/configuration.md b/docs/configuration.md
index ff9b802..33148ed 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1506,6 +1506,16 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
+  <td><code>spark.network.io.preferDirectBufs</code></td>
+  <td>true</td>
+  <td>
+    If enabled then off-heap buffer allocations are preferred by the shared allocators.
+    Off-heap buffers are used to reduce garbage collection during shuffle and cache
+    block transfer. For environments where off-heap memory is tightly limited, users may wish to
+    turn this off to force all allocations to be on-heap.
+  </td>
+</tr>
+<tr>
   <td><code>spark.port.maxRetries</code></td>
   <td>16</td>
   <td>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org