Posted to commits@pinot.apache.org by "Jackie-Jiang (via GitHub)" <gi...@apache.org> on 2023/07/11 22:57:40 UTC

[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #10926: Realtime pre-aggregation for Distinct Count HLL & Big Decimal

Jackie-Jiang commented on code in PR #10926:
URL: https://github.com/apache/pinot/pull/10926#discussion_r1260315154


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountHLLValueAggregator.java:
##########
@@ -28,11 +33,31 @@
 
 public class DistinctCountHLLValueAggregator implements ValueAggregator<Object, HyperLogLog> {
   public static final DataType AGGREGATED_VALUE_TYPE = DataType.BYTES;
-  private static final int DEFAULT_LOG2M_BYTE_SIZE = 180;
-
+  private int _log2m = CommonConstants.Helix.DEFAULT_HYPERLOGLOG_LOG2M;
+  private int _log2mByteSize = 180;
   // Byte size won't change once we get the initial aggregated value
   private int _maxByteSize;
 
+  public DistinctCountHLLValueAggregator() {
+  }
+
+  public DistinctCountHLLValueAggregator(List<ExpressionContext> arguments) {
+    // length 1 means we use the default _log2m of 8
+    if (arguments.size() <= 1) {
+      return;
+    }
+
+    try {
+      String log2mLit = arguments.get(1).getLiteral().getStringValue();
+      Preconditions.checkState(StringUtils.isNumeric(log2mLit), "log2m argument must be a numeric literal");
+
+      _log2m = Integer.parseInt(log2mLit);

Review Comment:
   (minor)
   ```suggestion
         _log2m = arguments.get(1).getLiteral().getIntValue();
   ```
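   To illustrate the suggestion: reading the literal as an `int` makes both the `StringUtils.isNumeric` check and the `Integer.parseInt` round trip unnecessary. Below is a minimal sketch. The `Argument` interface is a hypothetical stand-in for `ExpressionContext.getLiteral()`. The default of 8 follows the code comment in the diff. The `[0, 30]` range check is an illustrative assumption that the PR does not specify.

   ```java
   import java.util.List;

   // Hypothetical stand-in for an argument whose literal value can be read as an int.
   interface Argument {
     int getIntValue();
   }

   final class Log2mResolver {
     // Default log2m, matching the "default _log2m of 8" comment in the diff.
     static final int DEFAULT_LOG2M = 8;

     private Log2mResolver() {
     }

     // arguments.get(0) is the aggregated column; an optional second argument carries log2m.
     static int resolveLog2m(List<Argument> arguments) {
       if (arguments.size() <= 1) {
         return DEFAULT_LOG2M;
       }
       int log2m = arguments.get(1).getIntValue();
       // Illustrative bound (assumption) so that 1 << log2m cannot overflow an int.
       if (log2m < 0 || log2m > 30) {
         throw new IllegalArgumentException("log2m must be in [0, 30], got: " + log2m);
       }
       return log2m;
     }
   }
   ```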



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -255,6 +255,14 @@ public boolean isMutableSegment() {
 
     // Initialize for each column
     for (FieldSpec fieldSpec : _physicalFieldSpecs) {
+      String fieldSpecName = fieldSpec.getName();
+      if (metricsAggregators.containsKey(fieldSpecName)) {
+        int maxLength = metricsAggregators.get(fieldSpecName).getRight().getMaxAggregatedValueByteSize();
+        if (maxLength > 0) {
+          fieldSpec.setMaxLength(maxLength);

Review Comment:
   This will update the original schema and can potentially cause unexpected behavior. Ideally, the schema should not be modified.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/builder/BaseSingleTreeBuilder.java:
##########
@@ -142,7 +143,8 @@ static class Record {
     for (AggregationFunctionColumnPair functionColumnPair : functionColumnPairs) {
       _metrics[index] = functionColumnPair.toColumnName();
       _functionColumnPairs[index] = functionColumnPair;
-      _valueAggregators[index] = ValueAggregatorFactory.getValueAggregator(functionColumnPair.getFunctionType());
+      _valueAggregators[index] =
+          ValueAggregatorFactory.getValueAggregator(functionColumnPair.getFunctionType(), Collections.EMPTY_LIST);

Review Comment:
   (nit)
   ```suggestion
             ValueAggregatorFactory.getValueAggregator(functionColumnPair.getFunctionType(), Collections.emptyList());
   ```
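   Background for the nit: `Collections.EMPTY_LIST` is a raw `List`, so passing it where a `List<ExpressionContext>` is expected relies on an unchecked conversion. `Collections.emptyList()` avoids this because it infers the element type from the target. Here is a small illustration, with `String` standing in for the element type:

   ```java
   import java.util.Collections;
   import java.util.List;

   final class EmptyListDemo {
     private EmptyListDemo() {
     }

     // Type-safe: the element type is inferred from the return type.
     static List<String> typed() {
       return Collections.emptyList();
     }

     // Raw constant: compiles only via an unchecked conversion (reported under -Xlint:unchecked).
     @SuppressWarnings("unchecked")
     static List<String> raw() {
       return Collections.EMPTY_LIST;
     }
   }
   ```

   In OpenJDK, both return the same shared immutable empty list. The change is therefore about type safety and avoiding the warning, not behavior.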



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountHLLValueAggregator.java:
##########
@@ -100,6 +124,9 @@ public byte[] serializeAggregatedValue(HyperLogLog value) {
 
   @Override
   public HyperLogLog deserializeAggregatedValue(byte[] bytes) {
+    if (bytes == null || bytes.length == 0) {
+      return new HyperLogLog(_log2m);
+    }

Review Comment:
   I don't follow. Even if it is used in `getInitialAggregatedValue()`, the input should never be null or empty. Are we trying to handle invalid input data (e.g. an empty byte array)? If so, the handling should be added to `getInitialAggregatedValue()` and `applyRawValue()` instead of here.
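   Here is a minimal sketch of the placement the reviewer suggests. It assumes a simplified aggregator in which a `java.util.BitSet` stands in for `HyperLogLog`; this is not Pinot's implementation. Empty input is handled where raw values enter (`getInitialAggregatedValue()` and `applyRawValue()`). Meanwhile, `deserializeAggregatedValue()` stays strict, because values serialized by the aggregator always have a fixed, non-zero length.

   ```java
   import java.util.Arrays;
   import java.util.BitSet;

   // Simplified stand-in: a fixed-size BitSet plays the role of HyperLogLog.
   final class BitSetDistinctAggregator {
     private final int _numBits;

     BitSetDistinctAggregator(int numBits) {
       _numBits = numBits;
     }

     // Raw values enter here, so this is where an empty byte[] is treated as "no data".
     BitSet getInitialAggregatedValue(Object rawValue) {
       if (rawValue instanceof byte[]) {
         byte[] bytes = (byte[]) rawValue;
         return bytes.length == 0 ? new BitSet(_numBits) : deserializeAggregatedValue(bytes);
       }
       BitSet sketch = new BitSet(_numBits);
       sketch.set(Math.floorMod(rawValue.hashCode(), _numBits));
       return sketch;
     }

     BitSet applyRawValue(BitSet value, Object rawValue) {
       value.or(getInitialAggregatedValue(rawValue));
       return value;
     }

     // Pads to a fixed length so a serialized value is never empty.
     byte[] serializeAggregatedValue(BitSet value) {
       return Arrays.copyOf(value.toByteArray(), (_numBits + 7) / 8);
     }

     // Strict: null or empty input here indicates a bug upstream, not bad user data.
     BitSet deserializeAggregatedValue(byte[] bytes) {
       if (bytes == null || bytes.length == 0) {
         throw new IllegalArgumentException("Serialized value must not be null or empty");
       }
       return BitSet.valueOf(bytes);
     }
   }
   ```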



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountHLLValueAggregator.java:
##########
@@ -28,11 +33,31 @@
 
 public class DistinctCountHLLValueAggregator implements ValueAggregator<Object, HyperLogLog> {
   public static final DataType AGGREGATED_VALUE_TYPE = DataType.BYTES;
-  private static final int DEFAULT_LOG2M_BYTE_SIZE = 180;
-
+  private int _log2m = CommonConstants.Helix.DEFAULT_HYPERLOGLOG_LOG2M;
+  private int _log2mByteSize = 180;
   // Byte size won't change once we get the initial aggregated value
   private int _maxByteSize;
 
+  public DistinctCountHLLValueAggregator() {
+  }
+
+  public DistinctCountHLLValueAggregator(List<ExpressionContext> arguments) {
+    // length 1 means we use the default _log2m of 8
+    if (arguments.size() <= 1) {
+      return;
+    }
+
+    try {
+      String log2mLit = arguments.get(1).getLiteral().getStringValue();
+      Preconditions.checkState(StringUtils.isNumeric(log2mLit), "log2m argument must be a numeric literal");
+
+      _log2m = Integer.parseInt(log2mLit);
+      _log2mByteSize = (new HyperLogLog(_log2m)).getBytes().length;

Review Comment:
   We can add a util to get the byte size without serializing:
   `byteSize = (RegisterSet.getSizeForCount(1 << log2m) + 2) * Integer.BYTES`
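   For reference, the formula can be checked against the removed `DEFAULT_LOG2M_BYTE_SIZE = 180` constant. The sketch below re-implements the sizing arithmetic rather than calling stream-lib, under two assumptions. First, `RegisterSet.getSizeForCount(count)` packs six 5-bit registers per 32-bit word, using the rounding rule reproduced below. Second, the serialized form is two `int` header fields (log2m and the register byte size) followed by the register words. For the default `log2m = 8`, this gives `256 / 6 = 42` words. Since 42 is not a multiple of 32, one extra word is added, giving 43. The total is then `(43 + 2) * 4 = 180` bytes.

   ```java
   final class HllByteSize {
     // Assumption: mirrors stream-lib's RegisterSet, which fits 6 registers into each 32-bit word.
     private static final int REGISTERS_PER_WORD = 6;

     private HllByteSize() {
     }

     // Assumed equivalent of RegisterSet.getSizeForCount(count): number of int words for `count` registers.
     static int registerWordsForCount(int count) {
       int words = count / REGISTERS_PER_WORD;
       if (words == 0) {
         return 1;
       }
       // Rounding rule reproduced from RegisterSet as we understand it.
       return words % Integer.SIZE == 0 ? words : words + 1;
     }

     // Serialized size: 2 int header fields plus the register words, 4 bytes each.
     static int serializedByteSize(int log2m) {
       return (registerWordsForCount(1 << log2m) + 2) * Integer.BYTES;
     }
   }
   ```

   Because the result depends only on `log2m`, it can be computed once in the constructor instead of serializing an empty `HyperLogLog`.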



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/SumPrecisionValueAggregator.java:
##########
@@ -28,6 +32,29 @@ public class SumPrecisionValueAggregator implements ValueAggregator<Object, BigD
   public static final DataType AGGREGATED_VALUE_TYPE = DataType.BYTES;
 
   private int _maxByteSize;
+  private int _fixedSize = -1;
+
+  public SumPrecisionValueAggregator() {
+  }
+
+  /*
+  Aggregate with an optimal maximum precision in mind. Scale is always only 1 32-bit
+  int and the storing of the scale value does not affect the size of the big decimal.
+  Given this, we won't care about scale in terms of the aggregations.
+  During query time, the optional scale parameter can be provided, but during aggregation,
+  we don't limit it.
+   */
+  public SumPrecisionValueAggregator(List<ExpressionContext> arguments) {
+    // length 1 means we don't have any caps on maximum precision nor do we have a fixed size then
+    if (arguments.size() <= 1) {
+      return;
+    }
+
+    String precision = arguments.get(1).getLiteral().getStringValue();
+    Preconditions.checkState(StringUtils.isNumeric(precision), "precision must be a numeric literal");
+
+    _fixedSize = BigDecimalUtils.byteSizeForFixedPrecision(Integer.parseInt(precision));

Review Comment:
   ```suggestion
       _fixedSize = BigDecimalUtils.byteSizeForFixedPrecision(arguments.get(1).getLiteral().getIntValue());
   ```
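   As an illustration of what a fixed-precision size has to cover: the widest unscaled value with `p` decimal digits is `10^p - 1`, and `BigInteger.toByteArray()` of that value (or its negation) bounds the unscaled part. This is a hypothetical helper, not Pinot's `BigDecimalUtils.byteSizeForFixedPrecision()`, whose exact layout is not shown in this diff. For that reason, the size of the scale header is left as a parameter.

   ```java
   import java.math.BigInteger;

   final class PrecisionByteSize {
     private PrecisionByteSize() {
     }

     // Worst-case length of BigInteger.toByteArray() for an unscaled value with `precision` decimal digits.
     static int unscaledByteSize(int precision) {
       if (precision <= 0) {
         throw new IllegalArgumentException("precision must be positive, got: " + precision);
       }
       BigInteger max = BigInteger.TEN.pow(precision).subtract(BigInteger.ONE);
       // Check both signs, since two's-complement lengths can differ at boundaries.
       return Math.max(max.toByteArray().length, max.negate().toByteArray().length);
     }

     // scaleHeaderBytes is a parameter because the header size of the real format is an assumption here.
     static int fixedByteSize(int precision, int scaleHeaderBytes) {
       return scaleHeaderBytes + unscaledByteSize(precision);
     }
   }
   ```

   For example, precision 18 needs 8 bytes for the unscaled value and precision 19 needs 9, since `10^19 - 1` no longer fits in a signed 64-bit value.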



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -288,6 +296,7 @@ public boolean isMutableSegment() {
       // Only support generating raw index on single-value columns that do not have inverted index while
       // consuming. After consumption completes and the segment is built, all single-value columns can have raw index
 
+

Review Comment:
   (nit) extra empty line



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -1211,17 +1230,29 @@ private static Map<String, Pair<String, ValueAggregator>> fromAggregationConfig(
           "aggregation function must be a function: %s", config);
       FunctionContext functionContext = expressionContext.getFunction();
       TableConfigUtils.validateIngestionAggregation(functionContext.getFunctionName());
-      Preconditions.checkState(functionContext.getArguments().size() == 1,
-          "aggregation function can only have one argument: %s", config);
+

Review Comment:
   IMO, checking whether there are `>= 1` arguments is enough. It seems the check is already applied in `TableConfigUtils`, so we can actually remove this check.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -378,12 +386,75 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc
 
           FunctionContext functionContext = expressionContext.getFunction();
           validateIngestionAggregation(functionContext.getFunctionName());
-          Preconditions.checkState(functionContext.getArguments().size() == 1,
-              "aggregation function can only have one argument: %s", aggregationConfig);
+
+          List<ExpressionContext> arguments = functionContext.getArguments();
+
+          if (("distinctcounthll".equals(functionContext.getFunctionName()))

Review Comment:
   We need to use the canonical name (with underscores removed). Currently, if the function name is `distinct_count_hll` or `sum_precision`, this check will fail.
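   Here is a minimal sketch of the canonicalization described above, which also folds in the `>= 1` argument check from the earlier comment. The helper names are hypothetical. Allowing at most two arguments for `distinctcounthll` and `sumprecision` is an assumption, based on the optional `log2m` and precision arguments added in this PR.

   ```java
   import java.util.List;
   import java.util.Locale;

   final class IngestionAggregationValidator {
     private IngestionAggregationValidator() {
     }

     // Canonical form: underscores removed and lower-cased, so "DISTINCT_COUNT_HLL" matches "distinctcounthll".
     static String canonicalize(String functionName) {
       return functionName.replace("_", "").toLowerCase(Locale.ROOT);
     }

     // Requires at least one argument; only functions with an optional parameter may take a second one.
     static void validate(String functionName, List<String> arguments) {
       if (arguments.isEmpty()) {
         throw new IllegalStateException("aggregation function must have at least one argument: " + functionName);
       }
       switch (canonicalize(functionName)) {
         case "distinctcounthll":
         case "sumprecision":
           if (arguments.size() > 2) {
             throw new IllegalStateException("too many arguments for: " + functionName);
           }
           break;
         default:
           if (arguments.size() != 1) {
             throw new IllegalStateException("aggregation function can only have one argument: " + functionName);
           }
       }
     }
   }
   ```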



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/forward/FixedByteSVMutableForwardIndex.java:
##########
@@ -195,6 +205,22 @@ public void setDouble(int docId, double value) {
     getWriterForRow(docId).setDouble(docId, value);
   }
 
+  @Override
+  public byte[] getBytes(int docId) {
+    int bufferId = getBufferId(docId);
+    return _readers.get(bufferId).getBytes(docId);
+  }
+
+  @Override
+  public void setBytes(int docId, byte[] value) {
+    if (value.length != _valueSizeInBytes) {
+      throw new IllegalArgumentException("Expected value size to be " + _valueSizeInBytes + " but was " + value.length);
+    }

Review Comment:
   (minor)
   ```suggestion
       Preconditions.checkArgument(value.length == _valueSizeInBytes, "Expected value size to be: %s but got: %s", _valueSizeInBytes, value.length);
   ```
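   Here is a self-contained sketch of the fixed-width idea behind `setBytes()`. It assumes a hypothetical column backed by a single `byte[]`, where every value occupies exactly `valueSizeInBytes` bytes. The size check mirrors the suggested `Preconditions.checkArgument` without depending on Guava.

   ```java
   import java.util.Arrays;

   final class FixedWidthBytesColumn {
     private final int _valueSizeInBytes;
     private final byte[] _data;

     FixedWidthBytesColumn(int valueSizeInBytes, int numRows) {
       _valueSizeInBytes = valueSizeInBytes;
       _data = new byte[valueSizeInBytes * numRows];
     }

     // Each row occupies one fixed-size slot, so every value must have exactly that width.
     void setBytes(int docId, byte[] value) {
       if (value.length != _valueSizeInBytes) {
         throw new IllegalArgumentException(
             String.format("Expected value size to be: %s but got: %s", _valueSizeInBytes, value.length));
       }
       System.arraycopy(value, 0, _data, docId * _valueSizeInBytes, _valueSizeInBytes);
     }

     // The row offset is docId * width, so reads need no per-row length bookkeeping.
     byte[] getBytes(int docId) {
       int offset = docId * _valueSizeInBytes;
       return Arrays.copyOfRange(_data, offset, offset + _valueSizeInBytes);
     }
   }
   ```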



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountHLLValueAggregator.java:
##########
@@ -28,11 +33,31 @@
 
 public class DistinctCountHLLValueAggregator implements ValueAggregator<Object, HyperLogLog> {
   public static final DataType AGGREGATED_VALUE_TYPE = DataType.BYTES;
-  private static final int DEFAULT_LOG2M_BYTE_SIZE = 180;
-
+  private int _log2m = CommonConstants.Helix.DEFAULT_HYPERLOGLOG_LOG2M;
+  private int _log2mByteSize = 180;
   // Byte size won't change once we get the initial aggregated value
   private int _maxByteSize;
 
+  public DistinctCountHLLValueAggregator() {

Review Comment:
   (minor) This constructor is not used in production code. Should we consider modifying the test usage and removing it?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/SumPrecisionValueAggregator.java:
##########
@@ -28,6 +32,29 @@ public class SumPrecisionValueAggregator implements ValueAggregator<Object, BigD
   public static final DataType AGGREGATED_VALUE_TYPE = DataType.BYTES;
 
   private int _maxByteSize;
+  private int _fixedSize = -1;
+
+  public SumPrecisionValueAggregator() {
+  }
+
+  /*
+  Aggregate with an optimal maximum precision in mind. Scale is always only 1 32-bit

Review Comment:
   (code format) We usually indent block comments (by 2 spaces).



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -1193,8 +1212,8 @@ private static Map<String, Pair<String, ValueAggregator>> fromAggregateMetrics(R
 
     Map<String, Pair<String, ValueAggregator>> columnNameToAggregator = new HashMap<>();
     for (String metricName : segmentConfig.getSchema().getMetricNames()) {
-      columnNameToAggregator.put(metricName,
-          Pair.of(metricName, ValueAggregatorFactory.getValueAggregator(AggregationFunctionType.SUM)));
+      columnNameToAggregator.put(metricName, Pair.of(metricName,
+          ValueAggregatorFactory.getValueAggregator(AggregationFunctionType.SUM, Collections.EMPTY_LIST)));

Review Comment:
   (nit) We usually use
   ```suggestion
             ValueAggregatorFactory.getValueAggregator(AggregationFunctionType.SUM, Collections.emptyList())));
   ```



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java:
##########
@@ -269,13 +272,14 @@ public MutableIndex createMutableIndex(MutableIndexContext context, ForwardIndex
     String column = context.getFieldSpec().getName();
     String segmentName = context.getSegmentName();
     FieldSpec.DataType storedType = context.getFieldSpec().getDataType().getStoredType();
+    int maxLength = context.getFieldSpec().getMaxLength();

Review Comment:
   (MAJOR) I don't think this is the correct way to pass this information. We can probably add the fixed-length info to the `MutableIndexContext` to avoid modifying the field spec.
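   Here is a minimal sketch of that alternative. It uses hypothetical simplified stand-ins (`FieldSpecView` and `IndexContextSketch`), not Pinot's actual classes. The fixed length is resolved from the aggregators into the context, and the field spec taken from the schema is never mutated.

   ```java
   import java.util.Map;

   // Hypothetical read-only view of a field spec.
   final class FieldSpecView {
     private final String _name;
     private final int _maxLength;

     FieldSpecView(String name, int maxLength) {
       _name = name;
       _maxLength = maxLength;
     }

     String getName() {
       return _name;
     }

     int getMaxLength() {
       return _maxLength;
     }
   }

   // Hypothetical index context that carries the fixed length next to the unmodified field spec.
   final class IndexContextSketch {
     static final int UNKNOWN_FIXED_LENGTH = -1;

     private final FieldSpecView _fieldSpec;
     private final int _fixedLengthBytes;

     private IndexContextSketch(FieldSpecView fieldSpec, int fixedLengthBytes) {
       _fieldSpec = fieldSpec;
       _fixedLengthBytes = fixedLengthBytes;
     }

     // Looks up the aggregator-provided byte size for this column, if any.
     static IndexContextSketch forColumn(FieldSpecView fieldSpec, Map<String, Integer> aggregatedValueByteSizes) {
       int fixedLength = aggregatedValueByteSizes.getOrDefault(fieldSpec.getName(), UNKNOWN_FIXED_LENGTH);
       return new IndexContextSketch(fieldSpec, fixedLength);
     }

     FieldSpecView getFieldSpec() {
       return _fieldSpec;
     }

     int getFixedLengthBytes() {
       return _fixedLengthBytes;
     }
   }
   ```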



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/forward/FixedByteSVMutableForwardIndex.java:
##########
@@ -63,22 +63,32 @@ public class FixedByteSVMutableForwardIndex implements MutableForwardIndex {
 
   /**
    * @param storedType Data type of the values
+   * @param fixedLength Fixed length of values if known: only used for BYTES field and Hyperloglog values for now.
    * @param numRowsPerChunk Number of rows to pack in one chunk before a new chunk is created.
    * @param memoryManager Memory manager to be used for allocating memory.
    * @param allocationContext Allocation allocationContext.
    */
-  public FixedByteSVMutableForwardIndex(boolean dictionaryEncoded, DataType storedType, int numRowsPerChunk,
-      PinotDataBufferMemoryManager memoryManager, String allocationContext) {
+  public FixedByteSVMutableForwardIndex(boolean dictionaryEncoded, DataType storedType, int fixedLength,
+      int numRowsPerChunk, PinotDataBufferMemoryManager memoryManager, String allocationContext) {
     _dictionaryEncoded = dictionaryEncoded;
     _storedType = storedType;
-    _valueSizeInBytes = storedType.size();
+    if (storedType == DataType.BYTES || storedType == DataType.BIG_DECIMAL) {
+      _valueSizeInBytes = fixedLength;

Review Comment:
   Let's add a check here verifying that `fixedLength` is positive.
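   Here is a minimal sketch of the constructor logic with that check added. It assumes a hypothetical `StoredTypeSketch` enum in place of Pinot's `DataType`. The enum lists only the types relevant here, with fixed widths taken from the Java primitive sizes.

   ```java
   enum StoredTypeSketch {
     INT(Integer.BYTES), LONG(Long.BYTES), FLOAT(Float.BYTES), DOUBLE(Double.BYTES), BIG_DECIMAL(-1), BYTES(-1);

     // -1 marks variable-width types that need an externally supplied fixed length.
     final int _size;

     StoredTypeSketch(int size) {
       _size = size;
     }
   }

   final class ValueSizeResolver {
     private ValueSizeResolver() {
     }

     static int valueSizeInBytes(StoredTypeSketch storedType, int fixedLength) {
       if (storedType == StoredTypeSketch.BYTES || storedType == StoredTypeSketch.BIG_DECIMAL) {
         // Fail fast instead of allocating zero- or negative-width slots.
         if (fixedLength <= 0) {
           throw new IllegalArgumentException(
               "fixedLength must be positive for " + storedType + ", got: " + fixedLength);
         }
         return fixedLength;
       }
       return storedType._size;
     }
   }
   ```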



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -621,6 +631,10 @@ private void addNewRow(int docId, GenericRow row) {
           case DOUBLE:
             forwardIndex.add(((Number) value).doubleValue(), -1, docId);
             break;
+          case BIG_DECIMAL:

Review Comment:
   The comment above no longer applies.
   We should probably add a comment explaining that byte[] is used to support `BIG_DECIMAL`. This works because `BIG_DECIMAL` is stored as byte[] underneath.
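   To illustrate why the byte path works for `BIG_DECIMAL`: a `BigDecimal` can be encoded as its scale followed by the two's-complement bytes of its unscaled value, and then decoded losslessly. The sketch below uses a 4-byte scale, in line with the "1 32-bit int" scale mentioned in the `SumPrecisionValueAggregator` comment. It is an illustrative encoding, not necessarily byte-for-byte Pinot's `BigDecimalUtils` format.

   ```java
   import java.math.BigDecimal;
   import java.math.BigInteger;
   import java.nio.ByteBuffer;
   import java.util.Arrays;

   final class BigDecimalBytes {
     private BigDecimalBytes() {
     }

     // Layout (assumption): 4-byte big-endian scale, then the unscaled value's two's-complement bytes.
     static byte[] serialize(BigDecimal value) {
       byte[] unscaled = value.unscaledValue().toByteArray();
       return ByteBuffer.allocate(Integer.BYTES + unscaled.length).putInt(value.scale()).put(unscaled).array();
     }

     static BigDecimal deserialize(byte[] bytes) {
       int scale = ByteBuffer.wrap(bytes).getInt();
       byte[] unscaled = Arrays.copyOfRange(bytes, Integer.BYTES, bytes.length);
       return new BigDecimal(new BigInteger(unscaled), scale);
     }
   }
   ```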



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/SumValueAggregator.java:
##########
@@ -37,11 +37,17 @@ public DataType getAggregatedValueType() {
 
   @Override
   public Double getInitialAggregatedValue(Number rawValue) {
+    if (rawValue == null) {

Review Comment:
   Currently, the input should never be `null` (it should already have been filled with the default value). My concern is that we are adding `null` handling to only this aggregation and not the others. To fully support `null` input, we would need to allow null values in, annotate the input value as `@Nullable`, and support it for all aggregations. That is out of the scope of this PR, so I suggest doing it separately.
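   Here is a minimal sketch of handling `null` once for every aggregation rather than inside each one. It assumes a hypothetical `SimpleAggregator` interface, not Pinot's `ValueAggregator`. A wrapper substitutes a configured default for `null` raw values, mirroring the "filled with default value" behavior described above. A simple double sum serves as the wrapped aggregation.

   ```java
   // Hypothetical, simplified aggregator interface (not Pinot's ValueAggregator).
   interface SimpleAggregator<R, A> {
     A getInitialAggregatedValue(R rawValue);

     A applyRawValue(A value, R rawValue);
   }

   // Example aggregation: assumes non-null input, like the existing aggregators.
   final class DoubleSumAggregator implements SimpleAggregator<Number, Double> {
     @Override
     public Double getInitialAggregatedValue(Number rawValue) {
       return rawValue.doubleValue();
     }

     @Override
     public Double applyRawValue(Double value, Number rawValue) {
       return value + rawValue.doubleValue();
     }
   }

   // Wraps any aggregator so null handling is defined in one place for all aggregations.
   final class NullDefaultingAggregator<R, A> implements SimpleAggregator<R, A> {
     private final SimpleAggregator<R, A> _delegate;
     private final R _defaultValue;

     NullDefaultingAggregator(SimpleAggregator<R, A> delegate, R defaultValue) {
       _delegate = delegate;
       _defaultValue = defaultValue;
     }

     @Override
     public A getInitialAggregatedValue(R rawValue) {
       return _delegate.getInitialAggregatedValue(rawValue != null ? rawValue : _defaultValue);
     }

     @Override
     public A applyRawValue(A value, R rawValue) {
       return _delegate.applyRawValue(value, rawValue != null ? rawValue : _defaultValue);
     }
   }
   ```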



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
For additional commands, e-mail: commits-help@pinot.apache.org