Posted to commits@spark.apache.org by rx...@apache.org on 2014/01/10 04:04:59 UTC

[1/2] git commit: Add some missing Java API methods

Updated Branches:
  refs/heads/master a9d533333 -> 4b074fac0


Add some missing Java API methods


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/142921c6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/142921c6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/142921c6

Branch: refs/heads/master
Commit: 142921c6c0d0d9134d5fa0e307575d8f6749fb9a
Parents: 26cdb5f
Author: Matei Zaharia <ma...@databricks.com>
Authored: Thu Jan 9 00:05:53 2014 -0800
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Thu Jan 9 18:11:12 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   | 12 ++++-
 .../apache/spark/api/java/JavaDoubleRDD.scala   | 12 +++--
 .../org/apache/spark/api/java/JavaPairRDD.scala |  6 +++
 .../org/apache/spark/api/java/JavaRDD.scala     |  6 +++
 .../org/apache/spark/api/java/JavaRDDLike.scala |  6 +++
 .../spark/api/java/JavaSparkContext.scala       | 54 +++++++++++++++++++-
 6 files changed, 90 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/142921c6/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index f91392b..66c226e 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -244,6 +244,10 @@ class SparkContext(
     localProperties.set(new Properties())
   }
 
+  /**
+   * Set a local property that affects jobs submitted from this thread, such as the
+   * Spark fair scheduler pool.
+   */
   def setLocalProperty(key: String, value: String) {
     if (localProperties.get() == null) {
       localProperties.set(new Properties())
@@ -255,6 +259,10 @@ class SparkContext(
     }
   }
 
+  /**
+   * Get a local property set in this thread, or null if it is missing. See
+   * [[org.apache.spark.SparkContext.setLocalProperty]].
+   */
   def getLocalProperty(key: String): String =
     Option(localProperties.get).map(_.getProperty(key)).getOrElse(null)
 
@@ -265,7 +273,7 @@ class SparkContext(
   }
 
   /**
-   * Assigns a group id to all the jobs started by this thread until the group id is set to a
+   * Assigns a group ID to all the jobs started by this thread until the group ID is set to a
    * different value or cleared.
    *
    * Often, a unit of execution in an application consists of multiple Spark actions or jobs.
@@ -288,7 +296,7 @@ class SparkContext(
     setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, groupId)
   }
 
-  /** Clear the job group id and its description. */
+  /** Clear the current thread's job group ID and its description. */
   def clearJobGroup() {
     setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, null)
     setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, null)
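
For illustration, a minimal usage sketch of these thread-local properties (hedged:
assumes a JavaSparkContext `sc` and a fair scheduler pool named "production"
configured in the allocation file; "spark.scheduler.pool" is the property the fair
scheduler reads):

    // Route jobs submitted from this thread to an assumed "production" pool.
    sc.setLocalProperty("spark.scheduler.pool", "production");
    long n = sc.textFile("hdfs://path/to/data").count();  // runs in that pool
    sc.setLocalProperty("spark.scheduler.pool", null);    // null clears the property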

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/142921c6/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
index da30cf6..b0dedc6 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
@@ -207,13 +207,13 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
    *  e.g. for the array
    *  [1,10,20,50] the buckets are [1,10) [10,20) [20,50]
   *  e.g. 1<=x<10, 10<=x<20, 20<=x<=50
-   *  And on the input of 1 and 50 we would have a histogram of 1,0,0 
-   * 
+   *  And on the input of 1 and 50 we would have a histogram of 1,0,1
+   *
   * Note: if your histogram is evenly spaced (e.g. [0, 10, 20, 30]), this can be switched
   * from an O(log n) insertion to O(1) per element (where n = # buckets) if you set
   * evenBuckets to true.
   * buckets must be sorted and must not contain any duplicates.
-   * buckets array must be at least two elements 
+   * The buckets array must have at least two elements.
   * All NaN entries are treated the same: if you have a NaN bucket, it must be
   * the last (maximum) bucket boundary, and all NaN entries will be counted
   * in that bucket.
@@ -225,6 +225,12 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
   def histogram(buckets: Array[Double], evenBuckets: Boolean): Array[Long] = {
     srdd.histogram(buckets.map(_.toDouble), evenBuckets)
   }
+
+  /** Assign a name to this RDD */
+  def setName(name: String): JavaDoubleRDD = {
+    srdd.setName(name)
+    this
+  }
 }
 
 object JavaDoubleRDD {
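
As a usage sketch of histogram with explicit buckets (the input values are invented;
assumes a JavaSparkContext `sc`):

    import java.util.Arrays;

    // Buckets [1,10) [10,20) [20,50]; only the last bucket is closed on the right.
    JavaDoubleRDD values = sc.parallelizeDoubles(Arrays.asList(1.0, 50.0));
    long[] counts = values.histogram(new double[]{1, 10, 20, 50}, false);
    // counts is {1, 0, 1}: 1.0 falls in [1,10), 50.0 in the closed last bucket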

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/142921c6/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 55c8745..0fb7e19 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -647,6 +647,12 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
   def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaRDD[(K, Long)] = {
     rdd.countApproxDistinctByKey(relativeSD, numPartitions)
   }
+
+  /** Assign a name to this RDD */
+  def setName(name: String): JavaPairRDD[K, V] = {
+    rdd.setName(name)
+    this
+  }
 }
 
 object JavaPairRDD {
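
A short hedged sketch of the pair-RDD surface above (the contents and the RDD name
are invented; assumes a JavaSparkContext `sc`):

    import java.util.Arrays;
    import scala.Tuple2;

    JavaPairRDD<String, Integer> clicks = sc.parallelizePairs(
        Arrays.asList(new Tuple2<String, Integer>("alice", 1),
                      new Tuple2<String, Integer>("bob", 2)));
    clicks.setName("clicks-by-user");          // fluent: returns this JavaPairRDD
    clicks.countApproxDistinctByKey(0.05, 2);  // approximate distinct values per key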

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/142921c6/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
index 037cd1c..7d48ce0 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -127,6 +127,12 @@ JavaRDDLike[T, JavaRDD[T]] {
     wrapRDD(rdd.subtract(other, p))
 
   override def toString = rdd.toString
+
+  /** Assign a name to this RDD */
+  def setName(name: String): JavaRDD[T] = {
+    rdd.setName(name)
+    this
+  }
 }
 
 object JavaRDD {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/142921c6/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 924d8af..ebbbbd8 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -245,6 +245,11 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   }
 
   /**
+   * Return a list that contains all of the elements in this RDD.
+   */
+  def toArray(): JList[T] = collect()
+
+  /**
    * Return an array that contains all of the elements in a specific partition of this RDD.
    */
   def collectPartitions(partitionIds: Array[Int]): Array[JList[T]] = {
@@ -455,4 +460,5 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    */
   def countApproxDistinct(relativeSD: Double = 0.05): Long = rdd.countApproxDistinct(relativeSD)
 
+  def name(): String = rdd.name
 }
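
A brief hedged sketch tying the new accessors together (the path is illustrative;
assumes a JavaSparkContext `sc`):

    JavaRDD<String> lines = sc.textFile("hdfs://path/to/log").setName("log lines");
    System.out.println(lines.name());              // prints "log lines"
    java.util.List<String> all = lines.collect();  // toArray() is an equivalent alias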

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/142921c6/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index e93b10f..7a6f044 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -425,6 +425,51 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
   def clearCallSite() {
     sc.clearCallSite()
   }
+
+  /**
+   * Set a local property that affects jobs submitted from this thread, such as the
+   * Spark fair scheduler pool.
+   */
+  def setLocalProperty(key: String, value: String): Unit = sc.setLocalProperty(key, value)
+
+  /**
+   * Get a local property set in this thread, or null if it is missing. See
+   * [[org.apache.spark.api.java.JavaSparkContext.setLocalProperty]].
+   */
+  def getLocalProperty(key: String): String = sc.getLocalProperty(key)
+
+  /**
+   * Assigns a group ID to all the jobs started by this thread until the group ID is set to a
+   * different value or cleared.
+   *
+   * Often, a unit of execution in an application consists of multiple Spark actions or jobs.
+   * Application programmers can use this method to group all those jobs together and give a
+   * group description. Once set, the Spark web UI will associate such jobs with this group.
+   *
+   * The application can also use [[org.apache.spark.api.java.JavaSparkContext.cancelJobGroup]]
+   * to cancel all running jobs in this group. For example,
+   * {{{
+   * // In the main thread:
+   * sc.setJobGroup("some_job_to_cancel", "some job description");
+   * rdd.map(...).count();
+   *
+   * // In a separate thread:
+   * sc.cancelJobGroup("some_job_to_cancel");
+   * }}}
+   */
+  def setJobGroup(groupId: String, description: String): Unit = sc.setJobGroup(groupId, description)
+
+  /** Clear the current thread's job group ID and its description. */
+  def clearJobGroup(): Unit = sc.clearJobGroup()
+
+  /**
+   * Cancel active jobs for the specified group. See
+   * [[org.apache.spark.api.java.JavaSparkContext.setJobGroup]] for more information.
+   */
+  def cancelJobGroup(groupId: String): Unit = sc.cancelJobGroup(groupId)
+
+  /** Cancel all jobs that have been scheduled or are running. */
+  def cancelAllJobs(): Unit = sc.cancelAllJobs()
 }
 
 object JavaSparkContext {
@@ -436,5 +481,12 @@ object JavaSparkContext {
    * Find the JAR from which a given class was loaded, to make it easy for users to pass
    * their JARs to SparkContext.
    */
-  def jarOfClass(cls: Class[_]) = SparkContext.jarOfClass(cls).toArray
+  def jarOfClass(cls: Class[_]): Array[String] = SparkContext.jarOfClass(cls).toArray
+
+  /**
+   * Find the JAR that contains the class of a particular object, to make it easy for users
+   * to pass their JARs to SparkContext. In most cases you can call jarOfObject(this) in
+   * your driver program.
+   */
+  def jarOfObject(obj: AnyRef): Array[String] = SparkContext.jarOfObject(obj).toArray
 }
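
A hedged sketch of the new jar helpers (MyApp, the master URL, and the paths are
hypothetical; the four-argument constructor is the pre-existing Java API form):

    // Locate the JAR containing the application's classes so executors can fetch it.
    String[] jars = JavaSparkContext.jarOfClass(MyApp.class);
    JavaSparkContext sc = new JavaSparkContext(
        "spark://master:7077", "MyApp", "/path/to/spark", jars);
    // From inside a driver object, jarOfObject(this) does the same lookup.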


[2/2] git commit: Merge pull request #374 from mateiz/completeness

Posted by rx...@apache.org.
Merge pull request #374 from mateiz/completeness

Add some missing Java API methods

These are primarily for setting job groups, canceling jobs, and setting names on RDDs. Seemed like useful stuff to expose in Java.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/4b074fac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/4b074fac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/4b074fac

Branch: refs/heads/master
Commit: 4b074fac054848ebd3397a3cce0a3e7871d3860c
Parents: a9d5333 142921c
Author: Reynold Xin <rx...@apache.org>
Authored: Thu Jan 9 19:03:55 2014 -0800
Committer: Reynold Xin <rx...@apache.org>
Committed: Thu Jan 9 19:03:55 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   | 12 ++++-
 .../apache/spark/api/java/JavaDoubleRDD.scala   | 12 +++--
 .../org/apache/spark/api/java/JavaPairRDD.scala |  6 +++
 .../org/apache/spark/api/java/JavaRDD.scala     |  6 +++
 .../org/apache/spark/api/java/JavaRDDLike.scala |  6 +++
 .../spark/api/java/JavaSparkContext.scala       | 54 +++++++++++++++++++-
 6 files changed, 90 insertions(+), 6 deletions(-)
----------------------------------------------------------------------