You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/04/18 21:04:18 UTC

git commit: SPARK-1456 Remove view bounds on Ordered in favor of a context bound on Ordering.

Repository: spark
Updated Branches:
  refs/heads/master 81a152c54 -> c399baa0f


SPARK-1456 Remove view bounds on Ordered in favor of a context bound on Ordering.

This doesn't require creating new Ordering objects per row.  Additionally, [view bounds are going to be deprecated](https://issues.scala-lang.org/browse/SI-7629), so we should get rid of them while APIs are still flexible.

Author: Michael Armbrust <mi...@databricks.com>

Closes #410 from marmbrus/viewBounds and squashes the following commits:

c574221 [Michael Armbrust] fix example.
812008e [Michael Armbrust] Update Java API.
1b9b85c [Michael Armbrust] Update scala doc.
35798a8 [Michael Armbrust] Remove view bounds on Ordered in favor of a context bound on Ordering.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c399baa0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c399baa0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c399baa0

Branch: refs/heads/master
Commit: c399baa0fc40be7aa51835aaeadcd5d768dfdb95
Parents: 81a152c
Author: Michael Armbrust <mi...@databricks.com>
Authored: Fri Apr 18 12:04:13 2014 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Fri Apr 18 12:04:13 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/Partitioner.scala    |  8 +++---
 .../scala/org/apache/spark/SparkContext.scala   |  2 +-
 .../org/apache/spark/api/java/JavaPairRDD.scala | 10 ++------
 .../apache/spark/rdd/OrderedRDDFunctions.scala  | 26 ++++++++++++++++----
 .../org/apache/spark/util/CollectionsUtil.scala |  2 +-
 5 files changed, 30 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c399baa0/core/src/main/scala/org/apache/spark/Partitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index ad99882..9155159 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -89,12 +89,14 @@ class HashPartitioner(partitions: Int) extends Partitioner {
  * A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly
  * equal ranges. The ranges are determined by sampling the content of the RDD passed in.
  */
-class RangePartitioner[K <% Ordered[K]: ClassTag, V](
+class RangePartitioner[K : Ordering : ClassTag, V](
     partitions: Int,
     @transient rdd: RDD[_ <: Product2[K,V]],
     private val ascending: Boolean = true)
   extends Partitioner {
 
+  private val ordering = implicitly[Ordering[K]]
+
   // An array of upper bounds for the first (partitions - 1) partitions
   private val rangeBounds: Array[K] = {
     if (partitions == 1) {
@@ -103,7 +105,7 @@ class RangePartitioner[K <% Ordered[K]: ClassTag, V](
       val rddSize = rdd.count()
       val maxSampleSize = partitions * 20.0
       val frac = math.min(maxSampleSize / math.max(rddSize, 1), 1.0)
-      val rddSample = rdd.sample(false, frac, 1).map(_._1).collect().sortWith(_ < _)
+      val rddSample = rdd.sample(false, frac, 1).map(_._1).collect().sorted
       if (rddSample.length == 0) {
         Array()
       } else {
@@ -126,7 +128,7 @@ class RangePartitioner[K <% Ordered[K]: ClassTag, V](
     var partition = 0
     if (rangeBounds.length < 1000) {
       // If we have less than 100 partitions naive search
-      while (partition < rangeBounds.length && k > rangeBounds(partition)) {
+      while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
         partition += 1
       }
     } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/c399baa0/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ee56373..d3ef75b 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1273,7 +1273,7 @@ object SparkContext extends Logging {
       rdd: RDD[(K, V)]) =
     new SequenceFileRDDFunctions(rdd)
 
-  implicit def rddToOrderedRDDFunctions[K <% Ordered[K]: ClassTag, V: ClassTag](
+  implicit def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](
       rdd: RDD[(K, V)]) =
     new OrderedRDDFunctions[K, V, (K, V)](rdd)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c399baa0/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index e5b2c8a..b3ec270 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -626,10 +626,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * order of the keys).
    */
   def sortByKey(comp: Comparator[K], ascending: Boolean): JavaPairRDD[K, V] = {
-    class KeyOrdering(val a: K) extends Ordered[K] {
-      override def compare(b: K) = comp.compare(a, b)
-    }
-    implicit def toOrdered(x: K): Ordered[K] = new KeyOrdering(x)
+    implicit val ordering = comp // Allow implicit conversion of Comparator to Ordering.
     fromRDD(new OrderedRDDFunctions[K, V, (K, V)](rdd).sortByKey(ascending))
   }
 
@@ -640,10 +637,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * order of the keys).
    */
   def sortByKey(comp: Comparator[K], ascending: Boolean, numPartitions: Int): JavaPairRDD[K, V] = {
-    class KeyOrdering(val a: K) extends Ordered[K] {
-      override def compare(b: K) = comp.compare(a, b)
-    }
-    implicit def toOrdered(x: K): Ordered[K] = new KeyOrdering(x)
+    implicit val ordering = comp // Allow implicit conversion of Comparator to Ordering.
     fromRDD(new OrderedRDDFunctions[K, V, (K, V)](rdd).sortByKey(ascending, numPartitions))
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c399baa0/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
index d5691f2..6a3f698 100644
--- a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
@@ -24,15 +24,31 @@ import org.apache.spark.{Logging, RangePartitioner}
 /**
  * Extra functions available on RDDs of (key, value) pairs where the key is sortable through
  * an implicit conversion. Import `org.apache.spark.SparkContext._` at the top of your program to
- * use these functions. They will work with any key type that has a `scala.math.Ordered`
- * implementation.
+ * use these functions. They will work with any key type `K` that has an implicit `Ordering[K]` in
+ * scope.  Ordering objects already exist for all of the standard primitive types.  Users can also
+ * define their own orderings for custom types, or to override the default ordering.  The implicit
+ * ordering that is in the closest scope will be used.
+ *
+ * {{{
+ *   import org.apache.spark.SparkContext._
+ *
+ *   val rdd: RDD[(String, Int)] = ...
+ *   implicit val caseInsensitiveOrdering = new Ordering[String] {
+ *     override def compare(a: String, b: String) = a.toLowerCase.compare(b.toLowerCase)
+ *   }
+ *
+ *   // Sort by key, using the above case insensitive ordering.
+ *   rdd.sortByKey()
+ * }}}
  */
-class OrderedRDDFunctions[K <% Ordered[K]: ClassTag,
+class OrderedRDDFunctions[K : Ordering : ClassTag,
                           V: ClassTag,
                           P <: Product2[K, V] : ClassTag](
     self: RDD[P])
   extends Logging with Serializable {
 
+  private val ordering = implicitly[Ordering[K]]
+
   /**
    * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
    * `collect` or `save` on the resulting RDD will return or output an ordered list of records
@@ -45,9 +61,9 @@ class OrderedRDDFunctions[K <% Ordered[K]: ClassTag,
     shuffled.mapPartitions(iter => {
       val buf = iter.toArray
       if (ascending) {
-        buf.sortWith((x, y) => x._1 < y._1).iterator
+        buf.sortWith((x, y) => ordering.lt(x._1, y._1)).iterator
       } else {
-        buf.sortWith((x, y) => x._1 > y._1).iterator
+        buf.sortWith((x, y) => ordering.gt(x._1, y._1)).iterator
       }
     }, preservesPartitioning = true)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/c399baa0/core/src/main/scala/org/apache/spark/util/CollectionsUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/CollectionsUtil.scala b/core/src/main/scala/org/apache/spark/util/CollectionsUtil.scala
index 9323503..e4c254b 100644
--- a/core/src/main/scala/org/apache/spark/util/CollectionsUtil.scala
+++ b/core/src/main/scala/org/apache/spark/util/CollectionsUtil.scala
@@ -23,7 +23,7 @@ import scala.Array
 import scala.reflect._
 
 private[spark] object CollectionsUtils {
-  def makeBinarySearch[K <% Ordered[K] : ClassTag] : (Array[K], K) => Int = {
+  def makeBinarySearch[K : Ordering : ClassTag] : (Array[K], K) => Int = {
     classTag[K] match {
       case ClassTag.Float =>
         (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Float]], x.asInstanceOf[Float])