Posted to commits@spark.apache.org by rx...@apache.org on 2014/01/23 09:19:02 UTC

[1/6] git commit: Add collectPartition to JavaRDD interface. Also remove takePartition from PythonRDD and use collectPartition in rdd.py.

Updated Branches:
  refs/heads/branch-0.8 f3cc3a7b8 -> c89b71ac7


Add collectPartition to JavaRDD interface.
Also remove takePartition from PythonRDD and use collectPartition in rdd.py.
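
For reference, a minimal sketch of how the new method could be exercised from the Java API once this commit is applied; the JavaSparkContext setup, class name, and sample data below are illustrative only and not part of the commit:

    import java.util.Arrays;
    import java.util.List;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class CollectPartitionExample {
      public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local", "collectPartition-example");
        // Three partitions: {1, 2}, {3, 4}, {5, 6, 7}
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
        // Fetch only the elements of partition 1 back to the driver
        List<Integer> second = rdd.collectPartition(1);
        System.out.println(second);  // [3, 4]
        sc.stop();
      }
    }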

Conflicts:
	core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
	python/pyspark/context.py
	python/pyspark/rdd.py


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/5092baed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/5092baed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/5092baed

Branch: refs/heads/branch-0.8
Commit: 5092baedc20a7372b4238a7468470a9c5c60deeb
Parents: 5c443ad
Author: Shivaram Venkataraman <sh...@eecs.berkeley.edu>
Authored: Wed Dec 18 11:40:07 2013 -0800
Committer: Shivaram Venkataraman <sh...@eecs.berkeley.edu>
Committed: Thu Jan 16 19:20:57 2014 -0800

----------------------------------------------------------------------
 .../org/apache/spark/api/java/JavaRDDLike.scala | 11 +++++++-
 .../org/apache/spark/api/python/PythonRDD.scala |  4 ---
 .../scala/org/apache/spark/JavaAPISuite.java    | 28 ++++++++++++++++++++
 python/pyspark/context.py                       |  3 ---
 python/pyspark/rdd.py                           |  4 +--
 5 files changed, 40 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5092baed/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 7a3568c..0e46876 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -25,7 +25,7 @@ import com.google.common.base.Optional
 import org.apache.hadoop.io.compress.CompressionCodec
 
 import org.apache.spark.{SparkContext, Partition, TaskContext}
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{RDD, PartitionPruningRDD}
 import org.apache.spark.api.java.JavaPairRDD._
 import org.apache.spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _}
 import org.apache.spark.partial.{PartialResult, BoundedDouble}
@@ -247,6 +247,15 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   }
 
   /**
+   * Return an array that contains all of the elements in a specific partition of this RDD.
+   */
+  def collectPartition(partitionId: Int): JList[T] = {
+    import scala.collection.JavaConversions._
+    val partition = new PartitionPruningRDD[T](rdd, _ == partitionId)
+    new java.util.ArrayList(partition.collect().toSeq)
+  }
+
+  /**
    * Reduces the elements of this RDD using the specified commutative and associative binary operator.
    */
   def reduce(f: JFunction2[T, T, T]): T = rdd.reduce(f)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5092baed/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 12b4d94..f7b38b4 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -283,10 +283,6 @@ private[spark] object PythonRDD {
     file.close()
   }
 
-  def takePartition[T](rdd: RDD[T], partition: Int): Iterator[T] = {
-    implicit val cm : ClassManifest[T] = rdd.elementClassManifest
-    rdd.context.runJob(rdd, ((x: Iterator[T]) => x.toArray), Seq(partition), true).head.iterator
-  }
 }
 
 private object Pickle {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5092baed/core/src/test/scala/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/JavaAPISuite.java b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
index 352036f..d7c673a 100644
--- a/core/src/test/scala/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
@@ -883,4 +883,32 @@ public class JavaAPISuite implements Serializable {
         new Tuple2<Integer, Integer>(0, 4)), rdd3.collect());
 
   }
+
+  @Test
+  public void collectPartition() {
+    JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
+
+    JavaPairRDD<Integer, Integer> rdd2 = rdd1.map(new PairFunction<Integer, Integer, Integer>() {
+      @Override
+      public Tuple2<Integer, Integer> call(Integer i) throws Exception {
+        return new Tuple2<Integer, Integer>(i, i % 2);
+      }
+    });
+
+    Assert.assertEquals(Arrays.asList(1, 2), rdd1.collectPartition(0));
+    Assert.assertEquals(Arrays.asList(3, 4), rdd1.collectPartition(1));
+    Assert.assertEquals(Arrays.asList(5, 6, 7), rdd1.collectPartition(2));
+
+    Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(1, 1),
+                                      new Tuple2<Integer, Integer>(2, 0)),
+                        rdd2.collectPartition(0));
+    Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(3, 1),
+                                      new Tuple2<Integer, Integer>(4, 0)),
+                        rdd2.collectPartition(1));
+    Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(5, 1),
+                                      new Tuple2<Integer, Integer>(6, 0),
+                                      new Tuple2<Integer, Integer>(7, 1)),
+                        rdd2.collectPartition(2));
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5092baed/python/pyspark/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index a7ca8bc..3d47589 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -43,7 +43,6 @@ class SparkContext(object):
     _gateway = None
     _jvm = None
     _writeIteratorToPickleFile = None
-    _takePartition = None
     _next_accum_id = 0
     _active_spark_context = None
     _lock = Lock()
@@ -127,8 +126,6 @@ class SparkContext(object):
                 SparkContext._jvm = SparkContext._gateway.jvm
                 SparkContext._writeIteratorToPickleFile = \
                     SparkContext._jvm.PythonRDD.writeIteratorToPickleFile
-                SparkContext._takePartition = \
-                    SparkContext._jvm.PythonRDD.takePartition
 
             if instance:
                 if SparkContext._active_spark_context and SparkContext._active_spark_context != instance:

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5092baed/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 0c599e0..22cddf0 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -570,8 +570,8 @@ class RDD(object):
         mapped = self.mapPartitions(takeUpToNum)
         items = []
         for partition in range(mapped._jrdd.splits().size()):
-            iterator = self.ctx._takePartition(mapped._jrdd.rdd(), partition)
-            items.extend(self._collect_iterator_through_file(iterator))
+            iterator = mapped._jrdd.collectPartition(partition).iterator()
+            items.extend(mapped._collect_iterator_through_file(iterator))
             if len(items) >= num:
                 break
         return items[:num]


[2/6] git commit: Make collectPartitions take an array of partitions. Change the implementation to use runJob instead of PartitionPruningRDD. Also update the unit tests and the python take implementation to use the new interface.

Posted by rx...@apache.org.
Make collectPartitions take an array of partitions
Change the implementation to use runJob instead of PartitionPruningRDD.
Also update the unit tests and the python take implementation
to use the new interface.
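
A brief usage sketch against the updated interface; the setup and names are illustrative and mirror the data used in the JavaAPISuite test further below, not code from the commit itself:

    import java.util.Arrays;
    import java.util.List;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class CollectPartitionsExample {
      public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local", "collectPartitions-example");
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
        // A single runJob fetches all requested partitions; results come back
        // in the same order as the ids in the array.
        List<Integer>[] parts = rdd.collectPartitions(new int[] {1, 2});
        System.out.println(parts[0]);  // [3, 4]
        System.out.println(parts[1]);  // [5, 6, 7]
        sc.stop();
      }
    }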


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/91e6e5ba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/91e6e5ba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/91e6e5ba

Branch: refs/heads/branch-0.8
Commit: 91e6e5bad21a7df9ad8c0f339b1442e59854eee7
Parents: 5092bae
Author: Shivaram Venkataraman <sh...@eecs.berkeley.edu>
Authored: Thu Dec 19 11:40:34 2013 -0800
Committer: Shivaram Venkataraman <sh...@eecs.berkeley.edu>
Committed: Thu Jan 16 19:21:08 2014 -0800

----------------------------------------------------------------------
 .../org/apache/spark/api/java/JavaRDDLike.scala  |  8 ++++----
 .../scala/org/apache/spark/JavaAPISuite.java     | 19 ++++++++++++-------
 python/pyspark/rdd.py                            |  7 ++++++-
 3 files changed, 22 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/91e6e5ba/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 0e46876..f3dd013 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -25,7 +25,7 @@ import com.google.common.base.Optional
 import org.apache.hadoop.io.compress.CompressionCodec
 
 import org.apache.spark.{SparkContext, Partition, TaskContext}
-import org.apache.spark.rdd.{RDD, PartitionPruningRDD}
+import org.apache.spark.rdd.RDD
 import org.apache.spark.api.java.JavaPairRDD._
 import org.apache.spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _}
 import org.apache.spark.partial.{PartialResult, BoundedDouble}
@@ -249,10 +249,10 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   /**
    * Return an array that contains all of the elements in a specific partition of this RDD.
    */
-  def collectPartition(partitionId: Int): JList[T] = {
+  def collectPartitions(partitionIds: Array[Int]): Array[JList[T]] = {
     import scala.collection.JavaConversions._
-    val partition = new PartitionPruningRDD[T](rdd, _ == partitionId)
-    new java.util.ArrayList(partition.collect().toSeq)
+    val res = context.runJob(rdd, (it: Iterator[T]) => it.toArray, partitionIds, true)
+    res.map(x => new java.util.ArrayList(x.toSeq)).toArray
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/91e6e5ba/core/src/test/scala/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/JavaAPISuite.java b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
index d7c673a..07f312f 100644
--- a/core/src/test/scala/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
@@ -885,7 +885,7 @@ public class JavaAPISuite implements Serializable {
   }
 
   @Test
-  public void collectPartition() {
+  public void collectPartitions() {
     JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
 
     JavaPairRDD<Integer, Integer> rdd2 = rdd1.map(new PairFunction<Integer, Integer, Integer>() {
@@ -895,20 +895,25 @@ public class JavaAPISuite implements Serializable {
       }
     });
 
-    Assert.assertEquals(Arrays.asList(1, 2), rdd1.collectPartition(0));
-    Assert.assertEquals(Arrays.asList(3, 4), rdd1.collectPartition(1));
-    Assert.assertEquals(Arrays.asList(5, 6, 7), rdd1.collectPartition(2));
+    List[] parts = rdd1.collectPartitions(new int[] {0});
+    Assert.assertEquals(Arrays.asList(1, 2), parts[0]);
+
+    parts = rdd1.collectPartitions(new int[] {1, 2});
+    Assert.assertEquals(Arrays.asList(3, 4), parts[0]);
+    Assert.assertEquals(Arrays.asList(5, 6, 7), parts[1]);
 
     Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(1, 1),
                                       new Tuple2<Integer, Integer>(2, 0)),
-                        rdd2.collectPartition(0));
+                        rdd2.collectPartitions(new int[] {0})[0]);
+
+    parts = rdd2.collectPartitions(new int[] {1, 2});
     Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(3, 1),
                                       new Tuple2<Integer, Integer>(4, 0)),
-                        rdd2.collectPartition(1));
+                        parts[0]);
     Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(5, 1),
                                       new Tuple2<Integer, Integer>(6, 0),
                                       new Tuple2<Integer, Integer>(7, 1)),
-                        rdd2.collectPartition(2));
+                        parts[1]);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/91e6e5ba/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 22cddf0..3f84312 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -569,8 +569,13 @@ class RDD(object):
         # Take only up to num elements from each partition we try
         mapped = self.mapPartitions(takeUpToNum)
         items = []
+        # TODO(shivaram): Similar to the scala implementation, update the take 
+        # method to scan multiple splits based on an estimate of how many elements 
+        # we have per-split.
         for partition in range(mapped._jrdd.splits().size()):
-            iterator = mapped._jrdd.collectPartition(partition).iterator()
+            partitionsToTake = self.ctx._gateway.new_array(self.ctx._jvm.int, 1)
+            partitionsToTake[0] = partition
+            iterator = mapped._jrdd.collectPartitions(partitionsToTake)[0].iterator()
             items.extend(mapped._collect_iterator_through_file(iterator))
             if len(items) >= num:
                 break


[5/6] git commit: Restore takePartition to PythonRDD, context.py. This is to avoid removing functions in minor releases.

Posted by rx...@apache.org.
Restore takePartition to PythonRDD, context.py
This is to avoid removing functions in minor releases.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/38bf7860
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/38bf7860
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/38bf7860

Branch: refs/heads/branch-0.8
Commit: 38bf7860da3bf57288457510d15701bfaa1f7517
Parents: 691dfef
Author: Shivaram Venkataraman <sh...@eecs.berkeley.edu>
Authored: Wed Jan 22 23:28:54 2014 -0800
Committer: Shivaram Venkataraman <sh...@eecs.berkeley.edu>
Committed: Wed Jan 22 23:28:54 2014 -0800

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 4 ++++
 python/pyspark/context.py                                       | 3 +++
 2 files changed, 7 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/38bf7860/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index f7b38b4..12b4d94 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -283,6 +283,10 @@ private[spark] object PythonRDD {
     file.close()
   }
 
+  def takePartition[T](rdd: RDD[T], partition: Int): Iterator[T] = {
+    implicit val cm : ClassManifest[T] = rdd.elementClassManifest
+    rdd.context.runJob(rdd, ((x: Iterator[T]) => x.toArray), Seq(partition), true).head.iterator
+  }
 }
 
 private object Pickle {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/38bf7860/python/pyspark/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 3d47589..a7ca8bc 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -43,6 +43,7 @@ class SparkContext(object):
     _gateway = None
     _jvm = None
     _writeIteratorToPickleFile = None
+    _takePartition = None
     _next_accum_id = 0
     _active_spark_context = None
     _lock = Lock()
@@ -126,6 +127,8 @@ class SparkContext(object):
                 SparkContext._jvm = SparkContext._gateway.jvm
                 SparkContext._writeIteratorToPickleFile = \
                     SparkContext._jvm.PythonRDD.writeIteratorToPickleFile
+                SparkContext._takePartition = \
+                    SparkContext._jvm.PythonRDD.takePartition
 
             if instance:
                 if SparkContext._active_spark_context and SparkContext._active_spark_context != instance:


[6/6] git commit: Merge pull request #453 from shivaram/branch-0.8-SparkR

Posted by rx...@apache.org.
Merge pull request #453 from shivaram/branch-0.8-SparkR

Backport changes used in SparkR to 0.8 branch

Backports two changes from master branch

1. Adding collectPartition to JavaRDD and using it from Python as well
2. Making broadcast id public.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/c89b71ac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/c89b71ac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/c89b71ac

Branch: refs/heads/branch-0.8
Commit: c89b71ac761dc1eea32435eadc0621f739e96098
Parents: f3cc3a7 38bf786
Author: Reynold Xin <rx...@apache.org>
Authored: Thu Jan 23 00:18:56 2014 -0800
Committer: Reynold Xin <rx...@apache.org>
Committed: Thu Jan 23 00:18:56 2014 -0800

----------------------------------------------------------------------
 .../org/apache/spark/api/java/JavaRDDLike.scala | 11 +++++++
 .../org/apache/spark/broadcast/Broadcast.scala  |  2 +-
 .../scala/org/apache/spark/JavaAPISuite.java    | 33 ++++++++++++++++++++
 python/pyspark/rdd.py                           |  9 ++++--
 4 files changed, 52 insertions(+), 3 deletions(-)
----------------------------------------------------------------------



[4/6] git commit: Make broadcast id public for use in R frontend

Posted by rx...@apache.org.
Make broadcast id public for use in R frontend
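
As a rough illustration of what this enables, user code (and external frontends such as SparkR) can now read a broadcast variable's id directly; the setup below is a hypothetical sketch, not taken from the commit:

    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.broadcast.Broadcast;

    public class BroadcastIdExample {
      public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local", "broadcast-id-example");
        Broadcast<int[]> lookup = sc.broadcast(new int[] {10, 20, 30});
        // With `id` no longer private[spark], callers outside the
        // org.apache.spark package can track broadcasts by their numeric id.
        long broadcastId = lookup.id();
        System.out.println("broadcast id: " + broadcastId);
        sc.stop();
      }
    }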


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/691dfefe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/691dfefe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/691dfefe

Branch: refs/heads/branch-0.8
Commit: 691dfefeff8c40d18891853c50121293de39876b
Parents: 3ef68e4
Author: Shivaram Venkataraman <sh...@eecs.berkeley.edu>
Authored: Thu Jan 16 21:28:56 2014 -0800
Committer: Shivaram Venkataraman <sh...@eecs.berkeley.edu>
Committed: Thu Jan 16 21:28:56 2014 -0800

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/691dfefe/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
index 43c1829..f22a66a 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.atomic.AtomicLong
 
 import org.apache.spark._
 
-abstract class Broadcast[T](private[spark] val id: Long) extends Serializable {
+abstract class Broadcast[T](val id: Long) extends Serializable {
   def value: T
 
   // We cannot have an abstract readObject here due to some weird issues with


[3/6] git commit: Add comment explaining collectPartitions's use

Posted by rx...@apache.org.
Add comment explaining collectPartitions's use


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/3ef68e49
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/3ef68e49
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/3ef68e49

Branch: refs/heads/branch-0.8
Commit: 3ef68e49ef7c5548e2978beb7b2f903f00b50d95
Parents: 91e6e5b
Author: Shivaram Venkataraman <sh...@eecs.berkeley.edu>
Authored: Thu Dec 19 11:49:17 2013 -0800
Committer: Shivaram Venkataraman <sh...@eecs.berkeley.edu>
Committed: Thu Jan 16 19:21:18 2014 -0800

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/3ef68e49/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index f3dd013..2283a8e 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -250,6 +250,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * Return an array that contains all of the elements in a specific partition of this RDD.
    */
   def collectPartitions(partitionIds: Array[Int]): Array[JList[T]] = {
+    // This is useful for implementing `take` from other language frontends
+    // like Python where the data is serialized.
     import scala.collection.JavaConversions._
     val res = context.runJob(rdd, (it: Iterator[T]) => it.toArray, partitionIds, true)
     res.map(x => new java.util.ArrayList(x.toSeq)).toArray