Posted to commits@pinot.apache.org by "andimiller (via GitHub)" <gi...@apache.org> on 2023/03/15 17:20:21 UTC

[GitHub] [pinot] andimiller opened a new pull request, #10427: Tuple sketch support

andimiller opened a new pull request, #10427:
URL: https://github.com/apache/pinot/pull/10427

   This adds support for `BYTES` columns containing Tuple Sketches with Integer as the summary type.
   
   The added classes currently support `Sum` as the semigroup, but are generic so others can be added.
   
   Feature breakdown:
   
   1. Add transform functions that can be used to create Integer Tuple Sketches during ingestion, e.g. `toIntegerSumTupleSketch(colA, colB, 16)`
   2. Add Codecs that use the Datasketches serialization
   3. Add aggregation functions:
     * `DISTINCT_COUNT_TUPLE_SKETCH` will just get the estimate for the number of unique keys, same as Theta or HLL
     * `DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH` will merge the sketches using `Sum` as the semigroup and return the raw sketch
     * `SUM_VALUES_INTEGER_SUM_TUPLE_SKETCH` will merge the sketches using `Sum` as the semigroup and estimate the sum of the value side
     * `AVG_VALUES_INTEGER_SUM_TUPLE_SKETCH` will merge the sketches using `Sum` as the semigroup and estimate the average of the value side
   4. Add `ValueAggregator<_, _>`s for use in `StarTree` indexes for all 4 above aggregations
   5. Add `ValueAggregator`s for use in rollups for all 4 above aggregations
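
As a rough illustration of what the four aggregations compute, the sketch below models them exactly with a plain `Map` instead of a Tuple Sketch, so no sampling or estimation is involved. The class and method names (`ExactTupleModel`, `merge`, `distinctCount`, `sumValues`, `avgValue`) are hypothetical and are not part of Pinot or DataSketches; only the use of `Sum` to combine values for the same key is taken from the description above.

```java
import java.util.HashMap;
import java.util.Map;

// Exact, non-probabilistic stand-in for an Integer Tuple Sketch: key -> summed value.
// All names here are hypothetical; a real sketch retains only a sample of the keys.
final class ExactTupleModel {
  // Merge two key->value maps, combining values for the same key with Sum.
  static Map<String, Integer> merge(Map<String, Integer> a, Map<String, Integer> b) {
    Map<String, Integer> merged = new HashMap<>(a);
    b.forEach((key, value) -> merged.merge(key, value, Integer::sum));
    return merged;
  }

  // Analogue of the distinct count: number of unique keys.
  static long distinctCount(Map<String, Integer> m) {
    return m.size();
  }

  // Analogue of the sum of the value side.
  static long sumValues(Map<String, Integer> m) {
    long total = 0;
    for (int v : m.values()) {
      total += v;
    }
    return total;
  }

  // Analogue of the average of the value side; 0 for an empty input.
  static long avgValue(Map<String, Integer> m) {
    return m.isEmpty() ? 0 : sumValues(m) / m.size();
  }

  public static void main(String[] args) {
    Map<String, Integer> left = new HashMap<>();
    left.put("a", 1);
    left.put("b", 2);
    Map<String, Integer> right = new HashMap<>();
    right.put("b", 3);
    right.put("c", 4);
    Map<String, Integer> merged = merge(left, right);
    System.out.println(distinctCount(merged) + " " + sumValues(merged) + " " + avgValue(merged));
  }
}
```

In this model the key `b` appears on both sides, so its values are added together during the merge while it still counts as a single distinct key.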


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] andimiller commented on a diff in pull request #10427: Integer Tuple Sketch support

Posted by "andimiller (via GitHub)" <gi...@apache.org>.
andimiller commented on code in PR #10427:
URL: https://github.com/apache/pinot/pull/10427#discussion_r1140346022


##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumValuesIntegerTupleSketchAggregationFunction.java:
##########
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.SketchIterator;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+public class SumValuesIntegerTupleSketchAggregationFunction extends IntegerTupleSketchAggregationFunction {

Review Comment:
   I've followed the way it was implemented for Theta, using the simplest one as the base and inheriting from it.





[GitHub] [pinot] andimiller commented on a diff in pull request #10427: Integer Tuple Sketch support

Posted by "andimiller (via GitHub)" <gi...@apache.org>.
andimiller commented on code in PR #10427:
URL: https://github.com/apache/pinot/pull/10427#discussion_r1179227437


##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgValueIntegerTupleSketchAggregationFunction.java:
##########
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.SketchIterator;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+public class AvgValueIntegerTupleSketchAggregationFunction
+    extends IntegerTupleSketchAggregationFunction {
+
+  public AvgValueIntegerTupleSketchAggregationFunction(List<ExpressionContext> arguments, IntegerSummary.Mode mode) {
+    super(arguments, mode);
+  }
+
+  // TODO if extra aggregation modes are supported, make this switch

Review Comment:
   I have expanded the todo and added `// ie, if a Mode argument other than SUM is passed in, switch to the matching AggregationFunctionType`





[GitHub] [pinot] swaminathanmanish commented on pull request #10427: Integer Tuple Sketch support

Posted by "swaminathanmanish (via GitHub)" <gi...@apache.org>.
swaminathanmanish commented on PR #10427:
URL: https://github.com/apache/pinot/pull/10427#issuecomment-1525956251

   Thanks for taking care of the comments!




[GitHub] [pinot] andimiller commented on a diff in pull request #10427: Integer Tuple Sketch support

Posted by "andimiller (via GitHub)" <gi...@apache.org>.
andimiller commented on code in PR #10427:
URL: https://github.com/apache/pinot/pull/10427#discussion_r1140465158


##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/IntegerTupleSketchAggregationFunction.java:
##########
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.Sketch;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/***
+ * This is the base class for all Integer Tuple Sketch aggregations
+ *
+ * Note that it only supports BYTES columns containing serialized sketches currently, but could be expanded to more
+ */
+public class IntegerTupleSketchAggregationFunction
+    extends BaseSingleInputAggregationFunction<List<CompactSketch<IntegerSummary>>, Comparable> {
+  ExpressionContext _expressionContext;
+  IntegerSummarySetOperations _setOps;
+  int _entries;

Review Comment:
   yup, I forgot you can do that in Java, fixed



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/IntegerTupleSketchAggregationFunction.java:
##########
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.Sketch;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/***
+ * This is the base class for all Integer Tuple Sketch aggregations
+ *
+ * Note that it only supports BYTES columns containing serialized sketches currently, but could be expanded to more
+ */
+public class IntegerTupleSketchAggregationFunction
+    extends BaseSingleInputAggregationFunction<List<CompactSketch<IntegerSummary>>, Comparable> {
+  ExpressionContext _expressionContext;
+  IntegerSummarySetOperations _setOps;
+  int _entries;
+
+  public IntegerTupleSketchAggregationFunction(List<ExpressionContext> arguments, IntegerSummary.Mode mode) {
+    super(arguments.get(0));
+    _expressionContext = arguments.get(0);
+    _setOps = new IntegerSummarySetOperations(mode, mode);
+    _entries = 4096;

Review Comment:
   yup, removed





[GitHub] [pinot] davecromberge commented on a diff in pull request #10427: Integer Tuple Sketch support

Posted by "davecromberge (via GitHub)" <gi...@apache.org>.
davecromberge commented on code in PR #10427:
URL: https://github.com/apache/pinot/pull/10427#discussion_r1139439869


##########
pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java:
##########
@@ -213,6 +216,8 @@ public static ObjectType getObjectType(Object value) {
         return ObjectType.VarianceTuple;
       } else if (value instanceof PinotFourthMoment) {
         return ObjectType.PinotFourthMoment;
+      } else if (value instanceof org.apache.datasketches.tuple.Sketch) {
+        return ObjectType.IntegerTupleSketch;

Review Comment:
   Is this a safe assumption?  Is it also necessary to inspect the summary type to verify integer?
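
Some background on why the question matters: Java erases generic type arguments at runtime, so an `instanceof` test can only see the raw sketch class and not its summary type. The following is a minimal sketch using a hypothetical stand-in type, `GenericSketch<S>`, rather than the real DataSketches classes. It shows the erasure and one possible runtime check that inspects actual summary instances; whether such a check is worthwhile in `getObjectType` is left open here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a generic sketch; not the DataSketches API.
final class GenericSketch<S> {
  private final List<S> summaries = new ArrayList<>();

  void add(S summary) {
    summaries.add(summary);
  }

  // True only if every retained summary is an instance of the expected class.
  // An empty sketch has nothing to inspect, so it is accepted.
  boolean hasOnlySummariesOf(Class<?> expected) {
    for (S summary : summaries) {
      if (!expected.isInstance(summary)) {
        return false;
      }
    }
    return true;
  }
}

final class ErasureDemo {
  public static void main(String[] args) {
    GenericSketch<Integer> ints = new GenericSketch<>();
    ints.add(7);
    GenericSketch<String> strings = new GenericSketch<>();
    strings.add("x");

    Object a = ints;
    Object b = strings;
    // Both pass the raw instanceof test: the type argument is erased at runtime.
    System.out.println(a instanceof GenericSketch);
    System.out.println(b instanceof GenericSketch);
    // Inspecting the summaries themselves tells the two apart.
    System.out.println(ints.hasOnlySummariesOf(Integer.class));
    System.out.println(strings.hasOnlySummariesOf(Integer.class));
  }
}
```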



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumValuesIntegerTupleSketchAggregationFunction.java:
##########
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.SketchIterator;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+public class SumValuesIntegerTupleSketchAggregationFunction extends IntegerTupleSketchAggregationFunction {
+
+  public SumValuesIntegerTupleSketchAggregationFunction(List<ExpressionContext> arguments, IntegerSummary.Mode mode) {
+    super(arguments, mode);
+  }
+
+  // TODO if extra aggregation modes are supported, make this switch
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.SUMVALUESINTEGERSUMTUPLESKETCH;
+  }
+
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return ColumnDataType.LONG;
+  }
+
+  @Override
+  public Comparable extractFinalResult(List<CompactSketch<IntegerSummary>> integerSummarySketches) {
+    Union<IntegerSummary> union = new Union<>(_entries, _setOps);
+    integerSummarySketches.forEach(union::union);
+    double retainedTotal = 0L;
+    SketchIterator<IntegerSummary> summaries = union.getResult().iterator();
+    while (summaries.next()) {
+      retainedTotal += summaries.getSummary().getValue();
+    }
+    double estimate = retainedTotal / union.getResult().getRetainedEntries() * union.getResult().getEstimate();

Review Comment:
   You could divide the retained total by theta instead, since theta should be equivalent to the number of retained entries divided by the estimate.
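
The identity behind this suggestion can be checked with plain arithmetic: if theta equals the retained entries divided by the estimate, then `retainedTotal / retainedEntries * estimate` equals `retainedTotal / theta`. The snippet below is only a numeric check with assumed values; the class and method names are hypothetical and no sketch library is used.

```java
// Plain-arithmetic check; names are hypothetical and the input values are assumed.
final class ThetaScaling {
  // Form used in the diff: mean of retained summaries times the estimated key count.
  static double viaEstimate(double retainedTotal, int retainedEntries, double estimate) {
    return retainedTotal / retainedEntries * estimate;
  }

  // Alternative form: scale the retained total up by the sampling fraction theta.
  static double viaTheta(double retainedTotal, double theta) {
    return retainedTotal / theta;
  }

  public static void main(String[] args) {
    int retainedEntries = 4096;
    double theta = 0.25;                        // assumed sampling fraction
    double estimate = retainedEntries / theta;  // estimated number of unique keys
    double retainedTotal = 81920.0;             // assumed sum over retained summaries
    System.out.println(viaEstimate(retainedTotal, retainedEntries, estimate));
    System.out.println(viaTheta(retainedTotal, theta));
  }
}
```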



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgIntegerTupleSketchAggregationFunction.java:
##########
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.SketchIterator;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+public class AvgIntegerTupleSketchAggregationFunction
+    extends IntegerTupleSketchAggregationFunction {
+
+  public AvgIntegerTupleSketchAggregationFunction(List<ExpressionContext> arguments, IntegerSummary.Mode mode) {
+    super(arguments, mode);
+  }
+
+  // TODO if extra aggregation modes are supported, make this switch
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.AVGVALUEINTEGERSUMTUPLESKETCH;
+  }
+
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return ColumnDataType.LONG;
+  }
+
+  @Override
+  public Comparable extractFinalResult(List<CompactSketch<IntegerSummary>> integerSummarySketches) {
+    Union<IntegerSummary> union = new Union<>(_entries, _setOps);
+    integerSummarySketches.forEach(union::union);
+    double retainedTotal = 0L;
+    SketchIterator<IntegerSummary> summaries = union.getResult().iterator();
+    while (summaries.next()) {
+      retainedTotal += summaries.getSummary().getValue();
+    }
+    double estimate = retainedTotal / union.getResult().getRetainedEntries();
+    return Double.valueOf(estimate).longValue();

Review Comment:
   Does calling getResult() on the union multiple times recompute the result?
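
Whatever the answer is for the real `Union`, holding the result in a local variable sidesteps the question. A minimal sketch, using a hypothetical stand-in (`CountingUnion`) that counts how often `getResult()` is invoked; it does not model the DataSketches implementation.

```java
import java.util.Arrays;

// Hypothetical stand-in for a union whose getResult() does work on every call.
final class CountingUnion {
  private final int[] values;
  private int getResultCalls = 0;

  CountingUnion(int... values) {
    this.values = values;
  }

  // Returns a fresh copy each time, mimicking a result that is rebuilt per call.
  int[] getResult() {
    getResultCalls++;
    return Arrays.copyOf(values, values.length);
  }

  int getResultCalls() {
    return getResultCalls;
  }

  // Calls getResult() once and reuses the local for both the sum and the count.
  static long average(CountingUnion union) {
    int[] result = union.getResult();
    if (result.length == 0) {
      return 0L;
    }
    long total = 0;
    for (int v : result) {
      total += v;
    }
    return total / result.length;
  }
}
```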



##########
pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java:
##########
@@ -918,6 +923,28 @@ public Sketch deserialize(ByteBuffer byteBuffer) {
     }
   };
 
+  public static final ObjectSerDe<org.apache.datasketches.tuple.Sketch<IntegerSummary>> DATA_SKETCH_INT_TUPLE_SER_DE =
+      new ObjectSerDe<org.apache.datasketches.tuple.Sketch<IntegerSummary>>() {
+        @Override
+        public byte[] serialize(org.apache.datasketches.tuple.Sketch<IntegerSummary> value) {
+          return value.compact().toByteArray();
+        }
+
+        @Override
+        public org.apache.datasketches.tuple.Sketch<IntegerSummary> deserialize(byte[] bytes) {
+          return org.apache.datasketches.tuple.Sketches.heapifySketch(Memory.wrap(bytes),
+              new IntegerSummaryDeserializer());
+        }
+
+        @Override
+        public org.apache.datasketches.tuple.Sketch<IntegerSummary> deserialize(ByteBuffer byteBuffer) {
+          byte[] bytes = new byte[byteBuffer.remaining()];
+          byteBuffer.get(bytes);
+          return org.apache.datasketches.tuple.Sketches.heapifySketch(Memory.wrap(bytes),
+              new IntegerSummaryDeserializer());

Review Comment:
   The Datasketches Memory can also wrap a byte buffer directly.
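
For context, the copy in the diff allocates a new array and also advances the buffer's position, while a wrapper over the buffer avoids both. The sketch below uses only `java.nio` to show that difference; `ByteBuffer.slice()` stands in for a zero-copy wrapper, and it is an assumption that wrapping the buffer with the DataSketches `Memory` class behaves comparably.

```java
import java.nio.ByteBuffer;

// JDK-only illustration; names are hypothetical.
final class BufferViewDemo {
  // Copying approach from the diff: allocates a new array and consumes the buffer.
  static byte[] copyRemaining(ByteBuffer buffer) {
    byte[] bytes = new byte[buffer.remaining()];
    buffer.get(bytes);
    return bytes;
  }

  // Zero-copy approach: a view over the same bytes; the source position is untouched.
  static ByteBuffer viewRemaining(ByteBuffer buffer) {
    return buffer.slice();
  }

  public static void main(String[] args) {
    ByteBuffer source = ByteBuffer.wrap(new byte[] {1, 2, 3, 4});
    ByteBuffer view = viewRemaining(source);
    System.out.println(view.remaining() + " " + source.remaining());
    byte[] copy = copyRemaining(source);
    System.out.println(copy.length + " " + source.remaining());
  }
}
```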



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumValuesIntegerTupleSketchAggregationFunction.java:
##########
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.SketchIterator;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+public class SumValuesIntegerTupleSketchAggregationFunction extends IntegerTupleSketchAggregationFunction {
+
+  public SumValuesIntegerTupleSketchAggregationFunction(List<ExpressionContext> arguments, IntegerSummary.Mode mode) {
+    super(arguments, mode);
+  }
+
+  // TODO if extra aggregation modes are supported, make this switch
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.SUMVALUESINTEGERSUMTUPLESKETCH;
+  }
+
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return ColumnDataType.LONG;
+  }
+
+  @Override
+  public Comparable extractFinalResult(List<CompactSketch<IntegerSummary>> integerSummarySketches) {
+    Union<IntegerSummary> union = new Union<>(_entries, _setOps);
+    integerSummarySketches.forEach(union::union);
+    double retainedTotal = 0L;
+    SketchIterator<IntegerSummary> summaries = union.getResult().iterator();
+    while (summaries.next()) {
+      retainedTotal += summaries.getSummary().getValue();
+    }
+    double estimate = retainedTotal / union.getResult().getRetainedEntries() * union.getResult().getEstimate();
+    return Double.valueOf(estimate).longValue();
+  }

Review Comment:
   Does the serde always deserialise bytes to a compact sketch?  It could be better to use the base `Sketch` abstraction for cases where the sketches have been created outside the system and not compacted.



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java:
##########
@@ -299,13 +299,21 @@ public static AggregationFunction getAggregationFunction(FunctionContext functio
             return new FourthMomentAggregationFunction(firstArgument, FourthMomentAggregationFunction.Type.KURTOSIS);
           case FOURTHMOMENT:
             return new FourthMomentAggregationFunction(firstArgument, FourthMomentAggregationFunction.Type.MOMENT);
+          case DISTINCTCOUNTTUPLESKETCH:
+            // mode actually doesn't matter here because we only care about keys, not values
+            return new DistinctCountIntegerTupleSketchAggregationFunction(arguments, IntegerSummary.Mode.Sum);
+          case DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH:
+            return new IntegerTupleSketchAggregationFunction(arguments, IntegerSummary.Mode.Sum);
+          case SUMVALUESINTEGERSUMTUPLESKETCH:
+            return new SumValuesIntegerTupleSketchAggregationFunction(arguments, IntegerSummary.Mode.Sum);
+          case AVGVALUEINTEGERSUMTUPLESKETCH:
+            return new AvgIntegerTupleSketchAggregationFunction(arguments, IntegerSummary.Mode.Sum);

Review Comment:
   `DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH` is consistent with the current naming scheme, but the `DISTINCTCOUNT` prefix is somewhat redundant since it's clear that we are using a tuple (or theta) sketch. This is just a subjective remark; there is no need to change it, since it is consistent with the theta naming.



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgIntegerTupleSketchAggregationFunction.java:
##########
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.SketchIterator;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+public class AvgIntegerTupleSketchAggregationFunction
+    extends IntegerTupleSketchAggregationFunction {
+
+  public AvgIntegerTupleSketchAggregationFunction(List<ExpressionContext> arguments, IntegerSummary.Mode mode) {
+    super(arguments, mode);
+  }
+
+  // TODO if extra aggregation modes are supported, make this switch
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.AVGVALUEINTEGERSUMTUPLESKETCH;
+  }
+
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return ColumnDataType.LONG;
+  }
+
+  @Override
+  public Comparable extractFinalResult(List<CompactSketch<IntegerSummary>> integerSummarySketches) {
+    Union<IntegerSummary> union = new Union<>(_entries, _setOps);
+    integerSummarySketches.forEach(union::union);
+    double retainedTotal = 0L;
+    SketchIterator<IntegerSummary> summaries = union.getResult().iterator();
+    while (summaries.next()) {
+      retainedTotal += summaries.getSummary().getValue();
+    }
+    double estimate = retainedTotal / union.getResult().getRetainedEntries();

Review Comment:
   It could be a good idea to first check if the union is empty, otherwise you may encounter division by zero.
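
One detail worth noting: because `retainedTotal` is a `double` in the diff, the division is floating-point, so zero retained entries would produce `NaN` rather than throw, and Java converts `NaN` to 0 when narrowing to `long`. An explicit guard still makes the empty case visible. A minimal sketch with hypothetical names, mirroring only the arithmetic and not the sketch types:

```java
// Arithmetic-only illustration; names are hypothetical.
final class EmptyUnionGuard {
  // Unguarded form, mirroring the arithmetic in the diff.
  static long unguardedAverage(double retainedTotal, int retainedEntries) {
    double estimate = retainedTotal / retainedEntries;  // NaN when both are zero
    return Double.valueOf(estimate).longValue();
  }

  // Guarded form: the empty case is handled explicitly before dividing.
  static long guardedAverage(double retainedTotal, int retainedEntries) {
    if (retainedEntries == 0) {
      return 0L;
    }
    return (long) (retainedTotal / retainedEntries);
  }

  public static void main(String[] args) {
    System.out.println(Double.isNaN(0.0 / 0));
    System.out.println(unguardedAverage(0.0, 0));
    System.out.println(guardedAverage(0.0, 0));
    System.out.println(guardedAverage(10.0, 4));
  }
}
```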



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/IntegerTupleSketchAggregationFunction.java:
##########
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.Sketch;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/***
+ * This is the base class for all Integer Tuple Sketch aggregations
+ *
+ * Note that it only supports BYTES columns containing serialized sketches currently, but could be expanded to more
+ */
+public class IntegerTupleSketchAggregationFunction
+    extends BaseSingleInputAggregationFunction<List<CompactSketch<IntegerSummary>>, Comparable> {
+  ExpressionContext _expressionContext;
+  IntegerSummarySetOperations _setOps;
+  int _entries;

Review Comment:
   Could these be final, since they are only set in the constructor?



##########
pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java:
##########
@@ -131,4 +133,49 @@ public static byte[] toHLL(@Nullable Object input, int log2m) {
     }
     return ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.serialize(hll);
   }
+
+  /**
+   * Create a Tuple Sketch containing the key and value supplied
+   *
+   * @param key an Object we want to insert as the key of the sketch, may be null to return an empty sketch
+   * @param value an Integer we want to associate as the value to go along with the key, may be null to return an
+   *              empty sketch
+   * @return serialized tuple sketch as bytes
+   */
+  @ScalarFunction(nullableParameters = true)
+  public static byte[] toIntegerSumTupleSketch(@Nullable Object key, @Nullable Integer value) {
+    return toIntegerSumTupleSketch(key, value, CommonConstants.Helix.DEFAULT_TUPLE_SKETCH_LGK);
+  }
+
+  /**
+   * Create a Tuple Sketch containing the key and value supplied
+   *
+   * @param key an Object we want to insert as the key of the sketch, may be null to return an empty sketch
+   * @param value an Integer we want to associate as the value to go along with the key, may be null to return an
+   *              empty sketch
+   * @param lgK integer representing the log of the maximum number of retained entries in the sketch, between 4 and 26
+   * @return serialized tuple sketch as bytes
+   */
+  @ScalarFunction(nullableParameters = true)
+  public static byte[] toIntegerSumTupleSketch(@Nullable Object key, Integer value, int lgK) {
+    IntegerSketch is = new IntegerSketch(lgK, IntegerSummary.Mode.Sum);
+    if (value != null) {
+      if (key instanceof Integer) {

Review Comment:
   The tradeoff with this decision is that we may lose the "key" from a distinct count for a null value, but I don't see a sensible way around this besides defaulting to some sentinel, which may not be applicable to all use cases.





[GitHub] [pinot] swaminathanmanish commented on a diff in pull request #10427: Integer Tuple Sketch support

Posted by "swaminathanmanish (via GitHub)" <gi...@apache.org>.
swaminathanmanish commented on code in PR #10427:
URL: https://github.com/apache/pinot/pull/10427#discussion_r1139067772


##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -96,6 +96,9 @@ public static class Helix {
     // https://datasketches.apache.org/docs/Theta/ThetaErrorTable.html
     public static final int DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES = 65536;
 
+
+    public static final int DEFAULT_TUPLE_SKETCH_LGK = 16;

Review Comment:
   Any references that can help explain this value? 
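
For context on this value: a later revision of the aggregation function in this thread derives its nominal entries as 2 to the power of lgK, so an lgK of 16 corresponds to 65536 entries, the same number as `DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES` in the diff above. A minimal sketch of that arithmetic follows; the class and method names are illustrative and not part of the PR.

```java
final class LgKExample {
  // Values copied from the CommonConstants diff quoted above.
  static final int DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES = 65536;
  static final int DEFAULT_TUPLE_SKETCH_LGK = 16;

  // Nominal entries are 2^lgK; a bit shift avoids floating-point rounding.
  // Hypothetical helper, not a Pinot or DataSketches API.
  static int nominalEntries(int lgK) {
    return 1 << lgK;
  }

  public static void main(String[] args) {
    // Prints 65536, matching the Theta sketch default.
    System.out.println(nominalEntries(DEFAULT_TUPLE_SKETCH_LGK));
  }
}
```

Read this way, the tuple sketch default lines up with the Theta sketch default, so the error-table link in the preceding comment is probably the closest existing reference.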



##########
pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java:
##########
@@ -131,4 +133,49 @@ public static byte[] toHLL(@Nullable Object input, int log2m) {
     }
     return ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.serialize(hll);
   }
+
+  /**
+   * Create a Tuple Sketch containing the key and value supplied
+   *
+   * @param key an Object we want to insert as the key of the sketch, may be null to return an empty sketch
+   * @param value an Integer we want to associate as the value to go along with the key, may be null to return an
+   *              empty sketch
+   * @return serialized tuple sketch as bytes
+   */
+  @ScalarFunction(nullableParameters = true)
+  public static byte[] toIntegerSumTupleSketch(@Nullable Object key, @Nullable Integer value) {
+    return toIntegerSumTupleSketch(key, value, CommonConstants.Helix.DEFAULT_TUPLE_SKETCH_LGK);
+  }
+
+  /**
+   * Create a Tuple Sketch containing the key and value supplied
+   *
+   * @param key an Object we want to insert as the key of the sketch, may be null to return an empty sketch
+   * @param value an Integer we want to associate as the value to go along with the key, may be null to return an
+   *              empty sketch
+   * @param lgK integer representing the log of the maximum number of retained entries in the sketch, between 4 and 26
+   * @return serialized tuple sketch as bytes
+   */
+  @ScalarFunction(nullableParameters = true)
+  public static byte[] toIntegerSumTupleSketch(@Nullable Object key, Integer value, int lgK) {
+    IntegerSketch is = new IntegerSketch(lgK, IntegerSummary.Mode.Sum);
+    if (value != null) {
+      if (key instanceof Integer) {
+        is.update((Integer) key, value);
+      } else if (key instanceof Long) {
+        is.update((Long) key, value);
+      } else if (key instanceof Float) {
+        is.update((float) key, value);
+      } else if (key instanceof Double) {
+        is.update((double) key, value);
+      } else if (key instanceof BigDecimal) {
+        is.update(((BigDecimal) key).toString(), value);
+      } else if (key instanceof String) {
+        is.update((String) key, value);
+      } else if (key instanceof byte[]) {
+        is.update((byte[]) key, value);
+      }

Review Comment:
   If you want to validate and catch invalid types, consider throwing an IllegalStateException or IllegalArgumentException.
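
One way the suggestion could look is a final branch that rejects any key type the chain does not handle instead of silently skipping it. The sketch below is self-contained and does not call DataSketches; `KeyTypeCheck` and `branchFor` are hypothetical names used only to show the control flow, and treating a null key as "empty" is an assumption based on the Javadoc in the diff.

```java
import java.math.BigDecimal;

final class KeyTypeCheck {
  // Hypothetical helper: reports which branch of the if/else chain a key would take.
  // A null key maps to "empty", mirroring the documented "return an empty sketch" behaviour.
  static String branchFor(Object key) {
    if (key == null) {
      return "empty";
    } else if (key instanceof Integer) {
      return "int";
    } else if (key instanceof Long) {
      return "long";
    } else if (key instanceof Float) {
      return "float";
    } else if (key instanceof Double) {
      return "double";
    } else if (key instanceof BigDecimal || key instanceof String) {
      return "string";
    } else if (key instanceof byte[]) {
      return "bytes";
    }
    // Anything else is a caller error, so fail loudly rather than produce an empty sketch.
    throw new IllegalArgumentException("Unsupported key type: " + key.getClass().getName());
  }

  public static void main(String[] args) {
    System.out.println(branchFor(42L));
    System.out.println(branchFor(new BigDecimal("1.5")));
  }
}
```

`IllegalArgumentException` is used here because the bad value comes from the caller; whether the PR settled on that or `IllegalStateException` is not shown in this thread.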



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/IntegerTupleSketchValueAggregator.java:
##########
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.aggregator;
+
+import org.apache.datasketches.tuple.Sketch;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations;
+import org.apache.pinot.segment.local.utils.CustomSerDeUtils;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+
+public class IntegerTupleSketchValueAggregator implements ValueAggregator<byte[], Sketch<IntegerSummary>> {

Review Comment:
   Can the raw type (R) be Sketch<IntegerSummary> instead of byte[] here? I ask because the other sketch implementation (DistinctCountThetaSketchValueAggregator) uses Object as the raw type.



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java:
##########
@@ -299,13 +299,21 @@ public static AggregationFunction getAggregationFunction(FunctionContext functio
             return new FourthMomentAggregationFunction(firstArgument, FourthMomentAggregationFunction.Type.KURTOSIS);
           case FOURTHMOMENT:
             return new FourthMomentAggregationFunction(firstArgument, FourthMomentAggregationFunction.Type.MOMENT);
+          case DISTINCTCOUNTTUPLESKETCH:
+            // mode actually doesn't matter here because we only care about keys, not values
+            return new DistinctCountIntegerTupleSketchAggregationFunction(arguments, IntegerSummary.Mode.Sum);

Review Comment:
   Is there a reason why we pass IntegerSummary.Mode.Sum as a parameter? We already differentiate by aggregation implementation: IntegerTupleSketchAggregationFunction vs. AvgValueIntegerTupleSketchAggregationFunction vs. SumValuesIntegerTupleSketchAggregationFunction.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java:
##########
@@ -66,6 +67,11 @@ public static ValueAggregator getValueAggregator(AggregationFunctionType aggrega
       case DISTINCTCOUNTTHETASKETCH:
       case DISTINCTCOUNTRAWTHETASKETCH:
         return new DistinctCountThetaSketchValueAggregator();
+      case DISTINCTCOUNTTUPLESKETCH:
+      case DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH:
+      case AVGVALUEINTEGERSUMTUPLESKETCH:
+      case SUMVALUESINTEGERSUMTUPLESKETCH:
+        return new IntegerTupleSketchValueAggregator(IntegerSummary.Mode.Sum);

Review Comment:
   Thanks! That makes sense. The other functions build on top of Sum.



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/IntegerTupleSketchAggregationFunction.java:
##########
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.Sketch;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/***
+ * This is the base class for all Integer Tuple Sketch aggregations
+ *
+ * Note that it only supports BYTES columns containing serialized sketches currently, but could be expanded to more
+ */
+public class IntegerTupleSketchAggregationFunction
+    extends BaseSingleInputAggregationFunction<List<CompactSketch<IntegerSummary>>, Comparable> {
+  ExpressionContext _expressionContext;
+  IntegerSummarySetOperations _setOps;
+  int _entries;
+
+  public IntegerTupleSketchAggregationFunction(List<ExpressionContext> arguments, IntegerSummary.Mode mode) {
+    super(arguments.get(0));
+    _expressionContext = arguments.get(0);
+    _setOps = new IntegerSummarySetOperations(mode, mode);
+    _entries = 4096;

Review Comment:
   Is this assignment needed?





[GitHub] [pinot] andimiller commented on a diff in pull request #10427: Integer Tuple Sketch support

Posted by "andimiller (via GitHub)" <gi...@apache.org>.
andimiller commented on code in PR #10427:
URL: https://github.com/apache/pinot/pull/10427#discussion_r1140475437


##########
pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java:
##########
@@ -131,4 +133,49 @@ public static byte[] toHLL(@Nullable Object input, int log2m) {
     }
     return ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.serialize(hll);
   }
+
+  /**
+   * Create a Tuple Sketch containing the key and value supplied
+   *
+   * @param key an Object we want to insert as the key of the sketch, may be null to return an empty sketch
+   * @param value an Integer we want to associate as the value to go along with the key, may be null to return an
+   *              empty sketch
+   * @return serialized tuple sketch as bytes
+   */
+  @ScalarFunction(nullableParameters = true)
+  public static byte[] toIntegerSumTupleSketch(@Nullable Object key, @Nullable Integer value) {
+    return toIntegerSumTupleSketch(key, value, CommonConstants.Helix.DEFAULT_TUPLE_SKETCH_LGK);
+  }
+
+  /**
+   * Create a Tuple Sketch containing the key and value supplied
+   *
+   * @param key an Object we want to insert as the key of the sketch, may be null to return an empty sketch
+   * @param value an Integer we want to associate as the value to go along with the key, may be null to return an
+   *              empty sketch
+   * @param lgK integer representing the log of the maximum number of retained entries in the sketch, between 4 and 26
+   * @return serialized tuple sketch as bytes
+   */
+  @ScalarFunction(nullableParameters = true)
+  public static byte[] toIntegerSumTupleSketch(@Nullable Object key, Integer value, int lgK) {
+    IntegerSketch is = new IntegerSketch(lgK, IntegerSummary.Mode.Sum);
+    if (value != null) {
+      if (key instanceof Integer) {
+        is.update((Integer) key, value);
+      } else if (key instanceof Long) {
+        is.update((Long) key, value);
+      } else if (key instanceof Float) {
+        is.update((float) key, value);
+      } else if (key instanceof Double) {
+        is.update((double) key, value);
+      } else if (key instanceof BigDecimal) {
+        is.update(((BigDecimal) key).toString(), value);
+      } else if (key instanceof String) {
+        is.update((String) key, value);
+      } else if (key instanceof byte[]) {
+        is.update((byte[]) key, value);
+      }

Review Comment:
   Done. I added it for Theta too and expanded the tests to cover it.





[GitHub] [pinot] andimiller commented on a diff in pull request #10427: Integer Tuple Sketch support

Posted by "andimiller (via GitHub)" <gi...@apache.org>.
andimiller commented on code in PR #10427:
URL: https://github.com/apache/pinot/pull/10427#discussion_r1140492865


##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumValuesIntegerTupleSketchAggregationFunction.java:
##########
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.SketchIterator;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+public class SumValuesIntegerTupleSketchAggregationFunction extends IntegerTupleSketchAggregationFunction {
+
+  public SumValuesIntegerTupleSketchAggregationFunction(List<ExpressionContext> arguments, IntegerSummary.Mode mode) {
+    super(arguments, mode);
+  }
+
+  // TODO if extra aggregation modes are supported, make this switch
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.SUMVALUESINTEGERSUMTUPLESKETCH;
+  }
+
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return ColumnDataType.LONG;
+  }
+
+  @Override
+  public Comparable extractFinalResult(List<CompactSketch<IntegerSummary>> integerSummarySketches) {
+    Union<IntegerSummary> union = new Union<>(_entries, _setOps);
+    integerSummarySketches.forEach(union::union);
+    double retainedTotal = 0L;
+    SketchIterator<IntegerSummary> summaries = union.getResult().iterator();
+    while (summaries.next()) {
+      retainedTotal += summaries.getSummary().getValue();
+    }
+    double estimate = retainedTotal / union.getResult().getRetainedEntries() * union.getResult().getEstimate();

Review Comment:
   Yes, it is equivalent to `double estimate = retainedTotal / result.getTheta()`, so I've swapped it.
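
The equivalence follows if the sketch estimate is taken to be the retained entry count divided by theta: dividing the retained total by the count and then multiplying by the estimate cancels the count. A minimal numeric sketch with made-up values rather than a real DataSketches `Union` is shown below; the class and method names are illustrative only.

```java
final class SumEstimateEquivalence {
  // Original form from the diff: retainedTotal / retainedEntries * estimate.
  static double viaEstimate(double retainedTotal, int retainedEntries, double theta) {
    // Assumed definition of the estimate: retained entries scaled up by 1/theta.
    double estimate = retainedEntries / theta;
    return retainedTotal / retainedEntries * estimate;
  }

  // Simplified form from the reply: retainedTotal / theta.
  static double viaTheta(double retainedTotal, double theta) {
    return retainedTotal / theta;
  }

  public static void main(String[] args) {
    // Both lines print 32768.0 for these made-up inputs.
    System.out.println(viaEstimate(8192.0, 4096, 0.25));
    System.out.println(viaTheta(8192.0, 0.25));
  }
}
```

A side effect worth noting: with zero retained entries the original form evaluates 0.0 / 0, which is NaN in Java, while the simplified form only divides by theta.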





[GitHub] [pinot] swaminathanmanish commented on a diff in pull request #10427: Integer Tuple Sketch support

Posted by "swaminathanmanish (via GitHub)" <gi...@apache.org>.
swaminathanmanish commented on code in PR #10427:
URL: https://github.com/apache/pinot/pull/10427#discussion_r1139468915


##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumValuesIntegerTupleSketchAggregationFunction.java:
##########
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.SketchIterator;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+public class SumValuesIntegerTupleSketchAggregationFunction extends IntegerTupleSketchAggregationFunction {

Review Comment:
   Would composition plus delegation make the APIs for Sum, Avg, and Distinct clearer than inheritance? That way we would know exactly when and how IntegerTupleSketchAggregationFunction is used, and it would decouple the Integer API from the rest.
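
To make the composition idea concrete, here is a rough, self-contained sketch. Everything in it is hypothetical: plain integers stand in for deserialized sketches, and `CollectingAggregation` and `DelegatingFunction` are invented names, not classes from the PR or from Pinot.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for the shared "collect the inputs" logic.
final class CollectingAggregation {
  private final List<Integer> _values = new ArrayList<>();

  void aggregate(int value) {
    _values.add(value);
  }

  List<Integer> result() {
    return _values;
  }
}

// Each function owns a collector and a finalizer instead of extending a base class.
final class DelegatingFunction<R> {
  private final CollectingAggregation _delegate = new CollectingAggregation();
  private final Function<List<Integer>, R> _finalizer;

  DelegatingFunction(Function<List<Integer>, R> finalizer) {
    _finalizer = finalizer;
  }

  void aggregate(int value) {
    _delegate.aggregate(value);
  }

  R extractFinalResult() {
    return _finalizer.apply(_delegate.result());
  }

  // Sum over the collected values.
  static DelegatingFunction<Long> sum() {
    return new DelegatingFunction<>(vs -> vs.stream().mapToLong(Integer::longValue).sum());
  }

  // Average over the collected values; 0.0 when nothing was collected.
  static DelegatingFunction<Double> avg() {
    return new DelegatingFunction<>(vs -> vs.stream().mapToInt(Integer::intValue).average().orElse(0.0));
  }
}
```

With this shape, the collecting logic is reachable only through the delegate, so each public function exposes just its own finalization step. Whether that is worth the extra indirection compared with the inheritance used in the PR is a judgment call.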





[GitHub] [pinot] swaminathanmanish commented on a diff in pull request #10427: Integer Tuple Sketch support

Posted by "swaminathanmanish (via GitHub)" <gi...@apache.org>.
swaminathanmanish commented on code in PR #10427:
URL: https://github.com/apache/pinot/pull/10427#discussion_r1142224694


##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/IntegerTupleSketchAggregationFunction.java:
##########
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.Sketch;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/***
+ * This is the base class for all Integer Tuple Sketch aggregations
+ *
+ * Note that it only supports BYTES columns containing serialized sketches currently, but could be expanded to more
+ */
+public class IntegerTupleSketchAggregationFunction
+    extends BaseSingleInputAggregationFunction<List<CompactSketch<IntegerSummary>>, Comparable> {
+  final ExpressionContext _expressionContext;
+  final IntegerSummarySetOperations _setOps;
+  final int _entries;
+
+  public IntegerTupleSketchAggregationFunction(List<ExpressionContext> arguments, IntegerSummary.Mode mode) {
+    super(arguments.get(0));
+    _expressionContext = arguments.get(0);
+    _setOps = new IntegerSummarySetOperations(mode, mode);
+    if (arguments.size() == 2) {
+      _entries = ((Long) arguments.get(1).getLiteral().getValue()).intValue();
+    } else {
+      _entries = (int) Math.pow(2, CommonConstants.Helix.DEFAULT_TUPLE_SKETCH_LGK);
+    }
+  }
+
+  // TODO if extra aggregation modes are supported, make this switch
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH;
+  }
+
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return new ObjectAggregationResultHolder();
+  }
+
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+  }
+
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    // Treat BYTES value as serialized Integer Tuple Sketch
+    FieldSpec.DataType storedType = blockValSet.getValueType().getStoredType();
+    if (storedType == FieldSpec.DataType.BYTES) {
+      byte[][] bytesValues = blockValSet.getBytesValuesSV();
+      try {
+        List<CompactSketch<IntegerSummary>> integerSketch = aggregationResultHolder.getResult();
+        if (integerSketch != null) {
+          List<CompactSketch<IntegerSummary>> sketches =
+              Arrays.stream(bytesValues).map(ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE::deserialize)
+                  .map(Sketch::compact).collect(Collectors.toList());
+          aggregationResultHolder.setValue(merge(aggregationResultHolder.getResult(), sketches));
+        } else {
+          List<CompactSketch<IntegerSummary>> sketches =
+              Arrays.stream(bytesValues).map(ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE::deserialize)
+                  .map(Sketch::compact).collect(Collectors.toList());
+          aggregationResultHolder.setValue(sketches);
+        }
+      } catch (Exception e) {
+        throw new RuntimeException("Caught exception while merging Tuple Sketches", e);
+      }
+    } else {
+      throw new IllegalStateException("Illegal data type for " + getType() + " aggregation function: " + storedType);
+    }
+  }
+
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    // Treat BYTES value as serialized Integer Tuple Sketch
+    FieldSpec.DataType storedType = blockValSet.getValueType().getStoredType();
+
+    if (storedType == FieldSpec.DataType.BYTES) {
+      byte[][] bytesValues = blockValSet.getBytesValuesSV();
+      try {
+        for (int i = 0; i < length; i++) {
+          byte[] value = bytesValues[i];
+          int groupKey = groupKeyArray[i];
+          CompactSketch<IntegerSummary> newSketch =
+              ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(value).compact();
+          if (groupByResultHolder.getResult(groupKey) == null) {
+            ArrayList<CompactSketch<IntegerSummary>> newList = new ArrayList<>();
+            newList.add(newSketch);
+            groupByResultHolder.setValueForKey(groupKey, newList);
+          } else {
+            groupByResultHolder.<List<CompactSketch<IntegerSummary>>>getResult(groupKey).add(newSketch);
+          }
+        }
+      } catch (Exception e) {
+        throw new RuntimeException("Caught exception while merging Tuple Sketches", e);
+      }
+    } else {
+      throw new IllegalStateException(
+          "Illegal data type for INTEGER_TUPLE_SKETCH_UNION aggregation function: " + storedType);
+    }
+  }
+
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    byte[][] valueArray = blockValSetMap.get(_expression).getBytesValuesSV();
+    for (int i = 0; i < length; i++) {
+      byte[] value = valueArray[i];
+      CompactSketch<IntegerSummary> newSketch =
+          ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(value).compact();
+      for (int groupKey : groupKeysArray[i]) {
+        if (groupByResultHolder.getResult(groupKey) == null) {
+          groupByResultHolder.setValueForKey(groupKey, Collections.singletonList(newSketch));
+        } else {
+          groupByResultHolder.<List<CompactSketch<IntegerSummary>>>getResult(groupKey).add(newSketch);
+        }
+      }
+    }
+  }
+
+  @Override
+  public List<CompactSketch<IntegerSummary>> extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+    return aggregationResultHolder.getResult();
+  }
+
+  @Override
+  public List<CompactSketch<IntegerSummary>> extractGroupByResult(GroupByResultHolder groupByResultHolder,
+      int groupKey) {
+    return groupByResultHolder.getResult(groupKey);
+  }
+
+  @Override
+  public List<CompactSketch<IntegerSummary>> merge(List<CompactSketch<IntegerSummary>> intermediateResult1,
+      List<CompactSketch<IntegerSummary>> intermediateResult2) {
+    if (intermediateResult1 == null && intermediateResult2 != null) {
+      return intermediateResult2;
+    } else if (intermediateResult1 != null && intermediateResult2 == null) {
+      return intermediateResult1;
+    } else if (intermediateResult1 == null && intermediateResult2 == null) {
+      return new ArrayList<>(0);
+    }
+    ArrayList<CompactSketch<IntegerSummary>> merged =
+        new ArrayList<>(intermediateResult1.size() + intermediateResult2.size());
+    merged.addAll(intermediateResult1);

Review Comment:
   Just curious: don't we want to do a union here for the merge? I'm looking at DistinctCountThetaSketchAggregationFunction for reference.
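
For readers following the question: the quoted `merge` concatenates the two intermediate lists, and the union appears to be deferred to `extractFinalResult` (as in the Sum function quoted earlier in this thread). The sketch below restates that null-safe concatenation generically. It assumes the truncated part of the diff goes on to add the second list, which is not visible here, and `ListMerge` is an illustrative name.

```java
import java.util.ArrayList;
import java.util.List;

final class ListMerge {
  // Null-safe concatenation of two intermediate results; no union is computed here.
  static <T> List<T> merge(List<T> first, List<T> second) {
    if (first == null && second == null) {
      return new ArrayList<>(0);
    }
    if (first == null) {
      return second;
    }
    if (second == null) {
      return first;
    }
    List<T> merged = new ArrayList<>(first.size() + second.size());
    merged.addAll(first);
    merged.addAll(second); // assumed: this line is cut off in the quoted diff
    return merged;
  }
}
```

The tradeoff is roughly this: deferring the union keeps every input sketch in memory until the final step, whereas unioning at merge time would bound the intermediate size by the union's nominal entries at the cost of doing set operations on every merge.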



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgValueIntegerTupleSketchAggregationFunction.java:
##########
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.SketchIterator;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+public class AvgValueIntegerTupleSketchAggregationFunction
+    extends IntegerTupleSketchAggregationFunction {
+
+  public AvgValueIntegerTupleSketchAggregationFunction(List<ExpressionContext> arguments, IntegerSummary.Mode mode) {
+    super(arguments, mode);
+  }
+
+  // TODO if extra aggregation modes are supported, make this switch

Review Comment:
   Could you elaborate on what this TODO is for? Is it related to the Integer summary modes?



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumValuesIntegerTupleSketchAggregationFunction.java:
##########
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.SketchIterator;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+public class SumValuesIntegerTupleSketchAggregationFunction extends IntegerTupleSketchAggregationFunction {

Review Comment:
   Yes, it makes sense to keep them consistent.



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/IntegerTupleSketchAggregationFunction.java:
##########
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.Sketch;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/***
+ * This is the base class for all Integer Tuple Sketch aggregations
+ *
+ * Note that it only supports BYTES columns containing serialized sketches currently, but could be expanded to more
+ */
+public class IntegerTupleSketchAggregationFunction
+    extends BaseSingleInputAggregationFunction<List<CompactSketch<IntegerSummary>>, Comparable> {
+  final ExpressionContext _expressionContext;
+  final IntegerSummarySetOperations _setOps;
+  final int _entries;
+
+  public IntegerTupleSketchAggregationFunction(List<ExpressionContext> arguments, IntegerSummary.Mode mode) {
+    super(arguments.get(0));
+    _expressionContext = arguments.get(0);

Review Comment:
   Would some validation of the input be useful here?
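
As an illustration of the kind of validation meant here, the following is a minimal sketch and not Pinot code. `TupleSketchArgs`, `resolveEntries` and `DEFAULT_LGK` are hypothetical names; the value 16 is only a placeholder for `CommonConstants.Helix.DEFAULT_TUPLE_SKETCH_LGK`, and the list of literal values stands in for the arguments that follow the column expression.

```java
import java.util.List;

// Hypothetical helper: validates the optional "nominal entries" literal before it is used.
class TupleSketchArgs {

  // Placeholder value (assumption); the real default lives in CommonConstants.
  static final int DEFAULT_LGK = 16;

  // literalArgs holds the literal values that follow the column expression (0 or 1 expected).
  static int resolveEntries(List<?> literalArgs) {
    if (literalArgs.size() > 1) {
      throw new IllegalArgumentException(
          "Expected at most one nominal-entries argument, got " + literalArgs.size());
    }
    if (literalArgs.isEmpty()) {
      return 1 << DEFAULT_LGK;
    }
    Object raw = literalArgs.get(0);
    if (!(raw instanceof Number)) {
      throw new IllegalArgumentException("Nominal entries must be numeric, got: " + raw);
    }
    long entries = ((Number) raw).longValue();
    if (entries <= 0 || entries > Integer.MAX_VALUE) {
      throw new IllegalArgumentException("Nominal entries out of range: " + entries);
    }
    return (int) entries;
  }
}
```

Failing in the constructor with a descriptive message turns a malformed query into a clear error instead of a `ClassCastException` from the unchecked `(Long)` cast.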



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/IntegerTupleSketchAggregationFunction.java:
##########
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.Sketch;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/***
+ * This is the base class for all Integer Tuple Sketch aggregations
+ *
+ * Note that it only supports BYTES columns containing serialized sketches currently, but could be expanded to more
+ */
+public class IntegerTupleSketchAggregationFunction
+    extends BaseSingleInputAggregationFunction<List<CompactSketch<IntegerSummary>>, Comparable> {
+  final ExpressionContext _expressionContext;
+  final IntegerSummarySetOperations _setOps;
+  final int _entries;
+
+  public IntegerTupleSketchAggregationFunction(List<ExpressionContext> arguments, IntegerSummary.Mode mode) {
+    super(arguments.get(0));
+    _expressionContext = arguments.get(0);
+    _setOps = new IntegerSummarySetOperations(mode, mode);
+    if (arguments.size() == 2) {
+      _entries = ((Long) arguments.get(1).getLiteral().getValue()).intValue();
+    } else {
+      _entries = (int) Math.pow(2, CommonConstants.Helix.DEFAULT_TUPLE_SKETCH_LGK);
+    }
+  }
+
+  // TODO if extra aggregation modes are supported, make this switch
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH;
+  }
+
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return new ObjectAggregationResultHolder();
+  }
+
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+  }
+
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    // Treat BYTES value as serialized Integer Tuple Sketch
+    FieldSpec.DataType storedType = blockValSet.getValueType().getStoredType();
+    if (storedType == FieldSpec.DataType.BYTES) {
+      byte[][] bytesValues = blockValSet.getBytesValuesSV();
+      try {
+        List<CompactSketch<IntegerSummary>> integerSketch = aggregationResultHolder.getResult();
+        if (integerSketch != null) {
+          List<CompactSketch<IntegerSummary>> sketches =
+              Arrays.stream(bytesValues).map(ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE::deserialize)
+                  .map(Sketch::compact).collect(Collectors.toList());
+          aggregationResultHolder.setValue(merge(aggregationResultHolder.getResult(), sketches));
+        } else {
+          List<CompactSketch<IntegerSummary>> sketches =
+              Arrays.stream(bytesValues).map(ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE::deserialize)
+                  .map(Sketch::compact).collect(Collectors.toList());
+          aggregationResultHolder.setValue(sketches);
+        }
+      } catch (Exception e) {
+        throw new RuntimeException("Caught exception while merging Tuple Sketches", e);
+      }
+    } else {
+      throw new IllegalStateException("Illegal data type for " + getType() + " aggregation function: " + storedType);
+    }
+  }
+
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    // Treat BYTES value as serialized Integer Tuple Sketch
+    FieldSpec.DataType storedType = blockValSet.getValueType().getStoredType();
+
+    if (storedType == FieldSpec.DataType.BYTES) {
+      byte[][] bytesValues = blockValSet.getBytesValuesSV();
+      try {
+        for (int i = 0; i < length; i++) {
+          byte[] value = bytesValues[i];
+          int groupKey = groupKeyArray[i];
+          CompactSketch<IntegerSummary> newSketch =
+              ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(value).compact();
+          if (groupByResultHolder.getResult(groupKey) == null) {
+            ArrayList<CompactSketch<IntegerSummary>> newList = new ArrayList<>();
+            newList.add(newSketch);
+            groupByResultHolder.setValueForKey(groupKey, newList);
+          } else {
+            groupByResultHolder.<List<CompactSketch<IntegerSummary>>>getResult(groupKey).add(newSketch);
+          }
+        }
+      } catch (Exception e) {
+        throw new RuntimeException("Caught exception while merging Tuple Sketches", e);

Review Comment:
   I guess this is a group-by and not a merge of tuple sketches?



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/IntegerTupleSketchAggregationFunction.java:
##########
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.Sketch;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/***
+ * This is the base class for all Integer Tuple Sketch aggregations
+ *
+ * Note that it only supports BYTES columns containing serialized sketches currently, but could be expanded to more
+ */
+public class IntegerTupleSketchAggregationFunction
+    extends BaseSingleInputAggregationFunction<List<CompactSketch<IntegerSummary>>, Comparable> {
+  final ExpressionContext _expressionContext;
+  final IntegerSummarySetOperations _setOps;
+  final int _entries;
+
+  public IntegerTupleSketchAggregationFunction(List<ExpressionContext> arguments, IntegerSummary.Mode mode) {
+    super(arguments.get(0));
+    _expressionContext = arguments.get(0);
+    _setOps = new IntegerSummarySetOperations(mode, mode);
+    if (arguments.size() == 2) {
+      _entries = ((Long) arguments.get(1).getLiteral().getValue()).intValue();
+    } else {
+      _entries = (int) Math.pow(2, CommonConstants.Helix.DEFAULT_TUPLE_SKETCH_LGK);
+    }
+  }
+
+  // TODO if extra aggregation modes are supported, make this switch
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH;
+  }
+
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return new ObjectAggregationResultHolder();
+  }
+
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+  }
+
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    // Treat BYTES value as serialized Integer Tuple Sketch
+    FieldSpec.DataType storedType = blockValSet.getValueType().getStoredType();
+    if (storedType == FieldSpec.DataType.BYTES) {
+      byte[][] bytesValues = blockValSet.getBytesValuesSV();
+      try {
+        List<CompactSketch<IntegerSummary>> integerSketch = aggregationResultHolder.getResult();
+        if (integerSketch != null) {
+          List<CompactSketch<IntegerSummary>> sketches =
+              Arrays.stream(bytesValues).map(ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE::deserialize)
+                  .map(Sketch::compact).collect(Collectors.toList());
+          aggregationResultHolder.setValue(merge(aggregationResultHolder.getResult(), sketches));
+        } else {
+          List<CompactSketch<IntegerSummary>> sketches =
+              Arrays.stream(bytesValues).map(ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE::deserialize)
+                  .map(Sketch::compact).collect(Collectors.toList());
+          aggregationResultHolder.setValue(sketches);
+        }
+      } catch (Exception e) {
+        throw new RuntimeException("Caught exception while merging Tuple Sketches", e);
+      }
+    } else {
+      throw new IllegalStateException("Illegal data type for " + getType() + " aggregation function: " + storedType);
+    }
+  }
+
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    // Treat BYTES value as serialized Integer Tuple Sketch
+    FieldSpec.DataType storedType = blockValSet.getValueType().getStoredType();
+
+    if (storedType == FieldSpec.DataType.BYTES) {
+      byte[][] bytesValues = blockValSet.getBytesValuesSV();
+      try {
+        for (int i = 0; i < length; i++) {
+          byte[] value = bytesValues[i];
+          int groupKey = groupKeyArray[i];
+          CompactSketch<IntegerSummary> newSketch =
+              ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(value).compact();
+          if (groupByResultHolder.getResult(groupKey) == null) {
+            ArrayList<CompactSketch<IntegerSummary>> newList = new ArrayList<>();
+            newList.add(newSketch);
+            groupByResultHolder.setValueForKey(groupKey, newList);
+          } else {
+            groupByResultHolder.<List<CompactSketch<IntegerSummary>>>getResult(groupKey).add(newSketch);
+          }
+        }
+      } catch (Exception e) {
+        throw new RuntimeException("Caught exception while merging Tuple Sketches", e);
+      }
+    } else {
+      throw new IllegalStateException(
+          "Illegal data type for INTEGER_TUPLE_SKETCH_UNION aggregation function: " + storedType);
+    }
+  }
+
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    byte[][] valueArray = blockValSetMap.get(_expression).getBytesValuesSV();
+    for (int i = 0; i < length; i++) {
+      byte[] value = valueArray[i];
+      CompactSketch<IntegerSummary> newSketch =
+          ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(value).compact();
+      for (int groupKey : groupKeysArray[i]) {

Review Comment:
   This looks exactly the same as aggregateGroupBySV, except that we iterate over the group keys since they can be multi-valued?
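
One way to remove that duplication is a shared per-key helper. The following is a minimal sketch under stated assumptions: a `Map<Integer, List<T>>` stands in for `GroupByResultHolder`, and `GroupByAccumulator`, `addToGroup`, `aggregateSV` and `aggregateMV` are hypothetical names, not part of the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Toy model: a Map<Integer, List<T>> stands in for the group-by result holder.
class GroupByAccumulator {

  // Shared step: append the value to the group's list, creating the list on first use.
  static <T> void addToGroup(Map<Integer, List<T>> holder, int groupKey, T value) {
    holder.computeIfAbsent(groupKey, key -> new ArrayList<>()).add(value);
  }

  // Single-valued group keys: exactly one group per row.
  static <T> void aggregateSV(int length, int[] groupKeyArray, T[] values,
      Map<Integer, List<T>> holder) {
    for (int i = 0; i < length; i++) {
      addToGroup(holder, groupKeyArray[i], values[i]);
    }
  }

  // Multi-valued group keys: the same row is added to every group it belongs to.
  static <T> void aggregateMV(int length, int[][] groupKeysArray, T[] values,
      Map<Integer, List<T>> holder) {
    for (int i = 0; i < length; i++) {
      for (int groupKey : groupKeysArray[i]) {
        addToGroup(holder, groupKey, values[i]);
      }
    }
  }
}
```

With the helper in place, the two methods differ only in how they enumerate group keys for each row.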



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumValuesIntegerTupleSketchAggregationFunction.java:
##########
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.SketchIterator;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+public class SumValuesIntegerTupleSketchAggregationFunction extends IntegerTupleSketchAggregationFunction {
+
+  public SumValuesIntegerTupleSketchAggregationFunction(List<ExpressionContext> arguments, IntegerSummary.Mode mode) {
+    super(arguments, mode);
+  }
+
+  // TODO if extra aggregation modes are supported, make this switch
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.SUMVALUESINTEGERSUMTUPLESKETCH;
+  }
+
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return ColumnDataType.LONG;
+  }
+
+  @Override
+  public Comparable extractFinalResult(List<CompactSketch<IntegerSummary>> integerSummarySketches) {
+    Union<IntegerSummary> union = new Union<>(_entries, _setOps);
+    integerSummarySketches.forEach(union::union);
+    double retainedTotal = 0L;
+    SketchIterator<IntegerSummary> summaries = union.getResult().iterator();
+    while (summaries.next()) {
+      retainedTotal += summaries.getSummary().getValue();
+    }
+    double estimate = retainedTotal / union.getResult().getRetainedEntries() * union.getResult().getEstimate();
+    return Double.valueOf(estimate).longValue();
+  }

Review Comment:
   I had the same question as well :). 
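
For readers following the quoted `extractFinalResult`: it takes the mean summary value over the retained entries and scales it by the estimated number of distinct keys. The following is a minimal sketch of that arithmetic only. `SumEstimate` and `estimateSum` are hypothetical names, the plain array stands in for the sketch's retained summaries, and the empty-input guard is an addition that is not in the quoted code.

```java
// Toy model of the scaling step: mean of the retained values times the estimated key count.
class SumEstimate {

  static long estimateSum(int[] retainedValues, double estimatedDistinctKeys) {
    if (retainedValues.length == 0) {
      return 0L; // added guard: avoids dividing zero by zero for an empty sketch
    }
    double retainedTotal = 0;
    for (int value : retainedValues) {
      retainedTotal += value;
    }
    double meanPerKey = retainedTotal / retainedValues.length;
    // Truncates toward zero, like the quoted Double.valueOf(...).longValue().
    return (long) (meanPerKey * estimatedDistinctKeys);
  }
}
```

For example, three retained values of 10, 20 and 30 with an estimated 300 distinct keys give a mean of 20 per key and an estimated sum of 6000.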





[GitHub] [pinot] andimiller commented on a diff in pull request #10427: Integer Tuple Sketch support

Posted by "andimiller (via GitHub)" <gi...@apache.org>.
andimiller commented on code in PR #10427:
URL: https://github.com/apache/pinot/pull/10427#discussion_r1140394504


##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgIntegerTupleSketchAggregationFunction.java:
##########
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.SketchIterator;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+public class AvgIntegerTupleSketchAggregationFunction
+    extends IntegerTupleSketchAggregationFunction {
+
+  public AvgIntegerTupleSketchAggregationFunction(List<ExpressionContext> arguments, IntegerSummary.Mode mode) {
+    super(arguments, mode);
+  }
+
+  // TODO if extra aggregation modes are supported, make this switch
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.AVGVALUEINTEGERSUMTUPLESKETCH;
+  }
+
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return ColumnDataType.LONG;
+  }
+
+  @Override
+  public Comparable extractFinalResult(List<CompactSketch<IntegerSummary>> integerSummarySketches) {
+    Union<IntegerSummary> union = new Union<>(_entries, _setOps);
+    integerSummarySketches.forEach(union::union);
+    double retainedTotal = 0L;
+    SketchIterator<IntegerSummary> summaries = union.getResult().iterator();
+    while (summaries.next()) {
+      retainedTotal += summaries.getSummary().getValue();
+    }
+    double estimate = retainedTotal / union.getResult().getRetainedEntries();
+    return Double.valueOf(estimate).longValue();

Review Comment:
   It may, yes. I will cache that in a value.
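
A minimal sketch of that change follows, assuming that each call to the result accessor does non-trivial work (the thread suggests this but does not confirm it). `AvgEstimate` and `averageOfRetained` are hypothetical names, and the `Supplier<int[]>` stands in for `union.getResult()`.

```java
import java.util.function.Supplier;

// Toy model: the supplier stands in for union.getResult(); it is invoked exactly once.
class AvgEstimate {

  static long averageOfRetained(Supplier<int[]> resultSupplier) {
    int[] retained = resultSupplier.get(); // cached in a local and reused below
    if (retained.length == 0) {
      return 0L; // added guard for an empty result
    }
    double retainedTotal = 0;
    for (int value : retained) {
      retainedTotal += value;
    }
    return (long) (retainedTotal / retained.length);
  }
}
```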





[GitHub] [pinot] andimiller commented on a diff in pull request #10427: Integer Tuple Sketch support

Posted by "andimiller (via GitHub)" <gi...@apache.org>.
andimiller commented on code in PR #10427:
URL: https://github.com/apache/pinot/pull/10427#discussion_r1140437330


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/IntegerTupleSketchValueAggregator.java:
##########
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.aggregator;
+
+import org.apache.datasketches.tuple.Sketch;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations;
+import org.apache.pinot.segment.local.utils.CustomSerDeUtils;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+
+public class IntegerTupleSketchValueAggregator implements ValueAggregator<byte[], Sketch<IntegerSummary>> {

Review Comment:
   It may need to be Object. This was a good catch; I'm doing more local testing.





[GitHub] [pinot] andimiller commented on a diff in pull request #10427: Integer Tuple Sketch support

Posted by "andimiller (via GitHub)" <gi...@apache.org>.
andimiller commented on code in PR #10427:
URL: https://github.com/apache/pinot/pull/10427#discussion_r1139030108


##########
pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java:
##########
@@ -131,4 +133,49 @@ public static byte[] toHLL(@Nullable Object input, int log2m) {
     }
     return ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.serialize(hll);
   }
+
+  /**
+   * Create a Tuple Sketch containing the key and value supplied
+   *
+   * @param key an Object we want to insert as the key of the sketch, may be null to return an empty sketch
+   * @param value an Integer we want to associate as the value to go along with the key, may be null to return an
+   *              empty sketch
+   * @return serialized tuple sketch as bytes
+   */
+  @ScalarFunction(nullableParameters = true)
+  public static byte[] toIntegerSumTupleSketch(@Nullable Object key, @Nullable Integer value) {
+    return toIntegerSumTupleSketch(key, value, CommonConstants.Helix.DEFAULT_TUPLE_SKETCH_LGK);
+  }
+
+  /**
+   * Create a Tuple Sketch containing the key and value supplied
+   *
+   * @param key an Object we want to insert as the key of the sketch, may be null to return an empty sketch
+   * @param value an Integer we want to associate as the value to go along with the key, may be null to return an
+   *              empty sketch
+   * @param lgK integer representing the log of the maximum number of retained entries in the sketch, between 4 and 26
+   * @return serialized tuple sketch as bytes
+   */
+  @ScalarFunction(nullableParameters = true)
+  public static byte[] toIntegerSumTupleSketch(@Nullable Object key, Integer value, int lgK) {
+    IntegerSketch is = new IntegerSketch(lgK, IntegerSummary.Mode.Sum);
+    if (value != null) {
+      if (key instanceof Integer) {

Review Comment:
   The key can be null. Both `key` and `value` are expected to be columns, so if either is null it will output an empty sketch.
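
The null-key case works because `instanceof` is false for `null` in Java, so a null key matches no type branch and nothing is inserted. The following is a minimal sketch of that control flow, not the PR's code: a map stands in for the sketch, `NullSafeSketchInput` and `toEntry` are hypothetical names, and the `String` branch is an assumption, since the quoted hunk only shows the `Integer` check.

```java
import java.util.Collections;
import java.util.Map;

// Toy model: an empty map stands in for an empty sketch.
class NullSafeSketchInput {

  static Map<Object, Integer> toEntry(Object key, Integer value) {
    if (value != null) {
      // instanceof is false for null, so a null key falls through every branch.
      if (key instanceof Integer || key instanceof String) {
        return Collections.singletonMap(key, value);
      }
    }
    return Collections.emptyMap();
  }
}
```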





[GitHub] [pinot] swaminathanmanish commented on a diff in pull request #10427: Integer Tuple Sketch support

Posted by "swaminathanmanish (via GitHub)" <gi...@apache.org>.
swaminathanmanish commented on code in PR #10427:
URL: https://github.com/apache/pinot/pull/10427#discussion_r1139037004


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java:
##########
@@ -66,6 +67,11 @@ public static ValueAggregator getValueAggregator(AggregationFunctionType aggrega
       case DISTINCTCOUNTTHETASKETCH:
       case DISTINCTCOUNTRAWTHETASKETCH:
         return new DistinctCountThetaSketchValueAggregator();
+      case DISTINCTCOUNTTUPLESKETCH:
+      case DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH:
+      case AVGVALUEINTEGERSUMTUPLESKETCH:
+      case SUMVALUESINTEGERSUMTUPLESKETCH:
+        return new IntegerTupleSketchValueAggregator(IntegerSummary.Mode.Sum);

Review Comment:
   For AVGVALUEINTEGERSUMTUPLESKETCH, is the plan to support an 'AvgValue' mode later? It looks like IntegerSummary does not support Avg right now.





[GitHub] [pinot] andimiller commented on a diff in pull request #10427: Integer Tuple Sketch support

Posted by "andimiller (via GitHub)" <gi...@apache.org>.
andimiller commented on code in PR #10427:
URL: https://github.com/apache/pinot/pull/10427#discussion_r1139040907


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java:
##########
@@ -66,6 +67,11 @@ public static ValueAggregator getValueAggregator(AggregationFunctionType aggrega
       case DISTINCTCOUNTTHETASKETCH:
       case DISTINCTCOUNTRAWTHETASKETCH:
         return new DistinctCountThetaSketchValueAggregator();
+      case DISTINCTCOUNTTUPLESKETCH:
+      case DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH:
+      case AVGVALUEINTEGERSUMTUPLESKETCH:
+      case SUMVALUESINTEGERSUMTUPLESKETCH:
+        return new IntegerTupleSketchValueAggregator(IntegerSummary.Mode.Sum);

Review Comment:
   These methods have `SumTupleSketch` in their names because they use `Sum` as the mode for combining summaries. `avgValue` is computed afterwards, so it is the average of the per-key sums.
   
   It is possible to support other things, such as histograms or quantiles, but I only added `sumValues` and `avgValue` as the first two examples of how to use this sketch.
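
To make "average of the sum per key" concrete, here is a rough illustration in plain Java. It assumes exact maps in place of sketches (a real tuple sketch only retains a sample of keys, so its result is an estimate), and `AvgOfSumDemo`, `mergeWithSum` and `avgValue` are hypothetical names, not part of the PR.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in: exact maps instead of tuple sketches.
final class AvgOfSumDemo {
  // Merge step: values that share a key are combined with Sum.
  static Map<String, Integer> mergeWithSum(Map<String, Integer> a, Map<String, Integer> b) {
    Map<String, Integer> merged = new HashMap<>(a);
    b.forEach((key, value) -> merged.merge(key, value, Integer::sum));
    return merged;
  }

  // Final step: average of the per-key sums, computed after all merging is done.
  static double avgValue(Map<String, Integer> merged) {
    if (merged.isEmpty()) {
      return 0.0;
    }
    long total = 0;
    for (int value : merged.values()) {
      total += value;
    }
    return (double) total / merged.size();
  }
}
```

Merging `{a=1, b=2}` with `{a=3, c=6}` gives per-key sums `{a=4, b=2, c=6}`, and the average of those sums is 4.0. The averaging happens once at the end, which is why the summary mode stays `Sum` throughout.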





[GitHub] [pinot] swaminathanmanish commented on a diff in pull request #10427: Integer Tuple Sketch support

Posted by "swaminathanmanish (via GitHub)" <gi...@apache.org>.
swaminathanmanish commented on code in PR #10427:
URL: https://github.com/apache/pinot/pull/10427#discussion_r1139024122


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/IntegerTupleSketchValueAggregator.java:
##########
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.aggregator;
+
+import org.apache.datasketches.tuple.Sketch;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations;
+import org.apache.pinot.segment.local.utils.CustomSerDeUtils;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+
+public class IntegerTupleSketchValueAggregator implements ValueAggregator<byte[], Sketch<IntegerSummary>> {
+  public static final DataType AGGREGATED_VALUE_TYPE = DataType.BYTES;
+
+  // This changes a lot similar to the Bitmap aggregator
+  private int _maxByteSize;
+
+  private final IntegerSummary.Mode _mode;
+
+  public IntegerTupleSketchValueAggregator(IntegerSummary.Mode mode) {
+    _mode = mode;
+  }
+
+  @Override
+  public AggregationFunctionType getAggregationType() {
+    return AggregationFunctionType.DISTINCTCOUNTTUPLESKETCH;
+  }
+
+  @Override
+  public DataType getAggregatedValueType() {
+    return AGGREGATED_VALUE_TYPE;
+  }
+
+  // Utility method to merge two sketches
+  private Sketch<IntegerSummary> union(Sketch<IntegerSummary> a, Sketch<IntegerSummary> b) {
+    return new Union<>(new IntegerSummarySetOperations(_mode, _mode)).union(a, b);
+  }
+
+  @Override
+  public Sketch<IntegerSummary> getInitialAggregatedValue(byte[] rawValue) {
+    Sketch<IntegerSummary> initialValue = deserializeAggregatedValue(rawValue);
+    _maxByteSize = Math.max(_maxByteSize, rawValue.length);
+    return initialValue;
+  }
+
+  @Override
+  public Sketch<IntegerSummary> applyRawValue(Sketch<IntegerSummary> value, byte[] rawValue) {
+    Sketch<IntegerSummary> right = deserializeAggregatedValue(rawValue);
+    Sketch<IntegerSummary> result = union(value, right).compact();
+    _maxByteSize = Math.max(_maxByteSize, result.toByteArray().length);
+    return result;
+  }
+
+  @Override
+  public Sketch<IntegerSummary> applyAggregatedValue(Sketch<IntegerSummary> value,
+      Sketch<IntegerSummary> aggregatedValue) {
+    Sketch<IntegerSummary> result = union(value, aggregatedValue);
+    _maxByteSize = Math.max(_maxByteSize, result.toByteArray().length);
+    return result;
+  }
+
+  @Override
+  public Sketch<IntegerSummary> cloneAggregatedValue(Sketch<IntegerSummary> value) {
+    return deserializeAggregatedValue(serializeAggregatedValue(value));
+  }
+
+  @Override
+  public int getMaxAggregatedValueByteSize() {
+    return _maxByteSize;
+  }
+
+  @Override
+  public byte[] serializeAggregatedValue(Sketch<IntegerSummary> value) {
+    return CustomSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(value);

Review Comment:
   Just curious: is there a reason why we have two ser/deser utilities (CustomSerDeUtils and ObjectSerDeUtils)? @Jackie-Jiang





[GitHub] [pinot] andimiller commented on a diff in pull request #10427: Integer Tuple Sketch support

Posted by "andimiller (via GitHub)" <gi...@apache.org>.
andimiller commented on code in PR #10427:
URL: https://github.com/apache/pinot/pull/10427#discussion_r1139467007


##########
pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java:
##########
@@ -213,6 +216,8 @@ public static ObjectType getObjectType(Object value) {
         return ObjectType.VarianceTuple;
       } else if (value instanceof PinotFourthMoment) {
         return ObjectType.PinotFourthMoment;
+      } else if (value instanceof org.apache.datasketches.tuple.Sketch) {
+        return ObjectType.IntegerTupleSketch;

Review Comment:
   Right now it is, but to add other types of tuple Sketch we'd need to add wrapper types.
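
One reading of this reply, stated as an assumption: Java erases generic type parameters at runtime, so an `instanceof` check cannot tell one parameterisation of a generic class from another, whereas distinct wrapper classes can be told apart. The sketch below illustrates that with plain lists. `WrapperTypeDemo`, its nested classes and the `"DoubleTupleSketch"` label are hypothetical and do not exist in the PR.

```java
import java.util.List;

final class WrapperTypeDemo {
  // Hypothetical wrappers: distinct runtime classes around the same erased generic type.
  static final class IntegerSummaries {
    final List<Integer> values;

    IntegerSummaries(List<Integer> values) {
      this.values = values;
    }
  }

  static final class DoubleSummaries {
    final List<Double> values;

    DoubleSummaries(List<Double> values) {
      this.values = values;
    }
  }

  // instanceof distinguishes the wrappers; it could not distinguish List<Integer> from List<Double>.
  static String objectType(Object value) {
    if (value instanceof IntegerSummaries) {
      return "IntegerTupleSketch";
    }
    if (value instanceof DoubleSummaries) {
      return "DoubleTupleSketch"; // hypothetical second type
    }
    return "Unknown";
  }
}
```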





[GitHub] [pinot] andimiller commented on pull request #10427: Tuple sketch support

Posted by "andimiller (via GitHub)" <gi...@apache.org>.
andimiller commented on PR #10427:
URL: https://github.com/apache/pinot/pull/10427#issuecomment-1470443240

   I could do with some advice on the best place to add tests for the aggregation functions. I've been looking through the existing tests and can't find anywhere suitable.




[GitHub] [pinot] andimiller commented on a diff in pull request #10427: Integer Tuple Sketch support

Posted by "andimiller (via GitHub)" <gi...@apache.org>.
andimiller commented on code in PR #10427:
URL: https://github.com/apache/pinot/pull/10427#discussion_r1179223584


##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/IntegerTupleSketchAggregationFunction.java:
##########
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.Sketch;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/***
+ * This is the base class for all Integer Tuple Sketch aggregations
+ *
+ * Note that it only supports BYTES columns containing serialized sketches currently, but could be expanded to more
+ */
+public class IntegerTupleSketchAggregationFunction
+    extends BaseSingleInputAggregationFunction<List<CompactSketch<IntegerSummary>>, Comparable> {
+  final ExpressionContext _expressionContext;
+  final IntegerSummarySetOperations _setOps;
+  final int _entries;
+
+  public IntegerTupleSketchAggregationFunction(List<ExpressionContext> arguments, IntegerSummary.Mode mode) {
+    super(arguments.get(0));
+    _expressionContext = arguments.get(0);
+    _setOps = new IntegerSummarySetOperations(mode, mode);
+    if (arguments.size() == 2) {
+      _entries = ((Long) arguments.get(1).getLiteral().getValue()).intValue();
+    } else {
+      _entries = (int) Math.pow(2, CommonConstants.Helix.DEFAULT_TUPLE_SKETCH_LGK);
+    }
+  }
+
+  // TODO if extra aggregation modes are supported, make this switch
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH;
+  }
+
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return new ObjectAggregationResultHolder();
+  }
+
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+  }
+
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    // Treat BYTES value as serialized Integer Tuple Sketch
+    FieldSpec.DataType storedType = blockValSet.getValueType().getStoredType();
+    if (storedType == FieldSpec.DataType.BYTES) {
+      byte[][] bytesValues = blockValSet.getBytesValuesSV();
+      try {
+        List<CompactSketch<IntegerSummary>> integerSketch = aggregationResultHolder.getResult();
+        if (integerSketch != null) {
+          List<CompactSketch<IntegerSummary>> sketches =
+              Arrays.stream(bytesValues).map(ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE::deserialize)
+                  .map(Sketch::compact).collect(Collectors.toList());
+          aggregationResultHolder.setValue(merge(aggregationResultHolder.getResult(), sketches));
+        } else {
+          List<CompactSketch<IntegerSummary>> sketches =
+              Arrays.stream(bytesValues).map(ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE::deserialize)
+                  .map(Sketch::compact).collect(Collectors.toList());
+          aggregationResultHolder.setValue(sketches);
+        }
+      } catch (Exception e) {
+        throw new RuntimeException("Caught exception while merging Tuple Sketches", e);
+      }
+    } else {
+      throw new IllegalStateException("Illegal data type for " + getType() + " aggregation function: " + storedType);
+    }
+  }
+
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    // Treat BYTES value as serialized Integer Tuple Sketch
+    FieldSpec.DataType storedType = blockValSet.getValueType().getStoredType();
+
+    if (storedType == FieldSpec.DataType.BYTES) {
+      byte[][] bytesValues = blockValSet.getBytesValuesSV();
+      try {
+        for (int i = 0; i < length; i++) {
+          byte[] value = bytesValues[i];
+          int groupKey = groupKeyArray[i];
+          CompactSketch<IntegerSummary> newSketch =
+              ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(value).compact();
+          if (groupByResultHolder.getResult(groupKey) == null) {
+            ArrayList<CompactSketch<IntegerSummary>> newList = new ArrayList<>();
+            newList.add(newSketch);
+            groupByResultHolder.setValueForKey(groupKey, newList);
+          } else {
+            groupByResultHolder.<List<CompactSketch<IntegerSummary>>>getResult(groupKey).add(newSketch);
+          }
+        }
+      } catch (Exception e) {
+        throw new RuntimeException("Caught exception while merging Tuple Sketches", e);
+      }
+    } else {
+      throw new IllegalStateException(
+          "Illegal data type for INTEGER_TUPLE_SKETCH_UNION aggregation function: " + storedType);
+    }
+  }
+
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    byte[][] valueArray = blockValSetMap.get(_expression).getBytesValuesSV();
+    for (int i = 0; i < length; i++) {
+      byte[] value = valueArray[i];
+      CompactSketch<IntegerSummary> newSketch =
+          ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(value).compact();
+      for (int groupKey : groupKeysArray[i]) {
+        if (groupByResultHolder.getResult(groupKey) == null) {
+          groupByResultHolder.setValueForKey(groupKey, Collections.singletonList(newSketch));
+        } else {
+          groupByResultHolder.<List<CompactSketch<IntegerSummary>>>getResult(groupKey).add(newSketch);
+        }
+      }
+    }
+  }
+
+  @Override
+  public List<CompactSketch<IntegerSummary>> extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+    return aggregationResultHolder.getResult();
+  }
+
+  @Override
+  public List<CompactSketch<IntegerSummary>> extractGroupByResult(GroupByResultHolder groupByResultHolder,
+      int groupKey) {
+    return groupByResultHolder.getResult(groupKey);
+  }
+
+  @Override
+  public List<CompactSketch<IntegerSummary>> merge(List<CompactSketch<IntegerSummary>> intermediateResult1,
+      List<CompactSketch<IntegerSummary>> intermediateResult2) {
+    if (intermediateResult1 == null && intermediateResult2 != null) {
+      return intermediateResult2;
+    } else if (intermediateResult1 != null && intermediateResult2 == null) {
+      return intermediateResult1;
+    } else if (intermediateResult1 == null && intermediateResult2 == null) {
+      return new ArrayList<>(0);
+    }
+    ArrayList<CompactSketch<IntegerSummary>> merged =
+        new ArrayList<>(intermediateResult1.size() + intermediateResult2.size());
+    merged.addAll(intermediateResult1);

Review Comment:
   This is an optimisation similar to the one used in the Theta version: merges can be quite expensive, so it's better to delay the merge until we have a lot of sketches to combine, hence `List` as the intermediate type.
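
The deferred-merge pattern described in this comment can be sketched as follows. This is an illustration under stated assumptions, not the PR's code: `Set<Integer>` stands in for a compact sketch, a set union stands in for the sketch union, and `DeferredMergeDemo` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

final class DeferredMergeDemo {
  // Cheap intermediate merge: concatenate the two lists without combining their elements.
  static List<Set<Integer>> merge(List<Set<Integer>> left, List<Set<Integer>> right) {
    List<Set<Integer>> merged = new ArrayList<>();
    if (left != null) {
      merged.addAll(left);
    }
    if (right != null) {
      merged.addAll(right);
    }
    return merged;
  }

  // The expensive union runs once, over everything collected so far.
  static Set<Integer> extractFinalResult(List<Set<Integer>> parts) {
    Set<Integer> union = new TreeSet<>();
    for (Set<Integer> part : parts) {
      union.addAll(part);
    }
    return union;
  }
}
```

The trade-off is memory for CPU: the intermediate list grows with the number of inputs, but the costly combine step is paid once instead of on every merge.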





[GitHub] [pinot] codecov-commenter commented on pull request #10427: Integer Tuple Sketch support

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #10427:
URL: https://github.com/apache/pinot/pull/10427#issuecomment-1470535255

   ## [Codecov](https://codecov.io/gh/apache/pinot/pull/10427?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#10427](https://codecov.io/gh/apache/pinot/pull/10427?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (bb8054f) into [master](https://codecov.io/gh/apache/pinot/commit/856503256fa55ec5ff7e48f3beea9fea88a9a406?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (8565032) will **decrease** coverage by `45.97%`.
   > The diff coverage is `2.17%`.
   
   ```diff
   @@              Coverage Diff              @@
   ##             master   #10427       +/-   ##
   =============================================
   - Coverage     70.29%   24.33%   -45.97%     
   + Complexity     6105       49     -6056     
   =============================================
     Files          2053     2034       -19     
     Lines        111397   110810      -587     
     Branches      16939    16867       -72     
   =============================================
   - Hits          78308    26965    -51343     
   - Misses        27583    81026    +53443     
   + Partials       5506     2819     -2687     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1 | `24.33% <2.17%> (-0.06%)` | :arrow_down: |
   | integration2 | `?` | |
   | unittests1 | `?` | |
   | unittests2 | `?` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/10427?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...he/pinot/core/function/scalar/SketchFunctions.java](https://codecov.io/gh/apache/pinot/pull/10427?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9mdW5jdGlvbi9zY2FsYXIvU2tldGNoRnVuY3Rpb25zLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...gregation/function/AggregationFunctionFactory.java](https://codecov.io/gh/apache/pinot/pull/10427?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9hZ2dyZWdhdGlvbi9mdW5jdGlvbi9BZ2dyZWdhdGlvbkZ1bmN0aW9uRmFjdG9yeS5qYXZh) | `10.66% <0.00%> (-71.77%)` | :arrow_down: |
   | [...tion/AvgIntegerTupleSketchAggregationFunction.java](https://codecov.io/gh/apache/pinot/pull/10427?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9hZ2dyZWdhdGlvbi9mdW5jdGlvbi9BdmdJbnRlZ2VyVHVwbGVTa2V0Y2hBZ2dyZWdhdGlvbkZ1bmN0aW9uLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...nctCountIntegerTupleSketchAggregationFunction.java](https://codecov.io/gh/apache/pinot/pull/10427?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9hZ2dyZWdhdGlvbi9mdW5jdGlvbi9EaXN0aW5jdENvdW50SW50ZWdlclR1cGxlU2tldGNoQWdncmVnYXRpb25GdW5jdGlvbi5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...unction/IntegerTupleSketchAggregationFunction.java](https://codecov.io/gh/apache/pinot/pull/10427?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9hZ2dyZWdhdGlvbi9mdW5jdGlvbi9JbnRlZ2VyVHVwbGVTa2V0Y2hBZ2dyZWdhdGlvbkZ1bmN0aW9uLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...umValuesIntegerTupleSketchAggregationFunction.java](https://codecov.io/gh/apache/pinot/pull/10427?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9hZ2dyZWdhdGlvbi9mdW5jdGlvbi9TdW1WYWx1ZXNJbnRlZ2VyVHVwbGVTa2V0Y2hBZ2dyZWdhdGlvbkZ1bmN0aW9uLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...ssing/aggregator/IntegerTupleSketchAggregator.java](https://codecov.io/gh/apache/pinot/pull/10427?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9zZWdtZW50L3Byb2Nlc3NpbmcvYWdncmVnYXRvci9JbnRlZ2VyVHVwbGVTa2V0Y2hBZ2dyZWdhdG9yLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [.../processing/aggregator/ValueAggregatorFactory.java](https://codecov.io/gh/apache/pinot/pull/10427?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9zZWdtZW50L3Byb2Nlc3NpbmcvYWdncmVnYXRvci9WYWx1ZUFnZ3JlZ2F0b3JGYWN0b3J5LmphdmE=) | `12.50% <0.00%> (-30.36%)` | :arrow_down: |
   | [.../aggregator/IntegerTupleSketchValueAggregator.java](https://codecov.io/gh/apache/pinot/pull/10427?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC9hZ2dyZWdhdG9yL0ludGVnZXJUdXBsZVNrZXRjaFZhbHVlQWdncmVnYXRvci5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...gment/local/aggregator/ValueAggregatorFactory.java](https://codecov.io/gh/apache/pinot/pull/10427?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC9hZ2dyZWdhdG9yL1ZhbHVlQWdncmVnYXRvckZhY3RvcnkuamF2YQ==) | `0.00% <0.00%> (-85.72%)` | :arrow_down: |
   | ... and [4 more](https://codecov.io/gh/apache/pinot/pull/10427?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ... and [1529 files with indirect coverage changes](https://codecov.io/gh/apache/pinot/pull/10427/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   




[GitHub] [pinot] andimiller commented on a diff in pull request #10427: Integer Tuple Sketch support

Posted by "andimiller (via GitHub)" <gi...@apache.org>.
andimiller commented on code in PR #10427:
URL: https://github.com/apache/pinot/pull/10427#discussion_r1140347241


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/IntegerTupleSketchValueAggregator.java:
##########
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.aggregator;
+
+import org.apache.datasketches.tuple.Sketch;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations;
+import org.apache.pinot.segment.local.utils.CustomSerDeUtils;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+
+public class IntegerTupleSketchValueAggregator implements ValueAggregator<byte[], Sketch<IntegerSummary>> {

Review Comment:
   It can be, but `Sketch` isn't thread-safe, and I swapped this to `byte[]` while hunting down some thread-safety issues. I will see if I can swap it back now that I've made all the `Union` usage thread-safe.





[GitHub] [pinot] andimiller commented on a diff in pull request #10427: Integer Tuple Sketch support

Posted by "andimiller (via GitHub)" <gi...@apache.org>.
andimiller commented on code in PR #10427:
URL: https://github.com/apache/pinot/pull/10427#discussion_r1140463509


##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgIntegerTupleSketchAggregationFunction.java:
##########
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.SketchIterator;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+public class AvgIntegerTupleSketchAggregationFunction
+    extends IntegerTupleSketchAggregationFunction {
+
+  public AvgIntegerTupleSketchAggregationFunction(List<ExpressionContext> arguments, IntegerSummary.Mode mode) {
+    super(arguments, mode);
+  }
+
+  // TODO if extra aggregation modes are supported, make this switch
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.AVGVALUEINTEGERSUMTUPLESKETCH;
+  }
+
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return ColumnDataType.LONG;
+  }
+
+  @Override
+  public Comparable extractFinalResult(List<CompactSketch<IntegerSummary>> integerSummarySketches) {
+    Union<IntegerSummary> union = new Union<>(_entries, _setOps);
+    integerSummarySketches.forEach(union::union);
+    double retainedTotal = 0L;
+    SketchIterator<IntegerSummary> summaries = union.getResult().iterator();
+    while (summaries.next()) {
+      retainedTotal += summaries.getSummary().getValue();
+    }
+    double estimate = retainedTotal / union.getResult().getRetainedEntries();

Review Comment:
   done





[GitHub] [pinot] andimiller commented on a diff in pull request #10427: Integer Tuple Sketch support

Posted by "andimiller (via GitHub)" <gi...@apache.org>.
andimiller commented on code in PR #10427:
URL: https://github.com/apache/pinot/pull/10427#discussion_r1140466817


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/IntegerTupleSketchValueAggregator.java:
##########
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.aggregator;
+
+import org.apache.datasketches.tuple.Sketch;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations;
+import org.apache.pinot.segment.local.utils.CustomSerDeUtils;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+
+public class IntegerTupleSketchValueAggregator implements ValueAggregator<byte[], Sketch<IntegerSummary>> {

Review Comment:
   I've tested it more locally; it is fine as `byte[]` because we only handle aggregated sketches.





[GitHub] [pinot] andimiller commented on a diff in pull request #10427: Integer Tuple Sketch support

Posted by "andimiller (via GitHub)" <gi...@apache.org>.
andimiller commented on code in PR #10427:
URL: https://github.com/apache/pinot/pull/10427#discussion_r1179293708


##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/IntegerTupleSketchAggregationFunction.java:
##########
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.Sketch;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/***
+ * This is the base class for all Integer Tuple Sketch aggregations
+ *
+ * Note that it only supports BYTES columns containing serialized sketches currently, but could be expanded to more
+ */
+public class IntegerTupleSketchAggregationFunction
+    extends BaseSingleInputAggregationFunction<List<CompactSketch<IntegerSummary>>, Comparable> {
+  final ExpressionContext _expressionContext;
+  final IntegerSummarySetOperations _setOps;
+  final int _entries;
+
+  public IntegerTupleSketchAggregationFunction(List<ExpressionContext> arguments, IntegerSummary.Mode mode) {
+    super(arguments.get(0));
+    _expressionContext = arguments.get(0);

Review Comment:
   Added a couple here. I'm testing them manually locally since I can't find anywhere good to add unit tests.





[GitHub] [pinot] andimiller commented on a diff in pull request #10427: Integer Tuple Sketch support

Posted by "andimiller (via GitHub)" <gi...@apache.org>.
andimiller commented on code in PR #10427:
URL: https://github.com/apache/pinot/pull/10427#discussion_r1139040907


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java:
##########
@@ -66,6 +67,11 @@ public static ValueAggregator getValueAggregator(AggregationFunctionType aggrega
       case DISTINCTCOUNTTHETASKETCH:
       case DISTINCTCOUNTRAWTHETASKETCH:
         return new DistinctCountThetaSketchValueAggregator();
+      case DISTINCTCOUNTTUPLESKETCH:
+      case DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH:
+      case AVGVALUEINTEGERSUMTUPLESKETCH:
+      case SUMVALUESINTEGERSUMTUPLESKETCH:
+        return new IntegerTupleSketchValueAggregator(IntegerSummary.Mode.Sum);

Review Comment:
   These methods have `SumTupleSketch` in their names because they use `Sum` as the mode for combining summaries. The `avgValue` step here is performed afterwards, so it computes the average of the per-key sums.
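
To make the "average of the sum per key" point concrete, here is a minimal sketch in plain Java. It is not the PR's code and does not use the DataSketches API; the class `AvgOfSumPerKey` and its methods are hypothetical, and a `Map` stands in for the retained entries of a tuple sketch. It assumes exact counting (no sampling).

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a Sum-mode tuple sketch: exact, no sampling.
final class AvgOfSumPerKey {

  // Combine (key, value) rows the way a Sum-mode summary would:
  // values that share a key are added together.
  static Map<String, Long> sumPerKey(String[] keys, int[] values) {
    Map<String, Long> sums = new LinkedHashMap<>();
    for (int i = 0; i < keys.length; i++) {
      sums.merge(keys[i], (long) values[i], Long::sum);
    }
    return sums;
  }

  // The averaging step runs after merging, over the per-key sums,
  // so the denominator is the number of keys, not the number of rows.
  static double avgOfSums(Map<String, Long> sums) {
    if (sums.isEmpty()) {
      return 0.0;
    }
    long total = 0L;
    for (long v : sums.values()) {
      total += v;
    }
    return (double) total / sums.size();
  }
}
```

For rows `("a", 1), ("a", 2), ("b", 3)` the per-key sums are `a=3` and `b=3`, so the result is 3.0, whereas a plain row average would be 2.0.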





[GitHub] [pinot] andimiller commented on a diff in pull request #10427: Integer Tuple Sketch support

Posted by "andimiller (via GitHub)" <gi...@apache.org>.
andimiller commented on code in PR #10427:
URL: https://github.com/apache/pinot/pull/10427#discussion_r1139468149


##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumValuesIntegerTupleSketchAggregationFunction.java:
##########
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.SketchIterator;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+public class SumValuesIntegerTupleSketchAggregationFunction extends IntegerTupleSketchAggregationFunction {
+
+  public SumValuesIntegerTupleSketchAggregationFunction(List<ExpressionContext> arguments, IntegerSummary.Mode mode) {
+    super(arguments, mode);
+  }
+
+  // TODO if extra aggregation modes are supported, make this switch
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.SUMVALUESINTEGERSUMTUPLESKETCH;
+  }
+
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return ColumnDataType.LONG;
+  }
+
+  @Override
+  public Comparable extractFinalResult(List<CompactSketch<IntegerSummary>> integerSummarySketches) {
+    Union<IntegerSummary> union = new Union<>(_entries, _setOps);
+    integerSummarySketches.forEach(union::union);
+    double retainedTotal = 0L;
+    SketchIterator<IntegerSummary> summaries = union.getResult().iterator();
+    while (summaries.next()) {
+      retainedTotal += summaries.getSummary().getValue();
+    }
+    double estimate = retainedTotal / union.getResult().getRetainedEntries() * union.getResult().getEstimate();
+    return Double.valueOf(estimate).longValue();
+  }

Review Comment:
   I can give that a go. I swapped everything to compact sketches because I was having issues with the non-thread-safe nature of `Sketch`.





[GitHub] [pinot] andimiller commented on a diff in pull request #10427: Integer Tuple Sketch support

Posted by "andimiller (via GitHub)" <gi...@apache.org>.
andimiller commented on code in PR #10427:
URL: https://github.com/apache/pinot/pull/10427#discussion_r1179408202


##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java:
##########
@@ -299,13 +299,21 @@ public static AggregationFunction getAggregationFunction(FunctionContext functio
             return new FourthMomentAggregationFunction(firstArgument, FourthMomentAggregationFunction.Type.KURTOSIS);
           case FOURTHMOMENT:
             return new FourthMomentAggregationFunction(firstArgument, FourthMomentAggregationFunction.Type.MOMENT);
+          case DISTINCTCOUNTTUPLESKETCH:
+            // mode actually doesn't matter here because we only care about keys, not values
+            return new DistinctCountIntegerTupleSketchAggregationFunction(arguments, IntegerSummary.Mode.Sum);
+          case DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH:
+            return new IntegerTupleSketchAggregationFunction(arguments, IntegerSummary.Mode.Sum);
+          case SUMVALUESINTEGERSUMTUPLESKETCH:
+            return new SumValuesIntegerTupleSketchAggregationFunction(arguments, IntegerSummary.Mode.Sum);
+          case AVGVALUEINTEGERSUMTUPLESKETCH:
+            return new AvgIntegerTupleSketchAggregationFunction(arguments, IntegerSummary.Mode.Sum);

Review Comment:
   I removed the `DistinctCount` prefixes from the other ones, so it's just `SUMVALUESINTEGERSUMTUPLESKETCH` and `AVGVALUEINTEGERSUMTUPLESKETCH` for the Sum and Average operations





[GitHub] [pinot] andimiller commented on a diff in pull request #10427: Integer Tuple Sketch support

Posted by "andimiller (via GitHub)" <gi...@apache.org>.
andimiller commented on code in PR #10427:
URL: https://github.com/apache/pinot/pull/10427#discussion_r1179224084


##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/IntegerTupleSketchAggregationFunction.java:
##########
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.Sketch;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/***
+ * This is the base class for all Integer Tuple Sketch aggregations
+ *
+ * Note that it only supports BYTES columns containing serialized sketches currently, but could be expanded to more
+ */
+public class IntegerTupleSketchAggregationFunction
+    extends BaseSingleInputAggregationFunction<List<CompactSketch<IntegerSummary>>, Comparable> {
+  final ExpressionContext _expressionContext;
+  final IntegerSummarySetOperations _setOps;
+  final int _entries;
+
+  public IntegerTupleSketchAggregationFunction(List<ExpressionContext> arguments, IntegerSummary.Mode mode) {
+    super(arguments.get(0));
+    _expressionContext = arguments.get(0);
+    _setOps = new IntegerSummarySetOperations(mode, mode);
+    if (arguments.size() == 2) {
+      _entries = ((Long) arguments.get(1).getLiteral().getValue()).intValue();
+    } else {
+      _entries = (int) Math.pow(2, CommonConstants.Helix.DEFAULT_TUPLE_SKETCH_LGK);
+    }
+  }
+
+  // TODO if extra aggregation modes are supported, make this switch
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH;
+  }
+
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return new ObjectAggregationResultHolder();
+  }
+
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+  }
+
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    // Treat BYTES value as serialized Integer Tuple Sketch
+    FieldSpec.DataType storedType = blockValSet.getValueType().getStoredType();
+    if (storedType == FieldSpec.DataType.BYTES) {
+      byte[][] bytesValues = blockValSet.getBytesValuesSV();
+      try {
+        List<CompactSketch<IntegerSummary>> integerSketch = aggregationResultHolder.getResult();
+        if (integerSketch != null) {
+          List<CompactSketch<IntegerSummary>> sketches =
+              Arrays.stream(bytesValues).map(ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE::deserialize)
+                  .map(Sketch::compact).collect(Collectors.toList());
+          aggregationResultHolder.setValue(merge(aggregationResultHolder.getResult(), sketches));
+        } else {
+          List<CompactSketch<IntegerSummary>> sketches =
+              Arrays.stream(bytesValues).map(ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE::deserialize)
+                  .map(Sketch::compact).collect(Collectors.toList());
+          aggregationResultHolder.setValue(sketches);
+        }
+      } catch (Exception e) {
+        throw new RuntimeException("Caught exception while merging Tuple Sketches", e);
+      }
+    } else {
+      throw new IllegalStateException("Illegal data type for " + getType() + " aggregation function: " + storedType);
+    }
+  }
+
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    // Treat BYTES value as serialized Integer Tuple Sketch
+    FieldSpec.DataType storedType = blockValSet.getValueType().getStoredType();
+
+    if (storedType == FieldSpec.DataType.BYTES) {
+      byte[][] bytesValues = blockValSet.getBytesValuesSV();
+      try {
+        for (int i = 0; i < length; i++) {
+          byte[] value = bytesValues[i];
+          int groupKey = groupKeyArray[i];
+          CompactSketch<IntegerSummary> newSketch =
+              ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(value).compact();
+          if (groupByResultHolder.getResult(groupKey) == null) {
+            ArrayList<CompactSketch<IntegerSummary>> newList = new ArrayList<>();
+            newList.add(newSketch);
+            groupByResultHolder.setValueForKey(groupKey, newList);
+          } else {
+            groupByResultHolder.<List<CompactSketch<IntegerSummary>>>getResult(groupKey).add(newSketch);
+          }
+        }
+      } catch (Exception e) {
+        throw new RuntimeException("Caught exception while merging Tuple Sketches", e);
+      }
+    } else {
+      throw new IllegalStateException(
+          "Illegal data type for INTEGER_TUPLE_SKETCH_UNION aggregation function: " + storedType);
+    }
+  }
+
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    byte[][] valueArray = blockValSetMap.get(_expression).getBytesValuesSV();
+    for (int i = 0; i < length; i++) {
+      byte[] value = valueArray[i];
+      CompactSketch<IntegerSummary> newSketch =
+          ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(value).compact();
+      for (int groupKey : groupKeysArray[i]) {

Review Comment:
   yup





[GitHub] [pinot] andimiller commented on a diff in pull request #10427: Integer Tuple Sketch support

Posted by "andimiller (via GitHub)" <gi...@apache.org>.
andimiller commented on code in PR #10427:
URL: https://github.com/apache/pinot/pull/10427#discussion_r1140392678


##########
pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java:
##########
@@ -918,6 +923,28 @@ public Sketch deserialize(ByteBuffer byteBuffer) {
     }
   };
 
+  public static final ObjectSerDe<org.apache.datasketches.tuple.Sketch<IntegerSummary>> DATA_SKETCH_INT_TUPLE_SER_DE =
+      new ObjectSerDe<org.apache.datasketches.tuple.Sketch<IntegerSummary>>() {
+        @Override
+        public byte[] serialize(org.apache.datasketches.tuple.Sketch<IntegerSummary> value) {
+          return value.compact().toByteArray();
+        }
+
+        @Override
+        public org.apache.datasketches.tuple.Sketch<IntegerSummary> deserialize(byte[] bytes) {
+          return org.apache.datasketches.tuple.Sketches.heapifySketch(Memory.wrap(bytes),
+              new IntegerSummaryDeserializer());
+        }
+
+        @Override
+        public org.apache.datasketches.tuple.Sketch<IntegerSummary> deserialize(ByteBuffer byteBuffer) {
+          byte[] bytes = new byte[byteBuffer.remaining()];
+          byteBuffer.get(bytes);
+          return org.apache.datasketches.tuple.Sketches.heapifySketch(Memory.wrap(bytes),
+              new IntegerSummaryDeserializer());

Review Comment:
   The one for theta sketches copies it into a `byte[]` first, so I was following that.
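
As a small illustration of the copy-first pattern mentioned here, the sketch below drains the remaining bytes of a `ByteBuffer` into a fresh `byte[]` using only `java.nio`. The class and method names are hypothetical; the real SerDe then hands the array to the sketch deserializer, which is left out.

```java
import java.nio.ByteBuffer;

// Hypothetical helper showing only the ByteBuffer -> byte[] step.
final class ByteBufferCopy {

  // Copy everything between position and limit into a new array.
  // Note that get(byte[]) advances the buffer's position to its limit.
  static byte[] remainingBytes(ByteBuffer buffer) {
    byte[] bytes = new byte[buffer.remaining()];
    buffer.get(bytes);
    return bytes;
  }
}
```

The trade-off is one extra allocation and copy per value in exchange for a plain array that is independent of the buffer's position and backing storage.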





[GitHub] [pinot] andimiller commented on a diff in pull request #10427: Integer Tuple Sketch support

Posted by "andimiller (via GitHub)" <gi...@apache.org>.
andimiller commented on code in PR #10427:
URL: https://github.com/apache/pinot/pull/10427#discussion_r1139467007


##########
pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java:
##########
@@ -213,6 +216,8 @@ public static ObjectType getObjectType(Object value) {
         return ObjectType.VarianceTuple;
       } else if (value instanceof PinotFourthMoment) {
         return ObjectType.PinotFourthMoment;
+      } else if (value instanceof org.apache.datasketches.tuple.Sketch) {
+        return ObjectType.IntegerTupleSketch;

Review Comment:
   Right now it is, but to add other types of tuple sketch we'd need to add wrapper types, because of JVM type erasure.
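
The erasure issue can be shown with plain Java generics. This is only an illustration: `IntegerBox` and `DoubleBox` are hypothetical wrapper types, and `List` stands in for a generic sketch class. At runtime an `instanceof` check sees only the raw class, so the type parameter cannot be used to pick an object type; a concrete wrapper class can.

```java
import java.util.ArrayList;
import java.util.List;

final class ErasureDemo {

  // Hypothetical wrappers: each pins the element type in a concrete class,
  // which survives erasure and can be tested with instanceof.
  static final class IntegerBox {
    final List<Integer> items = new ArrayList<>();
  }

  static final class DoubleBox {
    final List<Double> items = new ArrayList<>();
  }

  // List<Integer> and List<Double> share one runtime class.
  static boolean sameRuntimeClass() {
    List<Integer> ints = new ArrayList<>();
    List<Double> doubles = new ArrayList<>();
    return ints.getClass() == doubles.getClass();
  }

  static String classify(Object value) {
    if (value instanceof IntegerBox) {
      return "integer";
    }
    if (value instanceof DoubleBox) {
      return "double";
    }
    if (value instanceof List) {
      // The element type is not recoverable here.
      return "list of unknown element type";
    }
    return "other";
  }
}
```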





[GitHub] [pinot] swaminathanmanish commented on a diff in pull request #10427: Integer Tuple Sketch support

Posted by "swaminathanmanish (via GitHub)" <gi...@apache.org>.
swaminathanmanish commented on code in PR #10427:
URL: https://github.com/apache/pinot/pull/10427#discussion_r1142155296


##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java:
##########
@@ -299,13 +299,21 @@ public static AggregationFunction getAggregationFunction(FunctionContext functio
             return new FourthMomentAggregationFunction(firstArgument, FourthMomentAggregationFunction.Type.KURTOSIS);
           case FOURTHMOMENT:
             return new FourthMomentAggregationFunction(firstArgument, FourthMomentAggregationFunction.Type.MOMENT);
+          case DISTINCTCOUNTTUPLESKETCH:
+            // mode actually doesn't matter here because we only care about keys, not values
+            return new DistinctCountIntegerTupleSketchAggregationFunction(arguments, IntegerSummary.Mode.Sum);

Review Comment:
   OK, so in the future there can be functions that use other summary modes (min, max, ...).
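
One way to picture how a mode changes the merge is the plain-Java sketch below. The `Mode` enum here is hypothetical and only mirrors the idea of a pluggable combine operation; it is not `IntegerSummary.Mode` from DataSketches.

```java
import java.util.function.IntBinaryOperator;

final class SummaryModes {

  // Hypothetical modes: each one is just a different way to combine
  // two values that belong to the same key.
  enum Mode {
    SUM(Integer::sum),
    MIN(Math::min),
    MAX(Math::max);

    private final IntBinaryOperator _op;

    Mode(IntBinaryOperator op) {
      _op = op;
    }

    int combine(int a, int b) {
      return _op.applyAsInt(a, b);
    }
  }

  // Fold all values seen for one key using the chosen mode.
  // Assumes values is non-empty.
  static int fold(Mode mode, int[] values) {
    int acc = values[0];
    for (int i = 1; i < values.length; i++) {
      acc = mode.combine(acc, values[i]);
    }
    return acc;
  }
}
```

With values `{4, 1, 7}` for a single key, `SUM` gives 12, `MIN` gives 1 and `MAX` gives 7; the keys retained, and therefore the distinct count, are the same in every mode.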





[GitHub] [pinot] swaminathanmanish commented on a diff in pull request #10427: Integer Tuple Sketch support

Posted by "swaminathanmanish (via GitHub)" <gi...@apache.org>.
swaminathanmanish commented on code in PR #10427:
URL: https://github.com/apache/pinot/pull/10427#discussion_r1139002821


##########
pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java:
##########
@@ -131,4 +133,49 @@ public static byte[] toHLL(@Nullable Object input, int log2m) {
     }
     return ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.serialize(hll);
   }
+
+  /**
+   * Create a Tuple Sketch containing the key and value supplied
+   *
+   * @param key an Object we want to insert as the key of the sketch, may be null to return an empty sketch
+   * @param value an Integer we want to associate as the value to go along with the key, may be null to return an
+   *              empty sketch
+   * @return serialized tuple sketch as bytes
+   */
+  @ScalarFunction(nullableParameters = true)
+  public static byte[] toIntegerSumTupleSketch(@Nullable Object key, @Nullable Integer value) {
+    return toIntegerSumTupleSketch(key, value, CommonConstants.Helix.DEFAULT_TUPLE_SKETCH_LGK);
+  }
+
+  /**
+   * Create a Tuple Sketch containing the key and value supplied
+   *
+   * @param key an Object we want to insert as the key of the sketch, may be null to return an empty sketch
+   * @param value an Integer we want to associate as the value to go along with the key, may be null to return an
+   *              empty sketch
+   * @param lgK integer representing the log of the maximum number of retained entries in the sketch, between 4 and 26
+   * @return serialized tuple sketch as bytes
+   */
+  @ScalarFunction(nullableParameters = true)
+  public static byte[] toIntegerSumTupleSketch(@Nullable Object key, Integer value, int lgK) {
+    IntegerSketch is = new IntegerSketch(lgK, IntegerSummary.Mode.Sum);
+    if (value != null) {
+      if (key instanceof Integer) {

Review Comment:
   Can the key be null?





[GitHub] [pinot] andimiller commented on a diff in pull request #10427: Integer Tuple Sketch support

Posted by "andimiller (via GitHub)" <gi...@apache.org>.
andimiller commented on code in PR #10427:
URL: https://github.com/apache/pinot/pull/10427#discussion_r1140344595


##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -96,6 +96,9 @@ public static class Helix {
     // https://datasketches.apache.org/docs/Theta/ThetaErrorTable.html
     public static final int DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES = 65536;
 
+
+    public static final int DEFAULT_TUPLE_SKETCH_LGK = 16;

Review Comment:
   I'll add a comment, it's the same as the theta one above, but log 2
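
To make the "same as the theta one, but log 2" remark concrete: the theta default of 65536 nominal entries equals 2^16, so an lgK of 16 describes the same size. The following is a minimal sketch of that relationship. The class and method names are hypothetical and not part of the PR; the 4 to 26 bounds are taken from the `lgK` javadoc quoted earlier in this thread.

```java
final class LgKExample {
  // Hypothetical helper: converts a log2 size (lgK) into a nominal entry count.
  static int nominalEntries(int lgK) {
    // Bounds assumed from the javadoc in this PR ("between 4 and 26").
    if (lgK < 4 || lgK > 26) {
      throw new IllegalArgumentException("lgK must be between 4 and 26, got: " + lgK);
    }
    return 1 << lgK;
  }

  public static void main(String[] args) {
    System.out.println(nominalEntries(16)); // prints 65536
  }
}
```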



[GitHub] [pinot] andimiller commented on a diff in pull request #10427: Integer Tuple Sketch support

Posted by "andimiller (via GitHub)" <gi...@apache.org>.
andimiller commented on code in PR #10427:
URL: https://github.com/apache/pinot/pull/10427#discussion_r1140451280


##########
pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java:
##########
@@ -918,6 +923,28 @@ public Sketch deserialize(ByteBuffer byteBuffer) {
     }
   };
 
+  public static final ObjectSerDe<org.apache.datasketches.tuple.Sketch<IntegerSummary>> DATA_SKETCH_INT_TUPLE_SER_DE =
+      new ObjectSerDe<org.apache.datasketches.tuple.Sketch<IntegerSummary>>() {
+        @Override
+        public byte[] serialize(org.apache.datasketches.tuple.Sketch<IntegerSummary> value) {
+          return value.compact().toByteArray();
+        }
+
+        @Override
+        public org.apache.datasketches.tuple.Sketch<IntegerSummary> deserialize(byte[] bytes) {
+          return org.apache.datasketches.tuple.Sketches.heapifySketch(Memory.wrap(bytes),
+              new IntegerSummaryDeserializer());
+        }
+
+        @Override
+        public org.apache.datasketches.tuple.Sketch<IntegerSummary> deserialize(ByteBuffer byteBuffer) {
+          byte[] bytes = new byte[byteBuffer.remaining()];
+          byteBuffer.get(bytes);
+          return org.apache.datasketches.tuple.Sketches.heapifySketch(Memory.wrap(bytes),
+              new IntegerSummaryDeserializer());

Review Comment:
   I tried wrapping the `ByteBuffer` directly and a lot of things broke; the contents have to be copied out into a `byte[]` first.
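
The copy in the hunk above can be illustrated with `java.nio` alone. The comment does not say what broke, so the following is only one plausible reason, stated as an assumption: a `ByteBuffer` may be a window over a larger backing array, and copying `remaining()` bytes yields exactly the readable window. The class name is hypothetical.

```java
import java.nio.ByteBuffer;

final class ByteBufferCopyExample {
  // Copies only the readable window [position, limit) into a fresh array.
  static byte[] copyRemaining(ByteBuffer byteBuffer) {
    byte[] bytes = new byte[byteBuffer.remaining()];
    byteBuffer.get(bytes);
    return bytes;
  }

  public static void main(String[] args) {
    byte[] backing = {9, 9, 1, 2, 3, 9};
    // A view over the three middle bytes of a larger backing array.
    ByteBuffer view = ByteBuffer.wrap(backing, 2, 3);
    byte[] copied = copyRemaining(view);
    System.out.println(copied.length);       // prints 3
    System.out.println(view.array().length); // prints 6, the whole backing array
  }
}
```

Note that `get(byte[])` advances the buffer's position, so the caller's buffer is consumed by the copy.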



[GitHub] [pinot] andimiller commented on a diff in pull request #10427: Integer Tuple Sketch support

Posted by "andimiller (via GitHub)" <gi...@apache.org>.
andimiller commented on code in PR #10427:
URL: https://github.com/apache/pinot/pull/10427#discussion_r1139467660


##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java:
##########
@@ -299,13 +299,21 @@ public static AggregationFunction getAggregationFunction(FunctionContext functio
             return new FourthMomentAggregationFunction(firstArgument, FourthMomentAggregationFunction.Type.KURTOSIS);
           case FOURTHMOMENT:
             return new FourthMomentAggregationFunction(firstArgument, FourthMomentAggregationFunction.Type.MOMENT);
+          case DISTINCTCOUNTTUPLESKETCH:
+            // mode actually doesn't matter here because we only care about keys, not values
+            return new DistinctCountIntegerTupleSketchAggregationFunction(arguments, IntegerSummary.Mode.Sum);
+          case DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH:
+            return new IntegerTupleSketchAggregationFunction(arguments, IntegerSummary.Mode.Sum);
+          case SUMVALUESINTEGERSUMTUPLESKETCH:
+            return new SumValuesIntegerTupleSketchAggregationFunction(arguments, IntegerSummary.Mode.Sum);
+          case AVGVALUEINTEGERSUMTUPLESKETCH:
+            return new AvgIntegerTupleSketchAggregationFunction(arguments, IntegerSummary.Mode.Sum);

Review Comment:
   Yes, it is currently named that way to line up with the other functions; maybe `UNION_SUM_TUPLE_SKETCH` would be cleaner.



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java:
##########
@@ -299,13 +299,21 @@ public static AggregationFunction getAggregationFunction(FunctionContext functio
             return new FourthMomentAggregationFunction(firstArgument, FourthMomentAggregationFunction.Type.KURTOSIS);
           case FOURTHMOMENT:
             return new FourthMomentAggregationFunction(firstArgument, FourthMomentAggregationFunction.Type.MOMENT);
+          case DISTINCTCOUNTTUPLESKETCH:
+            // mode actually doesn't matter here because we only care about keys, not values
+            return new DistinctCountIntegerTupleSketchAggregationFunction(arguments, IntegerSummary.Mode.Sum);
+          case DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH:
+            return new IntegerTupleSketchAggregationFunction(arguments, IntegerSummary.Mode.Sum);
+          case SUMVALUESINTEGERSUMTUPLESKETCH:
+            return new SumValuesIntegerTupleSketchAggregationFunction(arguments, IntegerSummary.Mode.Sum);
+          case AVGVALUEINTEGERSUMTUPLESKETCH:
+            return new AvgIntegerTupleSketchAggregationFunction(arguments, IntegerSummary.Mode.Sum);

Review Comment:
   Yes, it is currently named that way to line up with the other functions; maybe `UNION_INTEGER_SUM_TUPLE_SKETCH` would be cleaner.



[GitHub] [pinot] andimiller commented on a diff in pull request #10427: Integer Tuple Sketch support

Posted by "andimiller (via GitHub)" <gi...@apache.org>.
andimiller commented on code in PR #10427:
URL: https://github.com/apache/pinot/pull/10427#discussion_r1139468898


##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java:
##########
@@ -299,13 +299,21 @@ public static AggregationFunction getAggregationFunction(FunctionContext functio
             return new FourthMomentAggregationFunction(firstArgument, FourthMomentAggregationFunction.Type.KURTOSIS);
           case FOURTHMOMENT:
             return new FourthMomentAggregationFunction(firstArgument, FourthMomentAggregationFunction.Type.MOMENT);
+          case DISTINCTCOUNTTUPLESKETCH:
+            // mode actually doesn't matter here because we only care about keys, not values
+            return new DistinctCountIntegerTupleSketchAggregationFunction(arguments, IntegerSummary.Mode.Sum);

Review Comment:
   That is the mode used when merging `IntegerSummary` values; all of these use `Sum`.
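
What a `Sum` merge means for values that share a key can be sketched with plain maps. This is an illustration only: it does not use the DataSketches API, a real Tuple Sketch is approximate and retains only a sample of keys, and the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

final class SumModeMergeExample {
  // Merges two key -> value maps; when a key appears in both, the values are added.
  static Map<String, Integer> mergeWithSum(Map<String, Integer> left, Map<String, Integer> right) {
    Map<String, Integer> merged = new HashMap<>(left);
    right.forEach((key, value) -> merged.merge(key, value, Integer::sum));
    return merged;
  }

  public static void main(String[] args) {
    Map<String, Integer> a = Map.of("alice", 2, "bob", 5);
    Map<String, Integer> b = Map.of("bob", 1, "carol", 4);
    Map<String, Integer> merged = mergeWithSum(a, b);
    System.out.println(merged.get("bob")); // prints 6
    System.out.println(merged.size());     // prints 3
  }
}
```

The number of distinct keys (3 here) is the same whichever function combines the values, which is consistent with the code comment in the hunk that the mode does not matter for the distinct count.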



[GitHub] [pinot] andimiller commented on a diff in pull request #10427: Integer Tuple Sketch support

Posted by "andimiller (via GitHub)" <gi...@apache.org>.
andimiller commented on code in PR #10427:
URL: https://github.com/apache/pinot/pull/10427#discussion_r1140397359


##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgIntegerTupleSketchAggregationFunction.java:
##########
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.SketchIterator;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+public class AvgIntegerTupleSketchAggregationFunction
+    extends IntegerTupleSketchAggregationFunction {
+
+  public AvgIntegerTupleSketchAggregationFunction(List<ExpressionContext> arguments, IntegerSummary.Mode mode) {
+    super(arguments, mode);
+  }
+
+  // TODO if extra aggregation modes are supported, make this switch
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.AVGVALUEINTEGERSUMTUPLESKETCH;
+  }
+
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return ColumnDataType.LONG;
+  }
+
+  @Override
+  public Comparable extractFinalResult(List<CompactSketch<IntegerSummary>> integerSummarySketches) {
+    Union<IntegerSummary> union = new Union<>(_entries, _setOps);
+    integerSummarySketches.forEach(union::union);
+    double retainedTotal = 0L;
+    SketchIterator<IntegerSummary> summaries = union.getResult().iterator();
+    while (summaries.next()) {
+      retainedTotal += summaries.getSummary().getValue();
+    }
+    double estimate = retainedTotal / union.getResult().getRetainedEntries();
+    return Double.valueOf(estimate).longValue();

Review Comment:
   done
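
The arithmetic in the `extractFinalResult` hunk above can be looked at in isolation. This is a minimal sketch in which a plain `int[]` stands in for the retained summaries (an assumption made so the example runs without the sketch library); the class and method names are hypothetical.

```java
final class AvgTruncationExample {
  // Mean of the retained values, converted to long the same way as the hunk above.
  static long truncatedMean(int[] retainedValues) {
    double retainedTotal = 0;
    for (int value : retainedValues) {
      retainedTotal += value;
    }
    double estimate = retainedTotal / retainedValues.length;
    // longValue() truncates toward zero; NaN (from 0.0 / 0) converts to 0.
    return Double.valueOf(estimate).longValue();
  }

  public static void main(String[] args) {
    System.out.println(truncatedMean(new int[] {2, 3})); // prints 2 (2.5 truncated)
    System.out.println(truncatedMean(new int[] {}));     // prints 0
  }
}
```

Because the result type is `LONG`, the fractional part of the average is dropped rather than rounded.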



[GitHub] [pinot] mayankshriv merged pull request #10427: Integer Tuple Sketch support

Posted by "mayankshriv (via GitHub)" <gi...@apache.org>.
mayankshriv merged PR #10427:
URL: https://github.com/apache/pinot/pull/10427


[GitHub] [pinot] andimiller commented on a diff in pull request #10427: Integer Tuple Sketch support

Posted by "andimiller (via GitHub)" <gi...@apache.org>.
andimiller commented on code in PR #10427:
URL: https://github.com/apache/pinot/pull/10427#discussion_r1179371002


##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/IntegerTupleSketchAggregationFunction.java:
##########
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.Sketch;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/***
+ * This is the base class for all Integer Tuple Sketch aggregations
+ *
+ * Note that it only supports BYTES columns containing serialized sketches currently, but could be expanded to more
+ */
+public class IntegerTupleSketchAggregationFunction
+    extends BaseSingleInputAggregationFunction<List<CompactSketch<IntegerSummary>>, Comparable> {
+  final ExpressionContext _expressionContext;
+  final IntegerSummarySetOperations _setOps;
+  final int _entries;
+
+  public IntegerTupleSketchAggregationFunction(List<ExpressionContext> arguments, IntegerSummary.Mode mode) {
+    super(arguments.get(0));
+    _expressionContext = arguments.get(0);

Review Comment:
   Tested locally, and it behaves as I expected:
   ```json
   "message": "QueryExecutionError:\nQuery execution error on: Server_172.19.14.129_7050 org.apache.pinot.spi.exception.BadQueryRequestException: Invalid aggregation function: avgvalueintegersumtuplesketch(players,'true','false'); Reason: Tuple Sketch Aggregation Function expects at most 2 arguments, got: 3"
   ```
   ```json
   "message": "QueryExecutionError:\nQuery execution error on: Server_172.19.14.129_7050 org.apache.pinot.spi.exception.BadQueryRequestException: Invalid aggregation function: avgvalueintegersumtuplesketch(players,'true'); Reason: Tuple Sketch Aggregation Function expected the second argument to be a number of entries to keep, but it was of type BOOLEAN"
   ```
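
The two error messages above suggest a check on the argument count followed by a check that the optional second argument is numeric. The following is a minimal sketch of validation in that spirit, not the PR's implementation: the class, the method, the use of `IllegalArgumentException` in place of Pinot's `BadQueryRequestException`, and the string-based arguments are all assumptions made to keep it self-contained.

```java
import java.util.List;

final class TupleSketchArgumentCheckExample {
  // Hypothetical validator: returns the entries argument, or a default when it is absent.
  static int parseEntries(List<String> arguments, int defaultEntries) {
    if (arguments.size() > 2) {
      throw new IllegalArgumentException(
          "Tuple Sketch Aggregation Function expects at most 2 arguments, got: " + arguments.size());
    }
    if (arguments.size() < 2) {
      return defaultEntries;
    }
    try {
      return Integer.parseInt(arguments.get(1));
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException(
          "Tuple Sketch Aggregation Function expected the second argument to be a number of entries to keep, "
              + "but it was: " + arguments.get(1));
    }
  }

  public static void main(String[] args) {
    System.out.println(parseEntries(List.of("players"), 65536));         // prints 65536
    System.out.println(parseEntries(List.of("players", "1024"), 65536)); // prints 1024
  }
}
```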


