You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2013/12/06 08:30:18 UTC
git commit: Merge pull request #232 from markhamstra/FiniteWait
Updated Branches:
refs/heads/branch-0.8 17ca8a1c5 -> d77c3371b
Merge pull request #232 from markhamstra/FiniteWait
jobWaiter.synchronized before jobWaiter.wait
...else ``IllegalMonitorStateException`` in ``SimpleFutureAction#ready``.
(cherry picked from commit 078049877e123fe7e4c4553e36055de572cab7c4)
Signed-off-by: Reynold Xin <rx...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/d77c3371
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/d77c3371
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/d77c3371
Branch: refs/heads/branch-0.8
Commit: d77c3371b6b189351a2c23cb533777bbd08684c8
Parents: 17ca8a1
Author: Reynold Xin <rx...@apache.org>
Authored: Thu Dec 5 23:29:42 2013 -0800
Committer: Reynold Xin <rx...@apache.org>
Committed: Thu Dec 5 23:30:11 2013 -0800
----------------------------------------------------------------------
.../scala/org/apache/spark/FutureAction.scala | 2 +-
.../org/apache/spark/scheduler/JobWaiter.scala | 1 +
.../apache/spark/rdd/AsyncRDDActionsSuite.scala | 26 ++++++++++++++++++++
3 files changed, 28 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d77c3371/core/src/main/scala/org/apache/spark/FutureAction.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala
index 1ad9240..c6b4ac5 100644
--- a/core/src/main/scala/org/apache/spark/FutureAction.scala
+++ b/core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -99,7 +99,7 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
override def ready(atMost: Duration)(implicit permit: CanAwait): SimpleFutureAction.this.type = {
if (!atMost.isFinite()) {
awaitResult()
- } else {
+ } else jobWaiter.synchronized {
val finishTime = System.currentTimeMillis() + atMost.toMillis
while (!isCompleted) {
val time = System.currentTimeMillis()
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d77c3371/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
index 58f238d..b026f86 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
@@ -31,6 +31,7 @@ private[spark] class JobWaiter[T](
private var finishedTasks = 0
// Is the job as a whole finished (succeeded or failed)?
+ @volatile
private var _jobFinished = totalTasks == 0
def jobFinished = _jobFinished
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d77c3371/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
index da032b1..0d4c10d 100644
--- a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.rdd
import java.util.concurrent.Semaphore
+import scala.concurrent.{Await, TimeoutException}
+import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global
import org.scalatest.{BeforeAndAfterAll, FunSuite}
@@ -173,4 +175,28 @@ class AsyncRDDActionsSuite extends FunSuite with BeforeAndAfterAll with Timeouts
sem.acquire(2)
}
}
+
+ /**
+ * Awaiting FutureAction results
+ */
+ test("FutureAction result, infinite wait") {
+ val f = sc.parallelize(1 to 100, 4)
+ .countAsync()
+ assert(Await.result(f, Duration.Inf) === 100)
+ }
+
+ test("FutureAction result, finite wait") {
+ val f = sc.parallelize(1 to 100, 4)
+ .countAsync()
+ assert(Await.result(f, Duration(30, "seconds")) === 100)
+ }
+
+ test("FutureAction result, timeout") {
+ val f = sc.parallelize(1 to 100, 4)
+ .mapPartitions(itr => { Thread.sleep(20); itr })
+ .countAsync()
+ intercept[TimeoutException] {
+ Await.result(f, Duration(20, "milliseconds"))
+ }
+ }
}