Posted to commits@pinot.apache.org by "dario-liberman (via GitHub)" <gi...@apache.org> on 2023/07/12 13:33:18 UTC

[GitHub] [pinot] dario-liberman opened a new pull request, #11092: Funnel Count - Multiple Strategies (no partitioning requisites)

dario-liberman opened a new pull request, #11092:
URL: https://github.com/apache/pinot/pull/11092

   PR for https://github.com/apache/pinot/issues/10866
   
   This PR adds the remaining funnel count aggregation strategies documented in [docs](https://github.com/dario-liberman/pinot-docs/blob/7a23268ff0cc8f9636668b09cf85519fef8a4fd2/configuration-reference/functions/funnelcount.md).
   
   In particular, it introduces strategies that neither require nor assume any partitioning configuration:
   - _theta-sketch_
   - _bitmap_
   - _set_
   
   These have the same characteristics as the respective distinct count aggregation functions: 
   - [DISTINCTCOUNTTHETASKETCH](https://docs.pinot.apache.org/configuration-reference/functions/distinctcountthetasketch)
   - [DISTINCTCOUNTBITMAP](https://docs.pinot.apache.org/configuration-reference/functions/distinctcountbitmap)
   - [DISTINCTCOUNT](https://docs.pinot.apache.org/configuration-reference/functions/distinctcount)
   
   These complement the already present aggregation strategies:
   - _partitioned_
   - _partitioned, sorted_
   
   The first corresponds to [SEGMENTPARTITIONEDDISTINCTCOUNT](https://docs.pinot.apache.org/configuration-reference/functions/segmentpartitioneddistinctcount). The latter has no equivalent (though GAPFILL has a somewhat similar aggregation optimisation for sorted rows).
   
   To select a strategy, the user passes it as a SETTINGS parameter, as documented in the link above. For example:
   ```
   select 
     FUNNEL_COUNT(
       STEPS(
         url = '/cart/add', 
         url = '/checkout/start', 
         url = '/checkout/confirmation'),
       CORRELATE_BY(user_id),
       SETTINGS('theta_sketch', 'nominalEntries=4096')
     ) AS counts
   from user_log 
   ```
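
A minimal sketch of what the `set` strategy computes, written in plain Java rather than against Pinot APIs. It assumes funnel semantics where the count for a step is the number of correlation ids that matched that step and every step before it. The class and method names (`FunnelSetSketch`, `funnelCounts`) are hypothetical and do not come from the PR.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration only; this is not the Pinot implementation.
final class FunnelSetSketch {
  private FunnelSetSketch() { }

  /**
   * stepIds.get(i) holds the correlation ids (for example user_id values) that matched step i.
   * Returns one count per step: the ids that matched step i and all earlier steps
   * (assumed funnel semantics).
   */
  static long[] funnelCounts(List<Set<String>> stepIds) {
    long[] counts = new long[stepIds.size()];
    Set<String> reached = null;
    for (int i = 0; i < stepIds.size(); i++) {
      if (reached == null) {
        // First step: every id that matched it enters the funnel.
        reached = new HashSet<>(stepIds.get(i));
      } else {
        // Later steps: keep only ids that also matched this step.
        reached.retainAll(stepIds.get(i));
      }
      counts[i] = reached.size();
    }
    return counts;
  }
}
```

The other strategies would plausibly follow the same shape with a different container for the ids (a bitmap of dictionary ids, or a theta sketch for approximate counts).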
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] dario-liberman commented on a diff in pull request #11092: Funnel Count - Multiple Strategies (no partitioning requisites)

Posted by "dario-liberman (via GitHub)" <gi...@apache.org>.
dario-liberman commented on code in PR #11092:
URL: https://github.com/apache/pinot/pull/11092#discussion_r1272710739


##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FunnelCountAggregationFunction.java:
##########
@@ -198,27 +278,58 @@ private int[][] getSteps(Map<ExpressionContext, BlockValSet> blockValSetMap) {
   }
 
   private boolean isSorted(Map<ExpressionContext, BlockValSet> blockValSetMap) {
-    final Dictionary primaryCorrelationDictionary = blockValSetMap.get(_primaryCorrelationCol).getDictionary();
-    if (primaryCorrelationDictionary == null) {
-      throw new IllegalArgumentException(
-          "CORRELATE_BY column in FUNNELCOUNT aggregation function not supported, please use a dictionary encoded "
-              + "column.");
+    return getDictionary(blockValSetMap).isSorted();
+  }
+
+  private AggregationStrategy getAggregationStrategy(Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    if (_partitionSetting && _sortingSetting && isSorted(blockValSetMap)) {
+      return _sortedAggregationStrategy;
+    }
+    if (_thetaSketchSetting) {
+      return _thetaSketchAggregationStrategy;
     }
-    return primaryCorrelationDictionary.isSorted();
+    // default
+    return _bitmapAggregationStrategy;
   }
 
-  private SegmentAggregationStrategy<?, List<Long>> getAggregationStrategyByBlockValSetMap(
-      Map<ExpressionContext, BlockValSet> blockValSetMap) {
-    return isSorted(blockValSetMap) ? _sortedAggregationStrategy : _bitmapAggregationStrategy;
+  private ResultExtractionStrategy getResultExtractionStrategy(Object aggResult) {
+    if (_partitionSetting) {
+        if (_sortingSetting && aggResult instanceof SortedAggregationResult) {
+          return _sortedPartitionedResultExtractionStrategy;
+        }
+        if (_thetaSketchSetting) {
+          return _thetaSketchPartitionedResultExtractionStrategy;
+        }
+        return _bitmapPartitionedResultExtractionStrategy;
+    }
+    if (_thetaSketchSetting) {
+      return _thetaSketchResultExtractionStrategy;

Review Comment:
   This PR includes 5 test classes, one per strategy, each exercising both plain aggregation and aggregation with group-by:
   - FunnelCountQueriesBitmapTest
   - FunnelCountQueriesPartitionedSortedTest
   - FunnelCountQueriesPartitionedTest
   - FunnelCountQueriesSetTest
   - FunnelCountQueriesThetaSketchTest





[GitHub] [pinot] dario-liberman commented on a diff in pull request #11092: Funnel Count - Multiple Strategies (no partitioning requisites)

Posted by "dario-liberman (via GitHub)" <gi...@apache.org>.
dario-liberman commented on code in PR #11092:
URL: https://github.com/apache/pinot/pull/11092#discussion_r1272643837


##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FunnelCountAggregationFunction.java:
##########
@@ -35,46 +50,107 @@
 import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.PeekableIntIterator;
 import org.roaringbitmap.RoaringBitmap;
 
 
 /**
- * The {@code FunnelCountAggregationFunction} calculates the number of step conversions for a given partition column and
- * a list of boolean expressions.
- * <p>IMPORTANT: This function relies on the partition column being partitioned for each segment, where there are no
- * common values across different segments.
- * <p>This function calculates the exact number of step matches per partition key within the segment, then sums up the
- * results from different segments.
+ * The {@code FunnelCountAggregationFunction} calculates the number of conversions for a given correlation column and
+ * a list of steps as boolean expressions.
  *
  * Example:
  *   SELECT
  *    dateTrunc('day', timestamp) AS ts,
  *    FUNNEL_COUNT(
  *      STEPS(url = '/addToCart', url = '/checkout', url = '/orderConfirmation'),
- *      CORRELATED_BY(user)
+ *      CORRELATED_BY(user_id)
  *    ) as step_counts
  *    FROM user_log
  *    WHERE url in ('/addToCart', '/checkout', '/orderConfirmation')
  *    GROUP BY 1
+ *
+ *  Counting strategies can be controlled via optional SETTINGS options, for example:
+ *
+ *  FUNNEL_COUNT(
+ *    STEPS(url = '/addToCart', url = '/checkout', url = '/orderConfirmation'),
+ *    CORRELATED_BY(user_id),
+ *    SETTINGS('theta_sketch','nominalEntries=4096')
+ *  )
+ *
+ *  There are 5 strategies available, mirroring the corresponding distinct count implementations as per below.
+ *  <p><ul>
+ *  <li>'set': See DISTINCTCOUNT at {@link DistinctCountAggregationFunction}
+ *  <li>'bitmap' (default): See DISTINCTCOUNTBITMAP at {@link DistinctCountBitmapAggregationFunction}
+ *  <li>'theta_sketch': See DISTINCTCOUNTTHETASKETCH at {@link DistinctCountThetaSketchAggregationFunction}
+ *  <li>'partitioned': See SEGMENTPARTITIONEDDISTINCTCOUNT {@link SegmentPartitionedDistinctCountAggregationFunction}
+ *  <li>'sorted': sorted counts per segment then sums up. Only availabe in combination with 'partitioned'.
+ *  <li>'nominalEntries=4096': theta sketch configuration, default is 4096.
+ *  </ul><p>
  */
-public class FunnelCountAggregationFunction implements AggregationFunction<List<Long>, LongArrayList> {
+public class FunnelCountAggregationFunction implements AggregationFunction<Object, LongArrayList> {
+  private static final Sketch EMPTY_SKETCH = new UpdateSketchBuilder().build().compact();
   final List<ExpressionContext> _expressions;
   final List<ExpressionContext> _stepExpressions;
   final List<ExpressionContext> _correlateByExpressions;
   final ExpressionContext _primaryCorrelationCol;
   final int _numSteps;
-
-  final SegmentAggregationStrategy<?, List<Long>> _sortedAggregationStrategy;
-  final SegmentAggregationStrategy<?, List<Long>> _bitmapAggregationStrategy;
+  final int _nominalEntries;
+  final boolean _partitionSetting;
+  final boolean _sortingSetting;
+  final boolean _thetaSketchSetting;
+  final boolean _setSetting;
+
+  final AggregationStrategy _thetaSketchAggregationStrategy;
+  final AggregationStrategy _bitmapAggregationStrategy;
+  final AggregationStrategy _sortedAggregationStrategy;
+
+  final MergeStrategy _thetaSketchMergeStrategy;
+  final MergeStrategy _setMergeStrategy;
+  final MergeStrategy _bitmapMergeStrategy;
+  final MergeStrategy _partitionedMergeStrategy;
+
+  final ResultExtractionStrategy _thetaSketchResultExtractionStrategy;
+  final ResultExtractionStrategy _setResultExtractionStrategy;
+  final ResultExtractionStrategy _bitmapResultExtractionStrategy;
+  final ResultExtractionStrategy _sortedPartitionedResultExtractionStrategy;
+  final ResultExtractionStrategy _bitmapPartitionedResultExtractionStrategy;
+  final ResultExtractionStrategy _thetaSketchPartitionedResultExtractionStrategy;
 
   public FunnelCountAggregationFunction(List<ExpressionContext> expressions) {
     _expressions = expressions;
+    Option.validate(expressions);
     _correlateByExpressions = Option.CORRELATE_BY.getInputExpressions(expressions);
     _primaryCorrelationCol = Option.CORRELATE_BY.getFirstInputExpression(expressions);
     _stepExpressions = Option.STEPS.getInputExpressions(expressions);
     _numSteps = _stepExpressions.size();
-    _sortedAggregationStrategy = new SortedAggregationStrategy();
+
+    final List<String> settings = Option.SETTINGS.getLiterals(expressions);
+    Setting.validate(settings);

Review Comment:
   Some combinations are valid. For example, one could ask for `partitioned` together with `sorted` or together with `theta_sketch`.
   But we could indeed check for invalid combinations instead of just prioritising them.
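
A possible shape for such a check, as a self-contained Java sketch. The rule that `sorted` requires `partitioned` comes from the Javadoc in the diff. The rule that `set`, `bitmap` and `theta_sketch` are mutually exclusive is an assumption made for illustration. `SettingCombinationCheck` and its nested `Strategy` enum are hypothetical names, not the PR's `Setting` enum.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

// Hypothetical sketch; not the validation code from the PR.
final class SettingCombinationCheck {
  enum Strategy { SET, BITMAP, THETA_SKETCH, PARTITIONED, SORTED }

  private SettingCombinationCheck() { }

  /** Parses SETTINGS literals such as 'theta_sketch' and rejects invalid combinations. */
  static Set<Strategy> parse(List<String> literals) {
    EnumSet<Strategy> selected = EnumSet.noneOf(Strategy.class);
    for (String literal : literals) {
      if (literal.contains("=")) {
        // key=value parameters such as nominalEntries=4096 are not strategies.
        continue;
      }
      // valueOf throws IllegalArgumentException for unknown names.
      selected.add(Strategy.valueOf(literal.trim().toUpperCase(Locale.ROOT)));
    }
    validate(selected);
    return selected;
  }

  static void validate(Set<Strategy> selected) {
    // Assumption: at most one counting structure may be chosen.
    EnumSet<Strategy> counting = EnumSet.of(Strategy.SET, Strategy.BITMAP, Strategy.THETA_SKETCH);
    counting.retainAll(selected);
    if (counting.size() > 1) {
      throw new IllegalArgumentException("Conflicting counting strategies: " + counting);
    }
    // From the Javadoc: 'sorted' is only available in combination with 'partitioned'.
    if (selected.contains(Strategy.SORTED) && !selected.contains(Strategy.PARTITIONED)) {
      throw new IllegalArgumentException("'sorted' requires 'partitioned'");
    }
  }
}
```

With this sketch, `parse` accepts `'partitioned', 'theta_sketch'` and rejects `'sorted'` on its own, instead of silently falling back to another strategy.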





[GitHub] [pinot] dario-liberman commented on pull request #11092: Funnel Count - Multiple Strategies (no partitioning requisites)

Posted by "dario-liberman (via GitHub)" <gi...@apache.org>.
dario-liberman commented on PR #11092:
URL: https://github.com/apache/pinot/pull/11092#issuecomment-1648468379

   > Some tests are failing -- please check
   
   I believe these are flaky tests unrelated to this PR. Is there a way to re-run the failed tests?




[GitHub] [pinot] dario-liberman commented on a diff in pull request #11092: Funnel Count - Multiple Strategies (no partitioning requisites)

Posted by "dario-liberman (via GitHub)" <gi...@apache.org>.
dario-liberman commented on code in PR #11092:
URL: https://github.com/apache/pinot/pull/11092#discussion_r1272703933


##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FunnelCountAggregationFunction.java:
##########
@@ -180,8 +256,12 @@ public String toExplainString() {
     return stringBuilder.append(')').toString();
   }
 
-  private static LongArrayList toLongArrayList(List<Long> longList) {
-    return longList instanceof LongArrayList ? ((LongArrayList) longList).clone() : new LongArrayList(longList);
+  private Dictionary getDictionary(Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    final Dictionary primaryCorrelationDictionary = blockValSetMap.get(_primaryCorrelationCol).getDictionary();
+    Preconditions.checkArgument(primaryCorrelationDictionary != null,
+        "CORRELATE_BY column in FUNNELCOUNT aggregation function not supported, please use a dictionary encoded "

Review Comment:
   It is mentioned as a limitation in the documentation for the function (linked above in this PR). I personally have no plans to remove this limitation, as it would be rare to correlate by something other than an actual column. Someone else in the community could of course contribute the necessary changes, but I think it is a fair limitation for a funnel analytics function. In the future I might work on supporting a secondary set of correlations in addition to the primary correlation (e.g. correlate by user id + order id). Depending on how that is implemented, perhaps I could remove the limitation.





[GitHub] [pinot] dario-liberman commented on pull request #11092: Funnel Count - Multiple Strategies (no partitioning requisites)

Posted by "dario-liberman (via GitHub)" <gi...@apache.org>.
dario-liberman commented on PR #11092:
URL: https://github.com/apache/pinot/pull/11092#issuecomment-1683401872

   @atris - Did you have a chance to review the PR after the refactoring?




[GitHub] [pinot] atris commented on a diff in pull request #11092: Funnel Count - Multiple Strategies (no partitioning requisites)

Posted by "atris (via GitHub)" <gi...@apache.org>.
atris commented on code in PR #11092:
URL: https://github.com/apache/pinot/pull/11092#discussion_r1272617169


##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FunnelCountAggregationFunction.java:
##########
@@ -35,46 +50,107 @@
 import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.PeekableIntIterator;
 import org.roaringbitmap.RoaringBitmap;
 
 
 /**
- * The {@code FunnelCountAggregationFunction} calculates the number of step conversions for a given partition column and
- * a list of boolean expressions.
- * <p>IMPORTANT: This function relies on the partition column being partitioned for each segment, where there are no
- * common values across different segments.
- * <p>This function calculates the exact number of step matches per partition key within the segment, then sums up the
- * results from different segments.
+ * The {@code FunnelCountAggregationFunction} calculates the number of conversions for a given correlation column and
+ * a list of steps as boolean expressions.
  *
  * Example:
  *   SELECT
  *    dateTrunc('day', timestamp) AS ts,
  *    FUNNEL_COUNT(
  *      STEPS(url = '/addToCart', url = '/checkout', url = '/orderConfirmation'),
- *      CORRELATED_BY(user)
+ *      CORRELATED_BY(user_id)
  *    ) as step_counts
  *    FROM user_log
  *    WHERE url in ('/addToCart', '/checkout', '/orderConfirmation')
  *    GROUP BY 1
+ *
+ *  Counting strategies can be controlled via optional SETTINGS options, for example:
+ *
+ *  FUNNEL_COUNT(
+ *    STEPS(url = '/addToCart', url = '/checkout', url = '/orderConfirmation'),
+ *    CORRELATED_BY(user_id),
+ *    SETTINGS('theta_sketch','nominalEntries=4096')
+ *  )
+ *
+ *  There are 5 strategies available, mirroring the corresponding distinct count implementations as per below.
+ *  <p><ul>
+ *  <li>'set': See DISTINCTCOUNT at {@link DistinctCountAggregationFunction}
+ *  <li>'bitmap' (default): See DISTINCTCOUNTBITMAP at {@link DistinctCountBitmapAggregationFunction}
+ *  <li>'theta_sketch': See DISTINCTCOUNTTHETASKETCH at {@link DistinctCountThetaSketchAggregationFunction}
+ *  <li>'partitioned': See SEGMENTPARTITIONEDDISTINCTCOUNT {@link SegmentPartitionedDistinctCountAggregationFunction}
+ *  <li>'sorted': sorted counts per segment then sums up. Only availabe in combination with 'partitioned'.
+ *  <li>'nominalEntries=4096': theta sketch configuration, default is 4096.
+ *  </ul><p>
  */
-public class FunnelCountAggregationFunction implements AggregationFunction<List<Long>, LongArrayList> {
+public class FunnelCountAggregationFunction implements AggregationFunction<Object, LongArrayList> {

Review Comment:
   We are losing some type specification here by moving to Object. Is it possible to create an abstract type specific to our functions, and use it here?
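
One way to read this suggestion, as a hedged Java sketch: an abstract intermediate-result type that each strategy subclasses, so the class could declare `AggregationFunction<FunnelIntermediateResult, LongArrayList>` instead of using `Object`. Both type names below are hypothetical and do not exist in Pinot. The element-wise sum follows the size check visible in the removed `merge(List<Long>, List<Long>)` code.

```java
// Hypothetical types; nothing here exists in Pinot under these names.
abstract class FunnelIntermediateResult {
  /** Combines two partial results produced by the same strategy. */
  abstract FunnelIntermediateResult merge(FunnelIntermediateResult other);

  /** Final per-step counts. */
  abstract long[] toStepCounts();
}

// Partitioned strategy: each partial result is already a list of per-step counts.
final class PartitionedStepCounts extends FunnelIntermediateResult {
  private final long[] _counts;

  PartitionedStepCounts(long... counts) {
    _counts = counts.clone();
  }

  @Override
  FunnelIntermediateResult merge(FunnelIntermediateResult other) {
    if (!(other instanceof PartitionedStepCounts)) {
      throw new IllegalArgumentException("Cannot merge results from different strategies");
    }
    long[] theirs = ((PartitionedStepCounts) other)._counts;
    if (theirs.length != _counts.length) {
      throw new IllegalStateException("The two operand arrays are not of the same size");
    }
    // Partitions share no correlation ids, so counts can simply be added.
    long[] sum = new long[_counts.length];
    for (int i = 0; i < sum.length; i++) {
      sum[i] = _counts[i] + theirs[i];
    }
    return new PartitionedStepCounts(sum);
  }

  @Override
  long[] toStepCounts() {
    return _counts.clone();
  }
}
```

Set, bitmap and theta-sketch results would be further subclasses whose `merge` performs a union rather than a sum.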



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FunnelCountAggregationFunction.java:
##########
@@ -198,27 +278,58 @@ private int[][] getSteps(Map<ExpressionContext, BlockValSet> blockValSetMap) {
   }
 
   private boolean isSorted(Map<ExpressionContext, BlockValSet> blockValSetMap) {
-    final Dictionary primaryCorrelationDictionary = blockValSetMap.get(_primaryCorrelationCol).getDictionary();
-    if (primaryCorrelationDictionary == null) {
-      throw new IllegalArgumentException(
-          "CORRELATE_BY column in FUNNELCOUNT aggregation function not supported, please use a dictionary encoded "
-              + "column.");
+    return getDictionary(blockValSetMap).isSorted();
+  }
+
+  private AggregationStrategy getAggregationStrategy(Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    if (_partitionSetting && _sortingSetting && isSorted(blockValSetMap)) {
+      return _sortedAggregationStrategy;
+    }
+    if (_thetaSketchSetting) {
+      return _thetaSketchAggregationStrategy;
     }
-    return primaryCorrelationDictionary.isSorted();
+    // default
+    return _bitmapAggregationStrategy;
   }
 
-  private SegmentAggregationStrategy<?, List<Long>> getAggregationStrategyByBlockValSetMap(
-      Map<ExpressionContext, BlockValSet> blockValSetMap) {
-    return isSorted(blockValSetMap) ? _sortedAggregationStrategy : _bitmapAggregationStrategy;
+  private ResultExtractionStrategy getResultExtractionStrategy(Object aggResult) {
+    if (_partitionSetting) {
+        if (_sortingSetting && aggResult instanceof SortedAggregationResult) {
+          return _sortedPartitionedResultExtractionStrategy;
+        }
+        if (_thetaSketchSetting) {
+          return _thetaSketchPartitionedResultExtractionStrategy;
+        }
+        return _bitmapPartitionedResultExtractionStrategy;
+    }
+    if (_thetaSketchSetting) {
+      return _thetaSketchResultExtractionStrategy;

Review Comment:
   This IMO looks a bit scary. Are the existing tests exercising the code?



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FunnelCountAggregationFunction.java:
##########
@@ -109,47 +185,52 @@ public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int ma
   @Override
   public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
       Map<ExpressionContext, BlockValSet> blockValSetMap) {
-    getAggregationStrategyByBlockValSetMap(blockValSetMap).aggregate(length, aggregationResultHolder, blockValSetMap);
+    getAggregationStrategy(blockValSetMap)
+        .aggregate(length, aggregationResultHolder, blockValSetMap);
   }
 
   @Override
   public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
       Map<ExpressionContext, BlockValSet> blockValSetMap) {
-    getAggregationStrategyByBlockValSetMap(blockValSetMap).aggregateGroupBySV(length, groupKeyArray,
-        groupByResultHolder, blockValSetMap);
+    getAggregationStrategy(blockValSetMap)
+        .aggregateGroupBySV(length, groupKeyArray, groupByResultHolder, blockValSetMap);
   }
 
   @Override
   public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
       Map<ExpressionContext, BlockValSet> blockValSetMap) {
-    getAggregationStrategyByBlockValSetMap(blockValSetMap).aggregateGroupByMV(length, groupKeysArray,
-        groupByResultHolder, blockValSetMap);
+    getAggregationStrategy(blockValSetMap)
+        .aggregateGroupByMV(length, groupKeysArray, groupByResultHolder, blockValSetMap);
   }
 
   @Override
-  public List<Long> extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
-    return getAggregationStrategyByAggregationResult(aggregationResultHolder.getResult()).extractAggregationResult(
-        aggregationResultHolder);
+  public Object extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+    return getResultExtractionStrategy(aggregationResultHolder.getResult())
+        .extractAggregationResult(aggregationResultHolder);
   }
-
   @Override
-  public List<Long> extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
-    return getAggregationStrategyByAggregationResult(groupByResultHolder.getResult(groupKey)).extractGroupByResult(
-        groupByResultHolder, groupKey);
+  public Object extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
+    return getResultExtractionStrategy(groupByResultHolder.getResult(groupKey))
+        .extractGroupByResult(groupByResultHolder, groupKey);
   }
 
   @Override
-  public List<Long> merge(List<Long> a, List<Long> b) {
-    int length = a.size();
-    Preconditions.checkState(length == b.size(), "The two operand arrays are not of the same size! provided %s, %s",
-        length, b.size());
+  public Object merge(Object a, Object b) {

Review Comment:
   Can this be an abstract type, known to this class and MergeStrategy?



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FunnelCountAggregationFunction.java:
##########
@@ -180,8 +256,12 @@ public String toExplainString() {
     return stringBuilder.append(')').toString();
   }
 
-  private static LongArrayList toLongArrayList(List<Long> longList) {
-    return longList instanceof LongArrayList ? ((LongArrayList) longList).clone() : new LongArrayList(longList);
+  private Dictionary getDictionary(Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    final Dictionary primaryCorrelationDictionary = blockValSetMap.get(_primaryCorrelationCol).getDictionary();
+    Preconditions.checkArgument(primaryCorrelationDictionary != null,
+        "CORRELATE_BY column in FUNNELCOUNT aggregation function not supported, please use a dictionary encoded "

Review Comment:
   Is this a temporary limitation?



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FunnelCountAggregationFunction.java:
##########
@@ -35,46 +50,107 @@
 import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.PeekableIntIterator;
 import org.roaringbitmap.RoaringBitmap;
 
 
 /**
- * The {@code FunnelCountAggregationFunction} calculates the number of step conversions for a given partition column and
- * a list of boolean expressions.
- * <p>IMPORTANT: This function relies on the partition column being partitioned for each segment, where there are no
- * common values across different segments.
- * <p>This function calculates the exact number of step matches per partition key within the segment, then sums up the
- * results from different segments.
+ * The {@code FunnelCountAggregationFunction} calculates the number of conversions for a given correlation column and
+ * a list of steps as boolean expressions.
  *
  * Example:
  *   SELECT
  *    dateTrunc('day', timestamp) AS ts,
  *    FUNNEL_COUNT(
  *      STEPS(url = '/addToCart', url = '/checkout', url = '/orderConfirmation'),
- *      CORRELATED_BY(user)
+ *      CORRELATED_BY(user_id)
  *    ) as step_counts
  *    FROM user_log
  *    WHERE url in ('/addToCart', '/checkout', '/orderConfirmation')
  *    GROUP BY 1
+ *
+ *  Counting strategies can be controlled via optional SETTINGS options, for example:
+ *
+ *  FUNNEL_COUNT(
+ *    STEPS(url = '/addToCart', url = '/checkout', url = '/orderConfirmation'),
+ *    CORRELATED_BY(user_id),
+ *    SETTINGS('theta_sketch','nominalEntries=4096')
+ *  )
+ *
+ *  There are 5 strategies available, mirroring the corresponding distinct count implementations as per below.
+ *  <p><ul>
+ *  <li>'set': See DISTINCTCOUNT at {@link DistinctCountAggregationFunction}
+ *  <li>'bitmap' (default): See DISTINCTCOUNTBITMAP at {@link DistinctCountBitmapAggregationFunction}
+ *  <li>'theta_sketch': See DISTINCTCOUNTTHETASKETCH at {@link DistinctCountThetaSketchAggregationFunction}
+ *  <li>'partitioned': See SEGMENTPARTITIONEDDISTINCTCOUNT {@link SegmentPartitionedDistinctCountAggregationFunction}
+ *  <li>'sorted': sorted counts per segment then sums up. Only availabe in combination with 'partitioned'.
+ *  <li>'nominalEntries=4096': theta sketch configuration, default is 4096.
+ *  </ul><p>
  */
-public class FunnelCountAggregationFunction implements AggregationFunction<List<Long>, LongArrayList> {
+public class FunnelCountAggregationFunction implements AggregationFunction<Object, LongArrayList> {
+  private static final Sketch EMPTY_SKETCH = new UpdateSketchBuilder().build().compact();
   final List<ExpressionContext> _expressions;
   final List<ExpressionContext> _stepExpressions;
   final List<ExpressionContext> _correlateByExpressions;
   final ExpressionContext _primaryCorrelationCol;
   final int _numSteps;
-
-  final SegmentAggregationStrategy<?, List<Long>> _sortedAggregationStrategy;
-  final SegmentAggregationStrategy<?, List<Long>> _bitmapAggregationStrategy;
+  final int _nominalEntries;
+  final boolean _partitionSetting;
+  final boolean _sortingSetting;
+  final boolean _thetaSketchSetting;
+  final boolean _setSetting;
+
+  final AggregationStrategy _thetaSketchAggregationStrategy;
+  final AggregationStrategy _bitmapAggregationStrategy;
+  final AggregationStrategy _sortedAggregationStrategy;
+
+  final MergeStrategy _thetaSketchMergeStrategy;

Review Comment:
   Nit: Can this be moved to a child class for better readability?



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FunnelCountAggregationFunction.java:
##########
@@ -239,48 +350,112 @@ Optional<ExpressionContext> find(List<ExpressionContext> expressions) {
 
     public List<ExpressionContext> getInputExpressions(List<ExpressionContext> expressions) {
       return this.find(expressions).map(exp -> exp.getFunction().getArguments())
-          .orElseThrow(() -> new IllegalStateException("FUNNELCOUNT requires " + _name));
+          .orElseThrow(() -> new IllegalArgumentException("FUNNELCOUNT requires " + _name));
     }
 
     public ExpressionContext getFirstInputExpression(List<ExpressionContext> expressions) {
       return this.find(expressions)
           .flatMap(exp -> exp.getFunction().getArguments().stream().findFirst())
-          .orElseThrow(() -> new IllegalStateException("FUNNELCOUNT: " + _name + " requires an argument."));
+          .orElseThrow(() -> new IllegalArgumentException("FUNNELCOUNT: " + _name + " requires an argument."));
+    }
+
+    public List<String> getLiterals(List<ExpressionContext> expressions) {
+      List<ExpressionContext> inputExpressions = find(expressions).map(exp -> exp.getFunction().getArguments())
+          .orElseGet(Collections::emptyList);
+      Preconditions.checkArgument(
+          inputExpressions.stream().allMatch(exp -> exp.getType() == ExpressionContext.Type.LITERAL),
+          "FUNNELCOUNT: " + _name + " parameters must be literals");
+      return inputExpressions.stream().map(exp -> exp.getLiteral().getStringValue()).collect(Collectors.toList());
+    }
+
+    public static void validate(List<ExpressionContext> expressions) {
+      final List<String> invalidOptions = expressions.stream()
+          .filter(expression -> !Arrays.stream(Option.values()).anyMatch(option -> option.matches(expression)))
+          .map(ExpressionContext::toString)
+          .collect(Collectors.toList());
+
+      if (!invalidOptions.isEmpty()) {
+        throw new IllegalArgumentException("Invalid FUNNELCOUNT options: " + String.join(", ", invalidOptions));
+      }
+    }
+  }
+
+  enum Setting {
+    SET("set"),
+    BITMAP("bitmap"),

Review Comment:
   Could we also use this enum upstream to store the MergeStrategy instance, instead of holding them all as fields in the class?
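
A rough sketch of that idea in Java, with each enum constant carrying its own merge function. `SettingSketch` and its members are hypothetical names. The merge logic is illustrative only: union for `set`, element-wise sum for `partitioned`. It keeps `Object` operands purely to stay short.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.BinaryOperator;

// Hypothetical sketch; names and merge logic are illustrative, not Pinot code.
enum SettingSketch {
  // 'set': partial results are sets of correlation ids, merged by union.
  SET("set", (a, b) -> {
    Set<Object> union = new HashSet<>((Set<?>) a);
    union.addAll((Set<?>) b);
    return union;
  }),
  // 'partitioned': partial results are per-step counts, merged by element-wise sum.
  PARTITIONED("partitioned", (a, b) -> {
    long[] x = (long[]) a;
    long[] y = (long[]) b;
    if (x.length != y.length) {
      throw new IllegalStateException("The two operand arrays are not of the same size");
    }
    long[] sum = new long[x.length];
    for (int i = 0; i < sum.length; i++) {
      sum[i] = x[i] + y[i];
    }
    return sum;
  });

  private final String _name;
  private final BinaryOperator<Object> _merge;

  SettingSketch(String name, BinaryOperator<Object> merge) {
    _name = name;
    _merge = merge;
  }

  String getName() {
    return _name;
  }

  Object merge(Object a, Object b) {
    return _merge.apply(a, b);
  }
}
```

The trade-off is that stateless strategies fit well in an enum, while strategies that need per-query configuration (such as a theta sketch's nominal entries) would still have to be built per function instance.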





[GitHub] [pinot] dario-liberman commented on a diff in pull request #11092: Funnel Count - Multiple Strategies (no partitioning requisites)

Posted by "dario-liberman (via GitHub)" <gi...@apache.org>.
dario-liberman commented on code in PR #11092:
URL: https://github.com/apache/pinot/pull/11092#discussion_r1272643837


##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FunnelCountAggregationFunction.java:
##########
@@ -35,46 +50,107 @@
 import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.PeekableIntIterator;
 import org.roaringbitmap.RoaringBitmap;
 
 
 /**
- * The {@code FunnelCountAggregationFunction} calculates the number of step conversions for a given partition column and
- * a list of boolean expressions.
- * <p>IMPORTANT: This function relies on the partition column being partitioned for each segment, where there are no
- * common values across different segments.
- * <p>This function calculates the exact number of step matches per partition key within the segment, then sums up the
- * results from different segments.
+ * The {@code FunnelCountAggregationFunction} calculates the number of conversions for a given correlation column and
+ * a list of steps as boolean expressions.
  *
  * Example:
  *   SELECT
  *    dateTrunc('day', timestamp) AS ts,
  *    FUNNEL_COUNT(
  *      STEPS(url = '/addToCart', url = '/checkout', url = '/orderConfirmation'),
- *      CORRELATED_BY(user)
+ *      CORRELATED_BY(user_id)
  *    ) as step_counts
  *    FROM user_log
  *    WHERE url in ('/addToCart', '/checkout', '/orderConfirmation')
  *    GROUP BY 1
+ *
+ *  Counting strategies can be controlled via optional SETTINGS options, for example:
+ *
+ *  FUNNEL_COUNT(
+ *    STEPS(url = '/addToCart', url = '/checkout', url = '/orderConfirmation'),
+ *    CORRELATED_BY(user_id),
+ *    SETTINGS('theta_sketch','nominalEntries=4096')
+ *  )
+ *
+ *  There are 5 strategies available, mirroring the corresponding distinct count implementations as per below.
+ *  <p><ul>
+ *  <li>'set': See DISTINCTCOUNT at {@link DistinctCountAggregationFunction}
+ *  <li>'bitmap' (default): See DISTINCTCOUNTBITMAP at {@link DistinctCountBitmapAggregationFunction}
+ *  <li>'theta_sketch': See DISTINCTCOUNTTHETASKETCH at {@link DistinctCountThetaSketchAggregationFunction}
+ *  <li>'partitioned': See SEGMENTPARTITIONEDDISTINCTCOUNT {@link SegmentPartitionedDistinctCountAggregationFunction}
+ *  <li>'sorted': sorted counts per segment then sums up. Only available in combination with 'partitioned'.
+ *  <li>'nominalEntries=4096': theta sketch configuration, default is 4096.
+ *  </ul><p>
  */
-public class FunnelCountAggregationFunction implements AggregationFunction<List<Long>, LongArrayList> {
+public class FunnelCountAggregationFunction implements AggregationFunction<Object, LongArrayList> {
+  private static final Sketch EMPTY_SKETCH = new UpdateSketchBuilder().build().compact();
   final List<ExpressionContext> _expressions;
   final List<ExpressionContext> _stepExpressions;
   final List<ExpressionContext> _correlateByExpressions;
   final ExpressionContext _primaryCorrelationCol;
   final int _numSteps;
-
-  final SegmentAggregationStrategy<?, List<Long>> _sortedAggregationStrategy;
-  final SegmentAggregationStrategy<?, List<Long>> _bitmapAggregationStrategy;
+  final int _nominalEntries;
+  final boolean _partitionSetting;
+  final boolean _sortingSetting;
+  final boolean _thetaSketchSetting;
+  final boolean _setSetting;
+
+  final AggregationStrategy _thetaSketchAggregationStrategy;
+  final AggregationStrategy _bitmapAggregationStrategy;
+  final AggregationStrategy _sortedAggregationStrategy;
+
+  final MergeStrategy _thetaSketchMergeStrategy;
+  final MergeStrategy _setMergeStrategy;
+  final MergeStrategy _bitmapMergeStrategy;
+  final MergeStrategy _partitionedMergeStrategy;
+
+  final ResultExtractionStrategy _thetaSketchResultExtractionStrategy;
+  final ResultExtractionStrategy _setResultExtractionStrategy;
+  final ResultExtractionStrategy _bitmapResultExtractionStrategy;
+  final ResultExtractionStrategy _sortedPartitionedResultExtractionStrategy;
+  final ResultExtractionStrategy _bitmapPartitionedResultExtractionStrategy;
+  final ResultExtractionStrategy _thetaSketchPartitionedResultExtractionStrategy;
 
   public FunnelCountAggregationFunction(List<ExpressionContext> expressions) {
     _expressions = expressions;
+    Option.validate(expressions);
     _correlateByExpressions = Option.CORRELATE_BY.getInputExpressions(expressions);
     _primaryCorrelationCol = Option.CORRELATE_BY.getFirstInputExpression(expressions);
     _stepExpressions = Option.STEPS.getInputExpressions(expressions);
     _numSteps = _stepExpressions.size();
-    _sortedAggregationStrategy = new SortedAggregationStrategy();
+
+    final List<String> settings = Option.SETTINGS.getLiterals(expressions);
+    Setting.validate(settings);

Review Comment:
   Some combinations are valid. For example, one could ask for `partitioned` together with `sorted` or together with `theta_sketch`, or even ask for all three (which would fall back to `theta_sketch` instead of the default `bitmap` strategy when a segment is not sorted).
   But we could indeed check for invalid combinations instead of just prioritising them.
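
A minimal sketch of what such a combination check could look like. The names `FunnelSetting` and `SettingCombinations` are hypothetical and not taken from the PR. The only rules encoded are the one stated in the Javadoc of this PR ('sorted' is only available together with 'partitioned') plus an assumption that 'set', 'bitmap' and 'theta_sketch' exclude each other.

```java
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Set;

// Hypothetical names throughout; this is not the code from the PR.
enum FunnelSetting { SET, BITMAP, THETA_SKETCH, PARTITIONED, SORTED }

final class SettingCombinations {
  private SettingCombinations() { }

  // Throws IllegalArgumentException when the combination is rejected.
  static void validate(Set<FunnelSetting> settings) {
    // Assumption: at most one counting structure may be requested.
    EnumSet<FunnelSetting> counting =
        EnumSet.of(FunnelSetting.SET, FunnelSetting.BITMAP, FunnelSetting.THETA_SKETCH);
    counting.retainAll(settings);
    if (counting.size() > 1) {
      throw new IllegalArgumentException("Conflicting FUNNELCOUNT settings: " + counting);
    }
    // From the Javadoc: 'sorted' is only available in combination with 'partitioned'.
    if (settings.contains(FunnelSetting.SORTED) && !settings.contains(FunnelSetting.PARTITIONED)) {
      throw new IllegalArgumentException("FUNNELCOUNT setting 'sorted' requires 'partitioned'");
    }
  }

  // Convenience wrapper that reports validity as a boolean.
  static boolean isValid(FunnelSetting... settings) {
    try {
      validate(settings.length == 0
          ? EnumSet.noneOf(FunnelSetting.class)
          : EnumSet.copyOf(Arrays.asList(settings)));
      return true;
    } catch (IllegalArgumentException e) {
      return false;
    }
  }
}
```

With these rules, asking for `partitioned`, `sorted` and `theta_sketch` together passes, while `sorted` on its own or `set` together with `bitmap` is rejected.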





[GitHub] [pinot] dario-liberman commented on a diff in pull request #11092: Funnel Count - Multiple Strategies (no partitioning requisites)

Posted by "dario-liberman (via GitHub)" <gi...@apache.org>.
dario-liberman commented on code in PR #11092:
URL: https://github.com/apache/pinot/pull/11092#discussion_r1272733046


##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FunnelCountAggregationFunction.java:
##########
@@ -239,48 +350,112 @@ Optional<ExpressionContext> find(List<ExpressionContext> expressions) {
 
     public List<ExpressionContext> getInputExpressions(List<ExpressionContext> expressions) {
       return this.find(expressions).map(exp -> exp.getFunction().getArguments())
-          .orElseThrow(() -> new IllegalStateException("FUNNELCOUNT requires " + _name));
+          .orElseThrow(() -> new IllegalArgumentException("FUNNELCOUNT requires " + _name));
     }
 
     public ExpressionContext getFirstInputExpression(List<ExpressionContext> expressions) {
       return this.find(expressions)
           .flatMap(exp -> exp.getFunction().getArguments().stream().findFirst())
-          .orElseThrow(() -> new IllegalStateException("FUNNELCOUNT: " + _name + " requires an argument."));
+          .orElseThrow(() -> new IllegalArgumentException("FUNNELCOUNT: " + _name + " requires an argument."));
+    }
+
+    public List<String> getLiterals(List<ExpressionContext> expressions) {
+      List<ExpressionContext> inputExpressions = find(expressions).map(exp -> exp.getFunction().getArguments())
+          .orElseGet(Collections::emptyList);
+      Preconditions.checkArgument(
+          inputExpressions.stream().allMatch(exp -> exp.getType() == ExpressionContext.Type.LITERAL),
+          "FUNNELCOUNT: " + _name + " parameters must be literals");
+      return inputExpressions.stream().map(exp -> exp.getLiteral().getStringValue()).collect(Collectors.toList());
+    }
+
+    public static void validate(List<ExpressionContext> expressions) {
+      final List<String> invalidOptions = expressions.stream()
+          .filter(expression -> !Arrays.stream(Option.values()).anyMatch(option -> option.matches(expression)))
+          .map(ExpressionContext::toString)
+          .collect(Collectors.toList());
+
+      if (!invalidOptions.isEmpty()) {
+        throw new IllegalArgumentException("Invalid FUNNELCOUNT options: " + String.join(", ", invalidOptions));
+      }
+    }
+  }
+
+  enum Setting {
+    SET("set"),
+    BITMAP("bitmap"),

Review Comment:
   As I said above, the challenge is sorted vs. unsorted segments. But I can probably reduce the runtime choice to just two strategies, sorted and unsorted, with the unsorted one resolved at construction time from the strategy settings given.
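
To illustrate the idea, here is a rough sketch in which strategies are reduced to string labels. `StrategySelectorSketch` is a hypothetical name, and the precedence (theta sketch over the bitmap default) is taken from the `getAggregationStrategy` method quoted elsewhere in this thread, not from the final merged code.

```java
import java.util.Set;

// Hypothetical sketch, not the PR's code: strategies are reduced to labels.
final class StrategySelectorSketch {
  private final boolean _trySorted;
  private final String _unsortedStrategy;

  StrategySelectorSketch(Set<String> settings) {
    // The sorted path is only considered when both settings are present.
    _trySorted = settings.contains("partitioned") && settings.contains("sorted");
    // Resolved once, at construction time, from the settings alone.
    _unsortedStrategy = settings.contains("theta_sketch") ? "theta_sketch" : "bitmap";
  }

  // Called per segment: only the sorted/unsorted decision is left for runtime.
  String select(boolean segmentIsSorted) {
    return _trySorted && segmentIsSorted ? "sorted" : _unsortedStrategy;
  }
}
```

The per-segment call then has a single branch, and everything that depends only on the query settings is decided before any segment is read.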





[GitHub] [pinot] dario-liberman closed pull request #11092: Funnel Count - Multiple Strategies (no partitioning requisites)

Posted by "dario-liberman (via GitHub)" <gi...@apache.org>.
dario-liberman closed pull request #11092: Funnel Count - Multiple Strategies (no partitioning requisites)
URL: https://github.com/apache/pinot/pull/11092




[GitHub] [pinot] atris merged pull request #11092: Funnel Count - Multiple Strategies (no partitioning requisites)

Posted by "atris (via GitHub)" <gi...@apache.org>.
atris merged PR #11092:
URL: https://github.com/apache/pinot/pull/11092




[GitHub] [pinot] dario-liberman commented on pull request #11092: Funnel Count - Multiple Strategies (no partitioning requisites)

Posted by "dario-liberman (via GitHub)" <gi...@apache.org>.
dario-liberman commented on PR #11092:
URL: https://github.com/apache/pinot/pull/11092#issuecomment-1674668813

   @atris: I have refactored the PR as discussed:
   
   1. Moving all inner classes to top level within a dedicated sub-package.
   2. Moving input validation, as well as strategy creation and selection logic, to a factory class.
   3. Stronger type-checking by making the main aggregation function parametric.
   4. Separating the sorted strategy selection logic into a dedicated sub-class due to the added complexity: unlike other strategies, which can be selected at construction time, sorting can only be decided at aggregation time, based on whether the segment being processed is actually sorted (e.g. open segments may not be sorted).
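
As a rough illustration of point 3, the sketch below makes a function generic over its intermediate result type, so that merge and extraction steps cannot be mixed across strategies. All names (`MergeSketch`, `ExtractSketch`, `ParametricFunnelSketch`) are hypothetical, and the 'set'-like instance assumes that step i counts the correlation ids present in every step up to and including i.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical interfaces; the ones in the PR's sub-package may differ.
interface MergeSketch<I> {
  I merge(I a, I b);
}

interface ExtractSketch<I> {
  List<Long> extract(I intermediate);
}

// The type parameter I ties a merge step to a matching extraction step.
final class ParametricFunnelSketch<I> {
  private final MergeSketch<I> _merge;
  private final ExtractSketch<I> _extract;

  ParametricFunnelSketch(MergeSketch<I> merge, ExtractSketch<I> extract) {
    _merge = merge;
    _extract = extract;
  }

  List<Long> mergeAndExtract(I a, I b) {
    return _extract.extract(_merge.merge(a, b));
  }

  // 'set'-like instance: one set of correlation ids per funnel step.
  static ParametricFunnelSketch<List<Set<Integer>>> setBased() {
    MergeSketch<List<Set<Integer>>> merge = (a, b) -> {
      List<Set<Integer>> merged = new ArrayList<>();
      for (int i = 0; i < a.size(); i++) {
        Set<Integer> union = new HashSet<>(a.get(i));
        union.addAll(b.get(i));
        merged.add(union);
      }
      return merged;
    };
    ExtractSketch<List<Set<Integer>>> extract = steps -> {
      List<Long> counts = new ArrayList<>();
      Set<Integer> running = null;
      for (Set<Integer> step : steps) {
        if (running == null) {
          running = new HashSet<>(step);
        } else {
          running.retainAll(step); // keep only ids seen in all steps so far
        }
        counts.add((long) running.size());
      }
      return counts;
    };
    return new ParametricFunnelSketch<>(merge, extract);
  }
}
```

Merging the per-step sets `[{1,2}, {1}, {}]` and `[{3}, {2,3}, {3}]` under these assumptions yields the step counts `[3, 3, 1]`. Passing a different intermediate type to `mergeAndExtract` fails at compile time instead of at runtime.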
   
   All previous tests for the different strategies remain untouched and continue to pass after the refactoring.
   
   Please re-review the PR. 
   




[GitHub] [pinot] cbalci commented on a diff in pull request #11092: Funnel Count - Multiple Strategies (no partitioning requisites)

Posted by "cbalci (via GitHub)" <gi...@apache.org>.
cbalci commented on code in PR #11092:
URL: https://github.com/apache/pinot/pull/11092#discussion_r1272481992


##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FunnelCountAggregationFunction.java:
##########
@@ -35,46 +50,107 @@
 import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.PeekableIntIterator;
 import org.roaringbitmap.RoaringBitmap;
 
 
 /**
- * The {@code FunnelCountAggregationFunction} calculates the number of step conversions for a given partition column and
- * a list of boolean expressions.
- * <p>IMPORTANT: This function relies on the partition column being partitioned for each segment, where there are no
- * common values across different segments.
- * <p>This function calculates the exact number of step matches per partition key within the segment, then sums up the
- * results from different segments.
+ * The {@code FunnelCountAggregationFunction} calculates the number of conversions for a given correlation column and
+ * a list of steps as boolean expressions.
  *
  * Example:
  *   SELECT
  *    dateTrunc('day', timestamp) AS ts,
  *    FUNNEL_COUNT(
  *      STEPS(url = '/addToCart', url = '/checkout', url = '/orderConfirmation'),
- *      CORRELATED_BY(user)
+ *      CORRELATED_BY(user_id)
  *    ) as step_counts
  *    FROM user_log
  *    WHERE url in ('/addToCart', '/checkout', '/orderConfirmation')
  *    GROUP BY 1
+ *
+ *  Counting strategies can be controlled via optional SETTINGS options, for example:
+ *
+ *  FUNNEL_COUNT(
+ *    STEPS(url = '/addToCart', url = '/checkout', url = '/orderConfirmation'),
+ *    CORRELATED_BY(user_id),
+ *    SETTINGS('theta_sketch','nominalEntries=4096')
+ *  )
+ *
+ *  There are 5 strategies available, mirroring the corresponding distinct count implementations as per below.
+ *  <p><ul>
+ *  <li>'set': See DISTINCTCOUNT at {@link DistinctCountAggregationFunction}
+ *  <li>'bitmap' (default): See DISTINCTCOUNTBITMAP at {@link DistinctCountBitmapAggregationFunction}
+ *  <li>'theta_sketch': See DISTINCTCOUNTTHETASKETCH at {@link DistinctCountThetaSketchAggregationFunction}
+ *  <li>'partitioned': See SEGMENTPARTITIONEDDISTINCTCOUNT {@link SegmentPartitionedDistinctCountAggregationFunction}
+ *  <li>'sorted': sorted counts per segment then sums up. Only available in combination with 'partitioned'.
+ *  <li>'nominalEntries=4096': theta sketch configuration, default is 4096.
+ *  </ul><p>
  */
-public class FunnelCountAggregationFunction implements AggregationFunction<List<Long>, LongArrayList> {
+public class FunnelCountAggregationFunction implements AggregationFunction<Object, LongArrayList> {
+  private static final Sketch EMPTY_SKETCH = new UpdateSketchBuilder().build().compact();
   final List<ExpressionContext> _expressions;
   final List<ExpressionContext> _stepExpressions;
   final List<ExpressionContext> _correlateByExpressions;
   final ExpressionContext _primaryCorrelationCol;
   final int _numSteps;
-
-  final SegmentAggregationStrategy<?, List<Long>> _sortedAggregationStrategy;
-  final SegmentAggregationStrategy<?, List<Long>> _bitmapAggregationStrategy;
+  final int _nominalEntries;
+  final boolean _partitionSetting;
+  final boolean _sortingSetting;
+  final boolean _thetaSketchSetting;
+  final boolean _setSetting;
+
+  final AggregationStrategy _thetaSketchAggregationStrategy;
+  final AggregationStrategy _bitmapAggregationStrategy;
+  final AggregationStrategy _sortedAggregationStrategy;
+
+  final MergeStrategy _thetaSketchMergeStrategy;
+  final MergeStrategy _setMergeStrategy;
+  final MergeStrategy _bitmapMergeStrategy;
+  final MergeStrategy _partitionedMergeStrategy;
+
+  final ResultExtractionStrategy _thetaSketchResultExtractionStrategy;
+  final ResultExtractionStrategy _setResultExtractionStrategy;
+  final ResultExtractionStrategy _bitmapResultExtractionStrategy;
+  final ResultExtractionStrategy _sortedPartitionedResultExtractionStrategy;
+  final ResultExtractionStrategy _bitmapPartitionedResultExtractionStrategy;
+  final ResultExtractionStrategy _thetaSketchPartitionedResultExtractionStrategy;
 
   public FunnelCountAggregationFunction(List<ExpressionContext> expressions) {
     _expressions = expressions;
+    Option.validate(expressions);
     _correlateByExpressions = Option.CORRELATE_BY.getInputExpressions(expressions);
     _primaryCorrelationCol = Option.CORRELATE_BY.getFirstInputExpression(expressions);
     _stepExpressions = Option.STEPS.getInputExpressions(expressions);
     _numSteps = _stepExpressions.size();
-    _sortedAggregationStrategy = new SortedAggregationStrategy();
+
+    final List<String> settings = Option.SETTINGS.getLiterals(expressions);
+    Setting.validate(settings);

Review Comment:
   Should we validate that only one of the strategies is selected, and return an error otherwise?



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FunnelCountAggregationFunction.java:
##########
@@ -198,27 +278,58 @@ private int[][] getSteps(Map<ExpressionContext, BlockValSet> blockValSetMap) {
   }
 
   private boolean isSorted(Map<ExpressionContext, BlockValSet> blockValSetMap) {
-    final Dictionary primaryCorrelationDictionary = blockValSetMap.get(_primaryCorrelationCol).getDictionary();
-    if (primaryCorrelationDictionary == null) {
-      throw new IllegalArgumentException(
-          "CORRELATE_BY column in FUNNELCOUNT aggregation function not supported, please use a dictionary encoded "
-              + "column.");
+    return getDictionary(blockValSetMap).isSorted();
+  }
+
+  private AggregationStrategy getAggregationStrategy(Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    if (_partitionSetting && _sortingSetting && isSorted(blockValSetMap)) {
+      return _sortedAggregationStrategy;
+    }
+    if (_thetaSketchSetting) {
+      return _thetaSketchAggregationStrategy;
     }
-    return primaryCorrelationDictionary.isSorted();
+    // default
+    return _bitmapAggregationStrategy;
   }
 
-  private SegmentAggregationStrategy<?, List<Long>> getAggregationStrategyByBlockValSetMap(
-      Map<ExpressionContext, BlockValSet> blockValSetMap) {
-    return isSorted(blockValSetMap) ? _sortedAggregationStrategy : _bitmapAggregationStrategy;
+  private ResultExtractionStrategy getResultExtractionStrategy(Object aggResult) {
+    if (_partitionSetting) {
+        if (_sortingSetting && aggResult instanceof SortedAggregationResult) {
+          return _sortedPartitionedResultExtractionStrategy;
+        }
+        if (_thetaSketchSetting) {
+          return _thetaSketchPartitionedResultExtractionStrategy;
+        }
+        return _bitmapPartitionedResultExtractionStrategy;
+    }
+    if (_thetaSketchSetting) {
+      return _thetaSketchResultExtractionStrategy;
+    }
+    if (_setSetting) {
+      return _setResultExtractionStrategy;
+    }
+    // default
+    return _bitmapResultExtractionStrategy;
   }
 
-  private SegmentAggregationStrategy<?, List<Long>> getAggregationStrategyByAggregationResult(Object aggResult) {
-    return aggResult instanceof SortedAggregationResult ? _sortedAggregationStrategy : _bitmapAggregationStrategy;
+  private MergeStrategy getMergeStrategy() {
+    if (_partitionSetting) {
+      return _partitionedMergeStrategy;
+    }
+    if (_thetaSketchSetting) {
+      return _thetaSketchMergeStrategy;
+    }
+    if (_setSetting) {
+      return _setMergeStrategy;
+    }
+    // default
+    return _bitmapMergeStrategy;
   }

Review Comment:
   This selection logic seems to be duplicated in multiple places. Can we centralize it to calculate merge/result/agg strategy at once?
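
One way to picture the centralization is a single resolver that returns all three choices at once. The sketch below uses string labels in place of strategy objects; `StrategyBundleSketch` is a hypothetical name, and the precedence rules are copied from the three selector methods quoted above. The sorted case is left out because it depends on the segment, not only on the settings.

```java
import java.util.Set;

// Hypothetical sketch: one place that resolves aggregation, merge and extraction.
final class StrategyBundleSketch {
  final String _aggregation;
  final String _merge;
  final String _extraction;

  private StrategyBundleSketch(String aggregation, String merge, String extraction) {
    _aggregation = aggregation;
    _merge = merge;
    _extraction = extraction;
  }

  static StrategyBundleSketch resolve(Set<String> settings) {
    boolean partitioned = settings.contains("partitioned");
    boolean thetaSketch = settings.contains("theta_sketch");
    boolean set = settings.contains("set");

    // Unsorted aggregation: theta sketch if requested, bitmap otherwise.
    String aggregation = thetaSketch ? "theta_sketch" : "bitmap";

    if (partitioned) {
      // Partitioned merge, with an extraction matching the aggregation result.
      return new StrategyBundleSketch(aggregation, "partitioned", aggregation + "_partitioned");
    }
    if (thetaSketch) {
      return new StrategyBundleSketch(aggregation, "theta_sketch", "theta_sketch");
    }
    if (set) {
      // In the quoted code, 'set' only changes merge and extraction.
      return new StrategyBundleSketch(aggregation, "set", "set");
    }
    // default
    return new StrategyBundleSketch(aggregation, "bitmap", "bitmap");
  }
}
```

Resolving the bundle once in the constructor would leave the three getters as plain field reads.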





[GitHub] [pinot] dario-liberman commented on a diff in pull request #11092: Funnel Count - Multiple Strategies (no partitioning requisites)

Posted by "dario-liberman (via GitHub)" <gi...@apache.org>.
dario-liberman commented on code in PR #11092:
URL: https://github.com/apache/pinot/pull/11092#discussion_r1272664595


##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FunnelCountAggregationFunction.java:
##########
@@ -35,46 +50,107 @@
 import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.PeekableIntIterator;
 import org.roaringbitmap.RoaringBitmap;
 
 
 /**
- * The {@code FunnelCountAggregationFunction} calculates the number of step conversions for a given partition column and
- * a list of boolean expressions.
- * <p>IMPORTANT: This function relies on the partition column being partitioned for each segment, where there are no
- * common values across different segments.
- * <p>This function calculates the exact number of step matches per partition key within the segment, then sums up the
- * results from different segments.
+ * The {@code FunnelCountAggregationFunction} calculates the number of conversions for a given correlation column and
+ * a list of steps as boolean expressions.
  *
  * Example:
  *   SELECT
  *    dateTrunc('day', timestamp) AS ts,
  *    FUNNEL_COUNT(
  *      STEPS(url = '/addToCart', url = '/checkout', url = '/orderConfirmation'),
- *      CORRELATED_BY(user)
+ *      CORRELATED_BY(user_id)
  *    ) as step_counts
  *    FROM user_log
  *    WHERE url in ('/addToCart', '/checkout', '/orderConfirmation')
  *    GROUP BY 1
+ *
+ *  Counting strategies can be controlled via optional SETTINGS options, for example:
+ *
+ *  FUNNEL_COUNT(
+ *    STEPS(url = '/addToCart', url = '/checkout', url = '/orderConfirmation'),
+ *    CORRELATED_BY(user_id),
+ *    SETTINGS('theta_sketch','nominalEntries=4096')
+ *  )
+ *
+ *  There are 5 strategies available, mirroring the corresponding distinct count implementations as per below.
+ *  <p><ul>
+ *  <li>'set': See DISTINCTCOUNT at {@link DistinctCountAggregationFunction}
+ *  <li>'bitmap' (default): See DISTINCTCOUNTBITMAP at {@link DistinctCountBitmapAggregationFunction}
+ *  <li>'theta_sketch': See DISTINCTCOUNTTHETASKETCH at {@link DistinctCountThetaSketchAggregationFunction}
+ *  <li>'partitioned': See SEGMENTPARTITIONEDDISTINCTCOUNT {@link SegmentPartitionedDistinctCountAggregationFunction}
+ *  <li>'sorted': sorted counts per segment then sums up. Only available in combination with 'partitioned'.
+ *  <li>'nominalEntries=4096': theta sketch configuration, default is 4096.
+ *  </ul><p>
  */
-public class FunnelCountAggregationFunction implements AggregationFunction<List<Long>, LongArrayList> {
+public class FunnelCountAggregationFunction implements AggregationFunction<Object, LongArrayList> {
+  private static final Sketch EMPTY_SKETCH = new UpdateSketchBuilder().build().compact();
   final List<ExpressionContext> _expressions;
   final List<ExpressionContext> _stepExpressions;
   final List<ExpressionContext> _correlateByExpressions;
   final ExpressionContext _primaryCorrelationCol;
   final int _numSteps;
-
-  final SegmentAggregationStrategy<?, List<Long>> _sortedAggregationStrategy;
-  final SegmentAggregationStrategy<?, List<Long>> _bitmapAggregationStrategy;
+  final int _nominalEntries;
+  final boolean _partitionSetting;
+  final boolean _sortingSetting;
+  final boolean _thetaSketchSetting;
+  final boolean _setSetting;
+
+  final AggregationStrategy _thetaSketchAggregationStrategy;
+  final AggregationStrategy _bitmapAggregationStrategy;
+  final AggregationStrategy _sortedAggregationStrategy;
+
+  final MergeStrategy _thetaSketchMergeStrategy;

Review Comment:
   You mean the strategy building/selection logic? I can move it out, yes.





[GitHub] [pinot] codecov-commenter commented on pull request #11092: Funnel Count - Multiple Strategies (no partitioning requisites)

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #11092:
URL: https://github.com/apache/pinot/pull/11092#issuecomment-1633317372

   ## [Codecov](https://app.codecov.io/gh/apache/pinot/pull/11092?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   > Merging [#11092](https://app.codecov.io/gh/apache/pinot/pull/11092?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (442f769) into [master](https://app.codecov.io/gh/apache/pinot/commit/6a47311c03fdc020a4e05a3acf5cacb7843e3307?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (6a47311) will **increase** coverage by `0.00%`.
   > The diff coverage is `0.00%`.
   
   ```diff
   @@            Coverage Diff            @@
   ##           master   #11092     +/-   ##
   =========================================
     Coverage    0.11%    0.11%             
   =========================================
     Files        2189     2148     -41     
     Lines      118056   115809   -2247     
     Branches    17872    17600    -272     
   =========================================
     Hits          137      137             
   + Misses     117899   115652   -2247     
     Partials       20       20             
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1temurin11 | `?` | |
   | integration1temurin17 | `?` | |
   | integration2temurin11 | `?` | |
   | integration2temurin17 | `?` | |
   | integration2temurin20 | `?` | |
   | unittests1temurin11 | `?` | |
   | unittests1temurin17 | `?` | |
   | unittests1temurin20 | `?` | |
   | unittests2temurin11 | `0.11% <0.00%> (-0.01%)` | :arrow_down: |
   | unittests2temurin17 | `?` | |
   | unittests2temurin20 | `?` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://app.codecov.io/gh/apache/pinot/pull/11092?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [...ation/function/FunnelCountAggregationFunction.java](https://app.codecov.io/gh/apache/pinot/pull/11092?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9hZ2dyZWdhdGlvbi9mdW5jdGlvbi9GdW5uZWxDb3VudEFnZ3JlZ2F0aW9uRnVuY3Rpb24uamF2YQ==) | `0.00% <0.00%> (ø)` | |
   
   ... and [168 files with indirect coverage changes](https://app.codecov.io/gh/apache/pinot/pull/11092/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
   




[GitHub] [pinot] dario-liberman commented on a diff in pull request #11092: Funnel Count - Multiple Strategies (no partitioning requisites)

Posted by "dario-liberman (via GitHub)" <gi...@apache.org>.
dario-liberman commented on code in PR #11092:
URL: https://github.com/apache/pinot/pull/11092#discussion_r1272704204


##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FunnelCountAggregationFunction.java:
##########
@@ -180,8 +256,12 @@ public String toExplainString() {
     return stringBuilder.append(')').toString();
   }
 
-  private static LongArrayList toLongArrayList(List<Long> longList) {
-    return longList instanceof LongArrayList ? ((LongArrayList) longList).clone() : new LongArrayList(longList);
+  private Dictionary getDictionary(Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    final Dictionary primaryCorrelationDictionary = blockValSetMap.get(_primaryCorrelationCol).getDictionary();
+    Preconditions.checkArgument(primaryCorrelationDictionary != null,
+        "CORRELATE_BY column in FUNNELCOUNT aggregation function not supported, please use a dictionary encoded "

Review Comment:
   This is mentioned as a limitation in the documentation for the function (linked above in this PR). I personally have no plans to remove it, as it would be rare to correlate by something other than an actual column. Someone else in the community could of course contribute the necessary changes, but I think it is a fair limitation for a funnel analytics function. In the future I might work on supporting a secondary set of correlations in addition to the primary one (e.g. correlate by user id + order id). Depending on how that is implemented, perhaps I could remove the limitation.





[GitHub] [pinot] dario-liberman commented on a diff in pull request #11092: Funnel Count - Multiple Strategies (no partitioning requisites)

Posted by "dario-liberman (via GitHub)" <gi...@apache.org>.
dario-liberman commented on code in PR #11092:
URL: https://github.com/apache/pinot/pull/11092#discussion_r1272661647


##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FunnelCountAggregationFunction.java:
##########
@@ -35,46 +50,107 @@
 import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.PeekableIntIterator;
 import org.roaringbitmap.RoaringBitmap;
 
 
 /**
- * The {@code FunnelCountAggregationFunction} calculates the number of step conversions for a given partition column and
- * a list of boolean expressions.
- * <p>IMPORTANT: This function relies on the partition column being partitioned for each segment, where there are no
- * common values across different segments.
- * <p>This function calculates the exact number of step matches per partition key within the segment, then sums up the
- * results from different segments.
+ * The {@code FunnelCountAggregationFunction} calculates the number of conversions for a given correlation column and
+ * a list of steps as boolean expressions.
  *
  * Example:
  *   SELECT
  *    dateTrunc('day', timestamp) AS ts,
  *    FUNNEL_COUNT(
  *      STEPS(url = '/addToCart', url = '/checkout', url = '/orderConfirmation'),
- *      CORRELATED_BY(user)
+ *      CORRELATED_BY(user_id)
  *    ) as step_counts
  *    FROM user_log
  *    WHERE url in ('/addToCart', '/checkout', '/orderConfirmation')
  *    GROUP BY 1
+ *
+ *  Counting strategies can be controlled via optional SETTINGS options, for example:
+ *
+ *  FUNNEL_COUNT(
+ *    STEPS(url = '/addToCart', url = '/checkout', url = '/orderConfirmation'),
+ *    CORRELATED_BY(user_id),
+ *    SETTINGS('theta_sketch','nominalEntries=4096')
+ *  )
+ *
+ *  There are 5 strategies available, mirroring the corresponding distinct count implementations as per below.
+ *  <p><ul>
+ *  <li>'set': See DISTINCTCOUNT at {@link DistinctCountAggregationFunction}
+ *  <li>'bitmap' (default): See DISTINCTCOUNTBITMAP at {@link DistinctCountBitmapAggregationFunction}
+ *  <li>'theta_sketch': See DISTINCTCOUNTTHETASKETCH at {@link DistinctCountThetaSketchAggregationFunction}
+ *  <li>'partitioned': See SEGMENTPARTITIONEDDISTINCTCOUNT {@link SegmentPartitionedDistinctCountAggregationFunction}
+ *  <li>'sorted': sorted counts per segment then sums up. Only available in combination with 'partitioned'.
+ *  <li>'nominalEntries=4096': theta sketch configuration, default is 4096.
+ *  </ul><p>
  */
-public class FunnelCountAggregationFunction implements AggregationFunction<List<Long>, LongArrayList> {
+public class FunnelCountAggregationFunction implements AggregationFunction<Object, LongArrayList> {

Review Comment:
   Yes, unfortunately Java has no support for union types (other than in multi-catch exception clauses). I can create a wrapper if you think that helps, as each strategy effectively uses a different underlying aggregation type.
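
For illustration only, a wrapper of the kind mentioned above could look roughly like the sketch below. It is not code from this PR: `IntermediateResult`, its `Kind` enum and the `BitSet`/`Set<Integer>` payloads are hypothetical stand-ins for the per-strategy aggregation types.

```java
import java.util.BitSet;
import java.util.Set;

// Hypothetical tagged wrapper emulating a union of two intermediate result types.
// BitSet and Set<Integer> are stand-ins; none of these names come from the PR.
final class IntermediateResult {
  enum Kind { BITMAP, SET }

  private final Kind _kind;
  private final Object _payload;

  private IntermediateResult(Kind kind, Object payload) {
    _kind = kind;
    _payload = payload;
  }

  static IntermediateResult ofBitmap(BitSet bitmap) {
    return new IntermediateResult(Kind.BITMAP, bitmap);
  }

  static IntermediateResult ofSet(Set<Integer> set) {
    return new IntermediateResult(Kind.SET, set);
  }

  Kind kind() {
    return _kind;
  }

  // Number of distinct ids held, whichever representation is wrapped.
  long cardinality() {
    if (_kind == Kind.BITMAP) {
      return ((BitSet) _payload).cardinality();
    }
    return ((Set<?>) _payload).size();
  }
}
```

The tag makes the dispatch explicit, at the cost of one extra object per intermediate result.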





[GitHub] [pinot] dario-liberman commented on a diff in pull request #11092: Funnel Count - Multiple Strategies (no partitioning requisites)

Posted by "dario-liberman (via GitHub)" <gi...@apache.org>.
dario-liberman commented on code in PR #11092:
URL: https://github.com/apache/pinot/pull/11092#discussion_r1272648479


##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FunnelCountAggregationFunction.java:
##########
@@ -198,27 +278,58 @@ private int[][] getSteps(Map<ExpressionContext, BlockValSet> blockValSetMap) {
   }
 
   private boolean isSorted(Map<ExpressionContext, BlockValSet> blockValSetMap) {
-    final Dictionary primaryCorrelationDictionary = blockValSetMap.get(_primaryCorrelationCol).getDictionary();
-    if (primaryCorrelationDictionary == null) {
-      throw new IllegalArgumentException(
-          "CORRELATE_BY column in FUNNELCOUNT aggregation function not supported, please use a dictionary encoded "
-              + "column.");
+    return getDictionary(blockValSetMap).isSorted();
+  }
+
+  private AggregationStrategy getAggregationStrategy(Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    if (_partitionSetting && _sortingSetting && isSorted(blockValSetMap)) {
+      return _sortedAggregationStrategy;
+    }
+    if (_thetaSketchSetting) {
+      return _thetaSketchAggregationStrategy;
     }
-    return primaryCorrelationDictionary.isSorted();
+    // default
+    return _bitmapAggregationStrategy;
   }
 
-  private SegmentAggregationStrategy<?, List<Long>> getAggregationStrategyByBlockValSetMap(
-      Map<ExpressionContext, BlockValSet> blockValSetMap) {
-    return isSorted(blockValSetMap) ? _sortedAggregationStrategy : _bitmapAggregationStrategy;
+  private ResultExtractionStrategy getResultExtractionStrategy(Object aggResult) {
+    if (_partitionSetting) {
+        if (_sortingSetting && aggResult instanceof SortedAggregationResult) {
+          return _sortedPartitionedResultExtractionStrategy;
+        }
+        if (_thetaSketchSetting) {
+          return _thetaSketchPartitionedResultExtractionStrategy;
+        }
+        return _bitmapPartitionedResultExtractionStrategy;
+    }
+    if (_thetaSketchSetting) {
+      return _thetaSketchResultExtractionStrategy;
+    }
+    if (_setSetting) {
+      return _setResultExtractionStrategy;
+    }
+    // default
+    return _bitmapResultExtractionStrategy;
   }
 
-  private SegmentAggregationStrategy<?, List<Long>> getAggregationStrategyByAggregationResult(Object aggResult) {
-    return aggResult instanceof SortedAggregationResult ? _sortedAggregationStrategy : _bitmapAggregationStrategy;
+  private MergeStrategy getMergeStrategy() {
+    if (_partitionSetting) {
+      return _partitionedMergeStrategy;
+    }
+    if (_thetaSketchSetting) {
+      return _thetaSketchMergeStrategy;
+    }
+    if (_setSetting) {
+      return _setMergeStrategy;
+    }
+    // default
+    return _bitmapMergeStrategy;
   }

Review Comment:
   The main challenge is that strategy selection is somewhat dynamic: the first two strategies depend on whether the segment is actually sorted. For example, the currently open segment will not be sorted; only closed segments will.
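
A minimal sketch of why the choice cannot be fixed up front, assuming the same three flags as the diff above. `SegmentAggregation` and `SegmentStrategySelector` are hypothetical names used only for illustration; the branching mirrors `getAggregationStrategy` in the hunk.

```java
// Hypothetical sketch: the flag names mirror the settings in the diff,
// but the enum and class are illustrative and not part of the PR.
enum SegmentAggregation { SORTED, THETA_SKETCH, BITMAP }

final class SegmentStrategySelector {
  private final boolean _partitionSetting;
  private final boolean _sortingSetting;
  private final boolean _thetaSketchSetting;

  SegmentStrategySelector(boolean partitionSetting, boolean sortingSetting, boolean thetaSketchSetting) {
    _partitionSetting = partitionSetting;
    _sortingSetting = sortingSetting;
    _thetaSketchSetting = thetaSketchSetting;
  }

  // segmentIsSorted is only known per segment, so the same query can pick
  // different strategies for different segments.
  SegmentAggregation select(boolean segmentIsSorted) {
    if (_partitionSetting && _sortingSetting && segmentIsSorted) {
      return SegmentAggregation.SORTED;
    }
    if (_thetaSketchSetting) {
      return SegmentAggregation.THETA_SKETCH;
    }
    return SegmentAggregation.BITMAP;
  }
}
```

With `partitioned` and `sorted` both set, a sorted segment selects `SORTED` while an unsorted one falls back to `BITMAP`.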
   





[GitHub] [pinot] dario-liberman commented on a diff in pull request #11092: Funnel Count - Multiple Strategies (no partitioning requisites)

Posted by "dario-liberman (via GitHub)" <gi...@apache.org>.
dario-liberman commented on code in PR #11092:
URL: https://github.com/apache/pinot/pull/11092#discussion_r1272679653


##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FunnelCountAggregationFunction.java:
##########
@@ -109,47 +185,52 @@ public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int ma
   @Override
   public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
       Map<ExpressionContext, BlockValSet> blockValSetMap) {
-    getAggregationStrategyByBlockValSetMap(blockValSetMap).aggregate(length, aggregationResultHolder, blockValSetMap);
+    getAggregationStrategy(blockValSetMap)
+        .aggregate(length, aggregationResultHolder, blockValSetMap);
   }
 
   @Override
   public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
       Map<ExpressionContext, BlockValSet> blockValSetMap) {
-    getAggregationStrategyByBlockValSetMap(blockValSetMap).aggregateGroupBySV(length, groupKeyArray,
-        groupByResultHolder, blockValSetMap);
+    getAggregationStrategy(blockValSetMap)
+        .aggregateGroupBySV(length, groupKeyArray, groupByResultHolder, blockValSetMap);
   }
 
   @Override
   public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
       Map<ExpressionContext, BlockValSet> blockValSetMap) {
-    getAggregationStrategyByBlockValSetMap(blockValSetMap).aggregateGroupByMV(length, groupKeysArray,
-        groupByResultHolder, blockValSetMap);
+    getAggregationStrategy(blockValSetMap)
+        .aggregateGroupByMV(length, groupKeysArray, groupByResultHolder, blockValSetMap);
   }
 
   @Override
-  public List<Long> extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
-    return getAggregationStrategyByAggregationResult(aggregationResultHolder.getResult()).extractAggregationResult(
-        aggregationResultHolder);
+  public Object extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+    return getResultExtractionStrategy(aggregationResultHolder.getResult())
+        .extractAggregationResult(aggregationResultHolder);
   }
-
   @Override
-  public List<Long> extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
-    return getAggregationStrategyByAggregationResult(groupByResultHolder.getResult(groupKey)).extractGroupByResult(
-        groupByResultHolder, groupKey);
+  public Object extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
+    return getResultExtractionStrategy(groupByResultHolder.getResult(groupKey))
+        .extractGroupByResult(groupByResultHolder, groupKey);
   }
 
   @Override
-  public List<Long> merge(List<Long> a, List<Long> b) {
-    int length = a.size();
-    Preconditions.checkState(length == b.size(), "The two operand arrays are not of the same size! provided %s, %s",
-        length, b.size());
+  public Object merge(Object a, Object b) {

Review Comment:
   It can be a wrapper type, but I don't think the underlying merge strategy would know about it, as each strategy uses a different aggregation type.
   I personally think the abstraction is unnecessary and would add garbage collection pressure.
   
   Note also that although the `AggregationFunction` interface has a type parameter for the intermediate result, everything outside it just uses `Object`; see for example `IndexedTable`.
   
   I considered propagating the type further up to avoid the use of `Object` here, using a generic type instead and moving some of the strategy selection to a separate aggregation function factory class.
   There are two main challenges with that approach: (1) segments might be unsorted, which makes strategy selection dynamic; (2) it currently supports quite a few combinations of strategies (though I could probably simplify it to support only specific combinations for the sake of readability/maintainability).
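
As a rough sketch of the trade-off, assuming hypothetical names throughout (`TypedMerge`, `StepCountSumMerge`, `IdSetUnionMerge` and `UntypedMergeFacade` are not in the PR): each merge strategy can stay typed internally while the function exposes `Object`, with a single unchecked cast at the boundary and no wrapper allocation. The `Set<Integer>` payload is a stand-in for illustration.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical typed merge contract; I is the strategy's own intermediate type.
interface TypedMerge<I> {
  I merge(I a, I b);
}

// Element-wise sum of per-step counts, like the List<Long> merge the diff replaces.
final class StepCountSumMerge implements TypedMerge<List<Long>> {
  @Override
  public List<Long> merge(List<Long> a, List<Long> b) {
    if (a.size() != b.size()) {
      throw new IllegalStateException("Operand sizes differ: " + a.size() + ", " + b.size());
    }
    List<Long> merged = new ArrayList<>(a.size());
    for (int i = 0; i < a.size(); i++) {
      merged.add(a.get(i) + b.get(i));
    }
    return merged;
  }
}

// Union of ids; Set<Integer> is an assumed stand-in for a set-based intermediate.
final class IdSetUnionMerge implements TypedMerge<Set<Integer>> {
  @Override
  public Set<Integer> merge(Set<Integer> a, Set<Integer> b) {
    Set<Integer> merged = new HashSet<>(a);
    merged.addAll(b);
    return merged;
  }
}

// Untyped facade: callers pass Object, and the only unchecked cast lives here.
final class UntypedMergeFacade {
  private final TypedMerge<Object> _delegate;

  @SuppressWarnings("unchecked")
  UntypedMergeFacade(TypedMerge<?> delegate) {
    _delegate = (TypedMerge<Object>) delegate;
  }

  Object merge(Object a, Object b) {
    return _delegate.merge(a, b);
  }
}
```

Passing an operand of the wrong type fails with a `ClassCastException` inside the delegate, which is the price of dropping the type parameter at the boundary.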
   





[GitHub] [pinot] cbalci commented on a diff in pull request #11092: Funnel Count - Multiple Strategies (no partitioning requisites)

Posted by "cbalci (via GitHub)" <gi...@apache.org>.
cbalci commented on code in PR #11092:
URL: https://github.com/apache/pinot/pull/11092#discussion_r1272489947


##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FunnelCountAggregationFunction.java:
##########
@@ -198,27 +278,58 @@ private int[][] getSteps(Map<ExpressionContext, BlockValSet> blockValSetMap) {
   }
 
   private boolean isSorted(Map<ExpressionContext, BlockValSet> blockValSetMap) {
-    final Dictionary primaryCorrelationDictionary = blockValSetMap.get(_primaryCorrelationCol).getDictionary();
-    if (primaryCorrelationDictionary == null) {
-      throw new IllegalArgumentException(
-          "CORRELATE_BY column in FUNNELCOUNT aggregation function not supported, please use a dictionary encoded "
-              + "column.");
+    return getDictionary(blockValSetMap).isSorted();
+  }
+
+  private AggregationStrategy getAggregationStrategy(Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    if (_partitionSetting && _sortingSetting && isSorted(blockValSetMap)) {
+      return _sortedAggregationStrategy;
+    }
+    if (_thetaSketchSetting) {
+      return _thetaSketchAggregationStrategy;
     }
-    return primaryCorrelationDictionary.isSorted();
+    // default
+    return _bitmapAggregationStrategy;
   }
 
-  private SegmentAggregationStrategy<?, List<Long>> getAggregationStrategyByBlockValSetMap(
-      Map<ExpressionContext, BlockValSet> blockValSetMap) {
-    return isSorted(blockValSetMap) ? _sortedAggregationStrategy : _bitmapAggregationStrategy;
+  private ResultExtractionStrategy getResultExtractionStrategy(Object aggResult) {
+    if (_partitionSetting) {
+        if (_sortingSetting && aggResult instanceof SortedAggregationResult) {
+          return _sortedPartitionedResultExtractionStrategy;
+        }
+        if (_thetaSketchSetting) {
+          return _thetaSketchPartitionedResultExtractionStrategy;
+        }
+        return _bitmapPartitionedResultExtractionStrategy;
+    }
+    if (_thetaSketchSetting) {
+      return _thetaSketchResultExtractionStrategy;
+    }
+    if (_setSetting) {
+      return _setResultExtractionStrategy;
+    }
+    // default
+    return _bitmapResultExtractionStrategy;
   }
 
-  private SegmentAggregationStrategy<?, List<Long>> getAggregationStrategyByAggregationResult(Object aggResult) {
-    return aggResult instanceof SortedAggregationResult ? _sortedAggregationStrategy : _bitmapAggregationStrategy;
+  private MergeStrategy getMergeStrategy() {
+    if (_partitionSetting) {
+      return _partitionedMergeStrategy;
+    }
+    if (_thetaSketchSetting) {
+      return _thetaSketchMergeStrategy;
+    }
+    if (_setSetting) {
+      return _setMergeStrategy;
+    }
+    // default
+    return _bitmapMergeStrategy;
   }

Review Comment:
   This selection logic seems to be duplicated in multiple places. Can we centralize it to calculate merge/result/agg strategy in one place?
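
One possible shape for such centralization, sketched under assumptions: `StrategyBundle` and its string labels are hypothetical and merely name the fields seen in the hunk above. The per-segment `sorted` case is left out on purpose, since it depends on each segment rather than on the settings alone.

```java
// Hypothetical holder for the three strategies chosen for one function instance.
// The labels name the fields from the diff; the real code would hold strategy objects.
final class StrategyBundle {
  final String _aggregation;
  final String _extraction;
  final String _merge;

  private StrategyBundle(String aggregation, String extraction, String merge) {
    _aggregation = aggregation;
    _extraction = extraction;
    _merge = merge;
  }

  // Single place that turns the settings flags into all three choices,
  // using the same precedence as the three getters in the hunk.
  static StrategyBundle resolve(boolean partition, boolean thetaSketch, boolean set) {
    String aggregation = thetaSketch ? "thetaSketch" : "bitmap";
    String extraction;
    String merge;
    if (partition) {
      extraction = thetaSketch ? "thetaSketchPartitioned" : "bitmapPartitioned";
      merge = "partitioned";
    } else if (thetaSketch) {
      extraction = "thetaSketch";
      merge = "thetaSketch";
    } else if (set) {
      extraction = "set";
      merge = "set";
    } else {
      extraction = "bitmap";
      merge = "bitmap";
    }
    return new StrategyBundle(aggregation, extraction, merge);
  }
}
```

Resolving once in the constructor would leave only the sorted-segment check to be made at aggregation time.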





[GitHub] [pinot] Jackie-Jiang commented on pull request #11092: Funnel Count - Multiple Strategies (no partitioning requisites)

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on PR #11092:
URL: https://github.com/apache/pinot/pull/11092#issuecomment-1699681177

   Thanks for the contribution! Can you help update the pinot doc for this feature?

