You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/09/03 22:33:38 UTC

[incubator-pinot] branch master updated: Add RawThetaSketchAggregationFunction (#5970)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a21ec8a  Add RawThetaSketchAggregationFunction (#5970)
a21ec8a is described below

commit a21ec8ac47a018efdbadf06160e2e6d46c8f7287
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Thu Sep 3 15:33:25 2020 -0700

    Add RawThetaSketchAggregationFunction (#5970)
    
    Introduce `RawThetaSketchAggregationFunction`, which collects the values of a given expression (single-valued or multi-valued) into a Sketch object and returns the sketch as a base64-encoded string. `BYTES` expressions are treated as serialized sketches.
    It takes an optional second argument as the parameters for the function, e.g. `RAW_THETA_SKETCH(col, 'nominalEntries=8192')`.
---
 .../common/function/AggregationFunctionType.java   |   1 +
 .../core/query/aggregation/ThetaSketchParams.java  |  66 ---
 .../function/AggregationFunctionFactory.java       |   2 +
 ...istinctCountThetaSketchAggregationFunction.java |  60 +--
 .../RawThetaSketchAggregationFunction.java         | 563 +++++++++++++++++++++
 .../BrokerRequestToQueryContextConverter.java      |   7 +-
 .../queries/DistinctCountThetaSketchTest.java      | 226 ++++-----
 7 files changed, 685 insertions(+), 240 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java b/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java
index 2f3085c..5f96e53 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java
@@ -43,6 +43,7 @@ public enum AggregationFunctionType {
   PERCENTILE("percentile"),
   PERCENTILEEST("percentileEst"),
   PERCENTILETDIGEST("percentileTDigest"),
+  RAWTHETASKETCH("rawThetaSketch"),
   IDSET("idSet"),
 
   // Geo aggregation functions
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/ThetaSketchParams.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/ThetaSketchParams.java
deleted file mode 100644
index c9c1f03..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/ThetaSketchParams.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.query.aggregation;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.pinot.spi.utils.JsonUtils;
-
-
-/**
- * Class to hold Theta Sketch Params, and is Json serializable.
- */
-@SuppressWarnings("unused")
-public class ThetaSketchParams {
-  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-  private static final String PARAMS_DELIMITER = ";";
-  private static final String PARAM_KEY_VALUE_SEPARATOR = "=";
-
-  @JsonProperty("nominalEntries")
-  private int _nominalEntries;
-
-  public int getNominalEntries() {
-    return _nominalEntries;
-  }
-
-  public void setNominalEntries(int nominalEntries) {
-    _nominalEntries = nominalEntries;
-  }
-
-  /**
-   * Creates a ThetaSketchParams object from the specified string. The specified string is
-   * expected to be of form: "key1 = value1; key2 = value2.."
-   * @param paramsString Param in string form
-   * @return Param object, null if string is null or empty
-   */
-  public static ThetaSketchParams fromString(String paramsString) {
-    if (paramsString == null || paramsString.isEmpty()) {
-      return null;
-    }
-
-    ObjectNode objectNode = JsonUtils.newObjectNode();
-    for (String pair : paramsString.split(PARAMS_DELIMITER)) {
-      String[] keyValue = pair.split(PARAM_KEY_VALUE_SEPARATOR);
-      objectNode.put(keyValue[0].replaceAll("\\s", ""), keyValue[1].replaceAll("\\s", ""));
-    }
-
-    return OBJECT_MAPPER.convertValue(objectNode, ThetaSketchParams.class);
-  }
-}
\ No newline at end of file
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index d74db3e..f0a7d8e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -137,6 +137,8 @@ public class AggregationFunctionFactory {
             return new DistinctCountThetaSketchAggregationFunction(arguments);
           case DISTINCTCOUNTRAWTHETASKETCH:
             return new DistinctCountRawThetaSketchAggregationFunction(arguments);
+          case RAWTHETASKETCH:
+            return new RawThetaSketchAggregationFunction(arguments);
           case IDSET:
             return new IdSetAggregationFunction(arguments);
           case COUNTMV:
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java
index 42b8eaf..049bead 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java
@@ -32,7 +32,6 @@ import org.apache.commons.collections.MapUtils;
 import org.apache.datasketches.memory.Memory;
 import org.apache.datasketches.theta.AnotB;
 import org.apache.datasketches.theta.Intersection;
-import org.apache.datasketches.theta.SetOperation;
 import org.apache.datasketches.theta.SetOperationBuilder;
 import org.apache.datasketches.theta.Sketch;
 import org.apache.datasketches.theta.Union;
@@ -43,7 +42,7 @@ import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
 import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluatorProvider;
 import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
-import org.apache.pinot.core.query.aggregation.ThetaSketchParams;
+import org.apache.pinot.core.query.aggregation.function.RawThetaSketchAggregationFunction.Parameters;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
 import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
 import org.apache.pinot.core.query.request.context.ExpressionContext;
@@ -70,16 +69,14 @@ public class DistinctCountThetaSketchAggregationFunction implements AggregationF
     public static final String CSV_VALUES = String.join(",", STRING_VALUES);
 
     public static boolean isValid(String name) {
-      return SET_UNION.name().equalsIgnoreCase(name)
-          || SET_INTERSECT.name().equalsIgnoreCase(name)
-          || SET_DIFF.name().equalsIgnoreCase(name);
+      return SET_UNION.name().equalsIgnoreCase(name) || SET_INTERSECT.name().equalsIgnoreCase(name) || SET_DIFF.name()
+          .equalsIgnoreCase(name);
     }
   }
 
   private static final Pattern ARGUMENT_SUBSTITUTION = Pattern.compile("\\$(\\d+)");
 
   private final ExpressionContext _thetaSketchColumn;
-  private final ThetaSketchParams _thetaSketchParams;
   private final SetOperationBuilder _setOperationBuilder;
   private final List<ExpressionContext> _inputExpressions;
   private final ExpressionContext _postAggregationExpression;
@@ -111,13 +108,13 @@ public class DistinctCountThetaSketchAggregationFunction implements AggregationF
         "First argument of DistinctCountThetaSketch must be identifier (theta-sketch column)");
 
     // Initialize the theta-sketch parameters
-    ExpressionContext paramsExpression = arguments.get(1);
-    Preconditions.checkArgument(paramsExpression.getType() == ExpressionContext.Type.LITERAL,
+    ExpressionContext parametersExpression = arguments.get(1);
+    Preconditions.checkArgument(parametersExpression.getType() == ExpressionContext.Type.LITERAL,
         "Second argument of DistinctCountThetaSketch must be literal (parameters)");
-    _thetaSketchParams = ThetaSketchParams.fromString(paramsExpression.getLiteral());
+    Parameters parameters = new Parameters(parametersExpression.getLiteral());
 
     // Initialize the theta-sketch set operation builder
-    _setOperationBuilder = getSetOperationBuilder();
+    _setOperationBuilder = new SetOperationBuilder().setNominalEntries(parameters.getNominalEntries());
 
     // Index of the original input predicates
     // This list is zero indexed, whereas argument substitution is 1-indexed: index[0] = $1
@@ -131,7 +128,7 @@ public class DistinctCountThetaSketchAggregationFunction implements AggregationF
     // Initialize the post-aggregation expression
     // NOTE: It is modeled as a filter
     ExpressionContext postAggregationExpression = arguments.get(numArguments - 1);
-    Preconditions.checkArgument(paramsExpression.getType() == ExpressionContext.Type.LITERAL,
+    Preconditions.checkArgument(parametersExpression.getType() == ExpressionContext.Type.LITERAL,
         "Last argument of DistinctCountThetaSketch must be literal (post-aggregation expression)");
     _postAggregationExpression = QueryContextConverterUtils
         .getExpression(CalciteSqlParser.compileToExpression(postAggregationExpression.getLiteral()));
@@ -448,7 +445,7 @@ public class DistinctCountThetaSketchAggregationFunction implements AggregationF
       Sketch sketch = intermediateResult2.get(predicate);
       if (sketch != null) {
         // Merge the overlapping ones
-        Union union = getSetOperationBuilder().buildUnion();
+        Union union = _setOperationBuilder.buildUnion();
         union.update(entry.getValue());
         union.update(sketch);
         mergedResult.put(predicate, union.getResult());
@@ -485,7 +482,7 @@ public class DistinctCountThetaSketchAggregationFunction implements AggregationF
   @Override
   public Long extractFinalResult(Map<String, Sketch> intermediateResult) {
     Sketch finalSketch = extractFinalSketch(intermediateResult);
-    return Math.round(finalSketch.getEstimate());
+    return finalSketch != null ? Math.round(finalSketch.getEstimate()) : 0;
   }
 
   private Predicate getPredicate(String predicateString) {
@@ -544,25 +541,22 @@ public class DistinctCountThetaSketchAggregationFunction implements AggregationF
    * @param sketchMap Precomputed sketches for predicates that are part of the expression.
    * @return Overall evaluated sketch for the expression.
    */
-  private Sketch evalPostAggregationExpression(
-      ExpressionContext postAggregationExpression,
+  private Sketch evalPostAggregationExpression(ExpressionContext postAggregationExpression,
       Map<Predicate, Sketch> sketchMap) {
     if (postAggregationExpression.getType() == ExpressionContext.Type.LITERAL) {
       throw new IllegalArgumentException("Literal not supported in post-aggregation function");
     }
 
     if (postAggregationExpression.getType() == ExpressionContext.Type.IDENTIFIER) {
-      final Predicate exp =
-          _predicates.get(extractSubstitutionPosition(postAggregationExpression.getLiteral()) - 1);
+      final Predicate exp = _predicates.get(extractSubstitutionPosition(postAggregationExpression.getLiteral()) - 1);
       return sketchMap.get(exp);
     }
 
     // shouldn't throw exception because of the validation in the constructor
-    MergeFunction func =
-        MergeFunction.valueOf(postAggregationExpression.getFunction().getFunctionName().toUpperCase());
+    MergeFunction func = MergeFunction.valueOf(postAggregationExpression.getFunction().getFunctionName().toUpperCase());
 
     // handle functions recursively
-    switch(func) {
+    switch (func) {
       case SET_UNION:
         Union union = _setOperationBuilder.buildUnion();
         for (ExpressionContext exp : postAggregationExpression.getFunction().getArguments()) {
@@ -583,10 +577,8 @@ public class DistinctCountThetaSketchAggregationFunction implements AggregationF
         diff.update(a, b);
         return diff.getResult();
       default:
-        throw new IllegalStateException(
-            String.format(
-                "Invalid post-aggregation function: %s",
-                postAggregationExpression.getFunction().getFunctionName().toUpperCase()));
+        throw new IllegalStateException(String.format("Invalid post-aggregation function: %s",
+            postAggregationExpression.getFunction().getFunctionName().toUpperCase()));
     }
   }
 
@@ -606,16 +598,6 @@ public class DistinctCountThetaSketchAggregationFunction implements AggregationF
   }
 
   /**
-   * Returns the theta-sketch SetOperation builder properly configured.
-   * Currently, only setting of nominalEntries is supported.
-   * @return SetOperationBuilder
-   */
-  private SetOperationBuilder getSetOperationBuilder() {
-    return _thetaSketchParams == null ? SetOperation.builder()
-        : SetOperation.builder().setNominalEntries(_thetaSketchParams.getNominalEntries());
-  }
-
-  /**
    * Validates that the function context's substitution parameters ($1, $2, etc) does not exceed the number
    * of predicates passed into the post-aggregation function.
    *
@@ -636,19 +618,19 @@ public class DistinctCountThetaSketchAggregationFunction implements AggregationF
 
     if (context.getType() == ExpressionContext.Type.IDENTIFIER) {
       int id = extractSubstitutionPosition(context.getIdentifier());
-      if (id <= 0)
+      if (id <= 0) {
         throw new IllegalArgumentException("Argument substitution starts at $1");
-      if (id > numPredicates)
+      }
+      if (id > numPredicates) {
         throw new IllegalArgumentException("Argument substitution exceeded number of predicates");
+      }
       // if none of the invalid conditions are met above, exit out early
       return;
     }
 
     if (!MergeFunction.isValid(context.getFunction().getFunctionName())) {
       throw new IllegalArgumentException(
-          String.format(
-              "Invalid Theta Sketch aggregation function. Allowed: [%s]",
-              MergeFunction.CSV_VALUES));
+          String.format("Invalid Theta Sketch aggregation function. Allowed: [%s]", MergeFunction.CSV_VALUES));
     }
 
     switch (MergeFunction.valueOf(context.getFunction().getFunctionName().toUpperCase())) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/RawThetaSketchAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/RawThetaSketchAggregationFunction.java
new file mode 100644
index 0000000..94c952d
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/RawThetaSketchAggregationFunction.java
@@ -0,0 +1,563 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import com.google.common.base.Preconditions;
+import java.util.Base64;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.datasketches.Util;
+import org.apache.datasketches.memory.Memory;
+import org.apache.datasketches.theta.SetOperationBuilder;
+import org.apache.datasketches.theta.Sketch;
+import org.apache.datasketches.theta.Union;
+import org.apache.datasketches.theta.UpdateSketch;
+import org.apache.datasketches.theta.UpdateSketchBuilder;
+import org.apache.pinot.common.function.AggregationFunctionType;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ * The {@code RawThetaSketchAggregationFunction} collects the values for a given expression (can be single-valued or
+ * multi-valued) into a {@link Sketch} object, and returns the sketch as a base64 encoded string. It treats BYTES
+ * expression as serialized sketches.
+ * <p>The function takes an optional second argument as the parameters for the function. Currently there is only 1
+ * parameter for the function:
+ * <ul>
+ *   <li>
+ *     nominalEntries: The nominal entries used to create the sketch. (Default 4096)
+ *   </li>
+ * </ul>
+ * <p>Example: RAW_THETA_SKETCH(col, 'nominalEntries=8192')
+ */
+public class RawThetaSketchAggregationFunction extends BaseSingleInputAggregationFunction<Sketch, String> {
+  private final UpdateSketchBuilder _updateSketchBuilder = new UpdateSketchBuilder();
+  private final SetOperationBuilder _setOperationBuilder = new SetOperationBuilder();
+
+  public RawThetaSketchAggregationFunction(List<ExpressionContext> arguments) {
+    super(arguments.get(0));
+
+    // Optional second argument for theta-sketch parameters
+    if (arguments.size() > 1) {
+      ExpressionContext paramsExpression = arguments.get(1);
+      Preconditions.checkArgument(paramsExpression.getType() == ExpressionContext.Type.LITERAL,
+          "Second argument of RAW_THETA_SKETCH aggregation function must be literal (parameters)");
+      Parameters parameters = new Parameters(paramsExpression.getLiteral());
+      int nominalEntries = parameters.getNominalEntries();
+      _updateSketchBuilder.setNominalEntries(nominalEntries);
+      _setOperationBuilder.setNominalEntries(nominalEntries);
+    }
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.RAWTHETASKETCH;
+  }
+
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return new ObjectAggregationResultHolder();
+  }
+
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+  }
+
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    FieldSpec.DataType valueType = blockValSet.getValueType();
+
+    if (valueType != FieldSpec.DataType.BYTES) {
+      UpdateSketch updateSketch = getUpdateSketch(aggregationResultHolder);
+      if (blockValSet.isSingleValue()) {
+        switch (valueType) {
+          case INT:
+            int[] intValues = blockValSet.getIntValuesSV();
+            for (int i = 0; i < length; i++) {
+              updateSketch.update(intValues[i]);
+            }
+            break;
+          case LONG:
+            long[] longValues = blockValSet.getLongValuesSV();
+            for (int i = 0; i < length; i++) {
+              updateSketch.update(longValues[i]);
+            }
+            break;
+          case FLOAT:
+            float[] floatValues = blockValSet.getFloatValuesSV();
+            for (int i = 0; i < length; i++) {
+              updateSketch.update(floatValues[i]);
+            }
+            break;
+          case DOUBLE:
+            double[] doubleValues = blockValSet.getDoubleValuesSV();
+            for (int i = 0; i < length; i++) {
+              updateSketch.update(doubleValues[i]);
+            }
+            break;
+          case STRING:
+            String[] stringValues = blockValSet.getStringValuesSV();
+            for (int i = 0; i < length; i++) {
+              updateSketch.update(stringValues[i]);
+            }
+            break;
+          default:
+            throw new IllegalStateException(
+                "Illegal single-value data type for RAW_THETA_SKETCH aggregation function: " + valueType);
+        }
+      } else {
+        switch (valueType) {
+          case INT:
+            int[][] intValues = blockValSet.getIntValuesMV();
+            for (int i = 0; i < length; i++) {
+              for (int value : intValues[i]) {
+                updateSketch.update(value);
+              }
+            }
+            break;
+          case LONG:
+            long[][] longValues = blockValSet.getLongValuesMV();
+            for (int i = 0; i < length; i++) {
+              for (long value : longValues[i]) {
+                updateSketch.update(value);
+              }
+            }
+            break;
+          case FLOAT:
+            float[][] floatValues = blockValSet.getFloatValuesMV();
+            for (int i = 0; i < length; i++) {
+              for (float value : floatValues[i]) {
+                updateSketch.update(value);
+              }
+            }
+            break;
+          case DOUBLE:
+            double[][] doubleValues = blockValSet.getDoubleValuesMV();
+            for (int i = 0; i < length; i++) {
+              for (double value : doubleValues[i]) {
+                updateSketch.update(value);
+              }
+            }
+            break;
+          case STRING:
+            String[][] stringValues = blockValSet.getStringValuesMV();
+            for (int i = 0; i < length; i++) {
+              for (String value : stringValues[i]) {
+                updateSketch.update(value);
+              }
+            }
+            break;
+          default:
+            throw new IllegalStateException(
+                "Illegal multi-value data type for RAW_THETA_SKETCH aggregation function: " + valueType);
+        }
+      }
+    } else {
+      // Serialized sketch
+      Union union = getUnion(aggregationResultHolder);
+      byte[][] bytesValues = blockValSet.getBytesValuesSV();
+      for (int i = 0; i < length; i++) {
+        union.update(Sketch.wrap(Memory.wrap(bytesValues[i])));
+      }
+    }
+  }
+
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    FieldSpec.DataType valueType = blockValSet.getValueType();
+
+    if (blockValSet.isSingleValue()) {
+      switch (valueType) {
+        case INT:
+          int[] intValues = blockValSet.getIntValuesSV();
+          for (int i = 0; i < length; i++) {
+            getUpdateSketch(groupByResultHolder, groupKeyArray[i]).update(intValues[i]);
+          }
+          break;
+        case LONG:
+          long[] longValues = blockValSet.getLongValuesSV();
+          for (int i = 0; i < length; i++) {
+            getUpdateSketch(groupByResultHolder, groupKeyArray[i]).update(longValues[i]);
+          }
+          break;
+        case FLOAT:
+          float[] floatValues = blockValSet.getFloatValuesSV();
+          for (int i = 0; i < length; i++) {
+            getUpdateSketch(groupByResultHolder, groupKeyArray[i]).update(floatValues[i]);
+          }
+          break;
+        case DOUBLE:
+          double[] doubleValues = blockValSet.getDoubleValuesSV();
+          for (int i = 0; i < length; i++) {
+            getUpdateSketch(groupByResultHolder, groupKeyArray[i]).update(doubleValues[i]);
+          }
+          break;
+        case STRING:
+          String[] stringValues = blockValSet.getStringValuesSV();
+          for (int i = 0; i < length; i++) {
+            getUpdateSketch(groupByResultHolder, groupKeyArray[i]).update(stringValues[i]);
+          }
+          break;
+        case BYTES:
+          // Serialized sketch
+          byte[][] bytesValues = blockValSet.getBytesValuesSV();
+          for (int i = 0; i < length; i++) {
+            getUnion(groupByResultHolder, groupKeyArray[i]).update(Sketch.wrap(Memory.wrap(bytesValues[i])));
+          }
+          break;
+        default:
+          throw new IllegalStateException(
+              "Illegal single-value data type for RAW_THETA_SKETCH aggregation function: " + valueType);
+      }
+    } else {
+      switch (valueType) {
+        case INT:
+          int[][] intValues = blockValSet.getIntValuesMV();
+          for (int i = 0; i < length; i++) {
+            UpdateSketch updateSketch = getUpdateSketch(groupByResultHolder, groupKeyArray[i]);
+            for (int value : intValues[i]) {
+              updateSketch.update(value);
+            }
+          }
+          break;
+        case LONG:
+          long[][] longValues = blockValSet.getLongValuesMV();
+          for (int i = 0; i < length; i++) {
+            UpdateSketch updateSketch = getUpdateSketch(groupByResultHolder, groupKeyArray[i]);
+            for (long value : longValues[i]) {
+              updateSketch.update(value);
+            }
+          }
+          break;
+        case FLOAT:
+          float[][] floatValues = blockValSet.getFloatValuesMV();
+          for (int i = 0; i < length; i++) {
+            UpdateSketch updateSketch = getUpdateSketch(groupByResultHolder, groupKeyArray[i]);
+            for (float value : floatValues[i]) {
+              updateSketch.update(value);
+            }
+          }
+          break;
+        case DOUBLE:
+          double[][] doubleValues = blockValSet.getDoubleValuesMV();
+          for (int i = 0; i < length; i++) {
+            UpdateSketch updateSketch = getUpdateSketch(groupByResultHolder, groupKeyArray[i]);
+            for (double value : doubleValues[i]) {
+              updateSketch.update(value);
+            }
+          }
+          break;
+        case STRING:
+          String[][] stringValues = blockValSet.getStringValuesMV();
+          for (int i = 0; i < length; i++) {
+            UpdateSketch updateSketch = getUpdateSketch(groupByResultHolder, groupKeyArray[i]);
+            for (String value : stringValues[i]) {
+              updateSketch.update(value);
+            }
+          }
+          break;
+        default:
+          throw new IllegalStateException(
+              "Illegal multi-value data type for RAW_THETA_SKETCH aggregation function: " + valueType);
+      }
+    }
+  }
+
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    FieldSpec.DataType valueType = blockValSet.getValueType();
+
+    if (blockValSet.isSingleValue()) {
+      switch (valueType) {
+        case INT:
+          int[] intValues = blockValSet.getIntValuesSV();
+          for (int i = 0; i < length; i++) {
+            int value = intValues[i];
+            for (int groupKey : groupKeysArray[i]) {
+              getUpdateSketch(groupByResultHolder, groupKey).update(value);
+            }
+          }
+          break;
+        case LONG:
+          long[] longValues = blockValSet.getLongValuesSV();
+          for (int i = 0; i < length; i++) {
+            long value = longValues[i];
+            for (int groupKey : groupKeysArray[i]) {
+              getUpdateSketch(groupByResultHolder, groupKey).update(value);
+            }
+          }
+          break;
+        case FLOAT:
+          float[] floatValues = blockValSet.getFloatValuesSV();
+          for (int i = 0; i < length; i++) {
+            float value = floatValues[i];
+            for (int groupKey : groupKeysArray[i]) {
+              getUpdateSketch(groupByResultHolder, groupKey).update(value);
+            }
+          }
+          break;
+        case DOUBLE:
+          double[] doubleValues = blockValSet.getDoubleValuesSV();
+          for (int i = 0; i < length; i++) {
+            double value = doubleValues[i];
+            for (int groupKey : groupKeysArray[i]) {
+              getUpdateSketch(groupByResultHolder, groupKey).update(value);
+            }
+          }
+          break;
+        case STRING:
+          String[] stringValues = blockValSet.getStringValuesSV();
+          for (int i = 0; i < length; i++) {
+            String value = stringValues[i];
+            for (int groupKey : groupKeysArray[i]) {
+              getUpdateSketch(groupByResultHolder, groupKey).update(value);
+            }
+          }
+          break;
+        case BYTES:
+          // Serialized sketch
+          byte[][] bytesValues = blockValSet.getBytesValuesSV();
+          for (int i = 0; i < length; i++) {
+            Sketch sketch = Sketch.wrap(Memory.wrap(bytesValues[i]));
+            for (int groupKey : groupKeysArray[i]) {
+              getUnion(groupByResultHolder, groupKey).update(sketch);
+            }
+          }
+          break;
+        default:
+          throw new IllegalStateException(
+              "Illegal single-value data type for RAW_THETA_SKETCH aggregation function: " + valueType);
+      }
+    } else {
+      switch (valueType) {
+        case INT:
+          int[][] intValues = blockValSet.getIntValuesMV();
+          for (int i = 0; i < length; i++) {
+            int[] values = intValues[i];
+            for (int groupKey : groupKeysArray[i]) {
+              UpdateSketch updateSketch = getUpdateSketch(groupByResultHolder, groupKey);
+              for (int value : values) {
+                updateSketch.update(value);
+              }
+            }
+          }
+          break;
+        case LONG:
+          long[][] longValues = blockValSet.getLongValuesMV();
+          for (int i = 0; i < length; i++) {
+            long[] values = longValues[i];
+            for (int groupKey : groupKeysArray[i]) {
+              UpdateSketch updateSketch = getUpdateSketch(groupByResultHolder, groupKey);
+              for (long value : values) {
+                updateSketch.update(value);
+              }
+            }
+          }
+          break;
+        case FLOAT:
+          float[][] floatValues = blockValSet.getFloatValuesMV();
+          for (int i = 0; i < length; i++) {
+            float[] values = floatValues[i];
+            for (int groupKey : groupKeysArray[i]) {
+              UpdateSketch updateSketch = getUpdateSketch(groupByResultHolder, groupKey);
+              for (float value : values) {
+                updateSketch.update(value);
+              }
+            }
+          }
+          break;
+        case DOUBLE:
+          double[][] doubleValues = blockValSet.getDoubleValuesMV();
+          for (int i = 0; i < length; i++) {
+            double[] values = doubleValues[i];
+            for (int groupKey : groupKeysArray[i]) {
+              UpdateSketch updateSketch = getUpdateSketch(groupByResultHolder, groupKey);
+              for (double value : values) {
+                updateSketch.update(value);
+              }
+            }
+          }
+          break;
+        case STRING:
+          String[][] stringValues = blockValSet.getStringValuesMV();
+          for (int i = 0; i < length; i++) {
+            String[] values = stringValues[i];
+            for (int groupKey : groupKeysArray[i]) {
+              UpdateSketch updateSketch = getUpdateSketch(groupByResultHolder, groupKey);
+              for (String value : values) {
+                updateSketch.update(value);
+              }
+            }
+          }
+          break;
+        default:
+          throw new IllegalStateException(
+              "Illegal multi-value data type for RAW_THETA_SKETCH aggregation function: " + valueType);
+      }
+    }
+  }
+
+  @Override
+  public Sketch extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+    Object result = aggregationResultHolder.getResult();
+    if (result == null) {
+      // Holder was never populated: fall back to a freshly built sketch
+      return _updateSketchBuilder.build();
+    }
+    if (result instanceof Sketch) {
+      return (Sketch) result;
+    }
+    // Anything that is not a Sketch is expected to be a Union; return its current result
+    assert result instanceof Union;
+    return ((Union) result).getResult();
+  }
+
+  @Override
+  public Sketch extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
+    Object groupResult = groupByResultHolder.getResult(groupKey);
+    if (groupResult instanceof Sketch) {
+      return (Sketch) groupResult;
+    }
+    // Anything that is not a Sketch is expected to be a Union; return its current result
+    assert groupResult instanceof Union;
+    return ((Union) groupResult).getResult();
+  }
+
+  @Override
+  public Sketch merge(Sketch sketch1, Sketch sketch2) {
+    Union union = _setOperationBuilder.buildUnion();
+    union.update(sketch1);
+    union.update(sketch2);
+    return union.getResult();
+  }
+
+  @Override
+  public boolean isIntermediateResultComparable() {
+    return false;
+  }
+
+  @Override
+  public ColumnDataType getIntermediateResultColumnType() {
+    return ColumnDataType.OBJECT;
+  }
+
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return ColumnDataType.STRING;
+  }
+
+  @Override
+  public String extractFinalResult(Sketch sketch) {
+    return Base64.getEncoder().encodeToString(sketch.compact().toByteArray());
+  }
+
+  /**
+   * Returns the UpdateSketch from the result holder; if none exists, builds one and stores it in the holder first.
+   */
+  private UpdateSketch getUpdateSketch(AggregationResultHolder aggregationResultHolder) {
+    UpdateSketch updateSketch = aggregationResultHolder.getResult();
+    if (updateSketch == null) {
+      updateSketch = _updateSketchBuilder.build();
+      aggregationResultHolder.setValue(updateSketch);
+    }
+    return updateSketch;
+  }
+
+  /**
+   * Returns the Union from the result holder; if none exists, builds one and stores it in the holder first.
+   */
+  private Union getUnion(AggregationResultHolder aggregationResultHolder) {
+    Union union = aggregationResultHolder.getResult();
+    if (union == null) {
+      union = _setOperationBuilder.buildUnion();
+      aggregationResultHolder.setValue(union);
+    }
+    return union;
+  }
+
+  /**
+   * Returns the UpdateSketch for the given group key; if none exists, builds one and stores it under that key first.
+   */
+  private UpdateSketch getUpdateSketch(GroupByResultHolder groupByResultHolder, int groupKey) {
+    UpdateSketch updateSketch = groupByResultHolder.getResult(groupKey);
+    if (updateSketch == null) {
+      updateSketch = _updateSketchBuilder.build();
+      groupByResultHolder.setValueForKey(groupKey, updateSketch);
+    }
+    return updateSketch;
+  }
+
+  /**
+   * Returns the Union for the given group key; if none exists, builds one and stores it under that key first.
+   */
+  private Union getUnion(GroupByResultHolder groupByResultHolder, int groupKey) {
+    Union union = groupByResultHolder.getResult(groupKey);
+    if (union == null) {
+      union = _setOperationBuilder.buildUnion();
+      groupByResultHolder.setValueForKey(groupKey, union);
+    }
+    return union;
+  }
+
+  /**
+   * Parses the theta-sketch parameters string: ';' delimited 'key=value' pairs, e.g. 'nominalEntries=8192'.
+   */
+  static class Parameters {
+    private static final char PARAMETER_DELIMITER = ';';
+    private static final char PARAMETER_KEY_VALUE_SEPARATOR = '=';
+    private static final String NOMINAL_ENTRIES_KEY = "nominalEntries";
+
+    private int _nominalEntries = Util.DEFAULT_NOMINAL_ENTRIES;
+
+    Parameters(String parametersString) {
+      // Strings are immutable: deleteWhitespace() returns the stripped copy, so it must be assigned back
+      parametersString = StringUtils.deleteWhitespace(parametersString);
+      String[] keyValuePairs = StringUtils.split(parametersString, PARAMETER_DELIMITER);
+      for (String keyValuePair : keyValuePairs) {
+        String[] keyAndValue = StringUtils.split(keyValuePair, PARAMETER_KEY_VALUE_SEPARATOR);
+        Preconditions.checkArgument(keyAndValue.length == 2, "Invalid parameter: %s", keyValuePair);
+        String key = keyAndValue[0];
+        // 'nominalEntries' (matched case-insensitively) is the only supported key
+        if (!key.equalsIgnoreCase(NOMINAL_ENTRIES_KEY)) {
+          throw new IllegalArgumentException("Invalid parameter key: " + key);
+        }
+        _nominalEntries = Integer.parseInt(keyAndValue[1]);
+      }
+    }
+
+    int getNominalEntries() {
+      return _nominalEntries;
+    }
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/BrokerRequestToQueryContextConverter.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/BrokerRequestToQueryContextConverter.java
index 2859450..57beb3c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/BrokerRequestToQueryContextConverter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/BrokerRequestToQueryContextConverter.java
@@ -120,10 +120,11 @@ public class BrokerRequestToQueryContextConverter {
           List<ExpressionContext> arguments = new ArrayList<>(numArguments);
           if (functionName.equalsIgnoreCase(AggregationFunctionType.DISTINCTCOUNTTHETASKETCH.getName()) || functionName
               .equalsIgnoreCase(AggregationFunctionType.DISTINCTCOUNTRAWTHETASKETCH.getName()) || functionName
+              .equalsIgnoreCase(AggregationFunctionType.RAWTHETASKETCH.name()) || functionName
               .equalsIgnoreCase(AggregationFunctionType.IDSET.getName())) {
-            // NOTE: For DistinctCountThetaSketch, DistinctCountRawThetaSketch and IdSet, because of the legacy behavior
-            //       of PQL compiler treating string literal as identifier in aggregation, here we treat all expressions
-            //       except for the first one as string literal.
+            // NOTE: For DistinctCountThetaSketch, DistinctCountRawThetaSketch, RawThetaSketch and IdSet, because of the
+            //       legacy behavior of PQL compiler treating string literal as identifier in aggregation, here we treat
+            //       all expressions except for the first one as string literal.
             arguments.add(QueryContextConverterUtils.getExpression(stringExpressions.get(0)));
             for (int i = 1; i < numArguments; i++) {
               arguments.add(ExpressionContext.forLiteral(stringExpressions.get(i)));
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountThetaSketchTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountThetaSketchTest.java
index 0ad20a6..2627b78 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountThetaSketchTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountThetaSketchTest.java
@@ -18,23 +18,21 @@
  */
 package org.apache.pinot.queries;
 
-import static org.apache.pinot.core.query.aggregation.function.DistinctCountThetaSketchAggregationFunction.MergeFunction;
-
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Base64;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Random;
-import joptsimple.internal.Strings;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.datasketches.memory.Memory;
 import org.apache.datasketches.theta.Sketch;
 import org.apache.datasketches.theta.UpdateSketch;
 import org.apache.datasketches.theta.UpdateSketchBuilder;
 import org.apache.pinot.common.function.AggregationFunctionType;
-import org.apache.pinot.common.response.broker.AggregationResult;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.GroupByResult;
 import org.apache.pinot.common.segment.ReadMode;
@@ -52,14 +50,14 @@ import org.apache.pinot.spi.data.MetricFieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
-import org.apache.pinot.spi.utils.BytesUtils;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+import static org.testng.Assert.assertEquals;
+
 
 /**
  * Unit tests for {@link org.apache.pinot.core.query.aggregation.function.DistinctCountThetaSketchAggregationFunction}.
@@ -80,6 +78,7 @@ public class DistinctCountThetaSketchTest extends BaseQueriesTest {
 
   private static final long RANDOM_SEED = System.nanoTime();
   private static final Random RANDOM = new Random(RANDOM_SEED);
+  private static final String ERROR_MESSAGE = "Random seed: " + RANDOM_SEED;
 
   private IndexSegment _indexSegment;
   private List<IndexSegment> _indexSegments;
@@ -128,7 +127,7 @@ public class DistinctCountThetaSketchTest extends BaseQueriesTest {
 
   @DataProvider(name = "badQueries")
   public Object[][] badQueries() {
-    return new Object[][] {
+    return new Object[][]{
         // need at least 4 arguments in agg func
         {"select distinctCountThetaSketch(colTS, 'nominalEntries=123', '$0') from testTable"},
         // substitution arguments should start at $1
@@ -142,59 +141,47 @@ public class DistinctCountThetaSketchTest extends BaseQueriesTest {
         // union with < 2 arguments
         {"select distinctCountThetaSketch(colTS, 'nominalEntries=123', 'colA = 1', 'SET_UNION($1)')"},
         // intersect with < 2 arguments
-        {"select distinctCountThetaSketch(colTS, 'nominalEntries=123', 'colA = 1', 'SET_INTERSECT($1)')"}
-    };
+        {"select distinctCountThetaSketch(colTS, 'nominalEntries=123', 'colA = 1', 'SET_INTERSECT($1)')"}};
   }
 
   private void testThetaSketches(boolean groupBy, boolean sql) {
-    String tsQuery, distinctQuery;
-    String thetaSketchParams = "nominalEntries=1001";
-
-    List<String> predicateStrings = Collections.singletonList("colA = 1");
-    String substitution = "$1";
-    String whereClause = Strings.join(predicateStrings, " or ");
-    tsQuery = buildQuery(whereClause, thetaSketchParams, predicateStrings, substitution, groupBy, false);
-    distinctQuery = buildQuery(whereClause, null, null, null, groupBy, false);
-    testQuery(tsQuery, distinctQuery, groupBy, sql, false);
+    String parameters = "nominalEntries=1001";
 
-    tsQuery = buildQuery(whereClause, thetaSketchParams, predicateStrings, substitution, groupBy, true);
-    testQuery(tsQuery, distinctQuery, groupBy, sql, true);
+    List<String> predicates = Collections.singletonList("colA = 1");
+    String postAggregationExpression = "$1";
+    String filter = "colA = 1";
+    testThetaSketch(parameters, predicates, postAggregationExpression, filter, groupBy, sql);
 
     // Test Intersection (AND)
-    predicateStrings = Arrays.asList("colA = 1", "colB >= 2.0", "colC <> 'colC_1'");
-    substitution = "SET_INTERSECT($1, $2, $3)";
-    whereClause = Strings.join(predicateStrings, " and ");
-    tsQuery = buildQuery(whereClause, thetaSketchParams, predicateStrings, substitution, groupBy, false);
-    distinctQuery = buildQuery(whereClause, null, null, null, groupBy, false);
-    testQuery(tsQuery, distinctQuery, groupBy, sql, false);
-
-    tsQuery = buildQuery(whereClause, thetaSketchParams, predicateStrings, substitution, groupBy, true);
-    testQuery(tsQuery, distinctQuery, groupBy, sql, true);
+    predicates = Arrays.asList("colA = 1", "colB >= 2.0", "colC <> 'colC_1'");
+    postAggregationExpression = "SET_INTERSECT($1, $2, $3)";
+    filter = StringUtils.join(predicates, " and ");
+    testThetaSketch(parameters, predicates, postAggregationExpression, filter, groupBy, sql);
 
     // Test Union (OR)
-    predicateStrings = Arrays.asList("colA = 1", "colB = 1.9");
-    substitution = "SET_UNION($1, $2)";
-    whereClause = Strings.join(predicateStrings, " or ");
-    tsQuery = buildQuery(whereClause, thetaSketchParams, predicateStrings, substitution, groupBy, false);
-    distinctQuery = buildQuery(whereClause, null, null, null, groupBy, false);
-    testQuery(tsQuery, distinctQuery, groupBy, sql, false);
-
-    tsQuery = buildQuery(whereClause, thetaSketchParams, predicateStrings, substitution, groupBy, true);
-    testQuery(tsQuery, distinctQuery, groupBy, sql, true);
+    predicates = Arrays.asList("colA = 1", "colB = 1.9");
+    postAggregationExpression = "SET_UNION($1, $2)";
+    filter = StringUtils.join(predicates, " or ");
+    testThetaSketch(parameters, predicates, postAggregationExpression, filter, groupBy, sql);
 
     // Test complex predicates
-    predicateStrings = Arrays.asList("colA in (1, 2)", "colB not in (3.0)", "colC between 'colC_1' and 'colC_5'");
-    // operator precedence. ORs are evaluated after ANDs
-    substitution = "SET_UNION(SET_INTERSECT($1, $2), SET_INTERSECT($1, $3))";
-    whereClause =
-        predicateStrings.get(0) + " and " + predicateStrings.get(1) + " or " + predicateStrings.get(0) + " and "
-            + predicateStrings.get(2);
-    tsQuery = buildQuery(whereClause, thetaSketchParams, predicateStrings, substitution, groupBy, false);
-    distinctQuery = buildQuery(whereClause, null, null, null, groupBy, false);
-    testQuery(tsQuery, distinctQuery, groupBy, sql, false);
-
-    tsQuery = buildQuery(whereClause, thetaSketchParams, predicateStrings, substitution, groupBy, true);
-    testQuery(tsQuery, distinctQuery, groupBy, sql, true);
+    predicates = Arrays.asList("colA in (1, 2)", "colB not in (3.0)", "colC between 'colC_1' and 'colC_5'");
+    postAggregationExpression = "SET_UNION(SET_INTERSECT($1, $2), SET_INTERSECT($1, $3))";
+    filter = '(' + predicates.get(0) + " and " + predicates.get(1) + ") or (" + predicates.get(0) + " and " + predicates
+        .get(2) + ')';
+    testThetaSketch(parameters, predicates, postAggregationExpression, filter, groupBy, sql);
+  }
+
+  private void testThetaSketch(String parameters, List<String> predicates, String postAggregationExpression,
+      String filter, boolean groupBy, boolean sql) {
+    // The DISTINCTCOUNT query is the baseline for every sketch-based query below
+    String baselineQuery = buildDistinctCountQuery(filter, groupBy);
+    testQuery(buildDistinctCountThetaSketchQuery(parameters, predicates, postAggregationExpression, filter, groupBy),
+        baselineQuery, groupBy, sql, false);
+    testQuery(buildRawThetaSketchQuery(THETA_SKETCH_COLUMN, parameters, filter, groupBy), baselineQuery, groupBy, sql,
+        true);
+    testQuery(buildRawThetaSketchQuery(DISTINCT_COLUMN, parameters, filter, groupBy), baselineQuery, groupBy, sql,
+        true);
   }
 
   private void testQuery(String tsQuery, String distinctQuery, boolean groupBy, boolean sql, boolean raw) {
@@ -229,111 +216,86 @@ public class DistinctCountThetaSketchTest extends BaseQueriesTest {
     }
   }
 
-  private void compareAggregationPql(BrokerResponseNative actualResponse, BrokerResponseNative expectedResponse,
-      boolean raw) {
-    List<AggregationResult> actualResults = actualResponse.getAggregationResults();
-    Assert.assertEquals(actualResults.size(), 1);
-    double actual = getSketchValue((String) actualResults.get(0).getValue(), raw);
-
-    List<AggregationResult> expectedResults = expectedResponse.getAggregationResults();
-    double expected = Double.parseDouble((String) expectedResults.get(0).getValue());
-
-    Assert.assertEquals(actual, expected, (expected * 0.1), // Allow for 10 % error.
-        "Distinct count mismatch: actual: " + actual + "expected: " + expected + "seed:" + RANDOM_SEED);
-  }
-
   private void compareSql(BrokerResponseNative actualResponse, BrokerResponseNative expectedResponse, boolean raw) {
     List<Object[]> actualRows = actualResponse.getResultTable().getRows();
     List<Object[]> expectedRows = expectedResponse.getResultTable().getRows();
-
-    Assert.assertEquals(actualRows.size(), expectedRows.size());
-
-    for (int i = 0; i < actualRows.size(); i++) {
-      double actual = getSketchValue(actualRows.get(i)[0].toString(), raw);
-      double expected = (Integer) expectedRows.get(i)[0];
-      Assert.assertEquals(actual, expected);
+    int numRows = actualRows.size();
+    assertEquals(numRows, expectedRows.size(), ERROR_MESSAGE);
+    for (int i = 0; i < numRows; i++) {
+      int actual = getSketchValue(actualRows.get(i)[0].toString(), raw);
+      int expected = (int) expectedRows.get(i)[0];
+      assertEquals(actual, expected, ERROR_MESSAGE);
     }
   }
 
-  private void compareGroupByPql(BrokerResponseNative actualResponse, BrokerResponseNative expectedResponse,
+  private void compareAggregationPql(BrokerResponseNative actualResponse, BrokerResponseNative expectedResponse,
       boolean raw) {
-    AggregationResult actualResult = actualResponse.getAggregationResults().get(0);
-    List<GroupByResult> actualGroupBy = actualResult.getGroupByResult();
-
-    AggregationResult expectedResult = expectedResponse.getAggregationResults().get(0);
-    List<GroupByResult> expectedGroupBy = expectedResult.getGroupByResult();
-
-    Assert.assertEquals(actualGroupBy.size(), expectedGroupBy.size());
-    for (int i = 0; i < actualGroupBy.size(); i++) {
-      double actual = getSketchValue((String) actualGroupBy.get(i).getValue(), raw);
-      double expected = Double.parseDouble((String) expectedGroupBy.get(i).getValue());
+    int actual = getSketchValue((String) actualResponse.getAggregationResults().get(0).getValue(), raw);
+    int expected = Integer.parseInt((String) expectedResponse.getAggregationResults().get(0).getValue());
+    assertEquals(actual, expected, ERROR_MESSAGE);
+  }
 
-      Assert.assertEquals(actual, expected, (expected * 0.1), // Allow for 10 % error.
-          "Distinct count mismatch: actual: " + actual + "expected: " + expected + "seed:" + RANDOM_SEED);
+  private void compareGroupByPql(BrokerResponseNative actualResponse, BrokerResponseNative expectedResponse,
+      boolean raw) {
+    List<GroupByResult> actualResults = actualResponse.getAggregationResults().get(0).getGroupByResult();
+    List<GroupByResult> expectedResults = expectedResponse.getAggregationResults().get(0).getGroupByResult();
+    int numGroups = actualResults.size();
+    assertEquals(numGroups, expectedResults.size(), ERROR_MESSAGE);
+    for (int i = 0; i < numGroups; i++) {
+      int actual = getSketchValue((String) actualResults.get(i).getValue(), raw);
+      int expected = Integer.parseInt((String) expectedResults.get(i).getValue());
+      assertEquals(actual, expected, ERROR_MESSAGE);
     }
   }
 
-  private double getSketchValue(String value, boolean raw) {
+  private int getSketchValue(String value, boolean raw) {
     if (!raw) {
-      return Double.parseDouble(value);
+      return Integer.parseInt(value);
+    } else {
+      byte[] bytes = Base64.getDecoder().decode(value);
+      return (int) Math.round(Sketch.wrap(Memory.wrap(bytes)).getEstimate());
     }
-
-    byte[] bytes = BytesUtils.toBytes(value);
-    return Sketch.wrap(Memory.wrap(bytes)).getEstimate();
   }
 
-  private String buildQuery(String whereClause, String thetaSketchParams, List<String> thetaSketchPredicates,
-      String postAggregationExpression, boolean groupBy, boolean raw) {
-    String column;
-    String aggrFunction;
-    boolean thetaSketch = (postAggregationExpression != null);
-
-    if (thetaSketch) {
-      aggrFunction = (raw) ? AggregationFunctionType.DISTINCTCOUNTRAWTHETASKETCH.getName()
-          : AggregationFunctionType.DISTINCTCOUNTTHETASKETCH.getName();
-      column = THETA_SKETCH_COLUMN;
-    } else {
-      aggrFunction = AggregationFunctionType.DISTINCTCOUNT.getName();
-      column = DISTINCT_COLUMN;
+  private String buildDistinctCountThetaSketchQuery(String parameters, List<String> predicates,
+      String postAggregationExpression, String filter, boolean groupBy) {
+    StringBuilder stringBuilder =
+        new StringBuilder("select ").append(AggregationFunctionType.DISTINCTCOUNTTHETASKETCH.getName()).append('(')
+            .append(THETA_SKETCH_COLUMN).append(",'").append(parameters).append("','");
+    for (String predicate : predicates) {
+      stringBuilder.append(predicate.replace("'", "''")).append("','");
     }
+    stringBuilder.append(postAggregationExpression.replace("'", "''")).append("')");
 
-    StringBuilder sb = new StringBuilder("select ");
-    sb.append(aggrFunction);
-    sb.append("(");
-    sb.append(column);
-
-    if (thetaSketch) {
-      sb.append(", ");
-
-      sb.append("'");
-      if (thetaSketchParams != null) {
-        sb.append(thetaSketchParams.replace("'", "''"));
-      }
-      sb.append("', ");
-
-      for (String predicate : thetaSketchPredicates) {
-        sb.append('\'');
-        sb.append(predicate.replace("'", "''"));
-        sb.append('\'');
-        sb.append(", ");
-      }
-
-      sb.append('\'');
-      sb.append(postAggregationExpression.replace("'", "''"));
-      sb.append('\'');
+    stringBuilder.append(" from ").append(TABLE_NAME).append(" where ").append(filter);
+    if (groupBy) {
+      stringBuilder.append(" group by colA, colB");
     }
+    return stringBuilder.toString();
+  }
 
-    sb.append(") from ");
-
-    sb.append(TABLE_NAME);
-    sb.append(" where ");
-    sb.append(whereClause);
+  private String buildRawThetaSketchQuery(String column, String parameters, String filter, boolean groupBy) {
+    StringBuilder stringBuilder =
+        new StringBuilder("select ").append(AggregationFunctionType.RAWTHETASKETCH.getName()).append('(').append(column)
+            .append(",'").append(parameters).append("')");
 
+    stringBuilder.append(" from ").append(TABLE_NAME).append(" where ").append(filter);
     if (groupBy) {
-      sb.append(" group by colA, colB");
+      stringBuilder.append(" group by colA, colB");
     }
+    return stringBuilder.toString();
+  }
+
+  private String buildDistinctCountQuery(String filter, boolean groupBy) {
+    StringBuilder stringBuilder =
+        new StringBuilder("select ").append(AggregationFunctionType.DISTINCTCOUNT.getName()).append('(')
+            .append(DISTINCT_COLUMN).append(')');
 
-    return sb.toString();
+    stringBuilder.append(" from ").append(TABLE_NAME).append(" where ").append(filter);
+    if (groupBy) {
+      stringBuilder.append(" group by colA, colB");
+    }
+    return stringBuilder.toString();
   }
 
   @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org