Posted to commits@spark.apache.org by sr...@apache.org on 2015/12/02 10:40:14 UTC

spark git commit: [SPARK-3580][CORE] Add Consistent Method To Get Number of RDD Partitions Across Different Languages

Repository: spark
Updated Branches:
  refs/heads/master 4375eb3f4 -> 128c29035


[SPARK-3580][CORE] Add Consistent Method To Get Number of RDD Partitions Across Different Languages

I have tried to address all the comments from the earlier pull request, https://github.com/apache/spark/pull/2447.

Note that the second commit (which switches all internal code, across all components, to the new method) is quite intrusive and could be omitted.
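
For illustration (not part of this patch), a minimal Scala sketch of the unified API, assuming an existing SparkContext sc on 1.6.0+:

    // One name across languages: Scala (this patch), Java (JavaRDDLike, below),
    // and PySpark, which already exposed rdd.getNumPartitions().
    val rdd = sc.parallelize(1 to 100, numSlices = 4)
    assert(rdd.getNumPartitions == 4)                      // new in 1.6.0
    assert(rdd.getNumPartitions == rdd.partitions.length)  // same value, by definition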

Author: Jeroen Schot <je...@surfsara.nl>

Closes #9767 from schot/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/128c2903
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/128c2903
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/128c2903

Branch: refs/heads/master
Commit: 128c29035b4e7383cc3a9a6c7a9ab6136205ac6c
Parents: 4375eb3
Author: Jeroen Schot <je...@surfsara.nl>
Authored: Wed Dec 2 09:40:07 2015 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Wed Dec 2 09:40:07 2015 +0000

----------------------------------------------------------------------
 .../scala/org/apache/spark/api/java/JavaRDDLike.scala  |  5 +++++
 core/src/main/scala/org/apache/spark/rdd/RDD.scala     |  8 +++++++-
 core/src/test/java/org/apache/spark/JavaAPISuite.java  | 13 +++++++++++++
 .../src/test/scala/org/apache/spark/rdd/RDDSuite.scala |  1 +
 project/MimaExcludes.scala                             |  4 ++++
 5 files changed, 30 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/128c2903/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 1e9d4f1..0e4d7dc 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -28,6 +28,7 @@ import com.google.common.base.Optional
 import org.apache.hadoop.io.compress.CompressionCodec
 
 import org.apache.spark._
+import org.apache.spark.annotation.Since
 import org.apache.spark.api.java.JavaPairRDD._
 import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
 import org.apache.spark.api.java.JavaUtils.mapAsSerializableJavaMap
@@ -62,6 +63,10 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   /** Set of partitions in this RDD. */
   def partitions: JList[Partition] = rdd.partitions.toSeq.asJava
 
+  /** Return the number of partitions in this RDD. */
+  @Since("1.6.0")
+  def getNumPartitions: Int = rdd.getNumPartitions
+
   /** The partitioner of this RDD. */
   def partitioner: Optional[Partitioner] = JavaUtils.optionToOptional(rdd.partitioner)
 

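The Java-side method is a one-line delegation to the wrapped Scala RDD. A sketch of the round trip (hypothetical usage, again assuming a SparkContext sc):

    import org.apache.spark.api.java.JavaRDD

    // JavaRDDLike.getNumPartitions forwards to RDD.getNumPartitions,
    // so the Java and Scala views cannot disagree.
    val javaRdd = JavaRDD.fromRDD(sc.parallelize(Seq(1, 2, 3), 3))
    assert(javaRdd.getNumPartitions == javaRdd.rdd.getNumPartitions)
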
http://git-wip-us.apache.org/repos/asf/spark/blob/128c2903/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 8b3731d..9fe9d83 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -31,7 +31,7 @@ import org.apache.hadoop.mapred.TextOutputFormat
 
 import org.apache.spark._
 import org.apache.spark.Partitioner._
-import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.annotation.{DeveloperApi, Since}
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.partial.BoundedDouble
 import org.apache.spark.partial.CountEvaluator
@@ -243,6 +243,12 @@ abstract class RDD[T: ClassTag](
   }
 
   /**
+   * Return the number of partitions of this RDD.
+   */
+  @Since("1.6.0")
+  final def getNumPartitions: Int = partitions.length
+
+  /**
    * Get the preferred locations of a partition, taking into account whether the
    * RDD is checkpointed.
    */

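Note that the Scala method is final: RDD subclasses cannot override it, so it always reports partitions.length, and the Java wrapper above stays consistent with it by construction.
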
http://git-wip-us.apache.org/repos/asf/spark/blob/128c2903/core/src/test/java/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 4d4e982..11f1248 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -973,6 +973,19 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals("[3, 7]", partitionSums.collect().toString());
   }
 
+  @Test
+  public void getNumPartitions() {
+    JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 3);
+    JavaDoubleRDD rdd2 = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0), 2);
+    JavaPairRDD<String, Integer> rdd3 = sc.parallelizePairs(Arrays.asList(
+            new Tuple2<>("a", 1),
+            new Tuple2<>("aa", 2),
+            new Tuple2<>("aaa", 3)
+    ), 2);
+    Assert.assertEquals(3, rdd1.getNumPartitions());
+    Assert.assertEquals(2, rdd2.getNumPartitions());
+    Assert.assertEquals(2, rdd3.getNumPartitions());
+  }
 
   @Test
   public void repartition() {

http://git-wip-us.apache.org/repos/asf/spark/blob/128c2903/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 5f718ea..46ed5c0 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -34,6 +34,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
 
   test("basic operations") {
     val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+    assert(nums.getNumPartitions === 2)
     assert(nums.collect().toList === List(1, 2, 3, 4))
     assert(nums.toLocalIterator.toList === List(1, 2, 3, 4))
     val dups = sc.makeRDD(Array(1, 1, 2, 2, 3, 3, 4, 4), 2)

http://git-wip-us.apache.org/repos/asf/spark/blob/128c2903/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 566bfe8..d3a3c0c 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -155,6 +155,10 @@ object MimaExcludes {
           "org.apache.spark.storage.BlockManagerMessages$GetRpcHostPortForExecutor"),
         ProblemFilters.exclude[MissingClassProblem](
           "org.apache.spark.storage.BlockManagerMessages$GetRpcHostPortForExecutor$")
+      ) ++ Seq(
+        // SPARK-3580 Add getNumPartitions method to JavaRDD
+        ProblemFilters.exclude[MissingMethodProblem](
+          "org.apache.spark.api.java.JavaRDDLike.getNumPartitions")
       )
     case v if v.startsWith("1.5") =>
       Seq(
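
For context: MiMa checks the new binaries against the previous release and reports the method added to the JavaRDDLike trait as a MissingMethodProblem; the exclusion above whitelists it as an intentional API addition for 1.6.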

