You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/07/29 10:13:12 UTC

git commit: [SPARK-2726] and [SPARK-2727] Remove SortOrder and do in-place sort.

Repository: spark
Updated Branches:
  refs/heads/master 92ef02626 -> 96ba04bbf


[SPARK-2726] and [SPARK-2727] Remove SortOrder and do in-place sort.

The pull request includes two changes:

1. Removes SortOrder introduced by SPARK-2125. The key ordering already includes the SortOrder information since an Ordering can be reverse. This is similar to Java's Comparator interface. Rarely does an API accept both a Comparator and a SortOrder.

2. Replaces the sortWith call in HashShuffleReader with an in-place quick sort.

Author: Reynold Xin <rx...@apache.org>

Closes #1631 from rxin/sortOrder and squashes the following commits:

c9d37e1 [Reynold Xin] [SPARK-2726] and [SPARK-2727] Remove SortOrder and do in-place sort.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/96ba04bb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/96ba04bb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/96ba04bb

Branch: refs/heads/master
Commit: 96ba04bbf917bcb971dd0d8cd1e1766dbe9366e8
Parents: 92ef026
Author: Reynold Xin <rx...@apache.org>
Authored: Tue Jul 29 01:12:44 2014 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Tue Jul 29 01:12:44 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/Dependency.scala     |  4 +---
 .../apache/spark/rdd/OrderedRDDFunctions.scala  |  8 +------
 .../org/apache/spark/rdd/ShuffledRDD.scala      | 12 +---------
 .../spark/shuffle/hash/HashShuffleReader.scala  | 25 ++++++++++++--------
 4 files changed, 18 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/96ba04bb/core/src/main/scala/org/apache/spark/Dependency.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala
index f010c03..09a6057 100644
--- a/core/src/main/scala/org/apache/spark/Dependency.scala
+++ b/core/src/main/scala/org/apache/spark/Dependency.scala
@@ -19,7 +19,6 @@ package org.apache.spark
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
-import org.apache.spark.rdd.SortOrder.SortOrder
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.ShuffleHandle
 
@@ -63,8 +62,7 @@ class ShuffleDependency[K, V, C](
     val serializer: Option[Serializer] = None,
     val keyOrdering: Option[Ordering[K]] = None,
     val aggregator: Option[Aggregator[K, V, C]] = None,
-    val mapSideCombine: Boolean = false,
-    val sortOrder: Option[SortOrder] = None)
+    val mapSideCombine: Boolean = false)
   extends Dependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) {
 
   val shuffleId: Int = rdd.context.newShuffleId()

http://git-wip-us.apache.org/repos/asf/spark/blob/96ba04bb/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
index afd7075..d85f962 100644
--- a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
@@ -58,12 +58,6 @@ class OrderedRDDFunctions[K : Ordering : ClassTag,
   def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P] = {
     val part = new RangePartitioner(numPartitions, self, ascending)
     new ShuffledRDD[K, V, V, P](self, part)
-      .setKeyOrdering(ordering)
-      .setSortOrder(if (ascending) SortOrder.ASCENDING else SortOrder.DESCENDING)
+      .setKeyOrdering(if (ascending) ordering else ordering.reverse)
   }
 }
-
-private[spark] object SortOrder extends Enumeration {
-  type SortOrder = Value
-  val ASCENDING, DESCENDING = Value
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/96ba04bb/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
index da4a8c3..bf02f68 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
@@ -21,7 +21,6 @@ import scala.reflect.ClassTag
 
 import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.rdd.SortOrder.SortOrder
 import org.apache.spark.serializer.Serializer
 
 private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
@@ -52,8 +51,6 @@ class ShuffledRDD[K, V, C, P <: Product2[K, C] : ClassTag](
 
   private var mapSideCombine: Boolean = false
 
-  private var sortOrder: Option[SortOrder] = None
-
   /** Set a serializer for this RDD's shuffle, or null to use the default (spark.serializer) */
   def setSerializer(serializer: Serializer): ShuffledRDD[K, V, C, P] = {
     this.serializer = Option(serializer)
@@ -78,15 +75,8 @@ class ShuffledRDD[K, V, C, P <: Product2[K, C] : ClassTag](
     this
   }
 
-  /** Set sort order for RDD's sorting. */
-  def setSortOrder(sortOrder: SortOrder): ShuffledRDD[K, V, C, P] = {
-    this.sortOrder = Option(sortOrder)
-    this
-  }
-
   override def getDependencies: Seq[Dependency[_]] = {
-    List(new ShuffleDependency(prev, part, serializer,
-      keyOrdering, aggregator, mapSideCombine, sortOrder))
+    List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
   }
 
   override val partitioner = Some(part)

http://git-wip-us.apache.org/repos/asf/spark/blob/96ba04bb/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala
index 76cdb8f..c805949 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.shuffle.hash
 
 import org.apache.spark.{InterruptibleIterator, TaskContext}
-import org.apache.spark.rdd.SortOrder
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.{BaseShuffleHandle, ShuffleReader}
 
@@ -51,16 +50,22 @@ class HashShuffleReader[K, C](
       iter
     }
 
-    val sortedIter = for (sortOrder <- dep.sortOrder; ordering <- dep.keyOrdering) yield {
-      val buf = aggregatedIter.toArray
-      if (sortOrder == SortOrder.ASCENDING) {
-        buf.sortWith((x, y) => ordering.lt(x._1, y._1)).iterator
-      } else {
-        buf.sortWith((x, y) => ordering.gt(x._1, y._1)).iterator
-      }
+    // Sort the output if there is a sort ordering defined.
+    dep.keyOrdering match {
+      case Some(keyOrd: Ordering[K]) =>
+        // Define a Comparator for the whole record based on the key Ordering.
+        val cmp = new Ordering[Product2[K, C]] {
+          override def compare(o1: Product2[K, C], o2: Product2[K, C]): Int = {
+            keyOrd.compare(o1._1, o2._1)
+          }
+        }
+        val sortBuffer: Array[Product2[K, C]] = aggregatedIter.toArray
+        // TODO: do external sort.
+        scala.util.Sorting.quickSort(sortBuffer)(cmp)
+        sortBuffer.iterator
+      case None =>
+        aggregatedIter
     }
-
-    sortedIter.getOrElse(aggregatedIter)
   }
 
   /** Close this reader */