Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/04/23 23:58:51 UTC

[GitHub] [spark] m44444 commented on a change in pull request #24149: [SPARK-27207] : Ensure aggregate buffers are initialized again for So…

m44444 commented on a change in pull request #24149: [SPARK-27207] : Ensure aggregate buffers are initialized again for So…
URL: https://github.com/apache/spark/pull/24149#discussion_r277914333
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala
 ##########
 @@ -258,21 +275,29 @@ class SortBasedAggregator(
         if (hasNextInput || hasNextAggBuffer) {
           // Find the smaller key of the inputIterator and initialAggBufferIterator
           groupingKey = findGroupingKey()
-          result = new AggregationBufferEntry(groupingKey, makeEmptyAggregationBuffer)
+          updateResult = new AggregationBufferEntry(
+            groupingKey, makeEmptyAggregationBufferForSortBasedUpdateAggFunctions)
+          finalResult = new AggregationBufferEntry(
+            groupingKey, makeEmptyAggregationBufferForSortBasedMergeAggFunctions)
 
           // Firstly, update the aggregation buffer with input rows.
           while (hasNextInput &&
             groupingKeyOrdering.compare(inputIterator.getKey, groupingKey) == 0) {
-            processRow(result.aggregationBuffer, inputIterator.getValue)
+            processRow(updateResult.aggregationBuffer, inputIterator.getValue)
             hasNextInput = inputIterator.next()
           }
 
+          // This step ensures that the contents of the updateResult aggregation buffer are
+          // merged with the finalResult aggregation buffer to maintain consistency
+          serializeBuffer(updateResult.aggregationBuffer)
+          mergeAggregationBuffers(finalResult.aggregationBuffer, updateResult.aggregationBuffer)
           // Secondly, merge the aggregation buffer with existing aggregation buffers.
           // NOTE: the ordering of these two while-blocks matters, mergeAggregationBuffer() should
           // be called after calling processRow.
           while (hasNextAggBuffer &&
             groupingKeyOrdering.compare(initialAggBufferIterator.getKey, groupingKey) == 0) {
 
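 The hunk above splits each grouping key's state into an update-side buffer and a merge-side (final) buffer. A minimal, self-contained sketch of that update-then-merge ordering, with a toy count aggregate standing in for the real ObjectAggregationIterator machinery (ToyBuffer, processRow, and mergeBuffers are illustrative names only, not the Spark API), could look like this:
 
 ```scala
 // A toy model of the two-buffer flow above: ToyBuffer, processRow and
 // mergeBuffers are illustrative stand-ins, not the real Spark API.
 object UpdateThenMergeSketch {
   // Toy mutable aggregation buffer: just a running count.
   final class ToyBuffer(var count: Long = 0L)
 
   // "Update" path: fold one input row into the update-side buffer.
   def processRow(buf: ToyBuffer, row: Int): Unit = buf.count += 1
 
   // "Merge" path: fold one partial buffer into another, as done for spilled buffers.
   def mergeBuffers(into: ToyBuffer, from: ToyBuffer): Unit = into.count += from.count
 
   def main(args: Array[String]): Unit = {
     val inputRows      = Seq(1, 2, 3)                              // rows for the current grouping key
     val spilledBuffers = Seq(new ToyBuffer(5L), new ToyBuffer(7L)) // pre-aggregated partial buffers
 
     // Two buffers per grouping key, mirroring updateResult / finalResult in the hunk.
     val updateResult = new ToyBuffer()
     val finalResult  = new ToyBuffer()
 
     // Firstly, update the update-side buffer with the input rows.
     inputRows.foreach(processRow(updateResult, _))
 
     // Then fold the update-side buffer into the final (merge-side) buffer ...
     mergeBuffers(finalResult, updateResult)
 
     // ... and only afterwards merge the existing spilled buffers, so update-style
     // and merge-style inputs never land in the same buffer.
     spilledBuffers.foreach(mergeBuffers(finalResult, _))
 
     println(finalResult.count) // 3 + 5 + 7 = 15
   }
 }
 ```
 
 The point of the ordering is that update-style calls only ever touch the update-side buffer and merge-style calls only ever touch the final buffer, which matches the per-function lifecycles raised in the comment below.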
 Review comment:
   Hi @cloud-fan, what you said here really hits the mark! With Spark 2.4.1, turning this conf off solves the DataSketches HLL issue for me: `--conf spark.sql.execution.useObjectHashAggregateExec=false`, which basically disables the hash aggregation attempt. But as you said, this downgrades overall performance (not so bad when people have a sense of how big their data is).
   However, my question is: could the framework automatically recognize a Hive UDAF and apply this behavior only to it? e.g. for a query like `select year, count(month), hive_udaf_count(month) from ... group by year`, I would want the Spark count() to follow the INIT -> UPDATE -> MERGE -> FINISH lifecycle, while the hive_udaf_count() follows INIT -> UPDATE -> FINISH, then INIT -> MERGE -> FINISH.
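 
   For reference, a rough sketch of the workaround mentioned above, set programmatically rather than via `--conf` (`some_table` is a placeholder, and `hive_udaf_count` is assumed to be a Hive UDAF that has already been registered):
 
 ```scala
 import org.apache.spark.sql.SparkSession
 
 // Rough sketch of the workaround: disable the object hash aggregate so the
 // query falls back to sort-based aggregation. `some_table` is a placeholder
 // and `hive_udaf_count` is assumed to be an already-registered Hive UDAF.
 object DisableObjectHashAggSketch {
   def main(args: Array[String]): Unit = {
     val spark = SparkSession.builder()
       .appName("disable-object-hash-agg")
       .enableHiveSupport()
       // Same effect as --conf spark.sql.execution.useObjectHashAggregateExec=false
       .config("spark.sql.execution.useObjectHashAggregateExec", "false")
       .getOrCreate()
 
     spark.sql(
       """SELECT year, count(month), hive_udaf_count(month)
         |FROM some_table
         |GROUP BY year""".stripMargin
     ).show()
   }
 }
 ```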

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org