Posted to commits@pinot.apache.org by "dario-liberman (via GitHub)" <gi...@apache.org> on 2023/11/28 03:00:57 UTC

[I] Partitioned Aggregation Support [pinot]

dario-liberman opened a new issue, #12057:
URL: https://github.com/apache/pinot/issues/12057

   Add support to optimise aggregation for partitioned columns.
   
   Distinct count is an example where the optimisation is beneficial: because each value of a partitioned column belongs to exactly one partition, it is sufficient to count distinct values within each partition, and a simple sum of the per-partition counts then yields the final aggregate result.
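
   As a minimal sketch of this idea (not Pinot code), the class below assumes a hypothetical modulo partitioner, `partitionOf`, so that every value lands in exactly one partition. It compares a global distinct count with the sum of per-partition distinct counts. All names are illustrative.

   ```java
   import java.util.Arrays;
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;

   final class PartitionedDistinctCountDemo {
       // Hypothetical partition function (an assumption, not Pinot's): modulo on the value.
       static int partitionOf(int value, int numPartitions) {
           return Math.floorMod(value, numPartitions);
       }

       // Baseline: a single set over all values.
       static long globalDistinctCount(List<Integer> values) {
           return new HashSet<>(values).size();
       }

       // Partitioned: one set per partition, then a plain sum of the set sizes.
       static long partitionedDistinctCount(List<Integer> values, int numPartitions) {
           Map<Integer, Set<Integer>> setsByPartition = new HashMap<>();
           for (int value : values) {
               setsByPartition
                   .computeIfAbsent(partitionOf(value, numPartitions), p -> new HashSet<>())
                   .add(value);
           }
           long total = 0;
           for (Set<Integer> partitionSet : setsByPartition.values()) {
               total += partitionSet.size();
           }
           return total;
       }

       public static void main(String[] args) {
           List<Integer> values = Arrays.asList(1, 2, 2, 3, 7, 7, 8, 11);
           System.out.println(globalDistinctCount(values));
           System.out.println(partitionedDistinctCount(values, 4));
       }
   }
   ```

   The sum is only valid because the partitions are disjoint by value; if the same value could appear in two partitions, it would be counted twice.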
   
   There is in fact an aggregation function today that uses this notion, [`SEGMENTPARTITIONEDDISTINCTCOUNT`](https://docs.pinot.apache.org/configuration-reference/functions/segmentpartitioneddistinctcount). However, it only handles cases where the whole partition fits in a single segment, so it is not applicable to most Pinot use cases (e.g. time-based).
   
   A reasonable approach is to support the partitioned aggregation optimisation for [partitioned segment assignment](https://docs.pinot.apache.org/operators/operating-pinot/segment-assignment#partitioned-replica-group-segment-assignment) configurations, so we can effectively assume that all segments for a given partition will be co-located on the same server.   
   
   This means that there are two key aspects that remain the same:
   1. Aggregation within each segment remains the same, as a segment belongs to a single partition.
   2. Broker aggregation across server results remains the same, as all partitioning concerns can be resolved within the server.
   
   The proposal is for the [`AggregationFunction`](https://github.com/apache/pinot/blob/master/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunction.java) to have an extra stage that merges results within each partition when the partitioning optimisation applies.
   
   There are multiple ways to go about supporting this.
   
   New methods could look something like this:
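   ```java
   // Whether this function supports the extra per-partition stage.
   boolean isPartitionedAggregation();
   // Converts one partition's merged intermediate result into its partition-level result.
   IntermediateResult extractPartitionResult(IntermediateResult intermediateResult);
   ```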
   
   A more involved change would add a new generic type, and consequently a new merge function to aggregate across partitions.
   ```java
   boolean isPartitionedAggregation();
   // The partition-level result now has its own type.
   PartitionedResult extractPartitionResult(IntermediateResult intermediateResult);
   // Merges partition-level results across partitions.
   PartitionedResult mergePartitions(PartitionedResult partitionedResult1, PartitionedResult partitionedResult2);
   ```
   
   An even more involved change would also allow a different column type for the partition result (though this is likely unnecessary, given that `OBJECT` can be used anyway).
   ```
   ColumnDataType getPartitionResultColumnType();
   ```
   
   We can either add default methods to `AggregationFunction` (e.g. returning `false`), or we can have a new interface `PartitionedAggregationFunction` extending `AggregationFunction` (and then, where necessary, check whether the aggregation function object implements the new interface).
   The latter has the benefit that we could more easily introduce a new generic type to represent the partition level intermediate result.
   
   The main changes to the aggregation logic in order to support this partitioning optimisation would be in `AggregationCombineOperator`, `GroupByCombineOperator` and `AggregationResultsBlockMerger`.
   
   Basically, intermediate results would be merged across the segments of each partition, based on which partition each segment belongs to. Partition results can then be extracted and finally merged across partitions.
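
   The flow described above can be sketched as follows. This is only an illustration under stated assumptions: `PartitionedAggregation`, `DistinctCount` and `combine` are hypothetical names, not existing Pinot classes, and the real change would live in the combine operators listed above. The sketch assumes the server already has segment-level results grouped by partition id.

   ```java
   import java.util.Arrays;
   import java.util.HashSet;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;

   final class PartitionedCombineSketch {
       // Hypothetical stand-in for the proposed extension; I and P play the roles of
       // IntermediateResult and PartitionedResult from the proposal.
       interface PartitionedAggregation<I, P> {
           I merge(I left, I right);                        // across segments of one partition
           P extractPartitionResult(I intermediateResult);  // once per partition
           P mergePartitions(P left, P right);              // across partitions
       }

       // Distinct count: union of sets within a partition, sum of counts across partitions.
       static final class DistinctCount implements PartitionedAggregation<Set<String>, Long> {
           @Override
           public Set<String> merge(Set<String> left, Set<String> right) {
               Set<String> union = new HashSet<>(left);
               union.addAll(right);
               return union;
           }

           @Override
           public Long extractPartitionResult(Set<String> intermediateResult) {
               return (long) intermediateResult.size();
           }

           @Override
           public Long mergePartitions(Long left, Long right) {
               return left + right;
           }
       }

       // Merges segment results within each partition, extracts the partition result,
       // then merges across partitions. Returns null if there are no segment results.
       static <I, P> P combine(PartitionedAggregation<I, P> function,
                               Map<Integer, List<I>> segmentResultsByPartition) {
           P combined = null;
           for (List<I> segmentResults : segmentResultsByPartition.values()) {
               if (segmentResults.isEmpty()) {
                   continue;
               }
               I merged = segmentResults.get(0);
               for (int i = 1; i < segmentResults.size(); i++) {
                   merged = function.merge(merged, segmentResults.get(i));
               }
               P partitionResult = function.extractPartitionResult(merged);
               combined = combined == null ? partitionResult : function.mergePartitions(combined, partitionResult);
           }
           return combined;
       }

       public static void main(String[] args) {
           Map<Integer, List<Set<String>>> byPartition = new LinkedHashMap<>();
           byPartition.put(0, Arrays.asList(
               new HashSet<String>(Arrays.asList("a", "b")),
               new HashSet<String>(Arrays.asList("b", "c"))));
           byPartition.put(1, Arrays.asList(
               new HashSet<String>(Arrays.asList("x")),
               new HashSet<String>(Arrays.asList("x", "y"))));
           System.out.println(combine(new DistinctCount(), byPartition));
       }
   }
   ```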
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [I] Partitioned Aggregation Support [pinot]

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on issue #12057:
URL: https://github.com/apache/pinot/issues/12057#issuecomment-1843900336

   I see. I think there is one misconception: this deals with more than just what multi-stage is doing, e.g.
   1. not only is the group key partitioned, so the agg can directly return the final result for each group,
   2. but the aggregate operand can also itself be partitioned, in which case the agg function can produce another mergeable format that is not as complex as the normal IntermediateResult.
   
   In this sense, I agree with @Jackie-Jiang's comment above.
   
   The only thing I want to call out is that `boolean isPartitionedAggregation()` is confusing, because it doesn't clearly tell whether it means scenario (1) or (2) above.
   
   I think a way to clearly indicate whether the result of the aggregate is a "partitionResult" or a "finalResult" would be more desirable (after all, that's what the flag is for, yes?)
   
   




Re: [I] Partitioned Aggregation Support [pinot]

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on issue #12057:
URL: https://github.com/apache/pinot/issues/12057#issuecomment-1843764278

   Good point. To solve the above 2 requirements, we can:
   
   - Still process the partitioning info on the broker, but group the segments by partition in the server request, so that the server knows which segments belong to which partition and can merge accordingly
   
   - Introduce a `PartitionResult` generic type, which defaults to `IntermediateResult`:
     - IntermediateResult: segment level result
     - PartitionResult: partition level result
     - FinalResult: query level result
   
     The default handling for `PartitionResult` should be the same as `IntermediateResult`
   
   - Broker may ask server to return `PartitionResult`, then broker and server can use partition level result merging accordingly
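
   One way to picture the three result levels and the default handling is the sketch below. It is an assumption-laden illustration, not the actual `AggregationFunction` API: `ThreeLevelAggregation`, `DefaultPartitionHandling` and `Max` are hypothetical names. Functions that do not benefit from partitioning extend the default base class, where the partition-level result is the intermediate result itself.

   ```java
   final class PartitionResultDefaultSketch {
       // Hypothetical contract; I, P and F play the roles of IntermediateResult,
       // PartitionResult and FinalResult.
       interface ThreeLevelAggregation<I, P, F> {
           I mergeIntermediate(I left, I right);            // segment level
           P extractPartitionResult(I intermediateResult);
           P mergePartition(P left, P right);               // partition level
           F extractFinalResult(P partitionResult);         // query level
       }

       // Default handling: P is the same type as I, and partition-level merging
       // simply reuses the segment-level merge.
       abstract static class DefaultPartitionHandling<I, F> implements ThreeLevelAggregation<I, I, F> {
           @Override
           public I extractPartitionResult(I intermediateResult) {
               return intermediateResult;
           }

           @Override
           public I mergePartition(I left, I right) {
               return mergeIntermediate(left, right);
           }
       }

       // MAX gains nothing from partitioning, so it keeps the default behaviour.
       static final class Max extends DefaultPartitionHandling<Double, Double> {
           @Override
           public Double mergeIntermediate(Double left, Double right) {
               return Math.max(left, right);
           }

           @Override
           public Double extractFinalResult(Double partitionResult) {
               return partitionResult;
           }
       }

       public static void main(String[] args) {
           Max max = new Max();
           Double partitionA = max.extractPartitionResult(max.mergeIntermediate(1.5, 3.0));
           Double partitionB = max.extractPartitionResult(4.0);
           System.out.println(max.extractFinalResult(max.mergePartition(partitionA, partitionB)));
       }
   }
   ```

   A function such as distinct count would instead implement the three-type contract directly, with a set as the intermediate result and a number as the partition result.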




Re: [I] Partitioned Aggregation Support [pinot]

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on issue #12057:
URL: https://github.com/apache/pinot/issues/12057#issuecomment-1839909938

   Is this similar to the multi-stage engine partitioned group-by optimization introduced in #11266? We could potentially unify the two if we use the broker to handle this optimization, yes?




Re: [I] Partitioned Aggregation Support [pinot]

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on issue #12057:
URL: https://github.com/apache/pinot/issues/12057#issuecomment-1839235972

   This is a great optimization! There is another way to think about this problem. Since the partition info is maintained on the broker side, and the broker needs to ensure all segments for the same partition are served by the same server, I'd suggest letting the broker handle this optimization by leveraging the query option `serverReturnFinalResult` introduced in #9304. There is no need to change the `AggregationFunction`: we can have the broker split the segments into per-partition groups and ask the server to directly return the final result. When introducing the query option, I considered using it for this optimization, but haven't had the time to do it.




Re: [I] Partitioned Aggregation Support [pinot]

Posted by "dario-liberman (via GitHub)" <gi...@apache.org>.
dario-liberman commented on issue #12057:
URL: https://github.com/apache/pinot/issues/12057#issuecomment-1842696322

   @Jackie-Jiang - I think we can use some of the building blocks of `serverReturnFinalResult`, but it still has two gaps as far as I can see.
   
   1. We still need a way to aggregate across servers. My point is that the merge functions within a partition and across partitions are different. For example, when calculating distinct counts, the former merge would be a union of sets, whereas the latter would be a sum of numbers.
   2. If we only worry about the broker, we still miss out on optimisations that can be done at the server level through more efficient cross-partition aggregation (or, more importantly, lower memory cost within a partition, such as smaller sets for distinct count). There could be an order of magnitude (or more) difference between partition count and server count (e.g. 512 partitions split across 16 servers). Especially with replication, there could be a deployment configuration where the cluster is large but the partitions are co-located on a small number of servers, which is sufficient to return the result for a given query.
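
   To make the memory argument in point 2 concrete, here is a rough sketch. With 512 partitions on 16 servers, each server holds 32 partitions. The code assumes (this is my assumption for illustration, not something the current combine operators do) that a server can finish one partition and release its set before starting the next, and it uses a hypothetical modulo partitioner. It reports the distinct count together with the largest set that had to be held at any time.

   ```java
   import java.util.HashSet;
   import java.util.Set;

   final class PartitionMemorySketch {
       // Returns {distinctCount, peakSetSize} when all values are merged into one set.
       static long[] unionOfSets(int[] values) {
           Set<Integer> all = new HashSet<>();
           for (int value : values) {
               all.add(value);
           }
           return new long[] {all.size(), all.size()};
       }

       // Returns {distinctCount, peakSetSize} when partitions are handled one at a time.
       // Partition membership is a hypothetical modulo on the value.
       static long[] perPartition(int[] values, int numPartitions) {
           long count = 0;
           long peak = 0;
           for (int partition = 0; partition < numPartitions; partition++) {
               Set<Integer> current = new HashSet<>();
               for (int value : values) {
                   if (Math.floorMod(value, numPartitions) == partition) {
                       current.add(value);
                   }
               }
               count += current.size();               // sum across partitions
               peak = Math.max(peak, current.size()); // only one set is live at a time
           }
           return new long[] {count, peak};
       }

       public static void main(String[] args) {
           int[] values = new int[3200];
           for (int i = 0; i < values.length; i++) {
               values[i] = i;
           }
           long[] union = unionOfSets(values);
           long[] partitioned = perPartition(values, 32);
           System.out.println("union: count=" + union[0] + " peak=" + union[1]);
           System.out.println("per partition: count=" + partitioned[0] + " peak=" + partitioned[1]);
       }
   }
   ```

   Both approaches return the same count, but the per-partition variant only ever holds one partition's worth of values.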
   
   In order to address both points above, I don't see how we can do it without explicit partitioning support in `AggregationFunction` (or, as I say above, we can have a `PartitionedAggregationFunction` sub-interface and check at run-time whether the object implements it).
   
   @walterddr - I was not aware of the support for partitioned multi-stage query optimisations, thanks for sharing. I agree this work could indeed also benefit partitioned optimisations in the multi-stage engine. However, as I say above, optimising aggregations where the group-by column is partitioned is a different problem from optimising aggregations where the partitioning allows the aggregation itself to be optimised (e.g. distinct count over a partitioned column). I argue that the aggregation function needs to be written in a way that allows for the optimisation.
   
   There is a way to make this more generic, I guess: we could perhaps have a pseudo-function representing the partition id (it could be just a number from 1 to N), and then require the query to be written with two clearly separate phases. For example:
   
   ```
   select SUM(partitioned_agg) from (
       select DISTINCT_COUNT(partition_column) as partitioned_agg from table 
       group by PARTITION_ID(partition_column)
   )
   ```
   Then it is the user who needs to know which function to use to aggregate across partitions.
   I did not propose this because, in my view, the engine should figure out how to optimise partitioning, not the user. But it is an option, I guess. Maybe we could even require appropriate hints on both selects above.

