You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by xiajunluan <gi...@git.apache.org> on 2014/05/31 16:16:36 UTC

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

GitHub user xiajunluan opened a pull request:

    https://github.com/apache/spark/pull/931

    Fix JIRA-983 and support exteranl sort for sortByKey

    Change class ExternalAppendOnlyMap and make it support customized comparator function(not only sorted by hashCode).

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/xiajunluan/spark-1 JIRA-983

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/931.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #931
    
----

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/931#discussion_r13315309
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala ---
    @@ -17,54 +17,123 @@
     
     package org.apache.spark.rdd
     
    +import java.util.Comparator
    +
    +import scala.collection.mutable.ArrayBuffer
     import scala.reflect.ClassTag
     
    -import org.apache.spark.{Logging, RangePartitioner}
    +import org.apache.spark.{Logging, RangePartitioner, SparkEnv}
    +import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap}
     
     /**
    - * Extra functions available on RDDs of (key, value) pairs where the key is sortable through
    - * an implicit conversion. Import `org.apache.spark.SparkContext._` at the top of your program to
    - * use these functions. They will work with any key type `K` that has an implicit `Ordering[K]` in
    - * scope.  Ordering objects already exist for all of the standard primitive types.  Users can also
    - * define their own orderings for custom types, or to override the default ordering.  The implicit
    - * ordering that is in the closest scope will be used.
    - *
    - * {{{
    - *   import org.apache.spark.SparkContext._
    - *
    - *   val rdd: RDD[(String, Int)] = ...
    - *   implicit val caseInsensitiveOrdering = new Ordering[String] {
    - *     override def compare(a: String, b: String) = a.toLowerCase.compare(b.toLowerCase)
    - *   }
    - *
    - *   // Sort by key, using the above case insensitive ordering.
    - *   rdd.sortByKey()
    - * }}}
    - */
    +  * Extra functions available on RDDs of (key, value) pairs where the key is sortable through
    +  * an implicit conversion. Import `org.apache.spark.SparkContext._` at the top of your program to
    +  * use these functions. They will work with any key type `K` that has an implicit `Ordering[K]` in
    +  * scope.  Ordering objects already exist for all of the standard primitive types.  Users can also
    +  * define their own orderings for custom types, or to override the default ordering.  The implicit
    +  * ordering that is in the closest scope will be used.
    +  *
    +  * {{{
    +  *   import org.apache.spark.SparkContext._
    +  *
    +  *   val rdd: RDD[(String, Int)] = ...
    +  *   implicit val caseInsensitiveOrdering = new Ordering[String] {
    +  *     override def compare(a: String, b: String) = a.toLowerCase.compare(b.toLowerCase)
    +  *   }
    +  *
    +  *   // Sort by key, using the above case insensitive ordering.
    +  *   rdd.sortByKey()
    +  * }}}
    +  */
    +
     class OrderedRDDFunctions[K : Ordering : ClassTag,
                               V: ClassTag,
                               P <: Product2[K, V] : ClassTag](
    -    self: RDD[P])
    -  extends Logging with Serializable {
    +  self: RDD[P])
    +extends Logging with Serializable {
     
       private val ordering = implicitly[Ordering[K]]
     
    +  private type SortCombiner = ArrayBuffer[V]
       /**
    -   * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
    -   * `collect` or `save` on the resulting RDD will return or output an ordered list of records
    -   * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
    -   * order of the keys).
    -   */
    +    * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
    +    * `collect` or `save` on the resulting RDD will return or output an ordered list of records
    +    * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
    +      * order of the keys).
    +    */
       def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P] = {
    +    val externalSorting = SparkEnv.get.conf.getBoolean("spark.shuffle.spill", true)
         val part = new RangePartitioner(numPartitions, self, ascending)
         val shuffled = new ShuffledRDD[K, V, P](self, part)
    -    shuffled.mapPartitions(iter => {
    -      val buf = iter.toArray
    +        if (!externalSorting) {
    +          shuffled.mapPartitions(iter => {
    +              val buf = iter.toArray
    +              if (ascending) {
    +                buf.sortWith((x, y) => ordering.lt(x._1, y._1)).iterator
    +              } else {
    +                buf.sortWith((x, y) => ordering.gt(x._1, y._1)).iterator
    +              }
    +            }, preservesPartitioning = true)
    +        } else {
    +          shuffled.mapPartitions(iter => {
    +              val map = createExternalMap(ascending)
    +              while (iter.hasNext) { 
    +                val kv = iter.next()
    +                map.insert(kv._1, kv._2)
    +              }
    +              map.iterator
    +            }).flatMap(elem => {
    +              for (value <- elem._2)
    +                yield((elem._1, value).asInstanceOf[P])
    +            })
    +        }
    +  }
    +
    +  private def createExternalMap(ascending: Boolean) :ExternalAppendOnlyMap[K, V, SortCombiner] = {
    +    val createCombiner: (V => SortCombiner) = value => {
    +      val newCombiner = new SortCombiner
    +      newCombiner += value
    +      newCombiner
    +    }
    +    val mergeValue: (SortCombiner, V) => SortCombiner =
    +    (combiner, value) => {
    +      combiner += value
    +      combiner
    +    }
    +    val mergeCombiners: (SortCombiner, SortCombiner) => SortCombiner =
    +    (combiner1, combiner2) => {
    +      combiner1 ++= combiner2
    +    }
    +    new SortedExternalAppendOnlyMap[K, V, SortCombiner](
    +      createCombiner, mergeValue, mergeCombiners, 
    +      new KeyComparator[K, SortCombiner](ascending, ordering))
    +  }
    +
    +  private class KeyComparator[K, SortCombiner](ascending: Boolean, ord: Ordering[K]) 
    +  extends Comparator[(K, SortCombiner)] {
    +    def compare (kc1: (K, SortCombiner), kc2: (K, SortCombiner)): Int = {
           if (ascending) {
    -        buf.sortWith((x, y) => ordering.lt(x._1, y._1)).iterator
    +        if (ord.lt(kc1._1, kc2._1)) -1 else if (ord.equiv(kc1._1, kc2._1)) 0 else 1
    +      } else {
    +        if (ord.gt(kc1._1, kc2._1)) -1 else if (ord.equiv(kc1._1, kc2._1)) 0 else 1
    +      }
    +    }
    +  }
    +
    +  private class SortedExternalAppendOnlyMap[K, V, C](
    +    createCombiner: V => C,
    +    mergeValue: (C, V) => C,
    +    mergeCombiners: (C, C) => C,
    +    customizedComparator: Comparator[(K, C)] = null)
    +  extends ExternalAppendOnlyMap[K, V, C](
    +    createCombiner, mergeValue, mergeCombiners, customizedComparator) {
    +
    +    override def iterator: Iterator[(K, C)] = {
    --- End diff --
    
    Instead of overriding `iterator` here could we add a method called `sortedIterator`? It will be less confusing.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/931#discussion_r13315222
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala ---
    @@ -17,54 +17,123 @@
     
     package org.apache.spark.rdd
     
    +import java.util.Comparator
    +
    +import scala.collection.mutable.ArrayBuffer
     import scala.reflect.ClassTag
     
    -import org.apache.spark.{Logging, RangePartitioner}
    +import org.apache.spark.{Logging, RangePartitioner, SparkEnv}
    +import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap}
     
     /**
    - * Extra functions available on RDDs of (key, value) pairs where the key is sortable through
    - * an implicit conversion. Import `org.apache.spark.SparkContext._` at the top of your program to
    - * use these functions. They will work with any key type `K` that has an implicit `Ordering[K]` in
    - * scope.  Ordering objects already exist for all of the standard primitive types.  Users can also
    - * define their own orderings for custom types, or to override the default ordering.  The implicit
    - * ordering that is in the closest scope will be used.
    - *
    - * {{{
    - *   import org.apache.spark.SparkContext._
    - *
    - *   val rdd: RDD[(String, Int)] = ...
    - *   implicit val caseInsensitiveOrdering = new Ordering[String] {
    - *     override def compare(a: String, b: String) = a.toLowerCase.compare(b.toLowerCase)
    - *   }
    - *
    - *   // Sort by key, using the above case insensitive ordering.
    - *   rdd.sortByKey()
    - * }}}
    - */
    +  * Extra functions available on RDDs of (key, value) pairs where the key is sortable through
    +  * an implicit conversion. Import `org.apache.spark.SparkContext._` at the top of your program to
    +  * use these functions. They will work with any key type `K` that has an implicit `Ordering[K]` in
    +  * scope.  Ordering objects already exist for all of the standard primitive types.  Users can also
    +  * define their own orderings for custom types, or to override the default ordering.  The implicit
    +  * ordering that is in the closest scope will be used.
    +  *
    +  * {{{
    +  *   import org.apache.spark.SparkContext._
    +  *
    +  *   val rdd: RDD[(String, Int)] = ...
    +  *   implicit val caseInsensitiveOrdering = new Ordering[String] {
    +  *     override def compare(a: String, b: String) = a.toLowerCase.compare(b.toLowerCase)
    +  *   }
    +  *
    +  *   // Sort by key, using the above case insensitive ordering.
    +  *   rdd.sortByKey()
    +  * }}}
    +  */
    +
     class OrderedRDDFunctions[K : Ordering : ClassTag,
                               V: ClassTag,
                               P <: Product2[K, V] : ClassTag](
    -    self: RDD[P])
    -  extends Logging with Serializable {
    +  self: RDD[P])
    +extends Logging with Serializable {
     
       private val ordering = implicitly[Ordering[K]]
     
    +  private type SortCombiner = ArrayBuffer[V]
       /**
    -   * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
    -   * `collect` or `save` on the resulting RDD will return or output an ordered list of records
    -   * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
    -   * order of the keys).
    -   */
    +    * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
    +    * `collect` or `save` on the resulting RDD will return or output an ordered list of records
    +    * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
    +      * order of the keys).
    +    */
       def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P] = {
    +    val externalSorting = SparkEnv.get.conf.getBoolean("spark.shuffle.spill", true)
         val part = new RangePartitioner(numPartitions, self, ascending)
         val shuffled = new ShuffledRDD[K, V, P](self, part)
    -    shuffled.mapPartitions(iter => {
    -      val buf = iter.toArray
    +        if (!externalSorting) {
    +          shuffled.mapPartitions(iter => {
    +              val buf = iter.toArray
    +              if (ascending) {
    +                buf.sortWith((x, y) => ordering.lt(x._1, y._1)).iterator
    +              } else {
    +                buf.sortWith((x, y) => ordering.gt(x._1, y._1)).iterator
    +              }
    +            }, preservesPartitioning = true)
    +        } else {
    +          shuffled.mapPartitions(iter => {
    +              val map = createExternalMap(ascending)
    +              while (iter.hasNext) { 
    +                val kv = iter.next()
    +                map.insert(kv._1, kv._2)
    +              }
    +              map.iterator
    +            }).flatMap(elem => {
    +              for (value <- elem._2)
    +                yield((elem._1, value).asInstanceOf[P])
    --- End diff --
    
    This might be more efficient as `elem._2.iterator.map(x => (elem._1, x).asInstanceOf[P])`; it avoids copying the whole ArrayBuffer


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/931#issuecomment-48144929
  
    Build started. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/931#issuecomment-45293739
  
    Looks like Jenkins is complaining about a line longer than 100 characters


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/931#issuecomment-44749479
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15322/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/931#discussion_r13877376
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala ---
    @@ -41,30 +45,92 @@ import org.apache.spark.{Logging, RangePartitioner}
      *   rdd.sortByKey()
      * }}}
      */
    +
     class OrderedRDDFunctions[K : Ordering : ClassTag,
                               V: ClassTag,
                               P <: Product2[K, V] : ClassTag](
    -    self: RDD[P])
    -  extends Logging with Serializable {
    +  self: RDD[P])
    +extends Logging with Serializable {
     
       private val ordering = implicitly[Ordering[K]]
     
    +  private type SortCombiner = ArrayBuffer[V]
       /**
    -   * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
    -   * `collect` or `save` on the resulting RDD will return or output an ordered list of records
    -   * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
    -   * order of the keys).
    -   */
    +    * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
    +    * `collect` or `save` on the resulting RDD will return or output an ordered list of records
    +    * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
    +      * order of the keys).
    --- End diff --
    
    nit: indentation weird


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by xiajunluan <gi...@git.apache.org>.
Github user xiajunluan closed the pull request at:

    https://github.com/apache/spark/pull/931


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/931#issuecomment-44909735
  
    Hey @xiajunluan, this is a good start, but I made some comments throughout. There are a few other question though:
    - Performance: have you benchmarked this against the old version for non-sorting use cases? We need to make sure the pluggable Comparator doesn't break stuff
    - Long-term it would be good to spill values even within a key for sort, i.e. don't have ArrayBuffer as a combiner, just put in many values. But this probably can't be done easily in this patch.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/931#issuecomment-48298665
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16400/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/931#discussion_r13878210
  
    --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---
    @@ -292,14 +296,13 @@ class ExternalAppendOnlyMap[K, V, C](
           }
           // Select a key from the StreamBuffer that holds the lowest key hash
           val minBuffer = mergeHeap.dequeue()
    -      val (minPairs, minHash) = (minBuffer.pairs, minBuffer.minKeyHash)
    +      val minPairs = minBuffer.pairs
           var (minKey, minCombiner) = minPairs.remove(0)
    -      assert(minKey.hashCode() == minHash)
     
           // For all other streams that may have this key (i.e. have the same minimum key hash),
           // merge in the corresponding value (if any) from that stream
           val mergedBuffers = ArrayBuffer[StreamBuffer](minBuffer)
    -      while (mergeHeap.length > 0 && mergeHeap.head.minKeyHash == minHash) {
    +      while (mergeHeap.length > 0 && comparator.compare(mergeHeap.head.pairs.head, (minKey, minCombiner)) == 0) {
    --- End diff --
    
    nit: break this line at `&&`. I believe this is failing the tests


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/931


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/931#issuecomment-48282761
  
     Build triggered. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/931#issuecomment-44749436
  
    Merged build started. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/931#issuecomment-48144924
  
     Build triggered. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/931#discussion_r13877362
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala ---
    @@ -41,30 +45,92 @@ import org.apache.spark.{Logging, RangePartitioner}
      *   rdd.sortByKey()
      * }}}
      */
    +
     class OrderedRDDFunctions[K : Ordering : ClassTag,
                               V: ClassTag,
                               P <: Product2[K, V] : ClassTag](
    -    self: RDD[P])
    -  extends Logging with Serializable {
    +  self: RDD[P])
    +extends Logging with Serializable {
    --- End diff --
    
    The old style is actually correct. Could you revert this?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/931#discussion_r13315338
  
    --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---
    @@ -61,14 +61,15 @@ class ExternalAppendOnlyMap[K, V, C](
         createCombiner: V => C,
         mergeValue: (C, V) => C,
         mergeCombiners: (C, C) => C,
    +    customizedComparator: Comparator[(K, C)] = null,
    --- End diff --
    
    Add this parameter at the end of the argument list to make it more likely to be compatible with code written earlier


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/931#discussion_r13878347
  
    --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---
    @@ -292,14 +296,13 @@ class ExternalAppendOnlyMap[K, V, C](
           }
           // Select a key from the StreamBuffer that holds the lowest key hash
           val minBuffer = mergeHeap.dequeue()
    -      val (minPairs, minHash) = (minBuffer.pairs, minBuffer.minKeyHash)
    +      val minPairs = minBuffer.pairs
           var (minKey, minCombiner) = minPairs.remove(0)
    -      assert(minKey.hashCode() == minHash)
     
           // For all other streams that may have this key (i.e. have the same minimum key hash),
           // merge in the corresponding value (if any) from that stream
           val mergedBuffers = ArrayBuffer[StreamBuffer](minBuffer)
    -      while (mergeHeap.length > 0 && mergeHeap.head.minKeyHash == minHash) {
    +      while (mergeHeap.length > 0 && comparator.compare(mergeHeap.head.pairs.head, (minKey, minCombiner)) == 0) {
    --- End diff --
    
    minor: I would probably add a method in `StreamBuffer` called `minPair`, so you can just do `mergeHeap.head.minPair`. Then I would declare a val a few lines above
    
    ```
    val minPair = minPairs.remove(0)
    var (minKey, minCombiner) = minPair
    ...
    ```
    so this line fits under 100 char and is more readable.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by xiajunluan <gi...@git.apache.org>.
GitHub user xiajunluan reopened a pull request:

    https://github.com/apache/spark/pull/931

    Fix JIRA-983 and support exteranl sort for sortByKey

    Change class ExternalAppendOnlyMap and make it support customized comparator function(not only sorted by hashCode).

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/xiajunluan/spark-1 JIRA-983

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/931.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #931
    
----
commit 3092241234e6e85becccbd3d93a2a69b776d71e9
Author: Andrew xia <ju...@intel.com>
Date:   2014-05-31T13:19:35Z

    Fix JIRA-983 and support exteranl sort for sortByKey

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/931#issuecomment-49251303
  
    @pwendell @xiajunluan I think I'm going to send a new PR based on this because I want to use some of the changes to ExternalAppendOnlyMap in sort-based shuffle. I also noticed an issue in this one.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/931#discussion_r13877633
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala ---
    @@ -41,30 +45,92 @@ import org.apache.spark.{Logging, RangePartitioner}
      *   rdd.sortByKey()
      * }}}
      */
    +
     class OrderedRDDFunctions[K : Ordering : ClassTag,
                               V: ClassTag,
                               P <: Product2[K, V] : ClassTag](
    -    self: RDD[P])
    -  extends Logging with Serializable {
    +  self: RDD[P])
    +extends Logging with Serializable {
     
       private val ordering = implicitly[Ordering[K]]
     
    +  private type SortCombiner = ArrayBuffer[V]
       /**
    -   * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
    -   * `collect` or `save` on the resulting RDD will return or output an ordered list of records
    -   * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
    -   * order of the keys).
    -   */
    +    * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
    +    * `collect` or `save` on the resulting RDD will return or output an ordered list of records
    +    * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
    +      * order of the keys).
    +    */
       def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P] = {
    +    val externalSorting = SparkEnv.get.conf.getBoolean("spark.shuffle.spill", true)
         val part = new RangePartitioner(numPartitions, self, ascending)
         val shuffled = new ShuffledRDD[K, V, P](self, part)
    -    shuffled.mapPartitions(iter => {
    -      val buf = iter.toArray
    +    if (!externalSorting) {
    +      shuffled.mapPartitions(iter => {
    +          val buf = iter.toArray
    +          if (ascending) {
    +            buf.sortWith((x, y) => ordering.lt(x._1, y._1)).iterator
    +          } else {
    +            buf.sortWith((x, y) => ordering.gt(x._1, y._1)).iterator
    +          }
    +        }, preservesPartitioning = true)
    +    } else {
    +      shuffled.mapPartitions(iter => {
    +          val map = createExternalMap(ascending)
    +          while (iter.hasNext) { 
    +            val kv = iter.next()
    +            map.insert(kv._1, kv._2)
    +          }
    +          map.iterator
    +        }).flatMap(elem => {
    +          elem._2.iterator.map(x => (elem._1, x).asInstanceOf[P])
    +        })
    +    }
    --- End diff --
    
    nit: style should be something like
    ```
    shuffled.mapPartitions { iter =>
      ...
    }.flatMap { case (k, c) =>
      c.iterator.map { x => (k, x).asInstanceOf[P] }
    }
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/931#issuecomment-44749434
  
     Merged build triggered. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/931#issuecomment-47453105
  
    Build started. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/931#discussion_r13877424
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala ---
    @@ -41,30 +45,92 @@ import org.apache.spark.{Logging, RangePartitioner}
      *   rdd.sortByKey()
      * }}}
      */
    +
     class OrderedRDDFunctions[K : Ordering : ClassTag,
                               V: ClassTag,
                               P <: Product2[K, V] : ClassTag](
    -    self: RDD[P])
    -  extends Logging with Serializable {
    +  self: RDD[P])
    +extends Logging with Serializable {
     
       private val ordering = implicitly[Ordering[K]]
     
    +  private type SortCombiner = ArrayBuffer[V]
       /**
    -   * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
    -   * `collect` or `save` on the resulting RDD will return or output an ordered list of records
    -   * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
    -   * order of the keys).
    -   */
    +    * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
    +    * `collect` or `save` on the resulting RDD will return or output an ordered list of records
    +    * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
    +      * order of the keys).
    +    */
    --- End diff --
    
    nit: revert this indentation? I don't think you intended this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/931#issuecomment-47453146
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16242/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/931#issuecomment-48298656
  
    Build finished. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/931#issuecomment-50703819
  
    @xiajunluan we can now do this using the ExternalSorter added in #1499: see the new PR at https://github.com/apache/spark/pull/1677. Would you mind closing this old one? The new PR avoids some of the problems I mentioned above with each key having too many values.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/931#discussion_r13878082
  
    --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---
    @@ -104,7 +106,8 @@ class ExternalAppendOnlyMap[K, V, C](
       private var _diskBytesSpilled = 0L
     
       private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
    -  private val comparator = new KCComparator[K, C]
    +  protected val comparator = if (customizedComparator == null) new KCComparator[K, C] 
    +    else customizedComparator
    --- End diff --
    
    nit: I would move the whole `if` to the next line:
    ```
    protected val comparator = 
      if (...) ... else ...
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by xiajunluan <gi...@git.apache.org>.
Github user xiajunluan commented on the pull request:

    https://github.com/apache/spark/pull/931#issuecomment-45078849
  
    Hi @mateiz
    
    1.  I will measure the performance influence after I add the pluggable comparator.
    
    2.  I agree with you. if we just implement sortByKey, we should not use combiner(it is for combineByKey related API), it will need firstly aggregate values and after sorting, unfold values for same key. In this patch, I would like to reuse current class and fix this bug quickly. for long-term, I think we should write another similar AppendOnlyMap and ExternalAppendOnlyMap class for sortByKey, and ignore functions such as createCombiner, mergeValue, etc. I will try to design these class later.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/931#issuecomment-44749478
  
    Merged build finished. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on the pull request:

    https://github.com/apache/spark/pull/931#issuecomment-48282671
  
    Jenkins, retest this please.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/931#issuecomment-45083654
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15445/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/931#discussion_r13315319
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala ---
    @@ -17,54 +17,123 @@
     
     package org.apache.spark.rdd
     
    +import java.util.Comparator
    +
    +import scala.collection.mutable.ArrayBuffer
     import scala.reflect.ClassTag
     
    -import org.apache.spark.{Logging, RangePartitioner}
    +import org.apache.spark.{Logging, RangePartitioner, SparkEnv}
    +import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap}
     
     /**
    - * Extra functions available on RDDs of (key, value) pairs where the key is sortable through
    - * an implicit conversion. Import `org.apache.spark.SparkContext._` at the top of your program to
    - * use these functions. They will work with any key type `K` that has an implicit `Ordering[K]` in
    - * scope.  Ordering objects already exist for all of the standard primitive types.  Users can also
    - * define their own orderings for custom types, or to override the default ordering.  The implicit
    - * ordering that is in the closest scope will be used.
    - *
    - * {{{
    - *   import org.apache.spark.SparkContext._
    - *
    - *   val rdd: RDD[(String, Int)] = ...
    - *   implicit val caseInsensitiveOrdering = new Ordering[String] {
    - *     override def compare(a: String, b: String) = a.toLowerCase.compare(b.toLowerCase)
    - *   }
    - *
    - *   // Sort by key, using the above case insensitive ordering.
    - *   rdd.sortByKey()
    - * }}}
    - */
    +  * Extra functions available on RDDs of (key, value) pairs where the key is sortable through
    +  * an implicit conversion. Import `org.apache.spark.SparkContext._` at the top of your program to
    +  * use these functions. They will work with any key type `K` that has an implicit `Ordering[K]` in
    +  * scope.  Ordering objects already exist for all of the standard primitive types.  Users can also
    +  * define their own orderings for custom types, or to override the default ordering.  The implicit
    +  * ordering that is in the closest scope will be used.
    +  *
    +  * {{{
    +  *   import org.apache.spark.SparkContext._
    +  *
    +  *   val rdd: RDD[(String, Int)] = ...
    +  *   implicit val caseInsensitiveOrdering = new Ordering[String] {
    +  *     override def compare(a: String, b: String) = a.toLowerCase.compare(b.toLowerCase)
    +  *   }
    +  *
    +  *   // Sort by key, using the above case insensitive ordering.
    +  *   rdd.sortByKey()
    +  * }}}
    +  */
    +
     class OrderedRDDFunctions[K : Ordering : ClassTag,
                               V: ClassTag,
                               P <: Product2[K, V] : ClassTag](
    -    self: RDD[P])
    -  extends Logging with Serializable {
    +  self: RDD[P])
    +extends Logging with Serializable {
     
       private val ordering = implicitly[Ordering[K]]
     
    +  private type SortCombiner = ArrayBuffer[V]
       /**
    -   * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
    -   * `collect` or `save` on the resulting RDD will return or output an ordered list of records
    -   * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
    -   * order of the keys).
    -   */
    +    * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
    +    * `collect` or `save` on the resulting RDD will return or output an ordered list of records
    +    * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
    +      * order of the keys).
    +    */
       def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P] = {
    +    val externalSorting = SparkEnv.get.conf.getBoolean("spark.shuffle.spill", true)
         val part = new RangePartitioner(numPartitions, self, ascending)
         val shuffled = new ShuffledRDD[K, V, P](self, part)
    -    shuffled.mapPartitions(iter => {
    -      val buf = iter.toArray
    +        if (!externalSorting) {
    +          shuffled.mapPartitions(iter => {
    +              val buf = iter.toArray
    +              if (ascending) {
    +                buf.sortWith((x, y) => ordering.lt(x._1, y._1)).iterator
    +              } else {
    +                buf.sortWith((x, y) => ordering.gt(x._1, y._1)).iterator
    +              }
    +            }, preservesPartitioning = true)
    +        } else {
    +          shuffled.mapPartitions(iter => {
    +              val map = createExternalMap(ascending)
    +              while (iter.hasNext) { 
    +                val kv = iter.next()
    +                map.insert(kv._1, kv._2)
    +              }
    +              map.iterator
    +            }).flatMap(elem => {
    +              for (value <- elem._2)
    +                yield((elem._1, value).asInstanceOf[P])
    +            })
    +        }
    +  }
    +
    +  private def createExternalMap(ascending: Boolean) :ExternalAppendOnlyMap[K, V, SortCombiner] = {
    +    val createCombiner: (V => SortCombiner) = value => {
    +      val newCombiner = new SortCombiner
    +      newCombiner += value
    +      newCombiner
    +    }
    +    val mergeValue: (SortCombiner, V) => SortCombiner =
    +    (combiner, value) => {
    +      combiner += value
    +      combiner
    +    }
    +    val mergeCombiners: (SortCombiner, SortCombiner) => SortCombiner =
    +    (combiner1, combiner2) => {
    +      combiner1 ++= combiner2
    +    }
    +    new SortedExternalAppendOnlyMap[K, V, SortCombiner](
    +      createCombiner, mergeValue, mergeCombiners, 
    +      new KeyComparator[K, SortCombiner](ascending, ordering))
    +  }
    +
    +  private class KeyComparator[K, SortCombiner](ascending: Boolean, ord: Ordering[K]) 
    +  extends Comparator[(K, SortCombiner)] {
    +    def compare (kc1: (K, SortCombiner), kc2: (K, SortCombiner)): Int = {
           if (ascending) {
    -        buf.sortWith((x, y) => ordering.lt(x._1, y._1)).iterator
    +        if (ord.lt(kc1._1, kc2._1)) -1 else if (ord.equiv(kc1._1, kc2._1)) 0 else 1
    +      } else {
    +        if (ord.gt(kc1._1, kc2._1)) -1 else if (ord.equiv(kc1._1, kc2._1)) 0 else 1
    +      }
    +    }
    +  }
    +
    +  private class SortedExternalAppendOnlyMap[K, V, C](
    +    createCombiner: V => C,
    +    mergeValue: (C, V) => C,
    +    mergeCombiners: (C, C) => C,
    +    customizedComparator: Comparator[(K, C)] = null)
    +  extends ExternalAppendOnlyMap[K, V, C](
    +    createCombiner, mergeValue, mergeCombiners, customizedComparator) {
    +
    +    override def iterator: Iterator[(K, C)] = {
    --- End diff --
    
    (And in this case it could even go directly into ExtenralAppendOnlyMap and you wouldn't need to subclass that)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/931#issuecomment-46349625
  
    super nit, but can you change JIRA-983 to SPARK-983? Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/931#discussion_r13268434
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala ---
    @@ -17,54 +17,123 @@
     
     package org.apache.spark.rdd
     
    +import java.util.Comparator
    +
    +import scala.collection.mutable.ArrayBuffer
     import scala.reflect.ClassTag
     
    -import org.apache.spark.{Logging, RangePartitioner}
    +import org.apache.spark.{Logging, RangePartitioner, SparkEnv}
    +import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap}
     
     /**
    - * Extra functions available on RDDs of (key, value) pairs where the key is sortable through
    - * an implicit conversion. Import `org.apache.spark.SparkContext._` at the top of your program to
    - * use these functions. They will work with any key type `K` that has an implicit `Ordering[K]` in
    - * scope.  Ordering objects already exist for all of the standard primitive types.  Users can also
    - * define their own orderings for custom types, or to override the default ordering.  The implicit
    - * ordering that is in the closest scope will be used.
    - *
    - * {{{
    - *   import org.apache.spark.SparkContext._
    - *
    - *   val rdd: RDD[(String, Int)] = ...
    - *   implicit val caseInsensitiveOrdering = new Ordering[String] {
    - *     override def compare(a: String, b: String) = a.toLowerCase.compare(b.toLowerCase)
    - *   }
    - *
    - *   // Sort by key, using the above case insensitive ordering.
    - *   rdd.sortByKey()
    - * }}}
    - */
    +  * Extra functions available on RDDs of (key, value) pairs where the key is sortable through
    +  * an implicit conversion. Import `org.apache.spark.SparkContext._` at the top of your program to
    +  * use these functions. They will work with any key type `K` that has an implicit `Ordering[K]` in
    +  * scope.  Ordering objects already exist for all of the standard primitive types.  Users can also
    +  * define their own orderings for custom types, or to override the default ordering.  The implicit
    +  * ordering that is in the closest scope will be used.
    +  *
    +  * {{{
    +  *   import org.apache.spark.SparkContext._
    +  *
    +  *   val rdd: RDD[(String, Int)] = ...
    +  *   implicit val caseInsensitiveOrdering = new Ordering[String] {
    +  *     override def compare(a: String, b: String) = a.toLowerCase.compare(b.toLowerCase)
    +  *   }
    +  *
    +  *   // Sort by key, using the above case insensitive ordering.
    +  *   rdd.sortByKey()
    +  * }}}
    +  */
    --- End diff --
    
    It looks like your IDE changed the style of the comments here. Please leave them as they were originally. Our style in Spark is not the default Scala one, it's this:
    ```
    /**
     * aaa
     * bbb
     */
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/931#issuecomment-48282776
  
    Build started. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/931#discussion_r13315154
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala ---
    @@ -17,54 +17,123 @@
     
     package org.apache.spark.rdd
     
    +import java.util.Comparator
    +
    +import scala.collection.mutable.ArrayBuffer
     import scala.reflect.ClassTag
     
    -import org.apache.spark.{Logging, RangePartitioner}
    +import org.apache.spark.{Logging, RangePartitioner, SparkEnv}
    +import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap}
     
     /**
    - * Extra functions available on RDDs of (key, value) pairs where the key is sortable through
    - * an implicit conversion. Import `org.apache.spark.SparkContext._` at the top of your program to
    - * use these functions. They will work with any key type `K` that has an implicit `Ordering[K]` in
    - * scope.  Ordering objects already exist for all of the standard primitive types.  Users can also
    - * define their own orderings for custom types, or to override the default ordering.  The implicit
    - * ordering that is in the closest scope will be used.
    - *
    - * {{{
    - *   import org.apache.spark.SparkContext._
    - *
    - *   val rdd: RDD[(String, Int)] = ...
    - *   implicit val caseInsensitiveOrdering = new Ordering[String] {
    - *     override def compare(a: String, b: String) = a.toLowerCase.compare(b.toLowerCase)
    - *   }
    - *
    - *   // Sort by key, using the above case insensitive ordering.
    - *   rdd.sortByKey()
    - * }}}
    - */
    +  * Extra functions available on RDDs of (key, value) pairs where the key is sortable through
    +  * an implicit conversion. Import `org.apache.spark.SparkContext._` at the top of your program to
    +  * use these functions. They will work with any key type `K` that has an implicit `Ordering[K]` in
    +  * scope.  Ordering objects already exist for all of the standard primitive types.  Users can also
    +  * define their own orderings for custom types, or to override the default ordering.  The implicit
    +  * ordering that is in the closest scope will be used.
    +  *
    +  * {{{
    +  *   import org.apache.spark.SparkContext._
    +  *
    +  *   val rdd: RDD[(String, Int)] = ...
    +  *   implicit val caseInsensitiveOrdering = new Ordering[String] {
    +  *     override def compare(a: String, b: String) = a.toLowerCase.compare(b.toLowerCase)
    +  *   }
    +  *
    +  *   // Sort by key, using the above case insensitive ordering.
    +  *   rdd.sortByKey()
    +  * }}}
    +  */
    +
     class OrderedRDDFunctions[K : Ordering : ClassTag,
                               V: ClassTag,
                               P <: Product2[K, V] : ClassTag](
    -    self: RDD[P])
    -  extends Logging with Serializable {
    +  self: RDD[P])
    +extends Logging with Serializable {
     
       private val ordering = implicitly[Ordering[K]]
     
    +  private type SortCombiner = ArrayBuffer[V]
       /**
    -   * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
    -   * `collect` or `save` on the resulting RDD will return or output an ordered list of records
    -   * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
    -   * order of the keys).
    -   */
    +    * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
    +    * `collect` or `save` on the resulting RDD will return or output an ordered list of records
    +    * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
    +      * order of the keys).
    +    */
       def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P] = {
    +    val externalSorting = SparkEnv.get.conf.getBoolean("spark.shuffle.spill", true)
         val part = new RangePartitioner(numPartitions, self, ascending)
         val shuffled = new ShuffledRDD[K, V, P](self, part)
    -    shuffled.mapPartitions(iter => {
    -      val buf = iter.toArray
    +        if (!externalSorting) {
    +          shuffled.mapPartitions(iter => {
    +              val buf = iter.toArray
    +              if (ascending) {
    +                buf.sortWith((x, y) => ordering.lt(x._1, y._1)).iterator
    +              } else {
    +                buf.sortWith((x, y) => ordering.gt(x._1, y._1)).iterator
    +              }
    +            }, preservesPartitioning = true)
    +        } else {
    +          shuffled.mapPartitions(iter => {
    +              val map = createExternalMap(ascending)
    +              while (iter.hasNext) { 
    +                val kv = iter.next()
    +                map.insert(kv._1, kv._2)
    +              }
    +              map.iterator
    +            }).flatMap(elem => {
    +              for (value <- elem._2)
    +                yield((elem._1, value).asInstanceOf[P])
    +            })
    +        }
    +  }
    +
    +  private def createExternalMap(ascending: Boolean) :ExternalAppendOnlyMap[K, V, SortCombiner] = {
    +    val createCombiner: (V => SortCombiner) = value => {
    +      val newCombiner = new SortCombiner
    +      newCombiner += value
    +      newCombiner
    +    }
    +    val mergeValue: (SortCombiner, V) => SortCombiner =
    +    (combiner, value) => {
    --- End diff --
    
    You can put `(combiner, value)` on the first line, after the `=`; same below


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/931#issuecomment-48152669
  
    Build finished. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/931#discussion_r13878153
  
    --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---
    @@ -251,10 +254,11 @@ class ExternalAppendOnlyMap[K, V, C](
           if (it.hasNext) {
             var kc = it.next()
             kcPairs += kc
    -        val minHash = kc._1.hashCode()
    -        while (it.hasNext && kc._1.hashCode() == minHash) {
    -          kc = it.next()
    -          kcPairs += kc
    +        while (it.hasNext) {
    +          var kc1 = it.next()
    +          kcPairs += kc1
    +          if (comparator.compare(kc, kc1) != 0)
    +            return kcPairs
    --- End diff --
    
    nit: add braces for `if`
    ```
    if (...) {
      return kcPairs
    }
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/931#discussion_r13879317
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala ---
    @@ -17,9 +17,13 @@
     
     package org.apache.spark.rdd
     
    +import java.util.Comparator
    +
    +import scala.collection.mutable.ArrayBuffer
     import scala.reflect.ClassTag
     
    -import org.apache.spark.{Logging, RangePartitioner}
    +import org.apache.spark.{Logging, RangePartitioner, SparkEnv}
    +import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap}
    --- End diff --
    
    nit: I don't think `AppendOnlyMap` is used?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/931#issuecomment-49260098
  
    QA results for PR 931:<br>- This patch FAILED unit tests.<br><br>For more information see test ouptut:<br>https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16767/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/931#discussion_r13948953
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala ---
    @@ -41,30 +45,92 @@ import org.apache.spark.{Logging, RangePartitioner}
      *   rdd.sortByKey()
      * }}}
      */
    +
     class OrderedRDDFunctions[K : Ordering : ClassTag,
                               V: ClassTag,
                               P <: Product2[K, V] : ClassTag](
    -    self: RDD[P])
    -  extends Logging with Serializable {
    +  self: RDD[P])
    +extends Logging with Serializable {
     
       private val ordering = implicitly[Ordering[K]]
     
    +  private type SortCombiner = ArrayBuffer[V]
       /**
    -   * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
    -   * `collect` or `save` on the resulting RDD will return or output an ordered list of records
    -   * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
    -   * order of the keys).
    -   */
    +    * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
    +    * `collect` or `save` on the resulting RDD will return or output an ordered list of records
    +    * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
    +      * order of the keys).
    +    */
       def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P] = {
    +    val externalSorting = SparkEnv.get.conf.getBoolean("spark.shuffle.spill", true)
         val part = new RangePartitioner(numPartitions, self, ascending)
         val shuffled = new ShuffledRDD[K, V, P](self, part)
    -    shuffled.mapPartitions(iter => {
    -      val buf = iter.toArray
    +    if (!externalSorting) {
    +      shuffled.mapPartitions(iter => {
    +          val buf = iter.toArray
    +          if (ascending) {
    +            buf.sortWith((x, y) => ordering.lt(x._1, y._1)).iterator
    +          } else {
    +            buf.sortWith((x, y) => ordering.gt(x._1, y._1)).iterator
    +          }
    +        }, preservesPartitioning = true)
    +    } else {
    +      shuffled.mapPartitions(iter => {
    +          val map = createExternalMap(ascending)
    +          while (iter.hasNext) { 
    +            val kv = iter.next()
    +            map.insert(kv._1, kv._2)
    +          }
    +          map.iterator
    +        }).flatMap(elem => {
    +          elem._2.iterator.map(x => (elem._1, x).asInstanceOf[P])
    +        })
    +    }
    --- End diff --
    
    This place might be the performance hot place, since we need to reconstruct each tuple. But for current hash map implementation, seems no better solutions to avoid this. I think we should figure out the performance of this manipulation.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/931#discussion_r15038532
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala ---
    @@ -56,15 +62,56 @@ class OrderedRDDFunctions[K : Ordering : ClassTag,
        * order of the keys).
        */
       def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P] = {
    +    val externalSorting = SparkEnv.get.conf.getBoolean("spark.shuffle.spill", true)
         val part = new RangePartitioner(numPartitions, self, ascending)
         val shuffled = new ShuffledRDD[K, V, P](self, part)
    -    shuffled.mapPartitions(iter => {
    -      val buf = iter.toArray
    +    if (!externalSorting) {
    +      shuffled.mapPartitions(iter => {
    +        val buf = iter.toArray
    +        if (ascending) {
    +          buf.sortWith((x, y) => ordering.lt(x._1, y._1)).iterator
    +        } else {
    +          buf.sortWith((x, y) => ordering.gt(x._1, y._1)).iterator
    +        }
    +      }, preservesPartitioning = true)
    +    } else {
    +      shuffled.mapPartitions(iter => {
    +        val map = createExternalMap(ascending)
    +        while (iter.hasNext) { 
    +          val kv = iter.next()
    +          map.insert(kv._1, kv.asInstanceOf[P])
    +        }
    +        map.sortIterator
    +      }).flatMap(elem => elem._2)
    +    }
    +  }
    +
    +  private def createExternalMap(ascending: Boolean): ExternalAppendOnlyMap[K, P, SortCombiner] = {
    --- End diff --
    
    It seems unnecessary to have a combiner here: if there are multiple key-value pairs with the same key, this requires them to all fit in memory. Instead we should have an option for the ExternalAppendOnlyMap to not attempt to combine them. I'll work on this in my PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/931#issuecomment-47453145
  
    Build finished. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on the pull request:

    https://github.com/apache/spark/pull/931#issuecomment-48703798
  
    Jenkins, test this please.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by xiajunluan <gi...@git.apache.org>.
Github user xiajunluan commented on the pull request:

    https://github.com/apache/spark/pull/931#issuecomment-47196343
  
    @pwendell I will update the codes soon, thanks for reminder.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on the pull request:

    https://github.com/apache/spark/pull/931#issuecomment-46351611
  
    @xiajunluan I took a pass over the logic and it looks correct to me. I like your idea of not relying on the hash code anymore; this has caused us a few correctness problems in the past.
    
    In response to your second point about spilling within a key for sort, we have two alternatives: (1) add special handling for sorting in this PR as you suggest, or (2) add general handling to ExternalAppendOnlyMap. I think we should go with the latter, because shuffles in general also suffer for the same problem if we have too many values for one key. Once we fix that, now that sortByKey goes through this map, sorting will get that property for free as well. Also, deferring that change will keep this PR simple.
    
    As @mateiz mentioned, it would be good if you test the performance of a simple shuffle and/or join before and after your changes. The logic in `ExternalIterator` is actually quite sensitive because it is a super hot code path, so we should make sure it doesn't cause a regression.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/931#discussion_r13315380
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala ---
    @@ -17,54 +17,123 @@
     
     package org.apache.spark.rdd
     
    +import java.util.Comparator
    +
    +import scala.collection.mutable.ArrayBuffer
     import scala.reflect.ClassTag
     
    -import org.apache.spark.{Logging, RangePartitioner}
    +import org.apache.spark.{Logging, RangePartitioner, SparkEnv}
    +import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap}
     
     /**
    - * Extra functions available on RDDs of (key, value) pairs where the key is sortable through
    - * an implicit conversion. Import `org.apache.spark.SparkContext._` at the top of your program to
    - * use these functions. They will work with any key type `K` that has an implicit `Ordering[K]` in
    - * scope.  Ordering objects already exist for all of the standard primitive types.  Users can also
    - * define their own orderings for custom types, or to override the default ordering.  The implicit
    - * ordering that is in the closest scope will be used.
    - *
    - * {{{
    - *   import org.apache.spark.SparkContext._
    - *
    - *   val rdd: RDD[(String, Int)] = ...
    - *   implicit val caseInsensitiveOrdering = new Ordering[String] {
    - *     override def compare(a: String, b: String) = a.toLowerCase.compare(b.toLowerCase)
    - *   }
    - *
    - *   // Sort by key, using the above case insensitive ordering.
    - *   rdd.sortByKey()
    - * }}}
    - */
    +  * Extra functions available on RDDs of (key, value) pairs where the key is sortable through
    +  * an implicit conversion. Import `org.apache.spark.SparkContext._` at the top of your program to
    +  * use these functions. They will work with any key type `K` that has an implicit `Ordering[K]` in
    +  * scope.  Ordering objects already exist for all of the standard primitive types.  Users can also
    +  * define their own orderings for custom types, or to override the default ordering.  The implicit
    +  * ordering that is in the closest scope will be used.
    +  *
    +  * {{{
    +  *   import org.apache.spark.SparkContext._
    +  *
    +  *   val rdd: RDD[(String, Int)] = ...
    +  *   implicit val caseInsensitiveOrdering = new Ordering[String] {
    +  *     override def compare(a: String, b: String) = a.toLowerCase.compare(b.toLowerCase)
    +  *   }
    +  *
    +  *   // Sort by key, using the above case insensitive ordering.
    +  *   rdd.sortByKey()
    +  * }}}
    +  */
    +
     class OrderedRDDFunctions[K : Ordering : ClassTag,
                               V: ClassTag,
                               P <: Product2[K, V] : ClassTag](
    -    self: RDD[P])
    -  extends Logging with Serializable {
    +  self: RDD[P])
    +extends Logging with Serializable {
     
       private val ordering = implicitly[Ordering[K]]
     
    +  private type SortCombiner = ArrayBuffer[V]
       /**
    -   * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
    -   * `collect` or `save` on the resulting RDD will return or output an ordered list of records
    -   * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
    -   * order of the keys).
    -   */
    +    * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
    +    * `collect` or `save` on the resulting RDD will return or output an ordered list of records
    +    * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
    +      * order of the keys).
    +    */
       def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P] = {
    +    val externalSorting = SparkEnv.get.conf.getBoolean("spark.shuffle.spill", true)
         val part = new RangePartitioner(numPartitions, self, ascending)
         val shuffled = new ShuffledRDD[K, V, P](self, part)
    -    shuffled.mapPartitions(iter => {
    -      val buf = iter.toArray
    +        if (!externalSorting) {
    +          shuffled.mapPartitions(iter => {
    +              val buf = iter.toArray
    +              if (ascending) {
    +                buf.sortWith((x, y) => ordering.lt(x._1, y._1)).iterator
    +              } else {
    +                buf.sortWith((x, y) => ordering.gt(x._1, y._1)).iterator
    +              }
    +            }, preservesPartitioning = true)
    +        } else {
    +          shuffled.mapPartitions(iter => {
    +              val map = createExternalMap(ascending)
    +              while (iter.hasNext) { 
    +                val kv = iter.next()
    +                map.insert(kv._1, kv._2)
    +              }
    +              map.iterator
    +            }).flatMap(elem => {
    +              for (value <- elem._2)
    +                yield((elem._1, value).asInstanceOf[P])
    +            })
    +        }
    +  }
    +
    +  private def createExternalMap(ascending: Boolean) :ExternalAppendOnlyMap[K, V, SortCombiner] = {
    +    val createCombiner: (V => SortCombiner) = value => {
    +      val newCombiner = new SortCombiner
    +      newCombiner += value
    +      newCombiner
    +    }
    +    val mergeValue: (SortCombiner, V) => SortCombiner =
    +    (combiner, value) => {
    +      combiner += value
    +      combiner
    +    }
    +    val mergeCombiners: (SortCombiner, SortCombiner) => SortCombiner =
    +    (combiner1, combiner2) => {
    +      combiner1 ++= combiner2
    +    }
    +    new SortedExternalAppendOnlyMap[K, V, SortCombiner](
    +      createCombiner, mergeValue, mergeCombiners, 
    +      new KeyComparator[K, SortCombiner](ascending, ordering))
    +  }
    +
    +  private class KeyComparator[K, SortCombiner](ascending: Boolean, ord: Ordering[K]) 
    +  extends Comparator[(K, SortCombiner)] {
    +    def compare (kc1: (K, SortCombiner), kc2: (K, SortCombiner)): Int = {
           if (ascending) {
    -        buf.sortWith((x, y) => ordering.lt(x._1, y._1)).iterator
    +        if (ord.lt(kc1._1, kc2._1)) -1 else if (ord.equiv(kc1._1, kc2._1)) 0 else 1
    +      } else {
    +        if (ord.gt(kc1._1, kc2._1)) -1 else if (ord.equiv(kc1._1, kc2._1)) 0 else 1
    +      }
    --- End diff --
    
    You can just return `ord.compare(kc1._1, kc2._1)` instead of these if statements; it has the same behavior as Comparator.compare.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/931#discussion_r13315133
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala ---
    @@ -17,54 +17,123 @@
     
     package org.apache.spark.rdd
     
    +import java.util.Comparator
    +
    +import scala.collection.mutable.ArrayBuffer
     import scala.reflect.ClassTag
     
    -import org.apache.spark.{Logging, RangePartitioner}
    +import org.apache.spark.{Logging, RangePartitioner, SparkEnv}
    +import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap}
     
     /**
    - * Extra functions available on RDDs of (key, value) pairs where the key is sortable through
    - * an implicit conversion. Import `org.apache.spark.SparkContext._` at the top of your program to
    - * use these functions. They will work with any key type `K` that has an implicit `Ordering[K]` in
    - * scope.  Ordering objects already exist for all of the standard primitive types.  Users can also
    - * define their own orderings for custom types, or to override the default ordering.  The implicit
    - * ordering that is in the closest scope will be used.
    - *
    - * {{{
    - *   import org.apache.spark.SparkContext._
    - *
    - *   val rdd: RDD[(String, Int)] = ...
    - *   implicit val caseInsensitiveOrdering = new Ordering[String] {
    - *     override def compare(a: String, b: String) = a.toLowerCase.compare(b.toLowerCase)
    - *   }
    - *
    - *   // Sort by key, using the above case insensitive ordering.
    - *   rdd.sortByKey()
    - * }}}
    - */
    +  * Extra functions available on RDDs of (key, value) pairs where the key is sortable through
    +  * an implicit conversion. Import `org.apache.spark.SparkContext._` at the top of your program to
    +  * use these functions. They will work with any key type `K` that has an implicit `Ordering[K]` in
    +  * scope.  Ordering objects already exist for all of the standard primitive types.  Users can also
    +  * define their own orderings for custom types, or to override the default ordering.  The implicit
    +  * ordering that is in the closest scope will be used.
    +  *
    +  * {{{
    +  *   import org.apache.spark.SparkContext._
    +  *
    +  *   val rdd: RDD[(String, Int)] = ...
    +  *   implicit val caseInsensitiveOrdering = new Ordering[String] {
    +  *     override def compare(a: String, b: String) = a.toLowerCase.compare(b.toLowerCase)
    +  *   }
    +  *
    +  *   // Sort by key, using the above case insensitive ordering.
    +  *   rdd.sortByKey()
    +  * }}}
    +  */
    +
     class OrderedRDDFunctions[K : Ordering : ClassTag,
                               V: ClassTag,
                               P <: Product2[K, V] : ClassTag](
    -    self: RDD[P])
    -  extends Logging with Serializable {
    +  self: RDD[P])
    +extends Logging with Serializable {
     
       private val ordering = implicitly[Ordering[K]]
     
    +  private type SortCombiner = ArrayBuffer[V]
       /**
    -   * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
    -   * `collect` or `save` on the resulting RDD will return or output an ordered list of records
    -   * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
    -   * order of the keys).
    -   */
    +    * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
    +    * `collect` or `save` on the resulting RDD will return or output an ordered list of records
    +    * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
    +      * order of the keys).
    +    */
       def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P] = {
    +    val externalSorting = SparkEnv.get.conf.getBoolean("spark.shuffle.spill", true)
         val part = new RangePartitioner(numPartitions, self, ascending)
         val shuffled = new ShuffledRDD[K, V, P](self, part)
    -    shuffled.mapPartitions(iter => {
    -      val buf = iter.toArray
    +        if (!externalSorting) {
    +          shuffled.mapPartitions(iter => {
    +              val buf = iter.toArray
    +              if (ascending) {
    +                buf.sortWith((x, y) => ordering.lt(x._1, y._1)).iterator
    +              } else {
    +                buf.sortWith((x, y) => ordering.gt(x._1, y._1)).iterator
    +              }
    +            }, preservesPartitioning = true)
    +        } else {
    +          shuffled.mapPartitions(iter => {
    +              val map = createExternalMap(ascending)
    +              while (iter.hasNext) { 
    +                val kv = iter.next()
    +                map.insert(kv._1, kv._2)
    +              }
    +              map.iterator
    +            }).flatMap(elem => {
    +              for (value <- elem._2)
    +                yield((elem._1, value).asInstanceOf[P])
    +            })
    +        }
    +  }
    +
    +  private def createExternalMap(ascending: Boolean) :ExternalAppendOnlyMap[K, V, SortCombiner] = {
    --- End diff --
    
    Should be `): ExternalAppendOnlyMap`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/931#issuecomment-49260071
  
    QA tests have started for PR 931. This patch DID NOT merge cleanly! <br>View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16767/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/931#discussion_r13315234
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala ---
    @@ -17,54 +17,123 @@
     
     package org.apache.spark.rdd
     
    +import java.util.Comparator
    +
    +import scala.collection.mutable.ArrayBuffer
     import scala.reflect.ClassTag
     
    -import org.apache.spark.{Logging, RangePartitioner}
    +import org.apache.spark.{Logging, RangePartitioner, SparkEnv}
    +import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap}
     
     /**
    - * Extra functions available on RDDs of (key, value) pairs where the key is sortable through
    - * an implicit conversion. Import `org.apache.spark.SparkContext._` at the top of your program to
    - * use these functions. They will work with any key type `K` that has an implicit `Ordering[K]` in
    - * scope.  Ordering objects already exist for all of the standard primitive types.  Users can also
    - * define their own orderings for custom types, or to override the default ordering.  The implicit
    - * ordering that is in the closest scope will be used.
    - *
    - * {{{
    - *   import org.apache.spark.SparkContext._
    - *
    - *   val rdd: RDD[(String, Int)] = ...
    - *   implicit val caseInsensitiveOrdering = new Ordering[String] {
    - *     override def compare(a: String, b: String) = a.toLowerCase.compare(b.toLowerCase)
    - *   }
    - *
    - *   // Sort by key, using the above case insensitive ordering.
    - *   rdd.sortByKey()
    - * }}}
    - */
    +  * Extra functions available on RDDs of (key, value) pairs where the key is sortable through
    +  * an implicit conversion. Import `org.apache.spark.SparkContext._` at the top of your program to
    +  * use these functions. They will work with any key type `K` that has an implicit `Ordering[K]` in
    +  * scope.  Ordering objects already exist for all of the standard primitive types.  Users can also
    +  * define their own orderings for custom types, or to override the default ordering.  The implicit
    +  * ordering that is in the closest scope will be used.
    +  *
    +  * {{{
    +  *   import org.apache.spark.SparkContext._
    +  *
    +  *   val rdd: RDD[(String, Int)] = ...
    +  *   implicit val caseInsensitiveOrdering = new Ordering[String] {
    +  *     override def compare(a: String, b: String) = a.toLowerCase.compare(b.toLowerCase)
    +  *   }
    +  *
    +  *   // Sort by key, using the above case insensitive ordering.
    +  *   rdd.sortByKey()
    +  * }}}
    +  */
    +
     class OrderedRDDFunctions[K : Ordering : ClassTag,
                               V: ClassTag,
                               P <: Product2[K, V] : ClassTag](
    -    self: RDD[P])
    -  extends Logging with Serializable {
    +  self: RDD[P])
    +extends Logging with Serializable {
     
       private val ordering = implicitly[Ordering[K]]
     
    +  private type SortCombiner = ArrayBuffer[V]
       /**
    -   * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
    -   * `collect` or `save` on the resulting RDD will return or output an ordered list of records
    -   * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
    -   * order of the keys).
    -   */
    +    * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
    +    * `collect` or `save` on the resulting RDD will return or output an ordered list of records
    +    * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
    +      * order of the keys).
    +    */
       def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P] = {
    +    val externalSorting = SparkEnv.get.conf.getBoolean("spark.shuffle.spill", true)
         val part = new RangePartitioner(numPartitions, self, ascending)
         val shuffled = new ShuffledRDD[K, V, P](self, part)
    -    shuffled.mapPartitions(iter => {
    -      val buf = iter.toArray
    +        if (!externalSorting) {
    +          shuffled.mapPartitions(iter => {
    +              val buf = iter.toArray
    +              if (ascending) {
    +                buf.sortWith((x, y) => ordering.lt(x._1, y._1)).iterator
    +              } else {
    +                buf.sortWith((x, y) => ordering.gt(x._1, y._1)).iterator
    +              }
    +            }, preservesPartitioning = true)
    +        } else {
    +          shuffled.mapPartitions(iter => {
    +              val map = createExternalMap(ascending)
    +              while (iter.hasNext) { 
    +                val kv = iter.next()
    +                map.insert(kv._1, kv._2)
    +              }
    +              map.iterator
    +            }).flatMap(elem => {
    +              for (value <- elem._2)
    +                yield((elem._1, value).asInstanceOf[P])
    --- End diff --
    
    Also I'm not 100% sure you need the asInstanceOf[P]; we didn't have it before.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/931#issuecomment-45083651
  
    Merged build finished. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/931#discussion_r13877284
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala ---
    @@ -41,30 +45,92 @@ import org.apache.spark.{Logging, RangePartitioner}
      *   rdd.sortByKey()
      * }}}
      */
    +
     class OrderedRDDFunctions[K : Ordering : ClassTag,
                               V: ClassTag,
                               P <: Product2[K, V] : ClassTag](
    -    self: RDD[P])
    -  extends Logging with Serializable {
    +  self: RDD[P])
    +extends Logging with Serializable {
     
       private val ordering = implicitly[Ordering[K]]
     
    +  private type SortCombiner = ArrayBuffer[V]
       /**
    -   * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
    -   * `collect` or `save` on the resulting RDD will return or output an ordered list of records
    -   * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
    -   * order of the keys).
    -   */
    +    * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
    +    * `collect` or `save` on the resulting RDD will return or output an ordered list of records
    +    * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
    +      * order of the keys).
    +    */
       def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P] = {
    +    val externalSorting = SparkEnv.get.conf.getBoolean("spark.shuffle.spill", true)
         val part = new RangePartitioner(numPartitions, self, ascending)
         val shuffled = new ShuffledRDD[K, V, P](self, part)
    -    shuffled.mapPartitions(iter => {
    -      val buf = iter.toArray
    +    if (!externalSorting) {
    +      shuffled.mapPartitions(iter => {
    +          val buf = iter.toArray
    +          if (ascending) {
    +            buf.sortWith((x, y) => ordering.lt(x._1, y._1)).iterator
    +          } else {
    +            buf.sortWith((x, y) => ordering.gt(x._1, y._1)).iterator
    +          }
    +        }, preservesPartitioning = true)
    +    } else {
    +      shuffled.mapPartitions(iter => {
    +          val map = createExternalMap(ascending)
    +          while (iter.hasNext) { 
    +            val kv = iter.next()
    +            map.insert(kv._1, kv._2)
    +          }
    +          map.iterator
    +        }).flatMap(elem => {
    +          elem._2.iterator.map(x => (elem._1, x).asInstanceOf[P])
    +        })
    +    }
    +  }
    +
    +  private def createExternalMap(ascending: Boolean): ExternalAppendOnlyMap[K, V, SortCombiner] = {
    +    val createCombiner: (V => SortCombiner) = value => {
    +      val newCombiner = new SortCombiner
    +      newCombiner += value
    +      newCombiner
    +    }
    +    val mergeValue: (SortCombiner, V) => SortCombiner = (combiner, value) => {
    +      combiner += value
    +      combiner
    +    }
    +    val mergeCombiners: (SortCombiner, SortCombiner) => SortCombiner = (combiner1, combiner2) => {
    +      combiner1 ++= combiner2
    +    }
    +    new SortedExternalAppendOnlyMap[K, V, SortCombiner](
    +      createCombiner, mergeValue, mergeCombiners, 
    +      new KeyComparator[K, SortCombiner](ascending, ordering))
    +  }
    +
    +  private class KeyComparator[K, SortCombiner](ascending: Boolean, ord: Ordering[K]) 
    +  extends Comparator[(K, SortCombiner)] {
    +    def compare (kc1: (K, SortCombiner), kc2: (K, SortCombiner)): Int = {
           if (ascending) {
    -        buf.sortWith((x, y) => ordering.lt(x._1, y._1)).iterator
    +        ord.compare(kc1._1, kc2._1)
    +      } else {
    +        ord.compare(kc2._1, kc1._1)
    +      }
    +    }
    +  }
    +
    +  private class SortedExternalAppendOnlyMap[K, V, C](
    --- End diff --
    
    You could save yourself a class if you just add a boolean field to ExternalAppendOnlyMap (e.g. `sortOutput`)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/931#issuecomment-45083514
  
    Merged build started. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/931#issuecomment-44793662
  
    Also FYI `sbt scalastyle` is complaining about some issues: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15322/console


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/931#discussion_r13878007
  
    --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---
    @@ -62,13 +62,15 @@ class ExternalAppendOnlyMap[K, V, C](
         mergeValue: (C, V) => C,
         mergeCombiners: (C, C) => C,
         serializer: Serializer = SparkEnv.get.serializer,
    -    blockManager: BlockManager = SparkEnv.get.blockManager)
    +    blockManager: BlockManager = SparkEnv.get.blockManager,
    +    customizedComparator: Comparator[(K, C)] = null
    +  )
       extends Iterable[(K, C)] with Serializable with Logging {
     
       import ExternalAppendOnlyMap._
     
    -  private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
    -  private val spilledMaps = new ArrayBuffer[DiskMapIterator]
    +  protected var currentMap = new SizeTrackingAppendOnlyMap[K, C]
    --- End diff --
    
    If you replaced your `SortedExternalAppendOnlyMap`, you can revert all of these `protected`s to `private`s


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/931#discussion_r13315344
  
    --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---
    @@ -104,7 +105,7 @@ class ExternalAppendOnlyMap[K, V, C](
       private var _diskBytesSpilled = 0L
     
       private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
    -  private val comparator = new KCComparator[K, C]
    +  protected val comparator = if (customizedComparator == null) new KCComparator[K, C] else customizedComparator
    --- End diff --
    
    This line might be longer than 100 chars


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on the pull request:

    https://github.com/apache/spark/pull/931#issuecomment-48994210
  
    Jenkins, test this please. @xiajunluan actually I think the main issue now is that this isn't merging cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by xiajunluan <gi...@git.apache.org>.
Github user xiajunluan commented on the pull request:

    https://github.com/apache/spark/pull/931#issuecomment-45059292
  
    hi @matei
    
    1.I will measure the performance influence after I add the pluggable comparator
    2.I agree with you. if we just implement sortByKey, we should not use combiner(it is for combineByKey related API), it will need firstly aggregate values and after sorting, unfold values for same key. In this patch, I would like to reuse current class and fix this bug quickly. for long-term, I think we should write another similar AppendOnlyMap and ExternalAppendOnlyMap class for sortByKey, and ignore functions such as createCombiner, mergeValue, etc. I will try to design these class later.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/931#discussion_r13315241
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala ---
    @@ -17,54 +17,123 @@
     
     package org.apache.spark.rdd
     
    +import java.util.Comparator
    +
    +import scala.collection.mutable.ArrayBuffer
     import scala.reflect.ClassTag
     
    -import org.apache.spark.{Logging, RangePartitioner}
    +import org.apache.spark.{Logging, RangePartitioner, SparkEnv}
    +import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap}
     
     /**
    - * Extra functions available on RDDs of (key, value) pairs where the key is sortable through
    - * an implicit conversion. Import `org.apache.spark.SparkContext._` at the top of your program to
    - * use these functions. They will work with any key type `K` that has an implicit `Ordering[K]` in
    - * scope.  Ordering objects already exist for all of the standard primitive types.  Users can also
    - * define their own orderings for custom types, or to override the default ordering.  The implicit
    - * ordering that is in the closest scope will be used.
    - *
    - * {{{
    - *   import org.apache.spark.SparkContext._
    - *
    - *   val rdd: RDD[(String, Int)] = ...
    - *   implicit val caseInsensitiveOrdering = new Ordering[String] {
    - *     override def compare(a: String, b: String) = a.toLowerCase.compare(b.toLowerCase)
    - *   }
    - *
    - *   // Sort by key, using the above case insensitive ordering.
    - *   rdd.sortByKey()
    - * }}}
    - */
    +  * Extra functions available on RDDs of (key, value) pairs where the key is sortable through
    +  * an implicit conversion. Import `org.apache.spark.SparkContext._` at the top of your program to
    +  * use these functions. They will work with any key type `K` that has an implicit `Ordering[K]` in
    +  * scope.  Ordering objects already exist for all of the standard primitive types.  Users can also
    +  * define their own orderings for custom types, or to override the default ordering.  The implicit
    +  * ordering that is in the closest scope will be used.
    +  *
    +  * {{{
    +  *   import org.apache.spark.SparkContext._
    +  *
    +  *   val rdd: RDD[(String, Int)] = ...
    +  *   implicit val caseInsensitiveOrdering = new Ordering[String] {
    +  *     override def compare(a: String, b: String) = a.toLowerCase.compare(b.toLowerCase)
    +  *   }
    +  *
    +  *   // Sort by key, using the above case insensitive ordering.
    +  *   rdd.sortByKey()
    +  * }}}
    +  */
    +
     class OrderedRDDFunctions[K : Ordering : ClassTag,
                               V: ClassTag,
                               P <: Product2[K, V] : ClassTag](
    -    self: RDD[P])
    -  extends Logging with Serializable {
    +  self: RDD[P])
    +extends Logging with Serializable {
     
       private val ordering = implicitly[Ordering[K]]
     
    +  private type SortCombiner = ArrayBuffer[V]
       /**
    -   * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
    -   * `collect` or `save` on the resulting RDD will return or output an ordered list of records
    -   * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
    -   * order of the keys).
    -   */
    +    * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
    +    * `collect` or `save` on the resulting RDD will return or output an ordered list of records
    +    * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
    +      * order of the keys).
    +    */
       def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P] = {
    +    val externalSorting = SparkEnv.get.conf.getBoolean("spark.shuffle.spill", true)
         val part = new RangePartitioner(numPartitions, self, ascending)
         val shuffled = new ShuffledRDD[K, V, P](self, part)
    -    shuffled.mapPartitions(iter => {
    -      val buf = iter.toArray
    +        if (!externalSorting) {
    --- End diff --
    
    Seems like you have some tabs in here, or too much indent


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/931#issuecomment-47453099
  
     Build triggered. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/931#issuecomment-48983430
  
    @xiajunluan would you have time to update this in the next few days? It's pretty close but there were those two small issues Andrew pointed out as well as a compile error. This would be great to get into 1.1.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/931#discussion_r13878127
  
    --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---
    @@ -251,10 +254,11 @@ class ExternalAppendOnlyMap[K, V, C](
           if (it.hasNext) {
             var kc = it.next()
             kcPairs += kc
    -        val minHash = kc._1.hashCode()
    -        while (it.hasNext && kc._1.hashCode() == minHash) {
    -          kc = it.next()
    -          kcPairs += kc
    +        while (it.hasNext) {
    +          var kc1 = it.next()
    --- End diff --
    
    nit: This can be a val (same with `var kc` above)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/931#discussion_r13877732
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala ---
    @@ -41,30 +45,92 @@ import org.apache.spark.{Logging, RangePartitioner}
      *   rdd.sortByKey()
      * }}}
      */
    +
     class OrderedRDDFunctions[K : Ordering : ClassTag,
                               V: ClassTag,
                               P <: Product2[K, V] : ClassTag](
    -    self: RDD[P])
    -  extends Logging with Serializable {
    +  self: RDD[P])
    +extends Logging with Serializable {
     
       private val ordering = implicitly[Ordering[K]]
     
    +  private type SortCombiner = ArrayBuffer[V]
       /**
    -   * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
    -   * `collect` or `save` on the resulting RDD will return or output an ordered list of records
    -   * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
    -   * order of the keys).
    -   */
    +    * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
    +    * `collect` or `save` on the resulting RDD will return or output an ordered list of records
    +    * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
    +      * order of the keys).
    +    */
       def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P] = {
    +    val externalSorting = SparkEnv.get.conf.getBoolean("spark.shuffle.spill", true)
         val part = new RangePartitioner(numPartitions, self, ascending)
         val shuffled = new ShuffledRDD[K, V, P](self, part)
    -    shuffled.mapPartitions(iter => {
    -      val buf = iter.toArray
    +    if (!externalSorting) {
    +      shuffled.mapPartitions(iter => {
    +          val buf = iter.toArray
    +          if (ascending) {
    +            buf.sortWith((x, y) => ordering.lt(x._1, y._1)).iterator
    +          } else {
    +            buf.sortWith((x, y) => ordering.gt(x._1, y._1)).iterator
    +          }
    +        }, preservesPartitioning = true)
    +    } else {
    +      shuffled.mapPartitions(iter => {
    +          val map = createExternalMap(ascending)
    +          while (iter.hasNext) { 
    +            val kv = iter.next()
    +            map.insert(kv._1, kv._2)
    +          }
    +          map.iterator
    +        }).flatMap(elem => {
    +          elem._2.iterator.map(x => (elem._1, x).asInstanceOf[P])
    +        })
    +    }
    +  }
    +
    +  private def createExternalMap(ascending: Boolean): ExternalAppendOnlyMap[K, V, SortCombiner] = {
    +    val createCombiner: (V => SortCombiner) = value => {
    +      val newCombiner = new SortCombiner
    +      newCombiner += value
    +      newCombiner
    +    }
    +    val mergeValue: (SortCombiner, V) => SortCombiner = (combiner, value) => {
    +      combiner += value
    +      combiner
    +    }
    +    val mergeCombiners: (SortCombiner, SortCombiner) => SortCombiner = (combiner1, combiner2) => {
    +      combiner1 ++= combiner2
    +    }
    +    new SortedExternalAppendOnlyMap[K, V, SortCombiner](
    +      createCombiner, mergeValue, mergeCombiners, 
    +      new KeyComparator[K, SortCombiner](ascending, ordering))
    +  }
    +
    +  private class KeyComparator[K, SortCombiner](ascending: Boolean, ord: Ordering[K]) 
    +  extends Comparator[(K, SortCombiner)] {
    --- End diff --
    
    nit: indent by 2 spaces


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/931#issuecomment-48152671
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16364/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on the pull request:

    https://github.com/apache/spark/pull/931#issuecomment-47127748
  
    @xiajunluan are you going to be able to address these soon? We'd like to get this merged quickly if possible.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/931#issuecomment-45083501
  
     Merged build triggered. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/931#discussion_r13877967
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala ---
    @@ -41,30 +45,92 @@ import org.apache.spark.{Logging, RangePartitioner}
      *   rdd.sortByKey()
      * }}}
      */
    +
     class OrderedRDDFunctions[K : Ordering : ClassTag,
                               V: ClassTag,
                               P <: Product2[K, V] : ClassTag](
    -    self: RDD[P])
    -  extends Logging with Serializable {
    +  self: RDD[P])
    +extends Logging with Serializable {
     
       private val ordering = implicitly[Ordering[K]]
     
    +  private type SortCombiner = ArrayBuffer[V]
       /**
    -   * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
    -   * `collect` or `save` on the resulting RDD will return or output an ordered list of records
    -   * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
    -   * order of the keys).
    -   */
    +    * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
    +    * `collect` or `save` on the resulting RDD will return or output an ordered list of records
    +    * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
    +      * order of the keys).
    +    */
       def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P] = {
    +    val externalSorting = SparkEnv.get.conf.getBoolean("spark.shuffle.spill", true)
         val part = new RangePartitioner(numPartitions, self, ascending)
         val shuffled = new ShuffledRDD[K, V, P](self, part)
    -    shuffled.mapPartitions(iter => {
    -      val buf = iter.toArray
    +    if (!externalSorting) {
    +      shuffled.mapPartitions(iter => {
    +          val buf = iter.toArray
    +          if (ascending) {
    +            buf.sortWith((x, y) => ordering.lt(x._1, y._1)).iterator
    +          } else {
    +            buf.sortWith((x, y) => ordering.gt(x._1, y._1)).iterator
    +          }
    +        }, preservesPartitioning = true)
    +    } else {
    +      shuffled.mapPartitions(iter => {
    +          val map = createExternalMap(ascending)
    +          while (iter.hasNext) { 
    +            val kv = iter.next()
    +            map.insert(kv._1, kv._2)
    +          }
    +          map.iterator
    +        }).flatMap(elem => {
    +          elem._2.iterator.map(x => (elem._1, x).asInstanceOf[P])
    +        })
    +    }
    --- End diff --
    
    (and follow this style guide in other places of your code)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on the pull request:

    https://github.com/apache/spark/pull/931#issuecomment-48804752
  
    Jenkins, test this please.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on the pull request:

    https://github.com/apache/spark/pull/931#issuecomment-48010635
  
    @xiajunluan this is failing style checks and also isn't merging cleanly. You can run the tests locally with `dev/run-tests` and update it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/931#issuecomment-49254607
  
    Hey, so I rebased this PR and made it mergeable in my own branch, https://github.com/mateiz/spark/tree/spark-931. However, in doing this I realized that there might be some problems here that are fundamental.
    
    The main issue is that AppendOnlyMap, and ExternalAppendOnlyMap, require there's only one value for each key. The in-memory AOM will be very inefficient otherwise, and the EAOM depends on it. This means that for sort, we have to create (Key, ArrayBuffer[Value]) pairs, which will consume more memory by default than our in-memory sort, and will make us crash if there are too many identical values (something we do today but which may happen sooner here). Thus it seems that long-term we need a very different solution here, basically an external merge-sort. 
    
    A second, possibly less serious issue is that the changes to EAOM to use comparator.compare instead of hash code equality make it less efficient in the default hashing-based case, because instead of saving one key's hash code in an Int and reusing it to compare with other keys in various places, we always recompute it when we compare each pair of elements.
    
    For these reasons I'd actually hold off on merging this (even my merged version) until we implement an external merge-sort as part of sort-based shuffle. Then we can use that data structure here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/931#discussion_r13877228
  
    --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---
    @@ -338,7 +341,7 @@ class ExternalAppendOnlyMap[K, V, C](
     
    --- End diff --
    
    `minKeyHash` is no longer used (github won't let me comment a few lines above)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---