Posted to github@arrow.apache.org by "alamb (via GitHub)" <gi...@apache.org> on 2023/06/15 13:49:54 UTC

[GitHub] [arrow-datafusion] alamb commented on issue #4973: Improve the performance of `Aggregator`, grouping, aggregation

alamb commented on issue #4973:
URL: https://github.com/apache/arrow-datafusion/issues/4973#issuecomment-1593109590

   > For group by high cardinality columns, I will do some POC to support two levels of grouping state (PartitionedHashTable).
   
   I thought DataFusion already did two-level partitioned hashing, but I just checked and apparently it does not.
   
   For example:
   
   ```sql
   ❯ explain select avg(system_load_average_1m) from '/tmp/example' group by uid;
   +---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | plan_type     | plan                                                                                                                                                          |
   +---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | logical_plan  | Projection: AVG(/tmp/example.system_load_average_1m)                                                                                                          |
   |               |   Aggregate: groupBy=[[/tmp/example.uid]], aggr=[[AVG(/tmp/example.system_load_average_1m)]]                                                                  |
   |               |     TableScan: /tmp/example projection=[system_load_average_1m, uid]                                                                                          |
   | physical_plan | ProjectionExec: expr=[AVG(/tmp/example.system_load_average_1m)@1 as AVG(/tmp/example.system_load_average_1m)]                                                 |
   |               |   AggregateExec: mode=FinalPartitioned, gby=[uid@0 as uid], aggr=[AVG(/tmp/example.system_load_average_1m)]                                                   |
   |               |     CoalesceBatchesExec: target_batch_size=8192                                                                                                               |
   |               |       RepartitionExec: partitioning=Hash([Column { name: "uid", index: 0 }], 16), input_partitions=16                                                         |
   |               |         RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=2                                                                                 |
   |               |           AggregateExec: mode=Partial, gby=[uid@1 as uid], aggr=[AVG(/tmp/example.system_load_average_1m)]                                                    |
   |               |             ParquetExec: file_groups={2 groups: [[private/tmp/example/1.parquet], [private/tmp/example/2.parquet]]}, projection=[system_load_average_1m, uid] |
   |               |                                                                                                                                                               |
   +---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+
   ```
   
   
   This plan partitions on the group key `uid` (`RepartitionExec: partitioning=Hash([Column { name: "uid", index: 0 }], 16)`) **after** the first `AggregateExec`.
   
   Repartitioning **before** the first `AggregateExec` is what is needed (so that each of the first-level hash tables sees a disjoint set of keys, and the hash tables can therefore be smaller and more efficient). So instead of
   
   ```
   AggregateExec: mode=FinalPartitioned
     RepartitionExec: partitioning=Hash([Column { name: "uid", index: 0 }], 16)
       RepartitionExec: partitioning=RoundRobinBatch(16), 
         AggregateExec: mode=Partial, gby=[uid@1 as uid], 
          ParquetExec: file_groups=...
   ```
   
   the plan would look something like this:
   ```
   AggregateExec: mode=FinalPartitioned
     AggregateExec: mode=Partial, gby=[uid@1 as uid], 
       RepartitionExec: partitioning=Hash([Column { name: "uid", index: 0 }], 16) <-- this is done prior to aggregation
          ParquetExec: file_groups=...
   ```
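   
   To make the "disjoint set of keys" property concrete, here is a minimal sketch (not DataFusion's actual code; the hasher and partition count are illustrative assumptions) of how hash repartitioning routes rows. Because rows with equal group keys always hash to the same output partition, each downstream partial aggregate only ever sees its own subset of keys:
   
   ```rust
   use std::collections::hash_map::DefaultHasher;
   use std::hash::{Hash, Hasher};
   
   // Illustrative routing function: rows with equal group keys always map
   // to the same partition, so each downstream partial aggregate sees a
   // disjoint subset of keys and can use a smaller hash table.
   fn partition_for(uid: &str, num_partitions: u64) -> u64 {
       let mut hasher = DefaultHasher::new();
       uid.hash(&mut hasher);
       hasher.finish() % num_partitions
   }
   
   fn main() {
       for uid in ["user_a", "user_b", "user_a", "user_c"] {
           println!("{uid} -> partition {}", partition_for(uid, 16));
       }
   }
   ```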
   
   The potential downside is that the hash function needs to be computed on more rows, and the repartitioning happens prior to the first-level aggregation, which would be a slowdown for low cardinality group keys.
   
   The more sophisticated version, I think, would have *3* aggregates:
   
   ```
   AggregateExec: mode=FinalPartitioned
     AggregateExec: mode=Partial, gby=[uid@1 as uid],
       RepartitionExec: partitioning=Hash([Column { name: "uid", index: 0 }], 16) 
         AggregateExec: mode=PartialSmall, gby=[uid@1 as uid], <-- this is done prior to repartitioning to reduce low cardinality inputs. The idea would be that the operator would have a small hash table (a few MB?) and dump to the output when the table is full
            ParquetExec: file_groups=...
   ```
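   
   A minimal sketch of what such a `PartialSmall` operator could look like (the name, the `max_groups` cap, and the AVG-specific `(sum, count)` state are illustrative assumptions, not an existing DataFusion API): it accumulates partial states in a bounded hash table and flushes them downstream whenever the table fills up, so memory stays bounded while low cardinality inputs still collapse before the repartition:
   
   ```rust
   use std::collections::HashMap;
   
   // Sketch of a partial aggregate for AVG whose hash table is capped at
   // `max_groups` entries. When a new key arrives and the table is full,
   // the accumulated partial states are flushed downstream (to the
   // repartition + final aggregation), keeping memory bounded.
   struct PartialSmallAvg {
       max_groups: usize,
       groups: HashMap<String, (f64, u64)>, // uid -> (sum, count)
   }
   
   impl PartialSmallAvg {
       fn new(max_groups: usize) -> Self {
           Self { max_groups, groups: HashMap::new() }
       }
   
       /// Accumulate one row; returns flushed partial states on overflow.
       fn push(&mut self, uid: &str, value: f64) -> Option<Vec<(String, (f64, u64))>> {
           if !self.groups.contains_key(uid) && self.groups.len() >= self.max_groups {
               let flushed: Vec<_> = self.groups.drain().collect();
               self.groups.insert(uid.to_string(), (value, 1));
               return Some(flushed);
           }
           let (sum, count) = self.groups.entry(uid.to_string()).or_insert((0.0, 0));
           *sum += value;
           *count += 1;
           None
       }
   
       /// Emit whatever remains at end of input.
       fn finish(self) -> Vec<(String, (f64, u64))> {
           self.groups.into_iter().collect()
       }
   }
   
   fn main() {
       let mut agg = PartialSmallAvg::new(2);
       for (uid, v) in [("a", 1.0), ("b", 2.0), ("a", 3.0), ("c", 4.0)] {
           if let Some(flushed) = agg.push(uid, v) {
               println!("table full, flushed: {flushed:?}");
           }
       }
       println!("remaining: {:?}", agg.finish());
   }
   ```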
   

