Posted to commits@pinot.apache.org by "walterddr (via GitHub)" <gi...@apache.org> on 2023/06/15 01:03:02 UTC

[GitHub] [pinot] walterddr commented on a diff in pull request #10845: [Multistage] Runtime changes for leveraging V1 Aggregation Functions

walterddr commented on code in PR #10845:
URL: https://github.com/apache/pinot/pull/10845#discussion_r1230300258


##########
pinot-common/src/main/java/org/apache/pinot/common/request/context/IdentifierContext.java:
##########
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.request.context;
+
+import java.util.Objects;
+import org.apache.pinot.common.utils.DataSchema;
+
+/**
+ * The {@code IdentifierContext} class represents a Identifer in the query.
+ * <p> This class includes information that about an identifier. The v1 engine uses column names for identifiers. The
+ * multistage query engine uses ordinals to distinctly track each identifier. So this context is set up to support both
+ * v1 and multistage engine identifiers.
+ */
+public class IdentifierContext {

Review Comment:
   Why is this class necessary?
   - Every PlanNode has a schema and a `RexExpression`, so you could derive the column name from the schema, right?
   - I think the intermediate stage input schema column name is just going to be `EXPR$0` for ordinal `[0]`.
   - Even if it is not named like that, the intermediate stage schema column name is totally up to us; we can make it ordinal based, e.g. a `$colName__$ordinal` format, so we don't need this class.
   
   It is not a deal breaker, but it would make a lot of the v1 file changes unnecessary.
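
   A minimal sketch of the ordinal-based naming idea, assuming the column name and ordinal are simply joined with `__`. `OrdinalColumnNames` and all of its methods are hypothetical names used for illustration; they are not existing Pinot classes or APIs.

```java
import java.util.List;

// Hypothetical helper for illustration only; not an existing Pinot class.
final class OrdinalColumnNames {
  private static final String SEPARATOR = "__";

  private OrdinalColumnNames() {
  }

  // Builds an intermediate-stage column name of the form <colName>__<ordinal>.
  static String encode(String columnName, int ordinal) {
    return columnName + SEPARATOR + ordinal;
  }

  // Recovers the ordinal by parsing whatever follows the last "__" in the name.
  static int decodeOrdinal(String encodedName) {
    int idx = encodedName.lastIndexOf(SEPARATOR);
    if (idx < 0) {
      throw new IllegalArgumentException("Not an ordinal-based column name: " + encodedName);
    }
    return Integer.parseInt(encodedName.substring(idx + SEPARATOR.length()));
  }

  // Alternative: derive the column name from the schema's column names by ordinal.
  static String nameForOrdinal(List<String> schemaColumnNames, int ordinal) {
    return schemaColumnNames.get(ordinal);
  }
}
```

   With either approach the v1 code could keep working with plain column names, and the ordinal would be recovered only where the multistage operators need it.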



##########
pinot-core/src/main/java/org/apache/pinot/core/common/IntermediateStageBlockValSet.java:
##########
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.common;
+
+import java.math.BigDecimal;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.PinotDataType;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.RoaringBitmap;
+
+/**
+ * In the multistage engine, the leaf stage servers process the data in columnar fashion. By the time the
+ * intermediate stage receives the projected column, they are converted to a row based format. This class provides
+ * the capability to convert the row based represenation into blocks so that they can be used to process
+ * aggregations.
+ * TODO: Support MV
+ */
+public class IntermediateStageBlockValSet implements BlockValSet {
+  private final FieldSpec.DataType _dataType;
+  private final PinotDataType _pinotDataType;
+  private final List<Object> _values;
+  private final RoaringBitmap _nullBitMap;
+  private boolean _nullBitMapSet;
+
+  public IntermediateStageBlockValSet(DataSchema.ColumnDataType columnDataType, List<Object> values) {

Review Comment:
   If we already know the raw type, let's not box it in `Object` at all.
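
   A rough sketch of what avoiding the boxing could look like for a column known to be numeric, assuming the column contains no nulls. `DoubleColumnValues` is a hypothetical class for illustration, not the `IntermediateStageBlockValSet` from this PR, and it does not implement `BlockValSet`.

```java
import java.util.List;

// Hypothetical sketch; not the IntermediateStageBlockValSet from the PR.
final class DoubleColumnValues {
  private final double[] _values;

  private DoubleColumnValues(double[] values) {
    _values = values;
  }

  // Unboxes one column of the row-based container exactly once.
  // Assumption: every value in the column is a non-null Number.
  static DoubleColumnValues fromRows(List<Object[]> rows, int columnIndex) {
    double[] values = new double[rows.size()];
    for (int i = 0; i < values.length; i++) {
      values[i] = ((Number) rows.get(i)[columnIndex]).doubleValue();
    }
    return new DoubleColumnValues(values);
  }

  // Returns the primitive values directly, with no per-call conversion.
  double[] getDoubleValues() {
    return _values;
  }

  // Example consumer that works on the primitive array.
  double sum() {
    double total = 0;
    for (double value : _values) {
      total += value;
    }
    return total;
  }
}
```

   Storing a primitive array per known type means later reads do not have to cast and unbox each element again.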



##########
pinot-query-planner/src/main/java/org/apache/calcite/rel/hint/PinotHintStrategyTable.java:
##########
@@ -29,6 +29,9 @@
 public class PinotHintStrategyTable {
   public static final String INTERNAL_AGG_INTERMEDIATE_STAGE = "aggIntermediateStage";
   public static final String INTERNAL_AGG_FINAL_STAGE = "aggFinalStage";
+  // Hint to denote that aggregation is to be run as a single stage rather than split into an intermediate and a final
+  // stage
+  public static final String INTERNAL_IS_SINGLE_STAGE_AGG = "isSingleStageAgg";

Review Comment:
   what does this hint do? this hint is the same as `aggFinalStage` right?



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunction.java:
##########
@@ -106,6 +106,27 @@ void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder
    */
   IntermediateResult merge(IntermediateResult intermediateResult1, IntermediateResult intermediateResult2);
 
+  /**
+   * Merges two intermediate results and also updates the aggregation result holder. This is needed when aggregation
+   * is processed in multiple stages to store the intermediate results.
+   */
+  default void mergeAndUpdateResultHolder(IntermediateResult intermediateResult,
+      AggregationResultHolder aggregationResultHolder) {
+    // TODO: Remove when support for all aggregation functions is added to the Multistage engine.

Review Comment:
   I am confused by this comment: what is the end state after removing this function?



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java:
##########
@@ -187,17 +189,19 @@ public static AggregationFunction getAggregationFunction(FunctionContext functio
       } else {
         switch (AggregationFunctionType.valueOf(upperCaseFunctionName)) {
           case COUNT:
-            return new CountAggregationFunction(firstArgument, queryContext.isNullHandlingEnabled());
+            return new CountAggregationFunction(firstArgument, aggFunctionQueryContext.isNullHandlingEnabled());
           case MIN:
-            return new MinAggregationFunction(firstArgument, queryContext.isNullHandlingEnabled());
+            return new MinAggregationFunction(firstArgument, aggFunctionQueryContext.isNullHandlingEnabled());
           case MAX:
-            return new MaxAggregationFunction(firstArgument, queryContext.isNullHandlingEnabled());
+            return new MaxAggregationFunction(firstArgument, aggFunctionQueryContext.isNullHandlingEnabled());
+          // TODO(Sonam): Uncomment SUM0 when merging planner changes
           case SUM:
-            return new SumAggregationFunction(firstArgument, queryContext.isNullHandlingEnabled());
+          // case SUM0:

Review Comment:
   How did we remove SUM, SUM0 and the other variants?
   
   IMO this move is not exactly OK, because the variants behave differently on
   - what to return when nothing is in the group set, or nothing is in the global agg
   - what to do with null values
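
   To illustrate the difference, here is a small sketch of the usual SQL semantics: SUM returns null when there is no non-null input, while SUM0 (Calcite's `$SUM0`) returns 0 in that case. `SumVariants` is a hypothetical class for illustration and is not how Pinot's aggregation functions are implemented.

```java
import java.util.List;

// Hypothetical illustration of SUM vs SUM0 semantics; not Pinot code.
final class SumVariants {
  private SumVariants() {
  }

  // SUM: null inputs are skipped; the result is null when no non-null input exists.
  static Double sum(List<Double> values) {
    Double result = null;
    for (Double value : values) {
      if (value != null) {
        result = (result == null) ? value : result + value;
      }
    }
    return result;
  }

  // SUM0: same as SUM, except that an empty (or all-null) input yields 0.
  static double sum0(List<Double> values) {
    Double result = sum(values);
    return result == null ? 0.0 : result;
  }
}
```

   Mapping both variants to a single function would lose this distinction for empty groups and empty global aggregations.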



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggFunctionQueryContext.java:
##########
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import org.apache.pinot.common.request.context.OrderByExpressionContext;
+import org.apache.pinot.core.query.request.context.QueryContext;
+
+
+/**
+ * The <code>AggFunctionQueryContext</code> class contains extracted details from QueryContext that can be used for
+ * Aggregation Functions.
+ */
+public class AggFunctionQueryContext {

Review Comment:
   is this a refactor? can we individually pull this one out as a separate PR?



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/BaseBooleanAggregationFunction.java:
##########
@@ -231,9 +231,27 @@ public Integer merge(Integer intermediateResult1, Integer intermediateResult2) {
     return _merger.merge(intermediateResult1, intermediateResult2);
   }
 
+  @Override
+  public void mergeAndUpdateResultHolder(Integer intermediateResult,

Review Comment:
   this and other functions. can we split the supported functions into 
   - what currently v2 supports, MIN/MAX/SUM/COUNT
   - what we plan to demonstrate in this PR (preferably, distinctCountHLL?)
   - what we can add later (all the others)
   
   This way we can make this PR smaller by not including the rest of the agg functions 
   - we are not supporting them anyway correct?
   - these 2 added APIs are only used by multistage v2 operators yes?
   
   [note to self] skip reviewing the rest of the agg functions



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java:
##########
@@ -96,10 +102,88 @@ public MultiStageOperator visitMailboxSend(MailboxSendNode node, PhysicalPlanCon
   @Override
   public MultiStageOperator visitAggregate(AggregateNode node, PhysicalPlanContext context) {
     MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
+
+    // TODO: Will need to use a collection of inputSchema when we support aggregation functions with multiple
+    //  columns.

Review Comment:
   i am not quite sure what this TODO is about, could you elaborate with an example?



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/NewAggregateOperator.java:
##########
@@ -0,0 +1,378 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FunctionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.IntermediateStageBlockValSet;
+import org.apache.pinot.core.data.table.Key;
+import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.function.AggFunctionQueryContext;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunctionFactory;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+
+
+/**
+ *
+ * AggregateOperator is used to aggregate values over a set of group by keys.
+ * Output data will be in the format of [group by key, aggregate result1, ... aggregate resultN]
+ * Currently, we only support the following aggregation functions:
+ * 1. SUM
+ * 2. COUNT
+ * 3. MIN
+ * 4. MAX
+ * 5. DistinctCount and Count(Distinct)
+ * 6.AVG
+ * 7. FourthMoment
+ * 8. BoolAnd and BoolOr
+ *
+ * When the list of aggregation calls is empty, this class is used to calculate distinct result based on group by keys.
+ * In this case, the input can be any type.
+ *
+ * If the list of aggregation calls is not empty, the input of aggregation has to be a number.
+ *
+ * Note: This class performs aggregation over the double value of input.
+ * If the input is single value, the output type will be input type. Otherwise, the output type will be double.
+ */
+// TODO: Rename to AggregateOperator when merging Planner support.
+public class NewAggregateOperator extends MultiStageOperator {
+  private static final String EXPLAIN_NAME = "AGGREGATE_OPERATOR";
+
+  private final MultiStageOperator _inputOperator;
+  private final DataSchema _resultSchema;
+
+  // Aggregation containers
+  private final AggregationFunction[] _aggregationFunctions;
+  private final AggregationResultHolder[] _aggregationResultHolders;
+
+  // Group By containers
+  private final List<ExpressionContext> _groupSet;
+  private final GroupByResultHolder[] _groupByResultHolders;
+  // Mapping from the group by row-key to the values in the row.
+  private final Map<Key, Object[]> _groupByKeyHolder;
+  // groupId and groupIdMap are used to create a 0-based index for group-by keys instead of using the hash value
+  // directly - similar to GroupByKeyGenerator. This is useful when we invoke the aggregation functions because they
+  // use the group by key indexes to store results.
+  private int _groupId = 0;
+  private Map<Integer, Integer> _groupIdMap;
+
+  private TransferableBlock _upstreamErrorBlock;
+  private boolean _readyToConstruct;
+  private boolean _hasReturnedAggregateBlock;
+
+  // Denotes whether this aggregation operator should merge intermediate results.
+  private boolean _isMergeAggregation;
+
+  // TODO: refactor Pinot Reducer code to support the intermediate stage agg operator.
+  // aggCalls has to be a list of FunctionCall and cannot be null
+  // groupSet has to be a list of InputRef and cannot be null
+  // TODO: Add these two checks when we confirm we can handle error in upstream ctor call.
+
+  @VisibleForTesting
+  public NewAggregateOperator(OpChainExecutionContext context, MultiStageOperator inputOperator,
+      DataSchema resultSchema, List<FunctionContext> functionContexts, List<ExpressionContext> groupSet,
+      boolean isMergeAggregation, boolean isSingleStageAggregation) {
+    super(context);
+    _inputOperator = inputOperator;
+    _resultSchema = resultSchema;
+
+    _groupSet = groupSet;
+    _groupByKeyHolder = new HashMap<>();
+    _groupIdMap = new HashMap<>();
+    _aggregationFunctions = new AggregationFunction[functionContexts.size()];
+    _aggregationResultHolders = new AggregationResultHolder[functionContexts.size()];
+    _groupByResultHolders = new GroupByResultHolder[functionContexts.size()];
+    for (int i = 0; i < _aggregationFunctions.length; i++) {
+      _aggregationFunctions[i] = AggregationFunctionFactory.getAggregationFunction(functionContexts.get(i),
+          new AggFunctionQueryContext(true));
+      _aggregationResultHolders[i] = _aggregationFunctions[i].createAggregationResultHolder();
+      _groupByResultHolders[i] = _aggregationFunctions[i].createGroupByResultHolder(
+          InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY,
+          InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT);
+    }
+
+    _upstreamErrorBlock = null;
+    _readyToConstruct = false;
+    _hasReturnedAggregateBlock = false;
+    _isMergeAggregation = isMergeAggregation && !isSingleStageAggregation;
+  }
+
+  @Override
+  public List<MultiStageOperator> getChildOperators() {
+    return ImmutableList.of(_inputOperator);
+  }
+
+  @Nullable
+  @Override
+  public String toExplainString() {
+    return EXPLAIN_NAME;
+  }
+
+  @Override
+  protected TransferableBlock getNextBlock() {
+    try {
+      if (!_readyToConstruct && !consumeInputBlocks()) {
+        return TransferableBlockUtils.getNoOpTransferableBlock();
+      }
+
+      if (_upstreamErrorBlock != null) {
+        return _upstreamErrorBlock;
+      }
+
+      if (!_hasReturnedAggregateBlock) {
+        return produceAggregatedBlock();
+      } else {
+        // TODO: Move to close call.
+        return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+      }
+    } catch (Exception e) {
+      return TransferableBlockUtils.getErrorTransferableBlock(e);
+    }
+  }
+
+  private TransferableBlock produceAggregatedBlock() {
+    List<Object[]> rows = _groupSet.isEmpty() ? collectAggregationResultRows() : collectGroupByResultRows();
+
+    _hasReturnedAggregateBlock = true;
+    if (rows.size() == 0) {
+      if (_groupSet.size() == 0) {
+        return constructEmptyAggResultBlock();
+      } else {
+        return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+      }
+    } else {
+      return new TransferableBlock(rows, _resultSchema, DataBlock.Type.ROW);
+    }
+  }
+
+  private List<Object[]> collectAggregationResultRows() {
+    List<Object[]> rows = new ArrayList<>();
+
+    Object[] row = new Object[_aggregationFunctions.length];
+    for (int i = 0; i < _aggregationFunctions.length; i++) {
+      AggregationFunction aggregationFunction = _aggregationFunctions[i];
+      row[i] = aggregationFunction.extractAggregationResult(_aggregationResultHolders[i]);
+    }
+    rows.add(row);
+    return rows;
+  }
+
+  private List<Object[]> collectGroupByResultRows() {
+    List<Object[]> rows = new ArrayList<>();
+    for (Map.Entry<Key, Object[]> e : _groupByKeyHolder.entrySet()) {
+      Object[] row = new Object[_aggregationFunctions.length + _groupSet.size()];
+      Object[] keyElements = e.getValue();
+      System.arraycopy(keyElements, 0, row, 0, keyElements.length);
+      for (int i = 0; i < _aggregationFunctions.length; i++) {
+        row[i + _groupSet.size()] = _aggregationFunctions[i].extractGroupByResult(_groupByResultHolders[i],
+            _groupIdMap.get(e.getKey().hashCode()));
+      }
+      rows.add(row);
+    }
+    return rows;
+  }
+
+  /**
+   * @return an empty agg result block for non-group-by aggregation.
+   */
+  private TransferableBlock constructEmptyAggResultBlock() {
+    Object[] row = new Object[_aggregationFunctions.length];
+    for (int i = 0; i < _aggregationFunctions.length; i++) {
+      AggregationFunction aggFunction = _aggregationFunctions[i];
+      row[i] = aggFunction.extractAggregationResult(aggFunction.createAggregationResultHolder());
+    }
+    return new TransferableBlock(Collections.singletonList(row), _resultSchema, DataBlock.Type.ROW);
+  }
+
+  /**
+   * @return whether or not the operator is ready to move on (EOS or ERROR)
+   */
+  private boolean consumeInputBlocks() {
+    TransferableBlock block = _inputOperator.nextBlock();
+    while (!block.isNoOpBlock()) {
+      // setting upstream error block
+      if (block.isErrorBlock()) {
+        _upstreamErrorBlock = block;
+        return true;
+      } else if (block.isEndOfStreamBlock()) {
+        _readyToConstruct = true;
+        return true;
+      }
+
+      List<Object[]> container = block.getContainer();
+      if (_isMergeAggregation) {
+        mergeIntermediateValues(container);
+      } else {
+        aggregateValues(container);
+      }
+
+      block = _inputOperator.nextBlock();
+    }
+    return false;
+  }
+
+  private void mergeIntermediateValues(List<Object[]> container) {
+    if (_groupSet.isEmpty()) {
+      performMergeAggregation(container);
+    } else {
+      performMergeGroupBy(container);
+    }
+  }
+
+  private void performMergeAggregation(List<Object[]> container) {
+    // Simple aggregation function.
+    for (int i = 0; i < _aggregationFunctions.length; i++) {
+      List<ExpressionContext> expressions = _aggregationFunctions[i].getInputExpressions();
+      for (Object[] row : container) {
+        Object intermediateResultToMerge = extractIntermediateValue(row, expressions);
+        _aggregationFunctions[i].mergeAndUpdateResultHolder(intermediateResultToMerge,
+            _aggregationResultHolders[i]);
+      }
+    }
+  }
+
+  private void performMergeGroupBy(List<Object[]> container) {
+    // Create group by keys for each row.
+    int[] intKeys = generateGroupByKeys(container);
+
+    for (int i = 0; i < _aggregationFunctions.length; i++) {
+      GroupByResultHolder groupByResultHolder = _groupByResultHolders[i];
+      groupByResultHolder.ensureCapacity(_groupIdMap.size());
+      List<ExpressionContext> expressions = _aggregationFunctions[i].getInputExpressions();
+
+      for (int j = 0; j < container.size(); j++) {
+        Object[] row = container.get(j);
+        Object intermediateResultToMerge = extractIntermediateValue(row, expressions);
+        _aggregationFunctions[i].mergeAndUpdateResultHolder(intermediateResultToMerge, groupByResultHolder,
+            intKeys[j]);
+      }
+    }
+  }
+
+  Object extractIntermediateValue(Object[] row, List<ExpressionContext> expressions) {
+    // TODO: Add support to handle aggregation functions where:
+    //       1. The identifier need not be the first argument
+    //       2. There are more than one identifiers.
+    Preconditions.checkState(expressions.size() <= 1);
+    Preconditions.checkState(!expressions.isEmpty());
+    ExpressionContext expr = expressions.get(0);
+
+    Object result = expr.getType().equals(ExpressionContext.Type.IDENTIFIER) ? row[expr.getIdentifierIndex()]
+        : expr.getLiteral().getValue();
+    return result;
+  }
+
+  public void aggregateValues(List<Object[]> container) {
+    // Convert row to columnar representation
+    Map<Integer, List<Object>> columnValuesMap = new HashMap<>();

Review Comment:
   is there any better way of handling this? converting row to columnar should be done via RowDataBlock and ColumnDataBlock abstraction?
   
   I agree we can do the refactoring later but was wondering if there's any way we can directly extract this part into a DataBlockUtils
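
   As a sketch of the kind of helper this could be extracted into, assuming the rows all have the same number of columns: `RowToColumnSketch.toColumns` is a hypothetical name, not an existing `DataBlockUtils` method.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical utility for illustration; not an existing Pinot API.
final class RowToColumnSketch {
  private RowToColumnSketch() {
  }

  // Transposes a row-based container into one value list per column.
  // Assumption: every row has at least numColumns entries.
  static List<List<Object>> toColumns(List<Object[]> rows, int numColumns) {
    List<List<Object>> columns = new ArrayList<>(numColumns);
    for (int c = 0; c < numColumns; c++) {
      columns.add(new ArrayList<>(rows.size()));
    }
    for (Object[] row : rows) {
      for (int c = 0; c < numColumns; c++) {
        columns.get(c).add(row[c]);
      }
    }
    return columns;
  }
}
```

   Keeping the conversion in one place would let the operator call it without owning the transposition logic itself.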



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org