Posted to commits@pinot.apache.org by "atris (via GitHub)" <gi...@apache.org> on 2023/07/24 18:47:21 UTC

[GitHub] [pinot] atris commented on a diff in pull request #11092: Funnel Count - Multiple Strategies (no partitioning requisites)

atris commented on code in PR #11092:
URL: https://github.com/apache/pinot/pull/11092#discussion_r1272617169


##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FunnelCountAggregationFunction.java:
##########
@@ -35,46 +50,107 @@
 import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.PeekableIntIterator;
 import org.roaringbitmap.RoaringBitmap;
 
 
 /**
- * The {@code FunnelCountAggregationFunction} calculates the number of step conversions for a given partition column and
- * a list of boolean expressions.
- * <p>IMPORTANT: This function relies on the partition column being partitioned for each segment, where there are no
- * common values across different segments.
- * <p>This function calculates the exact number of step matches per partition key within the segment, then sums up the
- * results from different segments.
+ * The {@code FunnelCountAggregationFunction} calculates the number of conversions for a given correlation column and
+ * a list of steps as boolean expressions.
  *
  * Example:
  *   SELECT
  *    dateTrunc('day', timestamp) AS ts,
  *    FUNNEL_COUNT(
  *      STEPS(url = '/addToCart', url = '/checkout', url = '/orderConfirmation'),
- *      CORRELATED_BY(user)
+ *      CORRELATED_BY(user_id)
  *    ) as step_counts
  *    FROM user_log
  *    WHERE url in ('/addToCart', '/checkout', '/orderConfirmation')
  *    GROUP BY 1
+ *
+ *  Counting strategies can be controlled via optional SETTINGS options, for example:
+ *
+ *  FUNNEL_COUNT(
+ *    STEPS(url = '/addToCart', url = '/checkout', url = '/orderConfirmation'),
+ *    CORRELATED_BY(user_id),
+ *    SETTINGS('theta_sketch','nominalEntries=4096')
+ *  )
+ *
+ *  There are 5 strategies available, mirroring the corresponding distinct count implementations as per below.
+ *  <p><ul>
+ *  <li>'set': See DISTINCTCOUNT at {@link DistinctCountAggregationFunction}
+ *  <li>'bitmap' (default): See DISTINCTCOUNTBITMAP at {@link DistinctCountBitmapAggregationFunction}
+ *  <li>'theta_sketch': See DISTINCTCOUNTTHETASKETCH at {@link DistinctCountThetaSketchAggregationFunction}
+ *  <li>'partitioned': See SEGMENTPARTITIONEDDISTINCTCOUNT {@link SegmentPartitionedDistinctCountAggregationFunction}
+ *  <li>'sorted': sorted counts per segment then sums up. Only availabe in combination with 'partitioned'.
+ *  <li>'nominalEntries=4096': theta sketch configuration, default is 4096.
+ *  </ul><p>
  */
-public class FunnelCountAggregationFunction implements AggregationFunction<List<Long>, LongArrayList> {
+public class FunnelCountAggregationFunction implements AggregationFunction<Object, LongArrayList> {

Review Comment:
   We are losing some type information here by moving to Object. Would it be possible to introduce an abstract type specific to these functions and use it here?
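
   A rough sketch of the kind of abstract type meant here. All names (FunnelResult, CountsFunnelResult) are hypothetical and do not exist in the PR; the element-wise sum is only assumed to match what the per-segment counting path needs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical intermediate-result type: each strategy would supply its own
// implementation, so the function could be declared as
// AggregationFunction<FunnelResult<?>, LongArrayList> rather than using Object.
interface FunnelResult<T extends FunnelResult<T>> {
  // Combines two partial results produced by the same strategy.
  T merge(T other);

  // Final per-step counts, in step order.
  List<Long> toStepCounts();
}

// Assumed implementation for the case where per-segment step counts are summed.
final class CountsFunnelResult implements FunnelResult<CountsFunnelResult> {
  private final long[] _counts;

  CountsFunnelResult(long... counts) {
    _counts = counts.clone();
  }

  @Override
  public CountsFunnelResult merge(CountsFunnelResult other) {
    if (_counts.length != other._counts.length) {
      throw new IllegalStateException("The two operands are not of the same size");
    }
    long[] merged = new long[_counts.length];
    for (int i = 0; i < merged.length; i++) {
      merged[i] = _counts[i] + other._counts[i];
    }
    return new CountsFunnelResult(merged);
  }

  @Override
  public List<Long> toStepCounts() {
    List<Long> result = new ArrayList<>(_counts.length);
    for (long count : _counts) {
      result.add(count);
    }
    return result;
  }
}
```

   With something like this, a mismatch between strategies would surface as a compile error or a ClassCastException at one well-defined boundary, instead of spreading instanceof checks through the class.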



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FunnelCountAggregationFunction.java:
##########
@@ -198,27 +278,58 @@ private int[][] getSteps(Map<ExpressionContext, BlockValSet> blockValSetMap) {
   }
 
   private boolean isSorted(Map<ExpressionContext, BlockValSet> blockValSetMap) {
-    final Dictionary primaryCorrelationDictionary = blockValSetMap.get(_primaryCorrelationCol).getDictionary();
-    if (primaryCorrelationDictionary == null) {
-      throw new IllegalArgumentException(
-          "CORRELATE_BY column in FUNNELCOUNT aggregation function not supported, please use a dictionary encoded "
-              + "column.");
+    return getDictionary(blockValSetMap).isSorted();
+  }
+
+  private AggregationStrategy getAggregationStrategy(Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    if (_partitionSetting && _sortingSetting && isSorted(blockValSetMap)) {
+      return _sortedAggregationStrategy;
+    }
+    if (_thetaSketchSetting) {
+      return _thetaSketchAggregationStrategy;
     }
-    return primaryCorrelationDictionary.isSorted();
+    // default
+    return _bitmapAggregationStrategy;
   }
 
-  private SegmentAggregationStrategy<?, List<Long>> getAggregationStrategyByBlockValSetMap(
-      Map<ExpressionContext, BlockValSet> blockValSetMap) {
-    return isSorted(blockValSetMap) ? _sortedAggregationStrategy : _bitmapAggregationStrategy;
+  private ResultExtractionStrategy getResultExtractionStrategy(Object aggResult) {
+    if (_partitionSetting) {
+        if (_sortingSetting && aggResult instanceof SortedAggregationResult) {
+          return _sortedPartitionedResultExtractionStrategy;
+        }
+        if (_thetaSketchSetting) {
+          return _thetaSketchPartitionedResultExtractionStrategy;
+        }
+        return _bitmapPartitionedResultExtractionStrategy;
+    }
+    if (_thetaSketchSetting) {
+      return _thetaSketchResultExtractionStrategy;

Review Comment:
   IMO this looks a bit scary. Do the existing tests exercise all of these branches?
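
   One way to make the branching easier to test, as a sketch only: pull the decision out into a pure function of the flags and enumerate every combination. The code below models just the getAggregationStrategy logic visible in this hunk; StrategyKind and StrategySelectionSketch are hypothetical names, and the real method returns strategy instances rather than an enum.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical stand-ins for the three aggregation strategy fields in the diff.
enum StrategyKind { SORTED, THETA_SKETCH, BITMAP }

final class StrategySelectionSketch {
  private StrategySelectionSketch() {
  }

  // Mirrors the order of checks shown in getAggregationStrategy above.
  static StrategyKind select(boolean partitioned, boolean sorting, boolean thetaSketch,
      boolean dictionarySorted) {
    if (partitioned && sorting && dictionarySorted) {
      return StrategyKind.SORTED;
    }
    if (thetaSketch) {
      return StrategyKind.THETA_SKETCH;
    }
    return StrategyKind.BITMAP;
  }

  // Runs select() over all 16 flag combinations and tallies the outcomes.
  static Map<StrategyKind, Integer> countOverAllInputs() {
    Map<StrategyKind, Integer> counts = new EnumMap<>(StrategyKind.class);
    for (int mask = 0; mask < 16; mask++) {
      StrategyKind kind =
          select((mask & 1) != 0, (mask & 2) != 0, (mask & 4) != 0, (mask & 8) != 0);
      counts.merge(kind, 1, Integer::sum);
    }
    return counts;
  }
}
```

   A test over the full table makes precedence explicit, for example that 'sorted' wins over 'theta_sketch' when both are set on a sorted, partitioned column.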



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FunnelCountAggregationFunction.java:
##########
@@ -109,47 +185,52 @@ public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int ma
   @Override
   public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
       Map<ExpressionContext, BlockValSet> blockValSetMap) {
-    getAggregationStrategyByBlockValSetMap(blockValSetMap).aggregate(length, aggregationResultHolder, blockValSetMap);
+    getAggregationStrategy(blockValSetMap)
+        .aggregate(length, aggregationResultHolder, blockValSetMap);
   }
 
   @Override
   public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
       Map<ExpressionContext, BlockValSet> blockValSetMap) {
-    getAggregationStrategyByBlockValSetMap(blockValSetMap).aggregateGroupBySV(length, groupKeyArray,
-        groupByResultHolder, blockValSetMap);
+    getAggregationStrategy(blockValSetMap)
+        .aggregateGroupBySV(length, groupKeyArray, groupByResultHolder, blockValSetMap);
   }
 
   @Override
   public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
       Map<ExpressionContext, BlockValSet> blockValSetMap) {
-    getAggregationStrategyByBlockValSetMap(blockValSetMap).aggregateGroupByMV(length, groupKeysArray,
-        groupByResultHolder, blockValSetMap);
+    getAggregationStrategy(blockValSetMap)
+        .aggregateGroupByMV(length, groupKeysArray, groupByResultHolder, blockValSetMap);
   }
 
   @Override
-  public List<Long> extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
-    return getAggregationStrategyByAggregationResult(aggregationResultHolder.getResult()).extractAggregationResult(
-        aggregationResultHolder);
+  public Object extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+    return getResultExtractionStrategy(aggregationResultHolder.getResult())
+        .extractAggregationResult(aggregationResultHolder);
   }
-
   @Override
-  public List<Long> extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
-    return getAggregationStrategyByAggregationResult(groupByResultHolder.getResult(groupKey)).extractGroupByResult(
-        groupByResultHolder, groupKey);
+  public Object extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
+    return getResultExtractionStrategy(groupByResultHolder.getResult(groupKey))
+        .extractGroupByResult(groupByResultHolder, groupKey);
   }
 
   @Override
-  public List<Long> merge(List<Long> a, List<Long> b) {
-    int length = a.size();
-    Preconditions.checkState(length == b.size(), "The two operand arrays are not of the same size! provided %s, %s",
-        length, b.size());
+  public Object merge(Object a, Object b) {

Review Comment:
   Can this be an abstract type, known to this class and MergeStrategy?



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FunnelCountAggregationFunction.java:
##########
@@ -180,8 +256,12 @@ public String toExplainString() {
     return stringBuilder.append(')').toString();
   }
 
-  private static LongArrayList toLongArrayList(List<Long> longList) {
-    return longList instanceof LongArrayList ? ((LongArrayList) longList).clone() : new LongArrayList(longList);
+  private Dictionary getDictionary(Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    final Dictionary primaryCorrelationDictionary = blockValSetMap.get(_primaryCorrelationCol).getDictionary();
+    Preconditions.checkArgument(primaryCorrelationDictionary != null,
+        "CORRELATE_BY column in FUNNELCOUNT aggregation function not supported, please use a dictionary encoded "

Review Comment:
   Is this a temporary limitation?



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FunnelCountAggregationFunction.java:
##########
@@ -35,46 +50,107 @@
 import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.PeekableIntIterator;
 import org.roaringbitmap.RoaringBitmap;
 
 
 /**
- * The {@code FunnelCountAggregationFunction} calculates the number of step conversions for a given partition column and
- * a list of boolean expressions.
- * <p>IMPORTANT: This function relies on the partition column being partitioned for each segment, where there are no
- * common values across different segments.
- * <p>This function calculates the exact number of step matches per partition key within the segment, then sums up the
- * results from different segments.
+ * The {@code FunnelCountAggregationFunction} calculates the number of conversions for a given correlation column and
+ * a list of steps as boolean expressions.
  *
  * Example:
  *   SELECT
  *    dateTrunc('day', timestamp) AS ts,
  *    FUNNEL_COUNT(
  *      STEPS(url = '/addToCart', url = '/checkout', url = '/orderConfirmation'),
- *      CORRELATED_BY(user)
+ *      CORRELATED_BY(user_id)
  *    ) as step_counts
  *    FROM user_log
  *    WHERE url in ('/addToCart', '/checkout', '/orderConfirmation')
  *    GROUP BY 1
+ *
+ *  Counting strategies can be controlled via optional SETTINGS options, for example:
+ *
+ *  FUNNEL_COUNT(
+ *    STEPS(url = '/addToCart', url = '/checkout', url = '/orderConfirmation'),
+ *    CORRELATED_BY(user_id),
+ *    SETTINGS('theta_sketch','nominalEntries=4096')
+ *  )
+ *
+ *  There are 5 strategies available, mirroring the corresponding distinct count implementations as per below.
+ *  <p><ul>
+ *  <li>'set': See DISTINCTCOUNT at {@link DistinctCountAggregationFunction}
+ *  <li>'bitmap' (default): See DISTINCTCOUNTBITMAP at {@link DistinctCountBitmapAggregationFunction}
+ *  <li>'theta_sketch': See DISTINCTCOUNTTHETASKETCH at {@link DistinctCountThetaSketchAggregationFunction}
+ *  <li>'partitioned': See SEGMENTPARTITIONEDDISTINCTCOUNT {@link SegmentPartitionedDistinctCountAggregationFunction}
+ *  <li>'sorted': sorted counts per segment then sums up. Only availabe in combination with 'partitioned'.
+ *  <li>'nominalEntries=4096': theta sketch configuration, default is 4096.
+ *  </ul><p>
  */
-public class FunnelCountAggregationFunction implements AggregationFunction<List<Long>, LongArrayList> {
+public class FunnelCountAggregationFunction implements AggregationFunction<Object, LongArrayList> {
+  private static final Sketch EMPTY_SKETCH = new UpdateSketchBuilder().build().compact();
   final List<ExpressionContext> _expressions;
   final List<ExpressionContext> _stepExpressions;
   final List<ExpressionContext> _correlateByExpressions;
   final ExpressionContext _primaryCorrelationCol;
   final int _numSteps;
-
-  final SegmentAggregationStrategy<?, List<Long>> _sortedAggregationStrategy;
-  final SegmentAggregationStrategy<?, List<Long>> _bitmapAggregationStrategy;
+  final int _nominalEntries;
+  final boolean _partitionSetting;
+  final boolean _sortingSetting;
+  final boolean _thetaSketchSetting;
+  final boolean _setSetting;
+
+  final AggregationStrategy _thetaSketchAggregationStrategy;
+  final AggregationStrategy _bitmapAggregationStrategy;
+  final AggregationStrategy _sortedAggregationStrategy;
+
+  final MergeStrategy _thetaSketchMergeStrategy;

Review Comment:
   Nit: Can this be moved to a child class for better readability?



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FunnelCountAggregationFunction.java:
##########
@@ -239,48 +350,112 @@ Optional<ExpressionContext> find(List<ExpressionContext> expressions) {
 
     public List<ExpressionContext> getInputExpressions(List<ExpressionContext> expressions) {
       return this.find(expressions).map(exp -> exp.getFunction().getArguments())
-          .orElseThrow(() -> new IllegalStateException("FUNNELCOUNT requires " + _name));
+          .orElseThrow(() -> new IllegalArgumentException("FUNNELCOUNT requires " + _name));
     }
 
     public ExpressionContext getFirstInputExpression(List<ExpressionContext> expressions) {
       return this.find(expressions)
           .flatMap(exp -> exp.getFunction().getArguments().stream().findFirst())
-          .orElseThrow(() -> new IllegalStateException("FUNNELCOUNT: " + _name + " requires an argument."));
+          .orElseThrow(() -> new IllegalArgumentException("FUNNELCOUNT: " + _name + " requires an argument."));
+    }
+
+    public List<String> getLiterals(List<ExpressionContext> expressions) {
+      List<ExpressionContext> inputExpressions = find(expressions).map(exp -> exp.getFunction().getArguments())
+          .orElseGet(Collections::emptyList);
+      Preconditions.checkArgument(
+          inputExpressions.stream().allMatch(exp -> exp.getType() == ExpressionContext.Type.LITERAL),
+          "FUNNELCOUNT: " + _name + " parameters must be literals");
+      return inputExpressions.stream().map(exp -> exp.getLiteral().getStringValue()).collect(Collectors.toList());
+    }
+
+    public static void validate(List<ExpressionContext> expressions) {
+      final List<String> invalidOptions = expressions.stream()
+          .filter(expression -> !Arrays.stream(Option.values()).anyMatch(option -> option.matches(expression)))
+          .map(ExpressionContext::toString)
+          .collect(Collectors.toList());
+
+      if (!invalidOptions.isEmpty()) {
+        throw new IllegalArgumentException("Invalid FUNNELCOUNT options: " + String.join(", ", invalidOptions));
+      }
+    }
+  }
+
+  enum Setting {
+    SET("set"),
+    BITMAP("bitmap"),

Review Comment:
   Could each Setting constant also hold its MergeStrategy instance, instead of keeping all of them as fields in the class?
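
   Roughly what this could look like, as a sketch under assumptions: SettingSketch and MergeStrategySketch are hypothetical names, java.util.BitSet stands in for RoaringBitmap to keep the example self-contained, and the merge bodies are illustrative rather than taken from the PR.

```java
import java.util.BitSet;
import java.util.HashSet;
import java.util.Set;

// Hypothetical counterpart of the PR's MergeStrategy, kept on Object as in the diff.
interface MergeStrategySketch {
  Object merge(Object a, Object b);
}

// Each constant carries its own merge behaviour, so the aggregation function
// would not need one field per strategy.
enum SettingSketch {
  SET("set", (a, b) -> {
    Set<Object> union = new HashSet<>((Set<?>) a);
    union.addAll((Set<?>) b);
    return union;
  }),
  BITMAP("bitmap", (a, b) -> {
    // BitSet is a stand-in for RoaringBitmap in this sketch.
    BitSet union = (BitSet) ((BitSet) a).clone();
    union.or((BitSet) b);
    return union;
  });

  private final String _name;
  private final MergeStrategySketch _mergeStrategy;

  SettingSketch(String name, MergeStrategySketch mergeStrategy) {
    _name = name;
    _mergeStrategy = mergeStrategy;
  }

  Object merge(Object a, Object b) {
    return _mergeStrategy.merge(a, b);
  }

  // Looks up a constant by the literal used in SETTINGS(...).
  static SettingSketch fromName(String name) {
    for (SettingSketch setting : values()) {
      if (setting._name.equals(name)) {
        return setting;
      }
    }
    throw new IllegalArgumentException("Unknown setting: " + name);
  }
}
```

   Lookup then becomes SettingSketch.fromName("bitmap").merge(a, b), and adding a strategy means adding one constant.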



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

