You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/08/30 12:48:06 UTC

[GitHub] [spark] zhengruifeng opened a new pull request, #37728: [SPARK-40276][CORE] Reduce the result size of RDD.takeOrdered

zhengruifeng opened a new pull request, #37728:
URL: https://github.com/apache/spark/pull/37728

   ### What changes were proposed in this pull request?
   use `Array` instead of `BoundedPriorityQueue` to store intermediate results
   
   ### Why are the changes needed?
   1, encountered a case that `RDD.takeOrdered` fails due to `Total size of serialized results of xxx tasks (... MiB) is bigger than spark.driver.`
   
   2, performance improvement:
   
   `bin/spark-shell --driver-memory=4G`
   
   ```scala
   Seq(10, 100, 1000, 10000, 50000, 100000).foreach { n => val start = System.currentTimeMillis; Seq.range(0, 10).foreach(_ => sc.range(0, 100000000, 1, 1000).top(n)); val duration = System.currentTimeMillis - start; println(s"n=$n, duration=$duration") }
   ```
   
    duration | n=10 | n=100 | n=1,000 | n=10,000 | n=50,000 | n=100,000
   -- | -- | -- | -- | -- | -- | --
   master | 2,552 | 2,197 | 2,543 | 10,003 | 58,552 | org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 763 tasks (1024.6 MiB) is bigger than spark.driver
   this PR | 2,556 | 2,138 | 2,196 | 7,371  | 33,903 | 66,895
   this PR + treeReduce | 9,160 | 9,748 | 9,728  | 11,441  | 17,216 | 24,728
   
   it is strange that `this PR + treeReduce` turns out to be slowest when `n` is small, so still use `reduce` in this PR.
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   added UT
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on a diff in pull request #37728: [SPARK-40276][CORE] Reduce the result size of RDD.takeOrdered

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #37728:
URL: https://github.com/apache/spark/pull/37728#discussion_r959257404


##########
sql/core/src/test/resources/tpcds-query-results/v1_4/q77.sql.out:
##########
@@ -4,8 +4,8 @@
 struct<channel:string,id:int,sales:decimal(27,2),returns:decimal(27,2),profit:decimal(28,2)>
 -- !query output
 NULL	NULL	238379361.39	11949589.80	-69066318.65
-catalog channel	NULL	116209.49	1989207.49	-1103184.43

Review Comment:
   `RDD.takeOrdered` and `RDD.top` are used in many place.
   In SQL, it's used in [TakeOrderedAndProjectExec](https://github.com/apache/spark/blob/d1ca0ea4ae55bf19c7569eacae5b377c3340dde9/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala#L285-L300), to pass the `TPCDS querys`, I had to swap two line in the results of `q77` and `q77a`. the [`q77`](https://github.com/apache/spark/blob/a2d8d767d933321426a4eb9df1583e017722d7d6/sql/core/src/test/resources/tpcds/q77.sql#L99-L100) and [`q77a`](https://github.com/apache/spark/blob/5f653d4f7c84e6147cd323cd650da65e0381ebe8/sql/core/src/test/resources/tpcds-v2.7.0/q77a.sql#L119-L121) both end with `order by channel, id limit 100`, and the swapped lines have the same `(channel, id)`, so I guess this change is fine.
   
   But I think it still need you to take a look @cloud-fan @HyukjinKwon 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on a diff in pull request #37728: [SPARK-40276][CORE] Reduce the result size of RDD.takeOrdered

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on code in PR #37728:
URL: https://github.com/apache/spark/pull/37728#discussion_r959245366


##########
core/src/main/scala/org/apache/spark/rdd/RDD.scala:
##########
@@ -1523,22 +1523,28 @@ abstract class RDD[T: ClassTag](
    * @return an array of top elements
    */
   def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
-    if (num == 0) {
+    if (num == 0 || this.getNumPartitions == 0) {
       Array.empty
     } else {
-      val mapRDDs = mapPartitions { items =>
-        // Priority keeps the largest elements, so let's reverse the ordering.
-        val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
-        queue ++= collectionUtils.takeOrdered(items, num)(ord)
-        Iterator.single(queue)
-      }
-      if (mapRDDs.partitions.length == 0) {
-        Array.empty
-      } else {
-        mapRDDs.reduce { (queue1, queue2) =>
-          queue1 ++= queue2
-          queue1
-        }.toArray.sorted(ord)
+      this.mapPartitionsWithIndex { case (pid, iter) =>
+        if (iter.nonEmpty) {
+          // Priority keeps the largest elements, so let's reverse the ordering.
+          Iterator.single(collectionUtils.takeOrdered(iter, num)(ord).toArray)
+        } else if (pid == 0) {
+          // make sure partition 0 always returns an array to avoid reduce on empty RDD
+          Iterator.single(Array.empty[T])
+        } else {
+          Iterator.empty
+        }
+      }.reduce { (array1, array2) =>
+        val iterator = collectionUtils.mergeOrdered[T](Seq(array1, array2))(ord)
+        val array = Array.ofDim[T](math.min(num, array1.length + array2.length))
+        var i = 0
+        while (i < array.length) {
+          array(i) = iterator.next()
+          i += 1
+        }

Review Comment:
   `iterator.copyToArray(array)`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a diff in pull request #37728: [SPARK-40276][CORE] Reduce the result size of RDD.takeOrdered

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #37728:
URL: https://github.com/apache/spark/pull/37728#discussion_r958815540


##########
core/src/main/scala/org/apache/spark/rdd/RDD.scala:
##########
@@ -1523,22 +1523,21 @@ abstract class RDD[T: ClassTag](
    * @return an array of top elements
    */
   def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
-    if (num == 0) {
+    if (num == 0 || this.getNumPartitions == 0) {
       Array.empty
     } else {
-      val mapRDDs = mapPartitions { items =>
-        // Priority keeps the largest elements, so let's reverse the ordering.
-        val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
-        queue ++= collectionUtils.takeOrdered(items, num)(ord)
-        Iterator.single(queue)
-      }
-      if (mapRDDs.partitions.length == 0) {
-        Array.empty
-      } else {
-        mapRDDs.reduce { (queue1, queue2) =>
-          queue1 ++= queue2
-          queue1
-        }.toArray.sorted(ord)
+      this.mapPartitionsWithIndex { case (pid, iter) =>
+        if (iter.nonEmpty) {
+          // Priority keeps the largest elements, so let's reverse the ordering.
+          Iterator.single(collectionUtils.takeOrdered(iter, num)(ord).toArray)
+        } else if (pid == 0) {
+          // make sure partition 0 always returns an array to avoid reduce on empty RDD
+          Iterator.single(Array.empty[T])
+        } else {
+          Iterator.empty
+        }
+      }.reduce { (array1, array2) =>
+        collectionUtils.mergeOrdered[T](Seq(array1, array2))(ord).take(num).toArray

Review Comment:
   Nit: you can statically create the array upfront (`min(num, array1.length + array2.length)` ) and copy into it, instead of `toArray`



##########
core/src/main/scala/org/apache/spark/util/collection/Utils.scala:
##########
@@ -37,6 +37,23 @@ private[spark] object Utils {
     ordering.leastOf(input.asJava, num).iterator.asScala
   }
 
+  /**
+   * Returns an iterator over the merged contents of all given iterators,
+   * traversing every element of the input iterators.
+   * Equivalent entries will not be de-duplicated.
+   *
+   * Callers must ensure that the source iterators are already sorted by
+   * the same ordering `ord`, otherwise the result is likely to be incorrect.
+   */
+  def mergeOrdered[T](iterators: Iterable[TraversableOnce[T]])(

Review Comment:
   nit: Rename `iterators` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a diff in pull request #37728: [SPARK-40276][CORE] Reduce the result size of RDD.takeOrdered

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #37728:
URL: https://github.com/apache/spark/pull/37728#discussion_r959208095


##########
core/src/main/scala/org/apache/spark/rdd/RDD.scala:
##########
@@ -1523,22 +1523,21 @@ abstract class RDD[T: ClassTag](
    * @return an array of top elements
    */
   def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
-    if (num == 0) {
+    if (num == 0 || this.getNumPartitions == 0) {
       Array.empty
     } else {
-      val mapRDDs = mapPartitions { items =>
-        // Priority keeps the largest elements, so let's reverse the ordering.
-        val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
-        queue ++= collectionUtils.takeOrdered(items, num)(ord)
-        Iterator.single(queue)
-      }
-      if (mapRDDs.partitions.length == 0) {
-        Array.empty
-      } else {
-        mapRDDs.reduce { (queue1, queue2) =>
-          queue1 ++= queue2
-          queue1
-        }.toArray.sorted(ord)
+      this.mapPartitionsWithIndex { case (pid, iter) =>
+        if (iter.nonEmpty) {
+          // Priority keeps the largest elements, so let's reverse the ordering.
+          Iterator.single(collectionUtils.takeOrdered(iter, num)(ord).toArray)
+        } else if (pid == 0) {
+          // make sure partition 0 always returns an array to avoid reduce on empty RDD
+          Iterator.single(Array.empty[T])
+        } else {
+          Iterator.empty
+        }
+      }.reduce { (array1, array2) =>
+        collectionUtils.mergeOrdered[T](Seq(array1, array2))(ord).take(num).toArray

Review Comment:
   Wondering if there was any impact on the perf numbers, or it was essentially noise :-) (might affect gc more I guess - with large enough buffer and higher number of tasks per executors probably)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng closed pull request #37728: [SPARK-40276][CORE] Reduce the result size of RDD.takeOrdered

Posted by GitBox <gi...@apache.org>.
zhengruifeng closed pull request #37728: [SPARK-40276][CORE] Reduce the result size of RDD.takeOrdered
URL: https://github.com/apache/spark/pull/37728


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on a diff in pull request #37728: [SPARK-40276][CORE] Reduce the result size of RDD.takeOrdered

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #37728:
URL: https://github.com/apache/spark/pull/37728#discussion_r960145512


##########
core/src/main/scala/org/apache/spark/rdd/RDD.scala:
##########
@@ -1523,22 +1523,21 @@ abstract class RDD[T: ClassTag](
    * @return an array of top elements
    */
   def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
-    if (num == 0) {
+    if (num == 0 || this.getNumPartitions == 0) {
       Array.empty
     } else {
-      val mapRDDs = mapPartitions { items =>
-        // Priority keeps the largest elements, so let's reverse the ordering.
-        val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
-        queue ++= collectionUtils.takeOrdered(items, num)(ord)
-        Iterator.single(queue)
-      }
-      if (mapRDDs.partitions.length == 0) {
-        Array.empty
-      } else {
-        mapRDDs.reduce { (queue1, queue2) =>
-          queue1 ++= queue2
-          queue1
-        }.toArray.sorted(ord)
+      this.mapPartitionsWithIndex { case (pid, iter) =>
+        if (iter.nonEmpty) {
+          // Priority keeps the largest elements, so let's reverse the ordering.
+          Iterator.single(collectionUtils.takeOrdered(iter, num)(ord).toArray)
+        } else if (pid == 0) {
+          // make sure partition 0 always returns an array to avoid reduce on empty RDD
+          Iterator.single(Array.empty[T])
+        } else {
+          Iterator.empty
+        }
+      }.reduce { (array1, array2) =>
+        collectionUtils.mergeOrdered[T](Seq(array1, array2))(ord).take(num).toArray

Review Comment:
   When both `num` and  `numParitions` are large, I think we should use `treeReduce` instead, then there will be no bottleneck at driver.
   I'm not sure whether it is worthwhile to add anther configuration for it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on pull request #37728: [SPARK-40276][CORE] Reduce the result size of RDD.takeOrdered

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on PR #37728:
URL: https://github.com/apache/spark/pull/37728#issuecomment-1233622631

   Merged into master, thank you all!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a diff in pull request #37728: [SPARK-40276][CORE] Reduce the result size of RDD.takeOrdered

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #37728:
URL: https://github.com/apache/spark/pull/37728#discussion_r959208095


##########
core/src/main/scala/org/apache/spark/rdd/RDD.scala:
##########
@@ -1523,22 +1523,21 @@ abstract class RDD[T: ClassTag](
    * @return an array of top elements
    */
   def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
-    if (num == 0) {
+    if (num == 0 || this.getNumPartitions == 0) {
       Array.empty
     } else {
-      val mapRDDs = mapPartitions { items =>
-        // Priority keeps the largest elements, so let's reverse the ordering.
-        val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
-        queue ++= collectionUtils.takeOrdered(items, num)(ord)
-        Iterator.single(queue)
-      }
-      if (mapRDDs.partitions.length == 0) {
-        Array.empty
-      } else {
-        mapRDDs.reduce { (queue1, queue2) =>
-          queue1 ++= queue2
-          queue1
-        }.toArray.sorted(ord)
+      this.mapPartitionsWithIndex { case (pid, iter) =>
+        if (iter.nonEmpty) {
+          // Priority keeps the largest elements, so let's reverse the ordering.
+          Iterator.single(collectionUtils.takeOrdered(iter, num)(ord).toArray)
+        } else if (pid == 0) {
+          // make sure partition 0 always returns an array to avoid reduce on empty RDD
+          Iterator.single(Array.empty[T])
+        } else {
+          Iterator.empty
+        }
+      }.reduce { (array1, array2) =>
+        collectionUtils.mergeOrdered[T](Seq(array1, array2))(ord).take(num).toArray

Review Comment:
   Wondering if there was any impact on the perf numbers :-)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on a diff in pull request #37728: [SPARK-40276][CORE] Reduce the result size of RDD.takeOrdered

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #37728:
URL: https://github.com/apache/spark/pull/37728#discussion_r959033831


##########
core/src/main/scala/org/apache/spark/rdd/RDD.scala:
##########
@@ -1523,22 +1523,21 @@ abstract class RDD[T: ClassTag](
    * @return an array of top elements
    */
   def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
-    if (num == 0) {
+    if (num == 0 || this.getNumPartitions == 0) {
       Array.empty
     } else {
-      val mapRDDs = mapPartitions { items =>
-        // Priority keeps the largest elements, so let's reverse the ordering.
-        val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
-        queue ++= collectionUtils.takeOrdered(items, num)(ord)
-        Iterator.single(queue)
-      }
-      if (mapRDDs.partitions.length == 0) {
-        Array.empty
-      } else {
-        mapRDDs.reduce { (queue1, queue2) =>
-          queue1 ++= queue2
-          queue1
-        }.toArray.sorted(ord)
+      this.mapPartitionsWithIndex { case (pid, iter) =>
+        if (iter.nonEmpty) {
+          // Priority keeps the largest elements, so let's reverse the ordering.
+          Iterator.single(collectionUtils.takeOrdered(iter, num)(ord).toArray)
+        } else if (pid == 0) {
+          // make sure partition 0 always returns an array to avoid reduce on empty RDD
+          Iterator.single(Array.empty[T])
+        } else {
+          Iterator.empty
+        }
+      }.reduce { (array1, array2) =>
+        collectionUtils.mergeOrdered[T](Seq(array1, array2))(ord).take(num).toArray

Review Comment:
   good point



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on a diff in pull request #37728: [SPARK-40276][CORE] Reduce the result size of RDD.takeOrdered

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #37728:
URL: https://github.com/apache/spark/pull/37728#discussion_r959217849


##########
core/src/main/scala/org/apache/spark/rdd/RDD.scala:
##########
@@ -1523,22 +1523,28 @@ abstract class RDD[T: ClassTag](
    * @return an array of top elements
    */
   def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
-    if (num == 0) {
+    if (num == 0 || this.getNumPartitions == 0) {
       Array.empty
     } else {
-      val mapRDDs = mapPartitions { items =>
-        // Priority keeps the largest elements, so let's reverse the ordering.
-        val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
-        queue ++= collectionUtils.takeOrdered(items, num)(ord)
-        Iterator.single(queue)
-      }
-      if (mapRDDs.partitions.length == 0) {
-        Array.empty
-      } else {
-        mapRDDs.reduce { (queue1, queue2) =>
-          queue1 ++= queue2
-          queue1
-        }.toArray.sorted(ord)
+      this.mapPartitionsWithIndex { case (pid, iter) =>
+        if (iter.nonEmpty) {
+          // Priority keeps the largest elements, so let's reverse the ordering.
+          Iterator.single(collectionUtils.takeOrdered(iter, num)(ord).toArray)
+        } else if (pid == 0) {
+          // make sure partition 0 always returns an array to avoid reduce on empty RDD
+          Iterator.single(Array.empty[T])
+        } else {
+          Iterator.empty
+        }
+      }.reduce { (array1, array2) =>
+        val iterator = collectionUtils.mergeOrdered[T](Seq(array1, array2))(ord)
+        val array = Array.ofDim[T](math.min(num, array1.length + array2.length))
+        var i = 0
+        while (i < array.length) {
+          array(i) = iterator.next()
+          i += 1
+        }

Review Comment:
   oh, let me update it, thanks



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on a diff in pull request #37728: [SPARK-40276][CORE] Reduce the result size of RDD.takeOrdered

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #37728:
URL: https://github.com/apache/spark/pull/37728#discussion_r959190331


##########
core/src/main/scala/org/apache/spark/rdd/RDD.scala:
##########
@@ -1523,22 +1523,28 @@ abstract class RDD[T: ClassTag](
    * @return an array of top elements
    */
   def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
-    if (num == 0) {
+    if (num == 0 || this.getNumPartitions == 0) {
       Array.empty
     } else {
-      val mapRDDs = mapPartitions { items =>
-        // Priority keeps the largest elements, so let's reverse the ordering.
-        val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
-        queue ++= collectionUtils.takeOrdered(items, num)(ord)
-        Iterator.single(queue)
-      }
-      if (mapRDDs.partitions.length == 0) {
-        Array.empty
-      } else {
-        mapRDDs.reduce { (queue1, queue2) =>
-          queue1 ++= queue2
-          queue1
-        }.toArray.sorted(ord)
+      this.mapPartitionsWithIndex { case (pid, iter) =>
+        if (iter.nonEmpty) {
+          // Priority keeps the largest elements, so let's reverse the ordering.
+          Iterator.single(collectionUtils.takeOrdered(iter, num)(ord).toArray)

Review Comment:
   do you mean `collectionUtils.takeOrdered`?
   both existing `takeOrdered` and  newly added `mergeOrdered` call the Guava implementations, I think they should be irrelative to the Scala version.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on a diff in pull request #37728: [SPARK-40276][CORE] Reduce the result size of RDD.takeOrdered

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #37728:
URL: https://github.com/apache/spark/pull/37728#discussion_r959038961


##########
core/src/main/scala/org/apache/spark/util/collection/Utils.scala:
##########
@@ -37,6 +37,23 @@ private[spark] object Utils {
     ordering.leastOf(input.asJava, num).iterator.asScala
   }
 
+  /**
+   * Returns an iterator over the merged contents of all given iterators,
+   * traversing every element of the input iterators.
+   * Equivalent entries will not be de-duplicated.
+   *
+   * Callers must ensure that the source iterators are already sorted by
+   * the same ordering `ord`, otherwise the result is likely to be incorrect.
+   */
+  def mergeOrdered[T](iterators: Iterable[TraversableOnce[T]])(

Review Comment:
   yeah, let me rename it as `inputs`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a diff in pull request #37728: [SPARK-40276][CORE] Reduce the result size of RDD.takeOrdered

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #37728:
URL: https://github.com/apache/spark/pull/37728#discussion_r959341535


##########
sql/core/src/test/resources/tpcds-query-results/v1_4/q77.sql.out:
##########
@@ -4,8 +4,8 @@
 struct<channel:string,id:int,sales:decimal(27,2),returns:decimal(27,2),profit:decimal(28,2)>
 -- !query output
 NULL	NULL	238379361.39	11949589.80	-69066318.65
-catalog channel	NULL	116209.49	1989207.49	-1103184.43

Review Comment:
   this is fine



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on a diff in pull request #37728: [SPARK-40276][CORE] Reduce the result size of RDD.takeOrdered

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #37728:
URL: https://github.com/apache/spark/pull/37728#discussion_r959247297


##########
core/src/main/scala/org/apache/spark/rdd/RDD.scala:
##########
@@ -1523,22 +1523,28 @@ abstract class RDD[T: ClassTag](
    * @return an array of top elements
    */
   def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
-    if (num == 0) {
+    if (num == 0 || this.getNumPartitions == 0) {
       Array.empty
     } else {
-      val mapRDDs = mapPartitions { items =>
-        // Priority keeps the largest elements, so let's reverse the ordering.
-        val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
-        queue ++= collectionUtils.takeOrdered(items, num)(ord)
-        Iterator.single(queue)
-      }
-      if (mapRDDs.partitions.length == 0) {
-        Array.empty
-      } else {
-        mapRDDs.reduce { (queue1, queue2) =>
-          queue1 ++= queue2
-          queue1
-        }.toArray.sorted(ord)
+      this.mapPartitionsWithIndex { case (pid, iter) =>
+        if (iter.nonEmpty) {
+          // Priority keeps the largest elements, so let's reverse the ordering.
+          Iterator.single(collectionUtils.takeOrdered(iter, num)(ord).toArray)
+        } else if (pid == 0) {
+          // make sure partition 0 always returns an array to avoid reduce on empty RDD
+          Iterator.single(Array.empty[T])
+        } else {
+          Iterator.empty
+        }
+      }.reduce { (array1, array2) =>
+        val iterator = collectionUtils.mergeOrdered[T](Seq(array1, array2))(ord)
+        val array = Array.ofDim[T](math.min(num, array1.length + array2.length))
+        var i = 0
+        while (i < array.length) {
+          array(i) = iterator.next()
+          i += 1
+        }

Review Comment:
   yep



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] srowen commented on pull request #37728: [SPARK-40276][CORE] Reduce the result size of RDD.takeOrdered

Posted by GitBox <gi...@apache.org>.
srowen commented on PR #37728:
URL: https://github.com/apache/spark/pull/37728#issuecomment-1233209735

   Seems fine. Maybe could switch to treeReduce when the size is large, but no big deal


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on a diff in pull request #37728: [SPARK-40276][CORE] Reduce the result size of RDD.takeOrdered

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #37728:
URL: https://github.com/apache/spark/pull/37728#discussion_r960290810


##########
core/src/main/scala/org/apache/spark/rdd/RDD.scala:
##########
@@ -1523,22 +1523,21 @@ abstract class RDD[T: ClassTag](
    * @return an array of top elements
    */
   def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
-    if (num == 0) {
+    if (num == 0 || this.getNumPartitions == 0) {
       Array.empty
     } else {
-      val mapRDDs = mapPartitions { items =>
-        // Priority keeps the largest elements, so let's reverse the ordering.
-        val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
-        queue ++= collectionUtils.takeOrdered(items, num)(ord)
-        Iterator.single(queue)
-      }
-      if (mapRDDs.partitions.length == 0) {
-        Array.empty
-      } else {
-        mapRDDs.reduce { (queue1, queue2) =>
-          queue1 ++= queue2
-          queue1
-        }.toArray.sorted(ord)
+      this.mapPartitionsWithIndex { case (pid, iter) =>
+        if (iter.nonEmpty) {
+          // Priority keeps the largest elements, so let's reverse the ordering.
+          Iterator.single(collectionUtils.takeOrdered(iter, num)(ord).toArray)
+        } else if (pid == 0) {
+          // make sure partition 0 always returns an array to avoid reduce on empty RDD
+          Iterator.single(Array.empty[T])
+        } else {
+          Iterator.empty
+        }
+      }.reduce { (array1, array2) =>
+        collectionUtils.mergeOrdered[T](Seq(array1, array2))(ord).take(num).toArray

Review Comment:
   if the concern is that one new array will be created at each merge, I guess we can switch to an [in-place merge sort](https://www.techiedelight.com/inplace-merge-two-sorted-arrays/)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on a diff in pull request #37728: [SPARK-40276][CORE] Reduce the result size of RDD.takeOrdered

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on code in PR #37728:
URL: https://github.com/apache/spark/pull/37728#discussion_r959250334


##########
core/src/main/scala/org/apache/spark/rdd/RDD.scala:
##########
@@ -1523,22 +1523,28 @@ abstract class RDD[T: ClassTag](
    * @return an array of top elements
    */
   def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
-    if (num == 0) {
+    if (num == 0 || this.getNumPartitions == 0) {
       Array.empty
     } else {
-      val mapRDDs = mapPartitions { items =>
-        // Priority keeps the largest elements, so let's reverse the ordering.
-        val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
-        queue ++= collectionUtils.takeOrdered(items, num)(ord)
-        Iterator.single(queue)
-      }
-      if (mapRDDs.partitions.length == 0) {
-        Array.empty
-      } else {
-        mapRDDs.reduce { (queue1, queue2) =>
-          queue1 ++= queue2
-          queue1
-        }.toArray.sorted(ord)
+      this.mapPartitionsWithIndex { case (pid, iter) =>
+        if (iter.nonEmpty) {
+          // Priority keeps the largest elements, so let's reverse the ordering.
+          Iterator.single(collectionUtils.takeOrdered(iter, num)(ord).toArray)
+        } else if (pid == 0) {
+          // make sure partition 0 always returns an array to avoid reduce on empty RDD
+          Iterator.single(Array.empty[T])
+        } else {
+          Iterator.empty
+        }
+      }.reduce { (array1, array2) =>
+        val iterator = collectionUtils.mergeOrdered[T](Seq(array1, array2))(ord)
+        val array = Array.ofDim[T](math.min(num, array1.length + array2.length))
+        var i = 0
+        while (i < array.length) {
+          array(i) = iterator.next()
+          i += 1
+        }

Review Comment:
   I suggest use `3-arg version copyToArray` method directly due to 
   
   ```
     @deprecatedOverriding("This should always forward to the 3-arg version of this method", since = "2.13.4")
     def copyToArray[B >: A](xs: Array[B]): Int = copyToArray(xs, 0, Int.MaxValue)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on a diff in pull request #37728: [SPARK-40276][CORE] Reduce the result size of RDD.takeOrdered

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on code in PR #37728:
URL: https://github.com/apache/spark/pull/37728#discussion_r959170859


##########
core/src/main/scala/org/apache/spark/rdd/RDD.scala:
##########
@@ -1523,22 +1523,28 @@ abstract class RDD[T: ClassTag](
    * @return an array of top elements
    */
   def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
-    if (num == 0) {
+    if (num == 0 || this.getNumPartitions == 0) {
       Array.empty
     } else {
-      val mapRDDs = mapPartitions { items =>
-        // Priority keeps the largest elements, so let's reverse the ordering.
-        val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
-        queue ++= collectionUtils.takeOrdered(items, num)(ord)
-        Iterator.single(queue)
-      }
-      if (mapRDDs.partitions.length == 0) {
-        Array.empty
-      } else {
-        mapRDDs.reduce { (queue1, queue2) =>
-          queue1 ++= queue2
-          queue1
-        }.toArray.sorted(ord)
+      this.mapPartitionsWithIndex { case (pid, iter) =>
+        if (iter.nonEmpty) {
+          // Priority keeps the largest elements, so let's reverse the ordering.
+          Iterator.single(collectionUtils.takeOrdered(iter, num)(ord).toArray)

Review Comment:
   Will the same benefits when using Scala 2.13?
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on a diff in pull request #37728: [SPARK-40276][CORE] Reduce the result size of RDD.takeOrdered

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #37728:
URL: https://github.com/apache/spark/pull/37728#discussion_r959262174


##########
core/src/main/scala/org/apache/spark/rdd/RDD.scala:
##########
@@ -1523,22 +1523,28 @@ abstract class RDD[T: ClassTag](
    * @return an array of top elements
    */
   def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
-    if (num == 0) {
+    if (num == 0 || this.getNumPartitions == 0) {
       Array.empty
     } else {
-      val mapRDDs = mapPartitions { items =>
-        // Priority keeps the largest elements, so let's reverse the ordering.
-        val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
-        queue ++= collectionUtils.takeOrdered(items, num)(ord)
-        Iterator.single(queue)
-      }
-      if (mapRDDs.partitions.length == 0) {
-        Array.empty
-      } else {
-        mapRDDs.reduce { (queue1, queue2) =>
-          queue1 ++= queue2
-          queue1
-        }.toArray.sorted(ord)
+      this.mapPartitionsWithIndex { case (pid, iter) =>
+        if (iter.nonEmpty) {
+          // Priority keeps the largest elements, so let's reverse the ordering.
+          Iterator.single(collectionUtils.takeOrdered(iter, num)(ord).toArray)
+        } else if (pid == 0) {
+          // make sure partition 0 always returns an array to avoid reduce on empty RDD
+          Iterator.single(Array.empty[T])
+        } else {
+          Iterator.empty
+        }
+      }.reduce { (array1, array2) =>
+        val iterator = collectionUtils.mergeOrdered[T](Seq(array1, array2))(ord)
+        val array = Array.ofDim[T](math.min(num, array1.length + array2.length))
+        var i = 0
+        while (i < array.length) {
+          array(i) = iterator.next()
+          i += 1
+        }

Review Comment:
   nice!
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a diff in pull request #37728: [SPARK-40276][CORE] Reduce the result size of RDD.takeOrdered

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #37728:
URL: https://github.com/apache/spark/pull/37728#discussion_r959207762


##########
core/src/main/scala/org/apache/spark/rdd/RDD.scala:
##########
@@ -1523,22 +1523,28 @@ abstract class RDD[T: ClassTag](
    * @return an array of top elements
    */
   def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
-    if (num == 0) {
+    if (num == 0 || this.getNumPartitions == 0) {
       Array.empty
     } else {
-      val mapRDDs = mapPartitions { items =>
-        // Priority keeps the largest elements, so let's reverse the ordering.
-        val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
-        queue ++= collectionUtils.takeOrdered(items, num)(ord)
-        Iterator.single(queue)
-      }
-      if (mapRDDs.partitions.length == 0) {
-        Array.empty
-      } else {
-        mapRDDs.reduce { (queue1, queue2) =>
-          queue1 ++= queue2
-          queue1
-        }.toArray.sorted(ord)
+      this.mapPartitionsWithIndex { case (pid, iter) =>
+        if (iter.nonEmpty) {
+          // Priority keeps the largest elements, so let's reverse the ordering.
+          Iterator.single(collectionUtils.takeOrdered(iter, num)(ord).toArray)
+        } else if (pid == 0) {
+          // make sure partition 0 always returns an array to avoid reduce on empty RDD
+          Iterator.single(Array.empty[T])
+        } else {
+          Iterator.empty
+        }
+      }.reduce { (array1, array2) =>
+        val iterator = collectionUtils.mergeOrdered[T](Seq(array1, array2))(ord)
+        val array = Array.ofDim[T](math.min(num, array1.length + array2.length))
+        var i = 0
+        while (i < array.length) {
+          array(i) = iterator.next()
+          i += 1
+        }

Review Comment:
   nit: `copyToArray` instead ? (should have mentioned it in my initial comment :-( )



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on a diff in pull request #37728: [SPARK-40276][CORE] Reduce the result size of RDD.takeOrdered

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on code in PR #37728:
URL: https://github.com/apache/spark/pull/37728#discussion_r959254007


##########
core/src/main/scala/org/apache/spark/rdd/RDD.scala:
##########
@@ -1523,22 +1523,28 @@ abstract class RDD[T: ClassTag](
    * @return an array of top elements
    */
   def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
-    if (num == 0) {
+    if (num == 0 || this.getNumPartitions == 0) {
       Array.empty
     } else {
-      val mapRDDs = mapPartitions { items =>
-        // Priority keeps the largest elements, so let's reverse the ordering.
-        val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
-        queue ++= collectionUtils.takeOrdered(items, num)(ord)
-        Iterator.single(queue)
-      }
-      if (mapRDDs.partitions.length == 0) {
-        Array.empty
-      } else {
-        mapRDDs.reduce { (queue1, queue2) =>
-          queue1 ++= queue2
-          queue1
-        }.toArray.sorted(ord)
+      this.mapPartitionsWithIndex { case (pid, iter) =>
+        if (iter.nonEmpty) {
+          // Priority keeps the largest elements, so let's reverse the ordering.
+          Iterator.single(collectionUtils.takeOrdered(iter, num)(ord).toArray)

Review Comment:
   I am concerned about the difference between the 'toArray' methods, but this can be ignored. From the source code, `IterableOnceOps.toArray` seems more efficient than `TraversableOnce.toArray`
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on a diff in pull request #37728: [SPARK-40276][CORE] Reduce the result size of RDD.takeOrdered

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on code in PR #37728:
URL: https://github.com/apache/spark/pull/37728#discussion_r959254007


##########
core/src/main/scala/org/apache/spark/rdd/RDD.scala:
##########
@@ -1523,22 +1523,28 @@ abstract class RDD[T: ClassTag](
    * @return an array of top elements
    */
   def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
-    if (num == 0) {
+    if (num == 0 || this.getNumPartitions == 0) {
       Array.empty
     } else {
-      val mapRDDs = mapPartitions { items =>
-        // Priority keeps the largest elements, so let's reverse the ordering.
-        val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
-        queue ++= collectionUtils.takeOrdered(items, num)(ord)
-        Iterator.single(queue)
-      }
-      if (mapRDDs.partitions.length == 0) {
-        Array.empty
-      } else {
-        mapRDDs.reduce { (queue1, queue2) =>
-          queue1 ++= queue2
-          queue1
-        }.toArray.sorted(ord)
+      this.mapPartitionsWithIndex { case (pid, iter) =>
+        if (iter.nonEmpty) {
+          // Priority keeps the largest elements, so let's reverse the ordering.
+          Iterator.single(collectionUtils.takeOrdered(iter, num)(ord).toArray)

Review Comment:
   I am concerned about the difference between the `toArray` methods, but this can be ignored. From the source code, `IterableOnceOps.toArray` seems more efficient than `TraversableOnce.toArray`
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org