You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/09/08 20:20:13 UTC
git commit: SPARK-2978. Transformation with MR shuffle semantics
Repository: spark
Updated Branches:
refs/heads/master e16a8e7db -> 16a73c247
SPARK-2978. Transformation with MR shuffle semantics
I didn't add this to the transformations list in the docs because it's kind of obscure, but I would be happy to do so if others think it would be helpful.
Author: Sandy Ryza <sa...@cloudera.com>
Closes #2274 from sryza/sandy-spark-2978 and squashes the following commits:
4a5332a [Sandy Ryza] Fix Java test
c04b447 [Sandy Ryza] Fix Python doc and add back deleted code
433ad5b [Sandy Ryza] Add Java test
4c25a54 [Sandy Ryza] Add s at the end and a couple other fixes
9b0ba99 [Sandy Ryza] Fix compilation
36e0571 [Sandy Ryza] Fix import ordering
48c12c2 [Sandy Ryza] Add Java version and additional doc
e5381cd [Sandy Ryza] Fix python style warnings
f147634 [Sandy Ryza] SPARK-2978. Transformation with MR shuffle semantics
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/16a73c24
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/16a73c24
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/16a73c24
Branch: refs/heads/master
Commit: 16a73c2473181e03d88001aa3e08e6ffac92eb8b
Parents: e16a8e7
Author: Sandy Ryza <sa...@cloudera.com>
Authored: Mon Sep 8 11:20:00 2014 -0700
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Mon Sep 8 11:20:00 2014 -0700
----------------------------------------------------------------------
.../org/apache/spark/api/java/JavaPairRDD.scala | 26 +++++++++++++++++
.../apache/spark/rdd/OrderedRDDFunctions.scala | 14 ++++++++-
.../java/org/apache/spark/JavaAPISuite.java | 30 ++++++++++++++++++++
.../scala/org/apache/spark/rdd/RDDSuite.scala | 14 +++++++++
python/pyspark/rdd.py | 24 ++++++++++++++++
python/pyspark/tests.py | 8 ++++++
6 files changed, 115 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/16a73c24/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index feeb6c0..880f61c 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -759,6 +759,32 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
}
/**
+ * Repartition the RDD according to the given partitioner and, within each resulting partition,
+ * sort records by their keys.
+ *
+ * This is more efficient than calling `repartition` and then sorting within each partition
+ * because it can push the sorting down into the shuffle machinery.
+ */
+ def repartitionAndSortWithinPartitions(partitioner: Partitioner): JavaPairRDD[K, V] = {
+   // No comparator supplied: delegate to the two-argument overload using Guava's natural
+   // ordering, cast to a Comparator over the key type.
+   repartitionAndSortWithinPartitions(
+     partitioner,
+     com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]])
+ }
+
+ /**
+ * Repartition the RDD according to the given partitioner and, within each resulting partition,
+ * sort records by their keys, using the supplied comparator `comp` to order the keys.
+ *
+ * This is more efficient than calling `repartition` and then sorting within each partition
+ * because it can push the sorting down into the shuffle machinery.
+ */
+ def repartitionAndSortWithinPartitions(partitioner: Partitioner, comp: Comparator[K])
+ : JavaPairRDD[K, V] = {
+ implicit val ordering = comp // Implicit Comparator[K] in scope; allows implicit conversion of Comparator to the Ordering that OrderedRDDFunctions needs.
+ fromRDD(
+ new OrderedRDDFunctions[K, V, (K, V)](rdd).repartitionAndSortWithinPartitions(partitioner))
+ }
+
+ /**
* Sort the RDD by key, so that each partition contains a sorted range of the elements in
* ascending order. Calling `collect` or `save` on the resulting RDD will return or output an
* ordered list of records (in the `save` case, they will be written to multiple `part-X` files
http://git-wip-us.apache.org/repos/asf/spark/blob/16a73c24/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
index e98bad2..d0dbfef 100644
--- a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
@@ -19,7 +19,7 @@ package org.apache.spark.rdd
import scala.reflect.ClassTag
-import org.apache.spark.{Logging, RangePartitioner}
+import org.apache.spark.{Logging, Partitioner, RangePartitioner}
import org.apache.spark.annotation.DeveloperApi
/**
@@ -64,4 +64,16 @@ class OrderedRDDFunctions[K : Ordering : ClassTag,
new ShuffledRDD[K, V, V](self, part)
.setKeyOrdering(if (ascending) ordering else ordering.reverse)
}
+
+ /**
+ * Repartition the RDD according to the given partitioner and, within each resulting partition,
+ * sort records by their keys.
+ *
+ * This is more efficient than calling `repartition` and then sorting within each partition
+ * because it can push the sorting down into the shuffle machinery.
+ */
+ def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] = {
+   // Shuffle with the caller's partitioner, then attach the key ordering to the shuffled RDD
+   // so the sort is handled by the shuffle rather than by a separate pass.
+   val shuffled = new ShuffledRDD[K, V, V](self, partitioner)
+   shuffled.setKeyOrdering(ordering)
+ }
+
}
http://git-wip-us.apache.org/repos/asf/spark/blob/16a73c24/core/src/test/java/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index e1c13de..be99dc5 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -190,6 +190,36 @@ public class JavaAPISuite implements Serializable {
}
@Test
+ public void repartitionAndSortWithinPartitions() {
+   // Keys 0..3, with duplicates, deliberately listed out of order.
+   List<Tuple2<Integer, Integer>> pairs = new ArrayList<Tuple2<Integer, Integer>>();
+   pairs.add(new Tuple2<Integer, Integer>(0, 5));
+   pairs.add(new Tuple2<Integer, Integer>(3, 8));
+   pairs.add(new Tuple2<Integer, Integer>(2, 6));
+   pairs.add(new Tuple2<Integer, Integer>(0, 8));
+   pairs.add(new Tuple2<Integer, Integer>(3, 8));
+   pairs.add(new Tuple2<Integer, Integer>(1, 3));
+
+   JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
+
+   // Two partitions: even keys map to partition 0, odd keys to partition 1.
+   Partitioner partitioner = new Partitioner() {
+     @Override
+     public int numPartitions() {
+       return 2;
+     }
+     @Override
+     public int getPartition(Object key) {
+       return ((Integer)key).intValue() % 2;
+     }
+   };
+
+   JavaPairRDD<Integer, Integer> repartitioned =
+     rdd.repartitionAndSortWithinPartitions(partitioner);
+   List<List<Tuple2<Integer, Integer>>> partitions = repartitioned.glom().collect();
+   // JUnit's assertEquals takes (expected, actual); keeping that order makes a failure
+   // message label the two values correctly.
+   Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(0, 5),
+     new Tuple2<Integer, Integer>(0, 8), new Tuple2<Integer, Integer>(2, 6)), partitions.get(0));
+   Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(1, 3),
+     new Tuple2<Integer, Integer>(3, 8), new Tuple2<Integer, Integer>(3, 8)), partitions.get(1));
+ }
+
+ @Test
public void emptyRDD() {
JavaRDD<String> rdd = sc.emptyRDD();
Assert.assertEquals("Empty RDD shouldn't have any values", 0, rdd.count());
http://git-wip-us.apache.org/repos/asf/spark/blob/16a73c24/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 499dcda..c1b501a 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -682,6 +682,20 @@ class RDDSuite extends FunSuite with SharedSparkContext {
assert(data.sortBy(parse, true, 2)(NameOrdering, classTag[Person]).collect() === nameOrdered)
}
+ test("repartitionAndSortWithinPartitions") {
+   // Even keys are routed to partition 0 and odd keys to partition 1.
+   val parityPartitioner = new Partitioner {
+     def numPartitions: Int = 2
+     def getPartition(key: Any): Int = key.asInstanceOf[Int] % 2
+   }
+   val input = sc.parallelize(Seq((0, 5), (3, 8), (2, 6), (0, 8), (3, 8), (1, 3)), 2)
+
+   // glom() turns each partition into one array so the per-partition order can be checked.
+   val byPartition = input.repartitionAndSortWithinPartitions(parityPartitioner).glom().collect()
+   assert(byPartition(0) === Seq((0, 5), (0, 8), (2, 6)))
+   assert(byPartition(1) === Seq((1, 3), (3, 8), (3, 8)))
+ }
+
test("intersection") {
val all = sc.parallelize(1 to 10)
val evens = sc.parallelize(2 to 10 by 2)
http://git-wip-us.apache.org/repos/asf/spark/blob/16a73c24/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 266090e..5667154 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -520,6 +520,30 @@ class RDD(object):
raise TypeError
return self.union(other)
+ def repartitionAndSortWithinPartitions(self, numPartitions=None, partitionFunc=portable_hash,
+ ascending=True, keyfunc=lambda x: x):
+ """
+ Repartition the RDD according to the given partitioner and, within each resulting partition,
+ sort records by their keys.
+
+ >>> rdd = sc.parallelize([(0, 5), (3, 8), (2, 6), (0, 8), (3, 8), (1, 3)])
+ >>> rdd2 = rdd.repartitionAndSortWithinPartitions(2, lambda x: x % 2, 2)
+ >>> rdd2.glom().collect()
+ [[(0, 5), (0, 8), (2, 6)], [(1, 3), (3, 8), (3, 8)]]
+ """
+ if numPartitions is None:
+ numPartitions = self._defaultReducePartitions()
+
+ spill = (self.ctx._conf.get("spark.shuffle.spill", 'True').lower() == "true")
+ memory = _parse_memory(self.ctx._conf.get("spark.python.worker.memory", "512m"))
+ serializer = self._jrdd_deserializer
+
+ def sortPartition(iterator):
+ sort = ExternalSorter(memory * 0.9, serializer).sorted if spill else sorted
+ return iter(sort(iterator, key=lambda (k, v): keyfunc(k), reverse=(not ascending)))
+
+ return self.partitionBy(numPartitions, partitionFunc).mapPartitions(sortPartition, True)
+
def sortByKey(self, ascending=True, numPartitions=None, keyfunc=lambda x: x):
"""
Sorts this RDD, which is assumed to consist of (key, value) pairs.
http://git-wip-us.apache.org/repos/asf/spark/blob/16a73c24/python/pyspark/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 9fbeb36..0bd2a9e 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -545,6 +545,14 @@ class TestRDDFunctions(PySparkTestCase):
self.assertEquals(([1, "b"], [5]), rdd.histogram(1))
self.assertRaises(TypeError, lambda: rdd.histogram(2))
+ def test_repartitionAndSortWithinPartitions(self):
+ rdd = self.sc.parallelize([(0, 5), (3, 8), (2, 6), (0, 8), (3, 8), (1, 3)], 2)
+
+ repartitioned = rdd.repartitionAndSortWithinPartitions(2, lambda key: key % 2)
+ partitions = repartitioned.glom().collect()
+ self.assertEquals(partitions[0], [(0, 5), (0, 8), (2, 6)])
+ self.assertEquals(partitions[1], [(1, 3), (3, 8), (3, 8)])
+
class TestSQL(PySparkTestCase):
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org