You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2013/12/06 08:29:47 UTC

[1/3] git commit: jobWaiter.synchronized before jobWaiter.wait

Updated Branches:
  refs/heads/master 5d460253d -> 078049877


jobWaiter.synchronized before jobWaiter.wait


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/aebb123f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/aebb123f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/aebb123f

Branch: refs/heads/master
Commit: aebb123fd3b4bf0d57d867f33ca0325340ee42e4
Parents: 5d46025
Author: Mark Hamstra <ma...@gmail.com>
Authored: Thu Dec 5 17:16:44 2013 -0800
Committer: Mark Hamstra <ma...@gmail.com>
Committed: Thu Dec 5 17:16:44 2013 -0800

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/FutureAction.scala        | 2 +-
 core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala | 1 +
 2 files changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/aebb123f/core/src/main/scala/org/apache/spark/FutureAction.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala
index 1ad9240..c6b4ac5 100644
--- a/core/src/main/scala/org/apache/spark/FutureAction.scala
+++ b/core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -99,7 +99,7 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
   override def ready(atMost: Duration)(implicit permit: CanAwait): SimpleFutureAction.this.type = {
     if (!atMost.isFinite()) {
       awaitResult()
-    } else {
+    } else jobWaiter.synchronized {
       val finishTime = System.currentTimeMillis() + atMost.toMillis
       while (!isCompleted) {
         val time = System.currentTimeMillis()

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/aebb123f/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
index 58f238d..b026f86 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
@@ -31,6 +31,7 @@ private[spark] class JobWaiter[T](
   private var finishedTasks = 0
 
   // Is the job as a whole finished (succeeded or failed)?
+  @volatile
   private var _jobFinished = totalTasks == 0
 
   def jobFinished = _jobFinished


[2/3] git commit: FutureAction result tests

Posted by rx...@apache.org.
FutureAction result tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/ee888f6b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/ee888f6b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/ee888f6b

Branch: refs/heads/master
Commit: ee888f6b251c4f06f2edf15267d12e42e28fd22f
Parents: aebb123
Author: Mark Hamstra <ma...@gmail.com>
Authored: Thu Dec 5 21:53:40 2013 -0800
Committer: Mark Hamstra <ma...@gmail.com>
Committed: Thu Dec 5 23:01:18 2013 -0800

----------------------------------------------------------------------
 .../apache/spark/rdd/AsyncRDDActionsSuite.scala | 26 ++++++++++++++++++++
 1 file changed, 26 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ee888f6b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
index da032b1..0d4c10d 100644
--- a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.rdd
 
 import java.util.concurrent.Semaphore
 
+import scala.concurrent.{Await, TimeoutException}
+import scala.concurrent.duration.Duration
 import scala.concurrent.ExecutionContext.Implicits.global
 
 import org.scalatest.{BeforeAndAfterAll, FunSuite}
@@ -173,4 +175,28 @@ class AsyncRDDActionsSuite extends FunSuite with BeforeAndAfterAll with Timeouts
       sem.acquire(2)
     }
   }
+
+  /**
+   * Awaiting FutureAction results
+   */
+  test("FutureAction result, infinite wait") {
+    val f = sc.parallelize(1 to 100, 4)
+              .countAsync()
+    assert(Await.result(f, Duration.Inf) === 100)
+  }
+
+  test("FutureAction result, finite wait") {
+    val f = sc.parallelize(1 to 100, 4)
+              .countAsync()
+    assert(Await.result(f, Duration(30, "seconds")) === 100)
+  }
+
+  test("FutureAction result, timeout") {
+    val f = sc.parallelize(1 to 100, 4)
+              .mapPartitions(itr => { Thread.sleep(20); itr })
+              .countAsync()
+    intercept[TimeoutException] {
+      Await.result(f, Duration(20, "milliseconds"))
+    }
+  }
 }


[3/3] git commit: Merge pull request #232 from markhamstra/FiniteWait

Posted by rx...@apache.org.
Merge pull request #232 from markhamstra/FiniteWait

jobWaiter.synchronized before jobWaiter.wait

...else ``IllegalMonitorStateException`` in ``SimpleFutureAction#ready``.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/07804987
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/07804987
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/07804987

Branch: refs/heads/master
Commit: 078049877e123fe7e4c4553e36055de572cab7c4
Parents: 5d46025 ee888f6
Author: Reynold Xin <rx...@apache.org>
Authored: Thu Dec 5 23:29:42 2013 -0800
Committer: Reynold Xin <rx...@apache.org>
Committed: Thu Dec 5 23:29:42 2013 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/FutureAction.scala   |  2 +-
 .../org/apache/spark/scheduler/JobWaiter.scala  |  1 +
 .../apache/spark/rdd/AsyncRDDActionsSuite.scala | 26 ++++++++++++++++++++
 3 files changed, 28 insertions(+), 1 deletion(-)
----------------------------------------------------------------------