You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/09/02 08:28:24 UTC

git commit: SPARK-2636: Expose job ID in JobWaiter API

Repository: spark
Updated Branches:
  refs/heads/master 44d3a6a75 -> fbf2678c1


SPARK-2636: Expose job ID in JobWaiter API

This PR adds the async actions to the Java API. User can call these async actions to get the FutureAction and use JobWaiter (for SimpleFutureAction) to retrieve job Id.

Author: lirui <ru...@intel.com>

Closes #2176 from lirui-intel/SPARK-2636 and squashes the following commits:

ccaafb7 [lirui] SPARK-2636: fix java doc
5536d55 [lirui] SPARK-2636: mark the async API as experimental
e2e01d5 [lirui] SPARK-2636: add mima exclude
0ca320d [lirui] SPARK-2636: fix method name & javadoc
3fa39f7 [lirui] SPARK-2636: refine the patch
af4f5d9 [lirui] SPARK-2636: remove unused imports
843276c [lirui] SPARK-2636: only keep foreachAsync in the java API
fbf5744 [lirui] SPARK-2636: add more async actions for java api
1b25abc [lirui] SPARK-2636: expose some fields in JobWaiter
d09f732 [lirui] SPARK-2636: fix build
eb1ee79 [lirui] SPARK-2636: change some parameters in SimpleFutureAction to member field
6e2b87b [lirui] SPARK-2636: add java API for async actions


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fbf2678c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fbf2678c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fbf2678c

Branch: refs/heads/master
Commit: fbf2678c16acc0071ebd1cbdd165702635be5f0c
Parents: 44d3a6a
Author: lirui <ru...@intel.com>
Authored: Mon Sep 1 23:28:19 2014 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Mon Sep 1 23:28:19 2014 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/FutureAction.scala   |  3 +++
 .../org/apache/spark/api/java/JavaRDDLike.scala      | 15 ++++++++++++++-
 .../scala/org/apache/spark/rdd/AsyncRDDActions.scala |  3 ++-
 .../scala/org/apache/spark/scheduler/JobWaiter.scala |  2 +-
 project/MimaExcludes.scala                           |  3 +++
 5 files changed, 23 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fbf2678c/core/src/main/scala/org/apache/spark/FutureAction.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala
index 1e4dec8..75ea535 100644
--- a/core/src/main/scala/org/apache/spark/FutureAction.scala
+++ b/core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -149,6 +149,9 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
       case JobFailed(e: Exception) => scala.util.Failure(e)
     }
   }
+
+  /** Get the corresponding job id for this action. */
+  def jobId = jobWaiter.jobId
 }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/fbf2678c/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index f917cfd..545bc0e 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -26,7 +26,7 @@ import scala.reflect.ClassTag
 import com.google.common.base.Optional
 import org.apache.hadoop.io.compress.CompressionCodec
 
-import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.{FutureAction, Partition, SparkContext, TaskContext}
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.JavaPairRDD._
 import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
@@ -574,4 +574,17 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
 
   def name(): String = rdd.name
 
+  /**
+   * :: Experimental ::
+   * The asynchronous version of the foreach action.
+   *
+   * @param f the function to apply to all the elements of the RDD
+   * @return a FutureAction for the action
+   */
+  @Experimental
+  def foreachAsync(f: VoidFunction[T]): FutureAction[Unit] = {
+    import org.apache.spark.SparkContext._
+    rdd.foreachAsync(x => f.call(x))
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/fbf2678c/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
index aed951a..b62f3fb 100644
--- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -112,7 +112,8 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
    * Applies a function f to all elements of this RDD.
    */
   def foreachAsync(f: T => Unit): FutureAction[Unit] = {
-    self.context.submitJob[T, Unit, Unit](self, _.foreach(f), Range(0, self.partitions.size),
+    val cleanF = self.context.clean(f)
+    self.context.submitJob[T, Unit, Unit](self, _.foreach(cleanF), Range(0, self.partitions.size),
       (index, data) => Unit, Unit)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/fbf2678c/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
index e9bfee2..29879b3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
@@ -23,7 +23,7 @@ package org.apache.spark.scheduler
  */
 private[spark] class JobWaiter[T](
     dagScheduler: DAGScheduler,
-    jobId: Int,
+    val jobId: Int,
     totalTasks: Int,
     resultHandler: (Int, T) => Unit)
   extends JobListener {

http://git-wip-us.apache.org/repos/asf/spark/blob/fbf2678c/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index fe8ffe6..a2f1b35 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -41,6 +41,9 @@ object MimaExcludes {
           Seq(
             // Adding new method to JavaRDLike trait - we should probably mark this as a developer API.
             ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.api.java.JavaRDDLike.partitions"),
+            // Should probably mark this as Experimental
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.api.java.JavaRDDLike.foreachAsync"),
             // We made a mistake earlier (ed06500d3) in the Java API to use default parameter values
             // for countApproxDistinct* functions, which does not work in Java. We later removed
             // them, and use the following to tell Mima to not care about them.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org