Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2020/09/11 07:06:11 UTC

[GitHub] [incubator-pinot] Jackie-Jiang opened a new pull request #6004: Add ThetaSketchAggregationFunction

Jackie-Jiang opened a new pull request #6004:
URL: https://github.com/apache/incubator-pinot/pull/6004


   ## Description
   Introduce `ThetaSketchAggregationFunction` as an enhanced version of `DistinctCountThetaSketchAggregationFunction`, adding support for:
   - Aggregating on raw values of all data types, both single-value (SV) and multi-value (MV)
   - Nested filters (e.g. `A = 1 AND (B = 2 OR C = 3)`)
   - Filters on all data types (both SV and MV)
   - Simple union without filters (e.g. `thetaSketch(col)`)
   - `$0` as the default sketch (the sketch without any filter)
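   To make these semantics concrete, here is a minimal sketch that models each theta sketch as an exact `HashSet`. This is an assumption for illustration only: real theta sketches from Apache DataSketches estimate distinct counts rather than storing every value. Each filter feeds its own set (`$1`, `$2`, ...), `$0` receives every row, and a post-aggregation such as `SET_INTERSECT($1,$2)` combines the sets. `ExactThetaModel`, `eq`, `aggregate`, and `intersect` are hypothetical names, not Pinot APIs.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical exact-set model of filtered theta-sketch aggregation.
// A HashSet stands in for each theta sketch, so counts are exact rather than estimated.
final class ExactThetaModel {
  private ExactThetaModel() {
  }

  /** Builds an equality predicate on one column of a row, e.g. A = 1. */
  static Predicate<Map<String, Object>> eq(String column, Object value) {
    return row -> value.equals(row.get(column));
  }

  /**
   * Returns filters.size() + 1 sets: index 0 is the default sketch $0 (every row), and
   * index i >= 1 is sketch $i (only rows matching filters.get(i - 1)).
   */
  static List<Set<Object>> aggregate(List<Map<String, Object>> rows, String mainColumn,
      List<Predicate<Map<String, Object>>> filters) {
    List<Set<Object>> sets = new ArrayList<>();
    for (int i = 0; i <= filters.size(); i++) {
      sets.add(new HashSet<>());
    }
    for (Map<String, Object> row : rows) {
      Object value = row.get(mainColumn);
      sets.get(0).add(value);
      for (int i = 0; i < filters.size(); i++) {
        if (filters.get(i).test(row)) {
          sets.get(i + 1).add(value);
        }
      }
    }
    return sets;
  }

  /** Exact counterpart of SET_INTERSECT($a,$b). */
  static Set<Object> intersect(Set<Object> a, Set<Object> b) {
    Set<Object> result = new HashSet<>(a);
    result.retainAll(b);
    return result;
  }
}
```

   The nested filter `A = 1 AND (B = 2 OR C = 3)` maps onto `eq("A", 1).and(eq("B", 2).or(eq("C", 3)))`, and calling `aggregate` with an empty filter list reduces to the simple-union case, where only `$0` is produced.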
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] Jackie-Jiang commented on pull request #6004: Enhance DistinctCountThetaSketchAggregationFunction

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on pull request #6004:
URL: https://github.com/apache/incubator-pinot/pull/6004#issuecomment-700913048


   @mayankshriv Changed `DistinctCountRawThetaSketchAggregationFunction` to have the same usage as `DistinctCountThetaSketchAggregationFunction`.




[GitHub] [incubator-pinot] mayankshriv commented on a change in pull request #6004: Enhance DistinctCountThetaSketchAggregationFunction

Posted by GitBox <gi...@apache.org>.
mayankshriv commented on a change in pull request #6004:
URL: https://github.com/apache/incubator-pinot/pull/6004#discussion_r496864642



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
##########
@@ -776,6 +783,69 @@ public IdSet deserialize(ByteBuffer byteBuffer) {
     }
   };
 
+  public static final ObjectSerDe<List<Object>> LIST_SER_DE = new ObjectSerDe<List<Object>>() {
+
+    @Override
+    public byte[] serialize(List<Object> list) {
+      int size = list.size();
+
+      // Directly return the size (0) for empty list
+      if (size == 0) {
+        return new byte[Integer.BYTES];

Review comment:
       Nit: Can a static final be used here (to avoid creating a new object)?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawThetaSketchAggregationFunction.java
##########
@@ -18,118 +18,529 @@
  */
 package org.apache.pinot.core.query.aggregation.function;
 
+import com.google.common.base.Preconditions;
+import java.util.Base64;
 import java.util.List;
 import java.util.Map;
-import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.datasketches.memory.Memory;
+import org.apache.datasketches.theta.SetOperationBuilder;
 import org.apache.datasketches.theta.Sketch;
+import org.apache.datasketches.theta.Union;
+import org.apache.datasketches.theta.UpdateSketch;
+import org.apache.datasketches.theta.UpdateSketchBuilder;
 import org.apache.pinot.common.function.AggregationFunctionType;
-import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.function.DistinctCountThetaSketchAggregationFunction.Parameters;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
 import org.apache.pinot.core.query.request.context.ExpressionContext;
-import org.apache.pinot.spi.utils.ByteArray;
-
-import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.BYTES;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
 
 
 /**
- * A variation of the {@link DistinctCountThetaSketchAggregationFunction} that returns the serialized bytes
- * of the theta-sketch, as opposed to the actual distinct value.
- *
- * Note: It would have been natural for this class to extend the {@link DistinctCountThetaSketchAggregationFunction},
- * except that the return type for this class is a String, as opposed to Integer for the former, due to which the
- * extension is not possible.
+ * The {@code DistinctCountRawThetaSketchAggregationFunction} collects the values for a given expression (can be
+ * single-valued or multi-valued) into a {@link Sketch} object, and returns the sketch as a base64 encoded string. It
+ * treats BYTES expression as serialized sketches.
+ * <p>The function takes an optional second argument as the parameters for the function. Currently there is only 1

Review comment:
       What about predicates, and post-aggregation expression? Are those arguments not supported?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
##########
@@ -776,6 +783,69 @@ public IdSet deserialize(ByteBuffer byteBuffer) {
     }
   };
 
+  public static final ObjectSerDe<List<Object>> LIST_SER_DE = new ObjectSerDe<List<Object>>() {
+
+    @Override
+    public byte[] serialize(List<Object> list) {
+      int size = list.size();
+
+      // Directly return the size (0) for empty list
+      if (size == 0) {
+        return new byte[Integer.BYTES];
+      }
+
+      // No need to close these 2 streams

Review comment:
       Please also add why in the comment.
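   The diff above only shows the start of `LIST_SER_DE`: an empty list short-circuits to `Integer.BYTES` zero bytes, which decode as the size 0. The sketch below uses the same size-prefixed layout under a simplifying assumption (each element is already a `byte[]` written with its own length prefix, whereas the real serde handles arbitrary objects), and its comment spells out the reason the reviewer asked for: `ByteArrayOutputStream.close()` has no effect, and `DataOutputStream.close()` would only close that in-memory stream. `SizePrefixedListSerDe` is a hypothetical name.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical size-prefixed list format: [size][len0][bytes0][len1][bytes1]...
// It mirrors the empty-list shortcut in the diff, not Pinot's actual element encoding.
final class SizePrefixedListSerDe {
  private SizePrefixedListSerDe() {
  }

  static byte[] serialize(List<byte[]> list) {
    int size = list.size();
    // An empty list is just the size (0) encoded as a 4-byte int
    if (size == 0) {
      return new byte[Integer.BYTES];
    }
    // No need to close these 2 streams: ByteArrayOutputStream holds no system resources and
    // its close() has no effect; closing DataOutputStream would only close that in-memory stream
    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
    DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
    try {
      dataOutputStream.writeInt(size);
      for (byte[] element : list) {
        dataOutputStream.writeInt(element.length);
        dataOutputStream.write(element);
      }
    } catch (IOException e) {
      // Not expected for an in-memory stream, but DataOutputStream declares it
      throw new UncheckedIOException(e);
    }
    return byteArrayOutputStream.toByteArray();
  }

  static List<byte[]> deserialize(ByteBuffer byteBuffer) {
    // ByteBuffer defaults to big-endian, matching DataOutputStream.writeInt
    int size = byteBuffer.getInt();
    List<byte[]> list = new ArrayList<>(size);
    for (int i = 0; i < size; i++) {
      byte[] element = new byte[byteBuffer.getInt()];
      byteBuffer.get(element);
      list.add(element);
    }
    return list;
  }
}
```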

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawThetaSketchAggregationFunction.java
##########
@@ -18,118 +18,529 @@
  */
 package org.apache.pinot.core.query.aggregation.function;
 
+import com.google.common.base.Preconditions;
+import java.util.Base64;
 import java.util.List;
 import java.util.Map;
-import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.datasketches.memory.Memory;
+import org.apache.datasketches.theta.SetOperationBuilder;
 import org.apache.datasketches.theta.Sketch;
+import org.apache.datasketches.theta.Union;
+import org.apache.datasketches.theta.UpdateSketch;
+import org.apache.datasketches.theta.UpdateSketchBuilder;
 import org.apache.pinot.common.function.AggregationFunctionType;
-import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.function.DistinctCountThetaSketchAggregationFunction.Parameters;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
 import org.apache.pinot.core.query.request.context.ExpressionContext;
-import org.apache.pinot.spi.utils.ByteArray;
-
-import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.BYTES;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
 
 
 /**
- * A variation of the {@link DistinctCountThetaSketchAggregationFunction} that returns the serialized bytes
- * of the theta-sketch, as opposed to the actual distinct value.
- *
- * Note: It would have been natural for this class to extend the {@link DistinctCountThetaSketchAggregationFunction},
- * except that the return type for this class is a String, as opposed to Integer for the former, due to which the
- * extension is not possible.
+ * The {@code DistinctCountRawThetaSketchAggregationFunction} collects the values for a given expression (can be
+ * single-valued or multi-valued) into a {@link Sketch} object, and returns the sketch as a base64 encoded string. It
+ * treats BYTES expression as serialized sketches.
+ * <p>The function takes an optional second argument as the parameters for the function. Currently there is only 1
+ * parameter for the function:
+ * <ul>
+ *   <li>
+ *     nominalEntries: The nominal entries used to create the sketch. (Default 4096)
+ *   </li>
+ * </ul>
+ * <p>Example: DISTINCT_COUNT_RAW_THETA_SKETCH(col, 'nominalEntries=8192')
  */
-public class DistinctCountRawThetaSketchAggregationFunction implements AggregationFunction<Map<String, Sketch>, ByteArray> {
-  private final DistinctCountThetaSketchAggregationFunction _thetaSketchAggregationFunction;
+public class DistinctCountRawThetaSketchAggregationFunction extends BaseSingleInputAggregationFunction<Sketch, String> {
+  private final UpdateSketchBuilder _updateSketchBuilder = new UpdateSketchBuilder();
+  private final SetOperationBuilder _setOperationBuilder = new SetOperationBuilder();
+
+  public DistinctCountRawThetaSketchAggregationFunction(List<ExpressionContext> arguments) {
+    super(arguments.get(0));
 
-  public DistinctCountRawThetaSketchAggregationFunction(List<ExpressionContext> arguments)
-      throws SqlParseException {
-    _thetaSketchAggregationFunction = new DistinctCountThetaSketchAggregationFunction(arguments);
+    // Optional second argument for theta-sketch parameters
+    if (arguments.size() > 1) {
+      ExpressionContext paramsExpression = arguments.get(1);
+      Preconditions.checkArgument(paramsExpression.getType() == ExpressionContext.Type.LITERAL,
+          "Second argument of DISTINCT_COUNT_RAW_THETA_SKETCH aggregation function must be literal (parameters)");
+      Parameters parameters = new Parameters(paramsExpression.getLiteral());
+      int nominalEntries = parameters.getNominalEntries();
+      _updateSketchBuilder.setNominalEntries(nominalEntries);
+      _setOperationBuilder.setNominalEntries(nominalEntries);
+    }
   }
 
   @Override
   public AggregationFunctionType getType() {
     return AggregationFunctionType.DISTINCTCOUNTRAWTHETASKETCH;
   }
 
-  @Override
-  public String getColumnName() {
-    return _thetaSketchAggregationFunction.getColumnName();
-  }
-
-  @Override
-  public String getResultColumnName() {
-    return _thetaSketchAggregationFunction.getResultColumnName();
-  }
-
-  @Override
-  public List<ExpressionContext> getInputExpressions() {
-    return _thetaSketchAggregationFunction.getInputExpressions();
-  }
-
   @Override
   public AggregationResultHolder createAggregationResultHolder() {
-    return _thetaSketchAggregationFunction.createAggregationResultHolder();
+    return new ObjectAggregationResultHolder();
   }
 
   @Override
   public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
-    return _thetaSketchAggregationFunction.createGroupByResultHolder(initialCapacity, maxCapacity);
+    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
   }
 
   @Override
   public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
       Map<ExpressionContext, BlockValSet> blockValSetMap) {
-    _thetaSketchAggregationFunction.aggregate(length, aggregationResultHolder, blockValSetMap);
+    BlockValSet blockValSet = blockValSetMap.get(_expression);

Review comment:
       Why is the `raw` version not able to re-use the `non-raw` version of the code as in the previous implementation? 
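   The Javadoc in this diff describes an optional parameters literal such as `'nominalEntries=8192'` (default 4096), and the other Javadoc passes `''` to keep the defaults. The body of `Parameters` is not shown here, so the parser below is a hypothetical sketch: the `;` separator between parameters, the case-insensitive key, and whitespace stripping are assumptions, and `ThetaSketchParams` is not a Pinot class.

```java
// Hypothetical parser for a theta-sketch parameters literal such as "nominalEntries=8192".
// Assumptions: ';' separates parameters, '=' separates key and value, keys are
// case-insensitive, and an empty string keeps the default of 4096 nominal entries.
final class ThetaSketchParams {
  static final int DEFAULT_NOMINAL_ENTRIES = 4096;

  private int nominalEntries = DEFAULT_NOMINAL_ENTRIES;

  ThetaSketchParams(String parametersString) {
    // Drop all whitespace so "nominalEntries = 8192" is accepted too
    String compact = parametersString.replaceAll("\\s", "");
    for (String keyValuePair : compact.split(";")) {
      if (keyValuePair.isEmpty()) {
        continue; // "" (or a trailing ';') yields an empty token
      }
      String[] keyAndValue = keyValuePair.split("=");
      if (keyAndValue.length != 2) {
        throw new IllegalArgumentException("Invalid parameter: " + keyValuePair);
      }
      if (keyAndValue[0].equalsIgnoreCase("nominalEntries")) {
        nominalEntries = Integer.parseInt(keyAndValue[1]);
      } else {
        throw new IllegalArgumentException("Invalid parameter key: " + keyAndValue[0]);
      }
    }
  }

  int getNominalEntries() {
    return nominalEntries;
  }
}
```

   Rejecting unknown keys, rather than ignoring them, surfaces typos such as `nominalEntry=8192` at query compile time instead of silently falling back to the default.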

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java
##########
@@ -19,136 +19,144 @@
 package org.apache.pinot.core.query.aggregation.function;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 import org.apache.calcite.sql.parser.SqlParseException;
-import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.datasketches.Util;
 import org.apache.datasketches.memory.Memory;
 import org.apache.datasketches.theta.AnotB;
 import org.apache.datasketches.theta.Intersection;
 import org.apache.datasketches.theta.SetOperationBuilder;
 import org.apache.datasketches.theta.Sketch;
 import org.apache.datasketches.theta.Union;
+import org.apache.datasketches.theta.UpdateSketch;
+import org.apache.datasketches.theta.UpdateSketchBuilder;
 import org.apache.pinot.common.function.AggregationFunctionType;
-import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
 import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluatorProvider;
 import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
-import org.apache.pinot.core.query.aggregation.function.RawThetaSketchAggregationFunction.Parameters;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
 import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
 import org.apache.pinot.core.query.request.context.ExpressionContext;
 import org.apache.pinot.core.query.request.context.FilterContext;
+import org.apache.pinot.core.query.request.context.FunctionContext;
 import org.apache.pinot.core.query.request.context.predicate.Predicate;
 import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.sql.parsers.CalciteSqlParser;
 
 
 /**
- * Implementation of {@link AggregationFunction} to perform the distinct count aggregation using
- * Theta Sketches.
- * <p>TODO: For performance concern, use {@code List<Sketch>} as the intermediate result.
+ * The {@code DistinctCountThetaSketchAggregationFunction} can be used in 2 modes:
+ * <ul>
+ *   <li>
+ *     Simple union without post-aggregation (1 or 2 arguments): main expression to aggregate on, optional theta-sketch
+ *     parameters
+ *     <p>E.g. DISTINCT_COUNT_THETA_SKETCH(col)
+ *   </li>
+ *   <li>
+ *     Union with post-aggregation (at least 4 arguments): main expression to aggregate on, theta-sketch parameters,
+ *     filter(s), post-aggregation expression
+ *     <p>E.g. DISTINCT_COUNT_THETA_SKETCH(col, '', 'dimName=''gender'' AND dimValue=''male''',
+ *     'dimName=''course'' AND dimValue=''math''', 'SET_INTERSECT($1,$2)')
+ *   </li>
+ * </ul>
+ * Currently there is only 1 parameter for the function:
+ * <ul>
+ *   <li>
+ *     nominalEntries: The nominal entries used to create the sketch. (Default 4096)
+ *   </li>
+ * </ul>
+ * <p>E.g. DISTINCT_COUNT_THETA_SKETCH(col, 'nominalEntries=8192')
  */
-public class DistinctCountThetaSketchAggregationFunction implements AggregationFunction<Map<String, Sketch>, Long> {
-
-  public enum MergeFunction {
-    SET_UNION, SET_INTERSECT, SET_DIFF;
-
-    public static final ImmutableList<String> STRING_VALUES =
-        ImmutableList.of(SET_UNION.name(), SET_INTERSECT.name(), SET_DIFF.name());
-
-    public static final String CSV_VALUES = String.join(",", STRING_VALUES);
-
-    public static boolean isValid(String name) {
-      return SET_UNION.name().equalsIgnoreCase(name) || SET_INTERSECT.name().equalsIgnoreCase(name) || SET_DIFF.name()
-          .equalsIgnoreCase(name);
-    }
-  }
-
-  private static final Pattern ARGUMENT_SUBSTITUTION = Pattern.compile("\\$(\\d+)");
-
-  private final ExpressionContext _thetaSketchColumn;
-  private final SetOperationBuilder _setOperationBuilder;
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class DistinctCountThetaSketchAggregationFunction implements AggregationFunction<List<Sketch>, Long> {
+  private static final String SET_UNION = "SET_UNION";

Review comment:
       Use enum for set operations?
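   A minimal sketch of the reviewer's suggestion, modeled loosely on the removed `MergeFunction` enum from the same diff (case-insensitive lookup), could replace the three string constants. `SetOperation` and `fromName` are hypothetical names.

```java
import java.util.Arrays;

// Hypothetical enum that could replace the SET_UNION / SET_INTERSECT / SET_DIFF string constants.
enum SetOperation {
  SET_UNION, SET_INTERSECT, SET_DIFF;

  /** Case-insensitive lookup, like the removed MergeFunction.isValid(). */
  static SetOperation fromName(String name) {
    for (SetOperation operation : values()) {
      if (operation.name().equalsIgnoreCase(name)) {
        return operation;
      }
    }
    throw new IllegalArgumentException(
        "Unsupported set operation: " + name + ", expected one of " + Arrays.toString(values()));
  }
}
```

   Resolving the function name once into an enum also lets the post-aggregation evaluation `switch` on constants instead of repeating string comparisons.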

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java
##########
@@ -19,136 +19,144 @@
 package org.apache.pinot.core.query.aggregation.function;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 import org.apache.calcite.sql.parser.SqlParseException;
-import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.datasketches.Util;
 import org.apache.datasketches.memory.Memory;
 import org.apache.datasketches.theta.AnotB;
 import org.apache.datasketches.theta.Intersection;
 import org.apache.datasketches.theta.SetOperationBuilder;
 import org.apache.datasketches.theta.Sketch;
 import org.apache.datasketches.theta.Union;
+import org.apache.datasketches.theta.UpdateSketch;
+import org.apache.datasketches.theta.UpdateSketchBuilder;
 import org.apache.pinot.common.function.AggregationFunctionType;
-import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
 import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluatorProvider;
 import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
-import org.apache.pinot.core.query.aggregation.function.RawThetaSketchAggregationFunction.Parameters;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
 import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
 import org.apache.pinot.core.query.request.context.ExpressionContext;
 import org.apache.pinot.core.query.request.context.FilterContext;
+import org.apache.pinot.core.query.request.context.FunctionContext;
 import org.apache.pinot.core.query.request.context.predicate.Predicate;
 import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.sql.parsers.CalciteSqlParser;
 
 
 /**
- * Implementation of {@link AggregationFunction} to perform the distinct count aggregation using
- * Theta Sketches.
- * <p>TODO: For performance concern, use {@code List<Sketch>} as the intermediate result.
+ * The {@code DistinctCountThetaSketchAggregationFunction} can be used in 2 modes:
+ * <ul>
+ *   <li>
+ *     Simple union without post-aggregation (1 or 2 arguments): main expression to aggregate on, optional theta-sketch
+ *     parameters
+ *     <p>E.g. DISTINCT_COUNT_THETA_SKETCH(col)
+ *   </li>
+ *   <li>
+ *     Union with post-aggregation (at least 4 arguments): main expression to aggregate on, theta-sketch parameters,
+ *     filter(s), post-aggregation expression
+ *     <p>E.g. DISTINCT_COUNT_THETA_SKETCH(col, '', 'dimName=''gender'' AND dimValue=''male''',
+ *     'dimName=''course'' AND dimValue=''math''', 'SET_INTERSECT($1,$2)')
+ *   </li>
+ * </ul>
+ * Currently there is only 1 parameter for the function:
+ * <ul>
+ *   <li>
+ *     nominalEntries: The nominal entries used to create the sketch. (Default 4096)
+ *   </li>
+ * </ul>
+ * <p>E.g. DISTINCT_COUNT_THETA_SKETCH(col, 'nominalEntries=8192')
  */
-public class DistinctCountThetaSketchAggregationFunction implements AggregationFunction<Map<String, Sketch>, Long> {
-
-  public enum MergeFunction {
-    SET_UNION, SET_INTERSECT, SET_DIFF;
-
-    public static final ImmutableList<String> STRING_VALUES =
-        ImmutableList.of(SET_UNION.name(), SET_INTERSECT.name(), SET_DIFF.name());
-
-    public static final String CSV_VALUES = String.join(",", STRING_VALUES);
-
-    public static boolean isValid(String name) {
-      return SET_UNION.name().equalsIgnoreCase(name) || SET_INTERSECT.name().equalsIgnoreCase(name) || SET_DIFF.name()
-          .equalsIgnoreCase(name);
-    }
-  }
-
-  private static final Pattern ARGUMENT_SUBSTITUTION = Pattern.compile("\\$(\\d+)");
-
-  private final ExpressionContext _thetaSketchColumn;
-  private final SetOperationBuilder _setOperationBuilder;
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class DistinctCountThetaSketchAggregationFunction implements AggregationFunction<List<Sketch>, Long> {
+  private static final String SET_UNION = "SET_UNION";
+  private static final String SET_INTERSECT = "SET_INTERSECT";
+  private static final String SET_DIFF = "SET_DIFF";
+  private static final String DEFAULT_SKETCH_IDENTIFIER = "$0";
+  static final Sketch EMPTY_SKETCH = new UpdateSketchBuilder().build().compact();
+
+  private final ExpressionContext _mainExpression;
   private final List<ExpressionContext> _inputExpressions;
+  private final boolean _includeDefaultSketch;
+  private final List<FilterEvaluator> _filterEvaluators;
   private final ExpressionContext _postAggregationExpression;
-  private final List<Predicate> _predicates;
-  private final Map<Predicate, PredicateInfo> _predicateInfoMap;
+  private final UpdateSketchBuilder _updateSketchBuilder = new UpdateSketchBuilder();
+  private final SetOperationBuilder _setOperationBuilder = new SetOperationBuilder();
 
-  /**
-   * Constructor for the class.
-   * @param arguments List of parameters as arguments strings. At least three arguments are expected:
-   *                    <ul>
-   *                    <li> Required: First expression is interpreted as theta sketch column to aggregate on. </li>
-   *                    <li> Required: Second argument is the thetaSketchParams. </li>
-   *                    <li> Optional: Third to penultimate are predicates with LHS and RHS. </li>
-   *                    <li> Required: Last expression is the one that will be evaluated to compute final result. </li>
-   *                    </ul>
-   */
-  public DistinctCountThetaSketchAggregationFunction(List<ExpressionContext> arguments)
-      throws SqlParseException {
+  public DistinctCountThetaSketchAggregationFunction(List<ExpressionContext> arguments) {
     int numArguments = arguments.size();
+    _mainExpression = arguments.get(0);
+
+    // Initialize the UpdateSketchBuilder and SetOperationBuilder with the parameters
+    if (numArguments > 1) {
+      ExpressionContext paramsExpression = arguments.get(1);
+      Preconditions.checkArgument(paramsExpression.getType() == ExpressionContext.Type.LITERAL,
+          "Second argument of DISTINCT_COUNT_THETA_SKETCH aggregation function must be literal (parameters)");
+      Parameters parameters = new Parameters(paramsExpression.getLiteral());
+      int nominalEntries = parameters.getNominalEntries();
+      _updateSketchBuilder.setNominalEntries(nominalEntries);
+      _setOperationBuilder.setNominalEntries(nominalEntries);
+    }
 
-    // NOTE: This function expects at least 4 arguments: theta-sketch column, nominalEntries, predicate(s), post-aggregation expression.
-    Preconditions.checkArgument(numArguments > 3,
-        "DistinctCountThetaSketch expects at least four arguments (theta-sketch column, parameter(s), post-aggregation expression), got: ",
-        numArguments);
-
-    // Initialize the theta-sketch column
-    _thetaSketchColumn = arguments.get(0);
-    Preconditions.checkArgument(_thetaSketchColumn.getType() == ExpressionContext.Type.IDENTIFIER,
-        "First argument of DistinctCountThetaSketch must be identifier (theta-sketch column)");
-
-    // Initialize the theta-sketch parameters
-    ExpressionContext parametersExpression = arguments.get(1);
-    Preconditions.checkArgument(parametersExpression.getType() == ExpressionContext.Type.LITERAL,
-        "Second argument of DistinctCountThetaSketch must be literal (parameters)");
-    Parameters parameters = new Parameters(parametersExpression.getLiteral());
-
-    // Initialize the theta-sketch set operation builder
-    _setOperationBuilder = new SetOperationBuilder().setNominalEntries(parameters.getNominalEntries());
-
-    // Index of the original input predicates
-    // This list is zero indexed, whereas argument substitution is 1-indexed: index[0] = $1
-    _predicates = new ArrayList<>();
-
-    // Initialize the input expressions
-    // NOTE: It is expected to cover the theta-sketch column and the lhs of the predicates.
-    _inputExpressions = new ArrayList<>();
-    _inputExpressions.add(_thetaSketchColumn);
+    if (numArguments < 4) {
+      // Simple union without post-aggregation
 
-    // Initialize the post-aggregation expression
-    // NOTE: It is modeled as a filter
-    ExpressionContext postAggregationExpression = arguments.get(numArguments - 1);
-    Preconditions.checkArgument(parametersExpression.getType() == ExpressionContext.Type.LITERAL,
-        "Last argument of DistinctCountThetaSketch must be literal (post-aggregation expression)");
-    _postAggregationExpression = QueryContextConverterUtils
-        .getExpression(CalciteSqlParser.compileToExpression(postAggregationExpression.getLiteral()));
+      _inputExpressions = Collections.singletonList(_mainExpression);
+      _includeDefaultSketch = true;
+      _filterEvaluators = Collections.emptyList();
+      _postAggregationExpression = ExpressionContext.forIdentifier(DEFAULT_SKETCH_IDENTIFIER);
+    } else {
+      // Union with post-aggregation

Review comment:
       Just to confirm: do we have a test for this?
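   Per the class Javadoc and the constructor in this diff, fewer than 4 arguments select simple union (with `$0` as the implicit post-aggregation expression), and 4 or more select union with post-aggregation, where the arguments between the parameters and the last one are filters. The hypothetical model below reproduces that split over plain strings, which makes the branch easy to cover in a unit test; `ThetaSketchArguments` is not a Pinot class.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical model of how the constructor in this diff splits its arguments into two modes.
final class ThetaSketchArguments {
  static final String DEFAULT_SKETCH_IDENTIFIER = "$0";

  final String mainExpression;
  final String parameters;
  final List<String> filters;
  final String postAggregationExpression;

  ThetaSketchArguments(List<String> arguments) {
    int numArguments = arguments.size();
    if (numArguments == 0) {
      throw new IllegalArgumentException("At least 1 argument (main expression) is required");
    }
    mainExpression = arguments.get(0);
    // Optional second argument: theta-sketch parameters ("" means defaults)
    parameters = numArguments > 1 ? arguments.get(1) : "";
    if (numArguments < 4) {
      // Simple union without post-aggregation: the result is the unfiltered sketch $0
      filters = Collections.emptyList();
      postAggregationExpression = DEFAULT_SKETCH_IDENTIFIER;
    } else {
      // Union with post-aggregation: filters sit between the parameters and the last argument,
      // and filter i is referenced as $(i + 1) in the post-aggregation expression
      filters = List.copyOf(arguments.subList(2, numArguments - 1));
      postAggregationExpression = arguments.get(numArguments - 1);
    }
  }
}
```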

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java
##########
@@ -184,346 +192,1063 @@ public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int ma
   @Override
   public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
       Map<ExpressionContext, BlockValSet> blockValSetMap) {
-    Map<Predicate, Union> unionMap = getUnionMap(aggregationResultHolder);
-
-    Sketch[] sketches = deserializeSketches(blockValSetMap.get(_thetaSketchColumn).getBytesValuesSV(), length);
-    for (PredicateInfo predicateInfo : _predicateInfoMap.values()) {
-      Predicate predicate = predicateInfo.getPredicate();
-      BlockValSet blockValSet = blockValSetMap.get(predicate.getLhs());
-      FieldSpec.DataType valueType = blockValSet.getValueType();
-      PredicateEvaluator predicateEvaluator = predicateInfo.getPredicateEvaluator(valueType);
-
-      Union union = unionMap.get(predicate);
-      switch (valueType) {
-        case INT:
-          int[] intValues = blockValSet.getIntValuesSV();
-          for (int i = 0; i < length; i++) {
-            if (predicateEvaluator.applySV(intValues[i])) {
-              union.update(sketches[i]);
+    int numExpressions = _inputExpressions.size();
+    boolean[] singleValues = new boolean[numExpressions];
+    DataType[] valueTypes = new DataType[numExpressions];
+    Object[] valueArrays = new Object[numExpressions];
+    extractValues(blockValSetMap, singleValues, valueTypes, valueArrays);
+    int numFilters = _filterEvaluators.size();
+
+    // Main expression is always index 0
+    if (valueTypes[0] != DataType.BYTES) {
+      List<UpdateSketch> updateSketches = getUpdateSketches(aggregationResultHolder);
+      if (singleValues[0]) {

Review comment:
       Can these if/else blocks be refactored?
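   One possible direction, sketched here under assumptions, is to funnel single-valued and multi-valued inputs through one per-row loop so that each filter/sketch update is written once. The `SvMvAggregation` class, its `forEachValue` helper, and the use of exact `Set<Integer>` objects in place of `UpdateSketch` are all hypothetical; only INT values are shown.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.IntConsumer;
import java.util.function.IntPredicate;

// Hypothetical refactoring sketch: one aggregation loop for both SV (int[]) and MV (int[][]) columns.
// Exact sets stand in for the per-filter UpdateSketch objects.
final class SvMvAggregation {
  private SvMvAggregation() {
  }

  /** Creates $0 plus one set per filter. */
  static List<Set<Integer>> newSets(int numFilters) {
    List<Set<Integer>> sets = new ArrayList<>();
    for (int i = 0; i <= numFilters; i++) {
      sets.add(new HashSet<>());
    }
    return sets;
  }

  /** Feeds every value of the given row to the consumer, hiding the SV/MV distinction. */
  static void forEachValue(Object valueArray, boolean singleValue, int row, IntConsumer consumer) {
    if (singleValue) {
      consumer.accept(((int[]) valueArray)[row]);
    } else {
      for (int value : ((int[][]) valueArray)[row]) {
        consumer.accept(value);
      }
    }
  }

  /** Updates $0 for every row and $(i + 1) for rows that pass rowFilters.get(i). */
  static void aggregate(int length, Object valueArray, boolean singleValue, List<IntPredicate> rowFilters,
      List<Set<Integer>> sets) {
    for (int row = 0; row < length; row++) {
      forEachValue(valueArray, singleValue, row, sets.get(0)::add);
      for (int i = 0; i < rowFilters.size(); i++) {
        if (rowFilters.get(i).test(row)) {
          forEachValue(valueArray, singleValue, row, sets.get(i + 1)::add);
        }
      }
    }
  }
}
```

   The trade-off is an extra indirection per value; a production version might keep specialized loops for hot paths and share only the filter handling.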






[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #6004: Enhance DistinctCountThetaSketchAggregationFunction

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on a change in pull request #6004:
URL: https://github.com/apache/incubator-pinot/pull/6004#discussion_r496922926



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
##########
@@ -776,6 +783,69 @@ public IdSet deserialize(ByteBuffer byteBuffer) {
     }
   };
 
+  public static final ObjectSerDe<List<Object>> LIST_SER_DE = new ObjectSerDe<List<Object>>() {
+
+    @Override
+    public byte[] serialize(List<Object> list) {
+      int size = list.size();
+
+      // Directly return the size (0) for empty list
+      if (size == 0) {
+        return new byte[Integer.BYTES];

Review comment:
       It is safer not to reuse this, as we have no control over callers modifying it.
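   The concern is that `static final` only fixes the reference: Java arrays are always mutable, so a caller that writes into a shared empty-list array would silently corrupt every later result. The hypothetical `EmptyListBytes` class below contrasts the shared constant with a fresh allocation.

```java
// Hypothetical illustration of why returning a shared static byte[] is risky.
final class EmptyListBytes {
  private EmptyListBytes() {
  }

  // "static final" makes the reference constant, but the array contents stay mutable
  private static final byte[] SHARED_EMPTY = new byte[Integer.BYTES];

  static byte[] shared() {
    return SHARED_EMPTY;
  }

  static byte[] fresh() {
    // A new 4-byte zero array per call: encodes size 0 and cannot be corrupted by other callers
    return new byte[Integer.BYTES];
  }
}
```

   The cost of the safe version is one 4-byte allocation per empty list, which is typically negligible next to serializing non-empty lists.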






[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #6004: Enhance DistinctCountThetaSketchAggregationFunction

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on a change in pull request #6004:
URL: https://github.com/apache/incubator-pinot/pull/6004#discussion_r496923525



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
##########
@@ -776,6 +783,69 @@ public IdSet deserialize(ByteBuffer byteBuffer) {
     }
   };
 
+  public static final ObjectSerDe<List<Object>> LIST_SER_DE = new ObjectSerDe<List<Object>>() {
+
+    @Override
+    public byte[] serialize(List<Object> list) {
+      int size = list.size();
+
+      // Directly return the size (0) for empty list
+      if (size == 0) {
+        return new byte[Integer.BYTES];
+      }
+
+      // No need to close these 2 streams

Review comment:
       Added






[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #6004: Enhance DistinctCountThetaSketchAggregationFunction

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on a change in pull request #6004:
URL: https://github.com/apache/incubator-pinot/pull/6004#discussion_r496927468



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java
##########
@@ -184,346 +192,1063 @@ public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int ma
   @Override
   public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
       Map<ExpressionContext, BlockValSet> blockValSetMap) {
-    Map<Predicate, Union> unionMap = getUnionMap(aggregationResultHolder);
-
-    Sketch[] sketches = deserializeSketches(blockValSetMap.get(_thetaSketchColumn).getBytesValuesSV(), length);
-    for (PredicateInfo predicateInfo : _predicateInfoMap.values()) {
-      Predicate predicate = predicateInfo.getPredicate();
-      BlockValSet blockValSet = blockValSetMap.get(predicate.getLhs());
-      FieldSpec.DataType valueType = blockValSet.getValueType();
-      PredicateEvaluator predicateEvaluator = predicateInfo.getPredicateEvaluator(valueType);
-
-      Union union = unionMap.get(predicate);
-      switch (valueType) {
-        case INT:
-          int[] intValues = blockValSet.getIntValuesSV();
-          for (int i = 0; i < length; i++) {
-            if (predicateEvaluator.applySV(intValues[i])) {
-              union.update(sketches[i]);
+    int numExpressions = _inputExpressions.size();
+    boolean[] singleValues = new boolean[numExpressions];
+    DataType[] valueTypes = new DataType[numExpressions];
+    Object[] valueArrays = new Object[numExpressions];
+    extractValues(blockValSetMap, singleValues, valueTypes, valueArrays);
+    int numFilters = _filterEvaluators.size();
+
+    // Main expression is always index 0
+    if (valueTypes[0] != DataType.BYTES) {
+      List<UpdateSketch> updateSketches = getUpdateSketches(aggregationResultHolder);
+      if (singleValues[0]) {

Review comment:
       I tried, but I don't see a better way to organize the code. We need slightly different logic for each primitive type.
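A minimal sketch of why each primitive type ends up with its own branch, under simplified assumptions: a block of single-valued rows arrives as one typed array (`int[]`, `long[]`, `double[]` or `String[]`), and each value is fed to per-type `update` overloads similar in spirit to the `update(long)`, `update(double)` and `update(String)` overloads on a DataSketches `UpdateSketch`. The enum `ValueType`, the interface `SketchUpdater` and the classes below are hypothetical, and `ExactDistinctUpdater` swaps the approximate theta sketch for an exact `HashSet` purely so the result is easy to check.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical value types for a block of single-valued rows
enum ValueType { INT, LONG, DOUBLE, STRING }

// Hypothetical per-type update overloads, similar in spirit to a theta UpdateSketch
interface SketchUpdater {
  void update(long value);

  void update(double value);

  void update(String value);
}

// Exact stand-in for a theta sketch: stores every distinct value it sees
final class ExactDistinctUpdater implements SketchUpdater {
  final Set<Object> values = new HashSet<>();

  @Override
  public void update(long value) {
    values.add(value);
  }

  @Override
  public void update(double value) {
    values.add(value);
  }

  @Override
  public void update(String value) {
    values.add(value);
  }
}

final class TypedAggregationSketch {
  private TypedAggregationSketch() {
  }

  // Each branch casts to its own array type and loops over the first `length` rows
  static void aggregate(ValueType valueType, Object valueArray, int length, SketchUpdater updater) {
    switch (valueType) {
      case INT: {
        int[] intValues = (int[]) valueArray;
        for (int i = 0; i < length; i++) {
          updater.update(intValues[i]); // int widens to the long overload
        }
        break;
      }
      case LONG: {
        long[] longValues = (long[]) valueArray;
        for (int i = 0; i < length; i++) {
          updater.update(longValues[i]);
        }
        break;
      }
      case DOUBLE: {
        double[] doubleValues = (double[]) valueArray;
        for (int i = 0; i < length; i++) {
          updater.update(doubleValues[i]);
        }
        break;
      }
      case STRING: {
        String[] stringValues = (String[]) valueArray;
        for (int i = 0; i < length; i++) {
          updater.update(stringValues[i]);
        }
        break;
      }
      default:
        throw new IllegalStateException("Unsupported value type: " + valueType);
    }
  }
}
```

The loops cannot be merged into one without boxing each value or converting the whole array up front; in this sketch the dispatch layer stays primitive, and only the exact stand-in boxes when it stores values.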






[GitHub] [incubator-pinot] codecov-commenter commented on pull request #6004: Enhance DistinctCountThetaSketchAggregationFunction

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #6004:
URL: https://github.com/apache/incubator-pinot/pull/6004#issuecomment-700934315


   # [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6004?src=pr&el=h1) Report
   > Merging [#6004](https://codecov.io/gh/apache/incubator-pinot/pull/6004?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-pinot/commit/1beaab59b73f26c4e35f3b9bc856b03806cddf5a?el=desc) will **decrease** coverage by `2.39%`.
   > The diff coverage is `37.52%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-pinot/pull/6004/graphs/tree.svg?width=650&height=150&src=pr&token=4ibza2ugkz)](https://codecov.io/gh/apache/incubator-pinot/pull/6004?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #6004      +/-   ##
   ==========================================
   - Coverage   66.44%   64.05%   -2.40%     
   ==========================================
     Files        1075     1211     +136     
     Lines       54773    56796    +2023     
     Branches     8168     8338     +170     
   ==========================================
   - Hits        36396    36380      -16     
   - Misses      15700    17774    +2074     
   + Partials     2677     2642      -35     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | #unittests | `64.05% <37.52%> (?)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-pinot/pull/6004?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...e/pinot/broker/api/resources/PinotBrokerDebug.java](https://codecov.io/gh/apache/incubator-pinot/pull/6004/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYXBpL3Jlc291cmNlcy9QaW5vdEJyb2tlckRlYnVnLmphdmE=) | `0.00% <0.00%> (-79.32%)` | :arrow_down: |
   | [...ot/broker/broker/AllowAllAccessControlFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/6004/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYnJva2VyL0FsbG93QWxsQWNjZXNzQ29udHJvbEZhY3RvcnkuamF2YQ==) | `71.42% <ø> (-28.58%)` | :arrow_down: |
   | [.../helix/BrokerUserDefinedMessageHandlerFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/6004/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYnJva2VyL2hlbGl4L0Jyb2tlclVzZXJEZWZpbmVkTWVzc2FnZUhhbmRsZXJGYWN0b3J5LmphdmE=) | `33.96% <0.00%> (-32.71%)` | :arrow_down: |
   | [...ava/org/apache/pinot/client/AbstractResultSet.java](https://codecov.io/gh/apache/incubator-pinot/pull/6004/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0Fic3RyYWN0UmVzdWx0U2V0LmphdmE=) | `53.33% <0.00%> (-3.81%)` | :arrow_down: |
   | [.../main/java/org/apache/pinot/client/Connection.java](https://codecov.io/gh/apache/incubator-pinot/pull/6004/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0Nvbm5lY3Rpb24uamF2YQ==) | `35.55% <0.00%> (-13.29%)` | :arrow_down: |
   | [...inot/client/JsonAsyncHttpPinotClientTransport.java](https://codecov.io/gh/apache/incubator-pinot/pull/6004/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0pzb25Bc3luY0h0dHBQaW5vdENsaWVudFRyYW5zcG9ydC5qYXZh) | `10.90% <0.00%> (-51.10%)` | :arrow_down: |
   | [.../org/apache/pinot/client/ResultTableResultSet.java](https://codecov.io/gh/apache/incubator-pinot/pull/6004/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L1Jlc3VsdFRhYmxlUmVzdWx0U2V0LmphdmE=) | `0.00% <0.00%> (-34.29%)` | :arrow_down: |
   | [...not/common/assignment/InstancePartitionsUtils.java](https://codecov.io/gh/apache/incubator-pinot/pull/6004/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vYXNzaWdubWVudC9JbnN0YW5jZVBhcnRpdGlvbnNVdGlscy5qYXZh) | `73.80% <ø> (+0.63%)` | :arrow_up: |
   | [.../apache/pinot/common/exception/QueryException.java](https://codecov.io/gh/apache/incubator-pinot/pull/6004/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vZXhjZXB0aW9uL1F1ZXJ5RXhjZXB0aW9uLmphdmE=) | `90.27% <ø> (+5.55%)` | :arrow_up: |
   | [...pinot/common/function/AggregationFunctionType.java](https://codecov.io/gh/apache/incubator-pinot/pull/6004/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vZnVuY3Rpb24vQWdncmVnYXRpb25GdW5jdGlvblR5cGUuamF2YQ==) | `96.49% <ø> (-3.51%)` | :arrow_down: |
   | ... and [1015 more](https://codecov.io/gh/apache/incubator-pinot/pull/6004/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6004?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6004?src=pr&el=footer). Last update [2379791...297874d](https://codecov.io/gh/apache/incubator-pinot/pull/6004?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   




[GitHub] [incubator-pinot] mayankshriv commented on pull request #6004: Enhance DistinctCountThetaSketchAggregationFunction

Posted by GitBox <gi...@apache.org>.
mayankshriv commented on pull request #6004:
URL: https://github.com/apache/incubator-pinot/pull/6004#issuecomment-700811748


   > What's the motivation to add a new aggregation function as opposed to enhancing the existing one? Is there a backward compatibility issue? If not, it adds more confusion on the user side to have multiple variations of the same aggregation functions.
   
   We discussed this offline and ran some performance benchmarks to ensure there is no regression. Based on the testing/benchmarking results, we decided to move the code into the existing function instead of creating a new one.




[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #6004: Enhance DistinctCountThetaSketchAggregationFunction

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on a change in pull request #6004:
URL: https://github.com/apache/incubator-pinot/pull/6004#discussion_r496926008



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java
##########
@@ -19,136 +19,144 @@
 package org.apache.pinot.core.query.aggregation.function;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 import org.apache.calcite.sql.parser.SqlParseException;
-import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.datasketches.Util;
 import org.apache.datasketches.memory.Memory;
 import org.apache.datasketches.theta.AnotB;
 import org.apache.datasketches.theta.Intersection;
 import org.apache.datasketches.theta.SetOperationBuilder;
 import org.apache.datasketches.theta.Sketch;
 import org.apache.datasketches.theta.Union;
+import org.apache.datasketches.theta.UpdateSketch;
+import org.apache.datasketches.theta.UpdateSketchBuilder;
 import org.apache.pinot.common.function.AggregationFunctionType;
-import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
 import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluatorProvider;
 import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
-import org.apache.pinot.core.query.aggregation.function.RawThetaSketchAggregationFunction.Parameters;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
 import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
 import org.apache.pinot.core.query.request.context.ExpressionContext;
 import org.apache.pinot.core.query.request.context.FilterContext;
+import org.apache.pinot.core.query.request.context.FunctionContext;
 import org.apache.pinot.core.query.request.context.predicate.Predicate;
 import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.sql.parsers.CalciteSqlParser;
 
 
 /**
- * Implementation of {@link AggregationFunction} to perform the distinct count aggregation using
- * Theta Sketches.
- * <p>TODO: For performance concern, use {@code List<Sketch>} as the intermediate result.
+ * The {@code DistinctCountThetaSketchAggregationFunction} can be used in 2 modes:
+ * <ul>
+ *   <li>
+ *     Simple union without post-aggregation (1 or 2 arguments): main expression to aggregate on, optional theta-sketch
+ *     parameters
+ *     <p>E.g. DISTINCT_COUNT_THETA_SKETCH(col)
+ *   </li>
+ *   <li>
+ *     Union with post-aggregation (at least 4 arguments): main expression to aggregate on, theta-sketch parameters,
+ *     filter(s), post-aggregation expression
+ *     <p>E.g. DISTINCT_COUNT_THETA_SKETCH(col, '', 'dimName=''gender'' AND dimValue=''male''',
+ *     'dimName=''course'' AND dimValue=''math''', 'SET_INTERSECT($1,$2)')
+ *   </li>
+ * </ul>
+ * Currently there is only 1 parameter for the function:
+ * <ul>
+ *   <li>
+ *     nominalEntries: The nominal entries used to create the sketch. (Default 4096)
+ *   </li>
+ * </ul>
+ * <p>E.g. DISTINCT_COUNT_THETA_SKETCH(col, 'nominalEntries=8192')
  */
-public class DistinctCountThetaSketchAggregationFunction implements AggregationFunction<Map<String, Sketch>, Long> {
-
-  public enum MergeFunction {
-    SET_UNION, SET_INTERSECT, SET_DIFF;
-
-    public static final ImmutableList<String> STRING_VALUES =
-        ImmutableList.of(SET_UNION.name(), SET_INTERSECT.name(), SET_DIFF.name());
-
-    public static final String CSV_VALUES = String.join(",", STRING_VALUES);
-
-    public static boolean isValid(String name) {
-      return SET_UNION.name().equalsIgnoreCase(name) || SET_INTERSECT.name().equalsIgnoreCase(name) || SET_DIFF.name()
-          .equalsIgnoreCase(name);
-    }
-  }
-
-  private static final Pattern ARGUMENT_SUBSTITUTION = Pattern.compile("\\$(\\d+)");
-
-  private final ExpressionContext _thetaSketchColumn;
-  private final SetOperationBuilder _setOperationBuilder;
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class DistinctCountThetaSketchAggregationFunction implements AggregationFunction<List<Sketch>, Long> {
+  private static final String SET_UNION = "SET_UNION";
+  private static final String SET_INTERSECT = "SET_INTERSECT";
+  private static final String SET_DIFF = "SET_DIFF";
+  private static final String DEFAULT_SKETCH_IDENTIFIER = "$0";
+  static final Sketch EMPTY_SKETCH = new UpdateSketchBuilder().build().compact();
+
+  private final ExpressionContext _mainExpression;
   private final List<ExpressionContext> _inputExpressions;
+  private final boolean _includeDefaultSketch;
+  private final List<FilterEvaluator> _filterEvaluators;
   private final ExpressionContext _postAggregationExpression;
-  private final List<Predicate> _predicates;
-  private final Map<Predicate, PredicateInfo> _predicateInfoMap;
+  private final UpdateSketchBuilder _updateSketchBuilder = new UpdateSketchBuilder();
+  private final SetOperationBuilder _setOperationBuilder = new SetOperationBuilder();
 
-  /**
-   * Constructor for the class.
-   * @param arguments List of parameters as arguments strings. At least three arguments are expected:
-   *                    <ul>
-   *                    <li> Required: First expression is interpreted as theta sketch column to aggregate on. </li>
-   *                    <li> Required: Second argument is the thetaSketchParams. </li>
-   *                    <li> Optional: Third to penultimate are predicates with LHS and RHS. </li>
-   *                    <li> Required: Last expression is the one that will be evaluated to compute final result. </li>
-   *                    </ul>
-   */
-  public DistinctCountThetaSketchAggregationFunction(List<ExpressionContext> arguments)
-      throws SqlParseException {
+  public DistinctCountThetaSketchAggregationFunction(List<ExpressionContext> arguments) {
     int numArguments = arguments.size();
+    _mainExpression = arguments.get(0);
+
+    // Initialize the UpdateSketchBuilder and SetOperationBuilder with the parameters
+    if (numArguments > 1) {
+      ExpressionContext paramsExpression = arguments.get(1);
+      Preconditions.checkArgument(paramsExpression.getType() == ExpressionContext.Type.LITERAL,
+          "Second argument of DISTINCT_COUNT_THETA_SKETCH aggregation function must be literal (parameters)");
+      Parameters parameters = new Parameters(paramsExpression.getLiteral());
+      int nominalEntries = parameters.getNominalEntries();
+      _updateSketchBuilder.setNominalEntries(nominalEntries);
+      _setOperationBuilder.setNominalEntries(nominalEntries);
+    }
 
-    // NOTE: This function expects at least 4 arguments: theta-sketch column, nominalEntries, predicate(s), post-aggregation expression.
-    Preconditions.checkArgument(numArguments > 3,
-        "DistinctCountThetaSketch expects at least four arguments (theta-sketch column, parameter(s), post-aggregation expression), got: ",
-        numArguments);
-
-    // Initialize the theta-sketch column
-    _thetaSketchColumn = arguments.get(0);
-    Preconditions.checkArgument(_thetaSketchColumn.getType() == ExpressionContext.Type.IDENTIFIER,
-        "First argument of DistinctCountThetaSketch must be identifier (theta-sketch column)");
-
-    // Initialize the theta-sketch parameters
-    ExpressionContext parametersExpression = arguments.get(1);
-    Preconditions.checkArgument(parametersExpression.getType() == ExpressionContext.Type.LITERAL,
-        "Second argument of DistinctCountThetaSketch must be literal (parameters)");
-    Parameters parameters = new Parameters(parametersExpression.getLiteral());
-
-    // Initialize the theta-sketch set operation builder
-    _setOperationBuilder = new SetOperationBuilder().setNominalEntries(parameters.getNominalEntries());
-
-    // Index of the original input predicates
-    // This list is zero indexed, whereas argument substitution is 1-indexed: index[0] = $1
-    _predicates = new ArrayList<>();
-
-    // Initialize the input expressions
-    // NOTE: It is expected to cover the theta-sketch column and the lhs of the predicates.
-    _inputExpressions = new ArrayList<>();
-    _inputExpressions.add(_thetaSketchColumn);
+    if (numArguments < 4) {
+      // Simple union without post-aggregation
 
-    // Initialize the post-aggregation expression
-    // NOTE: It is modeled as a filter
-    ExpressionContext postAggregationExpression = arguments.get(numArguments - 1);
-    Preconditions.checkArgument(parametersExpression.getType() == ExpressionContext.Type.LITERAL,
-        "Last argument of DistinctCountThetaSketch must be literal (post-aggregation expression)");
-    _postAggregationExpression = QueryContextConverterUtils
-        .getExpression(CalciteSqlParser.compileToExpression(postAggregationExpression.getLiteral()));
+      _inputExpressions = Collections.singletonList(_mainExpression);
+      _includeDefaultSketch = true;
+      _filterEvaluators = Collections.emptyList();
+      _postAggregationExpression = ExpressionContext.forIdentifier(DEFAULT_SKETCH_IDENTIFIER);
+    } else {
+      // Union with post-aggregation

Review comment:
       Yes, this is covered by both unit and integration tests.
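The new constructor picks a mode from the argument count: fewer than 4 arguments means a simple union over the main expression (with `$0` as the result), while 4 or more means filters plus a post-aggregation expression. The optional second argument carries parameters such as `nominalEntries=8192` (default 4096). A minimal sketch of that dispatch follows, assuming parameters are `key=value` pairs separated by `;` and that an empty string keeps the defaults. The separator, the class `ThetaSketchArgsSketch` and its fields are assumptions for illustration, not Pinot's actual `Parameters` class.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: how the argument count selects the aggregation mode
final class ThetaSketchArgsSketch {
  static final int DEFAULT_NOMINAL_ENTRIES = 4096;
  static final String DEFAULT_SKETCH_IDENTIFIER = "$0";

  final String mainExpression;
  final int nominalEntries;
  final List<String> filters;
  final String postAggregationExpression;

  ThetaSketchArgsSketch(List<String> arguments) {
    int numArguments = arguments.size();
    if (numArguments == 0) {
      throw new IllegalArgumentException("Expected at least 1 argument (main expression)");
    }
    mainExpression = arguments.get(0);
    // The optional second argument carries the parameters; an empty string keeps the defaults
    nominalEntries = numArguments > 1 ? parseNominalEntries(arguments.get(1)) : DEFAULT_NOMINAL_ENTRIES;
    if (numArguments < 4) {
      // Simple union without post-aggregation: the result is the unfiltered default sketch
      filters = Collections.emptyList();
      postAggregationExpression = DEFAULT_SKETCH_IDENTIFIER;
    } else {
      // Union with post-aggregation: middle arguments are filters ($1, $2, ...), the last is the expression
      filters = Collections.unmodifiableList(new ArrayList<>(arguments.subList(2, numArguments - 1)));
      postAggregationExpression = arguments.get(numArguments - 1);
    }
  }

  // Assumed format: "key1=value1;key2=value2"; keys other than nominalEntries are ignored here
  static int parseNominalEntries(String parameters) {
    int nominalEntries = DEFAULT_NOMINAL_ENTRIES;
    for (String pair : parameters.split(";")) {
      String trimmed = pair.trim();
      if (trimmed.isEmpty()) {
        continue;
      }
      int separatorIndex = trimmed.indexOf('=');
      if (separatorIndex < 0) {
        throw new IllegalArgumentException("Invalid parameter: " + trimmed);
      }
      String key = trimmed.substring(0, separatorIndex).trim();
      if (key.equalsIgnoreCase("nominalEntries")) {
        nominalEntries = Integer.parseInt(trimmed.substring(separatorIndex + 1).trim());
      }
    }
    return nominalEntries;
  }
}
```

For example, the arguments `col, '', 'f1', 'f2', 'SET_INTERSECT($1,$2)'` would map to two filters, the default 4096 nominal entries and the given post-aggregation expression.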






[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #6004: Enhance DistinctCountThetaSketchAggregationFunction

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on a change in pull request #6004:
URL: https://github.com/apache/incubator-pinot/pull/6004#discussion_r496925731



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java
##########
@@ -19,136 +19,144 @@
 package org.apache.pinot.core.query.aggregation.function;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 import org.apache.calcite.sql.parser.SqlParseException;
-import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.datasketches.Util;
 import org.apache.datasketches.memory.Memory;
 import org.apache.datasketches.theta.AnotB;
 import org.apache.datasketches.theta.Intersection;
 import org.apache.datasketches.theta.SetOperationBuilder;
 import org.apache.datasketches.theta.Sketch;
 import org.apache.datasketches.theta.Union;
+import org.apache.datasketches.theta.UpdateSketch;
+import org.apache.datasketches.theta.UpdateSketchBuilder;
 import org.apache.pinot.common.function.AggregationFunctionType;
-import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
 import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluatorProvider;
 import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
-import org.apache.pinot.core.query.aggregation.function.RawThetaSketchAggregationFunction.Parameters;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
 import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
 import org.apache.pinot.core.query.request.context.ExpressionContext;
 import org.apache.pinot.core.query.request.context.FilterContext;
+import org.apache.pinot.core.query.request.context.FunctionContext;
 import org.apache.pinot.core.query.request.context.predicate.Predicate;
 import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.sql.parsers.CalciteSqlParser;
 
 
 /**
- * Implementation of {@link AggregationFunction} to perform the distinct count aggregation using
- * Theta Sketches.
- * <p>TODO: For performance concern, use {@code List<Sketch>} as the intermediate result.
+ * The {@code DistinctCountThetaSketchAggregationFunction} can be used in 2 modes:
+ * <ul>
+ *   <li>
+ *     Simple union without post-aggregation (1 or 2 arguments): main expression to aggregate on, optional theta-sketch
+ *     parameters
+ *     <p>E.g. DISTINCT_COUNT_THETA_SKETCH(col)
+ *   </li>
+ *   <li>
+ *     Union with post-aggregation (at least 4 arguments): main expression to aggregate on, theta-sketch parameters,
+ *     filter(s), post-aggregation expression
+ *     <p>E.g. DISTINCT_COUNT_THETA_SKETCH(col, '', 'dimName=''gender'' AND dimValue=''male''',
+ *     'dimName=''course'' AND dimValue=''math''', 'SET_INTERSECT($1,$2)')
+ *   </li>
+ * </ul>
+ * Currently there is only 1 parameter for the function:
+ * <ul>
+ *   <li>
+ *     nominalEntries: The nominal entries used to create the sketch. (Default 4096)
+ *   </li>
+ * </ul>
+ * <p>E.g. DISTINCT_COUNT_THETA_SKETCH(col, 'nominalEntries=8192')
  */
-public class DistinctCountThetaSketchAggregationFunction implements AggregationFunction<Map<String, Sketch>, Long> {
-
-  public enum MergeFunction {
-    SET_UNION, SET_INTERSECT, SET_DIFF;
-
-    public static final ImmutableList<String> STRING_VALUES =
-        ImmutableList.of(SET_UNION.name(), SET_INTERSECT.name(), SET_DIFF.name());
-
-    public static final String CSV_VALUES = String.join(",", STRING_VALUES);
-
-    public static boolean isValid(String name) {
-      return SET_UNION.name().equalsIgnoreCase(name) || SET_INTERSECT.name().equalsIgnoreCase(name) || SET_DIFF.name()
-          .equalsIgnoreCase(name);
-    }
-  }
-
-  private static final Pattern ARGUMENT_SUBSTITUTION = Pattern.compile("\\$(\\d+)");
-
-  private final ExpressionContext _thetaSketchColumn;
-  private final SetOperationBuilder _setOperationBuilder;
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class DistinctCountThetaSketchAggregationFunction implements AggregationFunction<List<Sketch>, Long> {
+  private static final String SET_UNION = "SET_UNION";

Review comment:
       Using strings instead of an enum has these 2 benefits:
   - Saves the extra step of parsing the name into the enum
   - Simplifies the handling of invalid operations
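To illustrate the string-based dispatch, here is a minimal sketch of evaluating a post-aggregation expression such as `SET_INTERSECT($1,$2)`. Only the operation names `SET_UNION`, `SET_INTERSECT` and `SET_DIFF` and the `$0` default-sketch convention come from the diff; everything else is an assumption. The expression is assumed to be already parsed into a small tree (the hypothetical `PostAggExpr` class), and exact `Set<Long>` values stand in for theta sketches, so the set operations here are exact rather than approximate. A `switch` on the operation name handles any unknown operation in its `default` branch without first converting the name to an enum.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical pre-parsed post-aggregation expression: an identifier "$n" or FUNC(args...)
final class PostAggExpr {
  final String name;
  final List<PostAggExpr> args;

  private PostAggExpr(String name, List<PostAggExpr> args) {
    this.name = name;
    this.args = args;
  }

  static PostAggExpr id(String identifier) {
    return new PostAggExpr(identifier, Collections.emptyList());
  }

  static PostAggExpr fn(String functionName, PostAggExpr... args) {
    return new PostAggExpr(functionName, Arrays.asList(args));
  }
}

final class PostAggEvaluatorSketch {
  private PostAggEvaluatorSketch() {
  }

  // results.get(i) stands in for the sketch referenced by $i; $0 is the unfiltered default
  static Set<Long> evaluate(PostAggExpr expr, List<Set<Long>> results) {
    if (expr.args.isEmpty()) {
      if (!expr.name.startsWith("$")) {
        throw new IllegalArgumentException("Invalid identifier: " + expr.name);
      }
      // Copy so that callers never mutate the stored results
      return new HashSet<>(results.get(Integer.parseInt(expr.name.substring(1))));
    }
    // Dispatch directly on the operation name, with no enum conversion first
    switch (expr.name) {
      case "SET_UNION": {
        Set<Long> union = new HashSet<>();
        for (PostAggExpr arg : expr.args) {
          union.addAll(evaluate(arg, results));
        }
        return union;
      }
      case "SET_INTERSECT": {
        Set<Long> intersection = evaluate(expr.args.get(0), results);
        for (int i = 1; i < expr.args.size(); i++) {
          intersection.retainAll(evaluate(expr.args.get(i), results));
        }
        return intersection;
      }
      case "SET_DIFF": {
        if (expr.args.size() != 2) {
          throw new IllegalArgumentException("SET_DIFF expects 2 arguments");
        }
        Set<Long> difference = evaluate(expr.args.get(0), results);
        difference.removeAll(evaluate(expr.args.get(1), results));
        return difference;
      }
      default:
        // Every unrecognized operation name ends up on this single error path
        throw new IllegalArgumentException("Unsupported set operation: " + expr.name);
    }
  }
}
```

With exact sets, `SET_DIFF($0, SET_UNION($1,$2))` yields the values matched by neither filter; with real theta sketches the same expression would produce an estimate instead.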






[GitHub] [incubator-pinot] Jackie-Jiang merged pull request #6004: Enhance DistinctCountThetaSketchAggregationFunction

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang merged pull request #6004:
URL: https://github.com/apache/incubator-pinot/pull/6004


   

