You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2016/10/26 17:36:40 UTC
spark git commit: [SPARK-13747][SQL] Fix concurrent executions in
ForkJoinPool for SQL
Repository: spark
Updated Branches:
refs/heads/master 312ea3f7f -> 7ac70e7ba
[SPARK-13747][SQL] Fix concurrent executions in ForkJoinPool for SQL
## What changes were proposed in this pull request?
Calling `Await.result` from a ForkJoinPool thread allows the pool to run other tasks on that same thread while it waits. However, SQL uses a `ThreadLocal` execution id to trace the Spark jobs launched by a query, so a task run on the waiting thread this way may unexpectedly see another query's execution id.
This PR just uses `Awaitable.result` instead to prevent ForkJoinPool from running other tasks in the current waiting thread.
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <sh...@databricks.com>
Closes #15520 from zsxwing/SPARK-13747.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7ac70e7b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7ac70e7b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7ac70e7b
Branch: refs/heads/master
Commit: 7ac70e7ba8d610a45c21a70dc28e4c989c19451b
Parents: 312ea3f
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Wed Oct 26 10:36:36 2016 -0700
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Wed Oct 26 10:36:36 2016 -0700
----------------------------------------------------------------------
.../org/apache/spark/util/ThreadUtils.scala | 21 ++++++++++++++++++++
scalastyle-config.xml | 1 +
.../sql/execution/basicPhysicalOperators.scala | 2 +-
.../exchange/BroadcastExchangeExec.scala | 3 ++-
4 files changed, 25 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/7ac70e7b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index 5a6dbc8..d093e7b 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -194,4 +194,25 @@ private[spark] object ThreadUtils {
throw new SparkException("Exception thrown in awaitResult: ", t)
}
}
+
+ /**
+ * Returns the result of `awaitable`, waiting at most `atMost`, by calling [[Awaitable.result]]
+ * directly to bypass the `ForkJoinPool`'s `BlockContext`. Non-fatal failures, including the
+ * timeout raised when `atMost` elapses, are re-thrown wrapped in a [[SparkException]].
+ *
+ * Code running in the user's thread may be in a thread of a Scala ForkJoinPool. Concurrent
+ * executions there may see a [[ThreadLocal]] value unexpectedly, so this method prevents the
+ * ForkJoinPool from running other tasks in the current waiting thread.
+ */
+ @throws(classOf[SparkException])
+ def awaitResultInForkJoinSafely[T](awaitable: Awaitable[T], atMost: Duration): T = {
+ try {
+ // `awaitPermission` is not actually used anywhere, so passing null is safe. See SPARK-13747.
+ val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
+ awaitable.result(atMost)(awaitPermission)
+ } catch {
+ case NonFatal(t) =>
+ throw new SparkException("Exception thrown in awaitResult: ", t)
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/7ac70e7b/scalastyle-config.xml
----------------------------------------------------------------------
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index 7fe0697..81d57d7 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -200,6 +200,7 @@ This file is divided into 3 sections:
// scalastyle:off awaitresult
Await.result(...)
// scalastyle:on awaitresult
+ If your code uses ThreadLocal and may run in threads created by the user, use ThreadUtils.awaitResultInForkJoinSafely instead.
]]></customMessage>
</check>
http://git-wip-us.apache.org/repos/asf/spark/blob/7ac70e7b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 37d750e..a5291e0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -570,7 +570,7 @@ case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode {
}
override def executeCollect(): Array[InternalRow] = {
- ThreadUtils.awaitResult(relationFuture, Duration.Inf)
+ ThreadUtils.awaitResultInForkJoinSafely(relationFuture, Duration.Inf) // See SPARK-13747.
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/7ac70e7b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
index 7be5d31..ce5013d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
@@ -128,7 +128,8 @@ case class BroadcastExchangeExec(
}
override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
- ThreadUtils.awaitResult(relationFuture, timeout).asInstanceOf[broadcast.Broadcast[T]]
+ ThreadUtils.awaitResultInForkJoinSafely(relationFuture, timeout) // See SPARK-13747.
+ .asInstanceOf[broadcast.Broadcast[T]]
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org