You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2017/04/27 20:55:07 UTC
spark git commit: [SPARK-20461][CORE][SS] Use UninterruptibleThread
for Executor and fix the potential hang in CachedKafkaConsumer
Repository: spark
Updated Branches:
refs/heads/master 606432a13 -> 01c999e7f
[SPARK-20461][CORE][SS] Use UninterruptibleThread for Executor and fix the potential hang in CachedKafkaConsumer
## What changes were proposed in this pull request?
This PR changes Executor's threads to `UninterruptibleThread` so that we can use `runUninterruptibly` in `CachedKafkaConsumer`. However, this is just a best effort to avoid hanging forever. If the user uses `CachedKafkaConsumer` in another thread (e.g., creates a new thread or Future), the potential hang may still happen.
## How was this patch tested?
The newly added test.
Author: Shixiong Zhu <sh...@databricks.com>
Closes #17761 from zsxwing/int.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/01c999e7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/01c999e7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/01c999e7
Branch: refs/heads/master
Commit: 01c999e7f94d5e6c2fce67304dc62351dfbdf963
Parents: 606432a
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Thu Apr 27 13:55:03 2017 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Thu Apr 27 13:55:03 2017 -0700
----------------------------------------------------------------------
.../org/apache/spark/executor/Executor.scala | 19 +++++++++++++++++--
.../spark/util/UninterruptibleThread.scala | 8 +++++++-
.../apache/spark/executor/ExecutorSuite.scala | 13 +++++++++++++
.../spark/sql/kafka010/CachedKafkaConsumer.scala | 15 +++++++++++++--
4 files changed, 50 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/01c999e7/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 18f0439..51b6c37 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -23,13 +23,15 @@ import java.lang.management.ManagementFactory
import java.net.{URI, URL}
import java.nio.ByteBuffer
import java.util.Properties
-import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
+import java.util.concurrent._
import javax.annotation.concurrent.GuardedBy
import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
import scala.util.control.NonFatal
+import com.google.common.util.concurrent.ThreadFactoryBuilder
+
import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
@@ -84,7 +86,20 @@ private[spark] class Executor(
}
// Start worker thread pool
- private val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor task launch worker")
+ private val threadPool = {
+ val threadFactory = new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat("Executor task launch worker-%d")
+ .setThreadFactory(new ThreadFactory {
+ override def newThread(r: Runnable): Thread =
+ // Use UninterruptibleThread to run tasks so that we can allow running codes without being
+ // interrupted by `Thread.interrupt()`. Some issues, such as KAFKA-1894, HADOOP-10622,
+ // will hang forever if some methods are interrupted.
+ new UninterruptibleThread(r, "unused") // thread name will be set by ThreadFactoryBuilder
+ })
+ .build()
+ Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
+ }
private val executorSource = new ExecutorSource(threadPool, executorId)
// Pool used for threads that supervise task killing / cancellation
private val taskReaperPool = ThreadUtils.newDaemonCachedThreadPool("Task reaper")
http://git-wip-us.apache.org/repos/asf/spark/blob/01c999e7/core/src/main/scala/org/apache/spark/util/UninterruptibleThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/UninterruptibleThread.scala b/core/src/main/scala/org/apache/spark/util/UninterruptibleThread.scala
index f0b68f0..27922b3 100644
--- a/core/src/main/scala/org/apache/spark/util/UninterruptibleThread.scala
+++ b/core/src/main/scala/org/apache/spark/util/UninterruptibleThread.scala
@@ -27,7 +27,13 @@ import javax.annotation.concurrent.GuardedBy
*
* Note: "runUninterruptibly" should be called only in `this` thread.
*/
-private[spark] class UninterruptibleThread(name: String) extends Thread(name) {
+private[spark] class UninterruptibleThread(
+ target: Runnable,
+ name: String) extends Thread(target, name) {
+
+ def this(name: String) {
+ this(null, name)
+ }
/** A monitor to protect "uninterruptible" and "interrupted" */
private val uninterruptibleLock = new Object
http://git-wip-us.apache.org/repos/asf/spark/blob/01c999e7/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index f47e574..efcad14 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -44,6 +44,7 @@ import org.apache.spark.scheduler.{FakeTask, ResultTask, TaskDescription}
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.util.UninterruptibleThread
class ExecutorSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar with Eventually {
@@ -158,6 +159,18 @@ class ExecutorSuite extends SparkFunSuite with LocalSparkContext with MockitoSug
assert(failReason.isInstanceOf[FetchFailed])
}
+ test("Executor's worker threads should be UninterruptibleThread") {
+ val conf = new SparkConf()
+ .setMaster("local")
+ .setAppName("executor thread test")
+ .set("spark.ui.enabled", "false")
+ sc = new SparkContext(conf)
+ val executorThread = sc.parallelize(Seq(1), 1).map { _ =>
+ Thread.currentThread.getClass.getName
+ }.collect().head
+ assert(executorThread === classOf[UninterruptibleThread].getName)
+ }
+
test("SPARK-19276: OOMs correctly handled with a FetchFailure") {
// when there is a fatal error like an OOM, we don't do normal fetch failure handling, since it
// may be a false positive. And we should call the uncaught exception handler.
http://git-wip-us.apache.org/repos/asf/spark/blob/01c999e7/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
index 6d76904..bf6c090 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
@@ -28,6 +28,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.spark.{SparkEnv, SparkException, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.kafka010.KafkaSource._
+import org.apache.spark.util.UninterruptibleThread
/**
@@ -62,11 +63,20 @@ private[kafka010] case class CachedKafkaConsumer private(
case class AvailableOffsetRange(earliest: Long, latest: Long)
+ private def runUninterruptiblyIfPossible[T](body: => T): T = Thread.currentThread match {
+ case ut: UninterruptibleThread =>
+ ut.runUninterruptibly(body)
+ case _ =>
+ logWarning("CachedKafkaConsumer is not running in UninterruptibleThread. " +
+ "It may hang when CachedKafkaConsumer's methods are interrupted because of KAFKA-1894")
+ body
+ }
+
/**
* Return the available offset range of the current partition. It's a pair of the earliest offset
* and the latest offset.
*/
- def getAvailableOffsetRange(): AvailableOffsetRange = {
+ def getAvailableOffsetRange(): AvailableOffsetRange = runUninterruptiblyIfPossible {
consumer.seekToBeginning(Set(topicPartition).asJava)
val earliestOffset = consumer.position(topicPartition)
consumer.seekToEnd(Set(topicPartition).asJava)
@@ -92,7 +102,8 @@ private[kafka010] case class CachedKafkaConsumer private(
offset: Long,
untilOffset: Long,
pollTimeoutMs: Long,
- failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
+ failOnDataLoss: Boolean):
+ ConsumerRecord[Array[Byte], Array[Byte]] = runUninterruptiblyIfPossible {
require(offset < untilOffset,
s"offset must always be less than untilOffset [offset: $offset, untilOffset: $untilOffset]")
logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org