Posted to commits@pinot.apache.org by "davecromberge (via GitHub)" <gi...@apache.org> on 2023/03/16 23:45:28 UTC

[GitHub] [pinot] davecromberge commented on a diff in pull request #10427: Integer Tuple Sketch support

davecromberge commented on code in PR #10427:
URL: https://github.com/apache/pinot/pull/10427#discussion_r1139439869


##########
pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java:
##########
@@ -213,6 +216,8 @@ public static ObjectType getObjectType(Object value) {
         return ObjectType.VarianceTuple;
       } else if (value instanceof PinotFourthMoment) {
         return ObjectType.PinotFourthMoment;
+      } else if (value instanceof org.apache.datasketches.tuple.Sketch) {
+        return ObjectType.IntegerTupleSketch;

Review Comment:
   Is this a safe assumption? The `instanceof` check only establishes that the value is a tuple sketch. Is it also necessary to inspect the summary type to verify that it is an integer summary?
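
To illustrate the concern: Java erases generic type parameters at runtime, so an `instanceof` test on a generic class cannot tell apart sketches that hold different summary types. The sketch below uses hypothetical stand-in classes (`GenericSketch`, `IntSummary`, `DoubleSummary`), not the DataSketches API, and shows one possible check that inspects a retained summary. Treating an empty sketch as acceptable is an assumption of this example.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical stand-ins for a generic sketch and its summaries; these are NOT DataSketches classes.
final class SummaryTypeCheck {
  interface Summary { }

  static final class IntSummary implements Summary {
    final int value;
    IntSummary(int value) { this.value = value; }
  }

  static final class DoubleSummary implements Summary {
    final double value;
    DoubleSummary(double value) { this.value = value; }
  }

  static final class GenericSketch<S extends Summary> {
    private final List<S> summaries;
    GenericSketch(List<S> summaries) { this.summaries = summaries; }
    List<S> summaries() { return summaries; }
  }

  // instanceof only sees the raw class, so any GenericSketch passes regardless of S.
  static boolean isSketch(Object value) {
    return value instanceof GenericSketch;
  }

  // Inspects the first retained summary; an empty sketch is accepted (assumption of this example).
  static boolean holdsIntSummaries(Object value) {
    if (!(value instanceof GenericSketch)) {
      return false;
    }
    List<?> summaries = ((GenericSketch<?>) value).summaries();
    return summaries.isEmpty() || summaries.get(0) instanceof IntSummary;
  }

  public static void main(String[] args) {
    Object doubles = new GenericSketch<DoubleSummary>(Collections.singletonList(new DoubleSummary(1.5)));
    System.out.println(isSketch(doubles));          // the raw type check passes
    System.out.println(holdsIntSummaries(doubles)); // the summary inspection rejects it
  }
}
```

Whether the extra inspection is worth its cost depends on whether other tuple sketch summary types can ever reach this code path.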



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumValuesIntegerTupleSketchAggregationFunction.java:
##########
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.SketchIterator;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+public class SumValuesIntegerTupleSketchAggregationFunction extends IntegerTupleSketchAggregationFunction {
+
+  public SumValuesIntegerTupleSketchAggregationFunction(List<ExpressionContext> arguments, IntegerSummary.Mode mode) {
+    super(arguments, mode);
+  }
+
+  // TODO if extra aggregation modes are supported, make this switch
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.SUMVALUESINTEGERSUMTUPLESKETCH;
+  }
+
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return ColumnDataType.LONG;
+  }
+
+  @Override
+  public Comparable extractFinalResult(List<CompactSketch<IntegerSummary>> integerSummarySketches) {
+    Union<IntegerSummary> union = new Union<>(_entries, _setOps);
+    integerSummarySketches.forEach(union::union);
+    double retainedTotal = 0L;
+    SketchIterator<IntegerSummary> summaries = union.getResult().iterator();
+    while (summaries.next()) {
+      retainedTotal += summaries.getSummary().getValue();
+    }
+    double estimate = retainedTotal / union.getResult().getRetainedEntries() * union.getResult().getEstimate();

Review Comment:
   You could divide the retained total by theta instead, since theta should be equivalent to the number of retained entries divided by the estimate.
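
A minimal arithmetic sketch of that equivalence, assuming the usual relationship estimate = retainedEntries / theta. The class and method names are hypothetical and the numbers are made up for illustration. No DataSketches calls are used here; in the real code theta would presumably come from the sketch's `getTheta()`.

```java
// Illustration only: plain arithmetic, with hypothetical names and made-up inputs.
final class ThetaScaling {
  // Form used in the diff: mean retained value times the distinct-count estimate.
  static double viaMeanTimesEstimate(double retainedTotal, int retainedEntries, double estimate) {
    return retainedTotal / retainedEntries * estimate;
  }

  // Equivalent form: scale the retained total by 1 / theta.
  static double viaTheta(double retainedTotal, double theta) {
    return retainedTotal / theta;
  }

  public static void main(String[] args) {
    int retainedEntries = 4096;
    double theta = 0.25;
    double estimate = retainedEntries / theta; // 16384.0
    double retainedTotal = 81920.0;
    System.out.println(viaMeanTimesEstimate(retainedTotal, retainedEntries, estimate)); // 327680.0
    System.out.println(viaTheta(retainedTotal, theta));                                 // 327680.0
  }
}
```

The theta form needs one division and does not divide by the retained entry count, so it has no 0/0 case as long as theta is positive, which I believe holds for an empty sketch (theta of 1.0).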



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgIntegerTupleSketchAggregationFunction.java:
##########
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.SketchIterator;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+public class AvgIntegerTupleSketchAggregationFunction
+    extends IntegerTupleSketchAggregationFunction {
+
+  public AvgIntegerTupleSketchAggregationFunction(List<ExpressionContext> arguments, IntegerSummary.Mode mode) {
+    super(arguments, mode);
+  }
+
+  // TODO if extra aggregation modes are supported, make this switch
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.AVGVALUEINTEGERSUMTUPLESKETCH;
+  }
+
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return ColumnDataType.LONG;
+  }
+
+  @Override
+  public Comparable extractFinalResult(List<CompactSketch<IntegerSummary>> integerSummarySketches) {
+    Union<IntegerSummary> union = new Union<>(_entries, _setOps);
+    integerSummarySketches.forEach(union::union);
+    double retainedTotal = 0L;
+    SketchIterator<IntegerSummary> summaries = union.getResult().iterator();
+    while (summaries.next()) {
+      retainedTotal += summaries.getSummary().getValue();
+    }
+    double estimate = retainedTotal / union.getResult().getRetainedEntries();
+    return Double.valueOf(estimate).longValue();

Review Comment:
   Does calling getResult() on the union multiple times recompute the result?
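
If it does recompute, holding the result in a local variable would avoid the repeated work. The sketch below uses a hypothetical `CountingUnion` stand-in that builds a new snapshot on every `getResult()` call. That behaviour is an assumption made for the example, not a statement about the DataSketches `Union`.

```java
import java.util.ArrayList;
import java.util.List;

final class CachedResultDemo {
  // Hypothetical stand-in: every getResult() call copies the data, and the copies are counted.
  static final class CountingUnion {
    private final List<Integer> values = new ArrayList<>();
    int snapshotsBuilt = 0;

    void update(int value) {
      values.add(value);
    }

    List<Integer> getResult() {
      snapshotsBuilt++;
      return new ArrayList<>(values);
    }
  }

  // Calls getResult() once and reuses the snapshot for both the sum and the count.
  static long average(CountingUnion union) {
    List<Integer> result = union.getResult();
    if (result.isEmpty()) {
      return 0L;
    }
    double total = 0;
    for (int value : result) {
      total += value;
    }
    return (long) (total / result.size());
  }

  public static void main(String[] args) {
    CountingUnion union = new CountingUnion();
    union.update(2);
    union.update(4);
    union.update(6);
    System.out.println(average(union));
    System.out.println(union.snapshotsBuilt);
  }
}
```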



##########
pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java:
##########
@@ -918,6 +923,28 @@ public Sketch deserialize(ByteBuffer byteBuffer) {
     }
   };
 
+  public static final ObjectSerDe<org.apache.datasketches.tuple.Sketch<IntegerSummary>> DATA_SKETCH_INT_TUPLE_SER_DE =
+      new ObjectSerDe<org.apache.datasketches.tuple.Sketch<IntegerSummary>>() {
+        @Override
+        public byte[] serialize(org.apache.datasketches.tuple.Sketch<IntegerSummary> value) {
+          return value.compact().toByteArray();
+        }
+
+        @Override
+        public org.apache.datasketches.tuple.Sketch<IntegerSummary> deserialize(byte[] bytes) {
+          return org.apache.datasketches.tuple.Sketches.heapifySketch(Memory.wrap(bytes),
+              new IntegerSummaryDeserializer());
+        }
+
+        @Override
+        public org.apache.datasketches.tuple.Sketch<IntegerSummary> deserialize(ByteBuffer byteBuffer) {
+          byte[] bytes = new byte[byteBuffer.remaining()];
+          byteBuffer.get(bytes);
+          return org.apache.datasketches.tuple.Sketches.heapifySketch(Memory.wrap(bytes),
+              new IntegerSummaryDeserializer());

Review Comment:
   The DataSketches `Memory` can also wrap a `ByteBuffer` directly, which would avoid copying the bytes into an intermediate array.



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumValuesIntegerTupleSketchAggregationFunction.java:
##########
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.SketchIterator;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+public class SumValuesIntegerTupleSketchAggregationFunction extends IntegerTupleSketchAggregationFunction {
+
+  public SumValuesIntegerTupleSketchAggregationFunction(List<ExpressionContext> arguments, IntegerSummary.Mode mode) {
+    super(arguments, mode);
+  }
+
+  // TODO if extra aggregation modes are supported, make this switch
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.SUMVALUESINTEGERSUMTUPLESKETCH;
+  }
+
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return ColumnDataType.LONG;
+  }
+
+  @Override
+  public Comparable extractFinalResult(List<CompactSketch<IntegerSummary>> integerSummarySketches) {
+    Union<IntegerSummary> union = new Union<>(_entries, _setOps);
+    integerSummarySketches.forEach(union::union);
+    double retainedTotal = 0L;
+    SketchIterator<IntegerSummary> summaries = union.getResult().iterator();
+    while (summaries.next()) {
+      retainedTotal += summaries.getSummary().getValue();
+    }
+    double estimate = retainedTotal / union.getResult().getRetainedEntries() * union.getResult().getEstimate();
+    return Double.valueOf(estimate).longValue();
+  }

Review Comment:
   Does the serde always deserialise bytes to a compact sketch?  It could be better to use the base `Sketch` abstraction for cases where the sketches have been created outside the system and not compacted.



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java:
##########
@@ -299,13 +299,21 @@ public static AggregationFunction getAggregationFunction(FunctionContext functio
             return new FourthMomentAggregationFunction(firstArgument, FourthMomentAggregationFunction.Type.KURTOSIS);
           case FOURTHMOMENT:
             return new FourthMomentAggregationFunction(firstArgument, FourthMomentAggregationFunction.Type.MOMENT);
+          case DISTINCTCOUNTTUPLESKETCH:
+            // mode actually doesn't matter here because we only care about keys, not values
+            return new DistinctCountIntegerTupleSketchAggregationFunction(arguments, IntegerSummary.Mode.Sum);
+          case DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH:
+            return new IntegerTupleSketchAggregationFunction(arguments, IntegerSummary.Mode.Sum);
+          case SUMVALUESINTEGERSUMTUPLESKETCH:
+            return new SumValuesIntegerTupleSketchAggregationFunction(arguments, IntegerSummary.Mode.Sum);
+          case AVGVALUEINTEGERSUMTUPLESKETCH:
+            return new AvgIntegerTupleSketchAggregationFunction(arguments, IntegerSummary.Mode.Sum);

Review Comment:
   `DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH` is consistent with the current naming scheme, but the `DISTINCTCOUNT` prefix is somewhat redundant since it's clear that we are using a tuple (or theta) sketch. This is a subjective remark; there is no need to change it, since the name is consistent with the theta naming.



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgIntegerTupleSketchAggregationFunction.java:
##########
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.SketchIterator;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+public class AvgIntegerTupleSketchAggregationFunction
+    extends IntegerTupleSketchAggregationFunction {
+
+  public AvgIntegerTupleSketchAggregationFunction(List<ExpressionContext> arguments, IntegerSummary.Mode mode) {
+    super(arguments, mode);
+  }
+
+  // TODO if extra aggregation modes are supported, make this switch
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.AVGVALUEINTEGERSUMTUPLESKETCH;
+  }
+
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return ColumnDataType.LONG;
+  }
+
+  @Override
+  public Comparable extractFinalResult(List<CompactSketch<IntegerSummary>> integerSummarySketches) {
+    Union<IntegerSummary> union = new Union<>(_entries, _setOps);
+    integerSummarySketches.forEach(union::union);
+    double retainedTotal = 0L;
+    SketchIterator<IntegerSummary> summaries = union.getResult().iterator();
+    while (summaries.next()) {
+      retainedTotal += summaries.getSummary().getValue();
+    }
+    double estimate = retainedTotal / union.getResult().getRetainedEntries();

Review Comment:
   It could be a good idea to first check if the union is empty, otherwise you may encounter division by zero.
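
A small sketch of what such a guard might look like, with hypothetical method names and plain arithmetic only. Because `retainedTotal` is a `double` in the diff, the division itself does not throw: `0.0 / 0` evaluates to `NaN`, and converting `NaN` to `long` yields `0`. An explicit check makes that outcome intentional rather than incidental. Returning `0` for an empty union is an assumption of this example.

```java
// Illustration only: hypothetical helper names, no DataSketches calls.
final class EmptyUnionGuard {
  // Mirrors the arithmetic in the diff: no exception, but the empty case goes through NaN.
  static long unguarded(double retainedTotal, int retainedEntries) {
    double estimate = retainedTotal / retainedEntries;
    return Double.valueOf(estimate).longValue();
  }

  // Explicit empty check; returning 0 for an empty union is an assumption of this example.
  static long guarded(double retainedTotal, int retainedEntries) {
    if (retainedEntries == 0) {
      return 0L;
    }
    return (long) (retainedTotal / retainedEntries);
  }

  public static void main(String[] args) {
    System.out.println(Double.isNaN(0.0 / 0)); // true
    System.out.println(unguarded(0.0, 0));     // 0
    System.out.println(guarded(0.0, 0));       // 0
    System.out.println(guarded(10.0, 4));      // 2
  }
}
```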



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/IntegerTupleSketchAggregationFunction.java:
##########
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.Sketch;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/***
+ * This is the base class for all Integer Tuple Sketch aggregations
+ *
+ * Note that it only supports BYTES columns containing serialized sketches currently, but could be expanded to more
+ */
+public class IntegerTupleSketchAggregationFunction
+    extends BaseSingleInputAggregationFunction<List<CompactSketch<IntegerSummary>>, Comparable> {
+  ExpressionContext _expressionContext;
+  IntegerSummarySetOperations _setOps;
+  int _entries;

Review Comment:
   Should these be `final` if they are only set in the constructor?



##########
pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java:
##########
@@ -131,4 +133,49 @@ public static byte[] toHLL(@Nullable Object input, int log2m) {
     }
     return ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.serialize(hll);
   }
+
+  /**
+   * Create a Tuple Sketch containing the key and value supplied
+   *
+   * @param key an Object we want to insert as the key of the sketch, may be null to return an empty sketch
+   * @param value an Integer we want to associate as the value to go along with the key, may be null to return an
+   *              empty sketch
+   * @return serialized tuple sketch as bytes
+   */
+  @ScalarFunction(nullableParameters = true)
+  public static byte[] toIntegerSumTupleSketch(@Nullable Object key, @Nullable Integer value) {
+    return toIntegerSumTupleSketch(key, value, CommonConstants.Helix.DEFAULT_TUPLE_SKETCH_LGK);
+  }
+
+  /**
+   * Create a Tuple Sketch containing the key and value supplied
+   *
+   * @param key an Object we want to insert as the key of the sketch, may be null to return an empty sketch
+   * @param value an Integer we want to associate as the value to go along with the key, may be null to return an
+   *              empty sketch
+   * @param lgK integer representing the log of the maximum number of retained entries in the sketch, between 4 and 26
+   * @return serialized tuple sketch as bytes
+   */
+  @ScalarFunction(nullableParameters = true)
+  public static byte[] toIntegerSumTupleSketch(@Nullable Object key, Integer value, int lgK) {
+    IntegerSketch is = new IntegerSketch(lgK, IntegerSummary.Mode.Sum);
+    if (value != null) {
+      if (key instanceof Integer) {

Review Comment:
   The tradeoff with this decision is that we may lose the "key" from a distinct count when the value is null, but I don't see a sensible way around this besides defaulting to some sentinel, which may not be applicable to all use cases.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org