Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/02/04 08:10:01 UTC

[GitHub] peter-toth edited a comment on issue #23731: [SPARK-26572][SQL] fix aggregate codegen result evaluation

URL: https://github.com/apache/spark/pull/23731#issuecomment-460158941
 
 
   I think this is a code generation issue because the result is correct if you disable `spark.sql.codegen.wholeStage`.
   
   This is the physical plan of the example in the ticket:
   ```
   == Physical Plan ==
   *(3) Project [idx#4, id#6L]
   +- *(3) BroadcastHashJoin [idx#4], [idx#9], Inner, BuildLeft
      :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
      :  +- *(1) Project [value#1 AS idx#4]
      :     +- LocalTableScan [value#1]
      +- *(3) HashAggregate(keys=[idx#9], functions=[], output=[idx#9, id#6L])
         +- Exchange hashpartitioning(idx#9, 5)
            +- *(2) HashAggregate(keys=[idx#9], functions=[], output=[idx#9])
               +- *(2) Project [value#1 AS idx#9]
                  +- LocalTableScan [value#1]
   ```
   and if you take a look at the generated code of stage 3 (I left some comments in it about what my PR does):
   ```
       ...
       // this method is called for every aggregation key
       private void agg_doAggregateWithKeysOutput_0(UnsafeRow agg_keyTerm_0, UnsafeRow agg_bufferTerm_0)
               throws java.io.IOException {
           ((org.apache.spark.sql.execution.metric.SQLMetric) references[4] /* numOutputRows */).add(1);
   
           int agg_value_4 = agg_keyTerm_0.getInt(0);
           // this PR moves the agg_value_5 calculation and the agg_count_0 increment from the broadcast join loop to here
   
           // generate join key for stream side
           boolean bhj_isNull_0 = false;
           long bhj_value_0 = -1L;
           if (!false) {
               bhj_value_0 = (long) agg_value_4;
           }
           // find matches from HashRelation
           scala.collection.Iterator bhj_matches_0 = bhj_isNull_0 ? null
                   : (scala.collection.Iterator) bhj_relation_0.get(bhj_value_0);
           if (bhj_matches_0 != null) {
               while (bhj_matches_0.hasNext()) {
                   UnsafeRow bhj_matched_0 = (UnsafeRow) bhj_matches_0.next();
                   {
                       ((org.apache.spark.sql.execution.metric.SQLMetric) references[6] /* numOutputRows */).add(1);
   
                       int bhj_value_2 = bhj_matched_0.getInt(0);
                       boolean project_isNull_0 = false;
                       UTF8String project_value_0 = null;
                       if (!false) {
                           project_value_0 = UTF8String.fromString(String.valueOf(bhj_value_2));
                       }
                       final long agg_value_5 = partitionMask + agg_count_0;
                       agg_count_0++;
                       boolean project_isNull_2 = false;
                       UTF8String project_value_2 = null;
                       if (!false) {
                           project_value_2 = UTF8String.fromString(String.valueOf(agg_value_5));
                       }
       ...
   ```
   So to hit this issue, both the hash aggregate and the broadcast join have to be in the same codegen stage, and the aggregate has to be on the "stream" side of the join. This is probably a rare combination, which would explain why the issue hasn't come up earlier.
   (I also think that operators other than broadcast join that generate a loop might be affected, but I didn't look into that.)
   But I think this is an issue with the generated code of `HashAggregateExec`, and it seems to me that we can force evaluation of `resultExpressions` before generating the broadcast join code (i.e. before calling `consume()`) without any drawback.
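
To make the effect concrete, here is a minimal standalone sketch of the two evaluation orders. It is a simplified simulation, not Spark code: `ResultEvaluationSketch`, `IdGenerator`, `evaluateInsideLoop` and `evaluateBeforeLoop` are hypothetical names, and the counter only stands in for the `partitionMask + agg_count_0` expression in the generated code above.

```java
import java.util.ArrayList;
import java.util.List;

public class ResultEvaluationSketch {
    // Hypothetical stand-in for a stateful result expression, similar in
    // spirit to `partitionMask + agg_count_0` followed by `agg_count_0++`.
    static final class IdGenerator {
        private long count = 0;

        long next() {
            return count++;
        }
    }

    // Mirrors the current generated code: the stateful expression is
    // evaluated inside the match loop, so every matched row gets a new id.
    static List<Long> evaluateInsideLoop(int[] keys, int matchesPerKey) {
        IdGenerator gen = new IdGenerator();
        List<Long> ids = new ArrayList<>();
        for (int key : keys) {
            for (int m = 0; m < matchesPerKey; m++) {
                ids.add(gen.next());
            }
        }
        return ids;
    }

    // Mirrors the proposed change: the expression is evaluated once per
    // aggregation key, before the match loop, and reused for every match.
    static List<Long> evaluateBeforeLoop(int[] keys, int matchesPerKey) {
        IdGenerator gen = new IdGenerator();
        List<Long> ids = new ArrayList<>();
        for (int key : keys) {
            final long id = gen.next();
            for (int m = 0; m < matchesPerKey; m++) {
                ids.add(id);
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        int[] keys = {1};
        // One aggregation key that matches two rows on the build side.
        System.out.println(evaluateInsideLoop(keys, 2)); // prints [0, 1]
        System.out.println(evaluateBeforeLoop(keys, 2)); // prints [0, 0]
    }
}
```

With the evaluation inside the loop, a single aggregation key ends up with a different id for each matched row; with the evaluation hoisted before the loop, every matched row sees the same id for that key.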
