Posted to commits@spark.apache.org by we...@apache.org on 2019/05/09 03:12:59 UTC

[spark] branch master updated: [SPARK-27207][SQL] : Ensure aggregate buffers are initialized again for So…

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0969d7a  [SPARK-27207][SQL] : Ensure aggregate buffers are initialized again for So…
0969d7a is described below

commit 0969d7aa0ca7c4b32f4753387db8ad67ac6764b0
Author: pgandhi <pg...@verizonmedia.com>
AuthorDate: Thu May 9 11:12:20 2019 +0800

    [SPARK-27207][SQL] : Ensure aggregate buffers are initialized again for So…
    
    …rtBasedAggregate
    
    Normally, the operations invoked on an aggregation buffer for a User Defined Aggregate Function (UDAF) follow the order initialize(), update(), eval() or initialize(), merge(), eval(). However, once the threshold configured by spark.sql.objectHashAggregate.sortBased.fallbackThreshold is reached, ObjectHashAggregate falls back to SortBasedAggregator, which invokes merge() or update() without first calling initialize() on the aggregate buffer.
    
    ## What changes were proposed in this pull request?
    
    The fix is to initialize the aggregate buffers again when the fallback to the SortBasedAggregate operator happens.
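    
    The ordering contract and the effect of skipping initialize() can be sketched with a toy model. This is only an illustration under stated assumptions: `Buffer`, `SumAgg`, and `sumPerGroup` are hypothetical names, not Spark classes, and the reuse of a single buffer across groups is a simplification rather than a description of how SortBasedAggregator is implemented.
    
    ```scala
    // Toy model only: hypothetical names, not Spark's internal classes.
    object FallbackSketch {
      // A mutable aggregation buffer holding a running sum.
      final class Buffer { var sum: Long = 0L }

      // A sum aggregate following the initialize / update / eval contract.
      object SumAgg {
        def initialize(b: Buffer): Unit = { b.sum = 0L }
        def update(b: Buffer, x: Long): Unit = { b.sum += x }
        def eval(b: Buffer): Long = b.sum
      }

      // Aggregates each group in turn while reusing one buffer.
      // `reinitialize` toggles whether initialize() is called before each group.
      def sumPerGroup(groups: Seq[Seq[Long]], reinitialize: Boolean): Seq[Long] = {
        val buffer = new Buffer
        SumAgg.initialize(buffer)
        groups.map { rows =>
          if (reinitialize) SumAgg.initialize(buffer)
          rows.foreach(x => SumAgg.update(buffer, x))
          SumAgg.eval(buffer)
        }
      }
    }
    ```
    
    With the groups (1, 2) and (10), `FallbackSketch.sumPerGroup(groups, reinitialize = true)` yields 3 and 10, while `reinitialize = false` yields 3 and 13, because the second group starts from the stale sum left by the first.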
    
    ## How was this patch tested?
    
    The patch was tested as part of [SPARK-24935](https://issues.apache.org/jira/browse/SPARK-24935) as documented in PR https://github.com/apache/spark/pull/23778.
    
    Closes #24149 from pgandhi999/SPARK-27207.
    
    Authored-by: pgandhi <pg...@verizonmedia.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
index 56c2ee6..6fc2053 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
@@ -158,7 +158,7 @@ case class AggregateExpression(
  * ([[aggBufferAttributes]]) of an aggregation buffer which is used to hold partial aggregate
  * results. At runtime, multiple aggregate functions are evaluated by the same operator using a
  * combined aggregation buffer which concatenates the aggregation buffers of the individual
- * aggregate functions.
+ * aggregate functions. Please note that aggregate functions should be stateless.
  *
  * Code which accepts [[AggregateFunction]] instances should be prepared to handle both types of
  * aggregate functions.
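
One way to read the statelessness note added in this hunk is that all running state should live in the aggregation buffer rather than on the function object, so that re-initializing the buffer fully resets the aggregation. The sketch below illustrates that reading with a toy model; `Buffer`, `StatefulCount`, and `StatelessCount` are hypothetical names and are not part of Spark's API.

```scala
// Toy model only: hypothetical names, not Spark's AggregateFunction API.
object StatelessSketch {
  // The aggregation buffer: the only place state is supposed to live.
  final class Buffer { var count: Long = 0L }

  // Stateful variant: the running count is kept on the function object,
  // so re-initializing the buffer does not reset it.
  final class StatefulCount {
    private var seen: Long = 0L
    def initialize(b: Buffer): Unit = { b.count = 0L }
    def update(b: Buffer): Unit = { seen += 1; b.count = seen }
    def eval(b: Buffer): Long = b.count
  }

  // Stateless variant: every piece of state is stored in the buffer.
  object StatelessCount {
    def initialize(b: Buffer): Unit = { b.count = 0L }
    def update(b: Buffer): Unit = { b.count += 1 }
    def eval(b: Buffer): Long = b.count
  }

  // Two updates, a re-initialization, then one more update.
  def statefulAfterReset(): Long = {
    val f = new StatefulCount
    val b = new Buffer
    f.initialize(b); f.update(b); f.update(b)
    f.initialize(b); f.update(b)
    f.eval(b) // hidden state survives the reset, so this is 3
  }

  def statelessAfterReset(): Long = {
    val b = new Buffer
    StatelessCount.initialize(b); StatelessCount.update(b); StatelessCount.update(b)
    StatelessCount.initialize(b); StatelessCount.update(b)
    StatelessCount.eval(b) // the reset cleared everything, so this is 1
  }
}
```

In this model the stateless variant gives the same answer whether or not an operator decides to initialize the buffer again, which is the property the doc comment appears to ask for.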

