Posted to commits@spark.apache.org by td...@apache.org on 2017/04/27 20:55:07 UTC

spark git commit: [SPARK-20461][CORE][SS] Use UninterruptibleThread for Executor and fix the potential hang in CachedKafkaConsumer

Repository: spark
Updated Branches:
  refs/heads/master 606432a13 -> 01c999e7f


[SPARK-20461][CORE][SS] Use UninterruptibleThread for Executor and fix the potential hang in CachedKafkaConsumer

## What changes were proposed in this pull request?

This PR changes Executor's threads to `UninterruptibleThread` so that we can use `runUninterruptibly` in `CachedKafkaConsumer`. However, this is only a best-effort attempt to avoid hanging forever: if the user calls `CachedKafkaConsumer` from another thread (e.g., from a newly created thread or a Future), the potential hang can still happen.
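
For context, here is a minimal sketch (not part of this commit; `UninterruptibleThread` is `private[spark]`, so this only compiles inside an `org.apache.spark` package) of the contract the change relies on: while a thread is inside `runUninterruptibly`, a `Thread.interrupt()` aimed at it is deferred and delivered only after the block exits, so a blocking Kafka call cannot be interrupted mid-flight into the broken state described in KAFKA-1894.

```scala
import java.util.concurrent.CountDownLatch

import org.apache.spark.util.UninterruptibleThread

val entered = new CountDownLatch(1)
val t = new UninterruptibleThread("demo") {
  override def run(): Unit = {
    runUninterruptibly {
      entered.countDown()
      Thread.sleep(1000) // stands in for a blocking consumer.poll(); finishes despite interrupt()
    }
    // Once the protected block exits, the deferred interrupt is delivered.
    println(s"interrupted after block: ${Thread.currentThread().isInterrupted}") // true
  }
}
t.start()
entered.await()
t.interrupt() // recorded, but deferred until runUninterruptibly returns
t.join()
```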

## How was this patch tested?

The newly added test.

Author: Shixiong Zhu <sh...@databricks.com>

Closes #17761 from zsxwing/int.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/01c999e7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/01c999e7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/01c999e7

Branch: refs/heads/master
Commit: 01c999e7f94d5e6c2fce67304dc62351dfbdf963
Parents: 606432a
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Thu Apr 27 13:55:03 2017 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Thu Apr 27 13:55:03 2017 -0700

----------------------------------------------------------------------
 .../org/apache/spark/executor/Executor.scala     | 19 +++++++++++++++++--
 .../spark/util/UninterruptibleThread.scala       |  8 +++++++-
 .../apache/spark/executor/ExecutorSuite.scala    | 13 +++++++++++++
 .../spark/sql/kafka010/CachedKafkaConsumer.scala | 15 +++++++++++++--
 4 files changed, 50 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/01c999e7/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 18f0439..51b6c37 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -23,13 +23,15 @@ import java.lang.management.ManagementFactory
 import java.net.{URI, URL}
 import java.nio.ByteBuffer
 import java.util.Properties
-import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
+import java.util.concurrent._
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
 import scala.util.control.NonFatal
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder
+
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
@@ -84,7 +86,20 @@ private[spark] class Executor(
   }
 
   // Start worker thread pool
-  private val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor task launch worker")
+  private val threadPool = {
+    val threadFactory = new ThreadFactoryBuilder()
+      .setDaemon(true)
+      .setNameFormat("Executor task launch worker-%d")
+      .setThreadFactory(new ThreadFactory {
+        override def newThread(r: Runnable): Thread =
+          // Use UninterruptibleThread to run tasks so that code can run without being
+          // interrupted by `Thread.interrupt()`. Some calls, such as those affected by
+          // KAFKA-1894 and HADOOP-10622, will hang forever if interrupted.
+          new UninterruptibleThread(r, "unused") // thread name will be set by ThreadFactoryBuilder
+      })
+      .build()
+    Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
+  }
   private val executorSource = new ExecutorSource(threadPool, executorId)
   // Pool used for threads that supervise task killing / cancellation
   private val taskReaperPool = ThreadUtils.newDaemonCachedThreadPool("Task reaper")

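For reference, a quick standalone check of the pattern above (assumptions: Guava and spark-core on the classpath, and code compiled inside an `org.apache.spark` package since `UninterruptibleThread` is `private[spark]`): the builder renames and daemonizes the threads produced by the wrapped factory, so the pool hands out correctly named daemon `UninterruptibleThread`s. The new `ExecutorSuite` test further below asserts the same end-to-end through a real job.

```scala
import java.util.concurrent.ThreadFactory

import com.google.common.util.concurrent.ThreadFactoryBuilder

import org.apache.spark.util.UninterruptibleThread

val factory = new ThreadFactoryBuilder()
  .setDaemon(true)
  .setNameFormat("Executor task launch worker-%d")
  .setThreadFactory(new ThreadFactory {
    override def newThread(r: Runnable): Thread =
      new UninterruptibleThread(r, "unused") // renamed by the builder right after creation
  })
  .build()

val t = factory.newThread(new Runnable { override def run(): Unit = () })
assert(t.isInstanceOf[UninterruptibleThread])
assert(t.getName == "Executor task launch worker-0")
assert(t.isDaemon)
```
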
http://git-wip-us.apache.org/repos/asf/spark/blob/01c999e7/core/src/main/scala/org/apache/spark/util/UninterruptibleThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/UninterruptibleThread.scala b/core/src/main/scala/org/apache/spark/util/UninterruptibleThread.scala
index f0b68f0..27922b3 100644
--- a/core/src/main/scala/org/apache/spark/util/UninterruptibleThread.scala
+++ b/core/src/main/scala/org/apache/spark/util/UninterruptibleThread.scala
@@ -27,7 +27,13 @@ import javax.annotation.concurrent.GuardedBy
  *
  * Note: "runUninterruptibly" should be called only in `this` thread.
  */
-private[spark] class UninterruptibleThread(name: String) extends Thread(name) {
+private[spark] class UninterruptibleThread(
+    target: Runnable,
+    name: String) extends Thread(target, name) {
+
+  def this(name: String) {
+    this(null, name)
+  }
 
   /** A monitor to protect "uninterruptible" and "interrupted" */
   private val uninterruptibleLock = new Object

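A small usage sketch of the two constructors after this change (illustrative only; again `private[spark]`): the new primary constructor wraps an arbitrary `Runnable`, which is what a `ThreadFactory` needs, while the old single-argument form keeps working for subclasses via the auxiliary constructor.

```scala
import org.apache.spark.util.UninterruptibleThread

// New in this commit: wrap a Runnable, as a ThreadFactory must.
val wrapped = new UninterruptibleThread(new Runnable {
  override def run(): Unit = println("run() invokes the wrapped target")
}, "wrapped")

// Pre-existing style: subclass and override run() directly.
val subclassed = new UninterruptibleThread("subclassed") {
  override def run(): Unit = println("run() overridden directly")
}

wrapped.start(); wrapped.join()
subclassed.start(); subclassed.join()
```
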
http://git-wip-us.apache.org/repos/asf/spark/blob/01c999e7/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index f47e574..efcad14 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -44,6 +44,7 @@ import org.apache.spark.scheduler.{FakeTask, ResultTask, TaskDescription}
 import org.apache.spark.serializer.JavaSerializer
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.util.UninterruptibleThread
 
 class ExecutorSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar with Eventually {
 
@@ -158,6 +159,18 @@ class ExecutorSuite extends SparkFunSuite with LocalSparkContext with MockitoSug
     assert(failReason.isInstanceOf[FetchFailed])
   }
 
+  test("Executor's worker threads should be UninterruptibleThread") {
+    val conf = new SparkConf()
+      .setMaster("local")
+      .setAppName("executor thread test")
+      .set("spark.ui.enabled", "false")
+    sc = new SparkContext(conf)
+    val executorThread = sc.parallelize(Seq(1), 1).map { _ =>
+      Thread.currentThread.getClass.getName
+    }.collect().head
+    assert(executorThread === classOf[UninterruptibleThread].getName)
+  }
+
   test("SPARK-19276: OOMs correctly handled with a FetchFailure") {
     // when there is a fatal error like an OOM, we don't do normal fetch failure handling, since it
     // may be a false positive.  And we should call the uncaught exception handler.

http://git-wip-us.apache.org/repos/asf/spark/blob/01c999e7/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
index 6d76904..bf6c090 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
@@ -28,6 +28,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.spark.{SparkEnv, SparkException, TaskContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.kafka010.KafkaSource._
+import org.apache.spark.util.UninterruptibleThread
 
 
 /**
@@ -62,11 +63,20 @@ private[kafka010] case class CachedKafkaConsumer private(
 
   case class AvailableOffsetRange(earliest: Long, latest: Long)
 
+  private def runUninterruptiblyIfPossible[T](body: => T): T = Thread.currentThread match {
+    case ut: UninterruptibleThread =>
+      ut.runUninterruptibly(body)
+    case _ =>
+      logWarning("CachedKafkaConsumer is not running in UninterruptibleThread. " +
+        "It may hang when CachedKafkaConsumer's methods are interrupted because of KAFKA-1894")
+      body
+  }
+
   /**
    * Return the available offset range of the current partition. It's a pair of the earliest offset
    * and the latest offset.
    */
-  def getAvailableOffsetRange(): AvailableOffsetRange = {
+  def getAvailableOffsetRange(): AvailableOffsetRange = runUninterruptiblyIfPossible {
     consumer.seekToBeginning(Set(topicPartition).asJava)
     val earliestOffset = consumer.position(topicPartition)
     consumer.seekToEnd(Set(topicPartition).asJava)
@@ -92,7 +102,8 @@ private[kafka010] case class CachedKafkaConsumer private(
       offset: Long,
       untilOffset: Long,
       pollTimeoutMs: Long,
-      failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
+      failOnDataLoss: Boolean):
+    ConsumerRecord[Array[Byte], Array[Byte]] = runUninterruptiblyIfPossible {
     require(offset < untilOffset,
       s"offset must always be less than untilOffset [offset: $offset, untilOffset: $untilOffset]")
     logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset")

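A self-contained sketch of the dispatch above (the helper name here is hypothetical, not the committed `runUninterruptiblyIfPossible`, and the log call is reduced to a comment): on an `UninterruptibleThread` the body runs with interrupts deferred; on any other thread, such as one the user spawned or a `Future`'s pool thread, it runs unprotected, which is exactly the residual hang risk called out in the PR description.

```scala
import org.apache.spark.util.UninterruptibleThread

// Hypothetical stand-in for CachedKafkaConsumer.runUninterruptiblyIfPossible.
def runProtectedIfPossible[T](body: => T): T = Thread.currentThread match {
  case ut: UninterruptibleThread => ut.runUninterruptibly(body)
  case _ =>
    // Fallback: no protection; an interrupt during a blocking Kafka call
    // can still trigger the KAFKA-1894 hang.
    body
}

// Executor task threads take the first branch after this commit; a thread the
// user creates (or a Future's pool thread) takes the unprotected fallback.
new Thread(new Runnable {
  override def run(): Unit = runProtectedIfPossible {
    // blocking Kafka fetch would go here
  }
}).start()
```
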
