Posted to commits@pinot.apache.org by "vvivekiyer (via GitHub)" <gi...@apache.org> on 2023/06/05 18:35:05 UTC

[GitHub] [pinot] vvivekiyer opened a new pull request, #10845: [Multistage] Runtime changes for supporting all V1 Aggregations

vvivekiyer opened a new pull request, #10845:
URL: https://github.com/apache/pinot/pull/10845

   Label=feature
   OSS Issue = https://github.com/apache/pinot/issues/10745
   Design Doc = https://docs.google.com/document/d/1Us6aBvTpNLMEy0ODo34OgTk73h_LVFFAH6q17689h1M/edit 
   
   
   As a part of this effort, we want the Multistage engine to leverage our existing AggregationFunctions. The execution for this effort is divided into phases. This PR contains the runtime changes for Phase 1 where we add support for all existing aggregations in the Multistage Engine, namely:
   1. Sum
   2. Min
   3. Max
   4. Count
   5. Bool_And
   6. Bool_Or
   7. FourthMoment (Kurtosis, Skewness)
   8. DistinctCount 
   9. Avg
   
   The corresponding planner changes are addressed in a separate PR (TBD: Insert Link)
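Phase 1 depends on a split that V1 aggregation functions already make between intermediate and final results. One stage aggregates raw values into an intermediate result, and a later stage merges intermediate results before extracting the final value. Below is a minimal sketch of that split for AVG. It assumes a (sum, count) intermediate pair. AvgPair and AvgTwoPhaseSketch are hypothetical names, not Pinot's AggregationFunction API.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical intermediate result for AVG: a running sum and a row count. */
final class AvgPair {
  final double sum;
  final long count;

  AvgPair(double sum, long count) {
    this.sum = sum;
    this.count = count;
  }
}

/** Sketch of a two-phase AVG: aggregate raw values, merge partials, extract the final value. */
final class AvgTwoPhaseSketch {
  // Phase 1 (e.g. a leaf stage): fold one block of raw values into an intermediate pair.
  static AvgPair aggregate(double[] values) {
    double sum = 0;
    for (double v : values) {
      sum += v;
    }
    return new AvgPair(sum, values.length);
  }

  // Phase 2 (the merging stage): combine intermediate pairs without seeing raw values.
  static AvgPair merge(AvgPair a, AvgPair b) {
    return new AvgPair(a.sum + b.sum, a.count + b.count);
  }

  // Final result; NaN for zero rows is an assumption of this sketch, not Pinot's null handling.
  static double extractFinal(AvgPair pair) {
    return pair.count == 0 ? Double.NaN : pair.sum / pair.count;
  }

  public static void main(String[] args) {
    List<double[]> blocks = Arrays.asList(new double[]{1, 2, 3}, new double[]{10});
    AvgPair merged = new AvgPair(0, 0);
    for (double[] block : blocks) {
      merged = merge(merged, aggregate(block));
    }
    System.out.println(extractFinal(merged)); // 4.0
  }
}
```

Merging the pairs, rather than averaging the per-block averages, keeps AVG correct when blocks have different row counts. Averaging the block means 2.0 and 10.0 would give 6.0 instead of 4.0.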
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] vvivekiyer commented on a diff in pull request #10845: [Multistage] Runtime changes for leveraging V1 Aggregation Functions

Posted by "vvivekiyer (via GitHub)" <gi...@apache.org>.
vvivekiyer commented on code in PR #10845:
URL: https://github.com/apache/pinot/pull/10845#discussion_r1237805932


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java:
##########
@@ -96,10 +102,88 @@ public MultiStageOperator visitMailboxSend(MailboxSendNode node, PhysicalPlanCon
   @Override
   public MultiStageOperator visitAggregate(AggregateNode node, PhysicalPlanContext context) {
     MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
+
+    // TODO: Will need to use a collection of inputSchema when we support aggregation functions with multiple
+    //  columns.

Review Comment:
   This is a misplaced TODO. I've removed it. 





[GitHub] [pinot] abhioncbr commented on a diff in pull request #10845: [Multistage] Runtime changes for leveraging V1 Aggregation Functions

Posted by "abhioncbr (via GitHub)" <gi...@apache.org>.
abhioncbr commented on code in PR #10845:
URL: https://github.com/apache/pinot/pull/10845#discussion_r1219879274


##########
pinot-core/src/main/java/org/apache/pinot/core/common/IntermediateStageBlockValSet.java:
##########
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.common;
+
+import java.math.BigDecimal;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.PinotDataType;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.RoaringBitmap;
+
+/**
+ * In the multistage engine, the leaf stage servers process the data in columnar fashion. By the time the
+ * intermediate stage receives the projected column, it has been converted to a row-based format. This class provides
+ * the capability to convert the row-based representation into blocks so that they can be used to process
+ * aggregations.
+ * TODO: Support MV
+ */
+public class IntermediateStageBlockValSet implements BlockValSet {
+  private final FieldSpec.DataType _dataType;
+  private final PinotDataType _pinotDataType;
+  private final List<Object> _values;
+  private final RoaringBitmap _nullBitMap;
+  private boolean _nullBitMapSet;
+
+  public IntermediateStageBlockValSet(DataSchema.ColumnDataType columnDataType, List<Object> values) {
+    _dataType = columnDataType.toDataType();
+    _pinotDataType = PinotDataType.getPinotDataTypeForExecution(columnDataType);
+    _values = values;
+    _nullBitMap = new RoaringBitmap();
+  }
+
+  /**
+   * Returns a bitmap of indices where null values are found.
+   */
+  @Nullable
+  @Override
+  public RoaringBitmap getNullBitmap() {
+    if (!_nullBitMapSet) {
+      if (_values == null) {
+        return _nullBitMap;
+      }
+
+      for (int i = 0; i < _values.size(); i++) {
+        if (_values.get(i) == null) {
+          _nullBitMap.add(i);
+        }
+      }
+      _nullBitMapSet = true;
+    }
+    return _nullBitMap;
+  }
+
+  @Override
+  public FieldSpec.DataType getValueType() {
+    return _dataType;
+  }
+
+  @Override
+  public boolean isSingleValue() {
+    // TODO: Needs to be changed when we start supporting MV in multistage
+    return true;
+  }
+
+  @Nullable
+  @Override
+  public Dictionary getDictionary() {
+    return null;
+  }
+
+  @Override
+  public int[] getDictionaryIdsSV() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int[] getIntValuesSV() {
+    if (_values == null) {
+      return null;
+    }
+
+    int length = _values.size();
+    int[] values = new int[length];
+    for (int i = 0; i < length; i++) {
+      Object value = _values.get(i);
+      if (value != null) {
+        values[i] = _pinotDataType.toInt(value);
+      }
+    }
+    return values;
+  }
+
+  @Override
+  public long[] getLongValuesSV() {
+    if (_values == null) {
+      return null;
+    }
+
+    int length = _values.size();
+    long[] values = new long[length];
+    for (int i = 0; i < length; i++) {
+      Object value = _values.get(i);
+      if (value != null) {
+        values[i] = _pinotDataType.toLong(value);
+      }
+    }
+
+    return values;
+  }
+
+  @Override
+  public float[] getFloatValuesSV() {
+    if (_values == null) {
+      return null;
+    }
+    int length = _values.size();
+    float[] values = new float[length];
+    for (int i = 0; i < length; i++) {
+      Object value = _values.get(i);
+      if (value != null) {
+        values[i] = _pinotDataType.toFloat(value);
+      }
+    }
+
+    return values;
+  }
+
+  @Override
+  public double[] getDoubleValuesSV() {
+    if (_values == null) {
+      return null;
+    }
+    int length = _values.size();
+    double[] values = new double[length];
+    for (int i = 0; i < length; i++) {
+      Object value = _values.get(i);
+      if (value != null) {
+        values[i] = _pinotDataType.toDouble(value);
+      }
+    }
+
+    return values;
+  }
+
+  @Override
+  public BigDecimal[] getBigDecimalValuesSV() {
+    if (_values == null) {
+      return null;
+    }
+    int length = _values.size();
+    BigDecimal[] values = new BigDecimal[length];
+    for (int i = 0; i < length; i++) {
+      Object value = _values.get(i);
+      if (value != null) {
+        values[i] = _pinotDataType.toBigDecimal(value);
+      }
+    }
+    return values;
+  }
+
+  @Override
+  public String[] getStringValuesSV() {
+    if (_values == null) {
+      return null;
+    }
+    int length = _values.size();
+    String[] values = new String[length];
+    for (int i = 0; i < length; i++) {
+      Object value = _values.get(i);
+      if (value != null) {
+        values[i] = _pinotDataType.toString(value);
+      }
+    }
+    return values;
+  }
+
+  @Override
+  public byte[][] getBytesValuesSV() {
+    if (_values == null) {
+      return null;
+    }
+    int length = _values.size();
+    byte[][] values = new byte[length][];
+    for (int i = 0; i < length; i++) {
+      Object value = _values.get(i);
+      if (value != null) {
+        values[i] = _pinotDataType.toBytes(value);
+      }
+    }
+    return values;
+  }
+
+  @Override
+  public int[][] getDictionaryIdsMV() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int[][] getIntValuesMV() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long[][] getLongValuesMV() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public float[][] getFloatValuesMV() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public double[][] getDoubleValuesMV() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public String[][] getStringValuesMV() {
+    throw new UnsupportedOperationException();

Review Comment:
   Thanks for putting in this work. I was waiting for this, as I think it will help me support the [json scalar functions](https://github.com/apache/pinot/pull/10790) in multistage.
   
   Just a question: why are we not supporting MV types? With MV type support, we could handle scalar functions like `string split` and many others.
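The IntermediateStageBlockValSet class in this diff converts a row-based column back into single-value arrays. Each getter leaves null entries at the primitive default, and getNullBitmap() lazily records where the nulls are. The following dependency-free sketch shows the same pattern. It assumes java.util.BitSet as a stand-in for RoaringBitmap and a plain Number conversion as a stand-in for PinotDataType. RowColumnSketch is a hypothetical name.

```java
import java.util.Arrays;
import java.util.BitSet;
import java.util.List;

/** Sketch: expose a row-based column (List<Object>) as primitive arrays plus a null bitmap. */
final class RowColumnSketch {
  private final List<Object> _values;
  private BitSet _nullBitmap; // computed lazily, like getNullBitmap() in the diff

  RowColumnSketch(List<Object> values) {
    _values = values;
  }

  BitSet getNullBitmap() {
    if (_nullBitmap == null) {
      _nullBitmap = new BitSet(_values.size());
      for (int i = 0; i < _values.size(); i++) {
        if (_values.get(i) == null) {
          _nullBitmap.set(i);
        }
      }
    }
    return _nullBitmap;
  }

  // Nulls stay 0 in the primitive array; callers use the null bitmap to tell them apart from real zeros.
  long[] getLongValuesSV() {
    long[] result = new long[_values.size()];
    for (int i = 0; i < result.length; i++) {
      Object value = _values.get(i);
      if (value != null) {
        result[i] = ((Number) value).longValue();
      }
    }
    return result;
  }

  public static void main(String[] args) {
    RowColumnSketch column = new RowColumnSketch(Arrays.<Object>asList(5, null, 7L));
    System.out.println(Arrays.toString(column.getLongValuesSV())); // [5, 0, 7]
    System.out.println(column.getNullBitmap()); // {1}
  }
}
```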





[GitHub] [pinot] vvivekiyer commented on a diff in pull request #10845: [Multistage] Runtime changes for leveraging V1 Aggregation Functions

Posted by "vvivekiyer (via GitHub)" <gi...@apache.org>.
vvivekiyer commented on code in PR #10845:
URL: https://github.com/apache/pinot/pull/10845#discussion_r1242529302


##########
pinot-common/src/main/java/org/apache/pinot/common/request/context/ExpressionContext.java:
##########
@@ -74,8 +83,27 @@ public LiteralContext getLiteral(){
     return _literal;
   }
 
-  public String getIdentifier() {
-    return _identifier;
+  // Please check that the _type is Identifier before calling these functions.
+  public String getIdentifierName() {
+    if (_identifier == null) {
+      return null;
+    }
+
+    return _identifier.getName();
+  }
+
+  public int getIdentifierIndex() {

Review Comment:
   Can we keep this change contained in v2 (in NewAggregateOperator) so that we don't need this interface change? 
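One way to keep the change contained in v2, as suggested here, is for the operator itself to resolve each aggregation argument's ordinal once and store it in its own field. ExpressionContext's identifier accessor would then stay unchanged. Below is a minimal sketch under that assumption. OrdinalArgumentSketch and its _argumentOrdinals field are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Sketch: the operator, not ExpressionContext, keeps each argument's ordinal in the input row. */
final class OrdinalArgumentSketch {
  // Hypothetical: argument ordinals resolved once when the operator is constructed.
  private final int[] _argumentOrdinals;

  OrdinalArgumentSketch(int[] argumentOrdinals) {
    _argumentOrdinals = argumentOrdinals.clone();
  }

  // Projects the input column of the aggIndex-th aggregation out of a row-based block.
  List<Object> columnForAggregation(int aggIndex, List<Object[]> rows) {
    int ordinal = _argumentOrdinals[aggIndex];
    List<Object> column = new ArrayList<>(rows.size());
    for (Object[] row : rows) {
      column.add(row[ordinal]);
    }
    return column;
  }

  public static void main(String[] args) {
    List<Object[]> rows = Arrays.asList(new Object[]{"a", 1L}, new Object[]{"b", 2L});
    OrdinalArgumentSketch sketch = new OrdinalArgumentSketch(new int[]{1});
    System.out.println(sketch.columnForAggregation(0, rows)); // [1, 2]
  }
}
```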





[GitHub] [pinot] vvivekiyer commented on a diff in pull request #10845: [Multistage] Runtime changes for leveraging V1 Aggregation Functions

Posted by "vvivekiyer (via GitHub)" <gi...@apache.org>.
vvivekiyer commented on code in PR #10845:
URL: https://github.com/apache/pinot/pull/10845#discussion_r1219905051


##########
pinot-core/src/main/java/org/apache/pinot/core/common/IntermediateStageBlockValSet.java:
##########
@@ -0,0 +1,250 @@

Review Comment:
   We currently don't support MV types in the multistage engine: https://github.com/apache/pinot/issues/10658. Once we support MV columns in multistage and add the corresponding aggregation functions, we'll have to support MV types here. 
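If MV support is added later, the MV getters in this class would have to return jagged arrays instead of throwing. The sketch below shows a hypothetical getLongValuesMV-style conversion. It assumes that each MV cell arrives as an Object[] of numbers and that a null cell maps to an empty array. Both are assumptions, not decisions made in this PR.

```java
import java.util.Arrays;
import java.util.List;

/** Sketch: convert row-based multi-value cells into the long[][] shape used by MV getters. */
final class MultiValueSketch {
  // Assumption: each non-null cell is an Object[] of Numbers; null cells become empty arrays.
  static long[][] toLongValuesMV(List<Object> cells) {
    long[][] result = new long[cells.size()][];
    for (int i = 0; i < result.length; i++) {
      Object[] cell = (Object[]) cells.get(i);
      if (cell == null) {
        result[i] = new long[0];
        continue;
      }
      result[i] = new long[cell.length];
      for (int j = 0; j < cell.length; j++) {
        result[i][j] = ((Number) cell[j]).longValue();
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<Object> cells = Arrays.<Object>asList(new Object[]{1, 2}, null, new Object[]{3L});
    System.out.println(Arrays.deepToString(toLongValuesMV(cells))); // [[1, 2], [], [3]]
  }
}
```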
   





[GitHub] [pinot] vvivekiyer commented on a diff in pull request #10845: [Multistage] Runtime changes for leveraging V1 Aggregation Functions

Posted by "vvivekiyer (via GitHub)" <gi...@apache.org>.
vvivekiyer commented on code in PR #10845:
URL: https://github.com/apache/pinot/pull/10845#discussion_r1242566160


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/NewAggregateOperator.java:
##########
@@ -0,0 +1,378 @@
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FunctionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.IntermediateStageBlockValSet;
+import org.apache.pinot.core.data.table.Key;
+import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.function.AggFunctionQueryContext;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunctionFactory;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+
+
+/**
+ *
+ * AggregateOperator is used to aggregate values over a set of group by keys.
+ * Output data will be in the format of [group by key, aggregate result1, ... aggregate resultN]
+ * Currently, we only support the following aggregation functions:
+ * 1. SUM
+ * 2. COUNT
+ * 3. MIN
+ * 4. MAX
+ * 5. DistinctCount and Count(Distinct)
+ * 6. AVG
+ * 7. FourthMoment
+ * 8. BoolAnd and BoolOr
+ *
+ * When the list of aggregation calls is empty, this class is used to calculate distinct results based on the group-by keys.
+ * In this case, the input can be any type.
+ *
+ * If the list of aggregation calls is not empty, the input of aggregation has to be a number.
+ *
+ * Note: This class performs aggregation over the double value of input.
+ * If the input is single value, the output type will be input type. Otherwise, the output type will be double.
+ */
+// TODO: Rename to AggregateOperator when merging Planner support.
+public class NewAggregateOperator extends MultiStageOperator {
+  private static final String EXPLAIN_NAME = "AGGREGATE_OPERATOR";
+
+  private final MultiStageOperator _inputOperator;
+  private final DataSchema _resultSchema;
+
+  // Aggregation containers
+  private final AggregationFunction[] _aggregationFunctions;
+  private final AggregationResultHolder[] _aggregationResultHolders;
+
+  // Group By containers
+  private final List<ExpressionContext> _groupSet;
+  private final GroupByResultHolder[] _groupByResultHolders;
+  // Mapping from the group by row-key to the values in the row.
+  private final Map<Key, Object[]> _groupByKeyHolder;
+  // groupId and groupIdMap are used to create a 0-based index for group-by keys instead of using the hash value
+  // directly - similar to GroupByKeyGenerator. This is useful when we invoke the aggregation functions because they
+  // use the group by key indexes to store results.
+  private int _groupId = 0;
+  private Map<Integer, Integer> _groupIdMap;
+
+  private TransferableBlock _upstreamErrorBlock;
+  private boolean _readyToConstruct;
+  private boolean _hasReturnedAggregateBlock;
+
+  // Denotes whether this aggregation operator should merge intermediate results.
+  private boolean _isMergeAggregation;
+
+  // TODO: refactor Pinot Reducer code to support the intermediate stage agg operator.
+  // aggCalls has to be a list of FunctionCall and cannot be null
+  // groupSet has to be a list of InputRef and cannot be null
+  // TODO: Add these two checks when we confirm we can handle error in upstream ctor call.
+
+  @VisibleForTesting
+  public NewAggregateOperator(OpChainExecutionContext context, MultiStageOperator inputOperator,
+      DataSchema resultSchema, List<FunctionContext> functionContexts, List<ExpressionContext> groupSet,
+      boolean isMergeAggregation, boolean isSingleStageAggregation) {
+    super(context);
+    _inputOperator = inputOperator;
+    _resultSchema = resultSchema;
+
+    _groupSet = groupSet;
+    _groupByKeyHolder = new HashMap<>();
+    _groupIdMap = new HashMap<>();
+    _aggregationFunctions = new AggregationFunction[functionContexts.size()];
+    _aggregationResultHolders = new AggregationResultHolder[functionContexts.size()];
+    _groupByResultHolders = new GroupByResultHolder[functionContexts.size()];
+    for (int i = 0; i < _aggregationFunctions.length; i++) {
+      _aggregationFunctions[i] = AggregationFunctionFactory.getAggregationFunction(functionContexts.get(i),
+          new AggFunctionQueryContext(true));
+      _aggregationResultHolders[i] = _aggregationFunctions[i].createAggregationResultHolder();
+      _groupByResultHolders[i] = _aggregationFunctions[i].createGroupByResultHolder(
+          InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY,
+          InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT);
+    }
+
+    _upstreamErrorBlock = null;
+    _readyToConstruct = false;
+    _hasReturnedAggregateBlock = false;
+    _isMergeAggregation = isMergeAggregation && !isSingleStageAggregation;
+  }
+
+  @Override
+  public List<MultiStageOperator> getChildOperators() {
+    return ImmutableList.of(_inputOperator);
+  }
+
+  @Nullable
+  @Override
+  public String toExplainString() {
+    return EXPLAIN_NAME;
+  }
+
+  @Override
+  protected TransferableBlock getNextBlock() {
+    try {
+      if (!_readyToConstruct && !consumeInputBlocks()) {
+        return TransferableBlockUtils.getNoOpTransferableBlock();
+      }
+
+      if (_upstreamErrorBlock != null) {
+        return _upstreamErrorBlock;
+      }
+
+      if (!_hasReturnedAggregateBlock) {
+        return produceAggregatedBlock();
+      } else {
+        // TODO: Move to close call.
+        return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+      }
+    } catch (Exception e) {
+      return TransferableBlockUtils.getErrorTransferableBlock(e);
+    }
+  }
+
+  private TransferableBlock produceAggregatedBlock() {
+    List<Object[]> rows = _groupSet.isEmpty() ? collectAggregationResultRows() : collectGroupByResultRows();
+
+    _hasReturnedAggregateBlock = true;
+    if (rows.size() == 0) {
+      if (_groupSet.size() == 0) {
+        return constructEmptyAggResultBlock();
+      } else {
+        return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+      }
+    } else {
+      return new TransferableBlock(rows, _resultSchema, DataBlock.Type.ROW);
+    }
+  }
+
+  private List<Object[]> collectAggregationResultRows() {
+    List<Object[]> rows = new ArrayList<>();
+
+    Object[] row = new Object[_aggregationFunctions.length];
+    for (int i = 0; i < _aggregationFunctions.length; i++) {
+      AggregationFunction aggregationFunction = _aggregationFunctions[i];
+      row[i] = aggregationFunction.extractAggregationResult(_aggregationResultHolders[i]);
+    }
+    rows.add(row);
+    return rows;
+  }
+
+  private List<Object[]> collectGroupByResultRows() {
+    List<Object[]> rows = new ArrayList<>();
+    for (Map.Entry<Key, Object[]> e : _groupByKeyHolder.entrySet()) {
+      Object[] row = new Object[_aggregationFunctions.length + _groupSet.size()];
+      Object[] keyElements = e.getValue();
+      System.arraycopy(keyElements, 0, row, 0, keyElements.length);
+      for (int i = 0; i < _aggregationFunctions.length; i++) {
+        row[i + _groupSet.size()] = _aggregationFunctions[i].extractGroupByResult(_groupByResultHolders[i],
+            _groupIdMap.get(e.getKey().hashCode()));

Review Comment:
   Don't use hashCode() here: two different group keys can have the same hash code, and they would then map to the same group ID. 
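The collision is easy to reproduce. "Aa" and "BB" have the same String.hashCode(), so a map keyed on hash codes would assign both keys a single group ID. The sketch below builds collision-free, 0-based group IDs. It assumes the group key has value-based equals(), and it uses java.util.List as a stand-in key type. GroupIdSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch: assign dense 0-based group IDs keyed on the full group-by key, not its hashCode(). */
final class GroupIdSketch {
  // HashMap uses hashCode() only to pick a bucket and then equals() to confirm a match,
  // so two distinct keys with the same hash code still get different IDs.
  private final Map<List<Object>, Integer> _groupIds = new HashMap<>();
  private final List<List<Object>> _keysById = new ArrayList<>();

  int getOrCreateGroupId(Object[] keyValues) {
    List<Object> key = Arrays.asList(keyValues.clone());
    Integer id = _groupIds.get(key);
    if (id == null) {
      id = _keysById.size();
      _groupIds.put(key, id);
      _keysById.add(key);
    }
    return id;
  }

  List<Object> getKey(int groupId) {
    return _keysById.get(groupId);
  }

  public static void main(String[] args) {
    GroupIdSketch groups = new GroupIdSketch();
    // "Aa" and "BB" share String.hashCode() == 2112, yet they are different keys.
    int a = groups.getOrCreateGroupId(new Object[]{"Aa"});
    int b = groups.getOrCreateGroupId(new Object[]{"BB"});
    System.out.println(a + " " + b); // 0 1
  }
}
```

Keying on the key itself still uses hashCode() to choose a bucket. Because every match is confirmed with equals(), colliding keys stay in separate groups, and _keysById gives the reverse lookup needed when emitting result rows.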





[GitHub] [pinot] walterddr merged pull request #10845: [Multistage] Runtime changes for leveraging V1 Aggregation Functions

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr merged PR #10845:
URL: https://github.com/apache/pinot/pull/10845




[GitHub] [pinot] xiangfu0 commented on a diff in pull request #10845: [Multistage] Runtime changes for leveraging V1 Aggregation Functions

Posted by "xiangfu0 (via GitHub)" <gi...@apache.org>.
xiangfu0 commented on code in PR #10845:
URL: https://github.com/apache/pinot/pull/10845#discussion_r1242589834


##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MaxAggregationFunction.java:
##########
@@ -295,6 +295,22 @@ public Double merge(Double intermediateMaxResult1, Double intermediateMaxResult2
     return intermediateMaxResult2;
   }
 
+  @Override
+  public void mergeAndUpdateResultHolder(Double intermediateResult,

Review Comment:
   Shall we make these default implementations in the base class, throwing UnsupportedOperationException with a TODO for the implementations that won't honor these two methods?
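This suggestion maps naturally onto Java default methods. The base type supplies a default that throws UnsupportedOperationException, and only the functions that support merging override it. The sketch below uses a simplified, hypothetical interface. MergeableAggregation, mergeAndUpdate, and the single-slot Object[] holder are illustrations, not the signatures in this diff.

```java
/** Hypothetical, simplified aggregation interface; not Pinot's AggregationFunction signature. */
interface MergeableAggregation<I> {
  I merge(I left, I right);

  // Default for implementations that do not support merge-in-place yet (TODO: override when supported).
  default void mergeAndUpdate(I intermediateResult, Object[] holder) {
    throw new UnsupportedOperationException(getClass().getSimpleName() + " does not support mergeAndUpdate");
  }
}

/** Opts in by overriding the default; holder[0] stands in for a result holder. */
final class MaxMergeSketch implements MergeableAggregation<Double> {
  @Override
  public Double merge(Double left, Double right) {
    return Math.max(left, right);
  }

  @Override
  public void mergeAndUpdate(Double intermediateResult, Object[] holder) {
    holder[0] = holder[0] == null ? intermediateResult : merge((Double) holder[0], intermediateResult);
  }
}

/** Inherits the throwing default. */
final class UnsupportedMergeSketch implements MergeableAggregation<Long> {
  @Override
  public Long merge(Long left, Long right) {
    return left + right;
  }
}

final class DefaultMethodSketch {
  public static void main(String[] args) {
    Object[] holder = new Object[1];
    MaxMergeSketch max = new MaxMergeSketch();
    max.mergeAndUpdate(3.0, holder);
    max.mergeAndUpdate(7.0, holder);
    System.out.println(holder[0]); // 7.0
    try {
      new UnsupportedMergeSketch().mergeAndUpdate(1L, new Object[1]);
    } catch (UnsupportedOperationException e) {
      System.out.println(e.getMessage()); // UnsupportedMergeSketch does not support mergeAndUpdate
    }
  }
}
```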



##########
pinot-common/src/main/java/org/apache/pinot/common/request/context/IdentifierContext.java:
##########
@@ -0,0 +1,75 @@
+package org.apache.pinot.common.request.context;
+
+import java.util.Objects;
+import org.apache.pinot.common.utils.DataSchema;
+
+/**
+ * The {@code IdentifierContext} class represents an identifier in the query.
+ * <p> This class includes information about an identifier. The v1 engine uses column names for identifiers. The
+ * multistage query engine uses ordinals to distinctly track each identifier. So this context is set up to support both
+ * v1 and multistage engine identifiers.
+ */
+public class IdentifierContext {
+  private DataSchema.ColumnDataType _dataType;
+  String _name;
+  // Identifier Index is needed because multistage engine tracks identifiers with the index(ordinal) position.
+  int _identifierIndex;
+
+  public IdentifierContext(String name, DataSchema.ColumnDataType dataType, int identifierIndex) {
+    _name = name;
+    _dataType = dataType;
+    _identifierIndex = identifierIndex;
+  }
+
+  public String getName() {
+    return _name;
+  }
+
+  public DataSchema.ColumnDataType getDataType() {
+    return _dataType;
+  }
+
+  public int getIndex() {
+    return _identifierIndex;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof IdentifierContext)) {
+      return false;
+    }
+    IdentifierContext that = (IdentifierContext) o;
+    return Objects.equals(_name, that._name) && Objects.equals(_identifierIndex, that._identifierIndex);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(_name, _identifierIndex);
+  }
+
+  @Override
+  public String toString() {
+    return _name;

Review Comment:
   Where is this used? Shall we include both _name and _identifierIndex, or even _dataType, in the string here?
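One possible `toString()` that assembles all three fields, sketched on a hypothetical, trimmed-down `IdentifierContextSketch`: the data type is a plain `String` here instead of `DataSchema.ColumnDataType`, and the output format is made up. Whether changing the format is safe depends on where `toString()` is used; if any caller relies on it returning just the column name (for example as a map key), this would break. The sketch also compares `_identifierIndex` with `==`, avoiding the boxing that `Objects.equals` on an `int` performs.

```java
import java.util.Objects;

// Simplified, hypothetical version of IdentifierContext with final fields.
final class IdentifierContextSketch {
  private final String _name;
  private final String _dataType;
  private final int _identifierIndex;

  IdentifierContextSketch(String name, String dataType, int identifierIndex) {
    _name = name;
    _dataType = dataType;
    _identifierIndex = identifierIndex;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof IdentifierContextSketch)) {
      return false;
    }
    IdentifierContextSketch that = (IdentifierContextSketch) o;
    // Like the original, equality ignores the data type; the int index is compared without boxing.
    return _identifierIndex == that._identifierIndex && Objects.equals(_name, that._name);
  }

  @Override
  public int hashCode() {
    return Objects.hash(_name, _identifierIndex);
  }

  // Includes every field so identifiers with the same name but different ordinals are distinguishable.
  @Override
  public String toString() {
    return _name + "[index=" + _identifierIndex + ", type=" + _dataType + "]";
  }
}
```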



##########
pinot-query-planner/src/main/java/org/apache/calcite/rel/hint/PinotHintStrategyTable.java:
##########
@@ -29,6 +29,9 @@
 public class PinotHintStrategyTable {
   public static final String INTERNAL_AGG_INTERMEDIATE_STAGE = "aggIntermediateStage";
   public static final String INTERNAL_AGG_FINAL_STAGE = "aggFinalStage";
+  // Hint to denote that aggregation is to be run as a single stage rather than split into an intermediate and a final
+  // stage
+  public static final String INTERNAL_IS_SINGLE_STAGE_AGG = "isSingleStageAgg";

Review Comment:
   Does this hint need to be set explicitly by external users, or is it always set internally?
   If the latter, I think we can add a parameter to the Agg PlanNode instead of adding a hint.
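If the hint is only ever set internally, the alternative could look roughly like this: a hypothetical, simplified plan node that carries the aggregation stage as an explicit constructor parameter instead of a `RelHint` string. The `AggStage` enum and all names are illustrative; they loosely mirror the existing `aggIntermediateStage`, `aggFinalStage` and `isSingleStageAgg` hints.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified aggregate plan node; not Pinot's AggregateNode.
final class AggregateNodeSketch {
  // Hypothetical enum describing how the aggregation is split across stages.
  enum AggStage { SINGLE, INTERMEDIATE, FINAL }

  private final List<String> _groupKeys;
  private final AggStage _aggStage;

  AggregateNodeSketch(List<String> groupKeys, AggStage aggStage) {
    _groupKeys = Collections.unmodifiableList(new ArrayList<>(groupKeys));
    _aggStage = aggStage;
  }

  List<String> getGroupKeys() {
    return _groupKeys;
  }

  AggStage getAggStage() {
    return _aggStage;
  }

  // Replaces a lookup of the "isSingleStageAgg" hint string in the node's hint list.
  boolean isSingleStageAgg() {
    return _aggStage == AggStage.SINGLE;
  }
}
```

The runtime can then branch on `getAggStage()` without scanning hint lists, and the compiler catches typos that a hint string would not.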



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/AggregateNode.java:
##########
@@ -44,9 +44,11 @@ public AggregateNode(int planFragmentId, DataSchema dataSchema, List<AggregateCa
       List<RexExpression> groupSet,
       List<RelHint> relHints) {
     super(planFragmentId, dataSchema);
-    _aggCalls = aggCalls.stream().map(RexExpression::toRexExpression).collect(Collectors.toList());
+
+
     _groupSet = groupSet;
     _relHints = relHints;
+    _aggCalls = aggCalls.stream().map(RexExpression::toRexExpression).collect(Collectors.toList());

Review Comment:
   Why was the ordering changed?



##########
pinot-core/src/main/java/org/apache/pinot/core/common/IntermediateStageBlockValSet.java:
##########
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.common;
+
+import java.math.BigDecimal;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.PinotDataType;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.RoaringBitmap;
+
+/**
+ * In the multistage engine, the leaf stage servers process the data in columnar fashion. By the time the
+ * intermediate stage receives the projected column, they are converted to a row based format. This class provides
+ * the capability to convert the row based representation into blocks so that they can be used to process
+ * aggregations.
+ * TODO: Support MV
+ */
+public class IntermediateStageBlockValSet implements BlockValSet {
+  private final FieldSpec.DataType _dataType;
+  private final PinotDataType _pinotDataType;
+  private final List<Object> _values;
+  private final RoaringBitmap _nullBitMap;
+  private boolean _nullBitMapSet;

Review Comment:
   Why have this boolean instead of setting `_nullBitMap` to null and constructing it in the `getNullBitmap` method?
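A minimal sketch of the lazy alternative, using `java.util.BitSet` as a stand-in for `RoaringBitmap` and a hypothetical class name. There is one subtlety the boolean may have been guarding against: if the getter cached `null` to mean "no nulls", it could not tell "not computed yet" apart from "computed, empty" and would rescan every time. Caching a possibly empty bitmap, and returning `null` only to callers, avoids both the flag and the rescan.

```java
import java.util.BitSet;
import java.util.List;

// Simplified, hypothetical value set over row-based values.
final class LazyNullBitmapValSet {
  private final List<Object> _values;
  // Stays null until getNullBitmap() is first called. No separate "computed" flag is needed
  // because the cached bitmap is non-null (possibly empty) after the first call.
  private BitSet _nullBitmap;

  LazyNullBitmapValSet(List<Object> values) {
    _values = values;
  }

  // Returns the positions of null values, or null when the column has no nulls.
  BitSet getNullBitmap() {
    if (_nullBitmap == null) {
      BitSet bitmap = new BitSet(_values.size());
      for (int i = 0; i < _values.size(); i++) {
        if (_values.get(i) == null) {
          bitmap.set(i);
        }
      }
      _nullBitmap = bitmap;
    }
    return _nullBitmap.isEmpty() ? null : _nullBitmap;
  }
}
```

Like a flag-based version, this is not thread-safe; it assumes a single operator thread reads the value set.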



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggFunctionQueryContext.java:
##########
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import org.apache.pinot.common.request.context.OrderByExpressionContext;
+import org.apache.pinot.core.query.request.context.QueryContext;
+
+
+/**
+ * The <code>AggFunctionQueryContext</code> class contains extracted details from QueryContext that can be used for
+ * Aggregation Functions.
+ */
+public class AggFunctionQueryContext {
+  private boolean _isNullHandlingEnabled;

Review Comment:
   Should we make these member variables final?



##########
pinot-core/src/main/java/org/apache/pinot/core/common/IntermediateStageBlockValSet.java:
##########
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.common;
+
+import java.math.BigDecimal;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.PinotDataType;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.RoaringBitmap;
+
+/**
+ * In the multistage engine, the leaf stage servers process the data in columnar fashion. By the time the
+ * intermediate stage receives the projected column, they are converted to a row based format. This class provides
+ * the capability to convert the row based representation into blocks so that they can be used to process
+ * aggregations.
+ * TODO: Support MV
+ */
+public class IntermediateStageBlockValSet implements BlockValSet {
+  private final FieldSpec.DataType _dataType;
+  private final PinotDataType _pinotDataType;
+  private final List<Object> _values;
+  private final RoaringBitmap _nullBitMap;
+  private boolean _nullBitMapSet;
+
+  public IntermediateStageBlockValSet(DataSchema.ColumnDataType columnDataType, List<Object> values) {

Review Comment:
   +1 to what @walterddr mentioned.
   I think we can have two constructors:
   1. `IntermediateStageBlockValSet(DataSchema.ColumnDataType columnDataType, List<Object> values)`, which is the current setup: a list of boxed Java objects, which handles all scenarios for both simple and complex data types.
   2. Another constructor could be `IntermediateStageBlockValSet(DataSchema.ColumnDataType columnDataType, Object values)`, where `values` is an instance of `int[]` when the ColumnDataType is INT.
   
   I agree that this could be a further optimization; we can add a TODO here.
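A minimal, INT-only sketch of the two-constructor idea with a hypothetical class name; real code would switch on `DataSchema.ColumnDataType` for every type. The boxed `List<Object>` path converts once on demand, while the `int[]` path hands the primitive array back without boxing or copying. Mapping nulls to 0 is an assumption (the null bitmap would mark them separately).

```java
import java.util.List;

// Hypothetical, INT-only value set illustrating two construction paths.
final class TwoConstructorValSet {
  private final List<Object> _boxedValues; // set by the generic constructor
  private final int[] _intValues;          // set by the primitive fast path

  // 1. Generic path: boxed Java objects, works for simple and complex types alike.
  TwoConstructorValSet(List<Object> values) {
    _boxedValues = values;
    _intValues = null;
  }

  // 2. Fast path: values is expected to be an int[] when the column type is INT.
  TwoConstructorValSet(Object values) {
    if (!(values instanceof int[])) {
      throw new IllegalArgumentException("Expected int[] for an INT column");
    }
    _boxedValues = null;
    _intValues = (int[]) values;
  }

  int[] getIntValuesSV() {
    if (_intValues != null) {
      return _intValues; // no boxing or copying on the fast path
    }
    int[] result = new int[_boxedValues.size()];
    for (int i = 0; i < result.length; i++) {
      Object value = _boxedValues.get(i);
      // Nulls map to 0 here (assumption); a null bitmap would track them separately.
      result[i] = value == null ? 0 : ((Number) value).intValue();
    }
    return result;
  }
}
```

One design caveat: overloading on `List<Object>` and `Object` is easy to misuse, because a `List<Integer>` argument resolves to the `Object` overload and then fails the `int[]` check at runtime. Static factories such as `ofBoxed(...)` and `ofInts(int[])` (hypothetical names) would make the intent explicit.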





[GitHub] [pinot] walterddr commented on a diff in pull request #10845: [Multistage] Runtime changes for leveraging V1 Aggregation Functions

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #10845:
URL: https://github.com/apache/pinot/pull/10845#discussion_r1248016907


##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java:
##########
@@ -268,4 +269,381 @@ private static Object[] extractRowFromDataBlock(DataBlock dataBlock, int rowId,
     }
     return row;
   }
+
+  /**
+   * Given a datablock and the column index, extracts the integer values for the column. Prefer using this function over
+   * extractRowFromDataBlock if the desired datatype is known to prevent autoboxing to Object and later unboxing to the
+   * desired type.
+   *
+   * @return int array of values in the column
+   */
+  public static int[] extractIntRowsForColumn(DataBlock dataBlock, int columnIndex) {

Review Comment:
   1. The name is confusing; I suggest:
   ```suggestion
     public static int[] extractIntValuesForColumn(DataBlock dataBlock, int columnIndex) {
   ```
   2. (stretch/follow-up) dataBlock can be either ROW-based or COLUMNAR-based, so checking the dataBlock type first will be needed later.
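A sketch of the suggested name plus the follow-up block-type check, on a hypothetical, simplified `SketchBlock` that is either row-based or columnar. Pinot's real `DataBlock` API is different, and only INT columns are modeled. The row path leaves nulls as 0, matching the `continue` in the quoted hunk.

```java
import java.util.List;

// Hypothetical, simplified data block; Pinot's real DataBlock API differs.
final class SketchBlock {
  enum Type { ROW, COLUMNAR }

  final Type type;
  final List<Object[]> rows;  // used when type == ROW
  final int[][] intColumns;   // used when type == COLUMNAR (INT columns only)

  private SketchBlock(Type type, List<Object[]> rows, int[][] intColumns) {
    this.type = type;
    this.rows = rows;
    this.intColumns = intColumns;
  }

  static SketchBlock ofRows(List<Object[]> rows) {
    return new SketchBlock(Type.ROW, rows, null);
  }

  static SketchBlock ofIntColumns(int[][] intColumns) {
    return new SketchBlock(Type.COLUMNAR, null, intColumns);
  }
}

final class SketchBlockUtils {
  private SketchBlockUtils() {
  }

  // Returns one column's values (not rows), hence the suggested "Values" naming.
  static int[] extractIntValuesForColumn(SketchBlock block, int columnIndex) {
    switch (block.type) {
      case COLUMNAR:
        // The column is already contiguous; copy it so callers cannot mutate the block.
        return block.intColumns[columnIndex].clone();
      case ROW:
        int[] values = new int[block.rows.size()];
        for (int rowId = 0; rowId < values.length; rowId++) {
          Object value = block.rows.get(rowId)[columnIndex];
          // Nulls become 0, mirroring the default left by the original loop.
          values[rowId] = value == null ? 0 : ((Number) value).intValue();
        }
        return values;
      default:
        throw new IllegalStateException("Unsupported block type: " + block.type);
    }
  }
}
```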



##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java:
##########
@@ -268,4 +269,381 @@ private static Object[] extractRowFromDataBlock(DataBlock dataBlock, int rowId,
     }
     return row;
   }
+
+  /**
+   * Given a datablock and the column index, extracts the integer values for the column. Prefer using this function over
+   * extractRowFromDataBlock if the desired datatype is known to prevent autoboxing to Object and later unboxing to the
+   * desired type.
+   *
+   * @return int array of values in the column
+   */
+  public static int[] extractIntRowsForColumn(DataBlock dataBlock, int columnIndex) {
+    DataSchema dataSchema = dataBlock.getDataSchema();
+    DataSchema.ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
+
+    // Get null bitmap for the column.
+    RoaringBitmap nullBitmap = extractNullBitmaps(dataBlock)[columnIndex];
+    int numRows = dataBlock.getNumberOfRows();
+
+    int[] rows = new int[numRows];
+    switch (columnDataTypes[columnIndex]) {
+      case INT:
+      case BOOLEAN:
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          if (nullBitmap != null && nullBitmap.contains(rowId)) {
+            continue;
+          }

Review Comment:
   nit: consider switching from a per-row null bitmap check to a single global switch-case.
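One reading of this nit, sketched with hypothetical names: branch once on whether the bitmap has any nulls, so the common no-null case runs a tight loop without a per-row lookup. `java.util.BitSet` stands in for `RoaringBitmap`, and `readIntAt` stands in for reading an INT value from the data block.

```java
import java.util.BitSet;
import java.util.function.IntUnaryOperator;

final class NullHoistingSketch {
  private NullHoistingSketch() {
  }

  // readIntAt is a hypothetical accessor standing in for reading row rowId of the column.
  static int[] extractIntValues(int numRows, BitSet nullBitmap, IntUnaryOperator readIntAt) {
    int[] values = new int[numRows];
    if (nullBitmap == null || nullBitmap.isEmpty()) {
      // No nulls: tight loop without any per-row bitmap lookup.
      for (int rowId = 0; rowId < numRows; rowId++) {
        values[rowId] = readIntAt.applyAsInt(rowId);
      }
    } else {
      for (int rowId = 0; rowId < numRows; rowId++) {
        // Null rows keep the default 0, like the `continue` in the original loop.
        if (!nullBitmap.get(rowId)) {
          values[rowId] = readIntAt.applyAsInt(rowId);
        }
      }
    }
    return values;
  }
}
```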



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java:
##########
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.IntermediateStageBlockValSet;
+import org.apache.pinot.core.data.table.Key;
+import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+/**
+ * Class that executes the group by aggregations for the multistage AggregateOperator.
+ */
+public class MultistageGroupByExecutor {
+  private final NewAggregateOperator.Mode _mode;
+  // The identifier operands for the aggregation function only store the column name. This map contains the mapping
+  // from column name to its index, which is used in the v2 engine.
+  private final Map<String, Integer> _colNameToIndexMap;
+
+  private final List<ExpressionContext> _groupSet;
+  private final AggregationFunction[] _aggFunctions;
+
+  // Group By Result holders for each mode
+  private final GroupByResultHolder[] _aggregateResultHolders;
+  private final Map<Integer, Object[]> _mergeResultHolder;
+  private final List<Object[]> _finalResultHolder;
+
+  // Mapping from the row-key to a zero based integer index. This is used when we invoke the v1 aggregation functions
+  // because they use the zero based integer indexes to store results.
+  private int _groupId = 0;
+  private Map<Key, Integer> _groupKeyToIdMap;
+
+  // Mapping from the group by row-key to the values in the row which form the key. Used to fetch the actual row
+  // values when populating the result.
+  private final Map<Key, Object[]> _groupByKeyHolder;
+
+  public MultistageGroupByExecutor(List<ExpressionContext> groupByExpr, AggregationFunction[] aggFunctions,
+      NewAggregateOperator.Mode mode, Map<String, Integer> colNameToIndexMap) {
+    _mode = mode;
+    _colNameToIndexMap = colNameToIndexMap;
+    _groupSet = groupByExpr;
+    _aggFunctions = aggFunctions;
+
+    _aggregateResultHolders = new GroupByResultHolder[_aggFunctions.length];
+    _mergeResultHolder = new HashMap<>();
+    _finalResultHolder = new ArrayList<>();
+
+    _groupKeyToIdMap = new HashMap<>();
+    _groupByKeyHolder = new HashMap<>();
+
+    for (int i = 0; i < _aggFunctions.length; i++) {
+      _aggregateResultHolders[i] =
+          _aggFunctions[i].createGroupByResultHolder(InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY,
+              InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT);
+    }
+  }
+
+  /**
+   * Performs group-by aggregation for the data in the block.
+   */
+  public void processBlock(TransferableBlock block, DataSchema inputDataSchema) {
+    if (_mode.equals(NewAggregateOperator.Mode.AGGREGATE)) {
+      processAggregate(block, inputDataSchema);
+    } else if (_mode.equals(NewAggregateOperator.Mode.MERGE)) {
+      processMerge(block);
+    } else if (_mode.equals(NewAggregateOperator.Mode.EXTRACT_RESULT)) {
+      collectResult(block);
+    }
+  }
+
+  /**
+   * Fetches the result.
+   */
+  public List<Object[]> getResult() {
+    List<Object[]> rows = new ArrayList<>();
+
+    if (_mode.equals(NewAggregateOperator.Mode.EXTRACT_RESULT)) {
+      return extractFinalGroupByResult();
+    }
+
+    // If the mode is MERGE or AGGREGATE, the groupby keys are already collected in _groupByKeyHolder by virtue of
+    // generating the row keys.
+    for (Map.Entry<Key, Object[]> e : _groupByKeyHolder.entrySet()) {
+      int numCols = _groupSet.size() + _aggFunctions.length;
+      Object[] row = new Object[numCols];
+      Object[] keyElements = e.getValue();
+      System.arraycopy(keyElements, 0, row, 0, keyElements.length);
+
+      for (int i = 0; i < _aggFunctions.length; i++) {
+        int index = i + _groupSet.size();
+        int groupId = _groupKeyToIdMap.get(e.getKey());
+        if (_mode.equals(NewAggregateOperator.Mode.MERGE)) {
+          row[index] = _mergeResultHolder.get(groupId)[i];
+        } else if (_mode.equals(NewAggregateOperator.Mode.AGGREGATE)) {
+          row[index] = _aggFunctions[i].extractGroupByResult(_aggregateResultHolders[i], groupId);
+        }
+      }
+
+      rows.add(row);
+    }
+
+    return rows;
+  }
+
+  private List<Object[]> extractFinalGroupByResult() {
+    List<Object[]> rows = new ArrayList<>();
+    for (Object[] resultRow : _finalResultHolder) {
+      int numCols = _groupSet.size() + _aggFunctions.length;
+      Object[] row = new Object[numCols];
+      System.arraycopy(resultRow, 0, row, 0, _groupSet.size());
+
+      for (int i = 0; i < _aggFunctions.length; i++) {
+        int aggIdx = i + _groupSet.size();
+        Comparable result = _aggFunctions[i].extractFinalResult(resultRow[aggIdx]);
+        row[aggIdx] = result == null ? null : _aggFunctions[i].getFinalResultColumnType().convert(result);
+      }
+
+      rows.add(row);
+    }
+    return rows;
+  }
+
+  private void processAggregate(TransferableBlock block, DataSchema inputDataSchema) {
+    int[] intKeys = generateGroupByKeys(block.getContainer());
+
+    for (int i = 0; i < _aggFunctions.length; i++) {
+      AggregationFunction aggregationFunction = _aggFunctions[i];
+      Map<ExpressionContext, BlockValSet> blockValSetMap =
+          getBlockValSetMap(aggregationFunction, block, inputDataSchema);
+      GroupByResultHolder groupByResultHolder = _aggregateResultHolders[i];
+      groupByResultHolder.ensureCapacity(_groupKeyToIdMap.size());
+      aggregationFunction.aggregateGroupBySV(block.getNumRows(), intKeys, groupByResultHolder, blockValSetMap);
+    }
+  }
+
+  private void processMerge(TransferableBlock block) {
+    List<Object[]> container = block.getContainer();
+    int[] intKeys = generateGroupByKeys(container);
+
+    for (int i = 0; i < _aggFunctions.length; i++) {
+      List<ExpressionContext> expressions = _aggFunctions[i].getInputExpressions();
+
+      for (int j = 0; j < container.size(); j++) {
+        Object[] row = container.get(j);
+        int rowKey = intKeys[j];
+        if (!_mergeResultHolder.containsKey(rowKey)) {
+          _mergeResultHolder.put(rowKey, new Object[_aggFunctions.length]);
+        }
+        Object intermediateResultToMerge = extractValueFromRow(row, expressions);
+        Object mergedIntermediateResult = _mergeResultHolder.get(rowKey)[i];
+
+        // Not all V1 aggregation functions handle nulls, so handle null values here and call merge only if necessary.
+        if (intermediateResultToMerge == null) {
+          continue;
+        }
+        if (mergedIntermediateResult == null) {
+          _mergeResultHolder.get(rowKey)[i] = intermediateResultToMerge;
+          continue;
+        }
+
+        _mergeResultHolder.get(rowKey)[i] = _aggFunctions[i].merge(intermediateResultToMerge, mergedIntermediateResult);
+      }
+    }
+  }
+
+  private void collectResult(TransferableBlock block) {
+    List<Object[]> container = block.getContainer();
+    for (Object[] row : container) {
+      assert row.length == _groupSet.size() + _aggFunctions.length;
+      _finalResultHolder.add(row);
+    }
+  }
+
+  /**
+   * Creates the group by key for each row. Converts the key into a 0-index based int value that can be used by
+   * GroupByAggregationResultHolders used in v1 aggregations.
+   * <p>
+   * Returns the int key for each row.
+   */
+  private int[] generateGroupByKeys(List<Object[]> rows) {
+    int[] rowKeys = new int[rows.size()];
+    int numGroups = _groupSet.size();
+
+    for (int i = 0; i < rows.size(); i++) {
+      Object[] row = rows.get(i);
+
+      Object[] keyElements = new Object[numGroups];
+      for (int j = 0; j < numGroups; j++) {
+        String colName = _groupSet.get(j).getIdentifier();
+        int colIndex = _colNameToIndexMap.get(colName);
+        keyElements[j] = row[colIndex];
+      }
+
+      Key rowKey = new Key(keyElements);
+      _groupByKeyHolder.put(rowKey, rowKey.getValues());
+      if (!_groupKeyToIdMap.containsKey(rowKey)) {
+        _groupKeyToIdMap.put(rowKey, _groupId);
+        ++_groupId;
+      }
+      rowKeys[i] = _groupKeyToIdMap.get(rowKey);
+    }
+
+    return rowKeys;
+  }
+
+  private Map<ExpressionContext, BlockValSet> getBlockValSetMap(AggregationFunction aggFunction,
+      TransferableBlock block, DataSchema inputDataSchema) {
+    List<ExpressionContext> expressions = aggFunction.getInputExpressions();
+    int numExpressions = expressions.size();
+    if (numExpressions == 0) {
+      return Collections.emptyMap();
+    }
+
+    Preconditions.checkState(numExpressions == 1, "Cannot handle more than one identifier in aggregation function.");
+    ExpressionContext expression = expressions.get(0);
+    Preconditions.checkState(expression.getType().equals(ExpressionContext.Type.IDENTIFIER));
+    int index = _colNameToIndexMap.get(expression.getIdentifier());
+
+    DataSchema.ColumnDataType dataType = inputDataSchema.getColumnDataType(index);
+    Preconditions.checkState(block.getType().equals(DataBlock.Type.ROW), "Datablock type is not ROW");
+    return Collections.singletonMap(expression,
+        new IntermediateStageBlockValSet(dataType, block.getDataBlock(), index));

Review Comment:
   Same here: consider which format we are going to use, the serialized or the unserialized one.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageAggregationExecutor.java:
##########
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.IntermediateStageBlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+/**
+ * Class that executes all aggregation functions (without group-bys) for the multistage AggregateOperator.
+ */
+public class MultistageAggregationExecutor {
+  private final NewAggregateOperator.Mode _mode;
+  // The identifier operands for the aggregation function only store the column name. This map contains the mapping
+  // from column name to its index.
+  private final Map<String, Integer> _colNameToIndexMap;
+
+  private final AggregationFunction[] _aggFunctions;
+
+  // Result holders for each mode.
+  private final AggregationResultHolder[] _aggregateResultHolder;
+  private final Object[] _mergeResultHolder;
+  private final Object[] _finalResultHolder;
+
+  public MultistageAggregationExecutor(AggregationFunction[] aggFunctions, NewAggregateOperator.Mode mode,
+      Map<String, Integer> colNameToIndexMap) {
+    _aggFunctions = aggFunctions;
+    _mode = mode;
+    _colNameToIndexMap = colNameToIndexMap;
+
+    _aggregateResultHolder = new AggregationResultHolder[aggFunctions.length];
+    _mergeResultHolder = new Object[aggFunctions.length];
+    _finalResultHolder = new Object[aggFunctions.length];
+
+    for (int i = 0; i < _aggFunctions.length; i++) {
+      _aggregateResultHolder[i] = _aggFunctions[i].createAggregationResultHolder();
+    }
+  }
+
+  /**
+   * Performs aggregation for the data in the block.
+   */
+  public void processBlock(TransferableBlock block, DataSchema inputDataSchema) {
+    if (_mode.equals(NewAggregateOperator.Mode.AGGREGATE)) {
+      processAggregate(block, inputDataSchema);
+    } else if (_mode.equals(NewAggregateOperator.Mode.MERGE)) {
+      processMerge(block);
+    } else if (_mode.equals(NewAggregateOperator.Mode.EXTRACT_RESULT)) {
+      collectResult(block);
+    }
+  }
+
+  /**
+   * Fetches the result.
+   */
+  public List<Object[]> getResult() {
+    List<Object[]> rows = new ArrayList<>();
+    Object[] row = new Object[_aggFunctions.length];
+
+    for (int i = 0; i < _aggFunctions.length; i++) {
+      AggregationFunction aggFunction = _aggFunctions[i];
+      if (_mode.equals(NewAggregateOperator.Mode.MERGE)) {
+        row[i] = _mergeResultHolder[i];
+      } else if (_mode.equals(NewAggregateOperator.Mode.AGGREGATE)) {
+        row[i] = aggFunction.extractAggregationResult(_aggregateResultHolder[i]);
+      } else {
+        assert _mode.equals(NewAggregateOperator.Mode.EXTRACT_RESULT);
+        Comparable result = aggFunction.extractFinalResult(_finalResultHolder[i]);
+        row[i] = result == null ? null : aggFunction.getFinalResultColumnType().convert(result);
+      }
+    }
+    rows.add(row);
+    return rows;
+  }
+
+  /**
+   * @return an empty agg result row for non-group-by aggregation.
+   */
+  public Object[] constructEmptyAggResultRow() {
+    Object[] row = new Object[_aggFunctions.length];
+    for (int i = 0; i < _aggFunctions.length; i++) {
+      AggregationFunction aggFunction = _aggFunctions[i];
+      row[i] = aggFunction.extractAggregationResult(aggFunction.createAggregationResultHolder());
+    }
+    return row;
+  }
+
+  private void processAggregate(TransferableBlock block, DataSchema inputDataSchema) {
+    for (int i = 0; i < _aggFunctions.length; i++) {
+      AggregationFunction aggregationFunction = _aggFunctions[i];
+      Map<ExpressionContext, BlockValSet> blockValSetMap =
+          getBlockValSetMap(aggregationFunction, block, inputDataSchema);
+      aggregationFunction.aggregate(block.getNumRows(), _aggregateResultHolder[i], blockValSetMap);
+    }
+  }
+
+  private void processMerge(TransferableBlock block) {
+    List<Object[]> container = block.getContainer();
+
+    for (int i = 0; i < _aggFunctions.length; i++) {
+      List<ExpressionContext> expressions = _aggFunctions[i].getInputExpressions();
+      for (Object[] row : container) {
+        Object intermediateResultToMerge = extractValueFromRow(row, expressions);
+        Object mergedIntermediateResult = _mergeResultHolder[i];
+
+        // Not all V1 aggregation functions have null-handling logic. Handle null values before calling merge.
+        if (intermediateResultToMerge == null) {
+          continue;
+        }
+        if (mergedIntermediateResult == null) {
+          _mergeResultHolder[i] = intermediateResultToMerge;
+          continue;
+        }
+
+        _mergeResultHolder[i] = _aggFunctions[i].merge(intermediateResultToMerge, mergedIntermediateResult);
+      }
+    }
+  }
+
+  private void collectResult(TransferableBlock block) {
+    List<Object[]> container = block.getContainer();
+    assert container.size() == 1;
+    Object[] row = container.get(0);
+    for (int i = 0; i < _aggFunctions.length; i++) {
+      List<ExpressionContext> expressions = _aggFunctions[i].getInputExpressions();
+      _finalResultHolder[i] = extractValueFromRow(row, expressions);
+    }
+  }
+
+  private Map<ExpressionContext, BlockValSet> getBlockValSetMap(AggregationFunction aggFunction,
+      TransferableBlock block, DataSchema inputDataSchema) {
+    List<ExpressionContext> expressions = aggFunction.getInputExpressions();
+    int numExpressions = expressions.size();
+    if (numExpressions == 0) {
+      return Collections.emptyMap();
+    }
+
+    Preconditions.checkState(numExpressions == 1, "Cannot handle more than one identifier in aggregation function.");
+    ExpressionContext expression = expressions.get(0);
+    Preconditions.checkState(expression.getType().equals(ExpressionContext.Type.IDENTIFIER));
+    int index = _colNameToIndexMap.get(expression.getIdentifier());
+
+    DataSchema.ColumnDataType dataType = inputDataSchema.getColumnDataType(index);
+    Preconditions.checkState(block.getType().equals(DataBlock.Type.ROW), "Datablock type is not ROW");
+    return Collections.singletonMap(expression,
+        new IntermediateStageBlockValSet(dataType, block.getDataBlock(), index));

Review Comment:
   `block.getDataBlock()` is lazy. For example, if the previous block was not received through a mailbox, `getDataBlock()` will convert the unserialized `List<Object[]>` format into a serialized `BaseDataBlock`, only for it to be converted back into a primitive array of column values. This is very inefficient.
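A sketch of avoiding that round trip, using hypothetical stand-ins: `BlockSketch` either holds deserialized rows (`List<Object[]>`) or a null container (meaning only the serialized form is available), and `decodeSerializedColumn` represents decoding a column from a received data block. The column is read straight from the container when it exists, and the serialized path is taken only for mailbox-received blocks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntFunction;

// Hypothetical stand-in for TransferableBlock. A non-null container means the rows are already
// deserialized in memory (e.g. produced earlier in the same stage); null means only the
// serialized form is available (e.g. the block was received from a mailbox).
final class BlockSketch {
  final List<Object[]> container;

  BlockSketch(List<Object[]> container) {
    this.container = container;
  }
}

final class ColumnExtractor {
  private ColumnExtractor() {
  }

  // decodeSerializedColumn is a hypothetical decoder over the serialized data block.
  static List<Object> extractColumn(BlockSketch block, int columnIndex,
      IntFunction<List<Object>> decodeSerializedColumn) {
    if (block.container != null) {
      // Read the column directly from the in-memory rows: no rows -> data block -> array round trip.
      List<Object> column = new ArrayList<>(block.container.size());
      for (Object[] row : block.container) {
        column.add(row[columnIndex]);
      }
      return column;
    }
    return decodeSerializedColumn.apply(columnIndex);
  }
}
```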





[GitHub] [pinot] abhioncbr commented on a diff in pull request #10845: [Multistage] Runtime changes for leveraging V1 Aggregation Functions

Posted by "abhioncbr (via GitHub)" <gi...@apache.org>.
abhioncbr commented on code in PR #10845:
URL: https://github.com/apache/pinot/pull/10845#discussion_r1219879274


##########
pinot-core/src/main/java/org/apache/pinot/core/common/IntermediateStageBlockValSet.java:
##########
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.common;
+
+import java.math.BigDecimal;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.PinotDataType;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.RoaringBitmap;
+
+/**
+ * In the multistage engine, the leaf stage servers process the data in columnar fashion. By the time the
+ * intermediate stage receives the projected columns, they have been converted to a row-based format. This class
+ * provides the capability to convert the row-based representation into blocks so that they can be used to process
+ * aggregations.
+ * TODO: Support MV
+ */
+public class IntermediateStageBlockValSet implements BlockValSet {
+  private final FieldSpec.DataType _dataType;
+  private final PinotDataType _pinotDataType;
+  private final List<Object> _values;
+  private final RoaringBitmap _nullBitMap;
+  private boolean _nullBitMapSet;
+
+  public IntermediateStageBlockValSet(DataSchema.ColumnDataType columnDataType, List<Object> values) {
+    _dataType = columnDataType.toDataType();
+    _pinotDataType = PinotDataType.getPinotDataTypeForExecution(columnDataType);
+    _values = values;
+    _nullBitMap = new RoaringBitmap();
+  }
+
+  /**
+   * Returns a bitmap of indices where null values are found.
+   */
+  @Nullable
+  @Override
+  public RoaringBitmap getNullBitmap() {
+    if (!_nullBitMapSet) {
+      if (_values == null) {
+        return _nullBitMap;
+      }
+
+      for (int i = 0; i < _values.size(); i++) {
+        if (_values.get(i) == null) {
+          _nullBitMap.add(i);
+        }
+      }
+      _nullBitMapSet = true;
+    }
+    return _nullBitMap;
+  }
+
+  @Override
+  public FieldSpec.DataType getValueType() {
+    return _dataType;
+  }
+
+  @Override
+  public boolean isSingleValue() {
+    // TODO: Needs to be changed when we start supporting MV in multistage
+    return true;
+  }
+
+  @Nullable
+  @Override
+  public Dictionary getDictionary() {
+    return null;
+  }
+
+  @Override
+  public int[] getDictionaryIdsSV() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int[] getIntValuesSV() {
+    if (_values == null) {
+      return null;
+    }
+
+    int length = _values.size();
+    int[] values = new int[length];
+    for (int i = 0; i < length; i++) {
+      Object value = _values.get(i);
+      if (value != null) {
+        values[i] = _pinotDataType.toInt(value);
+      }
+    }
+    return values;
+  }
+
+  @Override
+  public long[] getLongValuesSV() {
+    if (_values == null) {
+      return null;
+    }
+
+    int length = _values.size();
+    long[] values = new long[length];
+    for (int i = 0; i < length; i++) {
+      Object value = _values.get(i);
+      if (value != null) {
+        values[i] = _pinotDataType.toLong(value);
+      }
+    }
+
+    return values;
+  }
+
+  @Override
+  public float[] getFloatValuesSV() {
+    if (_values == null) {
+      return null;
+    }
+    int length = _values.size();
+    float[] values = new float[length];
+    for (int i = 0; i < length; i++) {
+      Object value = _values.get(i);
+      if (value != null) {
+        values[i] = _pinotDataType.toFloat(value);
+      }
+    }
+
+    return values;
+  }
+
+  @Override
+  public double[] getDoubleValuesSV() {
+    if (_values == null) {
+      return null;
+    }
+    int length = _values.size();
+    double[] values = new double[length];
+    for (int i = 0; i < length; i++) {
+      Object value = _values.get(i);
+      if (value != null) {
+        values[i] = _pinotDataType.toDouble(value);
+      }
+    }
+
+    return values;
+  }
+
+  @Override
+  public BigDecimal[] getBigDecimalValuesSV() {
+    if (_values == null) {
+      return null;
+    }
+    int length = _values.size();
+    BigDecimal[] values = new BigDecimal[length];
+    for (int i = 0; i < length; i++) {
+      Object value = _values.get(i);
+      if (value != null) {
+        values[i] = _pinotDataType.toBigDecimal(value);
+      }
+    }
+    return values;
+  }
+
+  @Override
+  public String[] getStringValuesSV() {
+    if (_values == null) {
+      return null;
+    }
+    int length = _values.size();
+    String[] values = new String[length];
+    for (int i = 0; i < length; i++) {
+      Object value = _values.get(i);
+      if (value != null) {
+        values[i] = _pinotDataType.toString(value);
+      }
+    }
+    return values;
+  }
+
+  @Override
+  public byte[][] getBytesValuesSV() {
+    if (_values == null) {
+      return null;
+    }
+    int length = _values.size();
+    byte[][] values = new byte[length][];
+    for (int i = 0; i < length; i++) {
+      Object value = _values.get(i);
+      if (value != null) {
+        values[i] = _pinotDataType.toBytes(value);
+      }
+    }
+    return values;
+  }
+
+  @Override
+  public int[][] getDictionaryIdsMV() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int[][] getIntValuesMV() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long[][] getLongValuesMV() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public float[][] getFloatValuesMV() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public double[][] getDoubleValuesMV() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public String[][] getStringValuesMV() {
+    throw new UnsupportedOperationException();

Review Comment:
   Hi,
   Thanks for putting in this work. I was waiting for this, as I think it will help me support the [json scalar functions](https://github.com/apache/pinot/pull/10790) in multistage.
   
   Just a question: why are we not supporting MV types? With MV type support, we could handle scalar functions like `string split` and many others.
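To make the question concrete, here is a rough sketch of what a multi-value accessor over row-based values could look like. It assumes each row's MV value arrives either as an `int[]` or as an `Object[]` of boxed numbers, and that a null row maps to an empty array; neither assumption comes from the PR. `RowBasedMvValues` is a hypothetical standalone class, not an implementation of `BlockValSet`.

```java
import java.util.List;

/** Hypothetical sketch: a multi-value (MV) int accessor over row-based values. */
final class RowBasedMvValues {
  private final List<Object> _values;

  RowBasedMvValues(List<Object> values) {
    _values = values;
  }

  /** Returns one int[] per row. Treating a null row as an empty array is an assumption. */
  int[][] getIntValuesMV() {
    int length = _values.size();
    int[][] result = new int[length][];
    for (int i = 0; i < length; i++) {
      Object value = _values.get(i);
      if (value == null) {
        result[i] = new int[0];
      } else if (value instanceof int[]) {
        // Already primitive: reuse the array as-is.
        result[i] = (int[]) value;
      } else {
        // Otherwise assume an Object[] of boxed Numbers and convert element by element.
        Object[] boxed = (Object[]) value;
        int[] converted = new int[boxed.length];
        for (int j = 0; j < boxed.length; j++) {
          converted[j] = ((Number) boxed[j]).intValue();
        }
        result[i] = converted;
      }
    }
    return result;
  }
}
```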




[GitHub] [pinot] vvivekiyer commented on a diff in pull request #10845: [Multistage] Runtime changes for leveraging V1 Aggregation Functions

Posted by "vvivekiyer (via GitHub)" <gi...@apache.org>.
vvivekiyer commented on code in PR #10845:
URL: https://github.com/apache/pinot/pull/10845#discussion_r1242589650


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/NewAggregateOperator.java:
##########
@@ -0,0 +1,378 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FunctionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.IntermediateStageBlockValSet;
+import org.apache.pinot.core.data.table.Key;
+import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.function.AggFunctionQueryContext;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunctionFactory;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+
+
+/**
+ *
+ * AggregateOperator is used to aggregate values over a set of group by keys.
+ * Output data will be in the format of [group by key, aggregate result1, ... aggregate resultN]
+ * Currently, we only support the following aggregation functions:
+ * 1. SUM
+ * 2. COUNT
+ * 3. MIN
+ * 4. MAX
+ * 5. DistinctCount and Count(Distinct)
+ * 6. AVG
+ * 7. FourthMoment
+ * 8. BoolAnd and BoolOr
+ *
+ * When the list of aggregation calls is empty, this class is used to calculate distinct result based on group by keys.
+ * In this case, the input can be any type.
+ *
+ * If the list of aggregation calls is not empty, the input of aggregation has to be a number.
+ *
+ * Note: This class performs aggregation over the double value of input.
+ * If the input is single value, the output type will be input type. Otherwise, the output type will be double.
+ */
+// TODO: Rename to AggregateOperator when merging Planner support.
+public class NewAggregateOperator extends MultiStageOperator {
+  private static final String EXPLAIN_NAME = "AGGREGATE_OPERATOR";
+
+  private final MultiStageOperator _inputOperator;
+  private final DataSchema _resultSchema;
+
+  // Aggregation containers
+  private final AggregationFunction[] _aggregationFunctions;
+  private final AggregationResultHolder[] _aggregationResultHolders;
+
+  // Group By containers
+  private final List<ExpressionContext> _groupSet;
+  private final GroupByResultHolder[] _groupByResultHolders;
+  // Mapping from the group by row-key to the values in the row.
+  private final Map<Key, Object[]> _groupByKeyHolder;
+  // groupId and groupIdMap are used to create a 0-based index for group-by keys instead of using the hash value
+  // directly - similar to GroupByKeyGenerator. This is useful when we invoke the aggregation functions because they
+  // use the group by key indexes to store results.
+  private int _groupId = 0;
+  private Map<Integer, Integer> _groupIdMap;
+
+  private TransferableBlock _upstreamErrorBlock;
+  private boolean _readyToConstruct;
+  private boolean _hasReturnedAggregateBlock;
+
+  // Denotes whether this aggregation operator should merge intermediate results.
+  private boolean _isMergeAggregation;
+
+  // TODO: refactor Pinot Reducer code to support the intermediate stage agg operator.
+  // aggCalls has to be a list of FunctionCall and cannot be null
+  // groupSet has to be a list of InputRef and cannot be null
+  // TODO: Add these two checks when we confirm we can handle error in upstream ctor call.
+
+  @VisibleForTesting
+  public NewAggregateOperator(OpChainExecutionContext context, MultiStageOperator inputOperator,
+      DataSchema resultSchema, List<FunctionContext> functionContexts, List<ExpressionContext> groupSet,
+      boolean isMergeAggregation, boolean isSingleStageAggregation) {
+    super(context);
+    _inputOperator = inputOperator;
+    _resultSchema = resultSchema;
+
+    _groupSet = groupSet;
+    _groupByKeyHolder = new HashMap<>();
+    _groupIdMap = new HashMap<>();
+    _aggregationFunctions = new AggregationFunction[functionContexts.size()];
+    _aggregationResultHolders = new AggregationResultHolder[functionContexts.size()];
+    _groupByResultHolders = new GroupByResultHolder[functionContexts.size()];
+    for (int i = 0; i < _aggregationFunctions.length; i++) {
+      _aggregationFunctions[i] = AggregationFunctionFactory.getAggregationFunction(functionContexts.get(i),
+          new AggFunctionQueryContext(true));
+      _aggregationResultHolders[i] = _aggregationFunctions[i].createAggregationResultHolder();
+      _groupByResultHolders[i] = _aggregationFunctions[i].createGroupByResultHolder(
+          InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY,
+          InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT);
+    }
+
+    _upstreamErrorBlock = null;
+    _readyToConstruct = false;
+    _hasReturnedAggregateBlock = false;
+    _isMergeAggregation = isMergeAggregation && !isSingleStageAggregation;
+  }
+
+  @Override
+  public List<MultiStageOperator> getChildOperators() {
+    return ImmutableList.of(_inputOperator);
+  }
+
+  @Nullable
+  @Override
+  public String toExplainString() {
+    return EXPLAIN_NAME;
+  }
+
+  @Override
+  protected TransferableBlock getNextBlock() {
+    try {
+      if (!_readyToConstruct && !consumeInputBlocks()) {
+        return TransferableBlockUtils.getNoOpTransferableBlock();
+      }
+
+      if (_upstreamErrorBlock != null) {
+        return _upstreamErrorBlock;
+      }
+
+      if (!_hasReturnedAggregateBlock) {
+        return produceAggregatedBlock();
+      } else {
+        // TODO: Move to close call.
+        return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+      }
+    } catch (Exception e) {
+      return TransferableBlockUtils.getErrorTransferableBlock(e);
+    }
+  }
+
+  private TransferableBlock produceAggregatedBlock() {
+    List<Object[]> rows = _groupSet.isEmpty() ? collectAggregationResultRows() : collectGroupByResultRows();
+
+    _hasReturnedAggregateBlock = true;
+    if (rows.size() == 0) {
+      if (_groupSet.size() == 0) {
+        return constructEmptyAggResultBlock();
+      } else {
+        return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+      }
+    } else {
+      return new TransferableBlock(rows, _resultSchema, DataBlock.Type.ROW);
+    }
+  }
+
+  private List<Object[]> collectAggregationResultRows() {
+    List<Object[]> rows = new ArrayList<>();
+
+    Object[] row = new Object[_aggregationFunctions.length];
+    for (int i = 0; i < _aggregationFunctions.length; i++) {
+      AggregationFunction aggregationFunction = _aggregationFunctions[i];
+      row[i] = aggregationFunction.extractAggregationResult(_aggregationResultHolders[i]);
+    }
+    rows.add(row);
+    return rows;
+  }
+
+  private List<Object[]> collectGroupByResultRows() {
+    List<Object[]> rows = new ArrayList<>();
+    for (Map.Entry<Key, Object[]> e : _groupByKeyHolder.entrySet()) {
+      Object[] row = new Object[_aggregationFunctions.length + _groupSet.size()];
+      Object[] keyElements = e.getValue();
+      System.arraycopy(keyElements, 0, row, 0, keyElements.length);
+      for (int i = 0; i < _aggregationFunctions.length; i++) {
+        row[i + _groupSet.size()] = _aggregationFunctions[i].extractGroupByResult(_groupByResultHolders[i],
+            _groupIdMap.get(e.getKey().hashCode()));

Review Comment:
   Use a `Key -> Object[]` mapping (holding the intermediate results for the various aggregations).
   
   Also look at reusing IndexTable.
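A minimal sketch of the suggested layout, in which each group key maps directly to an `Object[]` of per-aggregation intermediate results, so no `hashCode()`-based group-id lookup is needed. It assumes just two illustrative aggregations (SUM and COUNT over one column) and uses `List<Object>` as a stand-in for Pinot's `Key`, since both have value-based `equals`/`hashCode`. The class name is hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch: group key -> Object[] of intermediate results.
 * Slot 0 holds a running SUM (Double), slot 1 a running COUNT (Long).
 */
final class GroupByIntermediateResults {
  private final Map<List<Object>, Object[]> _groupToResults = new LinkedHashMap<>();

  /** Folds one input row into the intermediate results of its group. */
  void add(Object[] groupKey, double value) {
    // Clone so later mutation of the caller's array cannot corrupt the map key.
    Object[] results = _groupToResults.computeIfAbsent(Arrays.asList(groupKey.clone()),
        k -> new Object[]{0.0, 0L});
    results[0] = (Double) results[0] + value;
    results[1] = (Long) results[1] + 1L;
  }

  Map<List<Object>, Object[]> results() {
    return _groupToResults;
  }
}
```

Because the key itself indexes the results, hash collisions between distinct keys cannot merge two groups, which a map keyed on `hashCode()` values does not guarantee.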




[GitHub] [pinot] walterddr commented on pull request #10845: [Multistage] Runtime changes for leveraging V1 Aggregation Functions

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on PR #10845:
URL: https://github.com/apache/pinot/pull/10845#issuecomment-1615988294

   Merging this one first; we can continue with any follow-up needed on #10846.



[GitHub] [pinot] walterddr commented on a diff in pull request #10845: [Multistage] Runtime changes for leveraging V1 Aggregation Functions

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #10845:
URL: https://github.com/apache/pinot/pull/10845#discussion_r1248121185


##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/BaseBooleanAggregationFunction.java:
##########
@@ -233,7 +233,8 @@ public Integer merge(Integer intermediateResult1, Integer intermediateResult2) {
 
   @Override
   public DataSchema.ColumnDataType getIntermediateResultColumnType() {
-    return DataSchema.ColumnDataType.BOOLEAN;
+    // The intermediateResult type is integer. The final result is converted to a boolean.
+    return DataSchema.ColumnDataType.INT;

Review Comment:
   This could cause a backward-compatibility issue when upgrading. Let's make sure it doesn't (where is this function used in v1?).
   
   In v2, as long as the expected data type conforms with Calcite, we are good.
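The diff comment states that the intermediate result is an integer that is converted to a boolean only at the end. A minimal sketch of why that encoding works, assuming 1 means true and 0 means false: BOOL_AND merges with bitwise AND, BOOL_OR with bitwise OR, and only the final step produces a boolean. This is an illustration, not the code in `BaseBooleanAggregationFunction`. The compatibility risk the reviewer raises is that any consumer still expecting a `BOOLEAN` intermediate column type would now receive `INT`.

```java
/** Hypothetical sketch of BOOL_AND / BOOL_OR carried as INT intermediates (1 = true, 0 = false). */
final class BooleanIntermediates {
  private BooleanIntermediates() {
  }

  /** BOOL_AND merge: the result stays 1 only if both partial results are 1. */
  static int mergeAnd(int left, int right) {
    return left & right;
  }

  /** BOOL_OR merge: the result is 1 if either partial result is 1. */
  static int mergeOr(int left, int right) {
    return left | right;
  }

  /** Only the final-result step converts the INT intermediate back to a boolean. */
  static boolean toFinal(int intermediate) {
    return intermediate != 0;
  }
}
```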




[GitHub] [pinot] somandal commented on a diff in pull request #10845: [Multistage] Runtime changes for leveraging V1 Aggregation Functions

Posted by "somandal (via GitHub)" <gi...@apache.org>.
somandal commented on code in PR #10845:
URL: https://github.com/apache/pinot/pull/10845#discussion_r1230366679


##########
pinot-query-planner/src/main/java/org/apache/calcite/rel/hint/PinotHintStrategyTable.java:
##########
@@ -29,6 +29,9 @@
 public class PinotHintStrategyTable {
   public static final String INTERNAL_AGG_INTERMEDIATE_STAGE = "aggIntermediateStage";
   public static final String INTERNAL_AGG_FINAL_STAGE = "aggFinalStage";
+  // Hint to denote that aggregation is to be run as a single stage rather than split into an intermediate and a final
+  // stage
+  public static final String INTERNAL_IS_SINGLE_STAGE_AGG = "isSingleStageAgg";

Review Comment:
   @walterddr this will make more sense once you look at this PR: https://github.com/apache/pinot/pull/10846
   
   We cannot leverage the final stage hint because we use it to decide when to use the singleton exchange for aggregations without group-by (to perform the global aggregation step in the final stage). For both aggregation hints (skip leaf aggregation, and partition keys = group-by keys), we run the aggregation as a single stage and must treat the incoming records as intermediate-stage input, not final-stage input. That's why we decided to expose this new hint for these scenarios. We also cannot change the final stage hint to the intermediate stage hint for single-stage aggregations, because we still need the singleton exchange.
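The runtime side of this is visible in the `NewAggregateOperator` constructor, which computes `_isMergeAggregation = isMergeAggregation && !isSingleStageAggregation`. A tiny sketch of that rule, assuming `isMergeAggregation` is derived from the final-stage hint: a single-stage aggregation may still carry the final-stage hint (for the singleton exchange), yet it never merges intermediate results and instead aggregates the raw incoming rows. The class name is hypothetical.

```java
/** Hypothetical sketch mirroring the merge-mode computation in NewAggregateOperator's constructor. */
final class AggregationMode {
  private AggregationMode() {
  }

  /**
   * Returns true only when the operator should merge intermediate results. The single-stage hint
   * overrides the merge flag, so the operator aggregates raw input rows instead.
   */
  static boolean shouldMerge(boolean isMergeAggregation, boolean isSingleStageAggregation) {
    return isMergeAggregation && !isSingleStageAggregation;
  }
}
```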




[GitHub] [pinot] walterddr commented on a diff in pull request #10845: [Multistage] Runtime changes for leveraging V1 Aggregation Functions

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #10845:
URL: https://github.com/apache/pinot/pull/10845#discussion_r1248017898


##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java:
##########
@@ -268,4 +269,381 @@ private static Object[] extractRowFromDataBlock(DataBlock dataBlock, int rowId,
     }
     return row;
   }
+
+  /**
+   * Given a datablock and the column index, extracts the integer values for the column. Prefer using this function over
+   * extractRowFromDataBlock if the desired datatype is known, to prevent autoboxing to Object and later unboxing to the
+   * desired type.
+   *
+   * @return int array of values in the column
+   */
+  public static int[] extractIntRowsForColumn(DataBlock dataBlock, int columnIndex) {
+    DataSchema dataSchema = dataBlock.getDataSchema();
+    DataSchema.ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
+
+    // Get null bitmap for the column.
+    RoaringBitmap nullBitmap = extractNullBitmaps(dataBlock)[columnIndex];
+    int numRows = dataBlock.getNumberOfRows();
+
+    int[] rows = new int[numRows];
+    switch (columnDataTypes[columnIndex]) {
+      case INT:
+      case BOOLEAN:
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          if (nullBitmap != null && nullBitmap.contains(rowId)) {
+            continue;
+          }

Review Comment:
   nit: consider replacing the per-row null-bitmap check with a single top-level if-else, followed by two switch-cases: one that handles nulls and one that doesn't.
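A minimal sketch of the suggested restructuring for a single column type: branch once on whether the column has any nulls, then run a tight loop that does no per-row bitmap lookup in the common no-null case. To keep it self-contained, it substitutes `java.util.BitSet` for `RoaringBitmap` and an `IntUnaryOperator` for reading `dataBlock.getInt(rowId, columnIndex)`. In the real method, each branch would contain its own switch over the column data types.

```java
import java.util.BitSet;
import java.util.function.IntUnaryOperator;

/**
 * Hypothetical sketch of hoisting the null check out of the per-row loop.
 * BitSet stands in for RoaringBitmap; reader.applyAsInt(rowId) stands in for reading one int value.
 */
final class NullAwareIntExtractor {
  private NullAwareIntExtractor() {
  }

  static int[] extract(int numRows, IntUnaryOperator reader, BitSet nullBitmap) {
    int[] values = new int[numRows];
    if (nullBitmap == null || nullBitmap.isEmpty()) {
      // Fast path: no nulls, so no per-row bitmap lookup at all.
      for (int rowId = 0; rowId < numRows; rowId++) {
        values[rowId] = reader.applyAsInt(rowId);
      }
    } else {
      // Null path: only this loop pays for the lookup; null rows keep the default 0.
      for (int rowId = 0; rowId < numRows; rowId++) {
        if (!nullBitmap.get(rowId)) {
          values[rowId] = reader.applyAsInt(rowId);
        }
      }
    }
    return values;
  }
}
```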




[GitHub] [pinot] vvivekiyer commented on pull request #10845: [Multistage] Runtime changes for leveraging V1 Aggregation Functions

Posted by "vvivekiyer (via GitHub)" <gi...@apache.org>.
vvivekiyer commented on PR #10845:
URL: https://github.com/apache/pinot/pull/10845#issuecomment-1614011227

   - Incorporated the review comments from @Jackie-Jiang and @walterddr
   - Updated the steps to merge this PR and the planner PR https://github.com/apache/pinot/pull/10846 in the PR Description.
   - Updated the future work in the PR Description
   
   This PR is ready to be reviewed @walterddr @Jackie-Jiang @kishoreg @xiangfu0 
   
   



[GitHub] [pinot] somandal commented on a diff in pull request #10845: [Multistage] Runtime changes for leveraging V1 Aggregation Functions

Posted by "somandal (via GitHub)" <gi...@apache.org>.
somandal commented on code in PR #10845:
URL: https://github.com/apache/pinot/pull/10845#discussion_r1230366679


##########
pinot-query-planner/src/main/java/org/apache/calcite/rel/hint/PinotHintStrategyTable.java:
##########
@@ -29,6 +29,9 @@
 public class PinotHintStrategyTable {
   public static final String INTERNAL_AGG_INTERMEDIATE_STAGE = "aggIntermediateStage";
   public static final String INTERNAL_AGG_FINAL_STAGE = "aggFinalStage";
+  // Hint to denote that aggregation is to be run as a single stage rather than split into an intermediate and a final
+  // stage
+  public static final String INTERNAL_IS_SINGLE_STAGE_AGG = "isSingleStageAgg";

Review Comment:
   @walterddr this will make more sense once you look at this PR: https://github.com/apache/pinot/pull/10846
   We cannot leverage the final stage hint because we need to differentiate between an intermediate stage that actually runs in the v1 engine as a leaf stage and an intermediate stage that runs in the v2 engine after operations such as JOIN (agg on top of a join vs. agg in the leaf).




[GitHub] [pinot] vvivekiyer commented on a diff in pull request #10845: [Multistage] Runtime changes for leveraging V1 Aggregation Functions

Posted by "vvivekiyer (via GitHub)" <gi...@apache.org>.
vvivekiyer commented on code in PR #10845:
URL: https://github.com/apache/pinot/pull/10845#discussion_r1237801796


##########
pinot-common/src/main/java/org/apache/pinot/common/request/context/IdentifierContext.java:
##########
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.request.context;
+
+import java.util.Objects;
+import org.apache.pinot.common.utils.DataSchema;
+
+/**
+ * The {@code IdentifierContext} class represents an identifier in the query.
+ * <p> This class includes information about an identifier. The v1 engine uses column names for identifiers. The
+ * multistage query engine uses ordinals to distinctly track each identifier. So this context is set up to support both
+ * v1 and multistage engine identifiers.
+ */
+public class IdentifierContext {

Review Comment:
   The V1 engine uses the actual column name to fetch the values.
   In the V2 engine, we need the ordinal (index) to fetch the values from the container rows.
   
   As you suggested, I did think about constructing a custom name like "$colname_$ordinal" to use in the Identifier. But this would involve expensive string operations to extract the ordinal from the identifier name (in `extractIntermediateValue`), so I decided against it. Please let me know your thoughts.
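A minimal sketch contrasting the two options discussed: carrying the ordinal as a field, which makes each lookup a plain field read, versus recovering it by parsing a `"$colname_$ordinal"` string, which allocates a substring and parses an integer on every call. `OrdinalIdentifier` and its methods are hypothetical illustrations, not Pinot's `IdentifierContext`.

```java
/**
 * Hypothetical identifier carrying both the v1 column name and the v2 ordinal.
 * Not Pinot's IdentifierContext; names are illustrative only.
 */
final class OrdinalIdentifier {
  private final String _name;
  private final int _ordinal;

  OrdinalIdentifier(String name, int ordinal) {
    _name = name;
    _ordinal = ordinal;
  }

  /** Used by the v1 path, which fetches values by column name. */
  String getName() {
    return _name;
  }

  /** Used by the v2 path: a plain field read, no string work per lookup. */
  int getOrdinal() {
    return _ordinal;
  }

  /**
   * The rejected alternative: recover the ordinal from an encoded "$colname_$ordinal" name.
   * lastIndexOf tolerates underscores inside the column name, but every call still allocates
   * a substring and parses an integer.
   */
  static int parseOrdinal(String encodedName) {
    int separator = encodedName.lastIndexOf('_');
    return Integer.parseInt(encodedName.substring(separator + 1));
  }
}
```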



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/NewAggregateOperator.java:
##########
@@ -0,0 +1,378 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FunctionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.IntermediateStageBlockValSet;
+import org.apache.pinot.core.data.table.Key;
+import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.function.AggFunctionQueryContext;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunctionFactory;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+
+
+/**
+ *
+ * AggregateOperator is used to aggregate values over a set of group by keys.
+ * Output data will be in the format of [group by key, aggregate result1, ... aggregate resultN]
+ * Currently, we only support the following aggregation functions:
+ * 1. SUM
+ * 2. COUNT
+ * 3. MIN
+ * 4. MAX
+ * 5. DistinctCount and Count(Distinct)
+ * 6. AVG
+ * 7. FourthMoment
+ * 8. BoolAnd and BoolOr
+ *
+ * When the list of aggregation calls is empty, this class is used to calculate distinct result based on group by keys.
+ * In this case, the input can be any type.
+ *
+ * If the list of aggregation calls is not empty, the input of aggregation has to be a number.
+ *
+ * Note: This class performs aggregation over the double value of input.
+ * If the input is single value, the output type will be input type. Otherwise, the output type will be double.
+ */
+// TODO: Rename to AggregateOperator when merging Planner support.
+public class NewAggregateOperator extends MultiStageOperator {
+  private static final String EXPLAIN_NAME = "AGGREGATE_OPERATOR";
+
+  private final MultiStageOperator _inputOperator;
+  private final DataSchema _resultSchema;
+
+  // Aggregation containers
+  private final AggregationFunction[] _aggregationFunctions;
+  private final AggregationResultHolder[] _aggregationResultHolders;
+
+  // Group By containers
+  private final List<ExpressionContext> _groupSet;
+  private final GroupByResultHolder[] _groupByResultHolders;
+  // Mapping from the group by row-key to the values in the row.
+  private final Map<Key, Object[]> _groupByKeyHolder;
+  // groupId and groupIdMap are used to create a 0-based index for group-by keys instead of using the hash value
+  // directly - similar to GroupByKeyGenerator. This is useful when we invoke the aggregation functions because they
+  // use the group by key indexes to store results.
+  private int _groupId = 0;
+  private Map<Integer, Integer> _groupIdMap;
+
+  private TransferableBlock _upstreamErrorBlock;
+  private boolean _readyToConstruct;
+  private boolean _hasReturnedAggregateBlock;
+
+  // Denotes whether this aggregation operator should merge intermediate results.
+  private boolean _isMergeAggregation;
+
+  // TODO: refactor Pinot Reducer code to support the intermediate stage agg operator.
+  // aggCalls has to be a list of FunctionCall and cannot be null
+  // groupSet has to be a list of InputRef and cannot be null
+  // TODO: Add these two checks when we confirm we can handle error in upstream ctor call.
+
+  @VisibleForTesting
+  public NewAggregateOperator(OpChainExecutionContext context, MultiStageOperator inputOperator,
+      DataSchema resultSchema, List<FunctionContext> functionContexts, List<ExpressionContext> groupSet,
+      boolean isMergeAggregation, boolean isSingleStageAggregation) {
+    super(context);
+    _inputOperator = inputOperator;
+    _resultSchema = resultSchema;
+
+    _groupSet = groupSet;
+    _groupByKeyHolder = new HashMap<>();
+    _groupIdMap = new HashMap<>();
+    _aggregationFunctions = new AggregationFunction[functionContexts.size()];
+    _aggregationResultHolders = new AggregationResultHolder[functionContexts.size()];
+    _groupByResultHolders = new GroupByResultHolder[functionContexts.size()];
+    for (int i = 0; i < _aggregationFunctions.length; i++) {
+      _aggregationFunctions[i] = AggregationFunctionFactory.getAggregationFunction(functionContexts.get(i),
+          new AggFunctionQueryContext(true));
+      _aggregationResultHolders[i] = _aggregationFunctions[i].createAggregationResultHolder();
+      _groupByResultHolders[i] = _aggregationFunctions[i].createGroupByResultHolder(
+          InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY,
+          InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT);
+    }
+
+    _upstreamErrorBlock = null;
+    _readyToConstruct = false;
+    _hasReturnedAggregateBlock = false;
+    _isMergeAggregation = isMergeAggregation && !isSingleStageAggregation;
+  }
+
+  @Override
+  public List<MultiStageOperator> getChildOperators() {
+    return ImmutableList.of(_inputOperator);
+  }
+
+  @Nullable
+  @Override
+  public String toExplainString() {
+    return EXPLAIN_NAME;
+  }
+
+  @Override
+  protected TransferableBlock getNextBlock() {
+    try {
+      if (!_readyToConstruct && !consumeInputBlocks()) {
+        return TransferableBlockUtils.getNoOpTransferableBlock();
+      }
+
+      if (_upstreamErrorBlock != null) {
+        return _upstreamErrorBlock;
+      }
+
+      if (!_hasReturnedAggregateBlock) {
+        return produceAggregatedBlock();
+      } else {
+        // TODO: Move to close call.
+        return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+      }
+    } catch (Exception e) {
+      return TransferableBlockUtils.getErrorTransferableBlock(e);
+    }
+  }
+
+  private TransferableBlock produceAggregatedBlock() {
+    List<Object[]> rows = _groupSet.isEmpty() ? collectAggregationResultRows() : collectGroupByResultRows();
+
+    _hasReturnedAggregateBlock = true;
+    if (rows.size() == 0) {
+      if (_groupSet.size() == 0) {
+        return constructEmptyAggResultBlock();
+      } else {
+        return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+      }
+    } else {
+      return new TransferableBlock(rows, _resultSchema, DataBlock.Type.ROW);
+    }
+  }
+
+  private List<Object[]> collectAggregationResultRows() {
+    List<Object[]> rows = new ArrayList<>();
+
+    Object[] row = new Object[_aggregationFunctions.length];
+    for (int i = 0; i < _aggregationFunctions.length; i++) {
+      AggregationFunction aggregationFunction = _aggregationFunctions[i];
+      row[i] = aggregationFunction.extractAggregationResult(_aggregationResultHolders[i]);
+    }
+    rows.add(row);
+    return rows;
+  }
+
+  private List<Object[]> collectGroupByResultRows() {
+    List<Object[]> rows = new ArrayList<>();
+    for (Map.Entry<Key, Object[]> e : _groupByKeyHolder.entrySet()) {
+      Object[] row = new Object[_aggregationFunctions.length + _groupSet.size()];
+      Object[] keyElements = e.getValue();
+      System.arraycopy(keyElements, 0, row, 0, keyElements.length);
+      for (int i = 0; i < _aggregationFunctions.length; i++) {
+        row[i + _groupSet.size()] = _aggregationFunctions[i].extractGroupByResult(_groupByResultHolders[i],
+            _groupIdMap.get(e.getKey().hashCode()));
+      }
+      rows.add(row);
+    }
+    return rows;
+  }
+
+  /**
+   * @return an empty agg result block for non-group-by aggregation.
+   */
+  private TransferableBlock constructEmptyAggResultBlock() {
+    Object[] row = new Object[_aggregationFunctions.length];
+    for (int i = 0; i < _aggregationFunctions.length; i++) {
+      AggregationFunction aggFunction = _aggregationFunctions[i];
+      row[i] = aggFunction.extractAggregationResult(aggFunction.createAggregationResultHolder());
+    }
+    return new TransferableBlock(Collections.singletonList(row), _resultSchema, DataBlock.Type.ROW);
+  }
+
+  /**
+   * @return whether or not the operator is ready to move on (EOS or ERROR)
+   */
+  private boolean consumeInputBlocks() {
+    TransferableBlock block = _inputOperator.nextBlock();
+    while (!block.isNoOpBlock()) {
+      // setting upstream error block
+      if (block.isErrorBlock()) {
+        _upstreamErrorBlock = block;
+        return true;
+      } else if (block.isEndOfStreamBlock()) {
+        _readyToConstruct = true;
+        return true;
+      }
+
+      List<Object[]> container = block.getContainer();
+      if (_isMergeAggregation) {
+        mergeIntermediateValues(container);
+      } else {
+        aggregateValues(container);
+      }
+
+      block = _inputOperator.nextBlock();
+    }
+    return false;
+  }
+
+  private void mergeIntermediateValues(List<Object[]> container) {
+    if (_groupSet.isEmpty()) {
+      performMergeAggregation(container);
+    } else {
+      performMergeGroupBy(container);
+    }
+  }
+
+  private void performMergeAggregation(List<Object[]> container) {
+    // Simple aggregation function.
+    for (int i = 0; i < _aggregationFunctions.length; i++) {
+      List<ExpressionContext> expressions = _aggregationFunctions[i].getInputExpressions();
+      for (Object[] row : container) {
+        Object intermediateResultToMerge = extractIntermediateValue(row, expressions);
+        _aggregationFunctions[i].mergeAndUpdateResultHolder(intermediateResultToMerge,
+            _aggregationResultHolders[i]);
+      }
+    }
+  }
+
+  private void performMergeGroupBy(List<Object[]> container) {
+    // Create group by keys for each row.
+    int[] intKeys = generateGroupByKeys(container);
+
+    for (int i = 0; i < _aggregationFunctions.length; i++) {
+      GroupByResultHolder groupByResultHolder = _groupByResultHolders[i];
+      groupByResultHolder.ensureCapacity(_groupIdMap.size());
+      List<ExpressionContext> expressions = _aggregationFunctions[i].getInputExpressions();
+
+      for (int j = 0; j < container.size(); j++) {
+        Object[] row = container.get(j);
+        Object intermediateResultToMerge = extractIntermediateValue(row, expressions);
+        _aggregationFunctions[i].mergeAndUpdateResultHolder(intermediateResultToMerge, groupByResultHolder,
+            intKeys[j]);
+      }
+    }
+  }
+
+  Object extractIntermediateValue(Object[] row, List<ExpressionContext> expressions) {
+    // TODO: Add support to handle aggregation functions where:
+    //       1. The identifier need not be the first argument
+    //       2. There are more than one identifiers.
+    Preconditions.checkState(expressions.size() <= 1);
+    Preconditions.checkState(!expressions.isEmpty());
+    ExpressionContext expr = expressions.get(0);
+
+    Object result = expr.getType().equals(ExpressionContext.Type.IDENTIFIER) ? row[expr.getIdentifierIndex()]
+        : expr.getLiteral().getValue();
+    return result;
+  }
+
+  public void aggregateValues(List<Object[]> container) {
+    // Convert row to columnar representation
+    Map<Integer, List<Object>> columnValuesMap = new HashMap<>();

Review Comment:
   Answered below.
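The `_groupId`/`_groupIdMap` comment in the hunk above describes giving each group-by key a dense, 0-based id, similar to `GroupByKeyGenerator`. In the hunk, `_groupIdMap` is a `Map<Integer, Integer>` keyed by `Key.hashCode()`, so two distinct keys with equal hash codes would share one id. Below is a minimal sketch of the same idea keyed by the key values themselves. The class and method names are hypothetical and are not Pinot APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: assigns dense, 0-based group ids to group-by keys. */
public class DenseGroupIdSketch {
  // List<Object> has value-based equals/hashCode, so equal keys map to the same id,
  // and hash collisions between distinct keys are resolved by equals().
  private final Map<List<Object>, Integer> _groupIds = new HashMap<>();
  // Key values indexed by group id, for building output rows later.
  private final List<Object[]> _keyValues = new ArrayList<>();

  /** Returns the group id for the given key values, assigning the next id the first time a key is seen. */
  public int getGroupId(Object[] keyValues) {
    List<Object> key = Arrays.asList(keyValues.clone());
    Integer id = _groupIds.get(key);
    if (id == null) {
      id = _groupIds.size();
      _groupIds.put(key, id);
      _keyValues.add(keyValues.clone());
    }
    return id;
  }

  public int numGroups() {
    return _groupIds.size();
  }

  public Object[] getKeyValues(int groupId) {
    return _keyValues.get(groupId);
  }

  public static void main(String[] args) {
    DenseGroupIdSketch ids = new DenseGroupIdSketch();
    int a = ids.getGroupId(new Object[]{"US", 2023});
    int b = ids.getGroupId(new Object[]{"IN", 2023});
    int c = ids.getGroupId(new Object[]{"US", 2023});
    System.out.println(a + " " + b + " " + c); // prints "0 1 0"
  }
}
```

Because ids are dense, they can index directly into result holders sized with `ensureCapacity(numGroups())`.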



##########
pinot-core/src/main/java/org/apache/pinot/core/common/IntermediateStageBlockValSet.java:
##########
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.common;
+
+import java.math.BigDecimal;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.PinotDataType;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.RoaringBitmap;
+
+/**
+ * In the multistage engine, the leaf stage servers process the data in columnar fashion. By the time the
+ * intermediate stage receives the projected columns, they have been converted to a row based format. This class provides
+ * the capability to convert the row based representation into blocks so that they can be used to process
+ * aggregations.
+ * TODO: Support MV
+ */
+public class IntermediateStageBlockValSet implements BlockValSet {
+  private final FieldSpec.DataType _dataType;
+  private final PinotDataType _pinotDataType;
+  private final List<Object> _values;
+  private final RoaringBitmap _nullBitMap;
+  private boolean _nullBitMapSet;
+
+  public IntermediateStageBlockValSet(DataSchema.ColumnDataType columnDataType, List<Object> values) {

Review Comment:
   We just fetch the `Object` value from every row of the container and pass it here as a list of objects. No boxing to `Object` happens here. Am I missing something?
   
   Are you suggesting that when we call `DataBlockUtils.extractRows()`, we could avoid boxing to `Object` because we already know the type, and then use those values here in `IntermediateStageBlockValSet`? If so, this would be a great optimization that we could pick up separately from this PR, and it would also address your earlier comment about using `DataBlockUtils`.
   
   > is there any better way of handling this? converting row to columnar should be done via RowDataBlock and ColumnDataBlock abstraction?
   > 
   > I agree we can do the refactoring later but was wondering if there's any way we can directly extract this part into a DataBlockUtils
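The sketch below gives a rough picture of the unboxed, columnar shape under discussion. It copies one column of row-based data into a primitive `long[]` and records null rows in a bitmap. It uses `java.util.BitSet` in place of the `RoaringBitmap` that `IntermediateStageBlockValSet` holds, and all names are hypothetical. The values are still boxed in the input rows, so this only avoids boxing downstream of the extraction.

```java
import java.util.Arrays;
import java.util.BitSet;
import java.util.List;

/** Hypothetical sketch: unboxes one column of row-based data into a primitive array. */
public final class RowToColumnSketch {
  private RowToColumnSketch() {
  }

  /** One extracted LONG column: primitive values plus a bitmap of null rows. */
  public static final class LongColumn {
    public final long[] values;
    public final BitSet nulls;

    LongColumn(long[] values, BitSet nulls) {
      this.values = values;
      this.nulls = nulls;
    }
  }

  /** Copies column {@code colIndex} of every row into a long[]; nulls are stored as 0 and flagged. */
  public static LongColumn extractLongColumn(List<Object[]> rows, int colIndex) {
    long[] values = new long[rows.size()];
    BitSet nulls = new BitSet(rows.size());
    for (int i = 0; i < rows.size(); i++) {
      Object value = rows.get(i)[colIndex];
      if (value == null) {
        nulls.set(i); // values[i] keeps the default 0
      } else {
        values[i] = ((Number) value).longValue();
      }
    }
    return new LongColumn(values, nulls);
  }

  public static void main(String[] args) {
    List<Object[]> rows = Arrays.asList(
        new Object[]{"a", 3L}, new Object[]{"b", null}, new Object[]{"c", 5L});
    LongColumn col = extractLongColumn(rows, 1);
    System.out.println(Arrays.toString(col.values) + " nulls=" + col.nulls); // prints "[3, 0, 5] nulls={1}"
  }
}
```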



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggFunctionQueryContext.java:
##########
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import org.apache.pinot.common.request.context.OrderByExpressionContext;
+import org.apache.pinot.core.query.request.context.QueryContext;
+
+
+/**
+ * The <code>AggFunctionQueryContext</code> class contains extracted details from QueryContext that can be used for
+ * Aggregation Functions.
+ */
+public class AggFunctionQueryContext {

Review Comment:
   `AggregationFunctionFactory.getAggregationFunction()` previously used `QueryContext` to determine details such as null handling, limit, and order-by.
   
   We need this refactor for this effort because the intermediate operators are unaware of `QueryContext`.
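The sketch below illustrates the refactor described above. It is a hypothetical small value object that carries only the settings the factory needs (null handling, limit, order-by), so it can be built with or without a full query context. The `new AggFunctionQueryContext(true)` call in `NewAggregateOperator` suggests a boolean-only constructor on the multistage path. The field names, the representation of order-by expressions as strings, and the defaults are all assumptions.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical value object: only the query settings an aggregation function reads. */
public final class AggContextSketch {
  private final boolean _nullHandlingEnabled;
  private final int _limit;
  private final List<String> _orderByExpressions;

  /** Multistage-style: intermediate operators supply null handling without a full query context. */
  public AggContextSketch(boolean nullHandlingEnabled) {
    // Assumed defaults: no limit and no order-by.
    this(nullHandlingEnabled, Integer.MAX_VALUE, Collections.<String>emptyList());
  }

  /** v1-style: the caller copies these values out of its full query context. */
  public AggContextSketch(boolean nullHandlingEnabled, int limit, List<String> orderByExpressions) {
    _nullHandlingEnabled = nullHandlingEnabled;
    _limit = limit;
    _orderByExpressions = Collections.unmodifiableList(orderByExpressions);
  }

  public boolean isNullHandlingEnabled() {
    return _nullHandlingEnabled;
  }

  public int getLimit() {
    return _limit;
  }

  public List<String> getOrderByExpressions() {
    return _orderByExpressions;
  }

  public static void main(String[] args) {
    AggContextSketch multistage = new AggContextSketch(true);
    AggContextSketch v1 = new AggContextSketch(false, 10, Arrays.asList("col1 DESC"));
    System.out.println(multistage.isNullHandlingEnabled() + " " + v1.getLimit()); // prints "true 10"
  }
}
```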



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunction.java:
##########
@@ -106,6 +106,27 @@ void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder
    */
   IntermediateResult merge(IntermediateResult intermediateResult1, IntermediateResult intermediateResult2);
 
+  /**
+   * Merges two intermediate results and also updates the aggregation result holder. This is needed when aggregation
+   * is processed in multiple stages to store the intermediate results.
+   */
+  default void mergeAndUpdateResultHolder(IntermediateResult intermediateResult,
+      AggregationResultHolder aggregationResultHolder) {
+    // TODO: Remove when support for all aggregation functions is added to the Multistage engine.

Review Comment:
   The end state is that every aggregation function that is not yet supported (e.g., mode, covariance) will have its own implementation of `mergeAndUpdateResultHolder()`.
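The sketch below is a simplified, hypothetical version of that end state. An interface default rejects the call, and each ported function overrides it. The hunk does not show the body of the real default method, so having it throw is an assumption made here. The stand-in types are not Pinot's `AggregationFunction` or `AggregationResultHolder`; they only model the shape.

```java
/** Hypothetical sketch of a per-function multistage merge hook. */
public class MergeHookSketch {
  /** Simplified stand-in for an aggregation result holder: one double slot. */
  static final class DoubleHolder {
    double value;
  }

  /** Simplified stand-in for the aggregation function interface. */
  interface SimpleAggregation {
    /** Default for functions not yet ported (in this sketch it throws). */
    default void mergeAndUpdateResultHolder(Object intermediateResult, DoubleHolder holder) {
      throw new UnsupportedOperationException(
          getClass().getSimpleName() + " does not support multistage merging yet");
    }
  }

  /** A ported function supplies its own merge: SUM adds the partial sum to the holder. */
  static final class SumSketch implements SimpleAggregation {
    @Override
    public void mergeAndUpdateResultHolder(Object intermediateResult, DoubleHolder holder) {
      holder.value += ((Number) intermediateResult).doubleValue();
    }
  }

  /** A function that has not been ported inherits the throwing default. */
  static final class ModeSketch implements SimpleAggregation {
  }

  public static void main(String[] args) {
    DoubleHolder holder = new DoubleHolder();
    SimpleAggregation sum = new SumSketch();
    for (double partial : new double[]{1.5, 2.5, 6.0}) {
      sum.mergeAndUpdateResultHolder(partial, holder);
    }
    System.out.println(holder.value); // prints "10.0"
    try {
      new ModeSketch().mergeAndUpdateResultHolder(1.0, holder);
    } catch (UnsupportedOperationException e) {
      System.out.println(e.getMessage()); // prints "ModeSketch does not support multistage merging yet"
    }
  }
}
```

Once every function overrides the method, the default can be removed, which matches the TODO's intent.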



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java:
##########
@@ -187,17 +189,19 @@ public static AggregationFunction getAggregationFunction(FunctionContext functio
       } else {
         switch (AggregationFunctionType.valueOf(upperCaseFunctionName)) {
           case COUNT:
-            return new CountAggregationFunction(firstArgument, queryContext.isNullHandlingEnabled());
+            return new CountAggregationFunction(firstArgument, aggFunctionQueryContext.isNullHandlingEnabled());
           case MIN:
-            return new MinAggregationFunction(firstArgument, queryContext.isNullHandlingEnabled());
+            return new MinAggregationFunction(firstArgument, aggFunctionQueryContext.isNullHandlingEnabled());
           case MAX:
-            return new MaxAggregationFunction(firstArgument, queryContext.isNullHandlingEnabled());
+            return new MaxAggregationFunction(firstArgument, aggFunctionQueryContext.isNullHandlingEnabled());
+          // TODO(Sonam): Uncomment SUM0 when merging planner changes
           case SUM:
-            return new SumAggregationFunction(firstArgument, queryContext.isNullHandlingEnabled());
+          // case SUM0:

Review Comment:
   The return type, etc., is set by the planner changes in [AggregationFunctionType.java](https://github.com/apache/pinot/pull/10846/files#diff-57508c9bebed5cdf9f9fcdbf19728615ae9f224f9e09e42de03e60f5235c7306)





[GitHub] [pinot] vvivekiyer commented on a diff in pull request #10845: [Multistage] Runtime changes for leveraging V1 Aggregation Functions

Posted by "vvivekiyer (via GitHub)" <gi...@apache.org>.
vvivekiyer commented on code in PR #10845:
URL: https://github.com/apache/pinot/pull/10845#discussion_r1242534682


##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java:
##########
@@ -309,8 +313,8 @@ public static AggregationFunction getAggregationFunction(FunctionContext functio
           case DISTINCTAVGMV:
             return new DistinctAvgMVAggregationFunction(firstArgument);
           case DISTINCT:
-            return new DistinctAggregationFunction(arguments, queryContext.getOrderByExpressions(),

Review Comment:
   @Jackie-Jiang, can we clean this up so that Distinct is not modeled as an aggregation function?





[GitHub] [pinot] vvivekiyer commented on a diff in pull request #10845: [Multistage] Runtime changes for leveraging V1 Aggregation Functions

Posted by "vvivekiyer (via GitHub)" <gi...@apache.org>.
vvivekiyer commented on code in PR #10845:
URL: https://github.com/apache/pinot/pull/10845#discussion_r1242588866


##########
pinot-core/src/main/java/org/apache/pinot/core/common/IntermediateStageBlockValSet.java:
##########
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.common;
+
+import java.math.BigDecimal;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.PinotDataType;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.RoaringBitmap;
+
+/**
+ * In the multistage engine, the leaf stage servers process the data in columnar fashion. By the time the
+ * intermediate stage receives the projected columns, they have been converted to a row based format. This class provides
+ * the capability to convert the row based representation into blocks so that they can be used to process
+ * aggregations.
+ * TODO: Support MV
+ */
+public class IntermediateStageBlockValSet implements BlockValSet {
+  private final FieldSpec.DataType _dataType;
+  private final PinotDataType _pinotDataType;
+  private final List<Object> _values;
+  private final RoaringBitmap _nullBitMap;
+  private boolean _nullBitMapSet;
+
+  public IntermediateStageBlockValSet(DataSchema.ColumnDataType columnDataType, List<Object> values) {
+    _dataType = columnDataType.toDataType();
+    _pinotDataType = PinotDataType.getPinotDataTypeForExecution(columnDataType);
+    _values = values;

Review Comment:
   Instead of extracting the list of objects here, we could:
   - store the original (entire) block here along with a column index;
   - use that index to get the value out when the primitive types are actually extracted.
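The sketch below is one hypothetical reading of this suggestion. It keeps a reference to the row block plus a column index, and materializes primitive values only when asked. The getter name mirrors `BlockValSet.getDoubleValuesSV()`, but the class does not implement `BlockValSet`, and treating nulls as `0.0` is an assumption.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch: a column view over a row-based block that extracts values lazily. */
public final class LazyColumnViewSketch {
  private final List<Object[]> _rows; // the original row-based block; not copied
  private final int _colIndex;        // the column this view exposes
  private double[] _doubleValues;     // materialized on first request, then cached

  public LazyColumnViewSketch(List<Object[]> rows, int colIndex) {
    _rows = rows;
    _colIndex = colIndex;
  }

  public int getNumRows() {
    return _rows.size();
  }

  /** Extracts the column as primitives only when an aggregation asks for them; nulls become 0.0. */
  public double[] getDoubleValuesSV() {
    if (_doubleValues == null) {
      double[] values = new double[_rows.size()];
      for (int i = 0; i < values.length; i++) {
        Object value = _rows.get(i)[_colIndex];
        values[i] = value == null ? 0.0 : ((Number) value).doubleValue();
      }
      _doubleValues = values;
    }
    return _doubleValues;
  }

  public static void main(String[] args) {
    List<Object[]> block = Arrays.asList(new Object[]{"a", 1.5}, new Object[]{"b", 2});
    LazyColumnViewSketch view = new LazyColumnViewSketch(block, 1);
    System.out.println(Arrays.toString(view.getDoubleValuesSV())); // prints "[1.5, 2.0]"
  }
}
```

With this design, a column that no aggregation function reads is never copied.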
   





[GitHub] [pinot] walterddr commented on a diff in pull request #10845: [Multistage] Runtime changes for leveraging V1 Aggregation Functions

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #10845:
URL: https://github.com/apache/pinot/pull/10845#discussion_r1230300258


##########
pinot-common/src/main/java/org/apache/pinot/common/request/context/IdentifierContext.java:
##########
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.request.context;
+
+import java.util.Objects;
+import org.apache.pinot.common.utils.DataSchema;
+
+/**
+ * The {@code IdentifierContext} class represents an identifier in the query.
+ * <p> This class includes information about an identifier. The v1 engine uses column names for identifiers. The
+ * multistage query engine uses ordinals to distinctly track each identifier. So this context is set up to support both
+ * v1 and multistage engine identifiers.
+ */
+public class IdentifierContext {

Review Comment:
   Why is this class necessary?
   - Every PlanNode has a schema and RexExpressions, so you could derive the column name from the schema, right?
   - I think the intermediate stage input schema column name is just going to be `EXPR$0` for ordinal `[0]`.
   - Even if it is not named like that, intermediate stage schema column names are entirely up to us. We can make them ordinal-based, e.g. in a `$colName__$ordinal` format, so we don't need this class.
   
   It is not a deal breaker, but it would make many of the v1 file changes unnecessary.
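The sketch below shows the ordinal-based naming proposed above (`$colName__$ordinal`). It encodes the ordinal into the column name and parses it back. Using `lastIndexOf` tolerates column names that themselves contain `__`. The separator constant and the error handling are assumptions.

```java
/** Hypothetical sketch of ordinal-based intermediate column names, e.g. "revenue__2". */
public final class OrdinalColumnNameSketch {
  private static final String SEPARATOR = "__";

  private OrdinalColumnNameSketch() {
  }

  /** Appends the ordinal to the column name. */
  static String encode(String columnName, int ordinal) {
    return columnName + SEPARATOR + ordinal;
  }

  /** Recovers the ordinal from the suffix after the last separator. */
  static int decodeOrdinal(String encodedName) {
    int sep = encodedName.lastIndexOf(SEPARATOR);
    if (sep < 0) {
      throw new IllegalArgumentException("No ordinal suffix in column name: " + encodedName);
    }
    return Integer.parseInt(encodedName.substring(sep + SEPARATOR.length()));
  }

  public static void main(String[] args) {
    String name = encode("EXPR$0", 0);
    System.out.println(name + " -> " + decodeOrdinal(name)); // prints "EXPR$0__0 -> 0"
    System.out.println(decodeOrdinal("my__col__3"));         // prints "3"
  }
}
```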



##########
pinot-core/src/main/java/org/apache/pinot/core/common/IntermediateStageBlockValSet.java:
##########
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.common;
+
+import java.math.BigDecimal;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.PinotDataType;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.RoaringBitmap;
+
+/**
+ * In the multistage engine, the leaf stage servers process the data in columnar fashion. By the time the
+ * intermediate stage receives the projected column, they are converted to a row based format. This class provides
+ * the capability to convert the row based representation into blocks so that they can be used to process
+ * aggregations.
+ * TODO: Support MV
+ */
+public class IntermediateStageBlockValSet implements BlockValSet {
+  private final FieldSpec.DataType _dataType;
+  private final PinotDataType _pinotDataType;
+  private final List<Object> _values;
+  private final RoaringBitmap _nullBitMap;
+  private boolean _nullBitMapSet;
+
+  public IntermediateStageBlockValSet(DataSchema.ColumnDataType columnDataType, List<Object> values) {

Review Comment:
   If we already know the raw type, let's not box it in `Object` at all.



##########
pinot-query-planner/src/main/java/org/apache/calcite/rel/hint/PinotHintStrategyTable.java:
##########
@@ -29,6 +29,9 @@
 public class PinotHintStrategyTable {
   public static final String INTERNAL_AGG_INTERMEDIATE_STAGE = "aggIntermediateStage";
   public static final String INTERNAL_AGG_FINAL_STAGE = "aggFinalStage";
+  // Hint to denote that aggregation is to be run as a single stage rather than split into an intermediate and a final
+  // stage
+  public static final String INTERNAL_IS_SINGLE_STAGE_AGG = "isSingleStageAgg";

Review Comment:
   What does this hint do? Isn't this hint the same as `aggFinalStage`?



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunction.java:
##########
@@ -106,6 +106,27 @@ void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder
    */
   IntermediateResult merge(IntermediateResult intermediateResult1, IntermediateResult intermediateResult2);
 
+  /**
+   * Merges two intermediate results and also updates the aggregation result holder. This is needed when aggregation
+   * is processed in multiple stages to store the intermediate results.
+   */
+  default void mergeAndUpdateResultHolder(IntermediateResult intermediateResult,
+      AggregationResultHolder aggregationResultHolder) {
+    // TODO: Remove when support for all aggregation functions is added to the Multistage engine.

Review Comment:
   I am confused by this comment. What's the end state after removing this function?



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java:
##########
@@ -187,17 +189,19 @@ public static AggregationFunction getAggregationFunction(FunctionContext functio
       } else {
         switch (AggregationFunctionType.valueOf(upperCaseFunctionName)) {
           case COUNT:
-            return new CountAggregationFunction(firstArgument, queryContext.isNullHandlingEnabled());
+            return new CountAggregationFunction(firstArgument, aggFunctionQueryContext.isNullHandlingEnabled());
           case MIN:
-            return new MinAggregationFunction(firstArgument, queryContext.isNullHandlingEnabled());
+            return new MinAggregationFunction(firstArgument, aggFunctionQueryContext.isNullHandlingEnabled());
           case MAX:
-            return new MaxAggregationFunction(firstArgument, queryContext.isNullHandlingEnabled());
+            return new MaxAggregationFunction(firstArgument, aggFunctionQueryContext.isNullHandlingEnabled());
+          // TODO(Sonam): Uncomment SUM0 when merging planner changes
           case SUM:
-            return new SumAggregationFunction(firstArgument, queryContext.isNullHandlingEnabled());
+          // case SUM0:

Review Comment:
   How did we remove SUM, SUM0, and the other variants?
   
   IMO this move is not quite OK, because the variants behave differently on:
   - what to return when nothing is in the group set, or when a global aggregation has no input;
   - what to do with null values.
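The sketch below puts standard SQL `SUM` semantics next to Calcite's `$SUM0`, which returns 0 instead of null when there are no non-null inputs. It models only those SQL semantics. It does not model Pinot's V1 behavior with null handling disabled, which may differ.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch contrasting SQL SUM with Calcite-style $SUM0 on empty and null input. */
public final class SumVariantsSketch {
  private SumVariantsSketch() {
  }

  /** SQL SUM: skips nulls and returns null when there is no non-null input. */
  static Long sum(List<Long> values) {
    Long result = null;
    for (Long v : values) {
      if (v != null) {
        result = (result == null) ? v : Long.valueOf(result + v);
      }
    }
    return result;
  }

  /** $SUM0: same as SUM, except that "no non-null input" yields 0 instead of null. */
  static long sum0(List<Long> values) {
    Long result = sum(values);
    return result == null ? 0L : result;
  }

  public static void main(String[] args) {
    List<Long> empty = Collections.emptyList();
    List<Long> allNull = Arrays.asList((Long) null, (Long) null);
    List<Long> mixed = Arrays.asList(4L, null, 6L);
    System.out.println(sum(empty) + " " + sum0(empty));     // prints "null 0"
    System.out.println(sum(allNull) + " " + sum0(allNull)); // prints "null 0"
    System.out.println(sum(mixed) + " " + sum0(mixed));     // prints "10 10"
  }
}
```

The two functions agree whenever at least one input is non-null and differ only in the empty or all-null case. That is exactly the case the comment says a shared implementation must handle.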



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggFunctionQueryContext.java:
##########
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import org.apache.pinot.common.request.context.OrderByExpressionContext;
+import org.apache.pinot.core.query.request.context.QueryContext;
+
+
+/**
+ * The <code>AggFunctionQueryContext</code> class contains extracted details from QueryContext that can be used for
+ * Aggregation Functions.
+ */
+public class AggFunctionQueryContext {

Review Comment:
   Is this a refactor? Can we pull this one out into a separate PR?



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/BaseBooleanAggregationFunction.java:
##########
@@ -231,9 +231,27 @@ public Integer merge(Integer intermediateResult1, Integer intermediateResult2) {
     return _merger.merge(intermediateResult1, intermediateResult2);
   }
 
+  @Override
+  public void mergeAndUpdateResultHolder(Integer intermediateResult,

Review Comment:
   This applies to this function and the others: can we split the supported functions into
   - what v2 currently supports: MIN/MAX/SUM/COUNT
   - what we plan to demonstrate in this PR (preferably distinctCountHLL?)
   - what we can add later (all the others)
   
   This way we can make this PR smaller by not including the rest of the agg functions:
   - we are not supporting them anyway, correct?
   - these 2 added APIs are only used by the multistage v2 operators, yes?
   
   [note to self] skip reviewing the rest of the agg functions
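The new `mergeAndUpdateResultHolder` methods fold one intermediate result into a result holder, whereas the existing `merge()` combines two intermediate results and returns a new one. Below is a minimal sketch of that contract for the boolean functions. It assumes booleans are encoded as 0/1 integers (consistent with the `Integer`-typed `merge` above) and uses a simplified holder instead of Pinot's `AggregationResultHolder`; all names are hypothetical.

```java
import java.util.function.IntBinaryOperator;

final class BoolMergeSketch {
  // Simplified stand-in for AggregationResultHolder: holds one int, or nothing yet.
  static final class IntResultHolder {
    private Integer _value; // null until the first intermediate result is merged

    Integer getValue() {
      return _value;
    }

    void setValue(int value) {
      _value = value;
    }
  }

  // Booleans are assumed to be encoded as 0/1 ints.
  static final IntBinaryOperator AND = (a, b) -> a & b;
  static final IntBinaryOperator OR = (a, b) -> a | b;

  // Folds one intermediate result (for example, one row from an upstream stage) into the holder.
  static void mergeAndUpdateResultHolder(Integer intermediateResult, IntResultHolder holder,
      IntBinaryOperator merger) {
    if (intermediateResult == null) {
      return; // nothing to merge
    }
    Integer current = holder.getValue();
    holder.setValue(current == null ? intermediateResult : merger.applyAsInt(current, intermediateResult));
  }
}
```

Updating the holder in place lets an operator stream rows through one accumulator per function instead of materializing pairs of intermediate results.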



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java:
##########
@@ -96,10 +102,88 @@ public MultiStageOperator visitMailboxSend(MailboxSendNode node, PhysicalPlanCon
   @Override
   public MultiStageOperator visitAggregate(AggregateNode node, PhysicalPlanContext context) {
     MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
+
+    // TODO: Will need to use a collection of inputSchema when we support aggregation functions with multiple
+    //  columns.

Review Comment:
   I am not quite sure what this TODO is about. Could you elaborate with an example?
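One possible reading of the TODO is offered below as an assumption, since the reviewer is asking the author to confirm it. Today each aggregation call is resolved against a single input column. A function that takes two columns (for example, a covariance-style aggregation) would need every identifier argument resolved against the input schema. The following hypothetical sketch performs that lookup by column name, in the same spirit as `_colNameToIndexMap` in `MultistageAggregationExecutor`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class MultiColumnInputSketch {
  // Resolves every identifier argument of an aggregation call to its position in the
  // input schema, instead of assuming a single input column.
  static int[] resolveArgumentIndices(List<String> inputColumnNames, List<String> argumentColumns) {
    Map<String, Integer> nameToIndex = new HashMap<>();
    for (int i = 0; i < inputColumnNames.size(); i++) {
      nameToIndex.put(inputColumnNames.get(i), i);
    }
    int[] indices = new int[argumentColumns.size()];
    for (int i = 0; i < argumentColumns.size(); i++) {
      Integer index = nameToIndex.get(argumentColumns.get(i));
      if (index == null) {
        throw new IllegalArgumentException("Unknown input column: " + argumentColumns.get(i));
      }
      indices[i] = index;
    }
    return indices;
  }
}
```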



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/NewAggregateOperator.java:
##########
@@ -0,0 +1,378 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FunctionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.IntermediateStageBlockValSet;
+import org.apache.pinot.core.data.table.Key;
+import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.function.AggFunctionQueryContext;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunctionFactory;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+
+
+/**
+ *
+ * AggregateOperator is used to aggregate values over a set of group by keys.
+ * Output data will be in the format of [group by key, aggregate result1, ... aggregate resultN]
+ * Currently, we only support the following aggregation functions:
+ * 1. SUM
+ * 2. COUNT
+ * 3. MIN
+ * 4. MAX
+ * 5. DistinctCount and Count(Distinct)
+ * 6. AVG
+ * 7. FourthMoment
+ * 8. BoolAnd and BoolOr
+ *
+ * When the list of aggregation calls is empty, this class is used to calculate distinct result based on group by keys.
+ * In this case, the input can be any type.
+ *
+ * If the list of aggregation calls is not empty, the input of aggregation has to be a number.
+ *
+ * Note: This class performs aggregation over the double value of input.
+ * If the input is single value, the output type will be input type. Otherwise, the output type will be double.
+ */
+// TODO: Rename to AggregateOperator when merging Planner support.
+public class NewAggregateOperator extends MultiStageOperator {
+  private static final String EXPLAIN_NAME = "AGGREGATE_OPERATOR";
+
+  private final MultiStageOperator _inputOperator;
+  private final DataSchema _resultSchema;
+
+  // Aggregation containers
+  private final AggregationFunction[] _aggregationFunctions;
+  private final AggregationResultHolder[] _aggregationResultHolders;
+
+  // Group By containers
+  private final List<ExpressionContext> _groupSet;
+  private final GroupByResultHolder[] _groupByResultHolders;
+  // Mapping from the group by row-key to the values in the row.
+  private final Map<Key, Object[]> _groupByKeyHolder;
+  // groupId and groupIdMap are used to create a 0-based index for group-by keys instead of using the hash value
+  // directly - similar to GroupByKeyGenerator. This is useful when we invoke the aggregation functions because they
+  // use the group by key indexes to store results.
+  private int _groupId = 0;
+  private Map<Integer, Integer> _groupIdMap;
+
+  private TransferableBlock _upstreamErrorBlock;
+  private boolean _readyToConstruct;
+  private boolean _hasReturnedAggregateBlock;
+
+  // Denotes whether this aggregation operator should merge intermediate results.
+  private boolean _isMergeAggregation;
+
+  // TODO: refactor Pinot Reducer code to support the intermediate stage agg operator.
+  // aggCalls has to be a list of FunctionCall and cannot be null
+  // groupSet has to be a list of InputRef and cannot be null
+  // TODO: Add these two checks when we confirm we can handle error in upstream ctor call.
+
+  @VisibleForTesting
+  public NewAggregateOperator(OpChainExecutionContext context, MultiStageOperator inputOperator,
+      DataSchema resultSchema, List<FunctionContext> functionContexts, List<ExpressionContext> groupSet,
+      boolean isMergeAggregation, boolean isSingleStageAggregation) {
+    super(context);
+    _inputOperator = inputOperator;
+    _resultSchema = resultSchema;
+
+    _groupSet = groupSet;
+    _groupByKeyHolder = new HashMap<>();
+    _groupIdMap = new HashMap<>();
+    _aggregationFunctions = new AggregationFunction[functionContexts.size()];
+    _aggregationResultHolders = new AggregationResultHolder[functionContexts.size()];
+    _groupByResultHolders = new GroupByResultHolder[functionContexts.size()];
+    for (int i = 0; i < _aggregationFunctions.length; i++) {
+      _aggregationFunctions[i] = AggregationFunctionFactory.getAggregationFunction(functionContexts.get(i),
+          new AggFunctionQueryContext(true));
+      _aggregationResultHolders[i] = _aggregationFunctions[i].createAggregationResultHolder();
+      _groupByResultHolders[i] = _aggregationFunctions[i].createGroupByResultHolder(
+          InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY,
+          InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT);
+    }
+
+    _upstreamErrorBlock = null;
+    _readyToConstruct = false;
+    _hasReturnedAggregateBlock = false;
+    _isMergeAggregation = isMergeAggregation && !isSingleStageAggregation;
+  }
+
+  @Override
+  public List<MultiStageOperator> getChildOperators() {
+    return ImmutableList.of(_inputOperator);
+  }
+
+  @Nullable
+  @Override
+  public String toExplainString() {
+    return EXPLAIN_NAME;
+  }
+
+  @Override
+  protected TransferableBlock getNextBlock() {
+    try {
+      if (!_readyToConstruct && !consumeInputBlocks()) {
+        return TransferableBlockUtils.getNoOpTransferableBlock();
+      }
+
+      if (_upstreamErrorBlock != null) {
+        return _upstreamErrorBlock;
+      }
+
+      if (!_hasReturnedAggregateBlock) {
+        return produceAggregatedBlock();
+      } else {
+        // TODO: Move to close call.
+        return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+      }
+    } catch (Exception e) {
+      return TransferableBlockUtils.getErrorTransferableBlock(e);
+    }
+  }
+
+  private TransferableBlock produceAggregatedBlock() {
+    List<Object[]> rows = _groupSet.isEmpty() ? collectAggregationResultRows() : collectGroupByResultRows();
+
+    _hasReturnedAggregateBlock = true;
+    if (rows.size() == 0) {
+      if (_groupSet.size() == 0) {
+        return constructEmptyAggResultBlock();
+      } else {
+        return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+      }
+    } else {
+      return new TransferableBlock(rows, _resultSchema, DataBlock.Type.ROW);
+    }
+  }
+
+  private List<Object[]> collectAggregationResultRows() {
+    List<Object[]> rows = new ArrayList<>();
+
+    Object[] row = new Object[_aggregationFunctions.length];
+    for (int i = 0; i < _aggregationFunctions.length; i++) {
+      AggregationFunction aggregationFunction = _aggregationFunctions[i];
+      row[i] = aggregationFunction.extractAggregationResult(_aggregationResultHolders[i]);
+    }
+    rows.add(row);
+    return rows;
+  }
+
+  private List<Object[]> collectGroupByResultRows() {
+    List<Object[]> rows = new ArrayList<>();
+    for (Map.Entry<Key, Object[]> e : _groupByKeyHolder.entrySet()) {
+      Object[] row = new Object[_aggregationFunctions.length + _groupSet.size()];
+      Object[] keyElements = e.getValue();
+      System.arraycopy(keyElements, 0, row, 0, keyElements.length);
+      for (int i = 0; i < _aggregationFunctions.length; i++) {
+        row[i + _groupSet.size()] = _aggregationFunctions[i].extractGroupByResult(_groupByResultHolders[i],
+            _groupIdMap.get(e.getKey().hashCode()));
+      }
+      rows.add(row);
+    }
+    return rows;
+  }
+
+  /**
+   * @return an empty agg result block for non-group-by aggregation.
+   */
+  private TransferableBlock constructEmptyAggResultBlock() {
+    Object[] row = new Object[_aggregationFunctions.length];
+    for (int i = 0; i < _aggregationFunctions.length; i++) {
+      AggregationFunction aggFunction = _aggregationFunctions[i];
+      row[i] = aggFunction.extractAggregationResult(aggFunction.createAggregationResultHolder());
+    }
+    return new TransferableBlock(Collections.singletonList(row), _resultSchema, DataBlock.Type.ROW);
+  }
+
+  /**
+   * @return whether or not the operator is ready to move on (EOS or ERROR)
+   */
+  private boolean consumeInputBlocks() {
+    TransferableBlock block = _inputOperator.nextBlock();
+    while (!block.isNoOpBlock()) {
+      // setting upstream error block
+      if (block.isErrorBlock()) {
+        _upstreamErrorBlock = block;
+        return true;
+      } else if (block.isEndOfStreamBlock()) {
+        _readyToConstruct = true;
+        return true;
+      }
+
+      List<Object[]> container = block.getContainer();
+      if (_isMergeAggregation) {
+        mergeIntermediateValues(container);
+      } else {
+        aggregateValues(container);
+      }
+
+      block = _inputOperator.nextBlock();
+    }
+    return false;
+  }
+
+  private void mergeIntermediateValues(List<Object[]> container) {
+    if (_groupSet.isEmpty()) {
+      performMergeAggregation(container);
+    } else {
+      performMergeGroupBy(container);
+    }
+  }
+
+  private void performMergeAggregation(List<Object[]> container) {
+    // Simple aggregation function.
+    for (int i = 0; i < _aggregationFunctions.length; i++) {
+      List<ExpressionContext> expressions = _aggregationFunctions[i].getInputExpressions();
+      for (Object[] row : container) {
+        Object intermediateResultToMerge = extractIntermediateValue(row, expressions);
+        _aggregationFunctions[i].mergeAndUpdateResultHolder(intermediateResultToMerge,
+            _aggregationResultHolders[i]);
+      }
+    }
+  }
+
+  private void performMergeGroupBy(List<Object[]> container) {
+    // Create group by keys for each row.
+    int[] intKeys = generateGroupByKeys(container);
+
+    for (int i = 0; i < _aggregationFunctions.length; i++) {
+      GroupByResultHolder groupByResultHolder = _groupByResultHolders[i];
+      groupByResultHolder.ensureCapacity(_groupIdMap.size());
+      List<ExpressionContext> expressions = _aggregationFunctions[i].getInputExpressions();
+
+      for (int j = 0; j < container.size(); j++) {
+        Object[] row = container.get(j);
+        Object intermediateResultToMerge = extractIntermediateValue(row, expressions);
+        _aggregationFunctions[i].mergeAndUpdateResultHolder(intermediateResultToMerge, groupByResultHolder,
+            intKeys[j]);
+      }
+    }
+  }
+
+  Object extractIntermediateValue(Object[] row, List<ExpressionContext> expressions) {
+    // TODO: Add support to handle aggregation functions where:
+    //       1. The identifier need not be the first argument
+    //       2. There are more than one identifiers.
+    Preconditions.checkState(expressions.size() <= 1);
+    Preconditions.checkState(!expressions.isEmpty());
+    ExpressionContext expr = expressions.get(0);
+
+    Object result = expr.getType().equals(ExpressionContext.Type.IDENTIFIER) ? row[expr.getIdentifierIndex()]
+        : expr.getLiteral().getValue();
+    return result;
+  }
+
+  public void aggregateValues(List<Object[]> container) {
+    // Convert row to columnar representation
+    Map<Integer, List<Object>> columnValuesMap = new HashMap<>();

Review Comment:
   Is there a better way of handling this? Shouldn't the row-to-columnar conversion go through the RowDataBlock and ColumnDataBlock abstractions?
   
   I agree we can do the refactoring later, but I was wondering whether we can extract this part directly into DataBlockUtils.
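The following sketches the kind of helper the reviewer suggests. It is a method, hypothetically named `extractColumns` and not an existing `DataBlockUtils` API, that transposes a row-major container into one array per requested column. With it, aggregation code can consume columns without building a `Map<Integer, List<Object>>` for every block.

```java
import java.util.List;

final class RowToColumnarSketch {
  // Hypothetical helper: transposes a row-major container into one array per requested
  // column, in the order the column indices are given.
  static Object[][] extractColumns(List<Object[]> rows, int[] columnIndices) {
    int numRows = rows.size();
    Object[][] columns = new Object[columnIndices.length][numRows];
    for (int rowId = 0; rowId < numRows; rowId++) {
      Object[] row = rows.get(rowId);
      for (int c = 0; c < columnIndices.length; c++) {
        columns[c][rowId] = row[columnIndices[c]];
      }
    }
    return columns;
  }
}
```

Sizing each column array from the row count up front avoids growing a list per column while the block is scanned.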

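The comment on `_groupId` and `_groupIdMap` in `NewAggregateOperator` describes assigning dense, 0-based ids to group-by keys. Below is a minimal sketch of that technique; the class and method names are hypothetical. It keys the map on the group-by values themselves. By contrast, looking ids up by `Key.hashCode()`, as `collectGroupByResultRows` does, would give two distinct keys with equal hash codes the same id.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class GroupIdSketch {
  // Maps each distinct group-by key to a 0-based id, assigned in first-seen order.
  private final Map<List<Object>, Integer> _keyToGroupId = new HashMap<>();
  // Reverse mapping, so results can be emitted as [group-by values..., aggregate results...].
  private final List<List<Object>> _groupIdToKey = new ArrayList<>();

  int getOrCreateGroupId(Object[] row, int[] groupByIndices) {
    List<Object> key = new ArrayList<>(groupByIndices.length);
    for (int index : groupByIndices) {
      key.add(row[index]);
    }
    Integer groupId = _keyToGroupId.get(key); // equality on the values, not just the hash
    if (groupId == null) {
      groupId = _groupIdToKey.size();
      _keyToGroupId.put(key, groupId);
      _groupIdToKey.add(key);
    }
    return groupId;
  }

  int numGroups() {
    return _groupIdToKey.size();
  }

  List<Object> getKey(int groupId) {
    return _groupIdToKey.get(groupId);
  }
}
```

Dense ids are what `GroupByResultHolder`-style storage expects: the holder can be sized with `numGroups()` and indexed directly by id.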


[GitHub] [pinot] vvivekiyer commented on a diff in pull request #10845: [Multistage] Runtime changes for leveraging V1 Aggregation Functions

Posted by "vvivekiyer (via GitHub)" <gi...@apache.org>.
vvivekiyer commented on code in PR #10845:
URL: https://github.com/apache/pinot/pull/10845#discussion_r1242513304


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java:
##########
@@ -96,10 +102,85 @@ public MultiStageOperator visitMailboxSend(MailboxSendNode node, PhysicalPlanCon
   @Override
   public MultiStageOperator visitAggregate(AggregateNode node, PhysicalPlanContext context) {
     MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
+    DataSchema inputSchema = node.getInputs().get(0).getDataSchema();
+    DataSchema resultSchema = node.getDataSchema();
+
+    // Convert aggCalls to FunctionContext and ExpressionContext that our aggregation functions understand.

Review Comment:
   Move the logic that converts RexExpression to FunctionContext/ExpressionContext into NewAggregateOperator.java.



[GitHub] [pinot] walterddr commented on a diff in pull request #10845: [Multistage] Runtime changes for leveraging V1 Aggregation Functions

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #10845:
URL: https://github.com/apache/pinot/pull/10845#discussion_r1248121775


##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FourthMomentAggregationFunction.java:
##########
@@ -30,7 +30,7 @@
 import org.apache.pinot.segment.local.customobject.PinotFourthMoment;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 
-
+// TODO: Add null handling.

Review Comment:
   Null support is not there in v1 anyway (for most agg functions).



[GitHub] [pinot] vvivekiyer commented on a diff in pull request #10845: [Multistage] Runtime changes for leveraging V1 Aggregation Functions

Posted by "vvivekiyer (via GitHub)" <gi...@apache.org>.
vvivekiyer commented on code in PR #10845:
URL: https://github.com/apache/pinot/pull/10845#discussion_r1248152940


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageAggregationExecutor.java:
##########
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.IntermediateStageBlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+/**
+ * Class that executes all aggregation functions (without group-bys) for the multistage AggregateOperator.
+ */
+public class MultistageAggregationExecutor {
+  private final NewAggregateOperator.Mode _mode;
+  // The identifier operands for the aggregation function only store the column name. This map contains mapping
+  // from column name to their index.
+  private final Map<String, Integer> _colNameToIndexMap;
+
+  private final AggregationFunction[] _aggFunctions;
+
+  // Result holders for each mode.
+  private final AggregationResultHolder[] _aggregateResultHolder;
+  private final Object[] _mergeResultHolder;
+  private final Object[] _finalResultHolder;
+
+  public MultistageAggregationExecutor(AggregationFunction[] aggFunctions, NewAggregateOperator.Mode mode,
+      Map<String, Integer> colNameToIndexMap) {
+    _aggFunctions = aggFunctions;
+    _mode = mode;
+    _colNameToIndexMap = colNameToIndexMap;
+
+    _aggregateResultHolder = new AggregationResultHolder[aggFunctions.length];
+    _mergeResultHolder = new Object[aggFunctions.length];
+    _finalResultHolder = new Object[aggFunctions.length];
+
+    for (int i = 0; i < _aggFunctions.length; i++) {
+      _aggregateResultHolder[i] = _aggFunctions[i].createAggregationResultHolder();
+    }
+  }
+
+  /**
+   * Performs aggregation for the data in the block.
+   */
+  public void processBlock(TransferableBlock block, DataSchema inputDataSchema) {
+    if (_mode.equals(NewAggregateOperator.Mode.AGGREGATE)) {
+      processAggregate(block, inputDataSchema);
+    } else if (_mode.equals(NewAggregateOperator.Mode.MERGE)) {
+      processMerge(block);
+    } else if (_mode.equals(NewAggregateOperator.Mode.EXTRACT_RESULT)) {
+      collectResult(block);
+    }
+  }
+
+  /**
+   * Fetches the result.
+   */
+  public List<Object[]> getResult() {
+    List<Object[]> rows = new ArrayList<>();
+    Object[] row = new Object[_aggFunctions.length];
+
+    for (int i = 0; i < _aggFunctions.length; i++) {
+      AggregationFunction aggFunction = _aggFunctions[i];
+      if (_mode.equals(NewAggregateOperator.Mode.MERGE)) {
+        row[i] = _mergeResultHolder[i];
+      } else if (_mode.equals(NewAggregateOperator.Mode.AGGREGATE)) {
+        row[i] = aggFunction.extractAggregationResult(_aggregateResultHolder[i]);
+      } else {
+        assert _mode.equals(NewAggregateOperator.Mode.EXTRACT_RESULT);
+        Comparable result = aggFunction.extractFinalResult(_finalResultHolder[i]);
+        row[i] = result == null ? null : aggFunction.getFinalResultColumnType().convert(result);
+      }
+    }
+    rows.add(row);
+    return rows;
+  }
+
+  /**
+   * @return an empty agg result block for non-group-by aggregation.
+   */
+  public Object[] constructEmptyAggResultRow() {
+    Object[] row = new Object[_aggFunctions.length];
+    for (int i = 0; i < _aggFunctions.length; i++) {
+      AggregationFunction aggFunction = _aggFunctions[i];
+      row[i] = aggFunction.extractAggregationResult(aggFunction.createAggregationResultHolder());
+    }
+    return row;
+  }
+
+  private void processAggregate(TransferableBlock block, DataSchema inputDataSchema) {
+    for (int i = 0; i < _aggFunctions.length; i++) {
+      AggregationFunction aggregationFunction = _aggFunctions[i];
+      Map<ExpressionContext, BlockValSet> blockValSetMap =
+          getBlockValSetMap(aggregationFunction, block, inputDataSchema);
+      aggregationFunction.aggregate(block.getNumRows(), _aggregateResultHolder[i], blockValSetMap);
+    }
+  }
+
+  private void processMerge(TransferableBlock block) {
+    List<Object[]> container = block.getContainer();
+
+    for (int i = 0; i < _aggFunctions.length; i++) {
+      List<ExpressionContext> expressions = _aggFunctions[i].getInputExpressions();
+      for (Object[] row : container) {
+        Object intermediateResultToMerge = extractValueFromRow(row, expressions);
+        Object mergedIntermediateResult = _mergeResultHolder[i];
+
+        // Not all V1 aggregation functions have null-handling logic. Handle null values before calling merge.
+        if (intermediateResultToMerge == null) {
+          continue;
+        }
+        if (mergedIntermediateResult == null) {
+          _mergeResultHolder[i] = intermediateResultToMerge;
+          continue;
+        }
+
+        _mergeResultHolder[i] = _aggFunctions[i].merge(intermediateResultToMerge, mergedIntermediateResult);
+      }
+    }
+  }
+
+  private void collectResult(TransferableBlock block) {
+    List<Object[]> container = block.getContainer();
+    assert container.size() == 1;
+    Object[] row = container.get(0);
+    for (int i = 0; i < _aggFunctions.length; i++) {
+      List<ExpressionContext> expressions = _aggFunctions[i].getInputExpressions();
+      _finalResultHolder[i] = extractValueFromRow(row, expressions);
+    }
+  }
+
+  private Map<ExpressionContext, BlockValSet> getBlockValSetMap(AggregationFunction aggFunction,
+      TransferableBlock block, DataSchema inputDataSchema) {
+    List<ExpressionContext> expressions = aggFunction.getInputExpressions();
+    int numExpressions = expressions.size();
+    if (numExpressions == 0) {
+      return Collections.emptyMap();
+    }
+
+    Preconditions.checkState(numExpressions == 1, "Cannot handle more than one identifier in aggregation function.");
+    ExpressionContext expression = expressions.get(0);
+    Preconditions.checkState(expression.getType().equals(ExpressionContext.Type.IDENTIFIER));
+    int index = _colNameToIndexMap.get(expression.getIdentifier());
+
+    DataSchema.ColumnDataType dataType = inputDataSchema.getColumnDataType(index);
+    Preconditions.checkState(block.getType().equals(DataBlock.Type.ROW), "Datablock type is not ROW");
+    return Collections.singletonMap(expression,
+        new IntermediateStageBlockValSet(dataType, block.getDataBlock(), index));

Review Comment:
   Added a TODO as discussed offline.
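`MultistageAggregationExecutor` above dispatches on three modes. AGGREGATE turns raw rows into an intermediate result. MERGE combines intermediate results, skipping nulls before `merge` is called. EXTRACT_RESULT turns an intermediate result into the final value. The sketch below walks through those three steps for AVG and is self-contained; it does not use Pinot's classes. It assumes a sum/count pair as the intermediate representation.

```java
import java.util.List;

final class AvgModesSketch {
  // Assumed intermediate representation for AVG: running sum and count.
  static final class SumCount {
    final double sum;
    final long count;

    SumCount(double sum, long count) {
      this.sum = sum;
      this.count = count;
    }
  }

  // AGGREGATE: raw input values -> one intermediate result; null inputs are ignored.
  static SumCount aggregate(List<Double> values) {
    double sum = 0;
    long count = 0;
    for (Double value : values) {
      if (value != null) {
        sum += value;
        count++;
      }
    }
    return new SumCount(sum, count);
  }

  // MERGE: combine intermediate results. Nulls are skipped before merging, and the first
  // non-null result seeds the accumulator, mirroring the checks in processMerge().
  static SumCount merge(List<SumCount> intermediates) {
    SumCount merged = null;
    for (SumCount next : intermediates) {
      if (next == null) {
        continue;
      }
      merged = (merged == null) ? next : new SumCount(merged.sum + next.sum, merged.count + next.count);
    }
    return merged;
  }

  // EXTRACT_RESULT: intermediate result -> final AVG; null when no value was aggregated.
  static Double extractFinalResult(SumCount intermediate) {
    if (intermediate == null || intermediate.count == 0) {
      return null;
    }
    return intermediate.sum / intermediate.count;
  }
}
```

AVG shows why the modes must stay separate. Averaging per-server averages would be wrong when servers hold different row counts, so the sum and count travel between stages and the division happens only once, at the end.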



[GitHub] [pinot] somandal commented on a diff in pull request #10845: [Multistage] Runtime changes for leveraging V1 Aggregation Functions

Posted by "somandal (via GitHub)" <gi...@apache.org>.
somandal commented on code in PR #10845:
URL: https://github.com/apache/pinot/pull/10845#discussion_r1230366679


##########
pinot-query-planner/src/main/java/org/apache/calcite/rel/hint/PinotHintStrategyTable.java:
##########
@@ -29,6 +29,9 @@
 public class PinotHintStrategyTable {
   public static final String INTERNAL_AGG_INTERMEDIATE_STAGE = "aggIntermediateStage";
   public static final String INTERNAL_AGG_FINAL_STAGE = "aggFinalStage";
+  // Hint to denote that aggregation is to be run as a single stage rather than split into an intermediate and a final
+  // stage
+  public static final String INTERNAL_IS_SINGLE_STAGE_AGG = "isSingleStageAgg";

Review Comment:
   @walterddr this will make more sense once you look at this PR: https://github.com/apache/pinot/pull/10846
   
   We cannot leverage the final stage hint, because we use it to decide when to use the singleton exchange for aggregations without group-by (to perform the global aggregation step in the final stage). For both aggregation hints (skip leaf aggregation, and partition keys = group-by keys), we run the aggregation as a single stage and must handle the incoming records as the intermediate stage does, not as the final stage does. That's why we decided to expose this new hint for these scenarios. We also cannot replace the final stage hint with the intermediate stage hint for single-stage aggregations: although that would run the single stage as an intermediate stage, we still need the final stage hint for the singleton exchange.
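The reasoning above comes down to the flag computed in the `NewAggregateOperator` constructor, `isMergeAggregation && !isSingleStageAggregation`. The sketch below expresses that decision in terms of the hint names declared in `PinotHintStrategyTable`. It assumes, and the PR does not state this, that the final-stage hint is what sets `isMergeAggregation`. The class and method names are hypothetical.

```java
import java.util.Set;

final class AggStageHintSketch {
  // Hint names as declared in PinotHintStrategyTable.
  static final String INTERNAL_AGG_INTERMEDIATE_STAGE = "aggIntermediateStage";
  static final String INTERNAL_AGG_FINAL_STAGE = "aggFinalStage";
  static final String INTERNAL_IS_SINGLE_STAGE_AGG = "isSingleStageAgg";

  // Mirrors `isMergeAggregation && !isSingleStageAggregation`: a single-stage aggregation keeps
  // the final-stage hint (needed for the singleton exchange) but must not treat its input as
  // intermediate results to be merged.
  static boolean shouldMergeIntermediateResults(Set<String> hints) {
    boolean isMergeAggregation = hints.contains(INTERNAL_AGG_FINAL_STAGE); // assumption
    boolean isSingleStageAggregation = hints.contains(INTERNAL_IS_SINGLE_STAGE_AGG);
    return isMergeAggregation && !isSingleStageAggregation;
  }
}
```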



[GitHub] [pinot] vvivekiyer commented on pull request #10845: [Multistage] Runtime changes for leveraging V1 Aggregation Functions

Posted by "vvivekiyer (via GitHub)" <gi...@apache.org>.
vvivekiyer commented on PR #10845:
URL: https://github.com/apache/pinot/pull/10845#issuecomment-1601806576

   Hey @walterddr, sorry about the late response (I was on call last week). @somandal and I could do a code walkthrough to explain our changes. Would that make the review process easier?


[GitHub] [pinot] codecov-commenter commented on pull request #10845: [Multistage] Runtime changes for leveraging V1 Aggregation Functions

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #10845:
URL: https://github.com/apache/pinot/pull/10845#issuecomment-1590081259

   ## [Codecov](https://app.codecov.io/gh/apache/pinot/pull/10845?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   > Merging [#10845](https://app.codecov.io/gh/apache/pinot/pull/10845?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (408db95) into [master](https://app.codecov.io/gh/apache/pinot/commit/da6944882da5c30b1f790b21d1e247a8466da67b?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (da69448) will **decrease** coverage by `68.31%`.
   > The diff coverage is `0.00%`.
   
   ```diff
   @@              Coverage Diff              @@
   ##             master   #10845       +/-   ##
   =============================================
   - Coverage     68.42%    0.11%   -68.31%     
   =============================================
     Files          2170     2137       -33     
     Lines        116688   115378     -1310     
     Branches      17661    17514      -147     
   =============================================
   - Hits          79846      137    -79709     
   - Misses        31196   115221    +84025     
   + Partials       5646       20     -5626     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1 | `?` | |
   | unittests1 | `?` | |
   | unittests2 | `?` | |
   | unittests2temurin17 | `0.11% <0.00%> (?)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://app.codecov.io/gh/apache/pinot/pull/10845?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [...inot/common/request/context/ExpressionContext.java](https://app.codecov.io/gh/apache/pinot/pull/10845?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vcmVxdWVzdC9jb250ZXh0L0V4cHJlc3Npb25Db250ZXh0LmphdmE=) | `0.00% <0.00%> (-84.62%)` | :arrow_down: |
   | [...inot/common/request/context/IdentifierContext.java](https://app.codecov.io/gh/apache/pinot/pull/10845?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vcmVxdWVzdC9jb250ZXh0L0lkZW50aWZpZXJDb250ZXh0LmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...r/recommender/rules/impl/AggregateMetricsRule.java](https://app.codecov.io/gh/apache/pinot/pull/10845?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9yZWNvbW1lbmRlci9ydWxlcy9pbXBsL0FnZ3JlZ2F0ZU1ldHJpY3NSdWxlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...inot/core/common/IntermediateStageBlockValSet.java](https://app.codecov.io/gh/apache/pinot/pull/10845?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9jb21tb24vSW50ZXJtZWRpYXRlU3RhZ2VCbG9ja1ZhbFNldC5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...apache/pinot/core/operator/ProjectionOperator.java](https://app.codecov.io/gh/apache/pinot/pull/10845?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9vcGVyYXRvci9Qcm9qZWN0aW9uT3BlcmF0b3IuamF2YQ==) | `0.00% <0.00%> (-87.10%)` | :arrow_down: |
   | [...he/pinot/core/operator/blocks/ProjectionBlock.java](https://app.codecov.io/gh/apache/pinot/pull/10845?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9vcGVyYXRvci9ibG9ja3MvUHJvamVjdGlvbkJsb2NrLmphdmE=) | `0.00% <0.00%> (-68.75%)` | :arrow_down: |
   | [...nMaxValueBasedSelectionOrderByCombineOperator.java](https://app.codecov.io/gh/apache/pinot/pull/10845?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9vcGVyYXRvci9jb21iaW5lL01pbk1heFZhbHVlQmFzZWRTZWxlY3Rpb25PcmRlckJ5Q29tYmluZU9wZXJhdG9yLmphdmE=) | `0.00% <0.00%> (-74.27%)` | :arrow_down: |
   | [...ore/operator/filter/BitmapBasedFilterOperator.java](https://app.codecov.io/gh/apache/pinot/pull/10845?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9vcGVyYXRvci9maWx0ZXIvQml0bWFwQmFzZWRGaWx0ZXJPcGVyYXRvci5qYXZh) | `0.00% <0.00%> (-85.19%)` | :arrow_down: |
   | [...perator/filter/H3InclusionIndexFilterOperator.java](https://app.codecov.io/gh/apache/pinot/pull/10845?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9vcGVyYXRvci9maWx0ZXIvSDNJbmNsdXNpb25JbmRleEZpbHRlck9wZXJhdG9yLmphdmE=) | `0.00% <0.00%> (-82.70%)` | :arrow_down: |
   | [...ot/core/operator/filter/H3IndexFilterOperator.java](https://app.codecov.io/gh/apache/pinot/pull/10845?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9vcGVyYXRvci9maWx0ZXIvSDNJbmRleEZpbHRlck9wZXJhdG9yLmphdmE=) | `0.00% <0.00%> (-82.48%)` | :arrow_down: |
   | ... and [41 more](https://app.codecov.io/gh/apache/pinot/pull/10845?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | |
   
   ... and [1909 files with indirect coverage changes](https://app.codecov.io/gh/apache/pinot/pull/10845/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
   
   




[GitHub] [pinot] somandal commented on pull request #10845: [Multistage] Runtime changes for leveraging V1 Aggregation Functions

Posted by "somandal (via GitHub)" <gi...@apache.org>.
somandal commented on PR #10845:
URL: https://github.com/apache/pinot/pull/10845#issuecomment-1592272566

   > Thank you for the contribution, @vvivekiyer and @somandal!
   > 
   > Could I ask a generic question: what's the relationship between this PR and #10846? Specifically, how should I review them? Should I download both, cherry-pick/merge them together, and try them at the same time? Please share your thoughts. Many thanks!
   
   @walterddr yes, these two PRs should ideally be reviewed together. https://github.com/apache/pinot/pull/10846 contains the planner-side changes necessary for this feature. Keep in mind that we haven't yet added support for function arguments as literals, etc., which are used heavily by some of our aggregation functions; this will be done as part of Phase 2.
   Merging the two PRs together may not be enough, since this PR creates a new aggregation operator. A small code change will be required to call the aggregation operator in this PR rather than the existing `AggregateOperator`. Hope this makes sense; let us know if you have any more questions.




[GitHub] [pinot] walterddr commented on pull request #10845: [Multistage] Runtime changes for leveraging V1 Aggregation Functions

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on PR #10845:
URL: https://github.com/apache/pinot/pull/10845#issuecomment-1592179933

   Thank you for the contribution, @vvivekiyer and @somandal!
   
   Could I ask a generic question: what's the relationship between this PR and #10846? Specifically, how should I review them? Should I download both, cherry-pick/merge them together, and try them at the same time? Please share your thoughts. Many thanks!




[GitHub] [pinot] somandal commented on a diff in pull request #10845: [Multistage] Runtime changes for leveraging V1 Aggregation Functions

Posted by "somandal (via GitHub)" <gi...@apache.org>.
somandal commented on code in PR #10845:
URL: https://github.com/apache/pinot/pull/10845#discussion_r1230366679


##########
pinot-query-planner/src/main/java/org/apache/calcite/rel/hint/PinotHintStrategyTable.java:
##########
@@ -29,6 +29,9 @@
 public class PinotHintStrategyTable {
   public static final String INTERNAL_AGG_INTERMEDIATE_STAGE = "aggIntermediateStage";
   public static final String INTERNAL_AGG_FINAL_STAGE = "aggFinalStage";
+  // Hint to denote that aggregation is to be run as a single stage rather than split into an intermediate and a final
+  // stage
+  public static final String INTERNAL_IS_SINGLE_STAGE_AGG = "isSingleStageAgg";

Review Comment:
   @walterddr this will make more sense once you look at this PR: https://github.com/apache/pinot/pull/10846
   We cannot leverage the final-stage hint because we need to differentiate between an intermediate stage that actually runs in the v1 engine as a leaf stage and an intermediate stage that runs in the v2 engine after operations such as a JOIN (aggregation on top of a join vs. aggregation in the leaf). We also need to distinguish the scenarios where the skip-leaf-aggregation hint is set.





[GitHub] [pinot] vvivekiyer commented on a diff in pull request #10845: [Multistage] Runtime changes for leveraging V1 Aggregation Functions

Posted by "vvivekiyer (via GitHub)" <gi...@apache.org>.
vvivekiyer commented on code in PR #10845:
URL: https://github.com/apache/pinot/pull/10845#discussion_r1248214238


##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/BaseBooleanAggregationFunction.java:
##########
@@ -233,7 +233,8 @@ public Integer merge(Integer intermediateResult1, Integer intermediateResult2) {
 
   @Override
   public DataSchema.ColumnDataType getIntermediateResultColumnType() {
-    return DataSchema.ColumnDataType.BOOLEAN;
+    // The intermediateResult type is integer. The final result is converted to a boolean.
+    return DataSchema.ColumnDataType.INT;

Review Comment:
   Got it. 
   Addressed this by converting Boolean objects to Integer objects in NewAggregateOperator. 
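   To make the type change concrete: the diff switches the intermediate result type of the boolean aggregations to `INT` ("The intermediateResult type is integer. The final result is converted to a boolean."), and the reply says `Boolean` objects are now converted to `Integer` objects. Below is a minimal, self-contained sketch of that idea, assuming a 1/0 encoding and bitwise AND/OR as the merge step. The class and method names are hypothetical and this is not the Pinot implementation.

```java
// Hypothetical sketch (not Pinot code): BOOL_AND / BOOL_OR with an INT intermediate result,
// assuming 1 encodes true and 0 encodes false.
final class BooleanIntermediateSketch {

  private BooleanIntermediateSketch() {
  }

  /** Normalizes an input to the INT intermediate form: Boolean becomes 1 or 0, Integer passes through. */
  static Integer toIntermediate(Object value) {
    if (value == null) {
      return null; // Nulls are left for the caller to skip, as processMerge in the diff does.
    }
    if (value instanceof Boolean) {
      return ((Boolean) value) ? 1 : 0;
    }
    return (Integer) value; // Any other type fails fast with a ClassCastException.
  }

  /** BOOL_AND merge on the integer encoding: 1 only if both inputs are 1. */
  static Integer mergeBoolAnd(Integer a, Integer b) {
    return a & b;
  }

  /** BOOL_OR merge on the integer encoding: 1 if either input is 1. */
  static Integer mergeBoolOr(Integer a, Integer b) {
    return a | b;
  }

  /** Converts the INT intermediate result to the final BOOLEAN result. */
  static boolean toFinalResult(Integer intermediate) {
    return intermediate != 0;
  }

  public static void main(String[] args) {
    // Mixed inputs: some upstream rows carry Boolean objects, others already carry Integers.
    Object[] inputs = {Boolean.TRUE, 1, Boolean.FALSE};
    Integer and = null;
    Integer or = null;
    for (Object input : inputs) {
      Integer value = toIntermediate(input);
      and = (and == null) ? value : mergeBoolAnd(and, value);
      or = (or == null) ? value : mergeBoolOr(or, value);
    }
    System.out.println("BOOL_AND=" + toFinalResult(and) + ", BOOL_OR=" + toFinalResult(or));
  }
}
```

   Running `main` prints `BOOL_AND=false, BOOL_OR=true`. Normalizing once at the operator boundary keeps every merge call working on a single type, which is the point of declaring the intermediate column as `INT`.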





[GitHub] [pinot] walterddr commented on a diff in pull request #10845: [Multistage] Runtime changes for leveraging V1 Aggregation Functions

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #10845:
URL: https://github.com/apache/pinot/pull/10845#discussion_r1248265090


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageAggregationExecutor.java:
##########
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.IntermediateStageBlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+/**
+ * Class that executes all aggregation functions (without group-bys) for the multistage AggregateOperator.
+ */
+public class MultistageAggregationExecutor {
+  private final NewAggregateOperator.Mode _mode;
+  // The identifier operands for the aggregation function only store the column name. This map contains mapping
+  // from column name to their index.
+  private final Map<String, Integer> _colNameToIndexMap;
+
+  private final AggregationFunction[] _aggFunctions;
+
+  // Result holders for each mode.
+  private final AggregationResultHolder[] _aggregateResultHolder;
+  private final Object[] _mergeResultHolder;
+  private final Object[] _finalResultHolder;
+
+  public MultistageAggregationExecutor(AggregationFunction[] aggFunctions, NewAggregateOperator.Mode mode,
+      Map<String, Integer> colNameToIndexMap) {
+    _aggFunctions = aggFunctions;
+    _mode = mode;
+    _colNameToIndexMap = colNameToIndexMap;
+
+    _aggregateResultHolder = new AggregationResultHolder[aggFunctions.length];
+    _mergeResultHolder = new Object[aggFunctions.length];
+    _finalResultHolder = new Object[aggFunctions.length];
+
+    for (int i = 0; i < _aggFunctions.length; i++) {
+      _aggregateResultHolder[i] = _aggFunctions[i].createAggregationResultHolder();
+    }
+  }
+
+  /**
+   * Performs aggregation for the data in the block.
+   */
+  public void processBlock(TransferableBlock block, DataSchema inputDataSchema) {
+    if (_mode.equals(NewAggregateOperator.Mode.AGGREGATE)) {
+      processAggregate(block, inputDataSchema);
+    } else if (_mode.equals(NewAggregateOperator.Mode.MERGE)) {
+      processMerge(block);
+    } else if (_mode.equals(NewAggregateOperator.Mode.EXTRACT_RESULT)) {
+      collectResult(block);
+    }
+  }
+
+  /**
+   * Fetches the result.
+   */
+  public List<Object[]> getResult() {
+    List<Object[]> rows = new ArrayList<>();
+    Object[] row = new Object[_aggFunctions.length];
+
+    for (int i = 0; i < _aggFunctions.length; i++) {
+      AggregationFunction aggFunction = _aggFunctions[i];
+      if (_mode.equals(NewAggregateOperator.Mode.MERGE)) {
+        row[i] = _mergeResultHolder[i];
+      } else if (_mode.equals(NewAggregateOperator.Mode.AGGREGATE)) {
+        row[i] = aggFunction.extractAggregationResult(_aggregateResultHolder[i]);
+      } else {
+        assert _mode.equals(NewAggregateOperator.Mode.EXTRACT_RESULT);
+        Comparable result = aggFunction.extractFinalResult(_finalResultHolder[i]);
+        row[i] = result == null ? null : aggFunction.getFinalResultColumnType().convert(result);
+      }
+    }
+    rows.add(row);
+    return rows;
+  }
+
+  /**
+   * @return an empty agg result block for non-group-by aggregation.
+   */
+  public Object[] constructEmptyAggResultRow() {
+    Object[] row = new Object[_aggFunctions.length];
+    for (int i = 0; i < _aggFunctions.length; i++) {
+      AggregationFunction aggFunction = _aggFunctions[i];
+      row[i] = aggFunction.extractAggregationResult(aggFunction.createAggregationResultHolder());
+    }
+    return row;
+  }
+
+  private void processAggregate(TransferableBlock block, DataSchema inputDataSchema) {
+    for (int i = 0; i < _aggFunctions.length; i++) {
+      AggregationFunction aggregationFunction = _aggFunctions[i];
+      Map<ExpressionContext, BlockValSet> blockValSetMap =
+          getBlockValSetMap(aggregationFunction, block, inputDataSchema);
+      aggregationFunction.aggregate(block.getNumRows(), _aggregateResultHolder[i], blockValSetMap);
+    }
+  }
+
+  private void processMerge(TransferableBlock block) {
+    List<Object[]> container = block.getContainer();
+
+    for (int i = 0; i < _aggFunctions.length; i++) {
+      List<ExpressionContext> expressions = _aggFunctions[i].getInputExpressions();
+      for (Object[] row : container) {
+        Object intermediateResultToMerge = extractValueFromRow(row, expressions);
+        Object mergedIntermediateResult = _mergeResultHolder[i];
+
+        // Not all V1 aggregation functions have null-handling logic. Handle null values before calling merge.
+        if (intermediateResultToMerge == null) {
+          continue;
+        }
+        if (mergedIntermediateResult == null) {
+          _mergeResultHolder[i] = intermediateResultToMerge;
+          continue;
+        }
+
+        _mergeResultHolder[i] = _aggFunctions[i].merge(intermediateResultToMerge, mergedIntermediateResult);
+      }
+    }
+  }
+
+  private void collectResult(TransferableBlock block) {
+    List<Object[]> container = block.getContainer();
+    assert container.size() == 1;
+    Object[] row = container.get(0);
+    for (int i = 0; i < _aggFunctions.length; i++) {
+      List<ExpressionContext> expressions = _aggFunctions[i].getInputExpressions();
+      _finalResultHolder[i] = extractValueFromRow(row, expressions);
+    }
+  }
+
+  private Map<ExpressionContext, BlockValSet> getBlockValSetMap(AggregationFunction aggFunction,
+      TransferableBlock block, DataSchema inputDataSchema) {
+    List<ExpressionContext> expressions = aggFunction.getInputExpressions();
+    int numExpressions = expressions.size();
+    if (numExpressions == 0) {
+      return Collections.emptyMap();
+    }
+
+    Preconditions.checkState(numExpressions == 1, "Cannot handle more than one identifier in aggregation function.");
+    ExpressionContext expression = expressions.get(0);
+    Preconditions.checkState(expression.getType().equals(ExpressionContext.Type.IDENTIFIER));
+    int index = _colNameToIndexMap.get(expression.getIdentifier());
+
+    DataSchema.ColumnDataType dataType = inputDataSchema.getColumnDataType(index);
+    Preconditions.checkState(block.getType().equals(DataBlock.Type.ROW), "Datablock type is not ROW");
+    return Collections.singletonMap(expression,
+        new IntermediateStageBlockValSet(dataType, block.getDataBlock(), index));

Review Comment:
   Yeah, I forgot about the null bitmap. Long story short, we might really have to do some rethinking on this.
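   For context on this thread: `MultistageAggregationExecutor` in the diff runs in one of three modes, `AGGREGATE` (fold raw column values into a result holder), `MERGE` (combine intermediate results, handling nulls before calling `merge`), and `EXTRACT_RESULT` (turn an intermediate result into the final value). The sketch below mirrors that flow for an AVG-like function whose intermediate result is a (sum, count) pair. It is illustrative only: `AvgThreePhaseSketch`, `SumCount`, and its methods are hypothetical, and the `java.util.BitSet` standing in for a null bitmap is an assumption meant to show why the aggregate step needs null information, not a description of how Pinot stores nulls.

```java
import java.util.Arrays;
import java.util.BitSet;
import java.util.List;

// Hypothetical sketch (not Pinot code) of the AGGREGATE / MERGE / EXTRACT_RESULT flow
// for an AVG-like function whose intermediate result is a (sum, count) pair.
final class AvgThreePhaseSketch {

  /** Intermediate result: running sum and number of non-null values seen. */
  static final class SumCount {
    final double _sum;
    final long _count;

    SumCount(double sum, long count) {
      _sum = sum;
      _count = count;
    }
  }

  /** AGGREGATE: folds raw values into an intermediate result, skipping rows set in the (assumed) null bitmap. */
  static SumCount aggregate(double[] values, BitSet nullBitmap) {
    double sum = 0;
    long count = 0;
    for (int row = 0; row < values.length; row++) {
      if (nullBitmap != null && nullBitmap.get(row)) {
        continue; // Without the bitmap, a null row's placeholder value would be summed as if it were real.
      }
      sum += values[row];
      count++;
    }
    return new SumCount(sum, count);
  }

  /** MERGE: skips null intermediates and adopts the first non-null one, like processMerge in the diff. */
  static SumCount merge(List<SumCount> intermediates) {
    SumCount merged = null;
    for (SumCount next : intermediates) {
      if (next == null) {
        continue;
      }
      merged = (merged == null) ? next : new SumCount(merged._sum + next._sum, merged._count + next._count);
    }
    return merged;
  }

  /** EXTRACT_RESULT: converts the merged intermediate result into the final average, or null if nothing was seen. */
  static Double extractFinal(SumCount merged) {
    if (merged == null || merged._count == 0) {
      return null;
    }
    return merged._sum / merged._count;
  }

  public static void main(String[] args) {
    BitSet nulls = new BitSet();
    nulls.set(1); // Row 1 of the first block is null; its stored 0.0 is only a placeholder.
    SumCount block1 = aggregate(new double[]{2.0, 0.0, 4.0}, nulls);
    SumCount block2 = aggregate(new double[]{6.0}, null);
    SumCount merged = merge(Arrays.asList(block1, null, block2));
    System.out.println(extractFinal(merged));
  }
}
```

   Running `main` prints `4.0`. If the bitmap were ignored, the first block would contribute a sum of 6.0 over three rows and the overall average would come out as 3.0, which is the kind of silent error that makes a block value set without null information hard to reuse as-is.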


