You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/11/13 03:46:42 UTC
spark git commit: [SPARK-4370] [Core] Limit number of Netty cores
based on executor size
Repository: spark
Updated Branches:
refs/heads/master 23f5bdf06 -> b9e1c2eb9
[SPARK-4370] [Core] Limit number of Netty cores based on executor size
Author: Aaron Davidson <aa...@databricks.com>
Closes #3155 from aarondav/conf and squashes the following commits:
7045e77 [Aaron Davidson] Add mesos comment
4770f6e [Aaron Davidson] [SPARK-4370] [Core] Limit number of Netty cores based on executor size
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b9e1c2eb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b9e1c2eb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b9e1c2eb
Branch: refs/heads/master
Commit: b9e1c2eb9b6f7fb609718ef20048a8da452d881b
Parents: 23f5bdf
Author: Aaron Davidson <aa...@databricks.com>
Authored: Wed Nov 12 18:46:37 2014 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Nov 12 18:46:37 2014 -0800
----------------------------------------------------------------------
.../main/scala/org/apache/spark/SparkEnv.scala | 12 ++++--
.../worker/StandaloneWorkerShuffleService.scala | 2 +-
.../executor/CoarseGrainedExecutorBackend.scala | 4 +-
.../org/apache/spark/executor/Executor.scala | 3 +-
.../spark/executor/MesosExecutorBackend.scala | 17 +++++++--
.../netty/NettyBlockTransferService.scala | 4 +-
.../network/netty/SparkTransportConf.scala | 19 ++++++++--
.../spark/scheduler/local/LocalBackend.scala | 2 +-
.../org/apache/spark/storage/BlockManager.scala | 12 +++---
.../spark/ExternalShuffleServiceSuite.scala | 2 +-
.../netty/NettyBlockTransferSecuritySuite.scala | 4 +-
.../storage/BlockManagerReplicationSuite.scala | 4 +-
.../spark/storage/BlockManagerSuite.scala | 5 ++-
.../network/client/TransportClientFactory.java | 33 +----------------
.../spark/network/server/TransportServer.java | 4 +-
.../apache/spark/network/util/NettyUtils.java | 39 ++++++++++++++++++++
.../streaming/ReceivedBlockHandlerSuite.scala | 2 +-
17 files changed, 104 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index e7454be..e464b32 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -168,9 +168,11 @@ object SparkEnv extends Logging {
executorId: String,
hostname: String,
port: Int,
+ numCores: Int,
isLocal: Boolean,
actorSystem: ActorSystem = null): SparkEnv = {
- create(conf, executorId, hostname, port, false, isLocal, defaultActorSystem = actorSystem)
+ create(conf, executorId, hostname, port, false, isLocal, defaultActorSystem = actorSystem,
+ numUsableCores = numCores)
}
/**
@@ -184,7 +186,8 @@ object SparkEnv extends Logging {
isDriver: Boolean,
isLocal: Boolean,
listenerBus: LiveListenerBus = null,
- defaultActorSystem: ActorSystem = null): SparkEnv = {
+ defaultActorSystem: ActorSystem = null,
+ numUsableCores: Int = 0): SparkEnv = {
// Listener bus is only used on the driver
if (isDriver) {
@@ -276,7 +279,7 @@ object SparkEnv extends Logging {
val blockTransferService =
conf.get("spark.shuffle.blockTransferService", "netty").toLowerCase match {
case "netty" =>
- new NettyBlockTransferService(conf, securityManager)
+ new NettyBlockTransferService(conf, securityManager, numUsableCores)
case "nio" =>
new NioBlockTransferService(conf, securityManager)
}
@@ -287,7 +290,8 @@ object SparkEnv extends Logging {
// NB: blockManager is not valid until initialize() is called later.
val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
- serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager)
+ serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager,
+ numUsableCores)
val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala
index d044e1d..b979896 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala
@@ -39,7 +39,7 @@ class StandaloneWorkerShuffleService(sparkConf: SparkConf, securityManager: Secu
private val port = sparkConf.getInt("spark.shuffle.service.port", 7337)
private val useSasl: Boolean = securityManager.isAuthenticationEnabled()
- private val transportConf = SparkTransportConf.fromSparkConf(sparkConf)
+ private val transportConf = SparkTransportConf.fromSparkConf(sparkConf, numUsableCores = 0)
private val blockHandler = new ExternalShuffleBlockHandler(transportConf)
private val transportContext: TransportContext = {
val handler = if (useSasl) new SaslRpcHandler(blockHandler, securityManager) else blockHandler
http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 3711824..5f46f3b 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -57,9 +57,9 @@ private[spark] class CoarseGrainedExecutorBackend(
override def receiveWithLogging = {
case RegisteredExecutor =>
logInfo("Successfully registered with driver")
- // Make this host instead of hostPort ?
val (hostname, _) = Utils.parseHostPort(hostPort)
- executor = new Executor(executorId, hostname, sparkProperties, isLocal = false, actorSystem)
+ executor = new Executor(executorId, hostname, sparkProperties, cores, isLocal = false,
+ actorSystem)
case RegisterExecutorFailed(message) =>
logError("Slave registration failed: " + message)
http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index caf4d76..4c378a2 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -43,6 +43,7 @@ private[spark] class Executor(
executorId: String,
slaveHostname: String,
properties: Seq[(String, String)],
+ numCores: Int,
isLocal: Boolean = false,
actorSystem: ActorSystem = null)
extends Logging
@@ -83,7 +84,7 @@ private[spark] class Executor(
if (!isLocal) {
val port = conf.getInt("spark.executor.port", 0)
val _env = SparkEnv.createExecutorEnv(
- conf, executorId, slaveHostname, port, isLocal, actorSystem)
+ conf, executorId, slaveHostname, port, numCores, isLocal, actorSystem)
SparkEnv.set(_env)
_env.metricsSystem.registerSource(executorSource)
_env.blockManager.initialize(conf.getAppId)
http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
index bca0b15..f15e6bc 100644
--- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
@@ -19,6 +19,8 @@ package org.apache.spark.executor
import java.nio.ByteBuffer
+import scala.collection.JavaConversions._
+
import org.apache.mesos.protobuf.ByteString
import org.apache.mesos.{Executor => MesosExecutor, ExecutorDriver, MesosExecutorDriver, MesosNativeLibrary}
import org.apache.mesos.Protos.{TaskStatus => MesosTaskStatus, _}
@@ -50,14 +52,23 @@ private[spark] class MesosExecutorBackend
executorInfo: ExecutorInfo,
frameworkInfo: FrameworkInfo,
slaveInfo: SlaveInfo) {
- logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue)
+
+ // Get num cores for this task from ExecutorInfo, created in MesosSchedulerBackend.
+ val cpusPerTask = executorInfo.getResourcesList
+ .find(_.getName == "cpus")
+ .map(_.getScalar.getValue.toInt)
+ .getOrElse(0)
+ val executorId = executorInfo.getExecutorId.getValue
+
+ logInfo(s"Registered with Mesos as executor ID $executorId with $cpusPerTask cpus")
this.driver = driver
val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray) ++
Seq[(String, String)](("spark.app.id", frameworkInfo.getId.getValue))
executor = new Executor(
- executorInfo.getExecutorId.getValue,
+ executorId,
slaveInfo.getHostname,
- properties)
+ properties,
+ cpusPerTask)
}
override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) {
http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index f8a7f64..0027cbb 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -35,13 +35,13 @@ import org.apache.spark.util.Utils
/**
* A BlockTransferService that uses Netty to fetch a set of blocks at at time.
*/
-class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManager)
+class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManager, numCores: Int)
extends BlockTransferService {
// TODO: Don't use Java serialization, use a more cross-version compatible serialization format.
private val serializer = new JavaSerializer(conf)
private val authEnabled = securityManager.isAuthenticationEnabled()
- private val transportConf = SparkTransportConf.fromSparkConf(conf)
+ private val transportConf = SparkTransportConf.fromSparkConf(conf, numCores)
private[this] var transportContext: TransportContext = _
private[this] var server: TransportServer = _
http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala b/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
index 9fa4fa7..ce4225c 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
@@ -20,11 +20,22 @@ package org.apache.spark.network.netty
import org.apache.spark.SparkConf
import org.apache.spark.network.util.{TransportConf, ConfigProvider}
-/**
- * Utility for creating a [[TransportConf]] from a [[SparkConf]].
- */
object SparkTransportConf {
- def fromSparkConf(conf: SparkConf): TransportConf = {
+ /**
+ * Utility for creating a [[TransportConf]] from a [[SparkConf]].
+ * @param numUsableCores if nonzero, this will restrict the server and client threads to only
+ * use the given number of cores, rather than all of the machine's cores.
+ * This restriction will only occur if these properties are not already set.
+ */
+ def fromSparkConf(_conf: SparkConf, numUsableCores: Int = 0): TransportConf = {
+ val conf = _conf.clone
+ if (numUsableCores > 0) {
+ // Only set if serverThreads/clientThreads not already set.
+ conf.set("spark.shuffle.io.serverThreads",
+ conf.get("spark.shuffle.io.serverThreads", numUsableCores.toString))
+ conf.set("spark.shuffle.io.clientThreads",
+ conf.get("spark.shuffle.io.clientThreads", numUsableCores.toString))
+ }
new TransportConf(new ConfigProvider {
override def get(name: String): String = conf.get(name)
})
http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index c026483..a2f1f14 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -51,7 +51,7 @@ private[spark] class LocalActor(
private val localExecutorHostname = "localhost"
val executor = new Executor(
- localExecutorId, localExecutorHostname, scheduler.conf.getAll, isLocal = true)
+ localExecutorId, localExecutorHostname, scheduler.conf.getAll, totalCores, isLocal = true)
override def receiveWithLogging = {
case ReviveOffers =>
http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 39434f4..308c59e 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -73,7 +73,8 @@ private[spark] class BlockManager(
mapOutputTracker: MapOutputTracker,
shuffleManager: ShuffleManager,
blockTransferService: BlockTransferService,
- securityManager: SecurityManager)
+ securityManager: SecurityManager,
+ numUsableCores: Int)
extends BlockDataManager with Logging {
val diskBlockManager = new DiskBlockManager(this, conf)
@@ -121,8 +122,8 @@ private[spark] class BlockManager(
// Client to read other executors' shuffle files. This is either an external service, or just the
// standard BlockTranserService to directly connect to other Executors.
private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
- new ExternalShuffleClient(SparkTransportConf.fromSparkConf(conf), securityManager,
- securityManager.isAuthenticationEnabled())
+ val transConf = SparkTransportConf.fromSparkConf(conf, numUsableCores)
+ new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled())
} else {
blockTransferService
}
@@ -174,9 +175,10 @@ private[spark] class BlockManager(
mapOutputTracker: MapOutputTracker,
shuffleManager: ShuffleManager,
blockTransferService: BlockTransferService,
- securityManager: SecurityManager) = {
+ securityManager: SecurityManager,
+ numUsableCores: Int) = {
this(execId, actorSystem, master, serializer, BlockManager.getMaxMemory(conf),
- conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager)
+ conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager, numUsableCores)
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
index 9623d66..55799f5 100644
--- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
@@ -38,7 +38,7 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {
var rpcHandler: ExternalShuffleBlockHandler = _
override def beforeAll() {
- val transportConf = SparkTransportConf.fromSparkConf(conf)
+ val transportConf = SparkTransportConf.fromSparkConf(conf, numUsableCores = 2)
rpcHandler = new ExternalShuffleBlockHandler(transportConf)
val transportContext = new TransportContext(transportConf, rpcHandler)
server = transportContext.createServer()
http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
index 530f5d6..94bfa67 100644
--- a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
+++ b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
@@ -104,11 +104,11 @@ class NettyBlockTransferSecuritySuite extends FunSuite with MockitoSugar with Sh
when(blockManager.getBlockData(blockId)).thenReturn(blockBuffer)
val securityManager0 = new SecurityManager(conf0)
- val exec0 = new NettyBlockTransferService(conf0, securityManager0)
+ val exec0 = new NettyBlockTransferService(conf0, securityManager0, numCores = 1)
exec0.init(blockManager)
val securityManager1 = new SecurityManager(conf1)
- val exec1 = new NettyBlockTransferService(conf1, securityManager1)
+ val exec1 = new NettyBlockTransferService(conf1, securityManager1, numCores = 1)
exec1.init(blockManager)
val result = fetchBlock(exec0, exec1, "1", blockId) match {
http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index f63e772..c2903c8 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -62,7 +62,7 @@ class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAnd
name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
val transfer = new NioBlockTransferService(conf, securityMgr)
val store = new BlockManager(name, actorSystem, master, serializer, maxMem, conf,
- mapOutputTracker, shuffleManager, transfer, securityMgr)
+ mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
store.initialize("app-id")
allStores += store
store
@@ -263,7 +263,7 @@ class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAnd
when(failableTransfer.hostName).thenReturn("some-hostname")
when(failableTransfer.port).thenReturn(1000)
val failableStore = new BlockManager("failable-store", actorSystem, master, serializer,
- 10000, conf, mapOutputTracker, shuffleManager, failableTransfer, securityMgr)
+ 10000, conf, mapOutputTracker, shuffleManager, failableTransfer, securityMgr, 0)
failableStore.initialize("app-id")
allStores += failableStore // so that this gets stopped after test
assert(master.getPeers(store.blockManagerId).toSet === Set(failableStore.blockManagerId))
http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 9529502..5554efb 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -74,7 +74,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
val transfer = new NioBlockTransferService(conf, securityMgr)
val manager = new BlockManager(name, actorSystem, master, serializer, maxMem, conf,
- mapOutputTracker, shuffleManager, transfer, securityMgr)
+ mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
manager.initialize("app-id")
manager
}
@@ -795,7 +795,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
// Use Java serializer so we can create an unserializable error.
val transfer = new NioBlockTransferService(conf, securityMgr)
store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, actorSystem, master,
- new JavaSerializer(conf), 1200, conf, mapOutputTracker, shuffleManager, transfer, securityMgr)
+ new JavaSerializer(conf), 1200, conf, mapOutputTracker, shuffleManager, transfer, securityMgr,
+ 0)
// The put should fail since a1 is not serializable.
class UnserializableClass
http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
index 397d3a8..76bce85 100644
--- a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
+++ b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -118,7 +118,8 @@ public class TransportClientFactory implements Closeable {
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs());
// Use pooled buffers to reduce temporary buffer allocation
- bootstrap.option(ChannelOption.ALLOCATOR, createPooledByteBufAllocator());
+ bootstrap.option(ChannelOption.ALLOCATOR, NettyUtils.createPooledByteBufAllocator(
+ conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads()));
final AtomicReference<TransportClient> clientRef = new AtomicReference<TransportClient>();
@@ -190,34 +191,4 @@ public class TransportClientFactory implements Closeable {
workerGroup = null;
}
}
-
- /**
- * Create a pooled ByteBuf allocator but disables the thread-local cache. Thread-local caches
- * are disabled because the ByteBufs are allocated by the event loop thread, but released by the
- * executor thread rather than the event loop thread. Those thread-local caches actually delay
- * the recycling of buffers, leading to larger memory usage.
- */
- private PooledByteBufAllocator createPooledByteBufAllocator() {
- return new PooledByteBufAllocator(
- conf.preferDirectBufs() && PlatformDependent.directBufferPreferred(),
- getPrivateStaticField("DEFAULT_NUM_HEAP_ARENA"),
- getPrivateStaticField("DEFAULT_NUM_DIRECT_ARENA"),
- getPrivateStaticField("DEFAULT_PAGE_SIZE"),
- getPrivateStaticField("DEFAULT_MAX_ORDER"),
- 0, // tinyCacheSize
- 0, // smallCacheSize
- 0 // normalCacheSize
- );
- }
-
- /** Used to get defaults from Netty's private static fields. */
- private int getPrivateStaticField(String name) {
- try {
- Field f = PooledByteBufAllocator.DEFAULT.getClass().getDeclaredField(name);
- f.setAccessible(true);
- return f.getInt(null);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
index 579676c..625c325 100644
--- a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
+++ b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
@@ -72,8 +72,8 @@ public class TransportServer implements Closeable {
NettyUtils.createEventLoop(ioMode, conf.serverThreads(), "shuffle-server");
EventLoopGroup workerGroup = bossGroup;
- PooledByteBufAllocator allocator = new PooledByteBufAllocator(
- conf.preferDirectBufs() && PlatformDependent.directBufferPreferred());
+ PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(
+ conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());
bootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup)
http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java b/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java
index 2a7664f..5c654a6 100644
--- a/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java
+++ b/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java
@@ -17,9 +17,11 @@
package org.apache.spark.network.util;
+import java.lang.reflect.Field;
import java.util.concurrent.ThreadFactory;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
@@ -32,6 +34,7 @@ import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.util.internal.PlatformDependent;
/**
* Utilities for creating various Netty constructs based on whether we're using EPOLL or NIO.
@@ -103,4 +106,40 @@ public class NettyUtils {
}
return "<unknown remote>";
}
+
+ /**
+ * Create a pooled ByteBuf allocator but disables the thread-local cache. Thread-local caches
+ * are disabled because the ByteBufs are allocated by the event loop thread, but released by the
+ * executor thread rather than the event loop thread. Those thread-local caches actually delay
+ * the recycling of buffers, leading to larger memory usage.
+ */
+ public static PooledByteBufAllocator createPooledByteBufAllocator(
+ boolean allowDirectBufs,
+ boolean allowCache,
+ int numCores) {
+ if (numCores == 0) {
+ numCores = Runtime.getRuntime().availableProcessors();
+ }
+ return new PooledByteBufAllocator(
+ allowDirectBufs && PlatformDependent.directBufferPreferred(),
+ Math.min(getPrivateStaticField("DEFAULT_NUM_HEAP_ARENA"), numCores),
+ Math.min(getPrivateStaticField("DEFAULT_NUM_DIRECT_ARENA"), allowDirectBufs ? numCores : 0),
+ getPrivateStaticField("DEFAULT_PAGE_SIZE"),
+ getPrivateStaticField("DEFAULT_MAX_ORDER"),
+ allowCache ? getPrivateStaticField("DEFAULT_TINY_CACHE_SIZE") : 0,
+ allowCache ? getPrivateStaticField("DEFAULT_SMALL_CACHE_SIZE") : 0,
+ allowCache ? getPrivateStaticField("DEFAULT_NORMAL_CACHE_SIZE") : 0
+ );
+ }
+
+ /** Used to get defaults from Netty's private static fields. */
+ private static int getPrivateStaticField(String name) {
+ try {
+ Field f = PooledByteBufAllocator.DEFAULT.getClass().getDeclaredField(name);
+ f.setAccessible(true);
+ return f.getInt(null);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/b9e1c2eb/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index 9efe15d..3661e16 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -73,7 +73,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche
blockManager = new BlockManager("bm", actorSystem, blockManagerMaster, serializer,
blockManagerSize, conf, mapOutputTracker, shuffleManager,
- new NioBlockTransferService(conf, securityMgr), securityMgr)
+ new NioBlockTransferService(conf, securityMgr), securityMgr, 0)
blockManager.initialize("app-id")
tempDirectory = Files.createTempDir()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org