You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/07/01 16:32:38 UTC

[pinot] branch master updated: [Multistage] Runtime changes for leveraging V1 Aggregation Functions (#10845)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a6ee2773a2 [Multistage] Runtime changes for leveraging V1 Aggregation Functions  (#10845)
a6ee2773a2 is described below

commit a6ee2773a28b2339ac1a4fc80fba978ae69dbbde
Author: Vivek Iyer Vaidyanathan <vv...@gmail.com>
AuthorDate: Sat Jul 1 09:32:31 2023 -0700

    [Multistage] Runtime changes for leveraging V1 Aggregation Functions  (#10845)
    
    * [Multistage] Runtime changes for supporting Aggregations
    * Special handling for bool_and bool_or
---
 .../pinot/common/datablock/DataBlockUtils.java     | 292 +++++++
 .../core/common/IntermediateStageBlockValSet.java  | 155 ++++
 .../function/AggregationFunctionFactory.java       |   3 +
 .../operator/MultistageAggregationExecutor.java    | 223 +++++
 .../operator/MultistageGroupByExecutor.java        | 307 +++++++
 .../runtime/operator/NewAggregateOperator.java     | 295 +++++++
 .../query/runtime/plan/PhysicalPlanVisitor.java    |  13 +
 .../runtime/operator/AggregateOperatorTest.java    |   2 +-
 .../src/test/resources/queries/Aggregates.json     | 922 ++++++++++++++++++---
 .../src/test/resources/queries/NullHandling.json   |  11 +-
 .../src/test/resources/queries/WithStatements.json |  72 +-
 11 files changed, 2147 insertions(+), 148 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java
index 6bf6a0e05f..99c3e4df46 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.common.datablock;
 
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.sql.Timestamp;
 import java.util.ArrayList;
@@ -268,4 +269,295 @@ public final class DataBlockUtils {
     }
     return row;
   }
+
+  /**
+   * Given a datablock and the column index, extracts the integer values for the column. Prefer using this function over
+   * extractRowFromDatablock if the desired datatype is known to prevent autoboxing to Object and later unboxing to the
+   * desired type.
+   * This only works on ROW format.
+   * TODO: Add support for COLUMNAR format.
+   * @return int array of values in the column
+   */
+  public static int[] extractIntValuesForColumn(DataBlock dataBlock, int columnIndex) {
+    DataSchema dataSchema = dataBlock.getDataSchema();
+    DataSchema.ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
+
+    // Get null bitmap for the column.
+    RoaringBitmap nullBitmap = extractNullBitmaps(dataBlock)[columnIndex];
+    int numRows = dataBlock.getNumberOfRows();
+
+    int[] rows = new int[numRows];
+    for (int rowId = 0; rowId < numRows; rowId++) {
+      if (nullBitmap != null && nullBitmap.contains(rowId)) {
+        continue;
+      }
+
+      switch (columnDataTypes[columnIndex]) {
+        case INT:
+        case BOOLEAN:
+          rows[rowId] = dataBlock.getInt(rowId, columnIndex);
+          break;
+        case LONG:
+          rows[rowId] = (int) dataBlock.getLong(rowId, columnIndex);
+          break;
+        case FLOAT:
+          rows[rowId] = (int) dataBlock.getFloat(rowId, columnIndex);
+          break;
+        case DOUBLE:
+          rows[rowId] = (int) dataBlock.getDouble(rowId, columnIndex);
+          break;
+        case BIG_DECIMAL:
+          rows[rowId] = dataBlock.getBigDecimal(rowId, columnIndex).intValue();
+          break;
+        default:
+          throw new IllegalStateException(
+              String.format("Unsupported data type: %s for column: %s", columnDataTypes[columnIndex], columnIndex));
+      }
+    }
+    return rows;
+  }
+
+  /**
+   * Given a datablock and the column index, extracts the long values for the column. Prefer using this function over
+   * extractRowFromDatablock if the desired datatype is known to prevent autoboxing to Object and later unboxing to the
+   * desired type.
+   * This only works on ROW format.
+   * TODO: Add support for COLUMNAR format.
+   * @return long array of values in the column
+   */
+  public static long[] extractLongValuesForColumn(DataBlock dataBlock, int columnIndex) {
+    DataSchema dataSchema = dataBlock.getDataSchema();
+    DataSchema.ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
+
+    // Get null bitmap for the column.
+    RoaringBitmap nullBitmap = extractNullBitmaps(dataBlock)[columnIndex];
+    int numRows = dataBlock.getNumberOfRows();
+
+    long[] rows = new long[numRows];
+    for (int rowId = 0; rowId < numRows; rowId++) {
+      if (nullBitmap != null && nullBitmap.contains(rowId)) {
+        continue;
+      }
+
+      switch (columnDataTypes[columnIndex]) {
+        case INT:
+        case BOOLEAN:
+          rows[rowId] = dataBlock.getInt(rowId, columnIndex);
+          break;
+        case LONG:
+          rows[rowId] = dataBlock.getLong(rowId, columnIndex);
+          break;
+        case FLOAT:
+          rows[rowId] = (long) dataBlock.getFloat(rowId, columnIndex);
+          break;
+        case DOUBLE:
+          rows[rowId] = (long) dataBlock.getDouble(rowId, columnIndex);
+          break;
+        case BIG_DECIMAL:
+          rows[rowId] = dataBlock.getBigDecimal(rowId, columnIndex).longValue();
+          break;
+        default:
+          throw new IllegalStateException(
+              String.format("Unsupported data type: %s for column: %s", columnDataTypes[columnIndex], columnIndex));
+      }
+    }
+    return rows;
+  }
+
+  /**
+   * Given a datablock and the column index, extracts the float values for the column. Prefer using this function over
+   * extractRowFromDatablock if the desired datatype is known to prevent autoboxing to Object and later unboxing to the
+   * desired type.
+   * This only works on ROW format.
+   * TODO: Add support for COLUMNAR format.
+   * @return float array of values in the column
+   */
+  public static float[] extractFloatValuesForColumn(DataBlock dataBlock, int columnIndex) {
+    DataSchema dataSchema = dataBlock.getDataSchema();
+    DataSchema.ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
+
+    // Get null bitmap for the column.
+    RoaringBitmap nullBitmap = extractNullBitmaps(dataBlock)[columnIndex];
+    int numRows = dataBlock.getNumberOfRows();
+
+    float[] rows = new float[numRows];
+    for (int rowId = 0; rowId < numRows; rowId++) {
+      if (nullBitmap != null && nullBitmap.contains(rowId)) {
+        continue;
+      }
+
+      switch (columnDataTypes[columnIndex]) {
+        case INT:
+        case BOOLEAN:
+          rows[rowId] = dataBlock.getInt(rowId, columnIndex);
+          break;
+        case LONG:
+          rows[rowId] = dataBlock.getLong(rowId, columnIndex);
+          break;
+        case FLOAT:
+          rows[rowId] = dataBlock.getFloat(rowId, columnIndex);
+          break;
+        case DOUBLE:
+          rows[rowId] = (float) dataBlock.getDouble(rowId, columnIndex);
+          break;
+        case BIG_DECIMAL:
+          rows[rowId] = dataBlock.getBigDecimal(rowId, columnIndex).floatValue();
+          break;
+        default:
+          throw new IllegalStateException(
+              String.format("Unsupported data type: %s for column: %s", columnDataTypes[columnIndex], columnIndex));
+      }
+    }
+
+    return rows;
+  }
+
+  /**
+   * Given a datablock and the column index, extracts the double values for the column. Prefer using this function over
+   * extractRowFromDatablock if the desired datatype is known to prevent autoboxing to Object and later unboxing to the
+   * desired type.
+   * This only works on ROW format.
+   * TODO: Add support for COLUMNAR format.
+   * @return double array of values in the column
+   */
+  public static double[] extractDoubleValuesForColumn(DataBlock dataBlock, int columnIndex) {
+    DataSchema dataSchema = dataBlock.getDataSchema();
+    DataSchema.ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
+
+    // Get null bitmap for the column.
+    RoaringBitmap nullBitmap = extractNullBitmaps(dataBlock)[columnIndex];
+    int numRows = dataBlock.getNumberOfRows();
+
+    double[] rows = new double[numRows];
+    for (int rowId = 0; rowId < numRows; rowId++) {
+      if (nullBitmap != null && nullBitmap.contains(rowId)) {
+        continue;
+      }
+      switch (columnDataTypes[columnIndex]) {
+        case INT:
+        case BOOLEAN:
+          rows[rowId] = dataBlock.getInt(rowId, columnIndex);
+          break;
+        case LONG:
+          rows[rowId] = dataBlock.getLong(rowId, columnIndex);
+          break;
+        case FLOAT:
+          rows[rowId] = dataBlock.getFloat(rowId, columnIndex);
+          break;
+        case DOUBLE:
+          rows[rowId] = dataBlock.getDouble(rowId, columnIndex);
+          break;
+        case BIG_DECIMAL:
+          rows[rowId] = dataBlock.getBigDecimal(rowId, columnIndex).doubleValue();
+          break;
+        default:
+          throw new IllegalStateException(
+              String.format("Unsupported data type: %s for column: %s", columnDataTypes[columnIndex], columnIndex));
+      }
+    }
+
+    return rows;
+  }
+
+  /**
+   * Given a datablock and the column index, extracts the BigDecimal values for the column. Prefer using this function
+   * over extractRowFromDatablock if the desired datatype is known to prevent autoboxing to Object and later unboxing to
+   * the desired type. BIG_DECIMAL columns are returned as stored (no round trip through double); null rows stay null.
+   * This only works on ROW format.
+   * TODO: Add support for COLUMNAR format.
+   * @return BigDecimal array of values in the column
+   */
+  public static BigDecimal[] extractBigDecimalValuesForColumn(DataBlock dataBlock, int columnIndex) {
+    DataSchema dataSchema = dataBlock.getDataSchema();
+    DataSchema.ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
+
+    // Get null bitmap for the column.
+    RoaringBitmap nullBitmap = extractNullBitmaps(dataBlock)[columnIndex];
+    int numRows = dataBlock.getNumberOfRows();
+
+    BigDecimal[] rows = new BigDecimal[numRows];
+    for (int rowId = 0; rowId < numRows; rowId++) {
+      if (nullBitmap != null && nullBitmap.contains(rowId)) {
+        continue;
+      }
+
+      switch (columnDataTypes[columnIndex]) {
+        case INT:
+        case BOOLEAN:
+          rows[rowId] = BigDecimal.valueOf(dataBlock.getInt(rowId, columnIndex));
+          break;
+        case LONG:
+          rows[rowId] = BigDecimal.valueOf(dataBlock.getLong(rowId, columnIndex));
+          break;
+        case FLOAT:
+          rows[rowId] = BigDecimal.valueOf(dataBlock.getFloat(rowId, columnIndex));
+          break;
+        case DOUBLE:
+          rows[rowId] = BigDecimal.valueOf(dataBlock.getDouble(rowId, columnIndex));
+          break;
+        case BIG_DECIMAL:
+          rows[rowId] = dataBlock.getBigDecimal(rowId, columnIndex); // exact value; do not narrow through double
+          break;
+        default:
+          throw new IllegalStateException(
+              String.format("Unsupported data type: %s for column: %s", columnDataTypes[columnIndex], columnIndex));
+      }
+    }
+
+    return rows;
+  }
+
+  /**
+   * Given a datablock and the column index, extracts the String values for the column. Prefer using this function over
+   * extractRowFromDatablock if the desired datatype is known to prevent autoboxing to Object and later unboxing to the
+   * desired type.
+   * This only works on ROW format.
+   * TODO: Add support for COLUMNAR format.
+   * @return String array of values in the column
+   */
+  public static String[] extractStringValuesForColumn(DataBlock dataBlock, int columnIndex) {
+    DataSchema dataSchema = dataBlock.getDataSchema();
+    DataSchema.ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
+
+    // Get null bitmap for the column.
+    RoaringBitmap nullBitmap = extractNullBitmaps(dataBlock)[columnIndex];
+    int numRows = dataBlock.getNumberOfRows();
+
+    String[] rows = new String[numRows];
+    for (int rowId = 0; rowId < numRows; rowId++) {
+      if (nullBitmap != null && nullBitmap.contains(rowId)) {
+        continue;
+      }
+      rows[rowId] = dataBlock.getString(rowId, columnIndex);
+    }
+
+    return rows;
+  }
+
+  /**
+   * Given a datablock and the column index, extracts the byte[] values for the column. Prefer using this function over
+   * extractRowFromDatablock if the desired datatype is known to prevent autoboxing to Object and later unboxing to the
+   * desired type.
+   * This only works on ROW format.
+   * TODO: Add support for COLUMNAR format.
+   * @return array holding one byte[] value per row of the column (entries for null rows are left null)
+   */
+  public static byte[][] extractBytesValuesForColumn(DataBlock dataBlock, int columnIndex) {
+    DataSchema dataSchema = dataBlock.getDataSchema();
+    DataSchema.ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
+
+    // Get null bitmap for the column.
+    RoaringBitmap nullBitmap = extractNullBitmaps(dataBlock)[columnIndex];
+    int numRows = dataBlock.getNumberOfRows();
+
+    byte[][] rows = new byte[numRows][];
+    for (int rowId = 0; rowId < numRows; rowId++) {
+      if (nullBitmap != null && nullBitmap.contains(rowId)) {
+        continue;
+      }
+      rows[rowId] = dataBlock.getBytes(rowId, columnIndex).getBytes();
+    }
+
+    return rows;
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/IntermediateStageBlockValSet.java b/pinot-core/src/main/java/org/apache/pinot/core/common/IntermediateStageBlockValSet.java
new file mode 100644
index 0000000000..7ddaaf04c9
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/IntermediateStageBlockValSet.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.common;
+
+import java.math.BigDecimal;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.datablock.DataBlockUtils;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.RoaringBitmap;
+
+/**
+ * In the multistage engine, the leaf stage servers process the data in columnar fashion. By the time the
+ * intermediate stage receives the projected columns, they have been converted to a row-based format. This class
+ * provides the capability to convert the row-based representation into columnar blocks so that they can be used to
+ * process aggregations using v1 aggregation functions.
+ * TODO: Support MV
+ */
+public class IntermediateStageBlockValSet implements BlockValSet {
+  private final FieldSpec.DataType _dataType;
+  private final DataBlock _dataBlock;
+  private final int _index;
+  private final RoaringBitmap _nullBitMap;
+
+  public IntermediateStageBlockValSet(DataSchema.ColumnDataType columnDataType, DataBlock dataBlock, int colIndex) {
+    _dataType = columnDataType.toDataType();
+    _dataBlock = dataBlock;
+    _index = colIndex;
+    _nullBitMap = dataBlock.getNullRowIds(colIndex);
+  }
+
+  /**
+   * Returns a bitmap of indices where null values are found.
+   */
+  @Nullable
+  @Override
+  public RoaringBitmap getNullBitmap() {
+    return _nullBitMap;
+  }
+
+  @Override
+  public FieldSpec.DataType getValueType() {
+    return _dataType;
+  }
+
+  @Override
+  public boolean isSingleValue() {
+    // TODO: Needs to be changed when we start supporting MV in multistage
+    return true;
+  }
+
+  @Nullable
+  @Override
+  public Dictionary getDictionary() {
+    return null;
+  }
+
+  @Override
+  public int[] getDictionaryIdsSV() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int[] getIntValuesSV() {
+    return DataBlockUtils.extractIntValuesForColumn(_dataBlock, _index);
+  }
+
+  @Override
+  public long[] getLongValuesSV() {
+    return DataBlockUtils.extractLongValuesForColumn(_dataBlock, _index);
+  }
+
+  @Override
+  public float[] getFloatValuesSV() {
+    return DataBlockUtils.extractFloatValuesForColumn(_dataBlock, _index);
+  }
+
+  @Override
+  public double[] getDoubleValuesSV() {
+    return DataBlockUtils.extractDoubleValuesForColumn(_dataBlock, _index);
+  }
+
+  @Override
+  public BigDecimal[] getBigDecimalValuesSV() {
+    return DataBlockUtils.extractBigDecimalValuesForColumn(_dataBlock, _index);
+  }
+
+  @Override
+  public String[] getStringValuesSV() {
+    return DataBlockUtils.extractStringValuesForColumn(_dataBlock, _index);
+  }
+
+  @Override
+  public byte[][] getBytesValuesSV() {
+    return DataBlockUtils.extractBytesValuesForColumn(_dataBlock, _index);
+  }
+
+  @Override
+  public int[][] getDictionaryIdsMV() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int[][] getIntValuesMV() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long[][] getLongValuesMV() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public float[][] getFloatValuesMV() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public double[][] getDoubleValuesMV() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public String[][] getStringValuesMV() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public byte[][][] getBytesValuesMV() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int[] getNumMVEntries() {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index 4a01580e61..eb8ea016fa 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -43,6 +43,7 @@ public class AggregationFunctionFactory {
    */
   public static AggregationFunction getAggregationFunction(FunctionContext function, boolean nullHandlingEnabled) {
     try {
+      // TODO(Sonam): Replace $ removal with a util function
       String upperCaseFunctionName = StringUtils.remove(function.getFunctionName(), '_').toUpperCase();
       List<ExpressionContext> arguments = function.getArguments();
       ExpressionContext firstArgument = arguments.get(0);
@@ -188,6 +189,8 @@ public class AggregationFunctionFactory {
           case MAX:
             return new MaxAggregationFunction(firstArgument, nullHandlingEnabled);
           case SUM:
+          // TODO(Sonam): Uncomment SUM0 when merging planner changes
+          // case SUM0:
             return new SumAggregationFunction(firstArgument, nullHandlingEnabled);
           case SUMPRECISION:
             return new SumPrecisionAggregationFunction(arguments, nullHandlingEnabled);
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageAggregationExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageAggregationExecutor.java
new file mode 100644
index 0000000000..5a3f61198e
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageAggregationExecutor.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.IntermediateStageBlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+/**
+ * Class that executes all aggregation functions (without group-bys) for the multistage AggregateOperator.
+ */
+public class MultistageAggregationExecutor {
+  private final NewAggregateOperator.Mode _mode;
+  // The identifier operands for the aggregation function only store the column name. This map contains mapping
+  // from column name to their index.
+  private final Map<String, Integer> _colNameToIndexMap;
+
+  private final AggregationFunction[] _aggFunctions;
+
+  // Result holders for each mode.
+  private final AggregationResultHolder[] _aggregateResultHolder;
+  private final Object[] _mergeResultHolder;
+  private final Object[] _finalResultHolder;
+
+  public MultistageAggregationExecutor(AggregationFunction[] aggFunctions, NewAggregateOperator.Mode mode,
+      Map<String, Integer> colNameToIndexMap) {
+    _aggFunctions = aggFunctions;
+    _mode = mode;
+    _colNameToIndexMap = colNameToIndexMap;
+
+    _aggregateResultHolder = new AggregationResultHolder[aggFunctions.length];
+    _mergeResultHolder = new Object[aggFunctions.length];
+    _finalResultHolder = new Object[aggFunctions.length];
+
+    for (int i = 0; i < _aggFunctions.length; i++) {
+      _aggregateResultHolder[i] = _aggFunctions[i].createAggregationResultHolder();
+    }
+  }
+
+  /**
+   * Performs aggregation for the data in the block.
+   */
+  public void processBlock(TransferableBlock block, DataSchema inputDataSchema) {
+    if (_mode.equals(NewAggregateOperator.Mode.AGGREGATE)) {
+      processAggregate(block, inputDataSchema);
+    } else if (_mode.equals(NewAggregateOperator.Mode.MERGE)) {
+      processMerge(block);
+    } else if (_mode.equals(NewAggregateOperator.Mode.EXTRACT_RESULT)) {
+      collectResult(block);
+    }
+  }
+
+  /**
+   * @return an empty agg result row for non-group-by aggregation.
+   */
+  public Object[] constructEmptyAggResultRow() {
+    Object[] row = new Object[_aggFunctions.length];
+    for (int i = 0; i < _aggFunctions.length; i++) {
+      AggregationFunction aggFunction = _aggFunctions[i];
+      row[i] = aggFunction.extractAggregationResult(aggFunction.createAggregationResultHolder());
+    }
+    return row;
+  }
+
+  /**
+   * Fetches the result.
+   */
+  public List<Object[]> getResult() {
+    List<Object[]> rows = new ArrayList<>();
+    Object[] row = new Object[_aggFunctions.length];
+
+    for (int i = 0; i < _aggFunctions.length; i++) {
+      AggregationFunction aggFunction = _aggFunctions[i];
+      if (_mode.equals(NewAggregateOperator.Mode.MERGE)) {
+        Object value = _mergeResultHolder[i];
+        row[i] = convertObjectToReturnType(_aggFunctions[i].getType(), value);
+      } else if (_mode.equals(NewAggregateOperator.Mode.AGGREGATE)) {
+        Object value = aggFunction.extractAggregationResult(_aggregateResultHolder[i]);
+        row[i] = convertObjectToReturnType(_aggFunctions[i].getType(), value);
+      } else {
+        assert _mode.equals(NewAggregateOperator.Mode.EXTRACT_RESULT);
+        Comparable result = aggFunction.extractFinalResult(_finalResultHolder[i]);
+        row[i] = result == null ? null : aggFunction.getFinalResultColumnType().convert(result);
+      }
+    }
+    rows.add(row);
+    return rows;
+  }
+
+  private Object convertObjectToReturnType(AggregationFunctionType aggType, Object value) {
+    // For bool_and and bool_or aggregation functions, the return type for aggregate and merge modes are set as
+    // boolean. However, the v1 bool_and and bool_or function uses Integer as the intermediate type.
+    boolean boolAndOrAgg =
+        aggType.equals(AggregationFunctionType.BOOLAND) || aggType.equals(AggregationFunctionType.BOOLOR);
+    if (boolAndOrAgg && value instanceof Integer) {
+      Boolean boolVal = ((Number) value).intValue() > 0 ? true : false;
+      return boolVal;
+    }
+    return value;
+  }
+
+  private void processAggregate(TransferableBlock block, DataSchema inputDataSchema) {
+    for (int i = 0; i < _aggFunctions.length; i++) {
+      AggregationFunction aggregationFunction = _aggFunctions[i];
+      Map<ExpressionContext, BlockValSet> blockValSetMap =
+          getBlockValSetMap(aggregationFunction, block, inputDataSchema);
+      aggregationFunction.aggregate(block.getNumRows(), _aggregateResultHolder[i], blockValSetMap);
+    }
+  }
+
+  private void processMerge(TransferableBlock block) {
+    List<Object[]> container = block.getContainer();
+
+    for (int i = 0; i < _aggFunctions.length; i++) {
+      for (Object[] row : container) {
+        Object intermediateResultToMerge = extractValueFromRow(_aggFunctions[i], row);
+        Object mergedIntermediateResult = _mergeResultHolder[i];
+
+        // Not all V1 aggregation functions have null-handling logic. Handle null values before calling merge.
+        if (intermediateResultToMerge == null) {
+          continue;
+        }
+        if (mergedIntermediateResult == null) {
+          _mergeResultHolder[i] = intermediateResultToMerge;
+          continue;
+        }
+
+        _mergeResultHolder[i] = _aggFunctions[i].merge(intermediateResultToMerge, mergedIntermediateResult);
+      }
+    }
+  }
+
+  private void collectResult(TransferableBlock block) {
+    List<Object[]> container = block.getContainer();
+    assert container.size() == 1;
+    Object[] row = container.get(0);
+    for (int i = 0; i < _aggFunctions.length; i++) {
+      _finalResultHolder[i] = extractValueFromRow(_aggFunctions[i], row);
+    }
+  }
+
+  private Map<ExpressionContext, BlockValSet> getBlockValSetMap(AggregationFunction aggFunction,
+      TransferableBlock block, DataSchema inputDataSchema) {
+    List<ExpressionContext> expressions = aggFunction.getInputExpressions();
+    int numExpressions = expressions.size();
+    if (numExpressions == 0) {
+      return Collections.emptyMap();
+    }
+
+    Preconditions.checkState(numExpressions == 1, "Cannot handle more than one identifier in aggregation function.");
+    ExpressionContext expression = expressions.get(0);
+    Preconditions.checkState(expression.getType().equals(ExpressionContext.Type.IDENTIFIER));
+    int index = _colNameToIndexMap.get(expression.getIdentifier());
+
+    DataSchema.ColumnDataType dataType = inputDataSchema.getColumnDataType(index);
+    Preconditions.checkState(block.getType().equals(DataBlock.Type.ROW), "Datablock type is not ROW");
+    // TODO: If the previous block is not mailbox received, this method is not efficient.  Then getDataBlock() will
+    //  convert the unserialized format to serialized format of BaseDataBlock. Then it will convert it back to column
+    //  value primitive type.
+    return Collections.singletonMap(expression,
+        new IntermediateStageBlockValSet(dataType, block.getDataBlock(), index));
+  }
+
+  Object extractValueFromRow(AggregationFunction aggregationFunction, Object[] row) {
+    // TODO: Add support to handle aggregation functions where:
+    //       1. The identifier need not be the first argument
+    //       2. There are more than one identifiers.
+    List<ExpressionContext> expressions = aggregationFunction.getInputExpressions();
+    Preconditions.checkState(expressions.size() == 1);
+    ExpressionContext expr = expressions.get(0);
+    ExpressionContext.Type exprType = expr.getType();
+
+    if (exprType.equals(ExpressionContext.Type.IDENTIFIER)) {
+      String colName = expr.getIdentifier();
+      int colIndex = _colNameToIndexMap.get(colName);
+
+      Object value = row[colIndex];
+
+      // Boolean aggregation functions like BOOL_AND and BOOL_OR have return types set to Boolean. However, their
+      // intermediateResultType is Integer. To handle this case convert Boolean objects to Integer objects.
+      boolean boolAndOrAgg =
+          aggregationFunction.getType().equals(AggregationFunctionType.BOOLAND) || aggregationFunction.getType()
+              .equals(AggregationFunctionType.BOOLOR);
+      if (boolAndOrAgg && value instanceof Boolean) {
+        Integer intVal = ((Boolean) value).booleanValue() ? 1 : 0;
+        return intVal;
+      }
+
+      return value;
+    }
+
+    Preconditions.checkState(exprType.equals(ExpressionContext.Type.LITERAL), "Invalid expression type");
+    return expr.getLiteral().getValue();
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java
new file mode 100644
index 0000000000..dedb9f9b58
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java
@@ -0,0 +1,307 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.IntermediateStageBlockValSet;
+import org.apache.pinot.core.data.table.Key;
+import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+/**
+ * Executes the group-by aggregations for the multistage AggregateOperator using the v1 aggregation functions.
+ * <p>
+ * What is done with each input block, and how the result rows are produced, depends on the
+ * {@link NewAggregateOperator.Mode} the executor was created with:
+ * <ul>
+ *   <li>AGGREGATE: rows are aggregated per group through {@code aggregateGroupBySV()}.</li>
+ *   <li>MERGE: intermediate results of rows sharing a group key are combined through {@code merge()}.</li>
+ *   <li>EXTRACT_RESULT: collected intermediate results are finalized through {@code extractFinalResult()}.</li>
+ * </ul>
+ */
+public class MultistageGroupByExecutor {
+  private final NewAggregateOperator.Mode _mode;
+  // The identifier operands for the aggregation function only store the column name. This map contains mapping
+  // between column name to their index which is used in v2 engine.
+  private final Map<String, Integer> _colNameToIndexMap;
+
+  private final List<ExpressionContext> _groupSet;
+  private final AggregationFunction[] _aggFunctions;
+
+  // Group By Result holders for each mode:
+  //   AGGREGATE      -> one v1 result holder per aggregation function, indexed by group id
+  //   MERGE          -> group id to the merged intermediate results (one slot per aggregation function)
+  //   EXTRACT_RESULT -> the collected rows (group-by key values followed by the intermediate results)
+  private final GroupByResultHolder[] _aggregateResultHolders;
+  private final Map<Integer, Object[]> _mergeResultHolder;
+  private final List<Object[]> _finalResultHolder;
+
+  // Mapping from the row-key to a zero based integer index. This is used when we invoke the v1 aggregation functions
+  // because they use the zero based integer indexes to store results.
+  private int _groupId = 0;
+  private final Map<Key, Integer> _groupKeyToIdMap;
+
+  // Mapping from the group by row-key to the values in the row which form the key. Used to fetch the actual row
+  // values when populating the result.
+  private final Map<Key, Object[]> _groupByKeyHolder;
+
+  /**
+   * Creates the executor and one v1 group-by result holder per aggregation function.
+   *
+   * @param groupByExpr group-by expressions; each one is resolved to an input column through its identifier
+   * @param aggFunctions v1 aggregation functions, in the order of the aggregate columns of the result rows
+   * @param mode mode in which the blocks are processed and the result is extracted
+   * @param colNameToIndexMap mapping from input column name to the index of that column in the input rows
+   */
+  public MultistageGroupByExecutor(List<ExpressionContext> groupByExpr, AggregationFunction[] aggFunctions,
+      NewAggregateOperator.Mode mode, Map<String, Integer> colNameToIndexMap) {
+    _mode = mode;
+    _colNameToIndexMap = colNameToIndexMap;
+    _groupSet = groupByExpr;
+    _aggFunctions = aggFunctions;
+
+    _aggregateResultHolders = new GroupByResultHolder[_aggFunctions.length];
+    _mergeResultHolder = new HashMap<>();
+    _finalResultHolder = new ArrayList<>();
+
+    _groupKeyToIdMap = new HashMap<>();
+    _groupByKeyHolder = new HashMap<>();
+
+    for (int i = 0; i < _aggFunctions.length; i++) {
+      _aggregateResultHolders[i] =
+          _aggFunctions[i].createGroupByResultHolder(InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY,
+              InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT);
+    }
+  }
+
+  /**
+   * Performs group-by aggregation for the data in the block.
+   *
+   * @param block block whose rows are processed according to the mode of this executor
+   * @param inputDataSchema schema of the block; only read in AGGREGATE mode to find the input column data types
+   */
+  public void processBlock(TransferableBlock block, DataSchema inputDataSchema) {
+    switch (_mode) {
+      case AGGREGATE:
+        processAggregate(block, inputDataSchema);
+        break;
+      case MERGE:
+        processMerge(block);
+        break;
+      case EXTRACT_RESULT:
+        collectResult(block);
+        break;
+      default:
+        break;
+    }
+  }
+
+  /**
+   * Fetches the result.
+   *
+   * @return one row per group (AGGREGATE and MERGE modes) or per collected row (EXTRACT_RESULT mode); each row holds
+   *         the group-by key values followed by one value per aggregation function
+   */
+  public List<Object[]> getResult() {
+    if (_mode == NewAggregateOperator.Mode.EXTRACT_RESULT) {
+      return extractFinalGroupByResult();
+    }
+
+    int numKeyCols = _groupSet.size();
+    int numCols = numKeyCols + _aggFunctions.length;
+    List<Object[]> rows = new ArrayList<>(_groupByKeyHolder.size());
+
+    // If the mode is MERGE or AGGREGATE, the groupby keys are already collected in _groupByKeyHolder by virtue of
+    // generating the row keys.
+    for (Map.Entry<Key, Object[]> entry : _groupByKeyHolder.entrySet()) {
+      Object[] row = new Object[numCols];
+      Object[] keyElements = entry.getValue();
+      System.arraycopy(keyElements, 0, row, 0, keyElements.length);
+
+      // The group id is the same for every aggregation function of the row, so it is looked up only once.
+      int groupId = _groupKeyToIdMap.get(entry.getKey());
+      if (_mode == NewAggregateOperator.Mode.MERGE) {
+        Object[] mergedResults = _mergeResultHolder.get(groupId);
+        for (int i = 0; i < _aggFunctions.length; i++) {
+          row[numKeyCols + i] = convertObjectToReturnType(_aggFunctions[i].getType(), mergedResults[i]);
+        }
+      } else if (_mode == NewAggregateOperator.Mode.AGGREGATE) {
+        for (int i = 0; i < _aggFunctions.length; i++) {
+          Object value = _aggFunctions[i].extractGroupByResult(_aggregateResultHolders[i], groupId);
+          row[numKeyCols + i] = convertObjectToReturnType(_aggFunctions[i].getType(), value);
+        }
+      }
+
+      rows.add(row);
+    }
+
+    return rows;
+  }
+
+  /**
+   * Builds the EXTRACT_RESULT rows: the group-by key values are copied and every collected intermediate result is
+   * turned into its final result, converted to the final result column type of its function (null stays null).
+   */
+  private List<Object[]> extractFinalGroupByResult() {
+    int numKeyCols = _groupSet.size();
+    int numCols = numKeyCols + _aggFunctions.length;
+    List<Object[]> rows = new ArrayList<>(_finalResultHolder.size());
+    for (Object[] resultRow : _finalResultHolder) {
+      Object[] row = new Object[numCols];
+      System.arraycopy(resultRow, 0, row, 0, numKeyCols);
+
+      for (int i = 0; i < _aggFunctions.length; i++) {
+        int aggIdx = numKeyCols + i;
+        Comparable result = _aggFunctions[i].extractFinalResult(resultRow[aggIdx]);
+        row[aggIdx] = result == null ? null : _aggFunctions[i].getFinalResultColumnType().convert(result);
+      }
+
+      rows.add(row);
+    }
+    return rows;
+  }
+
+  /**
+   * Returns whether the aggregation type is BOOL_AND or BOOL_OR, the two functions whose Boolean values have to be
+   * mapped to and from the Integer intermediate type.
+   */
+  private static boolean isBoolAndOrAgg(AggregationFunctionType aggType) {
+    return aggType.equals(AggregationFunctionType.BOOLAND) || aggType.equals(AggregationFunctionType.BOOLOR);
+  }
+
+  /**
+   * Converts an aggregate or merge result to the type expected in the result row.
+   */
+  private Object convertObjectToReturnType(AggregationFunctionType aggType, Object value) {
+    // For bool_and and bool_or aggregation functions, the return type for aggregate and merge modes are set as
+    // boolean. However, the v1 bool_and and bool_or function uses Integer as the intermediate type.
+    if (isBoolAndOrAgg(aggType) && value instanceof Integer) {
+      return Boolean.valueOf(((Integer) value).intValue() > 0);
+    }
+    return value;
+  }
+
+  /**
+   * AGGREGATE mode: assigns a group id to every row of the block and lets each aggregation function aggregate its
+   * input column into its group-by result holder.
+   */
+  private void processAggregate(TransferableBlock block, DataSchema inputDataSchema) {
+    int[] intKeys = generateGroupByKeys(block.getContainer());
+
+    for (int i = 0; i < _aggFunctions.length; i++) {
+      AggregationFunction aggregationFunction = _aggFunctions[i];
+      Map<ExpressionContext, BlockValSet> blockValSetMap =
+          getBlockValSetMap(aggregationFunction, block, inputDataSchema);
+      GroupByResultHolder groupByResultHolder = _aggregateResultHolders[i];
+      // New groups may have been created for this block, so the holder has to cover all known group ids.
+      groupByResultHolder.ensureCapacity(_groupKeyToIdMap.size());
+      aggregationFunction.aggregateGroupBySV(block.getNumRows(), intKeys, groupByResultHolder, blockValSetMap);
+    }
+  }
+
+  /**
+   * MERGE mode: assigns a group id to every row of the block and merges the intermediate result of each aggregation
+   * function into the value already held for the group of the row.
+   */
+  private void processMerge(TransferableBlock block) {
+    List<Object[]> container = block.getContainer();
+    int[] intKeys = generateGroupByKeys(container);
+    int numRows = container.size();
+
+    for (int i = 0; i < _aggFunctions.length; i++) {
+      AggregationFunction aggregationFunction = _aggFunctions[i];
+      for (int j = 0; j < numRows; j++) {
+        // Single lookup that also creates the per-group slots on first use.
+        Object[] mergedResults =
+            _mergeResultHolder.computeIfAbsent(intKeys[j], groupId -> new Object[_aggFunctions.length]);
+        Object intermediateResultToMerge = extractValueFromRow(aggregationFunction, container.get(j));
+
+        // Not all V1 aggregation functions have null-handling. So handle null values and call merge only if necessary.
+        if (intermediateResultToMerge == null) {
+          continue;
+        }
+        Object mergedIntermediateResult = mergedResults[i];
+        mergedResults[i] = mergedIntermediateResult == null ? intermediateResultToMerge
+            : aggregationFunction.merge(intermediateResultToMerge, mergedIntermediateResult);
+      }
+    }
+  }
+
+  /**
+   * EXTRACT_RESULT mode: stores, for every row of the block, the group-by key values followed by the value each
+   * aggregation function reads from the row.
+   */
+  private void collectResult(TransferableBlock block) {
+    List<Object[]> container = block.getContainer();
+    int numKeyCols = _groupSet.size();
+    for (Object[] row : container) {
+      assert row.length == numKeyCols + _aggFunctions.length;
+      Object[] resultRow = new Object[row.length];
+      System.arraycopy(row, 0, resultRow, 0, numKeyCols);
+
+      for (int i = 0; i < _aggFunctions.length; i++) {
+        resultRow[numKeyCols + i] = extractValueFromRow(_aggFunctions[i], row);
+      }
+
+      _finalResultHolder.add(resultRow);
+    }
+  }
+
+  /**
+   * Creates the group by key for each row. Converts the key into a 0-index based int value that can be used by
+   * GroupByAggregationResultHolders used in v1 aggregations.
+   * <p>
+   * Returns the int key for each row.
+   */
+  private int[] generateGroupByKeys(List<Object[]> rows) {
+    int numRows = rows.size();
+    int[] rowKeys = new int[numRows];
+    if (numRows == 0) {
+      return rowKeys;
+    }
+
+    // The group-by columns are the same for every row, so their indexes are resolved once per block.
+    int numGroups = _groupSet.size();
+    int[] keyColIndexes = new int[numGroups];
+    for (int j = 0; j < numGroups; j++) {
+      keyColIndexes[j] = getColumnIndex(_groupSet.get(j).getIdentifier());
+    }
+
+    for (int i = 0; i < numRows; i++) {
+      Object[] row = rows.get(i);
+
+      Object[] keyElements = new Object[numGroups];
+      for (int j = 0; j < numGroups; j++) {
+        keyElements[j] = row[keyColIndexes[j]];
+      }
+
+      Key rowKey = new Key(keyElements);
+      Integer groupId = _groupKeyToIdMap.get(rowKey);
+      if (groupId == null) {
+        // First time this key is seen: hand out the next id and remember the key values for the result rows.
+        groupId = _groupId++;
+        _groupKeyToIdMap.put(rowKey, groupId);
+        _groupByKeyHolder.put(rowKey, rowKey.getValues());
+      }
+      rowKeys[i] = groupId;
+    }
+
+    return rowKeys;
+  }
+
+  /**
+   * Resolves a column name to its index in the input rows, failing with a descriptive error instead of an unboxing
+   * NullPointerException when the column is not present in the name-to-index map.
+   */
+  private int getColumnIndex(String colName) {
+    Integer colIndex = _colNameToIndexMap.get(colName);
+    Preconditions.checkState(colIndex != null, "Failed to find the index of column: %s", colName);
+    return colIndex;
+  }
+
+  /**
+   * Builds the input of {@code aggregateGroupBySV()} for an aggregation function: an empty map when the function has
+   * no input expression, otherwise a single entry that exposes the referenced column of the block.
+   */
+  private Map<ExpressionContext, BlockValSet> getBlockValSetMap(AggregationFunction aggFunction,
+      TransferableBlock block, DataSchema inputDataSchema) {
+    List<ExpressionContext> expressions = aggFunction.getInputExpressions();
+    int numExpressions = expressions.size();
+    if (numExpressions == 0) {
+      return Collections.emptyMap();
+    }
+
+    Preconditions.checkState(numExpressions == 1, "Cannot handle more than one identifier in aggregation function.");
+    ExpressionContext expression = expressions.get(0);
+    Preconditions.checkState(expression.getType().equals(ExpressionContext.Type.IDENTIFIER));
+    int index = getColumnIndex(expression.getIdentifier());
+
+    DataSchema.ColumnDataType dataType = inputDataSchema.getColumnDataType(index);
+    Preconditions.checkState(block.getType().equals(DataBlock.Type.ROW), "Datablock type is not ROW");
+    // TODO: If the previous block is not mailbox received, this method is not efficient.  Then getDataBlock() will
+    //  convert the unserialized format to serialized format of BaseDataBlock. Then it will convert it back to column
+    //  value primitive type.
+    return Collections.singletonMap(expression,
+        new IntermediateStageBlockValSet(dataType, block.getDataBlock(), index));
+  }
+
+  /**
+   * Reads the value that the given aggregation function consumes from a row: the referenced column for an identifier
+   * argument, or the literal value for a literal argument. The function must have exactly one input expression.
+   */
+  Object extractValueFromRow(AggregationFunction aggregationFunction, Object[] row) {
+    // TODO: Add support to handle aggregation functions where:
+    //       1. The identifier need not be the first argument
+    //       2. There are more than one identifiers.
+    List<ExpressionContext> expressions = aggregationFunction.getInputExpressions();
+    Preconditions.checkState(expressions.size() == 1,
+        "Aggregation function must have exactly one input expression, got: %s", expressions.size());
+    ExpressionContext expr = expressions.get(0);
+    ExpressionContext.Type exprType = expr.getType();
+
+    if (exprType.equals(ExpressionContext.Type.IDENTIFIER)) {
+      Object value = row[getColumnIndex(expr.getIdentifier())];
+
+      // Boolean aggregation functions like BOOL_AND and BOOL_OR have return types set to Boolean. However, their
+      // intermediateResultType is Integer. To handle this case convert Boolean objects to Integer objects.
+      if (isBoolAndOrAgg(aggregationFunction.getType()) && value instanceof Boolean) {
+        return Integer.valueOf(((Boolean) value).booleanValue() ? 1 : 0);
+      }
+
+      return value;
+    }
+
+    Preconditions.checkState(exprType.equals(ExpressionContext.Type.LITERAL), "Invalid expression type");
+    return expr.getLiteral().getValue();
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/NewAggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/NewAggregateOperator.java
new file mode 100644
index 0000000000..fda873b84e
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/NewAggregateOperator.java
@@ -0,0 +1,295 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FunctionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunctionFactory;
+import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ *
+ * AggregateOperator is used to aggregate values over a set of group by keys.
+ * Output data will be in the format of [group by key, aggregate result1, ... aggregate resultN]
+ * Currently, we only support the following aggregation functions:
+ * 1. SUM
+ * 2. COUNT
+ * 3. MIN
+ * 4. MAX
+ * 5. DistinctCount and Count(Distinct)
+ * 6. AVG
+ * 7. FourthMoment
+ * 8. BoolAnd and BoolOr
+ *
+ * When the list of aggregation calls is empty, this class is used to calculate distinct result based on group by keys.
+ * In this case, the input can be any type.
+ *
+ * If the list of aggregation calls is not empty, the input of aggregation has to be a number.
+ * Note: This class performs aggregation over the double value of input.
+ * If the input is single value, the output type will be input type. Otherwise, the output type will be double.
+ *
+ * NOTE(review): the aggregation itself is delegated to the v1 AggregationFunction implementations through
+ * MultistageAggregationExecutor and MultistageGroupByExecutor, so confirm that the statements above about numeric
+ * input and double-valued aggregation still hold for this operator.
+ */
+// TODO(Sonam): Rename to AggregateOperator when merging Planner support.
+public class NewAggregateOperator extends MultiStageOperator {
+  private static final String EXPLAIN_NAME = "AGGREGATE_OPERATOR";
+
+  private final MultiStageOperator _inputOperator;
+  private final DataSchema _resultSchema;
+  private final DataSchema _inputSchema;
+
+  // Map that maintains the mapping between columnName and the column ordinal index. It is used to fetch the required
+  // column value from row-based container and fetch the input datatype for the column.
+  private final HashMap<String, Integer> _colNameToIndexMap;
+
+  // Error block received from the input operator, if any. Once set it is the only block this operator returns.
+  private TransferableBlock _upstreamErrorBlock;
+  // Set once the input operator has signalled end of stream, i.e. the aggregated result can be produced.
+  private boolean _readyToConstruct;
+  // Set once the aggregated block has been handed out; afterwards only end-of-stream blocks are returned.
+  private boolean _hasReturnedAggregateBlock;
+
+  // Exactly one of the two executors is non-null, selected by _isGroupByAggregation.
+  private final boolean _isGroupByAggregation;
+  private final MultistageAggregationExecutor _aggregationExecutor;
+  private final MultistageGroupByExecutor _groupByExecutor;
+
+  // This aggregation operator uses the v1 Aggregation functions to process the rows. It operates in one of the 3 modes:
+  // 1. Aggregate Mode:
+  //    - Calls aggregate(), aggregateGroupBySV(), aggregateGroupByMV()
+  //    - This mode is used when isLeafStage hint is set or if treatIntermediateAsLeaf is true.
+  // 2. Merge Mode:
+  //    - Calls merge()
+  //    - This mode is used when isIntermediateStage hint is set and if treatIntermediateAsLeaf is false.
+  // 3. ExtractResult Mode:
+  //    - Calls extractFinalResult()
+  //    - This mode is used when isFinalStage() hint is set.
+  enum Mode {
+    AGGREGATE,
+    MERGE,
+    EXTRACT_RESULT
+  }
+
+  // TODO: refactor Pinot Reducer code to support the intermediate stage agg operator.
+  // aggCalls has to be a list of FunctionCall and cannot be null
+  // groupSet has to be a list of InputRef and cannot be null
+  // TODO: Add these two checks when we confirm we can handle error in upstream ctor call.
+
+  /**
+   * Creates the operator, picks the execution mode from the stage flags and sets up the matching executor.
+   *
+   * @param context execution context passed to the base operator
+   * @param inputOperator operator the input blocks are pulled from
+   * @param resultSchema schema of the blocks returned by this operator
+   * @param inputSchema schema of the input blocks; used to resolve input references to column names
+   * @param aggCalls aggregation calls; every element has to be a {@code RexExpression.FunctionCall}
+   * @param groupSet group-by expressions; when empty a plain (non group-by) aggregation is executed
+   * @param isLeafStage whether the aggregation runs as the leaf stage
+   * @param isIntermediateStage whether the aggregation runs as the intermediate stage
+   * @param extractFinalResult whether the final results have to be extracted; takes precedence over the other flags
+   * @param treatIntermediateAsLeaf whether an intermediate stage aggregates (instead of merges) its input
+   */
+  @VisibleForTesting
+  public NewAggregateOperator(OpChainExecutionContext context, MultiStageOperator inputOperator,
+      DataSchema resultSchema, DataSchema inputSchema, List<RexExpression> aggCalls, List<RexExpression> groupSet,
+      boolean isLeafStage, boolean isIntermediateStage, boolean extractFinalResult, boolean treatIntermediateAsLeaf) {
+    super(context);
+    _inputOperator = inputOperator;
+    _resultSchema = resultSchema;
+    _inputSchema = inputSchema;
+
+    // Has to exist before the expressions are converted below, because the conversion registers the columns in it.
+    _colNameToIndexMap = new HashMap<>();
+
+    Mode mode;
+    if (extractFinalResult) {
+      mode = Mode.EXTRACT_RESULT;
+    } else if (isIntermediateStage && !treatIntermediateAsLeaf) {
+      mode = Mode.MERGE;
+    } else {
+      assert isLeafStage || (isIntermediateStage && treatIntermediateAsLeaf);
+      mode = Mode.AGGREGATE;
+    }
+
+    // Convert groupSet to ExpressionContext that our aggregation functions understand.
+    List<ExpressionContext> groupByExpr = getGroupSet(groupSet);
+    List<FunctionContext> functionContexts = getFunctionContexts(aggCalls);
+    AggregationFunction[] aggFunctions = new AggregationFunction[functionContexts.size()];
+
+    for (int i = 0; i < aggFunctions.length; i++) {
+      aggFunctions[i] = AggregationFunctionFactory.getAggregationFunction(functionContexts.get(i), true);
+    }
+
+    // Initialize the appropriate executor.
+    _isGroupByAggregation = !groupSet.isEmpty();
+    if (_isGroupByAggregation) {
+      _groupByExecutor = new MultistageGroupByExecutor(groupByExpr, aggFunctions, mode, _colNameToIndexMap);
+      _aggregationExecutor = null;
+    } else {
+      _groupByExecutor = null;
+      _aggregationExecutor = new MultistageAggregationExecutor(aggFunctions, mode, _colNameToIndexMap);
+    }
+  }
+
+  @Override
+  public List<MultiStageOperator> getChildOperators() {
+    return ImmutableList.of(_inputOperator);
+  }
+
+  @Nullable
+  @Override
+  public String toExplainString() {
+    return EXPLAIN_NAME;
+  }
+
+  /**
+   * Returns a no-op block while the input is not exhausted, the upstream error block if the input failed, the
+   * aggregated block once, and end-of-stream blocks afterwards. Any exception is turned into an error block.
+   */
+  @Override
+  protected TransferableBlock getNextBlock() {
+    try {
+      // Once the upstream has failed keep returning its error block instead of polling the failed input again.
+      if (_upstreamErrorBlock != null) {
+        return _upstreamErrorBlock;
+      }
+
+      if (!_readyToConstruct && !consumeInputBlocks()) {
+        return TransferableBlockUtils.getNoOpTransferableBlock();
+      }
+
+      // consumeInputBlocks() also reports "ready" when it ran into an error block.
+      if (_upstreamErrorBlock != null) {
+        return _upstreamErrorBlock;
+      }
+
+      if (_hasReturnedAggregateBlock) {
+        // TODO: Move to close call.
+        return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+      }
+      return produceAggregatedBlock();
+    } catch (Exception e) {
+      return TransferableBlockUtils.getErrorTransferableBlock(e);
+    }
+  }
+
+  /**
+   * Builds the block with the aggregated rows. Without any row a plain aggregation still returns a single row with
+   * the empty aggregation results, whereas a group-by aggregation returns an end-of-stream block.
+   */
+  private TransferableBlock produceAggregatedBlock() {
+    List<Object[]> rows = _isGroupByAggregation ? _groupByExecutor.getResult() : _aggregationExecutor.getResult();
+    _hasReturnedAggregateBlock = true;
+
+    if (!rows.isEmpty()) {
+      return new TransferableBlock(rows, _resultSchema, DataBlock.Type.ROW);
+    }
+    if (_isGroupByAggregation) {
+      return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+    }
+    Object[] row = _aggregationExecutor.constructEmptyAggResultRow();
+    return new TransferableBlock(Collections.singletonList(row), _resultSchema, DataBlock.Type.ROW);
+  }
+
+  /**
+   * Pulls blocks from the input operator and feeds them to the executor until a no-op, error or end-of-stream block
+   * is received.
+   *
+   * @return whether or not the operator is ready to move on (EOS or ERROR)
+   */
+  private boolean consumeInputBlocks() {
+    TransferableBlock block = _inputOperator.nextBlock();
+    while (!block.isNoOpBlock()) {
+      // setting upstream error block
+      if (block.isErrorBlock()) {
+        _upstreamErrorBlock = block;
+        return true;
+      }
+      if (block.isEndOfStreamBlock()) {
+        _readyToConstruct = true;
+        return true;
+      }
+
+      if (_isGroupByAggregation) {
+        _groupByExecutor.processBlock(block, _inputSchema);
+      } else {
+        _aggregationExecutor.processBlock(block, _inputSchema);
+      }
+
+      block = _inputOperator.nextBlock();
+    }
+    return false;
+  }
+
+  /**
+   * Converts the aggregation calls, which all have to be function calls, to the function contexts used to create the
+   * v1 aggregation functions.
+   */
+  private List<FunctionContext> getFunctionContexts(List<RexExpression> aggCalls) {
+    return aggCalls.stream()
+        .map(RexExpression.FunctionCall.class::cast)
+        .map(this::convertRexExpressionsToFunctionContext)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Converts a single aggregation call to an aggregation function context carrying the function name and the
+   * converted operands.
+   */
+  private FunctionContext convertRexExpressionsToFunctionContext(RexExpression.FunctionCall aggFunctionCall) {
+    // Extract details from RexExpression aggFunctionCall.
+    String functionName = aggFunctionCall.getFunctionName();
+    List<RexExpression> functionOperands = aggFunctionCall.getFunctionOperands();
+
+    List<ExpressionContext> aggArguments = new ArrayList<>();
+    for (RexExpression operand : functionOperands) {
+      aggArguments.add(convertRexExpressionToExpressionContext(operand));
+    }
+
+    if (aggArguments.isEmpty()) {
+      // This can only be true for COUNT aggregation functions.
+      // The literal value here does not matter. We create a dummy literal here just so that the count aggregation
+      // has some column to process.
+      aggArguments.add(ExpressionContext.forLiteralContext(FieldSpec.DataType.LONG, 1L));
+    }
+
+    return new FunctionContext(FunctionContext.Type.AGGREGATION, functionName, aggArguments);
+  }
+
+  /**
+   * Converts the group-by expressions to the expression contexts handed to the group-by executor.
+   */
+  private List<ExpressionContext> getGroupSet(List<RexExpression> groupBySetRexExpr) {
+    List<ExpressionContext> groupByExprContext = new ArrayList<>(groupBySetRexExpr.size());
+    for (RexExpression groupByRexExpr : groupBySetRexExpr) {
+      groupByExprContext.add(convertRexExpressionToExpressionContext(groupByRexExpr));
+    }
+    return groupByExprContext;
+  }
+
+  /**
+   * Converts an aggregation operand or group-by expression to an expression context. Input references become
+   * identifiers (and are registered in the column-name-to-index map), literals stay literals.
+   *
+   * @throws IllegalStateException if the expression is neither an input reference nor a literal
+   */
+  private ExpressionContext convertRexExpressionToExpressionContext(RexExpression rexExpr) {
+    // This is used only for aggregation arguments and groupby columns. The rexExpression can never be a function type.
+    switch (rexExpr.getKind()) {
+      case INPUT_REF: {
+        RexExpression.InputRef inputRef = (RexExpression.InputRef) rexExpr;
+        int identifierIndex = inputRef.getIndex();
+        String columnName = _inputSchema.getColumnName(identifierIndex);
+        // Calcite generates unique column names for aggregation functions. For example, select avg(col1), sum(col1)
+        // will generate names $f0 and $f1 for avg and sum respectively. We use a map to store the name -> index
+        // mapping to extract the required column value from row-based container and fetch the input datatype for the
+        // column.
+        _colNameToIndexMap.put(columnName, identifierIndex);
+        return ExpressionContext.forIdentifier(columnName);
+      }
+      case LITERAL: {
+        RexExpression.Literal literalRexExp = (RexExpression.Literal) rexExpr;
+        return ExpressionContext.forLiteralContext(literalRexExp.getDataType(), literalRexExp.getValue());
+      }
+      default:
+        throw new IllegalStateException("Aggregation Function operands or GroupBy columns cannot be a function.");
+    }
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
index 722790d867..7385ca1d48 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
@@ -20,6 +20,7 @@ package org.apache.pinot.query.runtime.plan;
 
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.planner.plannode.AggregateNode;
 import org.apache.pinot.query.planner.plannode.ExchangeNode;
 import org.apache.pinot.query.planner.plannode.FilterNode;
@@ -96,6 +97,18 @@ public class PhysicalPlanVisitor implements PlanNodeVisitor<MultiStageOperator,
   @Override
   public MultiStageOperator visitAggregate(AggregateNode node, PhysicalPlanContext context) {
     MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
+    DataSchema inputSchema = node.getInputs().get(0).getDataSchema();
+    DataSchema resultSchema = node.getDataSchema();
+
+    // TODO(Sonam): Rename to AggregateOperator when the planner changes are merged.
+//    boolean extractFinalResult = AggregateNode.isFinalStage(node);
+//    boolean isIntermediateStage = AggregateNode.isIntermediateStage(node);
+//    boolean isLeafStage = AggregateNode.isLeafStage(node);
+//    boolean treatIntermediateAsLeaf = node.isTreatIntermediateStageAsLeaf();
+//
+//    return new NewAggregateOperator(context.getOpChainExecutionContext(), nextOperator, resultSchema, inputSchema,
+//        node.getAggCalls(), node.getGroupSet(), isLeafStage, isIntermediateStage, extractFinalResult,
+//        treatIntermediateAsLeaf);
     return new AggregateOperator(context.getOpChainExecutionContext(), nextOperator, node.getDataSchema(),
         node.getAggCalls(), node.getGroupSet(), node.getInputs().get(0).getDataSchema());
   }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
index bda5086a40..21b4b047bd 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
@@ -44,7 +44,7 @@ import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.DOUBLE;
 import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.INT;
 import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.STRING;
 
-
+// TODO(Sonam): Ensure test passes when the switch to AggregateOperator is made.
 public class AggregateOperatorTest {
 
   private AutoCloseable _mocks;
diff --git a/pinot-query-runtime/src/test/resources/queries/Aggregates.json b/pinot-query-runtime/src/test/resources/queries/Aggregates.json
index aaafa39651..bd3132090b 100644
--- a/pinot-query-runtime/src/test/resources/queries/Aggregates.json
+++ b/pinot-query-runtime/src/test/resources/queries/Aggregates.json
@@ -1,5 +1,5 @@
 {
-  "general_aggregate": {
+  "general_aggregate_no_filter": {
     "tables": {
       "tbl": {
         "schema": [
@@ -22,54 +22,638 @@
     "queries": [
       {
         "psql": "4.2.7",
-        "description": "average double",
-        "sql": "SELECT avg(double_col) FROM {tbl}"
+        "description": "count * aggregation",
+        "sql": "SELECT count(*) FROM {tbl}"
       },
       {
         "psql": "4.2.7",
-        "description": "average double with filter",
-        "sql": "SELECT avg(double_col) FROM {tbl} WHERE double_col >= 100"
+        "description": "aggregations on double column",
+        "sql": "SELECT min(double_col), max(double_col), avg(double_col), sum(double_col), count(double_col), count(distinct(double_col)), count(*) FROM {tbl}"
       },
       {
         "psql": "4.2.7",
-        "description": "sum double",
-        "sql": "SELECT sum(double_col) FROM {tbl}"
+        "description": "aggregations on int column",
+        "sql": "SELECT min(int_col), max(int_col), avg(int_col), sum(int_col), count(int_col), count(distinct(int_col)), count(*) FROM {tbl}"
       },
       {
         "psql": "4.2.7",
-        "description": "sum int",
-        "sql": "SELECT sum(int_col) FROM {tbl}"
+        "description": "aggregations on bool column",
+        "sql": "SELECT bool_and(bool_col), bool_or(bool_col), count(bool_col), count(distinct(bool_col)), count(*) FROM {tbl}"
       },
       {
         "psql": "4.2.7",
-        "description": "max double",
-        "sql": "SELECT max(double_col) FROM {tbl}"
+        "description": "aggregations on string column",
+        "sql": "SELECT count(string_col), count(distinct(string_col)), count(*) FROM {tbl}"
+      }
+    ]
+  },
+  "general_aggregate_with_filter": {
+    "tables": {
+      "tbl": {
+        "schema": [
+          {"name": "int_col", "type": "INT"},
+          {"name": "double_col", "type": "DOUBLE"},
+          {"name": "string_col", "type": "STRING"},
+          {"name": "bool_col", "type": "BOOLEAN"}
+        ],
+        "inputs": [
+          [2, 300, "a", true],
+          [2, 400, "a", true],
+          [3, 100, "b", false],
+          [100, 1, "b", false],
+          [101, 1.01, "c", false],
+          [150, 1.5, "c", false],
+          [175, 1.75, "c", true]
+        ]
+      }
+    },
+    "queries": [
+      {
+        "psql": "4.2.7",
+        "description": "count aggregation with filters",
+        "sql": "SELECT count(*) FROM {tbl} WHERE string_col='b'"
+      },
+      {
+        "psql": "4.2.7",
+        "description": "aggregations on double column with string filters",
+        "sql": "SELECT min(double_col), max(double_col), avg(double_col), sum(double_col), count(double_col), count(distinct(double_col)), count(*) FROM {tbl} WHERE string_col='b'"
+      },
+      {
+        "psql": "4.2.7",
+        "description": "aggregations on double column with int filters",
+        "sql": "SELECT min(double_col), max(double_col), avg(double_col), sum(double_col), count(double_col), count(distinct(double_col)), count(*) FROM {tbl} WHERE int_col > 100"
+      },
+      {
+        "psql": "4.2.7",
+        "ignored": true,
+        "comments": "TODO(Sonam): Remove ignore check. Test fails with existing code because existing code returns INT 0. But H2 returns a double.",
+        "description": "aggregate int column and filter by int column",
+        "sql": "SELECT sum(1 / int_col) FROM {tbl} WHERE int_col > 0",
+        "h2Sql": "SELECT sum(1.0 / int_col) FROM {tbl} WHERE int_col > 0"
+      },
+      {
+        "psql": "4.2.7",
+        "description": "aggregate double column and filter by double column",
+        "sql": "SELECT sum(1 / double_col) FROM {tbl} WHERE double_col < 2"
+      },
+      {
+        "psql": "4.2.7",
+        "ignored": true,
+        "comments": "TODO(Sonam): Remove ignore check. Test fails with existing code but is fixed with this patch",
+        "description": "aggregations on double column with filters",
+        "sql": "SELECT min(double_col), max(double_col), avg(double_col), sum(double_col), count(double_col), count(distinct(double_col)), count(*) FROM {tbl} WHERE int_col > 100 and double_col = 1.75"
+      },
+      {
+        "psql": "4.2.7",
+        "description": "aggregations on int column with filters",
+        "sql": "SELECT min(int_col), max(int_col), avg(int_col), sum(int_col), count(int_col), count(distinct(int_col)), count(*) FROM {tbl} WHERE string_col='b'"
+      },
+      {
+        "psql": "4.2.7",
+        "description": "aggregations on int column with filters",
+        "sql": "SELECT min(int_col), max(int_col), avg(int_col), sum(int_col), count(int_col), count(distinct(int_col)), count(*) FROM {tbl} WHERE int_col > 100"
+      },
+      {
+        "psql": "4.2.7",
+        "description": "aggregations on int column with filters",
+        "sql": "SELECT min(int_col), max(int_col), avg(int_col), sum(int_col), count(int_col), count(distinct(int_col)), count(*) FROM {tbl} WHERE int_col > 100 and double_col = 1.75"
+      },
+      {
+        "psql": "4.2.7",
+        "description": "aggregations on int column with filters",
+        "sql": "SELECT min(int_col), max(int_col), avg(int_col), sum(int_col), count(int_col), count(distinct(int_col)), count(*) FROM {tbl} WHERE string_col='b'"
+      },
+      {
+        "psql": "4.2.7",
+        "description": "aggregations on int column with filters",
+        "sql": "SELECT min(int_col), max(int_col), avg(int_col), sum(int_col), count(int_col), count(distinct(int_col)), count(*) FROM {tbl} WHERE int_col > 100"
+      },
+      {
+        "psql": "4.2.7",
+        "description": "aggregations on int column with filters",
+        "sql": "SELECT min(int_col), max(int_col), avg(int_col), sum(int_col), count(int_col), count(distinct(int_col)), count(*) FROM {tbl} WHERE int_col > 100 and double_col = 1.75"
+      },
+      {
+        "psql": "4.2.7",
+        "description": "aggregations on bool column with filters",
+        "sql": "SELECT bool_and(bool_col), bool_or(bool_col), count(int_col), count(distinct(int_col)), count(*) FROM {tbl} WHERE string_col='b'"
+      },
+      {
+        "psql": "4.2.7",
+        "description": "aggregations on bool column with filters",
+        "sql": "SELECT bool_and(bool_col), bool_or(bool_col), count(int_col), count(distinct(int_col)), count(*) FROM {tbl} WHERE int_col > 100"
+      },
+      {
+        "psql": "4.2.7",
+        "description": "aggregations on bool column with filters",
+        "sql": "SELECT bool_and(bool_col), bool_or(bool_col), count(int_col), count(distinct(int_col)), count(*) FROM {tbl} WHERE int_col > 100 and double_col = 1.75"
+      },
+      {
+        "psql": "4.2.7",
+        "description": "aggregations on string column with filters",
+        "sql": "SELECT count(string_col), count(distinct(string_col)), count(*) FROM {tbl} WHERE string_col='b'"
+      },
+      {
+        "psql": "4.2.7",
+        "description": "aggregations on string column with filters",
+        "sql": "SELECT count(string_col), count(distinct(string_col)), count(*) FROM {tbl} WHERE int_col > 100"
+      },
+      {
+        "psql": "4.2.7",
+        "ignored": true,
+        "comments": "TODO(Sonam): Remove ignore check. Test fails with existing code but is fixed with this patch.",
+        "description": "aggregations on string column with filters",
+        "sql": "SELECT count(string_col), count(distinct(string_col)), count(*) FROM {tbl} WHERE int_col > 100 and double_col = 1.75"
+      }
+    ]
+  },
+  "general_group_by_aggregate_no_filter": {
+    "tables": {
+      "tbl": {
+        "schema": [
+          {"name": "int_col", "type": "INT"},
+          {"name": "double_col", "type": "DOUBLE"},
+          {"name": "string_col", "type": "STRING"},
+          {"name": "bool_col", "type": "BOOLEAN"}
+        ],
+        "inputs": [
+          [2, 300, "a", true],
+          [2, 400, "a", true],
+          [3, 100, "b", false],
+          [100, 1, "b", false],
+          [101, 1.01, "c", false],
+          [150, 1.5, "c", false],
+          [175, 1.75, "c", true]
+        ]
+      }
+    },
+    "queries": [
+      {
+        "psql": "4.2.7",
+        "description": "group by with count",
+        "sql": "SELECT int_col, count(*) FROM {tbl} GROUP BY int_col"
+      },
+      {
+        "psql": "4.2.7",
+        "description": "group by with aggregation on double column",
+        "sql": "SELECT int_col, min(double_col), max(double_col), avg(double_col), sum(double_col), count(double_col), count(distinct(double_col)), count(*) FROM {tbl} GROUP BY int_col"
+      },
+      {
+        "psql": "4.2.7",
+        "description": "group by with aggregation on double column and order by",
+        "sql": "SELECT int_col, min(double_col), max(double_col), avg(double_col), sum(double_col), count(double_col), count(distinct(double_col)), count(*) FROM {tbl} GROUP BY int_col ORDER BY int_col"
+      },
+      {
+        "psql": "4.2.7",
+        "description": "group by with aggregations on int column",
+        "sql": "SELECT double_col, min(int_col), max(int_col), avg(int_col), sum(int_col), count(int_col), count(distinct(int_col)), count(*) FROM {tbl} GROUP BY double_col"
+      },
+      {
+        "psql": "4.2.7",
+        "description": "group by with aggregations on int column and order by",
+        "sql": "SELECT double_col, min(int_col), max(int_col), avg(int_col), sum(int_col), count(int_col), count(distinct(int_col)), count(*) FROM {tbl} GROUP BY double_col ORDER BY double_col"
       },
       {
         "psql": "4.2.7",
-        "description": "max int",
-        "sql": "SELECT max(int_col) FROM {tbl}"
+        "description": "group by with aggregations on bool column",
+        "sql": "SELECT string_col, bool_and(bool_col), bool_or(bool_col), count(bool_col), count(distinct(bool_col)), count(*) FROM {tbl} GROUP BY string_col"
+      },
+      {
+        "psql": "4.2.7",
+        "description": "group by with aggregations on bool column and order by",
+        "sql": "SELECT string_col, bool_and(bool_col), bool_or(bool_col), count(bool_col), count(distinct(bool_col)), count(*) FROM {tbl} GROUP BY string_col ORDER BY string_col"
+      },
+      {
+        "psql": "4.2.7",
+        "description": "group by with aggregations on string column",
+        "sql": "SELECT bool_col, count(string_col), count(distinct(string_col)), count(*) FROM {tbl} GROUP BY bool_col"
+      },
+      {
+        "psql": "4.2.7",
+        "description": "group by with aggregations on string column and order by",
+        "sql": "SELECT bool_col, count(string_col), count(distinct(string_col)), count(*) FROM {tbl} GROUP BY bool_col ORDER BY bool_col"
+      },
+      {
+        "psql": "4.2.7",
+        "description": "group by on multiple columns with aggregation on double column",
+        "sql": "SELECT int_col, string_col, min(double_col), max(double_col), avg(double_col), sum(double_col), count(double_col), count(distinct(double_col)), count(*) FROM {tbl} GROUP BY int_col, string_col"
+      },
+      {
+        "psql": "4.2.7",
+        "description": "group by multiple columns with aggregations on int column",
+        "sql": "SELECT double_col, string_col, bool_col, min(int_col), max(int_col), avg(int_col), sum(int_col), count(int_col), count(distinct(int_col)), count(*) FROM {tbl} GROUP BY double_col, string_col, bool_col"
+      },
+      {
+        "psql": "4.2.7",
+        "description": "group by and order by on multiple columns with aggregation on double column",
+        "sql": "SELECT int_col, string_col, min(double_col), max(double_col), avg(double_col), sum(double_col), count(double_col), count(distinct(double_col)), count(*) FROM {tbl} GROUP BY int_col, string_col ORDER BY int_col, string_col"
+      }
+    ]
+  },
+  "general_group_by_aggregate_with_filter": {
+    "tables": {
+      "tbl": {
+        "schema": [
+          {"name": "int_col", "type": "INT"},
+          {"name": "double_col", "type": "DOUBLE"},
+          {"name": "string_col", "type": "STRING"},
+          {"name": "bool_col", "type": "BOOLEAN"}
+        ],
+        "inputs": [
+          [2, 300, "a", true],
+          [2, 400, "a", true],
+          [3, 100, "b", false],
+          [100, 1, "b", false],
+          [101, 1.01, "c", false],
+          [150, 1.5, "c", false],
+          [175, 1.75, "c", true]
+        ]
+      }
+    },
+    "queries": [
+      {
+        "psql": "4.2.7",
+        "description": "group by with count aggregation and string filter",
+        "sql": "SELECT string_col, count(*) FROM {tbl} WHERE string_col='b' GROUP BY string_col"
+      },
+      {
+        "psql": "4.2.7",
+        "description": "aggregations on double column with filters",
+        "sql": "SELECT int_col, min(double_col), max(double_col), avg(double_col), sum(double_col), count(double_col), count(distinct(double_col)), count(*) FROM {tbl} WHERE string_col='b' GROUP BY int_col"
+      },
+      {
+        "psql": "4.2.7",
+        "description": "aggregations on double column with filters",
+        "sql": "SELECT int_col, string_col, min(double_col), max(double_col), avg(double_col), sum(double_col), count(double_col), count(distinct(double_col)), count(*) FROM {tbl} WHERE int_col > 100 GROUP BY int_col, string_col"
+      },
+      {
+        "psql": "4.2.7",
+        "description": "aggregations on double column with filters",
+        "sql": "SELECT int_col, string_col, bool_col, min(double_col), max(double_col), avg(double_col), sum(double_col), count(double_col), count(distinct(double_col)), count(*) FROM {tbl} WHERE int_col > 100 and double_col = 1.75 GROUP BY int_col, string_col, bool_col"
+      },
+      {
+        "psql": "4.2.7",
+        "description": "aggregations on int column with filters",
+        "sql": "SELECT bool_col, min(int_col), max(int_col), avg(int_col), sum(int_col), count(int_col), count(distinct(int_col)), count(*) FROM {tbl} WHERE string_col='b' GROUP BY bool_col"
+      },
+      {
+        "psql": "4.2.7",
+        "description": "aggregations on int column with filters",
+        "sql": "SELECT bool_col, double_col, min(int_col), max(int_col), avg(int_col), sum(int_col), count(int_col), count(distinct(int_col)), count(*) FROM {tbl} WHERE int_col > 100 GROUP BY bool_col, double_col"
+      },
+      {
+        "psql": "4.2.7",
+        "description": "aggregations on int column with filters",
+        "sql": "SELECT bool_col, double_col, string_col, min(int_col), max(int_col), avg(int_col), sum(int_col), count(int_col), count(distinct(int_col)), count(*) FROM {tbl} WHERE int_col > 100 and double_col = 1.75 GROUP BY bool_col, double_col, string_col"
+      },
+      {
+        "psql": "4.2.7",
+        "description": "aggregations on bool column with filters",
+        "sql": "SELECT string_col, bool_and(bool_col), bool_or(bool_col), count(int_col), count(distinct(int_col)), count(*) FROM {tbl} WHERE string_col='b' GROUP BY string_col"
+      },
+      {
+        "psql": "4.2.7",
+        "description": "aggregations on bool column with filters",
+        "sql": "SELECT string_col, double_col, bool_and(bool_col), bool_or(bool_col), count(int_col), count(distinct(int_col)), count(*) FROM {tbl} WHERE int_col > 100 GROUP BY string_col, double_col"
+      },
+      {
+        "psql": "4.2.7",
+        "description": "aggregations on bool column with filters",
+        "sql": "SELECT string_col, double_col, int_col, bool_and(bool_col), bool_or(bool_col), count(int_col), count(distinct(int_col)), count(*) FROM {tbl} WHERE int_col > 100 and double_col = 1.75 GROUP BY string_col, double_col, int_col"
+      },
+      {
+        "psql": "4.2.7",
+        "description": "aggregations on string column with filters",
+        "sql": "SELECT double_col, count(string_col), count(distinct(string_col)), count(*) FROM {tbl} WHERE string_col='b' GROUP BY double_col"
+      },
+      {
+        "psql": "4.2.7",
+        "description": "aggregations on string column with filters",
+        "sql": "SELECT double_col, int_col, count(string_col), count(distinct(string_col)), count(*) FROM {tbl} WHERE int_col > 100 GROUP BY double_col, int_col"
+      },
+      {
+        "psql": "4.2.7",
+        "description": "aggregations on string column with filters",
+        "sql": "SELECT double_col, int_col, bool_col, count(string_col), count(distinct(string_col)), count(*) FROM {tbl} WHERE int_col > 100 and double_col = 1.75 GROUP BY double_col, int_col, bool_col"
+      }
+    ]
+  },
+  "aggregations_with_transform": {
+    "tables": {
+      "tbl": {
+        "schema": [
+          {"name": "int_col", "type": "INT"},
+          {"name": "double_col", "type": "DOUBLE"},
+          {"name": "string_col", "type": "STRING"},
+          {"name": "bool_col", "type": "BOOLEAN"}
+        ],
+        "inputs": [
+          [2, 300, "a", true],
+          [2, 400, "a", true],
+          [3, 100, "b", false],
+          [100, 1, "b", false],
+          [101, 1.01, "c", false],
+          [150, 1.5, "c", false],
+          [175, 1.75, "c", true]
+        ]
+      }
+    },
+    "queries": [
+      {
+        "psql": "4.2.7",
+        "description": "transform in groupby column with filter",
+        "sql": "SELECT CONCAT(string_col, bool_col, '-') as c1, min(double_col), max(double_col), avg(double_col), sum(double_col), count(double_col), count(distinct(double_col)), count(*) FROM {tbl} WHERE string_col='b' GROUP BY c1",
+        "outputs": [
+          ["b-false", 1.0, 100.0, 50.5, 101.0, 2, 2, 2]
+        ]
+      },
+      {
+        "psql": "4.2.7",
+        "description": "aggregation with transform column without filter",
+        "sql": "SELECT count(CONCAT(string_col, bool_col, '-')), count(distinct(CONCAT(string_col, bool_col, '-'))), count(*) FROM {tbl}",
+        "outputs": [
+          [7, 4, 7]
+        ]
+      },
+      {
+        "psql": "4.2.7",
+        "description": "group by and aggregation with transform column without filter",
+        "sql": "SELECT int_col, count(CONCAT(string_col, bool_col, '-')), count(distinct(CONCAT(string_col, bool_col, '-'))), count(*) FROM {tbl} GROUP BY int_col",
+        "outputs": [
+          [2, 2, 1, 2],
+          [3, 1, 1, 1],
+          [100, 1, 1, 1],
+          [101, 1, 1, 1],
+          [150, 1, 1, 1],
+          [175, 1, 1, 1]
+        ]
+      }
+    ]
+  },
+  "aggregation_with_complex_sql": {
+    "tables": {
+      "tbl1": {
+        "schema": [
+          {"name": "int_col", "type": "INT"},
+          {"name": "double_col", "type": "DOUBLE"},
+          {"name": "string_col", "type": "STRING"},
+          {"name": "bool_col", "type": "BOOLEAN"}
+        ],
+        "inputs": [
+          [2, 300, "a", true],
+          [2, 400, "a", true],
+          [3, 100, "b", false],
+          [100, 1, "b", false],
+          [101, 1.01, "c", false],
+          [150, 1.5, "c", false],
+          [175, 1.75, "c", true]
+        ]
+      },
+      "tbl2": {
+        "schema":[
+          {"name": "int_col2", "type": "INT"},
+          {"name": "string_col2", "type": "STRING"},
+          {"name":  "double_col2", "type":  "DOUBLE"}
+        ],
+        "inputs": [
+          [1, "apple", 1000.0],
+          [2, "a", 1.323],
+          [3, "b", 1212.12],
+          [3, "c", 341],
+          [4, "orange", 1212.121]
+        ]
+      }
+    },
+    "queries": [
+      {
+        "psql": "9.21.0",
+        "ignored": true,
+        "comments": "TODO(Sonam): Remove ignore check. Test fails with existing code but is fixed with this patch",
+        "description": "JOIN with simple aggregations on int and double columns",
+        "sql": "SELECT min(double_col), min(int_col), min(double_col2), min(int_col2), max(double_col), max(int_col), max(double_col2), max(int_col2), avg(int_col), avg(double_col), avg(double_col2), avg(int_col2), sum(int_col), sum(double_col), sum(double_col2), sum(int_col2), count(*), count(distinct int_col), count(distinct double_col), count(distinct int_col2), count(distinct double_col2) from {tbl1} JOIN {tbl2} ON string_col = string_col2"
+      },
+      {
+        "psql": "9.21.0",
+        "ignored": true,
+        "comments": "TODO(Sonam): Remove ignore check. Test fails with existing code but is fixed with this patch",
+        "description": "JOIN with simple aggregations on boolean columns",
+        "sql": "SELECT count(bool_col), count(distinct bool_col), bool_and(bool_col), bool_or(bool_col) from {tbl1} JOIN {tbl2} ON string_col = string_col2"
+      },
+      {
+        "psql": "9.21.0",
+        "description": "JOIN with simple aggregations on string columns",
+        "sql": "SELECT count(string_col), count(string_col2), count(distinct string_col), count(distinct string_col2) from {tbl1} JOIN {tbl2} ON int_col = int_col2"
+      },
+      {
+        "psql": "9.21.0",
+        "ignored": true,
+        "comments": "TODO(Sonam): Remove ignore check. Test fails with existing code but is fixed with this patch",
+        "description": "JOIN with simple aggregations on int and double columns and group by on bool_col",
+        "sql": "SELECT bool_col, min(double_col), min(int_col), min(double_col2), min(int_col2), max(double_col), max(int_col), max(double_col2), max(int_col2), avg(int_col), avg(double_col), avg(double_col2), avg(int_col2), sum(int_col), sum(double_col), sum(double_col2), sum(int_col2), count(*), count(distinct int_col), count(distinct double_col), count(distinct int_col2), count(distinct double_col2) from {tbl1} JOIN {tbl2} ON string_col = string_col2 GROUP BY bool_col"
+      },
+      {
+        "psql": "9.21.0",
+        "ignored": true,
+        "comments": "TODO(Sonam): Remove ignore check. Test fails with existing code but is fixed with this patch",
+        "description": "JOIN with aggregations on int and double columns and group by on bool_col and string_col",
+        "sql": "SELECT bool_col, string_col, min(double_col), min(int_col), min(double_col2), min(int_col2), max(double_col), max(int_col), max(double_col2), max(int_col2), avg(int_col), avg(double_col), avg(double_col2), avg(int_col2), sum(int_col), sum(double_col), sum(double_col2), sum(int_col2), count(*), count(distinct int_col), count(distinct double_col), count(distinct int_col2), count(distinct double_col2) from {tbl1} JOIN {tbl2} ON string_col = string_col2 GROUP BY bool_col, string_col"
+      },
+      {
+        "psql": "9.21.0",
+        "description": "JOIN with aggregations on boolean columns and group by on string",
+        "sql": "SELECT string_col, count(bool_col), count(distinct bool_col), bool_and(bool_col), bool_or(bool_col) from {tbl1} JOIN {tbl2} ON string_col = string_col2 GROUP BY string_col"
+      },
+      {
+        "psql": "9.21.0",
+        "ignored": true,
+        "comments": "TODO(Sonam): Remove ignore check. Test fails with existing code but is fixed with this patch",
+        "description": "JOIN with aggregations on boolean columns and group by on string and int",
+        "sql": "SELECT string_col, int_col, count(bool_col), count(distinct bool_col), bool_and(bool_col), bool_or(bool_col) from {tbl1} JOIN {tbl2} ON string_col = string_col2 GROUP BY string_col, int_col"
+      },
+      {
+        "psql": "9.21.0",
+        "description": "JOIN with aggregations on string columns and group by on double",
+        "sql": "SELECT double_col, count(string_col), count(string_col2), count(distinct string_col), count(distinct string_col2) from {tbl1} JOIN {tbl2} ON int_col = int_col2 GROUP BY double_col"
+      },
+      {
+        "psql": "9.21.0",
+        "description": "JOIN with aggregations on string columns and group by on double, bool and int",
+        "sql": "SELECT double_col, bool_col, int_col, count(string_col), count(string_col2), count(distinct string_col), count(distinct string_col2) from {tbl1} JOIN {tbl2} ON int_col = int_col2 GROUP BY double_col, bool_col, int_col"
+      },
+      {
+        "psql": "9.21.0",
+        "description": "Aggregation on semi join with multiple not in",
+        "sql": "SELECT bool_col, int_col, count(string_col), count(distinct string_col), min(double_col), max(double_col), avg(double_col), count(double_col) from {tbl1} WHERE string_col = 'c' AND int_col NOT IN (SELECT int_col2 FROM {tbl2} WHERE string_col2='a') AND int_col NOT IN (SELECT int_col2 FROM {tbl2} WHERE string_col2='bar') AND int_col NOT IN (SELECT int_col2 FROM {tbl2} WHERE string_col2='orange') GROUP BY bool_col, int_col"
+      }
+    ]
+  },
+  "aggregations_not_supported_by_h2": {
+    "tables": {
+      "tbl": {
+        "schema": [
+          {"name": "int_col", "type": "INT"},
+          {"name": "double_col", "type": "DOUBLE"},
+          {"name": "string_col", "type": "STRING"},
+          {"name": "bool_col", "type": "BOOLEAN"}
+        ],
+        "inputs": [
+          [2, 300, "a", true],
+          [3, 3213, "a", true],
+          [11, 545.12, "a", true],
+          [12, 543.12, "a", true],
+          [13, 765.4, "a", true],
+          [13, 400, "a", true],
+          [3, 100, "b", false],
+          [100, 1, "b", false],
+          [30, 10120, "b", false],
+          [100, 23121, "b", false],
+          [3, 43, "b", false],
+          [101, 1.01, "c", false],
+          [150, 1.5, "c", false],
+          [175, 1.75, "c", true],
+          [152, 1.25, "c", false],
+          [177, 1.095, "c", true]
+        ]
+      },
+      "tbl2": {
+        "schema":[
+          {"name": "int_col2", "type": "INT"},
+          {"name": "string_col2", "type": "STRING"},
+          {"name":  "double_col2", "type":  "DOUBLE"}
+        ],
+        "inputs": [
+          [1, "apple", 1000.0],
+          [2, "a", 1.323],
+          [3, "b", 1212.12],
+          [4, "orange", 1212.121]
+        ]
+      }
+    },
+    "queries": [
+      {
+        "psql": "9.21.0",
+        "description": "distinct count on all columns",
+        "ignored": true,
+        "comments": "TODO(Sonam): Remove ignore check. Current code doesn't support distinctcount on numeric operands",
+        "sql": "SELECT distinctcount(int_col), distinctcount(double_col), distinctcount(bool_col), distinctcount(string_col) FROM {tbl}",
+        "outputs": [
+          [12, 16, 2, 3]
+        ]
+      },
+      {
+        "psql": "9.21.0",
+        "description": "distinct count with filter",
+        "ignored": true,
+        "comments": "TODO(Sonam): Remove ignore check. Current code doesn't support distinctcount on numeric operands",
+        "sql": "SELECT distinctcount(int_col), distinctcount(double_col), distinctcount(bool_col), distinctcount(string_col) FROM {tbl} where string_col='c'",
+        "outputs": [
+          [5, 5, 2, 1]
+        ]
+      },
+      {
+        "psql": "9.21.0",
+        "description": "distinctcount with group by",
+        "ignored": true,
+        "comments": "TODO(Sonam): Remove ignore check. Current code doesn't support distinctcount on numeric operands",
+        "sql": "SELECT string_col, distinctcount(int_col), distinctcount(double_col), distinctcount(bool_col) FROM {tbl} group by string_col",
+        "outputs": [
+          ["a", 5, 6, 1],
+          ["b", 3, 5, 1],
+          ["c", 5, 5, 2]
+        ]
+      },
+      {
+        "psql": "9.21.0",
+        "description": "distinctcount with multiple group by columns",
+        "ignored": true,
+        "comments": "TODO(Sonam): Remove ignore check. Current code doesn't support distinctcount on numeric operands",
+        "sql": "SELECT string_col, bool_col, distinctcount(int_col), distinctcount(double_col) FROM {tbl} group by string_col, bool_col",
+        "outputs": [
+          ["a", true, 5, 6],
+          ["b", false, 3, 5],
+          ["c", true, 2, 2],
+          ["c", false, 3, 3]
+        ]
+      },
+      {
+        "psql": "9.21.0",
+        "description": "JOIN with distinct count on columns",
+        "ignored": true,
+        "comments": "TODO(Sonam): Remove ignore check. Current code doesn't support distinctcount on numeric operands",
+        "sql": "SELECT distinctcount(double_col), distinctcount(int_col), distinctcount(double_col2), distinctcount(int_col2), distinctcount(bool_col), distinctcount(string_col), distinctcount(string_col2) from {tbl} JOIN {tbl2} ON string_col = string_col2",
+        "outputs": [
+          [11, 7, 2, 2, 2, 2, 2]
+        ]
+      },
+      {
+        "psql": "9.21.0",
+        "description": "JOIN with distinct count and group by on columns",
+        "ignored": true,
+        "comments": "TODO(Sonam): Remove ignore check. Current code doesn't support distinctcount on numeric operands",
+        "sql": "SELECT string_col, distinctcount(double_col), distinctcount(int_col), distinctcount(double_col2), distinctcount(int_col2), distinctcount(bool_col) from {tbl} JOIN {tbl2} ON string_col = string_col2 GROUP BY string_col",
+        "outputs": [
+          ["a", 6, 5, 1, 1, 1],
+          ["b", 5, 3, 1, 1, 1]
+        ]
       },
       {
-        "psql": "4.2.7",
-        "description": "count int",
-        "sql": "SELECT count(int_col) FROM {tbl}"
+        "psql": "9.21.0",
+        "description": "skewness and kurtosis on int and double cols",
+        "sql": "SELECT skewness(double_col), skewness(int_col), kurtosis(double_col), kurtosis(int_col) FROM {tbl}",
+        "outputs": [
+          [3.1103657693253997, 0.5794666900768517, 9.965595397275615, -1.4550234124062964]
+        ]
       },
       {
-        "psql": "4.2.7",
-        "description": "count double",
-        "sql": "SELECT count(double_col) FROM {tbl}"
+        "psql": "9.21.0",
+        "description": "skewness and kurtosis on int and double cols with group by",
+        "sql": "SELECT string_col, skewness(double_col), skewness(int_col), kurtosis(double_col), kurtosis(int_col) FROM {tbl} group by string_col",
+        "outputs": [
+          ["a", 2.3416450161609847, -0.8960998790160103, 5.582974376555998, -1.791124260355027],
+          ["b", 1.4293497662867718, 0.40194821599933056, 1.220463765626068, -3.1341049510321075],
+          ["c", 0.6353100324008337, -1.3564793650180287, -1.1100427274943443, 1.989350375759732]
+        ]
       },
       {
-        "psql": "4.2.7",
-        "description": "count, sum group by order by",
-        "sql": "select string_col, count(int_col), sum(double_col) from {tbl} group by string_col order by string_col;"
+        "psql": "9.21.0",
+        "description": "JOIN with skewness and kurtosis on columns",
+        "sql": "SELECT skewness(double_col), skewness(int_col), kurtosis(double_col2), kurtosis(int_col2) from {tbl} JOIN {tbl2} ON string_col = string_col2",
+        "outputs": [
+          [2.5196193851884536, 1.7344829395977062, -2.444444444444445, -2.444444444444444]
+        ]
       },
       {
-        "psql": "4.2.7",
-        "description": "min, max",
-        "sql": "SELECT min(int_col), max(int_col) FROM {tbl}"
-      },
+        "psql": "9.21.0",
+        "description": "JOIN with skewness and kurtosis and group by on columns",
+        "sql": "SELECT string_col, skewness(double_col), skewness(int_col), kurtosis(double_col2), kurtosis(int_col2) from {tbl} JOIN {tbl2} ON string_col = string_col2 GROUP BY string_col",
+        "outputs": [
+          ["a", 2.3416450161609847, -0.8960998790160102, 0.0, 0.0],
+          ["b", 1.4293497662867716, 0.4019482159993307, 0.0, 0.0]
+        ]
+      }
+    ]
+  },
+  "unsupported_aggregates": {
+    "tables": {
+      "tbl": {
+        "schema": [
+          {"name": "int_col", "type": "INT"},
+          {"name": "double_col", "type": "DOUBLE"},
+          {"name": "string_col", "type": "STRING"},
+          {"name": "bool_col", "type": "BOOLEAN"}
+        ],
+        "inputs": [
+          [2, 300, "a", true],
+          [2, 400, "a", true],
+          [3, 100, "b", false],
+          [100, 1, "b", false],
+          [101, 1.01, "c", false],
+          [150, 1.5, "c", false],
+          [175, 1.75, "c", true]
+        ]
+      }
+    },
+    "queries": [
       {
         "psql": "4.2.7",
         "comments": "plan error:Unsupported SQL aggregation kind: {}. Only splittable aggregation functions are supported! [SINGLE_VALUE]",
@@ -111,16 +695,6 @@
         "description": "aggregate boolean column",
         "sql": "SELECT min(bool_col) FROM {tbl}"
       },
-      {
-        "psql": "9.21.0",
-        "description": "aggregate boolean column",
-        "sql": "SELECT bool_and(bool_col), bool_or(bool_col) FROM {tbl} GROUP BY string_col"
-      },
-      {
-        "psql": "9.21.0",
-        "description": "aggregate boolean column no group by",
-        "sql": "SELECT bool_and(bool_col), bool_or(bool_col) FROM {tbl}"
-      },
       {
         "ignored": true,
         "comment": "issue with converting data types:  Unexpected RelDataTypeField: ANY for column: EXPR$0",
@@ -135,36 +709,29 @@
         "sql": "SELECT sum(pow(int_col, 2)) FROM {tbl}"
       },
       {
+        "psql": "4.2.7",
+        "description": "aggregate boolean column and filter by string column",
         "ignored": true,
-        "comment": "sum empty returns [0] instead of [null] at the moment",
-        "description": "sum empty input after filter",
-        "sql": "SELECT sum(int_col) FROM {tbl} WHERE string_col IN ('foo', 'bar')"
-      },
-      {
-        "description": "count empty input after filter",
-        "sql": "SELECT count(*) FROM {tbl} WHERE string_col IN ('foo', 'bar')"
-      },
-      {
-        "description": "count empty input after filter",
-        "sql": "SELECT count(int_col) FROM {tbl} WHERE string_col IN ('foo', 'bar')"
+        "comments": "Cannot apply 'SUM' to arguments of type 'SUM(<BOOLEAN>)",
+        "sql": "SELECT sum(bool_col) FROM {tbl} WHERE string_col > 'a'"
       },
       {
+        "psql": "4.2.7",
+        "comments": "parse error:Encountered HAVING",
         "ignored": true,
-        "comment": "sum empty returns [0] instead of [null] at the moment",
-        "description": "sum empty input after filter with subquery",
-        "sql": "SELECT sum(int_col) FROM {tbl} WHERE string_col IN ( SELECT string_col FROM {tbl} WHERE int_col BETWEEN 1 AND 0 GROUP BY string_col )"
-      },
-      {
-        "description": "count empty input after filter with sub-query",
-        "sql": "SELECT count(*) FROM {tbl} WHERE string_col IN ( SELECT string_col FROM {tbl} WHERE int_col BETWEEN 1 AND 0 GROUP BY string_col )"
+        "description": "aggregate query with having clause",
+        "sql": "select string_col, count(int_col) from {tbl} a group by string_col order by string_col having exists(select 1 from {tbl} b where count(int_col) = b.int_col);"
       },
       {
-        "description": "count empty input after filter with sub-query",
-        "sql": "SELECT count(int_col) FROM {tbl} WHERE string_col IN ( SELECT string_col FROM {tbl} WHERE int_col BETWEEN 1 AND 0 GROUP BY string_col )"
+        "psql": "4.2.7",
+        "comments": "parse error:Encountered HAVING",
+        "description": "aggregate query with having clause",
+        "ignored": true,
+        "sql": "select string_col, count(int_col) from {tbl} a group by string_col order by string_col having (select count(*) from {tbl} b where count(int_col) = b.int_col) > 0;"
       }
     ]
   },
-  "aggregate_filter": {
+  "empty_return_type_test_h2": {
     "tables": {
       "tbl": {
         "schema": [
@@ -174,109 +741,142 @@
           {"name": "bool_col", "type": "BOOLEAN"}
         ],
         "inputs": [
-          [2, 300, "a", false],
+          [2, 300, "a", true],
           [2, 400, "a", true],
-          [3, 100, "b", true],
-          [0.001, 1, "b", false],
+          [3, 100, "b", false],
+          [100, 1, "b", false],
           [101, 1.01, "c", false],
-          [150, 1.5, "c", true],
-          [175, 1.75, "c", true],
-          [-10000, 1.75, "c", false],
-          [-2, 0.5, "c", false]
+          [150, 1.5, "c", false],
+          [175, 1.75, "c", true]
         ]
       }
     },
     "queries": [
       {
-        "psql": "4.2.7",
-        "description": "aggregate int column and filter by int column",
-        "sql": "SELECT min(int_col) FROM {tbl} WHERE int_col < 100"
+        "ignored": true,
+        "comment": "sum empty returns [0] instead of [null] at the moment when leaf aggregation is performed by v1 engine.",
+        "description": "Return empty for sum",
+        "sql": "SELECT sum(int_col) FROM {tbl} WHERE string_col IN ('foo', 'bar')"
       },
       {
-        "psql": "4.2.7",
-        "description": "aggregate int column and filter by int column",
-        "sql": "SELECT sum(1 / int_col) FROM {tbl} WHERE int_col > 0",
-        "h2Sql": "SELECT sum(1.0 / int_col) FROM {tbl} WHERE int_col > 0"
+        "description": "Return empty for sum with subquery",
+        "sql": "SELECT sum(int_col) FROM {tbl} WHERE string_col IN ( SELECT string_col FROM {tbl} WHERE int_col BETWEEN 1 AND 0 GROUP BY string_col )"
       },
       {
-        "psql": "4.2.7",
-        "description": "aggregate double column and filter by double column",
-        "sql": "SELECT sum(1 / double_col) FROM {tbl} WHERE double_col < 1"
+        "ignored": true,
+        "comment": "min empty returns [infinity] instead of [null] at the moment when leaf aggregation is performed by v1 engine.",
+        "description": "Return empty for min",
+        "sql": "SELECT min(int_col) FROM {tbl} WHERE string_col IN ('foo', 'bar')"
       },
       {
-        "psql": "4.2.7",
-        "description": "aggregate double column and filter by string column",
-        "sql": "SELECT sum(double_col) FROM {tbl} WHERE string_col > 'a'"
+        "description": "Return empty for min with subquery",
+        "sql": "SELECT min(int_col) FROM {tbl} WHERE string_col IN ( SELECT string_col FROM {tbl} WHERE int_col BETWEEN 1 AND 0 GROUP BY string_col )"
       },
       {
-        "psql": "4.2.7",
-        "description": "aggregate boolean column and filter by string column",
         "ignored": true,
-        "comments": "Cannot apply 'SUM' to arguments of type 'SUM(<BOOLEAN>)",
-        "sql": "SELECT sum(bool_col) FROM {tbl} WHERE string_col > 'a'"
+        "comment": "max empty returns [-infinity] instead of [null] at the moment when leaf aggregation is performed by v1 engine.",
+        "description": "Return empty for max",
+        "sql": "SELECT max(int_col) FROM {tbl} WHERE string_col IN ('foo', 'bar')"
+      },
+      {
+        "description": "Return empty for max with subquery",
+        "sql": "SELECT max(int_col) FROM {tbl} WHERE string_col IN ( SELECT string_col FROM {tbl} WHERE int_col BETWEEN 1 AND 0 GROUP BY string_col )"
+      },
+      {
+        "comment": "avg empty returns [NaN] instead of [null] at the moment when leaf aggregation is performed by v1 engine.",
+        "description": "Return empty for avg",
+        "sql": "SELECT avg(int_col) FROM {tbl} WHERE string_col IN ('foo', 'bar')"
+      },
+      {
+        "description": "Return empty for avg with subquery",
+        "sql": "SELECT avg(int_col) FROM {tbl} WHERE string_col IN ( SELECT string_col FROM {tbl} WHERE int_col BETWEEN 1 AND 0 GROUP BY string_col )"
+      },
+      {
+        "description": "Return empty for count",
+        "sql": "SELECT count(int_col) FROM {tbl} WHERE string_col IN ('foo', 'bar')"
+      },
+      {
+        "description": "Return empty for count with subquery",
+        "sql": "SELECT count(int_col) FROM {tbl} WHERE string_col IN ( SELECT string_col FROM {tbl} WHERE int_col BETWEEN 1 AND 0 GROUP BY string_col )"
+      },
+      {
+        "description": "Return empty for count(*)",
+        "sql": "SELECT count(*) FROM {tbl} WHERE string_col IN ('foo', 'bar')"
+      },
+      {
+        "description": "Return empty for count(*) with subquery",
+        "sql": "SELECT count(*) FROM {tbl} WHERE string_col IN ( SELECT string_col FROM {tbl} WHERE int_col BETWEEN 1 AND 0 GROUP BY string_col )"
       },
       {
-        "psql": "4.2.7",
-        "comments": "parse error:Encountered HAVING",
         "ignored": true,
-        "description": "aggregate query with having clause",
-        "sql": "select string_col, count(int_col) from {tbl} a group by string_col order by string_col having exists(select 1 from {tbl} b where count(int_col) = b.int_col);"
+        "comment": "bool_and empty returns [true] instead of [null] at the moment when leaf aggregation is performed by v1 engine.",
+        "description": "Return empty for bool_and",
+        "sql": "SELECT bool_and(bool_col) FROM {tbl} WHERE string_col IN ('foo', 'bar')"
       },
       {
-        "psql": "4.2.7",
-        "comments": "parse error:Encountered HAVING",
-        "description": "aggregate query with having clause",
         "ignored": true,
-        "sql": "select string_col, count(int_col) from {tbl} a group by string_col order by string_col having (select count(*) from {tbl} b where count(int_col) = b.int_col) > 0;"
-      }
-    ]
-  },
-  "additional_aggregate": {
-    "tables": {
-      "tbl": {
-        "schema": [
-          {"name": "int_col", "type": "INT"},
-          {"name": "double_col", "type": "DOUBLE"},
-          {"name": "string_col", "type": "STRING"},
-          {"name": "bool_col", "type": "BOOLEAN"}
-        ],
-        "inputs": [
-          [2, 300, "a", false],
-          [2, 400, "a", true],
-          [3, 100, "b", true],
-          [0.001, 1, "b", false],
-          [101, 1.01, "c", false],
-          [150, 1.5, "c", true],
-          [175, 1.75, "c", true],
-          [-10000, 1.75, "c", false],
-          [-2, 0.5, "c", false]
-        ]
+        "comment": "bool_and empty returns [0 which gets converted to false] instead of [null] when leaf aggregation is performed by Intermediate Stage",
+        "description": "Return empty for bool_and with subquery",
+        "sql": "SELECT bool_and(bool_col) FROM {tbl} WHERE string_col IN ( SELECT string_col FROM {tbl} WHERE int_col BETWEEN 1 AND 0 GROUP BY string_col )"
+      },
+      {
+        "ignored": true,
+        "comment": "bool_or empty returns [false] instead of [null] at the moment when leaf aggregation is performed by v1 engine.",
+        "description": "Return empty for bool_or",
+        "sql": "SELECT bool_or(bool_col) FROM {tbl} WHERE string_col IN ('foo', 'bar')"
+      },
+      {
+        "ignored": true,
+        "comment": "bool_or empty returns [false] instead of [null] at the moment when leaf aggregation is performed by Intermediate Stage",
+        "description": "Return empty for bool_or with subquery",
+        "sql": "SELECT bool_or(bool_col) FROM {tbl} WHERE string_col IN ( SELECT string_col FROM {tbl} WHERE int_col BETWEEN 1 AND 0 GROUP BY string_col )"
+      },
+      {
+        "description": "Return empty for count distinct",
+        "sql": "SELECT count(distinct int_col) FROM {tbl} WHERE string_col IN ('foo', 'bar')"
+      },
+      {
+        "ignored": true,
+        "comments": "TODO(Sonam): Remove ignore check. Test fails with existing code because H2 expects 0. But we return null",
+        "description": "Return empty for count distinct with subquery",
+        "sql": "SELECT count(distinct int_col) FROM {tbl} WHERE string_col IN ( SELECT string_col FROM {tbl} WHERE int_col BETWEEN 1 AND 0 GROUP BY string_col )"
+      },
+      {
+        "ignored": true,
+        "comment": "DistinctCount agg function doesn't support null handling",
+        "description": "Return empty for distinctcount",
+        "sql": "SELECT distinctcount(int_col) FROM {tbl} WHERE string_col IN ('foo', 'bar')"
+      },
+      {
+        "ignored": true,
+        "comment": "DistinctCount agg function doesn't support null handling",
+        "description": "Return empty for distinctcount with subquery",
+        "sql": "SELECT distinctcount(int_col) FROM {tbl} WHERE string_col IN ( SELECT string_col FROM {tbl} WHERE int_col BETWEEN 1 AND 0 GROUP BY string_col )"
+      },
+      {
+        "ignored": true,
+        "comment": "FourthMoment agg function doesn't support null handling",
+        "description": "Return empty for skewness",
+        "sql": "SELECT skewness(double_col) FROM {tbl} WHERE string_col IN ('foo', 'bar')"
+      },
+      {
+        "ignored": true,
+        "comment": "FourthMoment agg function doesn't support null handling",
+        "description": "Return empty for skewness with subquery",
+        "sql": "SELECT skewness(double_col) FROM {tbl} WHERE string_col IN ( SELECT string_col FROM {tbl} WHERE int_col BETWEEN 1 AND 0 GROUP BY string_col )"
+      },
+      {
+        "ignored": true,
+        "comment": "FourthMoment agg function doesn't support null handling",
+        "description": "Return empty for kurtosis",
+        "sql": "SELECT kurtosis(double_col) FROM {tbl} WHERE string_col IN ('foo', 'bar')"
+      },
+      {
+        "ignored": true,
+        "comment": "FourthMoment agg function doesn't support null handling",
+        "description": "Return empty for kurtosis with subquery",
+        "sql": "SELECT kurtosis(double_col) FROM {tbl} WHERE string_col IN ( SELECT string_col FROM {tbl} WHERE int_col BETWEEN 1 AND 0 GROUP BY string_col )"
       }
-    },
-    "queries": [
-      { "sql": "SELECT min(int_col), min(int_col) FROM {tbl} WHERE int_col < 100" },
-      { "sql": "SELECT min(int_col), count(int_col), count(*) FROM {tbl}" },
-      { "sql": "SELECT string_col, min(int_col), count(int_col), count(*) FROM {tbl} GROUP BY string_col" },
-      { "sql": "SELECT string_col, bool_col, min(int_col), count(int_col), count(*) FROM {tbl} GROUP BY string_col, bool_col" },
-      { "sql": "SELECT string_col, bool_col, min(int_col), count(int_col), count(*) FROM {tbl} GROUP BY bool_col, string_col" },
-      { "sql": "SELECT string_col, string_col AS alias, count(int_col), count(*) FROM {tbl} GROUP BY bool_col, string_col" },
-      { "sql": "SELECT string_col, bool_col, bool_col AS alias, sum(int_col), count(int_col), count(*) FROM {tbl} GROUP BY bool_col, string_col" },
-      { "sql": "SELECT sum(int_col), count(int_col) FROM {tbl} GROUP BY string_col" },
-      { "sql": "SELECT string_col, min(int_col), count(int_col), count(*) FROM {tbl} GROUP BY string_col ORDER BY string_col" },
-      { "sql": "SELECT string_col, bool_col, min(int_col), count(int_col), count(*) FROM {tbl} GROUP BY string_col, bool_col ORDER BY string_col" },
-      { "sql": "SELECT string_col, bool_col, min(int_col), count(int_col), count(*) FROM {tbl} GROUP BY string_col, bool_col ORDER BY bool_col, string_col" },
-      { "sql": "SELECT string_col, bool_col, min(int_col), count(int_col), count(*) FROM {tbl} GROUP BY bool_col, string_col ORDER BY bool_col, string_col" },
-      { "sql": "SELECT string_col, string_col AS alias, count(int_col), count(*) FROM {tbl} GROUP BY bool_col, string_col ORDER BY string_col" },
-      { "sql": "SELECT string_col, string_col AS alias, count(int_col), count(*) FROM {tbl} GROUP BY bool_col, string_col ORDER BY bool_col, alias"},
-      { "sql": "SELECT string_col, bool_col, bool_col AS alias, sum(int_col), count(int_col), count(*) FROM {tbl} GROUP BY bool_col, string_col ORDER BY alias" },
-      { "sql": "SELECT sum(int_col), count(int_col) FROM {tbl} GROUP BY string_col ORDER BY string_col" },
-      { "sql": "SELECT CASE WHEN bool_col THEN int_col ELSE int_col - 5 END, count(int_col) FROM {tbl} GROUP BY CASE WHEN bool_col THEN int_col ELSE int_col - 5 END ORDER BY CASE WHEN bool_col THEN int_col ELSE int_col - 5 END" },
-      { "sql": "SELECT upper(string_col), count(int_col) FROM {tbl} GROUP BY upper(string_col) ORDER BY upper(string_col)" },
-      { "sql": "SELECT upper(string_col), count(int_col) FROM {tbl} GROUP BY upper(string_col) ORDER BY count(int_col)" },
-      { "sql": "SELECT * FROM (SELECT string_col, bool_col, min(int_col) AS m, count(int_col), count(*) AS c FROM {tbl} GROUP BY string_col, bool_col ORDER BY string_col) WHERE c < m" },
-      { "sql": "SELECT * FROM (SELECT string_col, bool_col, min(int_col) AS m, count(int_col), count(*) AS c FROM {tbl} GROUP BY string_col, bool_col ORDER BY bool_col, string_col) WHERE c < m" },
-      { "sql": "SELECT upper(string_col), count(int_col) FROM {tbl} GROUP BY upper(string_col) HAVING sum(int_col) > 0 ORDER BY upper(string_col)" },
-      { "sql": "SELECT upper(string_col), count(int_col) FROM {tbl} GROUP BY upper(string_col) HAVING sum(int_col) >= 0 AND count(int_col) >= 0 ORDER BY count(int_col)" }
     ]
   },
   "aggregate_with_hints": {
@@ -344,5 +944,53 @@
         "sql": "SELECT /*+ skipLeafStageGroupByAggregation */ double_col, sum(int_col) FROM {tbl} WHERE int_col > 3 AND double_col > 1.0 GROUP BY double_col"
       }
     ]
+  },
+  "additional_aggregate": {
+    "tables": {
+      "tbl": {
+        "schema": [
+          {"name": "int_col", "type": "INT"},
+          {"name": "double_col", "type": "DOUBLE"},
+          {"name": "string_col", "type": "STRING"},
+          {"name": "bool_col", "type": "BOOLEAN"}
+        ],
+        "inputs": [
+          [2, 300, "a", false],
+          [2, 400, "a", true],
+          [3, 100, "b", true],
+          [0.001, 1, "b", false],
+          [101, 1.01, "c", false],
+          [150, 1.5, "c", true],
+          [175, 1.75, "c", true],
+          [-10000, 1.75, "c", false],
+          [-2, 0.5, "c", false]
+        ]
+      }
+    },
+    "queries": [
+      { "sql": "SELECT min(int_col), min(int_col) FROM {tbl} WHERE int_col < 100" },
+      { "sql": "SELECT min(int_col), count(int_col), count(*) FROM {tbl}" },
+      { "sql": "SELECT string_col, min(int_col), count(int_col), count(*) FROM {tbl} GROUP BY string_col" },
+      { "sql": "SELECT string_col, bool_col, min(int_col), count(int_col), count(*) FROM {tbl} GROUP BY string_col, bool_col" },
+      { "sql": "SELECT string_col, bool_col, min(int_col), count(int_col), count(*) FROM {tbl} GROUP BY bool_col, string_col" },
+      { "sql": "SELECT string_col, string_col AS alias, count(int_col), count(*) FROM {tbl} GROUP BY bool_col, string_col" },
+      { "sql": "SELECT string_col, bool_col, bool_col AS alias, sum(int_col), count(int_col), count(*) FROM {tbl} GROUP BY bool_col, string_col" },
+      { "sql": "SELECT sum(int_col), count(int_col) FROM {tbl} GROUP BY string_col" },
+      { "sql": "SELECT string_col, min(int_col), count(int_col), count(*) FROM {tbl} GROUP BY string_col ORDER BY string_col" },
+      { "sql": "SELECT string_col, bool_col, min(int_col), count(int_col), count(*) FROM {tbl} GROUP BY string_col, bool_col ORDER BY string_col" },
+      { "sql": "SELECT string_col, bool_col, min(int_col), count(int_col), count(*) FROM {tbl} GROUP BY string_col, bool_col ORDER BY bool_col, string_col" },
+      { "sql": "SELECT string_col, bool_col, min(int_col), count(int_col), count(*) FROM {tbl} GROUP BY bool_col, string_col ORDER BY bool_col, string_col" },
+      { "sql": "SELECT string_col, string_col AS alias, count(int_col), count(*) FROM {tbl} GROUP BY bool_col, string_col ORDER BY string_col" },
+      { "sql": "SELECT string_col, string_col AS alias, count(int_col), count(*) FROM {tbl} GROUP BY bool_col, string_col ORDER BY bool_col, alias"},
+      { "sql": "SELECT string_col, bool_col, bool_col AS alias, sum(int_col), count(int_col), count(*) FROM {tbl} GROUP BY bool_col, string_col ORDER BY alias" },
+      { "sql": "SELECT sum(int_col), count(int_col) FROM {tbl} GROUP BY string_col ORDER BY string_col" },
+      { "sql": "SELECT CASE WHEN bool_col THEN int_col ELSE int_col - 5 END, count(int_col) FROM {tbl} GROUP BY CASE WHEN bool_col THEN int_col ELSE int_col - 5 END ORDER BY CASE WHEN bool_col THEN int_col ELSE int_col - 5 END" },
+      { "sql": "SELECT upper(string_col), count(int_col) FROM {tbl} GROUP BY upper(string_col) ORDER BY upper(string_col)" },
+      { "sql": "SELECT upper(string_col), count(int_col) FROM {tbl} GROUP BY upper(string_col) ORDER BY count(int_col)" },
+      { "sql": "SELECT * FROM (SELECT string_col, bool_col, min(int_col) AS m, count(int_col), count(*) AS c FROM {tbl} GROUP BY string_col, bool_col ORDER BY string_col) WHERE c < m" },
+      { "sql": "SELECT * FROM (SELECT string_col, bool_col, min(int_col) AS m, count(int_col), count(*) AS c FROM {tbl} GROUP BY string_col, bool_col ORDER BY bool_col, string_col) WHERE c < m" },
+      { "sql": "SELECT upper(string_col), count(int_col) FROM {tbl} GROUP BY upper(string_col) HAVING sum(int_col) > 0 ORDER BY upper(string_col)" },
+      { "sql": "SELECT upper(string_col), count(int_col) FROM {tbl} GROUP BY upper(string_col) HAVING sum(int_col) >= 0 AND count(int_col) >= 0 ORDER BY count(int_col)" }
+    ]
   }
 }
diff --git a/pinot-query-runtime/src/test/resources/queries/NullHandling.json b/pinot-query-runtime/src/test/resources/queries/NullHandling.json
index 36fad242e6..3c5a6a7add 100644
--- a/pinot-query-runtime/src/test/resources/queries/NullHandling.json
+++ b/pinot-query-runtime/src/test/resources/queries/NullHandling.json
@@ -17,11 +17,12 @@
           {"name": "strCol1", "type": "STRING"},
           {"name": "strCol2", "type": "STRING"},
           {"name": "intCol1", "type": "INT"},
-          {"name": "doubleCol1", "type": "DOUBLE"}
+          {"name": "doubleCol1", "type": "DOUBLE"},
+          {"name": "boolCol1", "type":  "BOOLEAN"}
         ],
         "inputs": [
-          ["foo", "bob", 3, 3.1416],
-          ["alice", "alice", 4, 2.7183]
+          ["foo", "bob", 3, 3.1416, true],
+          ["alice", "alice", 4, 2.7183, false]
         ]
       }
     },
@@ -36,7 +37,7 @@
       },
       {
         "description": "LEFT JOIN and AGGREGATE",
-        "sql": "SELECT COUNT({tbl2}.intCol1), MIN({tbl2}.intCol1), MAX({tbl2}.doubleCol1), SUM({tbl2}.doubleCol1) FROM {tbl1} LEFT OUTER JOIN {tbl2} ON {tbl1}.strCol1 = {tbl2}.strCol1"
+        "sql": "SELECT COUNT({tbl2}.intCol1), MIN({tbl2}.intCol1), MAX({tbl2}.doubleCol1), SUM({tbl2}.doubleCol1), AVG({tbl2}.doubleCol1), BOOL_AND({tbl2}.boolCol1), BOOL_OR({tbl2}.boolCol1) FROM {tbl1} LEFT OUTER JOIN {tbl2} ON {tbl1}.strCol1 = {tbl2}.strCol1"
       },
       {
         "description": "LEFT JOIN and GROUP BY",
@@ -44,7 +45,7 @@
       },
       {
         "description": "LEFT JOIN and GROUP BY with AGGREGATE",
-        "sql": "SELECT {tbl1}.strCol2, COUNT({tbl2}.intCol1), MIN({tbl2}.intCol1), MAX({tbl2}.doubleCol1), SUM({tbl2}.doubleCol1) FROM {tbl1} LEFT OUTER JOIN {tbl2} ON {tbl1}.strCol1 = {tbl2}.strCol1 GROUP BY {tbl1}.strCol2"
+        "sql": "SELECT {tbl1}.strCol2, COUNT({tbl2}.intCol1), MIN({tbl2}.intCol1), MAX({tbl2}.doubleCol1), SUM({tbl2}.doubleCol1), AVG({tbl2}.doubleCol1) FROM {tbl1} LEFT OUTER JOIN {tbl2} ON {tbl1}.strCol1 = {tbl2}.strCol1 GROUP BY {tbl1}.strCol2"
       },
       {
         "description": "LEFT JOIN and SORT (by default, H2 treats null as the smallest value, which is different from Postgres, thus we don't test the default ordering)",
diff --git a/pinot-query-runtime/src/test/resources/queries/WithStatements.json b/pinot-query-runtime/src/test/resources/queries/WithStatements.json
index 1b74c31bc9..401bf39e0b 100644
--- a/pinot-query-runtime/src/test/resources/queries/WithStatements.json
+++ b/pinot-query-runtime/src/test/resources/queries/WithStatements.json
@@ -18,13 +18,17 @@
         "schema": [
           {"name": "strCol1", "type": "STRING"},
           {"name": "strCol2", "type": "STRING"},
-          {"name": "intCol", "type": "INT"}
+          {"name": "intCol", "type": "INT"},
+          {"name": "doubleCol", "type": "DOUBLE"},
+          {"name": "floatCol", "type": "FLOAT"},
+          {"name": "longCol", "type": "LONG"},
+          {"name": "boolCol", "type": "BOOLEAN"}
         ],
         "inputs": [
-          ["a", "foo", 1],
-          ["a", "bar", 2],
-          ["b", "alice", 42],
-          ["b", "bob", 196883]
+          ["a", "foo", 1, 100.0, 12.0, 10000, true],
+          ["a", "bar", 2, 175.12, 22.22, 300000, false],
+          ["b", "alice", 42, 175.12, 22.22, 300000, true],
+          ["b", "bob", 196883, 175.12, 22.22, 300000, true]
         ]
       }
     },
@@ -39,6 +43,16 @@
           ["b", "bob", 2]
         ]
       },
+      {
+        "description": "single 'with' with all aggregations and group by",
+        "sql": "WITH w AS (SELECT strCol, SUM(intCol) AS c1, AVG(intCol) AS c2, MIN(intCol) AS c3, MAX(intCol) AS c4, COUNT(*) as c5, COUNT(DISTINCT(intCol)) as c6  FROM {tbl1} GROUP BY strCol) SELECT strCol, strCol2, c1, c2, c3, c4, c5, c6 FROM w JOIN {tbl2} ON w.strCol = {tbl2}.strCol1",
+        "outputs": [
+          ["a", "foo", 4, 2.0, 1, 3, 2, 2],
+          ["a", "bar", 4, 2.0, 1, 3, 2, 2],
+          ["b", "alice", 2, 2.0, 2.0, 2.0, 1, 1],
+          ["b", "bob", 2, 2.0, 2.0, 2.0, 1, 1]
+        ]
+      },
       {
         "description": "multi 'with' table",
         "sql": "WITH agg1 AS ( SELECT strCol, sum(intCol) AS sumVal FROM {tbl1} GROUP BY strCol), agg2 AS (SELECT strCol1, avg(intCol) AS avgVal FROM {tbl2} GROUP BY strCol1) SELECT strCol, sumVal - avgVal FROM agg1, agg2 WHERE agg1.strCol = agg2.strCol1",
@@ -47,6 +61,54 @@
           ["b", -98460.5]
         ]
       },
+      {
+        "description": "multi 'with' table with all aggregations on int column",
+        "sql": "WITH agg1 AS ( SELECT strCol, sum(intCol) AS sumVal FROM {tbl1} GROUP BY strCol), agg2 AS (SELECT strCol1, SUM(intCol) AS c1, AVG(intCol) AS c2, MIN(intCol) AS c3, MAX(intCol) AS c4, COUNT(*) as c5, COUNT(DISTINCT(intCol)) AS c6 FROM {tbl2} GROUP BY strCol1) SELECT strCol, sumVal - c2, c1, c3, c4, c5, c6 FROM agg1, agg2 WHERE agg1.strCol = agg2.strCol1",
+        "outputs": [
+          ["a", 2.5, 3.0, 1, 2, 2, 2],
+          ["b", -98460.5, 196925.0, 42.0, 196883.0, 2, 2]
+        ]
+      },
+      {
+        "description": "multi 'with' table with all aggregations on double column",
+        "sql": "WITH agg1 AS ( SELECT strCol, sum(intCol) AS sumVal FROM {tbl1} GROUP BY strCol), agg2 AS (SELECT strCol1, SUM(doubleCol) AS c1, AVG(doubleCol) AS c2, MIN(doubleCol) AS c3, MAX(doubleCol) AS c4, COUNT(*) as c5, COUNT(DISTINCT(doubleCol)) AS c6 FROM {tbl2} GROUP BY strCol1) SELECT strCol, sumVal - c2, c1, c3, c4, c5, c6 FROM agg1, agg2 WHERE agg1.strCol = agg2.strCol1",
+        "outputs": [
+          ["a", -133.56, 275.12, 100.0, 175.12, 2, 2],
+          ["b", -173.12, 350.24, 175.12, 175.12, 2, 1]
+        ]
+      },
+      {
+        "description": "multi 'with' table with all aggregations on float column",
+        "sql": "WITH agg1 AS ( SELECT strCol, sum(intCol) AS sumVal FROM {tbl1} GROUP BY strCol), agg2 AS (SELECT strCol1, SUM(floatCol) AS c1, AVG(floatCol) AS c2, MIN(floatCol) AS c3, MAX(floatCol) AS c4, COUNT(*) as c5, COUNT(DISTINCT(floatCol)) AS c6 FROM {tbl2} GROUP BY strCol1) SELECT strCol, sumVal - c2, c1, c3, c4, c5, c6 FROM agg1, agg2 WHERE agg1.strCol = agg2.strCol1",
+        "outputs": [
+          ["a", -13.109999656677246, 34.21999931335449, 12.0, 22.219999313354492, 2, 2],
+          ["b", -20.219999313354492, 44.439998626708984, 22.219999313354492, 22.219999313354492, 2, 1]
+        ]
+      },
+      {
+        "description": "multi 'with' table with all aggregations on long column",
+        "sql": "WITH agg1 AS ( SELECT strCol, sum(intCol) AS sumVal FROM {tbl1} GROUP BY strCol), agg2 AS (SELECT strCol1, SUM(longCol) AS c1, AVG(longCol) AS c2, MIN(longCol) AS c3, MAX(longCol) AS c4, COUNT(*) as c5, COUNT(DISTINCT(longCol)) AS c6 FROM {tbl2} GROUP BY strCol1) SELECT strCol, sumVal - c2, c1, c3, c4, c5, c6 FROM agg1, agg2 WHERE agg1.strCol = agg2.strCol1",
+        "outputs": [
+          ["a", -154996.0, 310000.0, 10000.0, 300000.0, 2, 2],
+          ["b", -299998.0, 600000.0, 300000.0, 300000.0, 2, 1]
+        ]
+      },
+      {
+        "description": "multi 'with' table with all aggregations on string column",
+        "sql": "WITH agg1 AS ( SELECT strCol, sum(intCol) AS sumVal FROM {tbl1} GROUP BY strCol), agg2 AS (SELECT strCol1, COUNT(strCol2) as c1, COUNT(DISTINCT(strCol2)) AS c2 FROM {tbl2} GROUP BY strCol1) SELECT strCol, c1, c2 FROM agg1, agg2 WHERE agg1.strCol = agg2.strCol1",
+        "outputs": [
+          ["a", 2, 2],
+          ["b", 2, 2]
+        ]
+      },
+      {
+        "description": "multi 'with' table with all aggregations on bool column",
+        "sql": "WITH agg1 AS ( SELECT strCol, sum(intCol) AS sumVal FROM {tbl1} GROUP BY strCol), agg2 AS (SELECT strCol1, bool_and(boolCol) AS c1, bool_or(boolCol) as c2, COUNT(boolCol) as c3, COUNT(DISTINCT(boolCol)) AS c4 FROM {tbl2} GROUP BY strCol1) SELECT strCol, c1, c2, c3, c4 FROM agg1, agg2 WHERE agg1.strCol = agg2.strCol1",
+        "outputs": [
+          ["a", false, true, 2, 2],
+          ["b", true, true, 2, 1]
+        ]
+      },
       {
         "description": "nested 'with' on agg table: (with a as ( ... ), select ... ",
         "sql": "WITH agg1 AS (SELECT strCol1, strCol2, sum(intCol) AS sumVal FROM {tbl2} GROUP BY strCol1, strCol2) SELECT strCol1, avg(sumVal) AS avgVal FROM agg1 GROUP BY strCol1",


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org