Posted to commits@spark.apache.org by rx...@apache.org on 2015/04/22 20:09:07 UTC

spark git commit: [SPARK-7052][Core] Add ThreadUtils and move thread methods from Utils to ThreadUtils

Repository: spark
Updated Branches:
  refs/heads/master bdc5c16e7 -> 33b85620f


[SPARK-7052][Core] Add ThreadUtils and move thread methods from Utils to ThreadUtils

As per rxin's suggestion in https://github.com/apache/spark/pull/5392/files#r28757176

What's more, there was a race condition in the globally shared `daemonThreadFactoryBuilder`: `setNameFormat` mutates the builder in place, so concurrent callers of `namedThreadFactory` could overwrite each other's name format before `build()` ran. This PR removes the global `daemonThreadFactoryBuilder` and creates a new `ThreadFactoryBuilder` on every call.
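
For illustration, a minimal sketch of the interleaving that made the shared builder unsafe, next to the fixed shape (illustrative only; `SharedBuilderSketch` and `oldNamedThreadFactory` are stand-ins for the removed `Utils` code):

    import java.util.concurrent.ThreadFactory

    import com.google.common.util.concurrent.ThreadFactoryBuilder

    object SharedBuilderSketch {
      // The removed code shared one mutable builder across all callers.
      private val sharedBuilder = new ThreadFactoryBuilder().setDaemon(true)

      def oldNamedThreadFactory(prefix: String): ThreadFactory = {
        // setNameFormat mutates sharedBuilder in place, so between this call
        // and build() another thread may overwrite the format:
        //   thread A: sharedBuilder.setNameFormat("a-%d")
        //   thread B: sharedBuilder.setNameFormat("b-%d")
        //   thread A: build()   // A's threads end up named "b-0", "b-1", ...
        sharedBuilder.setNameFormat(prefix + "-%d").build()
      }

      // The fix: build from a fresh ThreadFactoryBuilder on every call, so
      // there is no shared mutable state to race on.
      def newNamedThreadFactory(prefix: String): ThreadFactory =
        new ThreadFactoryBuilder().setDaemon(true).setNameFormat(prefix + "-%d").build()
    }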

Author: zsxwing <zs...@gmail.com>

Closes #5631 from zsxwing/thread-utils and squashes the following commits:

9fe5b0e [zsxwing] Add ThreadUtils and move thread methods from Utils to ThreadUtils


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/33b85620
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/33b85620
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/33b85620

Branch: refs/heads/master
Commit: 33b85620f910c404873d362d27cca1223084913a
Parents: bdc5c16
Author: zsxwing <zs...@gmail.com>
Authored: Wed Apr 22 11:08:59 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Apr 22 11:08:59 2015 -0700

----------------------------------------------------------------------
 .../spark/ExecutorAllocationManager.scala       |  8 +--
 .../org/apache/spark/HeartbeatReceiver.scala    | 11 ++--
 .../deploy/history/FsHistoryProvider.scala      |  4 +-
 .../org/apache/spark/executor/Executor.scala    |  7 +-
 .../spark/network/nio/ConnectionManager.scala   | 12 ++--
 .../apache/spark/scheduler/DAGScheduler.scala   |  4 +-
 .../spark/scheduler/TaskResultGetter.scala      |  4 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala |  6 +-
 .../cluster/YarnSchedulerBackend.scala          |  4 +-
 .../spark/scheduler/local/LocalBackend.scala    |  8 +--
 .../storage/BlockManagerMasterEndpoint.scala    |  4 +-
 .../storage/BlockManagerSlaveEndpoint.scala     |  4 +-
 .../org/apache/spark/util/ThreadUtils.scala     | 67 ++++++++++++++++++++
 .../scala/org/apache/spark/util/Utils.scala     | 29 ---------
 .../apache/spark/util/ThreadUtilsSuite.scala    | 57 +++++++++++++++++
 .../streaming/kafka/KafkaInputDStream.scala     |  5 +-
 .../streaming/kafka/ReliableKafkaReceiver.scala |  4 +-
 .../receiver/ReceivedBlockHandler.scala         |  4 +-
 .../streaming/util/WriteAheadLogManager.scala   |  4 +-
 19 files changed, 170 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/33b85620/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 4e7bf51..b986fa8 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -17,12 +17,12 @@
 
 package org.apache.spark
 
-import java.util.concurrent.{Executors, TimeUnit}
+import java.util.concurrent.TimeUnit
 
 import scala.collection.mutable
 
 import org.apache.spark.scheduler._
-import org.apache.spark.util.{Clock, SystemClock, Utils}
+import org.apache.spark.util.{ThreadUtils, Clock, SystemClock, Utils}
 
 /**
  * An agent that dynamically allocates and removes executors based on the workload.
@@ -132,8 +132,8 @@ private[spark] class ExecutorAllocationManager(
   private val listener = new ExecutorAllocationListener
 
   // Executor that handles the scheduling task.
-  private val executor = Executors.newSingleThreadScheduledExecutor(
-    Utils.namedThreadFactory("spark-dynamic-executor-allocation"))
+  private val executor =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-dynamic-executor-allocation")
 
   /**
    * Verify that the settings specified through the config are valid.

http://git-wip-us.apache.org/repos/asf/spark/blob/33b85620/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index e3bd16f..68d05d5 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark
 
-import java.util.concurrent.{ScheduledFuture, TimeUnit, Executors}
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
 
 import scala.collection.mutable
 
@@ -25,7 +25,7 @@ import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.rpc.{ThreadSafeRpcEndpoint, RpcEnv, RpcCallContext}
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.scheduler.{SlaveLost, TaskScheduler}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
 
 /**
  * A heartbeat from executors to the driver. This is a shared message used by several internal
@@ -76,11 +76,10 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)
   
   private var timeoutCheckingTask: ScheduledFuture[_] = null
 
-  private val timeoutCheckingThread = Executors.newSingleThreadScheduledExecutor(
-    Utils.namedThreadFactory("heartbeat-timeout-checking-thread"))
+  private val timeoutCheckingThread =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("heartbeat-timeout-checking-thread")
 
-  private val killExecutorThread = Executors.newSingleThreadExecutor(
-    Utils.namedThreadFactory("kill-executor-thread"))
+  private val killExecutorThread = ThreadUtils.newDaemonSingleThreadExecutor("kill-executor-thread")
 
   override def onStart(): Unit = {
     timeoutCheckingTask = timeoutCheckingThread.scheduleAtFixedRate(new Runnable {

http://git-wip-us.apache.org/repos/asf/spark/blob/33b85620/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 47bdd77..9847d59 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -32,7 +32,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.scheduler._
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 
 
@@ -99,7 +99,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
    */
   private val replayExecutor: ExecutorService = {
     if (!conf.contains("spark.testing")) {
-      Executors.newSingleThreadExecutor(Utils.namedThreadFactory("log-replay-executor"))
+      ThreadUtils.newDaemonSingleThreadExecutor("log-replay-executor")
     } else {
       MoreExecutors.sameThreadExecutor()
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/33b85620/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 327d155..5fc04df 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -21,7 +21,7 @@ import java.io.File
 import java.lang.management.ManagementFactory
 import java.net.URL
 import java.nio.ByteBuffer
-import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap}
@@ -76,7 +76,7 @@ private[spark] class Executor(
   }
 
   // Start worker thread pool
-  private val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")
+  private val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor task launch worker")
   private val executorSource = new ExecutorSource(threadPool, executorId)
 
   if (!isLocal) {
@@ -110,8 +110,7 @@ private[spark] class Executor(
   private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
 
   // Executor for the heartbeat task.
-  private val heartbeater = Executors.newSingleThreadScheduledExecutor(
-    Utils.namedThreadFactory("driver-heartbeater"))
+  private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater")
 
   startDriverHeartbeater()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/33b85620/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
index 1a68e62..16e9059 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
@@ -36,7 +36,7 @@ import io.netty.util.{Timeout, TimerTask, HashedWheelTimer}
 
 import org.apache.spark._
 import org.apache.spark.network.sasl.{SparkSaslClient, SparkSaslServer}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
 
 import scala.util.Try
 import scala.util.control.NonFatal
@@ -79,7 +79,7 @@ private[nio] class ConnectionManager(
 
   private val selector = SelectorProvider.provider.openSelector()
   private val ackTimeoutMonitor =
-    new HashedWheelTimer(Utils.namedThreadFactory("AckTimeoutMonitor"))
+    new HashedWheelTimer(ThreadUtils.namedThreadFactory("AckTimeoutMonitor"))
 
   private val ackTimeout =
     conf.getTimeAsSeconds("spark.core.connection.ack.wait.timeout",
@@ -102,7 +102,7 @@ private[nio] class ConnectionManager(
     handlerThreadCount,
     conf.getInt("spark.core.connection.handler.threads.keepalive", 60), TimeUnit.SECONDS,
     new LinkedBlockingDeque[Runnable](),
-    Utils.namedThreadFactory("handle-message-executor")) {
+    ThreadUtils.namedThreadFactory("handle-message-executor")) {
 
     override def afterExecute(r: Runnable, t: Throwable): Unit = {
       super.afterExecute(r, t)
@@ -117,7 +117,7 @@ private[nio] class ConnectionManager(
     ioThreadCount,
     conf.getInt("spark.core.connection.io.threads.keepalive", 60), TimeUnit.SECONDS,
     new LinkedBlockingDeque[Runnable](),
-    Utils.namedThreadFactory("handle-read-write-executor")) {
+    ThreadUtils.namedThreadFactory("handle-read-write-executor")) {
 
     override def afterExecute(r: Runnable, t: Throwable): Unit = {
       super.afterExecute(r, t)
@@ -134,7 +134,7 @@ private[nio] class ConnectionManager(
     connectThreadCount,
     conf.getInt("spark.core.connection.connect.threads.keepalive", 60), TimeUnit.SECONDS,
     new LinkedBlockingDeque[Runnable](),
-    Utils.namedThreadFactory("handle-connect-executor")) {
+    ThreadUtils.namedThreadFactory("handle-connect-executor")) {
 
     override def afterExecute(r: Runnable, t: Throwable): Unit = {
       super.afterExecute(r, t)
@@ -160,7 +160,7 @@ private[nio] class ConnectionManager(
   private val registerRequests = new SynchronizedQueue[SendingConnection]
 
   implicit val futureExecContext = ExecutionContext.fromExecutor(
-    Utils.newDaemonCachedThreadPool("Connection manager future execution context"))
+    ThreadUtils.newDaemonCachedThreadPool("Connection manager future execution context"))
 
   @volatile
   private var onReceiveCallback: (BufferMessage, ConnectionManagerId) => Option[Message] = null

http://git-wip-us.apache.org/repos/asf/spark/blob/33b85620/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 4a32f89..8c4bff4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler
 
 import java.io.NotSerializableException
 import java.util.Properties
-import java.util.concurrent.{TimeUnit, Executors}
+import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map, Stack}
@@ -129,7 +129,7 @@ class DAGScheduler(
   private val disallowStageRetryForTest = sc.getConf.getBoolean("spark.test.noStageRetry", false)
 
   private val messageScheduler =
-    Executors.newScheduledThreadPool(1, Utils.namedThreadFactory("dag-scheduler-message"))
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("dag-scheduler-message")
 
   private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
   taskScheduler.setDAGScheduler(this)

http://git-wip-us.apache.org/repos/asf/spark/blob/33b85620/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index 3938580..391827c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -26,7 +26,7 @@ import scala.util.control.NonFatal
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.serializer.SerializerInstance
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
 
 /**
  * Runs a thread pool that deserializes and remotely fetches (if necessary) task results.
@@ -35,7 +35,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
   extends Logging {
 
   private val THREADS = sparkEnv.conf.getInt("spark.resultGetter.threads", 4)
-  private val getTaskResultExecutor = Utils.newDaemonFixedThreadPool(
+  private val getTaskResultExecutor = ThreadUtils.newDaemonFixedThreadPool(
     THREADS, "task-result-getter")
 
   protected val serializer = new ThreadLocal[SerializerInstance] {

http://git-wip-us.apache.org/repos/asf/spark/blob/33b85620/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 63987df..9656fb7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.scheduler.cluster
 
-import java.util.concurrent.{TimeUnit, Executors}
+import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
@@ -26,7 +26,7 @@ import org.apache.spark.rpc._
 import org.apache.spark.{ExecutorAllocationClient, Logging, SparkEnv, SparkException, TaskState}
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
-import org.apache.spark.util.{SerializableBuffer, AkkaUtils, Utils}
+import org.apache.spark.util.{ThreadUtils, SerializableBuffer, AkkaUtils, Utils}
 
 /**
  * A scheduler backend that waits for coarse grained executors to connect to it through Akka.
@@ -73,7 +73,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     private val addressToExecutorId = new HashMap[RpcAddress, String]
 
     private val reviveThread =
-      Executors.newSingleThreadScheduledExecutor(Utils.namedThreadFactory("driver-revive-thread"))
+      ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")
 
     override def onStart() {
       // Periodically revive offers to allow delay scheduling to work

http://git-wip-us.apache.org/repos/asf/spark/blob/33b85620/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index 1406a36..d987c7d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -24,7 +24,7 @@ import org.apache.spark.rpc._
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.ui.JettyUtils
-import org.apache.spark.util.{RpcUtils, Utils}
+import org.apache.spark.util.{ThreadUtils, RpcUtils}
 
 import scala.util.control.NonFatal
 
@@ -97,7 +97,7 @@ private[spark] abstract class YarnSchedulerBackend(
     private var amEndpoint: Option[RpcEndpointRef] = None
 
     private val askAmThreadPool =
-      Utils.newDaemonCachedThreadPool("yarn-scheduler-ask-am-thread-pool")
+      ThreadUtils.newDaemonCachedThreadPool("yarn-scheduler-ask-am-thread-pool")
     implicit val askAmExecutor = ExecutionContext.fromExecutor(askAmThreadPool)
 
     override def receive: PartialFunction[Any, Unit] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/33b85620/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 50ba0b9..ac5b524 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -18,14 +18,14 @@
 package org.apache.spark.scheduler.local
 
 import java.nio.ByteBuffer
-import java.util.concurrent.{Executors, TimeUnit}
+import java.util.concurrent.TimeUnit
 
 import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv, TaskState}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.executor.{Executor, ExecutorBackend}
 import org.apache.spark.rpc.{ThreadSafeRpcEndpoint, RpcCallContext, RpcEndpointRef, RpcEnv}
 import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, WorkerOffer}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
 
 private case class ReviveOffers()
 
@@ -47,8 +47,8 @@ private[spark] class LocalEndpoint(
     private val totalCores: Int)
   extends ThreadSafeRpcEndpoint with Logging {
 
-  private val reviveThread = Executors.newSingleThreadScheduledExecutor(
-    Utils.namedThreadFactory("local-revive-thread"))
+  private val reviveThread =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("local-revive-thread")
 
   private var freeCores = totalCores
 

http://git-wip-us.apache.org/repos/asf/spark/blob/33b85620/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 28c73a7..4682167 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -28,7 +28,7 @@ import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.BlockManagerMessages._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
 
 /**
  * BlockManagerMasterEndpoint is an [[ThreadSafeRpcEndpoint]] on the master node to track statuses
@@ -51,7 +51,7 @@ class BlockManagerMasterEndpoint(
   // Mapping from block id to the set of block managers that have the block.
   private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
 
-  private val askThreadPool = Utils.newDaemonCachedThreadPool("block-manager-ask-thread-pool")
+  private val askThreadPool = ThreadUtils.newDaemonCachedThreadPool("block-manager-ask-thread-pool")
   private implicit val askExecutionContext = ExecutionContext.fromExecutorService(askThreadPool)
 
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/33b85620/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
index 8980fa8..543df4e 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
@@ -20,7 +20,7 @@ package org.apache.spark.storage
 import scala.concurrent.{ExecutionContext, Future}
 
 import org.apache.spark.rpc.{RpcEnv, RpcCallContext, RpcEndpoint}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.ThreadUtils
 import org.apache.spark.{Logging, MapOutputTracker, SparkEnv}
 import org.apache.spark.storage.BlockManagerMessages._
 
@@ -36,7 +36,7 @@ class BlockManagerSlaveEndpoint(
   extends RpcEndpoint with Logging {
 
   private val asyncThreadPool =
-    Utils.newDaemonCachedThreadPool("block-manager-slave-async-thread-pool")
+    ThreadUtils.newDaemonCachedThreadPool("block-manager-slave-async-thread-pool")
   private implicit val asyncExecutionContext = ExecutionContext.fromExecutorService(asyncThreadPool)
 
   // Operations that involve removing blocks may be slow and should be done asynchronously

http://git-wip-us.apache.org/repos/asf/spark/blob/33b85620/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
new file mode 100644
index 0000000..098a4b7
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.util
+
+import java.util.concurrent._
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder
+
+private[spark] object ThreadUtils {
+
+  /**
+   * Create a thread factory that names threads with a prefix and also sets the threads to daemon.
+   */
+  def namedThreadFactory(prefix: String): ThreadFactory = {
+    new ThreadFactoryBuilder().setDaemon(true).setNameFormat(prefix + "-%d").build()
+  }
+
+  /**
+   * Wrapper over newCachedThreadPool. Thread names are formatted as prefix-ID, where ID is a
+   * unique, sequentially assigned integer.
+   */
+  def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = {
+    val threadFactory = namedThreadFactory(prefix)
+    Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
+  }
+
+  /**
+   * Wrapper over newFixedThreadPool. Thread names are formatted as prefix-ID, where ID is a
+   * unique, sequentially assigned integer.
+   */
+  def newDaemonFixedThreadPool(nThreads: Int, prefix: String): ThreadPoolExecutor = {
+    val threadFactory = namedThreadFactory(prefix)
+    Executors.newFixedThreadPool(nThreads, threadFactory).asInstanceOf[ThreadPoolExecutor]
+  }
+
+  /**
+   * Wrapper over newSingleThreadExecutor.
+   */
+  def newDaemonSingleThreadExecutor(threadName: String): ExecutorService = {
+    val threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build()
+    Executors.newSingleThreadExecutor(threadFactory)
+  }
+
+  /**
+   * Wrapper over newSingleThreadScheduledExecutor.
+   */
+  def newDaemonSingleThreadScheduledExecutor(threadName: String): ScheduledExecutorService = {
+    val threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build()
+    Executors.newSingleThreadScheduledExecutor(threadFactory)
+  }
+}
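
For reference, a minimal usage sketch of the new helpers (illustrative, not part of the patch; `ThreadUtils` is `private[spark]`, so callers must live under `org.apache.spark`, and the package and pool/thread names here are made up):

    package org.apache.spark.sketch  // hypothetical package under org.apache.spark

    import java.util.concurrent.TimeUnit

    import org.apache.spark.util.ThreadUtils

    object ThreadUtilsUsageSketch {
      def main(args: Array[String]): Unit = {
        // Daemon pool whose threads are named "result-fetcher-0", "result-fetcher-1", ...
        val pool = ThreadUtils.newDaemonFixedThreadPool(2, "result-fetcher")
        pool.submit(new Runnable {
          override def run(): Unit = println(Thread.currentThread().getName)
        })
        pool.shutdown()
        pool.awaitTermination(10, TimeUnit.SECONDS)

        // Single-thread scheduled executor; its one thread is named exactly "sketch-timer".
        val timer = ThreadUtils.newDaemonSingleThreadScheduledExecutor("sketch-timer")
        timer.schedule(new Runnable {
          override def run(): Unit = println("tick")
        }, 10, TimeUnit.MILLISECONDS)
        timer.shutdown()
        timer.awaitTermination(10, TimeUnit.SECONDS)
      }
    }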

http://git-wip-us.apache.org/repos/asf/spark/blob/33b85620/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 7b0de1a..2feb734 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -35,7 +35,6 @@ import scala.util.control.{ControlThrowable, NonFatal}
 
 import com.google.common.io.{ByteStreams, Files}
 import com.google.common.net.InetAddresses
-import com.google.common.util.concurrent.ThreadFactoryBuilder
 import org.apache.commons.lang3.SystemUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
@@ -897,34 +896,6 @@ private[spark] object Utils extends Logging {
     hostPortParseResults.get(hostPort)
   }
 
-  private val daemonThreadFactoryBuilder: ThreadFactoryBuilder =
-    new ThreadFactoryBuilder().setDaemon(true)
-
-  /**
-   * Create a thread factory that names threads with a prefix and also sets the threads to daemon.
-   */
-  def namedThreadFactory(prefix: String): ThreadFactory = {
-    daemonThreadFactoryBuilder.setNameFormat(prefix + "-%d").build()
-  }
-
-  /**
-   * Wrapper over newCachedThreadPool. Thread names are formatted as prefix-ID, where ID is a
-   * unique, sequentially assigned integer.
-   */
-  def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = {
-    val threadFactory = namedThreadFactory(prefix)
-    Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
-  }
-
-  /**
-   * Wrapper over newFixedThreadPool. Thread names are formatted as prefix-ID, where ID is a
-   * unique, sequentially assigned integer.
-   */
-  def newDaemonFixedThreadPool(nThreads: Int, prefix: String): ThreadPoolExecutor = {
-    val threadFactory = namedThreadFactory(prefix)
-    Executors.newFixedThreadPool(nThreads, threadFactory).asInstanceOf[ThreadPoolExecutor]
-  }
-
   /**
    * Return the string to tell how long has passed in milliseconds.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/33b85620/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
new file mode 100644
index 0000000..a3aa3e9
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.util
+
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+import org.scalatest.FunSuite
+
+class ThreadUtilsSuite extends FunSuite {
+
+  test("newDaemonSingleThreadExecutor") {
+    val executor = ThreadUtils.newDaemonSingleThreadExecutor("this-is-a-thread-name")
+    @volatile var threadName = ""
+    executor.submit(new Runnable {
+      override def run(): Unit = {
+        threadName = Thread.currentThread().getName()
+      }
+    })
+    executor.shutdown()
+    executor.awaitTermination(10, TimeUnit.SECONDS)
+    assert(threadName === "this-is-a-thread-name")
+  }
+
+  test("newDaemonSingleThreadScheduledExecutor") {
+    val executor = ThreadUtils.newDaemonSingleThreadScheduledExecutor("this-is-a-thread-name")
+    try {
+      val latch = new CountDownLatch(1)
+      @volatile var threadName = ""
+      executor.schedule(new Runnable {
+        override def run(): Unit = {
+          threadName = Thread.currentThread().getName()
+          latch.countDown()
+        }
+      }, 1, TimeUnit.MILLISECONDS)
+      latch.await(10, TimeUnit.SECONDS)
+      assert(threadName === "this-is-a-thread-name")
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/33b85620/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
index 4d26b64..cca0fac 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -31,7 +31,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.receiver.Receiver
-import org.apache.spark.util.Utils
+import org.apache.spark.util.ThreadUtils
 
 /**
  * Input stream that pulls messages from a Kafka Broker.
@@ -111,7 +111,8 @@ class KafkaReceiver[
     val topicMessageStreams = consumerConnector.createMessageStreams(
       topics, keyDecoder, valueDecoder)
 
-    val executorPool = Utils.newDaemonFixedThreadPool(topics.values.sum, "KafkaMessageHandler")
+    val executorPool =
+      ThreadUtils.newDaemonFixedThreadPool(topics.values.sum, "KafkaMessageHandler")
     try {
       // Start the messages handler for each partition
       topicMessageStreams.values.foreach { streams =>

http://git-wip-us.apache.org/repos/asf/spark/blob/33b85620/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
index c4a44c1..ea87e96 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
@@ -33,7 +33,7 @@ import org.I0Itec.zkclient.ZkClient
 import org.apache.spark.{Logging, SparkEnv}
 import org.apache.spark.storage.{StorageLevel, StreamBlockId}
 import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.ThreadUtils
 
 /**
  * ReliableKafkaReceiver offers the ability to reliably store data into BlockManager without loss.
@@ -121,7 +121,7 @@ class ReliableKafkaReceiver[
     zkClient = new ZkClient(consumerConfig.zkConnect, consumerConfig.zkSessionTimeoutMs,
       consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)
 
-    messageHandlerThreadPool = Utils.newDaemonFixedThreadPool(
+    messageHandlerThreadPool = ThreadUtils.newDaemonFixedThreadPool(
       topics.values.sum, "KafkaMessageHandler")
 
     blockGenerator.start()

http://git-wip-us.apache.org/repos/asf/spark/blob/33b85620/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index dcdc27d..297bf04 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.{Logging, SparkConf, SparkException}
 import org.apache.spark.storage._
 import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, WriteAheadLogManager}
-import org.apache.spark.util.{Clock, SystemClock, Utils}
+import org.apache.spark.util.{ThreadUtils, Clock, SystemClock}
 
 /** Trait that represents the metadata related to storage of blocks */
 private[streaming] trait ReceivedBlockStoreResult {
@@ -150,7 +150,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
   // For processing futures used in parallel block storing into block manager and write ahead log
   // # threads = 2, so that both writing to BM and WAL can proceed in parallel
   implicit private val executionContext = ExecutionContext.fromExecutorService(
-    Utils.newDaemonFixedThreadPool(2, this.getClass.getSimpleName))
+    ThreadUtils.newDaemonFixedThreadPool(2, this.getClass.getSimpleName))
 
   /**
    * This implementation stores the block into the block manager as well as a write ahead log.

http://git-wip-us.apache.org/repos/asf/spark/blob/33b85620/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
index 6bdfe45..38a93cc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
@@ -25,7 +25,7 @@ import scala.language.postfixOps
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.spark.Logging
-import org.apache.spark.util.{Clock, SystemClock, Utils}
+import org.apache.spark.util.{ThreadUtils, Clock, SystemClock}
 import WriteAheadLogManager._
 
 /**
@@ -60,7 +60,7 @@ private[streaming] class WriteAheadLogManager(
     if (callerName.nonEmpty) s" for $callerName" else ""
   private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
   implicit private val executionContext = ExecutionContext.fromExecutorService(
-    Utils.newDaemonFixedThreadPool(1, threadpoolName))
+    ThreadUtils.newDaemonSingleThreadExecutor(threadpoolName))
   override protected val logName = s"WriteAheadLogManager $callerNameTag"
 
   private var currentLogPath: Option[String] = None

