You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2016/03/10 02:34:35 UTC

spark git commit: [SPARK-13747][SQL] Fix concurrent query with fork-join pool

Repository: spark
Updated Branches:
  refs/heads/master dbf2a7cfa -> 37fcda3e6


[SPARK-13747][SQL] Fix concurrent query with fork-join pool

## What changes were proposed in this pull request?

Fix this use case, which was already fixed in SPARK-10548 in 1.6 but was broken in master due to #9264:

```
(1 to 100).par.foreach { _ => sc.parallelize(1 to 5).map { i => (i, i) }.toDF("a", "b").count() }
```

This threw `IllegalArgumentException` consistently before this patch. For more detail, see the JIRA.

## How was this patch tested?

New test in `SQLExecutionSuite`.

Author: Andrew Or <an...@databricks.com>

Closes #11586 from andrewor14/fix-concurrent-sql.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/37fcda3e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/37fcda3e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/37fcda3e

Branch: refs/heads/master
Commit: 37fcda3e6cf1707fb7a348a4d47231849ef8abf6
Parents: dbf2a7c
Author: Andrew Or <an...@databricks.com>
Authored: Wed Mar 9 17:34:28 2016 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Wed Mar 9 17:34:28 2016 -0800

----------------------------------------------------------------------
 .../org/apache/spark/scheduler/DAGScheduler.scala     |  7 ++++++-
 .../spark/sql/execution/SQLExecutionSuite.scala       | 14 ++++++++++++++
 2 files changed, 20 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/37fcda3e/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index e2eaef5..b576d4c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -613,7 +613,12 @@ class DAGScheduler(
       properties: Properties): Unit = {
     val start = System.nanoTime
     val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
-    Await.ready(waiter.completionFuture, atMost = Duration.Inf)
+    // Note: Do not call Await.ready(future) because that calls `scala.concurrent.blocking`,
+    // which causes concurrent SQL executions to fail if a fork-join pool is used. Note that
+    // due to idiosyncrasies in Scala, `awaitPermission` is not actually used anywhere so it's
+    // safe to pass in null here. For more detail, see SPARK-13747.
+    val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
+    waiter.completionFuture.ready(Duration.Inf)(awaitPermission)
     waiter.completionFuture.value.get match {
       case scala.util.Success(_) =>
         logInfo("Job %d finished: %s, took %f s".format

http://git-wip-us.apache.org/repos/asf/spark/blob/37fcda3e/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala
index 824d89e..c9f517c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala
@@ -49,6 +49,20 @@ class SQLExecutionSuite extends SparkFunSuite {
     }
   }
 
+  test("concurrent query execution with fork-join pool (SPARK-13747)") {
+    val sc = new SparkContext("local[*]", "test")
+    val sqlContext = new SQLContext(sc)
+    import sqlContext.implicits._
+    try {
+      // Should not throw IllegalArgumentException
+      (1 to 100).par.foreach { _ =>
+        sc.parallelize(1 to 5).map { i => (i, i) }.toDF("a", "b").count()
+      }
+    } finally {
+      sc.stop()
+    }
+  }
+
   /**
    * Trigger SPARK-10548 by mocking a parent and its child thread executing queries concurrently.
    */


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org