You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2015/12/03 20:06:31 UTC
spark git commit: [SPARK-12101][CORE] Fix thread pools that cannot cache tasks in Worker and AppClient
Repository: spark
Updated Branches:
refs/heads/master 7bc9e1db2 -> 649be4fa4
[SPARK-12101][CORE] Fix thread pools that cannot cache tasks in Worker and AppClient
`SynchronousQueue` cannot cache any task. This issue is similar to #9978. It's an easy fix. Just use the fixed `ThreadUtils.newDaemonCachedThreadPool`.
Author: Shixiong Zhu <sh...@databricks.com>
Closes #10108 from zsxwing/fix-threadpool.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/649be4fa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/649be4fa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/649be4fa
Branch: refs/heads/master
Commit: 649be4fa4532dcd3001df8345f9f7e970a3fbc65
Parents: 7bc9e1d
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Thu Dec 3 11:06:25 2015 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Thu Dec 3 11:06:25 2015 -0800
----------------------------------------------------------------------
.../org/apache/spark/deploy/client/AppClient.scala | 10 ++++------
.../scala/org/apache/spark/deploy/worker/Worker.scala | 10 ++++------
.../org/apache/spark/deploy/yarn/YarnAllocator.scala | 14 ++++----------
3 files changed, 12 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/649be4fa/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index df6ba7d..1e2f469 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -68,12 +68,10 @@ private[spark] class AppClient(
// A thread pool for registering with masters. Because registering with a master is a blocking
// action, this thread pool must be able to create "masterRpcAddresses.size" threads at the same
// time so that we can register with all masters.
- private val registerMasterThreadPool = new ThreadPoolExecutor(
- 0,
- masterRpcAddresses.length, // Make sure we can register with all masters at the same time
- 60L, TimeUnit.SECONDS,
- new SynchronousQueue[Runnable](),
- ThreadUtils.namedThreadFactory("appclient-register-master-threadpool"))
+ private val registerMasterThreadPool = ThreadUtils.newDaemonCachedThreadPool(
+ "appclient-register-master-threadpool",
+ masterRpcAddresses.length // Make sure we can register with all masters at the same time
+ )
// A scheduled executor for scheduling the registration actions
private val registrationRetryThread =
http://git-wip-us.apache.org/repos/asf/spark/blob/649be4fa/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 418faf8..1afc1ff 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -146,12 +146,10 @@ private[deploy] class Worker(
// A thread pool for registering with masters. Because registering with a master is a blocking
// action, this thread pool must be able to create "masterRpcAddresses.size" threads at the same
// time so that we can register with all masters.
- private val registerMasterThreadPool = new ThreadPoolExecutor(
- 0,
- masterRpcAddresses.size, // Make sure we can register with all masters at the same time
- 60L, TimeUnit.SECONDS,
- new SynchronousQueue[Runnable](),
- ThreadUtils.namedThreadFactory("worker-register-master-threadpool"))
+ private val registerMasterThreadPool = ThreadUtils.newDaemonCachedThreadPool(
+ "worker-register-master-threadpool",
+ masterRpcAddresses.size // Make sure we can register with all masters at the same time
+ )
var coresUsed = 0
var memoryUsed = 0
http://git-wip-us.apache.org/repos/asf/spark/blob/649be4fa/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 73cd903..4e044aa 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -25,8 +25,6 @@ import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.collection.JavaConverters._
-import com.google.common.util.concurrent.ThreadFactoryBuilder
-
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.api.AMRMClient
@@ -40,7 +38,7 @@ import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
-import org.apache.spark.util.Utils
+import org.apache.spark.util.ThreadUtils
/**
* YarnAllocator is charged with requesting containers from the YARN ResourceManager and deciding
@@ -117,13 +115,9 @@ private[yarn] class YarnAllocator(
// Resource capability requested for each executors
private[yarn] val resource = Resource.newInstance(executorMemory + memoryOverhead, executorCores)
- private val launcherPool = new ThreadPoolExecutor(
- // max pool size of Integer.MAX_VALUE is ignored because we use an unbounded queue
- sparkConf.getInt("spark.yarn.containerLauncherMaxThreads", 25), Integer.MAX_VALUE,
- 1, TimeUnit.MINUTES,
- new LinkedBlockingQueue[Runnable](),
- new ThreadFactoryBuilder().setNameFormat("ContainerLauncher #%d").setDaemon(true).build())
- launcherPool.allowCoreThreadTimeOut(true)
+ private val launcherPool = ThreadUtils.newDaemonCachedThreadPool(
+ "ContainerLauncher",
+ sparkConf.getInt("spark.yarn.containerLauncherMaxThreads", 25))
// For testing
private val launchContainers = sparkConf.getBoolean("spark.yarn.launchContainers", true)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org