Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/07/07 19:41:43 UTC

[GitHub] [spark] izchen opened a new pull request #29028: [SPARK-32212][CORE]RDD.takeOrdered can choose to merge intermediate r…

izchen opened a new pull request #29028:
URL: https://github.com/apache/spark/pull/29028


   …esults in executor or driver
   
   ### What changes were proposed in this pull request?
   Add an optional config that determines whether the intermediate results (the local TopK of each partition) in RDD.takeOrdered are merged in the driver process or in an executor process. If set to true, they are merged in the driver (via util.PriorityQueue), which gives the shortest wait for the result. But if the intermediate results are large and there are many partitions, they may accumulate in driver memory and cause excessive memory pressure. If set to false, they are merged in an executor (via guava's QuickSelect), so intermediate results do not accumulate in driver memory, at the cost of a longer runtime.
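   
   For illustration, a minimal usage sketch; note that `spark.rdd.takeOrdered.mergeInDriver` is the config proposed by this PR and does not exist in released Spark, while the rest is standard RDD API:
   
   ```scala
   import org.apache.spark.{SparkConf, SparkContext}
   
   val conf = new SparkConf()
     .setMaster("local[*]")
     .setAppName("takeOrdered-merge-demo")
     // Proposed by this PR (not in released Spark): false = merge the local TopKs in an executor.
     .set("spark.rdd.takeOrdered.mergeInDriver", "false")
   val sc = new SparkContext(conf)
   
   val rdd = sc.parallelize(1 to 1000000, numSlices = 200)
   val top10 = rdd.takeOrdered(10)   // the 10 smallest elements under the natural ordering
   ```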
   
   ### Why are the changes needed?
   The problem with the original implementation is that when the intermediate results are large and there are many partitions, they may accumulate in the memory of the driver process and cause excessive memory pressure.
   
   ### Does this PR introduce _any_ user-facing change?
   Yes. It adds the configuration parameter
   `spark.rdd.takeOrdered.mergeInDriver` (default: `true`).
   
   ### How was this patch tested?
   Added unit tests.
   




[GitHub] [spark] Ngone51 commented on a change in pull request #29028: [SPARK-32212][CORE]RDD.takeOrdered can choose to merge intermediate r…

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #29028:
URL: https://github.com/apache/spark/pull/29028#discussion_r451528174



##########
File path: core/src/main/scala/org/apache/spark/rdd/RDD.scala
##########
@@ -1509,22 +1509,26 @@ abstract class RDD[T: ClassTag](
    * @return an array of top elements
    */
   def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
-    if (num == 0) {
+    if (num == 0 || partitions.length == 0) {
       Array.empty
     } else {
-      val mapRDDs = mapPartitions { items =>
-        // Priority keeps the largest elements, so let's reverse the ordering.
-        val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
-        queue ++= collectionUtils.takeOrdered(items, num)(ord)
-        Iterator.single(queue)
-      }
-      if (mapRDDs.partitions.length == 0) {
-        Array.empty
-      } else {
+      if (conf.get(RDD_TAKE_ORDERED_MERGE_IN_DRIVER)) {
+        val mapRDDs = mapPartitions { items =>
+          // Priority keeps the largest elements, so let's reverse the ordering.
+          val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
+          queue ++= collectionUtils.takeOrdered(items, num)(ord)
+          Iterator.single(queue)
+        }
         mapRDDs.reduce { (queue1, queue2) =>
           queue1 ++= queue2
           queue1
         }.toArray.sorted(ord)
+      } else {
+        mapPartitions { items =>
+          collectionUtils.takeOrdered(items, num)(ord)
+        }.repartition(1).mapPartitions { items =>

Review comment:
       Then, couldn't the single executor that receives all partitions easily come under excessive memory pressure because of `repartition(1)`?
   
   Actually, I think it's probably a good idea to use `treeReduce` instead, as mentioned in SPARK-32212.
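   
   For reference, a sketch of what the `treeReduce` variant could look like (an assumed shape, not code from this PR: `treeReduce(f, depth)` is existing RDD API, and the map side mirrors the diff above):
   
   ```scala
   // Sketch only: build the per-partition bounded queues as today...
   val mapRDDs = mapPartitions { items =>
     val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
     queue ++= collectionUtils.takeOrdered(items, num)(ord)
     Iterator.single(queue)
   }
   // ...but merge them with treeReduce, so partial queues are combined in executors
   // first and only about sqrt(p) partial results reach the driver when depth = 2.
   mapRDDs.treeReduce({ (queue1, queue2) => queue1 ++= queue2; queue1 }, depth = 2)
     .toArray.sorted(ord)
   ```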






[GitHub] [spark] github-actions[bot] closed pull request #29028: [SPARK-32212][CORE]RDD.takeOrdered can choose to merge intermediate r…

Posted by GitBox <gi...@apache.org>.
github-actions[bot] closed pull request #29028:
URL: https://github.com/apache/spark/pull/29028


   




[GitHub] [spark] AmplabJenkins commented on pull request #29028: [SPARK-32212][CORE]RDD.takeOrdered can choose to merge intermediate r…

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29028:
URL: https://github.com/apache/spark/pull/29028#issuecomment-655084829


   Can one of the admins verify this patch?




[GitHub] [spark] izchen commented on a change in pull request #29028: [SPARK-32212][CORE]RDD.takeOrdered can choose to merge intermediate r…

Posted by GitBox <gi...@apache.org>.
izchen commented on a change in pull request #29028:
URL: https://github.com/apache/spark/pull/29028#discussion_r452858008



##########
File path: core/src/main/scala/org/apache/spark/rdd/RDD.scala
##########
@@ -1509,22 +1509,26 @@ abstract class RDD[T: ClassTag](
    * @return an array of top elements
    */
   def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
-    if (num == 0) {
+    if (num == 0 || partitions.length == 0) {
       Array.empty
     } else {
-      val mapRDDs = mapPartitions { items =>
-        // Priority keeps the largest elements, so let's reverse the ordering.
-        val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
-        queue ++= collectionUtils.takeOrdered(items, num)(ord)
-        Iterator.single(queue)
-      }
-      if (mapRDDs.partitions.length == 0) {
-        Array.empty
-      } else {
+      if (conf.get(RDD_TAKE_ORDERED_MERGE_IN_DRIVER)) {
+        val mapRDDs = mapPartitions { items =>
+          // Priority keeps the largest elements, so let's reverse the ordering.
+          val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
+          queue ++= collectionUtils.takeOrdered(items, num)(ord)
+          Iterator.single(queue)
+        }
         mapRDDs.reduce { (queue1, queue2) =>
           queue1 ++= queue2
           queue1
         }.toArray.sorted(ord)
+      } else {
+        mapPartitions { items =>
+          collectionUtils.takeOrdered(items, num)(ord)
+        }.repartition(1).mapPartitions { items =>

Review comment:
       Thank you very much for the code review.
   
   The analysis of the candidate implementations is as follows (_p = rdd.getNumPartitions_, _k = num_):
   
   _reduce - PriorityQueue_ (original implementation) => At worst, _O(p * k)_ memory may be used in the driver, but merging the intermediate results (the local TopK of each partition) does not have to wait until all partitions have returned; the merge runs in parallel with the partition computation.
   
   _repartition(1) - guava.QuickSelect_ => The merging takes place in an executor. The merging algorithm itself uses _O(k)_ extra memory, and the time complexity is _O(p * k)_. The intermediate results are streamed from the shuffle's ShuffleBlockFetcherIterator, so the iterator itself does not use much memory.
   
   _treeReduce(depth=2) - guava.QuickSelect_ => At worst, _O(sqrt(p) * k)_ memory may be used in the driver. The memory usage in the executors is the same as with _repartition(1)_. Compared with _repartition(1)_ it increases the parallelism of the local TopK merging, which can speed up the merge for very large inputs, but it uses more memory on the driver.
   
   The total number of intermediate result items that need to be merged is _p * k_. In general this number is not very large, and the non-parallel _O(p * k)_ merging algorithm is acceptable. So I think _repartition(1)_ is the better choice; see the sketch below.
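   
   For concreteness, a sketch of how the _repartition(1)_ branch could read in full (an assumed completion — the diff above is truncated by the review view; `collectionUtils.takeOrdered` delegates to guava's `Ordering.leastOf`):
   
   ```scala
   // Sketch only: assumed completion of the truncated diff above.
   mapPartitions { items =>
     collectionUtils.takeOrdered(items, num)(ord)   // local TopK per partition
   }.repartition(1).mapPartitions { items =>
     collectionUtils.takeOrdered(items, num)(ord)   // single-task merge of all local TopKs
   }.collect().sorted(ord)
   ```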
   






[GitHub] [spark] github-actions[bot] commented on pull request #29028: [SPARK-32212][CORE]RDD.takeOrdered can choose to merge intermediate r…

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #29028:
URL: https://github.com/apache/spark/pull/29028#issuecomment-715647015


   We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!




[GitHub] [spark] AmplabJenkins removed a comment on pull request #29028: [SPARK-32212][CORE]RDD.takeOrdered can choose to merge intermediate r…

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29028:
URL: https://github.com/apache/spark/pull/29028#issuecomment-655084829


   Can one of the admins verify this patch?




[GitHub] [spark] izchen edited a comment on pull request #29028: [SPARK-32212][CORE]RDD.takeOrdered can choose to merge intermediate r…

Posted by GitBox <gi...@apache.org>.
izchen edited a comment on pull request #29028:
URL: https://github.com/apache/spark/pull/29028#issuecomment-657111204


   > I'd rather not expose yet another config for this. Are there any heuristics that can select this more intelligently?
   
   Thank you very much for the code review.
   
   The idea of a heuristic selection would be something like:
   `mergeInDriver = intermediateResultsTotalSize < getConf("spark.driver.maxResultSize")`
   
   The intermediate result is the local TopK of each RDD partition, so the total number of items is known to be _partitionNum * k_. However, because of variable-length objects such as strings or user-defined classes, the total size cannot be estimated before all partitions have executed.
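   
   To make that concrete, a hypothetical sketch (all names are illustrative, not from the PR; `avgItemSizeBytes` is exactly the input that cannot be known before the job runs):
   
   ```scala
   // Hypothetical sketch only, not from the PR.
   val avgItemSizeBytes: Long = ???  // unknowable up front for strings / user-defined classes
   val maxResultSize = Utils.byteStringAsBytes(conf.get("spark.driver.maxResultSize", "1g"))
   // partitionNum * k items would be collected to the driver in total.
   val estimatedBytes = partitions.length.toLong * num * avgItemSizeBytes
   val mergeInDriver = estimatedBytes < maxResultSize
   ```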
   
   In most user applications `mergeInDriver = true` is the right setting: the local TopK merge begins as soon as the second RDD partition result arrives, and the merge runs in parallel with the partition computation, which gives the shortest wait for the result. Collecting the information needed for a heuristic selection would hurt the performance of exactly these applications.
   
   `mergeInDriver = true` may cause program errors in two types of applications:
   - The result data contains large objects, such as large strings or large user-defined classes.
   - Driver applications with high memory pressure, such as a multi-user spark/sql/hive-thriftserver.
   
   There may be no good way to avoid exposing this config.
   
   Maybe I can add a user-friendly log message about this config, something like:
   ```scala
   try {
     mapRDDs.reduce { (queue1, queue2) =>
       queue1 ++= queue2
       queue1
     }.toArray.sorted(ord)
   } catch {
     case e: SparkException if e.getMessage.contains(s"bigger than ${MAX_RESULT_SIZE.key}") =>
       logError(s"Total size of serialized intermediate results is bigger than " +
         s"${MAX_RESULT_SIZE.key} (${Utils.bytesToString(conf.get(MAX_RESULT_SIZE))}). " +
         s"If the final result size is less than this value, you can set the " +
         s"config ${RDD_TAKE_ORDERED_MERGE_IN_DRIVER.key} to false to complete " +
         s"the intermediate result merge in an executor. But this config will " +
         s"increase the return time of the takeOrdered method.")
       throw e
   }
   ```






[GitHub] [spark] AmplabJenkins commented on pull request #29028: [SPARK-32212][CORE]RDD.takeOrdered can choose to merge intermediate r…

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29028:
URL: https://github.com/apache/spark/pull/29028#issuecomment-655085626


   Can one of the admins verify this patch?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org

