Posted to reviews@spark.apache.org by "viirya (via GitHub)" <gi...@apache.org> on 2023/04/26 07:32:57 UTC

[GitHub] [spark] viirya commented on a diff in pull request #40915: [SPARK-43232][SQL] Improve ObjectHashAggregateExec performance for high cardinality

viirya commented on code in PR #40915:
URL: https://github.com/apache/spark/pull/40915#discussion_r1177453821


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala:
##########
@@ -252,6 +249,22 @@ class SortBasedAggregator(
       var hasNextAggBuffer: Boolean = initialAggBufferIterator.next()
       private var result: AggregationBufferEntry = _
       private var groupingKey: UnsafeRow = _
+      /**
+       * A flag to represent how to process row for current grouping key:
+       *  0: update input row to aggregation buffer
+       *  1: merge input aggregation buffer to sort based aggregation buffer

Review Comment:
   `sort based aggregation buffer`? Isn't it the initial agg buffer?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala:
##########
@@ -252,6 +249,22 @@ class SortBasedAggregator(
       var hasNextAggBuffer: Boolean = initialAggBufferIterator.next()
       private var result: AggregationBufferEntry = _
       private var groupingKey: UnsafeRow = _
+      /**
+       * A flag to represent how to process row for current grouping key:
+       *  0: update input row to aggregation buffer
+       *  1: merge input aggregation buffer to sort based aggregation buffer
+       *  2: first update input row to aggregation buffer then merge it to sort based
+       *     aggregation buffer
+       *
+       * The state transition is:
+       * - If `initialAggBufferIterator` has no more row, then it's 0
+       * - If `inputIterator` has no more row, then then it's 1

Review Comment:
   ```suggestion
          * - If `inputIterator` has no more row, then it's 1
   ```
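
For readers following the doc comment under review, here is a rough sketch of how such a flag could drive a merge of two key-sorted streams. It is not Spark's code: it uses `Int` keys and `Long` sums in place of `UnsafeRow` and aggregation buffers, and the names `Mode`, `nextMode` and `aggregate` are hypothetical. The quoted hunk ends before the remaining transitions, so the key comparison used when both streams still have rows is an assumption.

```scala
object SortedMergeSketch {
  // Hypothetical names standing in for the flag values 0, 1 and 2.
  sealed trait Mode
  case object UpdateOnly extends Mode      // 0: only raw input rows for this key
  case object MergeOnly extends Mode       // 1: only a partial buffer for this key
  case object UpdateThenMerge extends Mode // 2: both are present for this key

  // Choose the mode for the next group from the head keys of both streams.
  def nextMode(inputKey: Option[Int], bufferKey: Option[Int]): Mode =
    (inputKey, bufferKey) match {
      case (Some(_), None) => UpdateOnly // buffer stream exhausted
      case (None, Some(_)) => MergeOnly  // input stream exhausted
      case (Some(i), Some(b)) =>
        // Assumption: with both streams non-empty, the smaller key goes first.
        if (i < b) UpdateOnly else if (i > b) MergeOnly else UpdateThenMerge
      case (None, None) =>
        throw new NoSuchElementException("both streams are exhausted")
    }

  // Both arguments must already be sorted by key.
  def aggregate(inputs: Seq[(Int, Long)], buffers: Seq[(Int, Long)]): Seq[(Int, Long)] = {
    val in = inputs.iterator.buffered
    val buf = buffers.iterator.buffered
    val out = Seq.newBuilder[(Int, Long)]
    while (in.hasNext || buf.hasNext) {
      val inKey = if (in.hasNext) Some(in.head._1) else None
      val bufKey = if (buf.hasNext) Some(buf.head._1) else None
      val mode = nextMode(inKey, bufKey)
      val key = if (mode == MergeOnly) bufKey.get else inKey.get
      var sum = 0L
      // "Update": fold the raw input rows of this key into the running sum.
      if (mode != MergeOnly) {
        while (in.hasNext && in.head._1 == key) sum += in.next()._2
      }
      // "Merge": fold the partial buffers of this key into the running sum.
      if (mode != UpdateOnly) {
        while (buf.hasNext && buf.head._1 == key) sum += buf.next()._2
      }
      out += ((key, sum))
    }
    out.result()
  }
}
```

For example, `SortedMergeSketch.aggregate(Seq((1, 1L), (1, 2L), (3, 5L)), Seq((1, 10L), (2, 7L)))` visits key 1 in mode 2, key 2 in mode 1 and key 3 in mode 0.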



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala:
##########
@@ -144,8 +136,7 @@ class ObjectAggregationIterator(
 
   // This function is used to read and process input rows. When processing input rows, it first uses
   // hash-based aggregation by putting groups and their buffers in `hashMap`. If `hashMap` grows too
-  // large, it sorts the contents, spills them to disk, and creates a new map. At last, all sorted
-  // spills are merged together for sort-based aggregation.
+  // large, it destroys the `hashMap` and falls back to sort-based aggregation.

Review Comment:
   Hmm, I think `it sorts the contents` looks correct. We get the sorted contents from the hash map as the initial agg buffer iterator.
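
To make the point concrete, here is a minimal sketch of the hash-then-sort fallback as described in the code comment. It is a simplified stand-in, not the actual `ObjectAggregationIterator`: keys are `Int`, buffers are `Long` sums, and `maxGroups` is a hypothetical threshold parameter. The part that matters is that the hash map contents are sorted by key before they are handed to the sort-based phase.

```scala
import scala.collection.mutable

object HashFallbackSketch {
  // `maxGroups` is a hypothetical stand-in for the fallback threshold.
  def aggregate(rows: Seq[(Int, Long)], maxGroups: Int): Seq[(Int, Long)] = {
    val hashMap = mutable.HashMap.empty[Int, Long]
    val it = rows.iterator

    // Phase 1: hash-based aggregation until the map holds too many groups.
    var fellBack = false
    while (!fellBack && it.hasNext) {
      val (k, v) = it.next()
      hashMap(k) = hashMap.getOrElse(k, 0L) + v
      if (hashMap.size >= maxGroups) fellBack = true
    }

    // The map contents are sorted by key; they play the role of the
    // initial aggregation buffers for the sort-based phase.
    val initialBuffers = hashMap.toVector.sortBy(_._1)

    // Phase 2: sort the rows that were not consumed in phase 1.
    val remaining = it.toVector.sortBy(_._1)

    // Combine both key-sorted sequences and fold adjacent equal keys.
    val out = mutable.ArrayBuffer.empty[(Int, Long)]
    for ((k, v) <- (initialBuffers ++ remaining).sortBy(_._1)) {
      if (out.nonEmpty && out.last._1 == k) {
        out(out.length - 1) = (k, out.last._2 + v)
      } else {
        out += ((k, v))
      }
    }
    out.toSeq
  }
}
```

With a large `maxGroups` the fallback never triggers and the result is just the sorted hash map contents. With a small one, the sorted map contents are merged with the remaining sorted rows.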



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

