Posted to commits@spark.apache.org by rx...@apache.org on 2013/12/19 22:35:18 UTC

[1/4] git commit: Add collectPartition to JavaRDD interface. Also remove takePartition from PythonRDD and use collectPartition in rdd.py.

Updated Branches:
  refs/heads/master 440e531a5 -> 7990c5637


Add collectPartition to JavaRDD interface.
Also remove takePartition from PythonRDD and use collectPartition in rdd.py.
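
For reference, a minimal usage sketch of the new method (illustrative only, not part of the commit; the JavaSparkContext `sc` and the sample data are assumed):

    // Sketch: fetch the contents of a single partition without collecting the whole RDD.
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
    // Roughly [1, 2], as exercised in the JavaAPISuite test below.
    List<Integer> firstPartition = rdd.collectPartition(0);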


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/af0cd6bd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/af0cd6bd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/af0cd6bd

Branch: refs/heads/master
Commit: af0cd6bd27dda73b326bcb6a66addceadebf5e54
Parents: 7a8169b
Author: Shivaram Venkataraman <sh...@eecs.berkeley.edu>
Authored: Wed Dec 18 11:40:07 2013 -0800
Committer: Shivaram Venkataraman <sh...@eecs.berkeley.edu>
Committed: Wed Dec 18 11:40:07 2013 -0800

----------------------------------------------------------------------
 .../org/apache/spark/api/java/JavaRDDLike.scala | 11 +++++++-
 .../org/apache/spark/api/python/PythonRDD.scala |  4 ---
 .../scala/org/apache/spark/JavaAPISuite.java    | 28 ++++++++++++++++++++
 python/pyspark/context.py                       |  3 ---
 python/pyspark/rdd.py                           |  2 +-
 5 files changed, 39 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/af0cd6bd/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 9e912d3..1d71875 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -26,7 +26,7 @@ import com.google.common.base.Optional
 import org.apache.hadoop.io.compress.CompressionCodec
 
 import org.apache.spark.{SparkContext, Partition, TaskContext}
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{RDD, PartitionPruningRDD}
 import org.apache.spark.api.java.JavaPairRDD._
 import org.apache.spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _}
 import org.apache.spark.partial.{PartialResult, BoundedDouble}
@@ -245,6 +245,15 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   }
 
   /**
+   * Return an array that contains all of the elements in a specific partition of this RDD.
+   */
+  def collectPartition(partitionId: Int): JList[T] = {
+    import scala.collection.JavaConversions._
+    val partition = new PartitionPruningRDD[T](rdd, _ == partitionId)
+    new java.util.ArrayList(partition.collect().toSeq)
+  }
+
+  /**
    * Reduces the elements of this RDD using the specified commutative and associative binary operator.
    */
   def reduce(f: JFunction2[T, T, T]): T = rdd.reduce(f)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/af0cd6bd/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index a659cc0..ca42c76 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -235,10 +235,6 @@ private[spark] object PythonRDD {
     file.close()
   }
 
-  def takePartition[T](rdd: RDD[T], partition: Int): Iterator[T] = {
-    implicit val cm : ClassTag[T] = rdd.elementClassTag
-    rdd.context.runJob(rdd, ((x: Iterator[T]) => x.toArray), Seq(partition), true).head.iterator
-  }
 }
 
 private class BytesToString extends org.apache.spark.api.java.function.Function[Array[Byte], String] {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/af0cd6bd/core/src/test/scala/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/JavaAPISuite.java b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
index 4234f6e..2862ed3 100644
--- a/core/src/test/scala/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
@@ -897,4 +897,32 @@ public class JavaAPISuite implements Serializable {
         new Tuple2<Integer, Integer>(0, 4)), rdd3.collect());
 
   }
+
+  @Test
+  public void collectPartition() {
+    JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
+
+    JavaPairRDD<Integer, Integer> rdd2 = rdd1.map(new PairFunction<Integer, Integer, Integer>() {
+      @Override
+      public Tuple2<Integer, Integer> call(Integer i) throws Exception {
+        return new Tuple2<Integer, Integer>(i, i % 2);
+      }
+    });
+
+    Assert.assertEquals(Arrays.asList(1, 2), rdd1.collectPartition(0));
+    Assert.assertEquals(Arrays.asList(3, 4), rdd1.collectPartition(1));
+    Assert.assertEquals(Arrays.asList(5, 6, 7), rdd1.collectPartition(2));
+
+    Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(1, 1),
+                                      new Tuple2<Integer, Integer>(2, 0)),
+                        rdd2.collectPartition(0));
+    Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(3, 1),
+                                      new Tuple2<Integer, Integer>(4, 0)),
+                        rdd2.collectPartition(1));
+    Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(5, 1),
+                                      new Tuple2<Integer, Integer>(6, 0),
+                                      new Tuple2<Integer, Integer>(7, 1)),
+                        rdd2.collectPartition(2));
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/af0cd6bd/python/pyspark/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index cbd41e5..0604f68 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -43,7 +43,6 @@ class SparkContext(object):
     _gateway = None
     _jvm = None
     _writeToFile = None
-    _takePartition = None
     _next_accum_id = 0
     _active_spark_context = None
     _lock = Lock()
@@ -134,8 +133,6 @@ class SparkContext(object):
                 SparkContext._jvm = SparkContext._gateway.jvm
                 SparkContext._writeToFile = \
                     SparkContext._jvm.PythonRDD.writeToFile
-                SparkContext._takePartition = \
-                    SparkContext._jvm.PythonRDD.takePartition
 
             if instance:
                 if SparkContext._active_spark_context and SparkContext._active_spark_context != instance:

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/af0cd6bd/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 61720dc..d81b7c9 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -577,7 +577,7 @@ class RDD(object):
         mapped = self.mapPartitions(takeUpToNum)
         items = []
         for partition in range(mapped._jrdd.splits().size()):
-            iterator = self.ctx._takePartition(mapped._jrdd.rdd(), partition)
+            iterator = mapped._jrdd.collectPartition(partition).iterator()
             items.extend(mapped._collect_iterator_through_file(iterator))
             if len(items) >= num:
                 break


[2/4] git commit: Make collectPartitions take an array of partitions

Posted by rx...@apache.org.
Make collectPartitions take an array of partitions
Change the implementation to use runJob instead of PartitionPruningRDD.
Also update the unit tests and the python take implementation
to use the new interface.
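
A minimal sketch of the revised call pattern (illustrative only; `sc` and the sample data are assumed, mirroring the updated JavaAPISuite test below):

    // Sketch: a single runJob fetches several partitions; the result array
    // lines up with the partition ids passed in.
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
    List[] parts = rdd.collectPartitions(new int[] {1, 2});
    // parts[0] holds partition 1 (roughly [3, 4]); parts[1] holds partition 2 (roughly [5, 6, 7]).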


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/d3234f97
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/d3234f97
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/d3234f97

Branch: refs/heads/master
Commit: d3234f9726db3917af4688ba70933938b078b0bd
Parents: af0cd6b
Author: Shivaram Venkataraman <sh...@eecs.berkeley.edu>
Authored: Thu Dec 19 11:40:34 2013 -0800
Committer: Shivaram Venkataraman <sh...@eecs.berkeley.edu>
Committed: Thu Dec 19 11:40:34 2013 -0800

----------------------------------------------------------------------
 .../org/apache/spark/api/java/JavaRDDLike.scala  |  8 ++++----
 .../scala/org/apache/spark/JavaAPISuite.java     | 19 ++++++++++++-------
 python/pyspark/rdd.py                            |  7 ++++++-
 3 files changed, 22 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d3234f97/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 1d71875..458d9dc 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -26,7 +26,7 @@ import com.google.common.base.Optional
 import org.apache.hadoop.io.compress.CompressionCodec
 
 import org.apache.spark.{SparkContext, Partition, TaskContext}
-import org.apache.spark.rdd.{RDD, PartitionPruningRDD}
+import org.apache.spark.rdd.RDD
 import org.apache.spark.api.java.JavaPairRDD._
 import org.apache.spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _}
 import org.apache.spark.partial.{PartialResult, BoundedDouble}
@@ -247,10 +247,10 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   /**
    * Return an array that contains all of the elements in a specific partition of this RDD.
    */
-  def collectPartition(partitionId: Int): JList[T] = {
+  def collectPartitions(partitionIds: Array[Int]): Array[JList[T]] = {
     import scala.collection.JavaConversions._
-    val partition = new PartitionPruningRDD[T](rdd, _ == partitionId)
-    new java.util.ArrayList(partition.collect().toSeq)
+    val res = context.runJob(rdd, (it: Iterator[T]) => it.toArray, partitionIds, true)
+    res.map(x => new java.util.ArrayList(x.toSeq)).toArray
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d3234f97/core/src/test/scala/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/JavaAPISuite.java b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
index 2862ed3..79913dc 100644
--- a/core/src/test/scala/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
@@ -899,7 +899,7 @@ public class JavaAPISuite implements Serializable {
   }
 
   @Test
-  public void collectPartition() {
+  public void collectPartitions() {
     JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
 
     JavaPairRDD<Integer, Integer> rdd2 = rdd1.map(new PairFunction<Integer, Integer, Integer>() {
@@ -909,20 +909,25 @@ public class JavaAPISuite implements Serializable {
       }
     });
 
-    Assert.assertEquals(Arrays.asList(1, 2), rdd1.collectPartition(0));
-    Assert.assertEquals(Arrays.asList(3, 4), rdd1.collectPartition(1));
-    Assert.assertEquals(Arrays.asList(5, 6, 7), rdd1.collectPartition(2));
+    List[] parts = rdd1.collectPartitions(new int[] {0});
+    Assert.assertEquals(Arrays.asList(1, 2), parts[0]);
+
+    parts = rdd1.collectPartitions(new int[] {1, 2});
+    Assert.assertEquals(Arrays.asList(3, 4), parts[0]);
+    Assert.assertEquals(Arrays.asList(5, 6, 7), parts[1]);
 
     Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(1, 1),
                                       new Tuple2<Integer, Integer>(2, 0)),
-                        rdd2.collectPartition(0));
+                        rdd2.collectPartitions(new int[] {0})[0]);
+
+    parts = rdd2.collectPartitions(new int[] {1, 2});
     Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(3, 1),
                                       new Tuple2<Integer, Integer>(4, 0)),
-                        rdd2.collectPartition(1));
+                        parts[0]);
     Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(5, 1),
                                       new Tuple2<Integer, Integer>(6, 0),
                                       new Tuple2<Integer, Integer>(7, 1)),
-                        rdd2.collectPartition(2));
+                        parts[1]);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d3234f97/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index d81b7c9..7015119 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -576,8 +576,13 @@ class RDD(object):
         # Take only up to num elements from each partition we try
         mapped = self.mapPartitions(takeUpToNum)
         items = []
+        # TODO(shivaram): Similar to the scala implementation, update the take 
+        # method to scan multiple splits based on an estimate of how many elements 
+        # we have per-split.
         for partition in range(mapped._jrdd.splits().size()):
-            iterator = mapped._jrdd.collectPartition(partition).iterator()
+            partitionsToTake = self.ctx._gateway.new_array(self.ctx._jvm.int, 1)
+            partitionsToTake[0] = partition
+            iterator = mapped._jrdd.collectPartitions(partitionsToTake)[0].iterator()
             items.extend(mapped._collect_iterator_through_file(iterator))
             if len(items) >= num:
                 break


[4/4] git commit: Merge pull request #276 from shivaram/collectPartition

Posted by rx...@apache.org.
Merge pull request #276 from shivaram/collectPartition

Add collectPartition to JavaRDD interface.

This interface is useful for implementing `take` from other language frontends where the data is serialized. Also remove `takePartition` from PythonRDD and use `collectPartition` in rdd.py.

Thanks @concretevitamin for the original change and tests.
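
As a rough Java rendering of how a frontend's `take` can sit on top of this interface (a sketch only, assuming a JavaRDD `rdd`, a target count `num`, and java.util.List/ArrayList; the real consumer is the Py4J loop in python/pyspark/rdd.py shown in commit [2/4] above):

    // Sketch: pull one partition per pass until enough elements have been gathered.
    List<Integer> items = new ArrayList<Integer>();
    for (int split = 0; split < rdd.splits().size() && items.size() < num; split++) {
      items.addAll(rdd.collectPartitions(new int[] {split})[0]);
    }
    List<Integer> taken = items.subList(0, Math.min(num, items.size()));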


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/7990c563
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/7990c563
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/7990c563

Branch: refs/heads/master
Commit: 7990c5637519ae2def30dfba19b7c83562c0ec00
Parents: 440e531 9cc3a6d
Author: Reynold Xin <rx...@apache.org>
Authored: Thu Dec 19 13:35:09 2013 -0800
Committer: Reynold Xin <rx...@apache.org>
Committed: Thu Dec 19 13:35:09 2013 -0800

----------------------------------------------------------------------
 .../org/apache/spark/api/java/JavaRDDLike.scala | 11 +++++++
 .../org/apache/spark/api/python/PythonRDD.scala |  4 ---
 .../scala/org/apache/spark/JavaAPISuite.java    | 33 ++++++++++++++++++++
 python/pyspark/context.py                       |  3 --
 python/pyspark/rdd.py                           |  7 ++++-
 5 files changed, 50 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/7990c563/python/pyspark/rdd.py
----------------------------------------------------------------------


[3/4] git commit: Add comment explaining collectPartitions's use

Posted by rx...@apache.org.
Add comment explaining collectPartitions's use


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/9cc3a6d3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/9cc3a6d3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/9cc3a6d3

Branch: refs/heads/master
Commit: 9cc3a6d3c0a64b80af77ae358c58d4b29b18c534
Parents: d3234f9
Author: Shivaram Venkataraman <sh...@eecs.berkeley.edu>
Authored: Thu Dec 19 11:49:17 2013 -0800
Committer: Shivaram Venkataraman <sh...@eecs.berkeley.edu>
Committed: Thu Dec 19 11:49:17 2013 -0800

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/9cc3a6d3/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 458d9dc..f344804 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -248,6 +248,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * Return an array that contains all of the elements in a specific partition of this RDD.
    */
   def collectPartitions(partitionIds: Array[Int]): Array[JList[T]] = {
+    // This is useful for implementing `take` from other language frontends
+    // like Python where the data is serialized.
     import scala.collection.JavaConversions._
     val res = context.runJob(rdd, (it: Iterator[T]) => it.toArray, partitionIds, true)
     res.map(x => new java.util.ArrayList(x.toSeq)).toArray