You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/07/01 16:32:38 UTC
[pinot] branch master updated: [Multistage] Runtime changes for leveraging V1 Aggregation Functions (#10845)
This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a6ee2773a2 [Multistage] Runtime changes for leveraging V1 Aggregation Functions (#10845)
a6ee2773a2 is described below
commit a6ee2773a28b2339ac1a4fc80fba978ae69dbbde
Author: Vivek Iyer Vaidyanathan <vv...@gmail.com>
AuthorDate: Sat Jul 1 09:32:31 2023 -0700
[Multistage] Runtime changes for leveraging V1 Aggregation Functions (#10845)
* [Multistage] Runtime changes for supporting Aggregations
* Special handling for bool_and bool_or
---
.../pinot/common/datablock/DataBlockUtils.java | 292 +++++++
.../core/common/IntermediateStageBlockValSet.java | 155 ++++
.../function/AggregationFunctionFactory.java | 3 +
.../operator/MultistageAggregationExecutor.java | 223 +++++
.../operator/MultistageGroupByExecutor.java | 307 +++++++
.../runtime/operator/NewAggregateOperator.java | 295 +++++++
.../query/runtime/plan/PhysicalPlanVisitor.java | 13 +
.../runtime/operator/AggregateOperatorTest.java | 2 +-
.../src/test/resources/queries/Aggregates.json | 922 ++++++++++++++++++---
.../src/test/resources/queries/NullHandling.json | 11 +-
.../src/test/resources/queries/WithStatements.json | 72 +-
11 files changed, 2147 insertions(+), 148 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java
index 6bf6a0e05f..99c3e4df46 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java
@@ -19,6 +19,7 @@
package org.apache.pinot.common.datablock;
import java.io.IOException;
+import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.sql.Timestamp;
import java.util.ArrayList;
@@ -268,4 +269,295 @@ public final class DataBlockUtils {
}
return row;
}
+
+  /**
+   * Materializes one column of a data block as a primitive int array, with one slot per row. When the target type is
+   * known up front, prefer this over extractRowFromDatablock: values are never boxed to Object and unboxed again.
+   * INT and BOOLEAN values are read with getInt; LONG, FLOAT and DOUBLE values are narrowed with an (int) cast;
+   * BIG_DECIMAL values go through intValue(). Rows present in the column's null bitmap are skipped, so their slots
+   * keep the array default of 0.
+   * This only works on ROW format.
+   * TODO: Add support for COLUMNAR format.
+   * @return int array of values in the column
+   * @throws IllegalStateException if a non-null row is read from a column of any other data type
+   */
+  public static int[] extractIntValuesForColumn(DataBlock dataBlock, int columnIndex) {
+    DataSchema.ColumnDataType[] types = dataBlock.getDataSchema().getColumnDataTypes();
+
+    // Null bitmap of the requested column; a null bitmap means no row is skipped.
+    RoaringBitmap nulls = extractNullBitmaps(dataBlock)[columnIndex];
+    int rowCount = dataBlock.getNumberOfRows();
+
+    int[] values = new int[rowCount];
+    for (int row = 0; row < rowCount; row++) {
+      boolean isNull = nulls != null && nulls.contains(row);
+      if (!isNull) {
+        // The type check stays per row on purpose: a column with no non-null rows never reaches the default branch.
+        switch (types[columnIndex]) {
+          case INT:
+          case BOOLEAN:
+            values[row] = dataBlock.getInt(row, columnIndex);
+            break;
+          case LONG:
+            values[row] = (int) dataBlock.getLong(row, columnIndex);
+            break;
+          case FLOAT:
+            values[row] = (int) dataBlock.getFloat(row, columnIndex);
+            break;
+          case DOUBLE:
+            values[row] = (int) dataBlock.getDouble(row, columnIndex);
+            break;
+          case BIG_DECIMAL:
+            values[row] = dataBlock.getBigDecimal(row, columnIndex).intValue();
+            break;
+          default:
+            throw new IllegalStateException(
+                String.format("Unsupported data type: %s for column: %s", types[columnIndex], columnIndex));
+        }
+      }
+    }
+    return values;
+  }
+
+  /**
+   * Materializes one column of a data block as a primitive long array, with one slot per row. When the target type is
+   * known up front, prefer this over extractRowFromDatablock: values are never boxed to Object and unboxed again.
+   * INT, BOOLEAN and LONG values are stored without a cast; FLOAT and DOUBLE values are narrowed with a (long) cast;
+   * BIG_DECIMAL values go through longValue(). Rows present in the column's null bitmap are skipped, so their slots
+   * keep the array default of 0.
+   * This only works on ROW format.
+   * TODO: Add support for COLUMNAR format.
+   * @return long array of values in the column
+   * @throws IllegalStateException if a non-null row is read from a column of any other data type
+   */
+  public static long[] extractLongValuesForColumn(DataBlock dataBlock, int columnIndex) {
+    DataSchema.ColumnDataType[] types = dataBlock.getDataSchema().getColumnDataTypes();
+
+    // Null bitmap of the requested column; a null bitmap means no row is skipped.
+    RoaringBitmap nulls = extractNullBitmaps(dataBlock)[columnIndex];
+    int rowCount = dataBlock.getNumberOfRows();
+
+    long[] values = new long[rowCount];
+    for (int row = 0; row < rowCount; row++) {
+      boolean isNull = nulls != null && nulls.contains(row);
+      if (!isNull) {
+        // The type check stays per row on purpose: a column with no non-null rows never reaches the default branch.
+        switch (types[columnIndex]) {
+          case INT:
+          case BOOLEAN:
+            values[row] = dataBlock.getInt(row, columnIndex);
+            break;
+          case LONG:
+            values[row] = dataBlock.getLong(row, columnIndex);
+            break;
+          case FLOAT:
+            values[row] = (long) dataBlock.getFloat(row, columnIndex);
+            break;
+          case DOUBLE:
+            values[row] = (long) dataBlock.getDouble(row, columnIndex);
+            break;
+          case BIG_DECIMAL:
+            values[row] = dataBlock.getBigDecimal(row, columnIndex).longValue();
+            break;
+          default:
+            throw new IllegalStateException(
+                String.format("Unsupported data type: %s for column: %s", types[columnIndex], columnIndex));
+        }
+      }
+    }
+    return values;
+  }
+
+  /**
+   * Materializes one column of a data block as a primitive float array, with one slot per row. When the target type
+   * is known up front, prefer this over extractRowFromDatablock: values are never boxed to Object and unboxed again.
+   * INT, BOOLEAN, LONG and FLOAT values are stored without an explicit cast; DOUBLE values are narrowed with a
+   * (float) cast; BIG_DECIMAL values go through floatValue(). Rows present in the column's null bitmap are skipped,
+   * so their slots keep the array default of 0.
+   * This only works on ROW format.
+   * TODO: Add support for COLUMNAR format.
+   * @return float array of values in the column
+   * @throws IllegalStateException if a non-null row is read from a column of any other data type
+   */
+  public static float[] extractFloatValuesForColumn(DataBlock dataBlock, int columnIndex) {
+    DataSchema.ColumnDataType[] types = dataBlock.getDataSchema().getColumnDataTypes();
+
+    // Null bitmap of the requested column; a null bitmap means no row is skipped.
+    RoaringBitmap nulls = extractNullBitmaps(dataBlock)[columnIndex];
+    int rowCount = dataBlock.getNumberOfRows();
+
+    float[] values = new float[rowCount];
+    for (int row = 0; row < rowCount; row++) {
+      boolean isNull = nulls != null && nulls.contains(row);
+      if (!isNull) {
+        // The type check stays per row on purpose: a column with no non-null rows never reaches the default branch.
+        switch (types[columnIndex]) {
+          case INT:
+          case BOOLEAN:
+            values[row] = dataBlock.getInt(row, columnIndex);
+            break;
+          case LONG:
+            values[row] = dataBlock.getLong(row, columnIndex);
+            break;
+          case FLOAT:
+            values[row] = dataBlock.getFloat(row, columnIndex);
+            break;
+          case DOUBLE:
+            values[row] = (float) dataBlock.getDouble(row, columnIndex);
+            break;
+          case BIG_DECIMAL:
+            values[row] = dataBlock.getBigDecimal(row, columnIndex).floatValue();
+            break;
+          default:
+            throw new IllegalStateException(
+                String.format("Unsupported data type: %s for column: %s", types[columnIndex], columnIndex));
+        }
+      }
+    }
+
+    return values;
+  }
+
+  /**
+   * Materializes one column of a data block as a primitive double array, with one slot per row. When the target type
+   * is known up front, prefer this over extractRowFromDatablock: values are never boxed to Object and unboxed again.
+   * INT, BOOLEAN, LONG, FLOAT and DOUBLE values are stored without an explicit cast; BIG_DECIMAL values go through
+   * doubleValue(). Rows present in the column's null bitmap are skipped, so their slots keep the array default of 0.
+   * This only works on ROW format.
+   * TODO: Add support for COLUMNAR format.
+   * @return double array of values in the column
+   * @throws IllegalStateException if a non-null row is read from a column of any other data type
+   */
+  public static double[] extractDoubleValuesForColumn(DataBlock dataBlock, int columnIndex) {
+    DataSchema.ColumnDataType[] types = dataBlock.getDataSchema().getColumnDataTypes();
+
+    // Null bitmap of the requested column; a null bitmap means no row is skipped.
+    RoaringBitmap nulls = extractNullBitmaps(dataBlock)[columnIndex];
+    int rowCount = dataBlock.getNumberOfRows();
+
+    double[] values = new double[rowCount];
+    for (int row = 0; row < rowCount; row++) {
+      boolean isNull = nulls != null && nulls.contains(row);
+      if (!isNull) {
+        // The type check stays per row on purpose: a column with no non-null rows never reaches the default branch.
+        switch (types[columnIndex]) {
+          case INT:
+          case BOOLEAN:
+            values[row] = dataBlock.getInt(row, columnIndex);
+            break;
+          case LONG:
+            values[row] = dataBlock.getLong(row, columnIndex);
+            break;
+          case FLOAT:
+            values[row] = dataBlock.getFloat(row, columnIndex);
+            break;
+          case DOUBLE:
+            values[row] = dataBlock.getDouble(row, columnIndex);
+            break;
+          case BIG_DECIMAL:
+            values[row] = dataBlock.getBigDecimal(row, columnIndex).doubleValue();
+            break;
+          default:
+            throw new IllegalStateException(
+                String.format("Unsupported data type: %s for column: %s", types[columnIndex], columnIndex));
+        }
+      }
+    }
+
+    return values;
+  }
+
+  /**
+   * Given a datablock and the column index, extracts the BigDecimal values for the column. Prefer using this function
+   * over extractRowFromDatablock if the desired datatype is known, since it reads each value with the typed getter
+   * for the column's data type.
+   * INT, BOOLEAN, LONG, FLOAT and DOUBLE values are converted with BigDecimal.valueOf; BIG_DECIMAL values are returned
+   * exactly as stored, without a detour through double that would lose precision and scale. Rows present in the
+   * column's null bitmap are skipped, so their slots stay null.
+   * This only works on ROW format.
+   * TODO: Add support for COLUMNAR format.
+   * @return BigDecimal array of values in the column, with null entries for null rows
+   * @throws IllegalStateException if a non-null row is read from a column of any other data type
+   */
+  public static BigDecimal[] extractBigDecimalValuesForColumn(DataBlock dataBlock, int columnIndex) {
+    DataSchema dataSchema = dataBlock.getDataSchema();
+    DataSchema.ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
+
+    // Get null bitmap for the column.
+    RoaringBitmap nullBitmap = extractNullBitmaps(dataBlock)[columnIndex];
+    int numRows = dataBlock.getNumberOfRows();
+
+    BigDecimal[] rows = new BigDecimal[numRows];
+    for (int rowId = 0; rowId < numRows; rowId++) {
+      if (nullBitmap != null && nullBitmap.contains(rowId)) {
+        continue;
+      }
+
+      switch (columnDataTypes[columnIndex]) {
+        case INT:
+        case BOOLEAN:
+          rows[rowId] = BigDecimal.valueOf(dataBlock.getInt(rowId, columnIndex));
+          break;
+        case LONG:
+          rows[rowId] = BigDecimal.valueOf(dataBlock.getLong(rowId, columnIndex));
+          break;
+        case FLOAT:
+          // NOTE(review): the float is widened to double before conversion, so the result reflects the widened
+          // value's decimal string - confirm this is the intended representation.
+          rows[rowId] = BigDecimal.valueOf(dataBlock.getFloat(rowId, columnIndex));
+          break;
+        case DOUBLE:
+          rows[rowId] = BigDecimal.valueOf(dataBlock.getDouble(rowId, columnIndex));
+          break;
+        case BIG_DECIMAL:
+          // Use the stored value directly; converting via doubleValue() would truncate it to double precision.
+          rows[rowId] = dataBlock.getBigDecimal(rowId, columnIndex);
+          break;
+        default:
+          throw new IllegalStateException(
+              String.format("Unsupported data type: %s for column: %s", columnDataTypes[columnIndex], columnIndex));
+      }
+    }
+
+    return rows;
+  }
+
+  /**
+   * Given a datablock and the column index, extracts the String values for the column. Prefer using this function over
+   * extractRowFromDatablock if the desired datatype is known, since only the requested column is read.
+   * Every non-null row is read with getString regardless of the column's declared data type. Rows present in the
+   * column's null bitmap are skipped, so their slots stay null.
+   * This only works on ROW format.
+   * TODO: Add support for COLUMNAR format.
+   * @return String array of values in the column, with null entries for null rows
+   */
+  public static String[] extractStringValuesForColumn(DataBlock dataBlock, int columnIndex) {
+    // Get null bitmap for the column.
+    RoaringBitmap nullBitmap = extractNullBitmaps(dataBlock)[columnIndex];
+    int numRows = dataBlock.getNumberOfRows();
+
+    String[] rows = new String[numRows];
+    for (int rowId = 0; rowId < numRows; rowId++) {
+      if (nullBitmap != null && nullBitmap.contains(rowId)) {
+        continue;
+      }
+      rows[rowId] = dataBlock.getString(rowId, columnIndex);
+    }
+
+    return rows;
+  }
+
+  /**
+   * Given a datablock and the column index, extracts the bytes values for the column, one byte[] per row. Prefer using
+   * this function over extractRowFromDatablock if the desired datatype is known, since only the requested column is
+   * read.
+   * Every non-null row is read with getBytes(...).getBytes() regardless of the column's declared data type. Rows
+   * present in the column's null bitmap are skipped, so their slots stay null.
+   * This only works on ROW format.
+   * TODO: Add support for COLUMNAR format.
+   * @return array holding one byte[] per row of the column, with null entries for null rows
+   */
+  public static byte[][] extractBytesValuesForColumn(DataBlock dataBlock, int columnIndex) {
+    // Get null bitmap for the column.
+    RoaringBitmap nullBitmap = extractNullBitmaps(dataBlock)[columnIndex];
+    int numRows = dataBlock.getNumberOfRows();
+
+    byte[][] rows = new byte[numRows][];
+    for (int rowId = 0; rowId < numRows; rowId++) {
+      if (nullBitmap != null && nullBitmap.contains(rowId)) {
+        continue;
+      }
+      rows[rowId] = dataBlock.getBytes(rowId, columnIndex).getBytes();
+    }
+
+    return rows;
+  }
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/IntermediateStageBlockValSet.java b/pinot-core/src/main/java/org/apache/pinot/core/common/IntermediateStageBlockValSet.java
new file mode 100644
index 0000000000..7ddaaf04c9
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/IntermediateStageBlockValSet.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.common;
+
+import java.math.BigDecimal;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.datablock.DataBlockUtils;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.RoaringBitmap;
+
+/**
+ * {@link BlockValSet} view over a single column of a multistage {@link DataBlock}.
+ * In the multistage engine, the leaf stage servers process the data in columnar fashion, but by the time the
+ * intermediate stage receives the projected columns they have been converted to a row based format. This class turns
+ * one column of that row based representation back into a columnar block so that v1 aggregation functions can consume
+ * it. Every single-value getter delegates to the matching {@link DataBlockUtils} extractor; dictionary and
+ * multi-value accessors are not supported.
+ * TODO: Support MV
+ */
+public class IntermediateStageBlockValSet implements BlockValSet {
+  private final FieldSpec.DataType _valueType;
+  private final DataBlock _block;
+  private final int _columnIndex;
+  private final RoaringBitmap _nullRowIds;
+
+  /**
+   * @param columnDataType data type of the column, converted to its {@link FieldSpec.DataType} for getValueType()
+   * @param dataBlock block that holds the column values
+   * @param colIndex index of the column inside the block
+   */
+  public IntermediateStageBlockValSet(DataSchema.ColumnDataType columnDataType, DataBlock dataBlock, int colIndex) {
+    _block = dataBlock;
+    _columnIndex = colIndex;
+    _valueType = columnDataType.toDataType();
+    _nullRowIds = dataBlock.getNullRowIds(colIndex);
+  }
+
+  /**
+   * Returns the null row ids that the data block reported for this column at construction time.
+   */
+  @Nullable
+  @Override
+  public RoaringBitmap getNullBitmap() {
+    return _nullRowIds;
+  }
+
+  /**
+   * Returns the value type derived from the column data type passed to the constructor.
+   */
+  @Override
+  public FieldSpec.DataType getValueType() {
+    return _valueType;
+  }
+
+  /**
+   * Always reports single-value.
+   */
+  @Override
+  public boolean isSingleValue() {
+    // TODO: Needs to be changed when we start supporting MV in multistage
+    return true;
+  }
+
+  /**
+   * Always returns null: this value set exposes no dictionary.
+   */
+  @Nullable
+  @Override
+  public Dictionary getDictionary() {
+    return null;
+  }
+
+  /**
+   * Not supported; always throws.
+   */
+  @Override
+  public int[] getDictionaryIdsSV() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Extracts the column as an int array via {@link DataBlockUtils#extractIntValuesForColumn}.
+   */
+  @Override
+  public int[] getIntValuesSV() {
+    return DataBlockUtils.extractIntValuesForColumn(_block, _columnIndex);
+  }
+
+  /**
+   * Extracts the column as a long array via {@link DataBlockUtils#extractLongValuesForColumn}.
+   */
+  @Override
+  public long[] getLongValuesSV() {
+    return DataBlockUtils.extractLongValuesForColumn(_block, _columnIndex);
+  }
+
+  /**
+   * Extracts the column as a float array via {@link DataBlockUtils#extractFloatValuesForColumn}.
+   */
+  @Override
+  public float[] getFloatValuesSV() {
+    return DataBlockUtils.extractFloatValuesForColumn(_block, _columnIndex);
+  }
+
+  /**
+   * Extracts the column as a double array via {@link DataBlockUtils#extractDoubleValuesForColumn}.
+   */
+  @Override
+  public double[] getDoubleValuesSV() {
+    return DataBlockUtils.extractDoubleValuesForColumn(_block, _columnIndex);
+  }
+
+  /**
+   * Extracts the column as a BigDecimal array via {@link DataBlockUtils#extractBigDecimalValuesForColumn}.
+   */
+  @Override
+  public BigDecimal[] getBigDecimalValuesSV() {
+    return DataBlockUtils.extractBigDecimalValuesForColumn(_block, _columnIndex);
+  }
+
+  /**
+   * Extracts the column as a String array via {@link DataBlockUtils#extractStringValuesForColumn}.
+   */
+  @Override
+  public String[] getStringValuesSV() {
+    return DataBlockUtils.extractStringValuesForColumn(_block, _columnIndex);
+  }
+
+  /**
+   * Extracts the column as an array of byte arrays via {@link DataBlockUtils#extractBytesValuesForColumn}.
+   */
+  @Override
+  public byte[][] getBytesValuesSV() {
+    return DataBlockUtils.extractBytesValuesForColumn(_block, _columnIndex);
+  }
+
+  // None of the multi-value accessors below are supported; each one always throws.
+
+  @Override
+  public int[][] getDictionaryIdsMV() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int[][] getIntValuesMV() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long[][] getLongValuesMV() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public float[][] getFloatValuesMV() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public double[][] getDoubleValuesMV() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public String[][] getStringValuesMV() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public byte[][][] getBytesValuesMV() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int[] getNumMVEntries() {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index 4a01580e61..eb8ea016fa 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -43,6 +43,7 @@ public class AggregationFunctionFactory {
*/
public static AggregationFunction getAggregationFunction(FunctionContext function, boolean nullHandlingEnabled) {
try {
+ // TODO(Sonam): Replace $ removal with a util function
String upperCaseFunctionName = StringUtils.remove(function.getFunctionName(), '_').toUpperCase();
List<ExpressionContext> arguments = function.getArguments();
ExpressionContext firstArgument = arguments.get(0);
@@ -188,6 +189,8 @@ public class AggregationFunctionFactory {
case MAX:
return new MaxAggregationFunction(firstArgument, nullHandlingEnabled);
case SUM:
+ // TODO(Sonam): Uncomment SUM0 when merging planner changes
+ // case SUM0:
return new SumAggregationFunction(firstArgument, nullHandlingEnabled);
case SUMPRECISION:
return new SumPrecisionAggregationFunction(arguments, nullHandlingEnabled);
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageAggregationExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageAggregationExecutor.java
new file mode 100644
index 0000000000..5a3f61198e
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageAggregationExecutor.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.IntermediateStageBlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+/**
+ * Class that executes all aggregation functions (without group-bys) for the multistage AggregateOperator.
+ * The executor runs in exactly one {@link NewAggregateOperator.Mode}, fixed at construction:
+ * AGGREGATE feeds input columns into the v1 aggregation functions, MERGE combines intermediate results row by row,
+ * and EXTRACT_RESULT turns the intermediate results of a single row into final results.
+ */
+public class MultistageAggregationExecutor {
+  private final NewAggregateOperator.Mode _mode;
+  // The identifier operands for the aggregation function only store the column name. This map contains mapping
+  // from column name to their index.
+  private final Map<String, Integer> _colNameToIndexMap;
+
+  private final AggregationFunction[] _aggFunctions;
+
+  // Result holders for each mode. Only the holder matching _mode is ever written by processBlock.
+  private final AggregationResultHolder[] _aggregateResultHolder;
+  private final Object[] _mergeResultHolder;
+  private final Object[] _finalResultHolder;
+
+  /**
+   * @param aggFunctions aggregation functions to execute; result rows have one entry per function, in this order
+   * @param mode mode this executor runs in
+   * @param colNameToIndexMap mapping from input column name to its index in the input rows
+   */
+  public MultistageAggregationExecutor(AggregationFunction[] aggFunctions, NewAggregateOperator.Mode mode,
+      Map<String, Integer> colNameToIndexMap) {
+    _aggFunctions = aggFunctions;
+    _mode = mode;
+    _colNameToIndexMap = colNameToIndexMap;
+
+    _aggregateResultHolder = new AggregationResultHolder[aggFunctions.length];
+    _mergeResultHolder = new Object[aggFunctions.length];
+    _finalResultHolder = new Object[aggFunctions.length];
+
+    for (int i = 0; i < _aggFunctions.length; i++) {
+      _aggregateResultHolder[i] = _aggFunctions[i].createAggregationResultHolder();
+    }
+  }
+
+  /**
+   * Performs aggregation for the data in the block, dispatching on the configured mode. A mode other than
+   * AGGREGATE, MERGE or EXTRACT_RESULT leaves the block unprocessed.
+   */
+  public void processBlock(TransferableBlock block, DataSchema inputDataSchema) {
+    if (_mode == NewAggregateOperator.Mode.AGGREGATE) {
+      processAggregate(block, inputDataSchema);
+    } else if (_mode == NewAggregateOperator.Mode.MERGE) {
+      processMerge(block);
+    } else if (_mode == NewAggregateOperator.Mode.EXTRACT_RESULT) {
+      collectResult(block);
+    }
+  }
+
+  /**
+   * @return an empty agg result row for non-group-by aggregation: for each function, the result extracted from a
+   *         freshly created result holder.
+   */
+  public Object[] constructEmptyAggResultRow() {
+    Object[] row = new Object[_aggFunctions.length];
+    for (int i = 0; i < _aggFunctions.length; i++) {
+      AggregationFunction aggFunction = _aggFunctions[i];
+      row[i] = aggFunction.extractAggregationResult(aggFunction.createAggregationResultHolder());
+    }
+    return row;
+  }
+
+  /**
+   * Fetches the result accumulated so far.
+   * @return a list holding exactly one row with one value per aggregation function
+   */
+  public List<Object[]> getResult() {
+    List<Object[]> rows = new ArrayList<>();
+    Object[] row = new Object[_aggFunctions.length];
+
+    for (int i = 0; i < _aggFunctions.length; i++) {
+      AggregationFunction aggFunction = _aggFunctions[i];
+      if (_mode == NewAggregateOperator.Mode.MERGE) {
+        row[i] = convertObjectToReturnType(aggFunction.getType(), _mergeResultHolder[i]);
+      } else if (_mode == NewAggregateOperator.Mode.AGGREGATE) {
+        Object value = aggFunction.extractAggregationResult(_aggregateResultHolder[i]);
+        row[i] = convertObjectToReturnType(aggFunction.getType(), value);
+      } else {
+        assert _mode == NewAggregateOperator.Mode.EXTRACT_RESULT;
+        Comparable result = aggFunction.extractFinalResult(_finalResultHolder[i]);
+        row[i] = result == null ? null : aggFunction.getFinalResultColumnType().convert(result);
+      }
+    }
+    rows.add(row);
+    return rows;
+  }
+
+  /**
+   * Returns true for the BOOL_AND and BOOL_OR aggregation functions, which need Boolean/Integer conversion.
+   */
+  private static boolean isBoolAndOr(AggregationFunctionType aggType) {
+    return aggType == AggregationFunctionType.BOOLAND || aggType == AggregationFunctionType.BOOLOR;
+  }
+
+  /**
+   * Resolves an input column name to its row index, failing with a descriptive error instead of a bare
+   * NullPointerException from auto-unboxing when the column is unknown.
+   */
+  private int getColumnIndex(String columnName) {
+    Integer index = _colNameToIndexMap.get(columnName);
+    Preconditions.checkState(index != null, "Failed to find column: %s in the input columns", columnName);
+    return index;
+  }
+
+  private Object convertObjectToReturnType(AggregationFunctionType aggType, Object value) {
+    // For bool_and and bool_or aggregation functions, the return type for aggregate and merge modes are set as
+    // boolean. However, the v1 bool_and and bool_or function uses Integer as the intermediate type.
+    if (isBoolAndOr(aggType) && value instanceof Integer) {
+      return (Integer) value > 0;
+    }
+    return value;
+  }
+
+  private void processAggregate(TransferableBlock block, DataSchema inputDataSchema) {
+    for (int i = 0; i < _aggFunctions.length; i++) {
+      AggregationFunction aggregationFunction = _aggFunctions[i];
+      Map<ExpressionContext, BlockValSet> blockValSetMap =
+          getBlockValSetMap(aggregationFunction, block, inputDataSchema);
+      aggregationFunction.aggregate(block.getNumRows(), _aggregateResultHolder[i], blockValSetMap);
+    }
+  }
+
+  private void processMerge(TransferableBlock block) {
+    List<Object[]> container = block.getContainer();
+
+    for (int i = 0; i < _aggFunctions.length; i++) {
+      for (Object[] row : container) {
+        Object intermediateResultToMerge = extractValueFromRow(_aggFunctions[i], row);
+        Object mergedIntermediateResult = _mergeResultHolder[i];
+
+        // Not all V1 aggregation functions have null-handling logic. Handle null values before calling merge.
+        if (intermediateResultToMerge == null) {
+          continue;
+        }
+        if (mergedIntermediateResult == null) {
+          // First non-null intermediate result seen for this function: adopt it as the running value.
+          _mergeResultHolder[i] = intermediateResultToMerge;
+          continue;
+        }
+
+        _mergeResultHolder[i] = _aggFunctions[i].merge(intermediateResultToMerge, mergedIntermediateResult);
+      }
+    }
+  }
+
+  private void collectResult(TransferableBlock block) {
+    List<Object[]> container = block.getContainer();
+    // Only the first row is read; the assertion documents that exactly one row is expected.
+    assert container.size() == 1;
+    Object[] row = container.get(0);
+    for (int i = 0; i < _aggFunctions.length; i++) {
+      _finalResultHolder[i] = extractValueFromRow(_aggFunctions[i], row);
+    }
+  }
+
+  private Map<ExpressionContext, BlockValSet> getBlockValSetMap(AggregationFunction aggFunction,
+      TransferableBlock block, DataSchema inputDataSchema) {
+    List<ExpressionContext> expressions = aggFunction.getInputExpressions();
+    int numExpressions = expressions.size();
+    if (numExpressions == 0) {
+      return Collections.emptyMap();
+    }
+
+    Preconditions.checkState(numExpressions == 1, "Cannot handle more than one identifier in aggregation function.");
+    ExpressionContext expression = expressions.get(0);
+    Preconditions.checkState(expression.getType() == ExpressionContext.Type.IDENTIFIER);
+    int index = getColumnIndex(expression.getIdentifier());
+
+    DataSchema.ColumnDataType dataType = inputDataSchema.getColumnDataType(index);
+    Preconditions.checkState(block.getType().equals(DataBlock.Type.ROW), "Datablock type is not ROW");
+    // TODO: If the previous block is not mailbox received, this method is not efficient. Then getDataBlock() will
+    // convert the unserialized format to serialized format of BaseDataBlock. Then it will convert it back to column
+    // value primitive type.
+    return Collections.singletonMap(expression,
+        new IntermediateStageBlockValSet(dataType, block.getDataBlock(), index));
+  }
+
+  /**
+   * Extracts the single input value of the aggregation function from the row: the column value when the input
+   * expression is an identifier, or the literal's value when it is a literal.
+   * @throws IllegalStateException if the function does not have exactly one input expression, the identifier is not
+   *         a known column, or the expression is neither an identifier nor a literal
+   */
+  Object extractValueFromRow(AggregationFunction aggregationFunction, Object[] row) {
+    // TODO: Add support to handle aggregation functions where:
+    // 1. The identifier need not be the first argument
+    // 2. There are more than one identifiers.
+    List<ExpressionContext> expressions = aggregationFunction.getInputExpressions();
+    Preconditions.checkState(expressions.size() == 1);
+    ExpressionContext expr = expressions.get(0);
+    ExpressionContext.Type exprType = expr.getType();
+
+    if (exprType == ExpressionContext.Type.IDENTIFIER) {
+      Object value = row[getColumnIndex(expr.getIdentifier())];
+
+      // Boolean aggregation functions like BOOL_AND and BOOL_OR have return types set to Boolean. However, their
+      // intermediateResultType is Integer. To handle this case convert Boolean objects to Integer objects.
+      if (isBoolAndOr(aggregationFunction.getType()) && value instanceof Boolean) {
+        return (Boolean) value ? 1 : 0;
+      }
+
+      return value;
+    }
+
+    Preconditions.checkState(exprType == ExpressionContext.Type.LITERAL, "Invalid expression type");
+    return expr.getLiteral().getValue();
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java
new file mode 100644
index 0000000000..dedb9f9b58
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java
@@ -0,0 +1,307 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.IntermediateStageBlockValSet;
+import org.apache.pinot.core.data.table.Key;
+import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+/**
+ * Class that executes the group by aggregations for the multistage AggregateOperator.
+ */
+public class MultistageGroupByExecutor {
+ private final NewAggregateOperator.Mode _mode;
+ // The identifier operands for the aggregation function only store the column name. This map contains mapping
+ // between column name to their index which is used in v2 engine.
+ private final Map<String, Integer> _colNameToIndexMap;
+
+ private final List<ExpressionContext> _groupSet;
+ private final AggregationFunction[] _aggFunctions;
+
+ // Group By Result holders for each mode
+ private final GroupByResultHolder[] _aggregateResultHolders;
+ private final Map<Integer, Object[]> _mergeResultHolder;
+ private final List<Object[]> _finalResultHolder;
+
+ // Mapping from the row-key to a zero based integer index. This is used when we invoke the v1 aggregation functions
+ // because they use the zero based integer indexes to store results.
+ private int _groupId = 0;
+ private Map<Key, Integer> _groupKeyToIdMap;
+
+ // Mapping from the group by row-key to the values in the row which form the key. Used to fetch the actual row
+ // values when populating the result.
+ private final Map<Key, Object[]> _groupByKeyHolder;
+
+ public MultistageGroupByExecutor(List<ExpressionContext> groupByExpr, AggregationFunction[] aggFunctions,
+ NewAggregateOperator.Mode mode, Map<String, Integer> colNameToIndexMap) {
+ _mode = mode;
+ _colNameToIndexMap = colNameToIndexMap;
+ _groupSet = groupByExpr;
+ _aggFunctions = aggFunctions;
+
+ _aggregateResultHolders = new GroupByResultHolder[_aggFunctions.length];
+ _mergeResultHolder = new HashMap<>();
+ _finalResultHolder = new ArrayList<>();
+
+ _groupKeyToIdMap = new HashMap<>();
+ _groupByKeyHolder = new HashMap<>();
+
+ for (int i = 0; i < _aggFunctions.length; i++) {
+ _aggregateResultHolders[i] =
+ _aggFunctions[i].createGroupByResultHolder(InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY,
+ InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT);
+ }
+ }
+
+ /**
+ * Performs group-by aggregation for the data in the block.
+ */
+ public void processBlock(TransferableBlock block, DataSchema inputDataSchema) {
+ if (_mode.equals(NewAggregateOperator.Mode.AGGREGATE)) {
+ processAggregate(block, inputDataSchema);
+ } else if (_mode.equals(NewAggregateOperator.Mode.MERGE)) {
+ processMerge(block);
+ } else if (_mode.equals(NewAggregateOperator.Mode.EXTRACT_RESULT)) {
+ collectResult(block);
+ }
+ }
+
+ /**
+ * Fetches the result.
+ */
+ public List<Object[]> getResult() {
+ List<Object[]> rows = new ArrayList<>();
+
+ if (_mode.equals(NewAggregateOperator.Mode.EXTRACT_RESULT)) {
+ return extractFinalGroupByResult();
+ }
+
+ // If the mode is MERGE or AGGREGATE, the groupby keys are already collected in _groupByKeyHolder by virtue of
+ // generating the row keys.
+ for (Map.Entry<Key, Object[]> e : _groupByKeyHolder.entrySet()) {
+ int numCols = _groupSet.size() + _aggFunctions.length;
+ Object[] row = new Object[numCols];
+ Object[] keyElements = e.getValue();
+ System.arraycopy(keyElements, 0, row, 0, keyElements.length);
+
+ for (int i = 0; i < _aggFunctions.length; i++) {
+ int index = i + _groupSet.size();
+ int groupId = _groupKeyToIdMap.get(e.getKey());
+ if (_mode.equals(NewAggregateOperator.Mode.MERGE)) {
+ Object value = _mergeResultHolder.get(groupId)[i];
+ row[index] = convertObjectToReturnType(_aggFunctions[i].getType(), value);
+ } else if (_mode.equals(NewAggregateOperator.Mode.AGGREGATE)) {
+ Object value = _aggFunctions[i].extractGroupByResult(_aggregateResultHolders[i], groupId);
+ row[index] = convertObjectToReturnType(_aggFunctions[i].getType(), value);
+ }
+ }
+
+ rows.add(row);
+ }
+
+ return rows;
+ }
+
+ private List<Object[]> extractFinalGroupByResult() {
+ List<Object[]> rows = new ArrayList<>();
+ for (Object[] resultRow : _finalResultHolder) {
+ int numCols = _groupSet.size() + _aggFunctions.length;
+ Object[] row = new Object[numCols];
+ System.arraycopy(resultRow, 0, row, 0, _groupSet.size());
+
+ for (int i = 0; i < _aggFunctions.length; i++) {
+ int aggIdx = i + _groupSet.size();
+ Comparable result = _aggFunctions[i].extractFinalResult(resultRow[aggIdx]);
+ row[aggIdx] = result == null ? null : _aggFunctions[i].getFinalResultColumnType().convert(result);
+ }
+
+ rows.add(row);
+ }
+ return rows;
+ }
+
+ private Object convertObjectToReturnType(AggregationFunctionType aggType, Object value) {
+ // For bool_and and bool_or aggregation functions, the return type for aggregate and merge modes are set as
+ // boolean. However, the v1 bool_and and bool_or function uses Integer as the intermediate type.
+ boolean boolAndOrAgg =
+ aggType.equals(AggregationFunctionType.BOOLAND) || aggType.equals(AggregationFunctionType.BOOLOR);
+ if (boolAndOrAgg && value instanceof Integer) {
+ Boolean boolVal = ((Number) value).intValue() > 0 ? true : false;
+ return boolVal;
+ }
+ return value;
+ }
+
+ private void processAggregate(TransferableBlock block, DataSchema inputDataSchema) {
+ int[] intKeys = generateGroupByKeys(block.getContainer());
+
+ for (int i = 0; i < _aggFunctions.length; i++) {
+ AggregationFunction aggregationFunction = _aggFunctions[i];
+ Map<ExpressionContext, BlockValSet> blockValSetMap =
+ getBlockValSetMap(aggregationFunction, block, inputDataSchema);
+ GroupByResultHolder groupByResultHolder = _aggregateResultHolders[i];
+ groupByResultHolder.ensureCapacity(_groupKeyToIdMap.size());
+ aggregationFunction.aggregateGroupBySV(block.getNumRows(), intKeys, groupByResultHolder, blockValSetMap);
+ }
+ }
+
+ private void processMerge(TransferableBlock block) {
+ List<Object[]> container = block.getContainer();
+ int[] intKeys = generateGroupByKeys(container);
+
+ for (int i = 0; i < _aggFunctions.length; i++) {
+ for (int j = 0; j < container.size(); j++) {
+ Object[] row = container.get(j);
+ int rowKey = intKeys[j];
+ if (!_mergeResultHolder.containsKey(rowKey)) {
+ _mergeResultHolder.put(rowKey, new Object[_aggFunctions.length]);
+ }
+ Object intermediateResultToMerge = extractValueFromRow(_aggFunctions[i], row);
+ Object mergedIntermediateResult = _mergeResultHolder.get(rowKey)[i];
+
+ // Not all V1 aggregation functions have null-handling. So handle null values and call merge only if necessary.
+ if (intermediateResultToMerge == null) {
+ continue;
+ }
+ if (mergedIntermediateResult == null) {
+ _mergeResultHolder.get(rowKey)[i] = intermediateResultToMerge;
+ continue;
+ }
+
+ _mergeResultHolder.get(rowKey)[i] = _aggFunctions[i].merge(intermediateResultToMerge, mergedIntermediateResult);
+ }
+ }
+ }
+
+ private void collectResult(TransferableBlock block) {
+ List<Object[]> container = block.getContainer();
+ for (Object[] row : container) {
+ assert row.length == _groupSet.size() + _aggFunctions.length;
+ Object[] resultRow = new Object[row.length];
+ System.arraycopy(row, 0, resultRow, 0, _groupSet.size());
+
+ for (int i = 0; i < _aggFunctions.length; i++) {
+ int index = _groupSet.size() + i;
+ resultRow[index] = extractValueFromRow(_aggFunctions[i], row);
+ }
+
+ _finalResultHolder.add(resultRow);
+ }
+ }
+
+ /**
+ * Creates the group by key for each row. Converts the key into a 0-index based int value that can be used by
+ * GroupByAggregationResultHolders used in v1 aggregations.
+ * <p>
+ * Returns the int key for each row.
+ */
+ private int[] generateGroupByKeys(List<Object[]> rows) {
+ int[] rowKeys = new int[rows.size()];
+ int numGroups = _groupSet.size();
+
+ for (int i = 0; i < rows.size(); i++) {
+ Object[] row = rows.get(i);
+
+ Object[] keyElements = new Object[numGroups];
+ for (int j = 0; j < numGroups; j++) {
+ String colName = _groupSet.get(j).getIdentifier();
+ int colIndex = _colNameToIndexMap.get(colName);
+ keyElements[j] = row[colIndex];
+ }
+
+ Key rowKey = new Key(keyElements);
+ _groupByKeyHolder.put(rowKey, rowKey.getValues());
+ if (!_groupKeyToIdMap.containsKey(rowKey)) {
+ _groupKeyToIdMap.put(rowKey, _groupId);
+ ++_groupId;
+ }
+ rowKeys[i] = _groupKeyToIdMap.get(rowKey);
+ }
+
+ return rowKeys;
+ }
+
+ private Map<ExpressionContext, BlockValSet> getBlockValSetMap(AggregationFunction aggFunction,
+ TransferableBlock block, DataSchema inputDataSchema) {
+ List<ExpressionContext> expressions = aggFunction.getInputExpressions();
+ int numExpressions = expressions.size();
+ if (numExpressions == 0) {
+ return Collections.emptyMap();
+ }
+
+ Preconditions.checkState(numExpressions == 1, "Cannot handle more than one identifier in aggregation function.");
+ ExpressionContext expression = expressions.get(0);
+ Preconditions.checkState(expression.getType().equals(ExpressionContext.Type.IDENTIFIER));
+ int index = _colNameToIndexMap.get(expression.getIdentifier());
+
+ DataSchema.ColumnDataType dataType = inputDataSchema.getColumnDataType(index);
+ Preconditions.checkState(block.getType().equals(DataBlock.Type.ROW), "Datablock type is not ROW");
+ // TODO: If the previous block is not mailbox received, this method is not efficient. Then getDataBlock() will
+ // convert the unserialized format to serialized format of BaseDataBlock. Then it will convert it back to column
+ // value primitive type.
+ return Collections.singletonMap(expression,
+ new IntermediateStageBlockValSet(dataType, block.getDataBlock(), index));
+ }
+
+ Object extractValueFromRow(AggregationFunction aggregationFunction, Object[] row) {
+ // TODO: Add support to handle aggregation functions where:
+ // 1. The identifier need not be the first argument
+ // 2. There are more than one identifiers.
+ List<ExpressionContext> expressions = aggregationFunction.getInputExpressions();
+ Preconditions.checkState(expressions.size() == 1);
+ ExpressionContext expr = expressions.get(0);
+ ExpressionContext.Type exprType = expr.getType();
+
+ if (exprType.equals(ExpressionContext.Type.IDENTIFIER)) {
+ String colName = expr.getIdentifier();
+ int colIndex = _colNameToIndexMap.get(colName);
+ Object value = row[colIndex];
+
+ // Boolean aggregation functions like BOOL_AND and BOOL_OR have return types set to Boolean. However, their
+ // intermediateResultType is Integer. To handle this case convert Boolean objects to Integer objects.
+ boolean boolAndOrAgg =
+ aggregationFunction.getType().equals(AggregationFunctionType.BOOLAND) || aggregationFunction.getType()
+ .equals(AggregationFunctionType.BOOLOR);
+ if (boolAndOrAgg && value instanceof Boolean) {
+ Integer intVal = ((Boolean) value).booleanValue() ? 1 : 0;
+ return intVal;
+ }
+
+ return value;
+ }
+
+ Preconditions.checkState(exprType.equals(ExpressionContext.Type.LITERAL), "Invalid expression type");
+ return expr.getLiteral().getValue();
+ }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/NewAggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/NewAggregateOperator.java
new file mode 100644
index 0000000000..fda873b84e
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/NewAggregateOperator.java
@@ -0,0 +1,295 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FunctionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunctionFactory;
+import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ *
+ * AggregateOperator is used to aggregate values over a set of group by keys.
+ * Output data will be in the format of [group by key, aggregate result1, ... aggregate resultN]
+ * Currently, we only support the following aggregation functions:
+ * 1. SUM
+ * 2. COUNT
+ * 3. MIN
+ * 4. MAX
+ * 5. DistinctCount and Count(Distinct)
+ * 6. AVG
+ * 7. FourthMoment
+ * 8. BoolAnd and BoolOr
+ *
+ * When the list of aggregation calls is empty, this class is used to calculate distinct result based on group by keys.
+ * In this case, the input can be any type.
+ *
+ * If the list of aggregation calls is not empty, the input of aggregation has to be a number.
+ * Note: This class performs aggregation over the double value of input.
+ * If the input is single value, the output type will be input type. Otherwise, the output type will be double.
+ */
+// TODO(Sonam): Rename to AggregateOperator when merging Planner support.
+public class NewAggregateOperator extends MultiStageOperator {
+ private static final String EXPLAIN_NAME = "AGGREGATE_OPERATOR";
+
+ private final MultiStageOperator _inputOperator;
+ private final DataSchema _resultSchema;
+ private final DataSchema _inputSchema;
+
+ // Map that maintains the mapping between columnName and the column ordinal index. It is used to fetch the required
+ // column value from row-based container and fetch the input datatype for the column.
+ private final HashMap<String, Integer> _colNameToIndexMap;
+
+ private TransferableBlock _upstreamErrorBlock;
+ private boolean _readyToConstruct;
+ private boolean _hasReturnedAggregateBlock;
+
+ private final boolean _isGroupByAggregation;
+ private MultistageAggregationExecutor _aggregationExecutor;
+ private MultistageGroupByExecutor _groupByExecutor;
+
+ // This aggregation operator uses the v1 Aggregation functions to process the rows. It operates in one of the 3 modes:
+ // 1. Aggregate Mode:
+ // - Calls aggregate(), aggregateGroupBySV(), aggregateGroupByMV()
+ // - This mode is used when isLeafStage hint is set or if treatIntermediateAsLeaf is true.
+ // 2. Merge Mode:
+ // - Calls merge()
+ // - This mode is used when isIntermediateStage hint is set and if treatIntermediateAsLeaf is false.
+ // 3. ExtractResult Mode:
+ // - Calls extractFinalResult()
+ // - This mode is used when isFinalStage() hint is set.
+ enum Mode {
+ AGGREGATE,
+ MERGE,
+ EXTRACT_RESULT
+ }
+
+ // TODO: refactor Pinot Reducer code to support the intermediate stage agg operator.
+ // aggCalls has to be a list of FunctionCall and cannot be null
+ // groupSet has to be a list of InputRef and cannot be null
+ // TODO: Add these two checks when we confirm we can handle error in upstream ctor call.
+
+ @VisibleForTesting
+ public NewAggregateOperator(OpChainExecutionContext context, MultiStageOperator inputOperator,
+ DataSchema resultSchema, DataSchema inputSchema, List<RexExpression> aggCalls, List<RexExpression> groupSet,
+ boolean isLeafStage, boolean isIntermediateStage, boolean extractFinalResult, boolean treatIntermediateAsLeaf) {
+ super(context);
+ _inputOperator = inputOperator;
+ _resultSchema = resultSchema;
+ _inputSchema = inputSchema;
+
+ _upstreamErrorBlock = null;
+ _readyToConstruct = false;
+ _hasReturnedAggregateBlock = false;
+ _colNameToIndexMap = new HashMap<>();
+
+ Mode mode;
+ if (extractFinalResult) {
+ mode = Mode.EXTRACT_RESULT;
+ } else if (isIntermediateStage && !treatIntermediateAsLeaf) {
+ mode = Mode.MERGE;
+ } else {
+ assert isLeafStage || (isIntermediateStage && treatIntermediateAsLeaf);
+ mode = Mode.AGGREGATE;
+ }
+
+ // Convert groupSet to ExpressionContext that our aggregation functions understand.
+ List<ExpressionContext> groupByExpr = getGroupSet(groupSet);
+ List<FunctionContext> functionContexts = getFunctionContexts(aggCalls);
+ AggregationFunction[] aggFunctions = new AggregationFunction[functionContexts.size()];
+
+ for (int i = 0; i < functionContexts.size(); i++) {
+ aggFunctions[i] = AggregationFunctionFactory.getAggregationFunction(functionContexts.get(i), true);
+ }
+
+ // Initialize the appropriate executor.
+ if (!groupSet.isEmpty()) {
+ _isGroupByAggregation = true;
+ _groupByExecutor = new MultistageGroupByExecutor(groupByExpr, aggFunctions, mode, _colNameToIndexMap);
+ } else {
+ _isGroupByAggregation = false;
+ _aggregationExecutor = new MultistageAggregationExecutor(aggFunctions, mode, _colNameToIndexMap);
+ }
+ }
+
+ @Override
+ public List<MultiStageOperator> getChildOperators() {
+ return ImmutableList.of(_inputOperator);
+ }
+
+ @Nullable
+ @Override
+ public String toExplainString() {
+ return EXPLAIN_NAME;
+ }
+
+ @Override
+ protected TransferableBlock getNextBlock() {
+ try {
+ if (!_readyToConstruct && !consumeInputBlocks()) {
+ return TransferableBlockUtils.getNoOpTransferableBlock();
+ }
+
+ if (_upstreamErrorBlock != null) {
+ return _upstreamErrorBlock;
+ }
+
+ if (!_hasReturnedAggregateBlock) {
+ return produceAggregatedBlock();
+ } else {
+ // TODO: Move to close call.
+ return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+ }
+ } catch (Exception e) {
+ return TransferableBlockUtils.getErrorTransferableBlock(e);
+ }
+ }
+
+ private TransferableBlock produceAggregatedBlock() {
+ List<Object[]> rows = _isGroupByAggregation ? _groupByExecutor.getResult() : _aggregationExecutor.getResult();
+
+ _hasReturnedAggregateBlock = true;
+ if (rows.size() == 0) {
+ if (!_isGroupByAggregation) {
+ Object[] row = _aggregationExecutor.constructEmptyAggResultRow();
+ return new TransferableBlock(Collections.singletonList(row), _resultSchema, DataBlock.Type.ROW);
+ } else {
+ return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+ }
+ } else {
+ return new TransferableBlock(rows, _resultSchema, DataBlock.Type.ROW);
+ }
+ }
+
+ /**
+ * @return whether or not the operator is ready to move on (EOS or ERROR)
+ */
+ private boolean consumeInputBlocks() {
+ TransferableBlock block = _inputOperator.nextBlock();
+ while (!block.isNoOpBlock()) {
+ // setting upstream error block
+ if (block.isErrorBlock()) {
+ _upstreamErrorBlock = block;
+ return true;
+ } else if (block.isEndOfStreamBlock()) {
+ _readyToConstruct = true;
+ return true;
+ }
+
+ if (_isGroupByAggregation) {
+ _groupByExecutor.processBlock(block, _inputSchema);
+ } else {
+ _aggregationExecutor.processBlock(block, _inputSchema);
+ }
+
+ block = _inputOperator.nextBlock();
+ }
+ return false;
+ }
+
+ private List<FunctionContext> getFunctionContexts(List<RexExpression> aggCalls) {
+ List<RexExpression.FunctionCall> aggFunctionCalls =
+ aggCalls.stream().map(RexExpression.FunctionCall.class::cast).collect(Collectors.toList());
+ List<FunctionContext> functionContexts = new ArrayList<>();
+ for (RexExpression.FunctionCall functionCall : aggFunctionCalls) {
+ FunctionContext funcContext = convertRexExpressionsToFunctionContext(functionCall);
+ functionContexts.add(funcContext);
+ }
+ return functionContexts;
+ }
+
+ private FunctionContext convertRexExpressionsToFunctionContext(RexExpression.FunctionCall aggFunctionCall) {
+ // Extract details from RexExpression aggFunctionCall.
+ String functionName = aggFunctionCall.getFunctionName();
+ List<RexExpression> functionOperands = aggFunctionCall.getFunctionOperands();
+
+ List<ExpressionContext> aggArguments = new ArrayList<>();
+ for (RexExpression operand : functionOperands) {
+ ExpressionContext exprContext = convertRexExpressionToExpressionContext(operand);
+ aggArguments.add(exprContext);
+ }
+
+ if (aggArguments.isEmpty()) {
+ // This can only be true for COUNT aggregation functions.
+ // The literal value here does not matter. We create a dummy literal here just so that the count aggregation
+ // has some column to process.
+ ExpressionContext literalExpr = ExpressionContext.forLiteralContext(FieldSpec.DataType.LONG, 1L);
+ aggArguments.add(literalExpr);
+ }
+
+ FunctionContext functionContext = new FunctionContext(FunctionContext.Type.AGGREGATION, functionName, aggArguments);
+ return functionContext;
+ }
+
+ private List<ExpressionContext> getGroupSet(List<RexExpression> groupBySetRexExpr) {
+ List<ExpressionContext> groupByExprContext = new ArrayList<>();
+ for (RexExpression groupByRexExpr : groupBySetRexExpr) {
+ ExpressionContext exprContext = convertRexExpressionToExpressionContext(groupByRexExpr);
+ groupByExprContext.add(exprContext);
+ }
+
+ return groupByExprContext;
+ }
+
+ private ExpressionContext convertRexExpressionToExpressionContext(RexExpression rexExpr) {
+ ExpressionContext exprContext;
+
+ // This is used only for aggregation arguments and groupby columns. The rexExpression can never be a function type.
+ switch (rexExpr.getKind()) {
+ case INPUT_REF: {
+ RexExpression.InputRef inputRef = (RexExpression.InputRef) rexExpr;
+ int identifierIndex = inputRef.getIndex();
+ String columnName = _inputSchema.getColumnName(identifierIndex);
+ // Calcite generates unique column names for aggregation functions. For example, select avg(col1), sum(col1)
+ // will generate names $f0 and $f1 for avg and sum respectively. We use a map to store the name -> index
+ // mapping to extract the required column value from row-based container and fetch the input datatype for the
+ // column.
+ _colNameToIndexMap.put(columnName, identifierIndex);
+ exprContext = ExpressionContext.forIdentifier(columnName);
+ break;
+ }
+ case LITERAL: {
+ RexExpression.Literal literalRexExp = (RexExpression.Literal) rexExpr;
+ Object value = literalRexExp.getValue();
+ exprContext = ExpressionContext.forLiteralContext(literalRexExp.getDataType(), value);
+ break;
+ }
+ default:
+ throw new IllegalStateException("Aggregation Function operands or GroupBy columns cannot be a function.");
+ }
+
+ return exprContext;
+ }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
index 722790d867..7385ca1d48 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
@@ -20,6 +20,7 @@ package org.apache.pinot.query.runtime.plan;
import java.util.ArrayList;
import java.util.List;
+import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.planner.plannode.AggregateNode;
import org.apache.pinot.query.planner.plannode.ExchangeNode;
import org.apache.pinot.query.planner.plannode.FilterNode;
@@ -96,6 +97,18 @@ public class PhysicalPlanVisitor implements PlanNodeVisitor<MultiStageOperator,
@Override
public MultiStageOperator visitAggregate(AggregateNode node, PhysicalPlanContext context) {
MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
+ // NOTE: inputSchema and resultSchema are only referenced by the commented-out NewAggregateOperator construction
+ // below. The active return statement still reads both schemas directly from the node.
+ DataSchema inputSchema = node.getInputs().get(0).getDataSchema();
+ DataSchema resultSchema = node.getDataSchema();
+
+ // TODO(Sonam): Rename to AggregateOperator when the planner changes are merged.
+ // The commented-out code below constructs NewAggregateOperator from the stage flags of the AggregateNode.
+// boolean extractFinalResult = AggregateNode.isFinalStage(node);
+// boolean isIntermediateStage = AggregateNode.isIntermediateStage(node);
+// boolean isLeafStage = AggregateNode.isLeafStage(node);
+// boolean treatIntermediateAsLeaf = node.isTreatIntermediateStageAsLeaf();
+//
+// return new NewAggregateOperator(context.getOpChainExecutionContext(), nextOperator, resultSchema, inputSchema,
+// node.getAggCalls(), node.getGroupSet(), isLeafStage, isIntermediateStage, extractFinalResult,
+// treatIntermediateAsLeaf);
return new AggregateOperator(context.getOpChainExecutionContext(), nextOperator, node.getDataSchema(),
node.getAggCalls(), node.getGroupSet(), node.getInputs().get(0).getDataSchema());
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
index bda5086a40..21b4b047bd 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
@@ -44,7 +44,7 @@ import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.DOUBLE;
import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.INT;
import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.STRING;
-
+// TODO(Sonam): Ensure test passes when the switch to AggregateOperator is made.
public class AggregateOperatorTest {
private AutoCloseable _mocks;
diff --git a/pinot-query-runtime/src/test/resources/queries/Aggregates.json b/pinot-query-runtime/src/test/resources/queries/Aggregates.json
index aaafa39651..bd3132090b 100644
--- a/pinot-query-runtime/src/test/resources/queries/Aggregates.json
+++ b/pinot-query-runtime/src/test/resources/queries/Aggregates.json
@@ -1,5 +1,5 @@
{
- "general_aggregate": {
+ "general_aggregate_no_filter": {
"tables": {
"tbl": {
"schema": [
@@ -22,54 +22,638 @@
"queries": [
{
"psql": "4.2.7",
- "description": "average double",
- "sql": "SELECT avg(double_col) FROM {tbl}"
+ "description": "count * aggregation",
+ "sql": "SELECT count(*) FROM {tbl}"
},
{
"psql": "4.2.7",
- "description": "average double with filter",
- "sql": "SELECT avg(double_col) FROM {tbl} WHERE double_col >= 100"
+ "description": "aggregations on double column",
+ "sql": "SELECT min(double_col), max(double_col), avg(double_col), sum(double_col), count(double_col), count(distinct(double_col)), count(*) FROM {tbl}"
},
{
"psql": "4.2.7",
- "description": "sum double",
- "sql": "SELECT sum(double_col) FROM {tbl}"
+ "description": "aggregations on int column",
+ "sql": "SELECT min(int_col), max(int_col), avg(int_col), sum(int_col), count(int_col), count(distinct(int_col)), count(*) FROM {tbl}"
},
{
"psql": "4.2.7",
- "description": "sum int",
- "sql": "SELECT sum(int_col) FROM {tbl}"
+ "description": "aggregations on bool column",
+ "sql": "SELECT bool_and(bool_col), bool_or(bool_col), count(bool_col), count(distinct(bool_col)), count(*) FROM {tbl}"
},
{
"psql": "4.2.7",
- "description": "max double",
- "sql": "SELECT max(double_col) FROM {tbl}"
+ "description": "aggregations on string column",
+ "sql": "SELECT count(string_col), count(distinct(string_col)), count(*) FROM {tbl}"
+ }
+ ]
+ },
+ "general_aggregate_with_filter": {
+ "tables": {
+ "tbl": {
+ "schema": [
+ {"name": "int_col", "type": "INT"},
+ {"name": "double_col", "type": "DOUBLE"},
+ {"name": "string_col", "type": "STRING"},
+ {"name": "bool_col", "type": "BOOLEAN"}
+ ],
+ "inputs": [
+ [2, 300, "a", true],
+ [2, 400, "a", true],
+ [3, 100, "b", false],
+ [100, 1, "b", false],
+ [101, 1.01, "c", false],
+ [150, 1.5, "c", false],
+ [175, 1.75, "c", true]
+ ]
+ }
+ },
+ "queries": [
+ {
+ "psql": "4.2.7",
+ "description": "count aggregation with filters",
+ "sql": "SELECT count(*) FROM {tbl} WHERE string_col='b'"
+ },
+ {
+ "psql": "4.2.7",
+ "description": "aggregations on double column with string filters",
+ "sql": "SELECT min(double_col), max(double_col), avg(double_col), sum(double_col), count(double_col), count(distinct(double_col)), count(*) FROM {tbl} WHERE string_col='b'"
+ },
+ {
+ "psql": "4.2.7",
+ "description": "aggregations on double column with int filters",
+ "sql": "SELECT min(double_col), max(double_col), avg(double_col), sum(double_col), count(double_col), count(distinct(double_col)), count(*) FROM {tbl} WHERE int_col > 100"
+ },
+ {
+ "psql": "4.2.7",
+ "ignored": true,
+ "comments": "TODO(Sonam): Remove ignore check. Test fails with existing code because existing code returns INT 0. But H2 returns a double.",
+ "description": "aggregate int column and filter by int column",
+ "sql": "SELECT sum(1 / int_col) FROM {tbl} WHERE int_col > 0",
+ "h2Sql": "SELECT sum(1.0 / int_col) FROM {tbl} WHERE int_col > 0"
+ },
+ {
+ "psql": "4.2.7",
+ "description": "aggregate double column and filter by double column",
+ "sql": "SELECT sum(1 / double_col) FROM {tbl} WHERE double_col < 2"
+ },
+ {
+ "psql": "4.2.7",
+ "ignored": true,
+ "comments": "TODO(Sonam): Remove ignore check. Test fails with existing code but is fixed with this patch",
+ "description": "aggregations on double column with filters",
+ "sql": "SELECT min(double_col), max(double_col), avg(double_col), sum(double_col), count(double_col), count(distinct(double_col)), count(*) FROM {tbl} WHERE int_col > 100 and double_col = 1.75"
+ },
+ {
+ "psql": "4.2.7",
+ "description": "aggregations on int column with filters",
+ "sql": "SELECT min(int_col), max(int_col), avg(int_col), sum(int_col), count(int_col), count(distinct(int_col)), count(*) FROM {tbl} WHERE string_col='b'"
+ },
+ {
+ "psql": "4.2.7",
+ "description": "aggregations on int column with filters",
+ "sql": "SELECT min(int_col), max(int_col), avg(int_col), sum(int_col), count(int_col), count(distinct(int_col)), count(*) FROM {tbl} WHERE int_col > 100"
+ },
+ {
+ "psql": "4.2.7",
+ "description": "aggregations on int column with filters",
+ "sql": "SELECT min(int_col), max(int_col), avg(int_col), sum(int_col), count(int_col), count(distinct(int_col)), count(*) FROM {tbl} WHERE int_col > 100 and double_col = 1.75"
+ },
+ {
+ "psql": "4.2.7",
+ "description": "aggregations on int column with filters",
+ "sql": "SELECT min(int_col), max(int_col), avg(int_col), sum(int_col), count(int_col), count(distinct(int_col)), count(*) FROM {tbl} WHERE string_col='b'"
+ },
+ {
+ "psql": "4.2.7",
+ "description": "aggregations on int column with filters",
+ "sql": "SELECT min(int_col), max(int_col), avg(int_col), sum(int_col), count(int_col), count(distinct(int_col)), count(*) FROM {tbl} WHERE int_col > 100"
+ },
+ {
+ "psql": "4.2.7",
+ "description": "aggregations on int column with filters",
+ "sql": "SELECT min(int_col), max(int_col), avg(int_col), sum(int_col), count(int_col), count(distinct(int_col)), count(*) FROM {tbl} WHERE int_col > 100 and double_col = 1.75"
+ },
+ {
+ "psql": "4.2.7",
+ "description": "aggregations on bool column with filters",
+ "sql": "SELECT bool_and(bool_col), bool_or(bool_col), count(int_col), count(distinct(int_col)), count(*) FROM {tbl} WHERE string_col='b'"
+ },
+ {
+ "psql": "4.2.7",
+ "description": "aggregations on bool column with filters",
+ "sql": "SELECT bool_and(bool_col), bool_or(bool_col), count(int_col), count(distinct(int_col)), count(*) FROM {tbl} WHERE int_col > 100"
+ },
+ {
+ "psql": "4.2.7",
+ "description": "aggregations on bool column with filters",
+ "sql": "SELECT bool_and(bool_col), bool_or(bool_col), count(int_col), count(distinct(int_col)), count(*) FROM {tbl} WHERE int_col > 100 and double_col = 1.75"
+ },
+ {
+ "psql": "4.2.7",
+ "description": "aggregations on string column with filters",
+ "sql": "SELECT count(string_col), count(distinct(string_col)), count(*) FROM {tbl} WHERE string_col='b'"
+ },
+ {
+ "psql": "4.2.7",
+ "description": "aggregations on string column with filters",
+ "sql": "SELECT count(string_col), count(distinct(string_col)), count(*) FROM {tbl} WHERE int_col > 100"
+ },
+ {
+ "psql": "4.2.7",
+ "ignored": true,
+ "comments": "TODO(Sonam): Remove ignore check. Test fails with existing code but is fixed with this patch.",
+ "description": "aggregations on string column with filters",
+ "sql": "SELECT count(string_col), count(distinct(string_col)), count(*) FROM {tbl} WHERE int_col > 100 and double_col = 1.75"
+ }
+ ]
+ },
+ "general_group_by_aggregate_no_filter": {
+ "tables": {
+ "tbl": {
+ "schema": [
+ {"name": "int_col", "type": "INT"},
+ {"name": "double_col", "type": "DOUBLE"},
+ {"name": "string_col", "type": "STRING"},
+ {"name": "bool_col", "type": "BOOLEAN"}
+ ],
+ "inputs": [
+ [2, 300, "a", true],
+ [2, 400, "a", true],
+ [3, 100, "b", false],
+ [100, 1, "b", false],
+ [101, 1.01, "c", false],
+ [150, 1.5, "c", false],
+ [175, 1.75, "c", true]
+ ]
+ }
+ },
+ "queries": [
+ {
+ "psql": "4.2.7",
+ "description": "group by with count",
+ "sql": "SELECT int_col, count(*) FROM {tbl} GROUP BY int_col"
+ },
+ {
+ "psql": "4.2.7",
+ "description": "group by with aggregation on double column",
+ "sql": "SELECT int_col, min(double_col), max(double_col), avg(double_col), sum(double_col), count(double_col), count(distinct(double_col)), count(*) FROM {tbl} GROUP BY int_col"
+ },
+ {
+ "psql": "4.2.7",
+ "description": "group by with aggregation on double column and order by",
+ "sql": "SELECT int_col, min(double_col), max(double_col), avg(double_col), sum(double_col), count(double_col), count(distinct(double_col)), count(*) FROM {tbl} GROUP BY int_col ORDER BY int_col"
+ },
+ {
+ "psql": "4.2.7",
+ "description": "group by with aggregations on int column",
+ "sql": "SELECT double_col, min(int_col), max(int_col), avg(int_col), sum(int_col), count(int_col), count(distinct(int_col)), count(*) FROM {tbl} GROUP BY double_col"
+ },
+ {
+ "psql": "4.2.7",
+ "description": "group by with aggregations on int column and order by",
+ "sql": "SELECT double_col, min(int_col), max(int_col), avg(int_col), sum(int_col), count(int_col), count(distinct(int_col)), count(*) FROM {tbl} GROUP BY double_col ORDER BY double_col"
},
{
"psql": "4.2.7",
- "description": "max int",
- "sql": "SELECT max(int_col) FROM {tbl}"
+ "description": "group by with aggregations on bool column",
+ "sql": "SELECT string_col, bool_and(bool_col), bool_or(bool_col), count(bool_col), count(distinct(bool_col)), count(*) FROM {tbl} GROUP BY string_col"
+ },
+ {
+ "psql": "4.2.7",
+ "description": "group by with aggregations on bool column and order by",
+ "sql": "SELECT string_col, bool_and(bool_col), bool_or(bool_col), count(bool_col), count(distinct(bool_col)), count(*) FROM {tbl} GROUP BY string_col ORDER BY string_col"
+ },
+ {
+ "psql": "4.2.7",
+ "description": "group by with aggregations on string column",
+ "sql": "SELECT bool_col, count(string_col), count(distinct(string_col)), count(*) FROM {tbl} GROUP BY bool_col"
+ },
+ {
+ "psql": "4.2.7",
+ "description": "group by with aggregations on string column and order by",
+ "sql": "SELECT bool_col, count(string_col), count(distinct(string_col)), count(*) FROM {tbl} GROUP BY bool_col ORDER BY bool_col"
+ },
+ {
+ "psql": "4.2.7",
+ "description": "group by on multiple columns with aggregation on double column",
+ "sql": "SELECT int_col, string_col, min(double_col), max(double_col), avg(double_col), sum(double_col), count(double_col), count(distinct(double_col)), count(*) FROM {tbl} GROUP BY int_col, string_col"
+ },
+ {
+ "psql": "4.2.7",
+ "description": "group by multiple columns with aggregations on int column",
+ "sql": "SELECT double_col, string_col, bool_col, min(int_col), max(int_col), avg(int_col), sum(int_col), count(int_col), count(distinct(int_col)), count(*) FROM {tbl} GROUP BY double_col, string_col, bool_col"
+ },
+ {
+ "psql": "4.2.7",
+ "description": "group by and order by on multiple columns with aggregation on double column",
+ "sql": "SELECT int_col, string_col, min(double_col), max(double_col), avg(double_col), sum(double_col), count(double_col), count(distinct(double_col)), count(*) FROM {tbl} GROUP BY int_col, string_col ORDER BY int_col, string_col"
+ }
+ ]
+ },
+ "general_group_by_aggregate_with_filter": {
+ "tables": {
+ "tbl": {
+ "schema": [
+ {"name": "int_col", "type": "INT"},
+ {"name": "double_col", "type": "DOUBLE"},
+ {"name": "string_col", "type": "STRING"},
+ {"name": "bool_col", "type": "BOOLEAN"}
+ ],
+ "inputs": [
+ [2, 300, "a", true],
+ [2, 400, "a", true],
+ [3, 100, "b", false],
+ [100, 1, "b", false],
+ [101, 1.01, "c", false],
+ [150, 1.5, "c", false],
+ [175, 1.75, "c", true]
+ ]
+ }
+ },
+ "queries": [
+ {
+ "psql": "4.2.7",
+ "description": "group by with count and string filter",
+ "sql": "SELECT string_col, count(*) FROM {tbl} WHERE string_col='b' GROUP BY string_col"
+ },
+ {
+ "psql": "4.2.7",
+ "description": "aggregations on double column with filters",
+ "sql": "SELECT int_col, min(double_col), max(double_col), avg(double_col), sum(double_col), count(double_col), count(distinct(double_col)), count(*) FROM {tbl} WHERE string_col='b' GROUP BY int_col"
+ },
+ {
+ "psql": "4.2.7",
+ "description": "aggregations on double column with filters",
+ "sql": "SELECT int_col, string_col, min(double_col), max(double_col), avg(double_col), sum(double_col), count(double_col), count(distinct(double_col)), count(*) FROM {tbl} WHERE int_col > 100 GROUP BY int_col, string_col"
+ },
+ {
+ "psql": "4.2.7",
+ "description": "aggregations on double column with filters",
+ "sql": "SELECT int_col, string_col, bool_col, min(double_col), max(double_col), avg(double_col), sum(double_col), count(double_col), count(distinct(double_col)), count(*) FROM {tbl} WHERE int_col > 100 and double_col = 1.75 GROUP BY int_col, string_col, bool_col"
+ },
+ {
+ "psql": "4.2.7",
+ "description": "aggregations on int column with filters",
+ "sql": "SELECT bool_col, min(int_col), max(int_col), avg(int_col), sum(int_col), count(int_col), count(distinct(int_col)), count(*) FROM {tbl} WHERE string_col='b' GROUP BY bool_col"
+ },
+ {
+ "psql": "4.2.7",
+ "description": "aggregations on int column with filters",
+ "sql": "SELECT bool_col, double_col, min(int_col), max(int_col), avg(int_col), sum(int_col), count(int_col), count(distinct(int_col)), count(*) FROM {tbl} WHERE int_col > 100 GROUP BY bool_col, double_col"
+ },
+ {
+ "psql": "4.2.7",
+ "description": "aggregations on int column with filters",
+ "sql": "SELECT bool_col, double_col, string_col, min(int_col), max(int_col), avg(int_col), sum(int_col), count(int_col), count(distinct(int_col)), count(*) FROM {tbl} WHERE int_col > 100 and double_col = 1.75 GROUP BY bool_col, double_col, string_col"
+ },
+ {
+ "psql": "4.2.7",
+ "description": "aggregations on bool column with filters",
+ "sql": "SELECT string_col, bool_and(bool_col), bool_or(bool_col), count(int_col), count(distinct(int_col)), count(*) FROM {tbl} WHERE string_col='b' GROUP BY string_col"
+ },
+ {
+ "psql": "4.2.7",
+ "description": "aggregations on bool column with filters",
+ "sql": "SELECT string_col, double_col, bool_and(bool_col), bool_or(bool_col), count(int_col), count(distinct(int_col)), count(*) FROM {tbl} WHERE int_col > 100 GROUP BY string_col, double_col"
+ },
+ {
+ "psql": "4.2.7",
+ "description": "aggregations on bool column with filters",
+ "sql": "SELECT string_col, double_col, int_col, bool_and(bool_col), bool_or(bool_col), count(int_col), count(distinct(int_col)), count(*) FROM {tbl} WHERE int_col > 100 and double_col = 1.75 GROUP BY string_col, double_col, int_col"
+ },
+ {
+ "psql": "4.2.7",
+ "description": "aggregations on string column with filters",
+ "sql": "SELECT double_col, count(string_col), count(distinct(string_col)), count(*) FROM {tbl} WHERE string_col='b' GROUP BY double_col"
+ },
+ {
+ "psql": "4.2.7",
+ "description": "aggregations on string column with filters",
+ "sql": "SELECT double_col, int_col, count(string_col), count(distinct(string_col)), count(*) FROM {tbl} WHERE int_col > 100 GROUP BY double_col, int_col"
+ },
+ {
+ "psql": "4.2.7",
+ "description": "aggregations on string column with filters",
+ "sql": "SELECT double_col, int_col, bool_col, count(string_col), count(distinct(string_col)), count(*) FROM {tbl} WHERE int_col > 100 and double_col = 1.75 GROUP BY double_col, int_col, bool_col"
+ }
+ ]
+ },
+ "aggregations_with_transform": {
+ "tables": {
+ "tbl": {
+ "schema": [
+ {"name": "int_col", "type": "INT"},
+ {"name": "double_col", "type": "DOUBLE"},
+ {"name": "string_col", "type": "STRING"},
+ {"name": "bool_col", "type": "BOOLEAN"}
+ ],
+ "inputs": [
+ [2, 300, "a", true],
+ [2, 400, "a", true],
+ [3, 100, "b", false],
+ [100, 1, "b", false],
+ [101, 1.01, "c", false],
+ [150, 1.5, "c", false],
+ [175, 1.75, "c", true]
+ ]
+ }
+ },
+ "queries": [
+ {
+ "psql": "4.2.7",
+ "description": "transform in groupby column with filter",
+ "sql": "SELECT CONCAT(string_col, bool_col, '-') as c1, min(double_col), max(double_col), avg(double_col), sum(double_col), count(double_col), count(distinct(double_col)), count(*) FROM {tbl} WHERE string_col='b' GROUP BY c1",
+ "outputs": [
+ ["b-false", 1.0, 100.0, 50.5, 101.0, 2, 2, 2]
+ ]
+ },
+ {
+ "psql": "4.2.7",
+ "description": "aggregation with transform column",
+ "sql": "SELECT count(CONCAT(string_col, bool_col, '-')), count(distinct(CONCAT(string_col, bool_col, '-'))), count(*) FROM {tbl}",
+ "outputs": [
+ [7, 4, 7]
+ ]
+ },
+ {
+ "psql": "4.2.7",
+ "description": "group by and aggregation with transform column",
+ "sql": "SELECT int_col, count(CONCAT(string_col, bool_col, '-')), count(distinct(CONCAT(string_col, bool_col, '-'))), count(*) FROM {tbl} GROUP BY int_col",
+ "outputs": [
+ [2, 2, 1, 2],
+ [3, 1, 1, 1],
+ [100, 1, 1, 1],
+ [101, 1, 1, 1],
+ [150, 1, 1, 1],
+ [175, 1, 1, 1]
+ ]
+ }
+ ]
+ },
+ "aggregation_with_complex_sql": {
+ "tables": {
+ "tbl1": {
+ "schema": [
+ {"name": "int_col", "type": "INT"},
+ {"name": "double_col", "type": "DOUBLE"},
+ {"name": "string_col", "type": "STRING"},
+ {"name": "bool_col", "type": "BOOLEAN"}
+ ],
+ "inputs": [
+ [2, 300, "a", true],
+ [2, 400, "a", true],
+ [3, 100, "b", false],
+ [100, 1, "b", false],
+ [101, 1.01, "c", false],
+ [150, 1.5, "c", false],
+ [175, 1.75, "c", true]
+ ]
+ },
+ "tbl2": {
+ "schema":[
+ {"name": "int_col2", "type": "INT"},
+ {"name": "string_col2", "type": "STRING"},
+ {"name": "double_col2", "type": "DOUBLE"}
+ ],
+ "inputs": [
+ [1, "apple", 1000.0],
+ [2, "a", 1.323],
+ [3, "b", 1212.12],
+ [3, "c", 341],
+ [4, "orange", 1212.121]
+ ]
+ }
+ },
+ "queries": [
+ {
+ "psql": "9.21.0",
+ "ignored": true,
+ "comments": "TODO(Sonam): Remove ignore check. Test fails with existing code but is fixed with this patch",
+ "description": "JOIN with simple aggregations on int and double columns",
+ "sql": "SELECT min(double_col), min(int_col), min(double_col2), min(int_col2), max(double_col), max(int_col), max(double_col2), max(int_col2), avg(int_col), avg(double_col), avg(double_col2), avg(int_col2), sum(int_col), sum(double_col), sum(double_col2), sum(int_col2), count(*), count(distinct int_col), count(distinct double_col), count(distinct int_col2), count(distinct double_col2) from {tbl1} JOIN {tbl2} ON string_col = string_col2"
+ },
+ {
+ "psql": "9.21.0",
+ "ignored": true,
+ "comments": "TODO(Sonam): Remove ignore check. Test fails with existing code but is fixed with this patch",
+ "description": "JOIN with simple aggregations on boolean columns",
+ "sql": "SELECT count(bool_col), count(distinct bool_col), bool_and(bool_col), bool_or(bool_col) from {tbl1} JOIN {tbl2} ON string_col = string_col2"
+ },
+ {
+ "psql": "9.21.0",
+ "description": "JOIN with simple aggregations on string columns",
+ "sql": "SELECT count(string_col), count(string_col2), count(distinct string_col), count(distinct string_col2) from {tbl1} JOIN {tbl2} ON int_col = int_col2"
+ },
+ {
+ "psql": "9.21.0",
+ "ignored": true,
+ "comments": "TODO(Sonam): Remove ignore check. Test fails with existing code but is fixed with this patch",
+ "description": "JOIN with simple aggregations on int and double columns and group by on bool_col",
+ "sql": "SELECT bool_col, min(double_col), min(int_col), min(double_col2), min(int_col2), max(double_col), max(int_col), max(double_col2), max(int_col2), avg(int_col), avg(double_col), avg(double_col2), avg(int_col2), sum(int_col), sum(double_col), sum(double_col2), sum(int_col2), count(*), count(distinct int_col), count(distinct double_col), count(distinct int_col2), count(distinct double_col2) from {tbl1} JOIN {tbl2} ON string_col = string_col2 GROUP BY bool_col"
+ },
+ {
+ "psql": "9.21.0",
+ "ignored": true,
+ "comments": "TODO(Sonam): Remove ignore check. Test fails with existing code but is fixed with this patch",
+ "description": "JOIN with aggregations on int and double columns and group by on bool_col and string_col",
+ "sql": "SELECT bool_col, string_col, min(double_col), min(int_col), min(double_col2), min(int_col2), max(double_col), max(int_col), max(double_col2), max(int_col2), avg(int_col), avg(double_col), avg(double_col2), avg(int_col2), sum(int_col), sum(double_col), sum(double_col2), sum(int_col2), count(*), count(distinct int_col), count(distinct double_col), count(distinct int_col2), count(distinct double_col2) from {tbl1} JOIN {tbl2} ON string_col = string_col2 GROUP BY bool_col, str [...]
+ },
+ {
+ "psql": "9.21.0",
+ "description": "JOIN with aggregations on boolean columns and group by on string",
+ "sql": "SELECT string_col, count(bool_col), count(distinct bool_col), bool_and(bool_col), bool_or(bool_col) from {tbl1} JOIN {tbl2} ON string_col = string_col2 GROUP BY string_col"
+ },
+ {
+ "psql": "9.21.0",
+ "ignored": true,
+ "comments": "TODO(Sonam): Remove ignore check. Test fails with existing code but is fixed with this patch",
+ "description": "JOIN with aggregations on boolean columns and group by on string and int",
+ "sql": "SELECT string_col, int_col, count(bool_col), count(distinct bool_col), bool_and(bool_col), bool_or(bool_col) from {tbl1} JOIN {tbl2} ON string_col = string_col2 GROUP BY string_col, int_col"
+ },
+ {
+ "psql": "9.21.0",
+ "description": "JOIN with aggregations on string columns and group by on double",
+ "sql": "SELECT double_col, count(string_col), count(string_col2), count(distinct string_col), count(distinct string_col2) from {tbl1} JOIN {tbl2} ON int_col = int_col2 GROUP BY double_col"
+ },
+ {
+ "psql": "9.21.0",
+ "description": "JOIN with aggregations on string columns and group by on double, bool and int",
+ "sql": "SELECT double_col, bool_col, int_col, count(string_col), count(string_col2), count(distinct string_col), count(distinct string_col2) from {tbl1} JOIN {tbl2} ON int_col = int_col2 GROUP BY double_col, bool_col, int_col"
+ },
+ {
+ "psql": "9.21.0",
+ "description": "Aggregation on semi join with multiple not in",
+ "sql": "SELECT bool_col, int_col, count(string_col), count(distinct string_col), min(double_col), max(double_col), avg(double_col), count(double_col) from {tbl1} WHERE string_col = 'c' AND int_col NOT IN (SELECT int_col2 FROM {tbl2} WHERE string_col2='a') AND int_col NOT IN (SELECT int_col2 FROM {tbl2} WHERE string_col2='bar') AND int_col NOT IN (SELECT int_col2 FROM {tbl2} WHERE string_col2='orange') GROUP BY bool_col, int_col"
+ }
+ ]
+ },
+ "aggregations_not_supported_by_h2": {
+ "tables": {
+ "tbl": {
+ "schema": [
+ {"name": "int_col", "type": "INT"},
+ {"name": "double_col", "type": "DOUBLE"},
+ {"name": "string_col", "type": "STRING"},
+ {"name": "bool_col", "type": "BOOLEAN"}
+ ],
+ "inputs": [
+ [2, 300, "a", true],
+ [3, 3213, "a", true],
+ [11, 545.12, "a", true],
+ [12, 543.12, "a", true],
+ [13, 765.4, "a", true],
+ [13, 400, "a", true],
+ [3, 100, "b", false],
+ [100, 1, "b", false],
+ [30, 10120, "b", false],
+ [100, 23121, "b", false],
+ [3, 43, "b", false],
+ [101, 1.01, "c", false],
+ [150, 1.5, "c", false],
+ [175, 1.75, "c", true],
+ [152, 1.25, "c", false],
+ [177, 1.095, "c", true]
+ ]
+ },
+ "tbl2": {
+ "schema":[
+ {"name": "int_col2", "type": "INT"},
+ {"name": "string_col2", "type": "STRING"},
+ {"name": "double_col2", "type": "DOUBLE"}
+ ],
+ "inputs": [
+ [1, "apple", 1000.0],
+ [2, "a", 1.323],
+ [3, "b", 1212.12],
+ [4, "orange", 1212.121]
+ ]
+ }
+ },
+ "queries": [
+ {
+ "psql": "9.21.0",
+ "description": "distinct count on all columns",
+ "ignored": true,
+ "comments": "TODO(Sonam): Remove ignore check. Current code doesn't support distinctcount on numeric operands",
+ "sql": "SELECT distinctcount(int_col), distinctcount(double_col), distinctcount(bool_col), distinctcount(string_col) FROM {tbl}",
+ "outputs": [
+ [12, 16, 2, 3]
+ ]
+ },
+ {
+ "psql": "9.21.0",
+ "description": "distinct count with filter",
+ "ignored": true,
+ "comments": "TODO(Sonam): Remove ignore check. Current code doesn't support distinctcount on numeric operands",
+ "sql": "SELECT distinctcount(int_col), distinctcount(double_col), distinctcount(bool_col), distinctcount(string_col) FROM {tbl} where string_col='c'",
+ "outputs": [
+ [5, 5, 2, 1]
+ ]
+ },
+ {
+ "psql": "9.21.0",
+ "description": "distinctcount with group by",
+ "ignored": true,
+ "comments": "TODO(Sonam): Remove ignore check. Current code doesn't support distinctcount on numeric operands",
+ "sql": "SELECT string_col, distinctcount(int_col), distinctcount(double_col), distinctcount(bool_col) FROM {tbl} group by string_col",
+ "outputs": [
+ ["a", 5, 6, 1],
+ ["b", 3, 5, 1],
+ ["c", 5, 5, 2]
+ ]
+ },
+ {
+ "psql": "9.21.0",
+ "description": "distinctcount with multiple group by columns",
+ "ignored": true,
+ "comments": "TODO(Sonam): Remove ignore check. Current code doesn't support distinctcount on numeric operands",
+ "sql": "SELECT string_col, bool_col, distinctcount(int_col), distinctcount(double_col) FROM {tbl} group by string_col, bool_col",
+ "outputs": [
+ ["a", true, 5, 6],
+ ["b", false, 3, 5],
+ ["c", true, 2, 2],
+ ["c", false, 3, 3]
+ ]
+ },
+ {
+ "psql": "9.21.0",
+ "description": "JOIN with distinct count on columns",
+ "ignored": true,
+ "comments": "TODO(Sonam): Remove ignore check. Current code doesn't support distinctcount on numeric operands",
+ "sql": "SELECT distinctcount(double_col), distinctcount(int_col), distinctcount(double_col2), distinctcount(int_col2), distinctcount(bool_col), distinctcount(string_col), distinctcount(string_col2) from {tbl} JOIN {tbl2} ON string_col = string_col2",
+ "outputs": [
+ [11, 7, 2, 2, 2, 2, 2]
+ ]
+ },
+ {
+ "psql": "9.21.0",
+ "description": "JOIN with distinct count and group by on columns",
+ "ignored": true,
+ "comments": "TODO(Sonam): Remove ignore check. Current code doesn't support distinctcount on numeric operands",
+ "sql": "SELECT string_col, distinctcount(double_col), distinctcount(int_col), distinctcount(double_col2), distinctcount(int_col2), distinctcount(bool_col) from {tbl} JOIN {tbl2} ON string_col = string_col2 GROUP BY string_col",
+ "outputs": [
+ ["a", 6, 5, 1, 1, 1],
+ ["b", 5, 3, 1, 1, 1]
+ ]
},
{
- "psql": "4.2.7",
- "description": "count int",
- "sql": "SELECT count(int_col) FROM {tbl}"
+ "psql": "9.21.0",
+ "description": "skewness and kurtosis on int and double cols",
+ "sql": "SELECT skewness(double_col), skewness(int_col), kurtosis(double_col), kurtosis(int_col) FROM {tbl}",
+ "outputs": [
+ [3.1103657693253997, 0.5794666900768517, 9.965595397275615, -1.4550234124062964]
+ ]
},
{
- "psql": "4.2.7",
- "description": "count double",
- "sql": "SELECT count(double_col) FROM {tbl}"
+ "psql": "9.21.0",
+ "description": "skewness and kurtosis on int and double cols with group by",
+ "sql": "SELECT string_col, skewness(double_col), skewness(int_col), kurtosis(double_col), kurtosis(int_col) FROM {tbl} group by string_col",
+ "outputs": [
+ ["a", 2.3416450161609847, -0.8960998790160103, 5.582974376555998, -1.791124260355027],
+ ["b", 1.4293497662867718, 0.40194821599933056, 1.220463765626068, -3.1341049510321075],
+ ["c", 0.6353100324008337, -1.3564793650180287, -1.1100427274943443, 1.989350375759732]
+ ]
},
{
- "psql": "4.2.7",
- "description": "count, sum group by order by",
- "sql": "select string_col, count(int_col), sum(double_col) from {tbl} group by string_col order by string_col;"
+ "psql": "9.21.0",
+ "description": "JOIN with skewness and kurtosis on columns",
+ "sql": "SELECT skewness(double_col), skewness(int_col), kurtosis(double_col2), kurtosis(int_col2) from {tbl} JOIN {tbl2} ON string_col = string_col2",
+ "outputs": [
+ [2.5196193851884536, 1.7344829395977062, -2.444444444444445, -2.444444444444444]
+ ]
},
{
- "psql": "4.2.7",
- "description": "min, max",
- "sql": "SELECT min(int_col), max(int_col) FROM {tbl}"
- },
+ "psql": "9.21.0",
+ "description": "JOIN with skewness and kurtosis and group by on columns",
+ "sql": "SELECT string_col, skewness(double_col), skewness(int_col), kurtosis(double_col2), kurtosis(int_col2) from {tbl} JOIN {tbl2} ON string_col = string_col2 GROUP BY string_col",
+ "outputs": [
+ ["a", 2.3416450161609847, -0.8960998790160102, 0.0, 0.0],
+ ["b", 1.4293497662867716, 0.4019482159993307, 0.0, 0.0]
+ ]
+ }
+ ]
+ },
+ "unsupported_aggregates": {
+ "tables": {
+ "tbl": {
+ "schema": [
+ {"name": "int_col", "type": "INT"},
+ {"name": "double_col", "type": "DOUBLE"},
+ {"name": "string_col", "type": "STRING"},
+ {"name": "bool_col", "type": "BOOLEAN"}
+ ],
+ "inputs": [
+ [2, 300, "a", true],
+ [2, 400, "a", true],
+ [3, 100, "b", false],
+ [100, 1, "b", false],
+ [101, 1.01, "c", false],
+ [150, 1.5, "c", false],
+ [175, 1.75, "c", true]
+ ]
+ }
+ },
+ "queries": [
{
"psql": "4.2.7",
"comments": "plan error:Unsupported SQL aggregation kind: {}. Only splittable aggregation functions are supported! [SINGLE_VALUE]",
@@ -111,16 +695,6 @@
"description": "aggregate boolean column",
"sql": "SELECT min(bool_col) FROM {tbl}"
},
- {
- "psql": "9.21.0",
- "description": "aggregate boolean column",
- "sql": "SELECT bool_and(bool_col), bool_or(bool_col) FROM {tbl} GROUP BY string_col"
- },
- {
- "psql": "9.21.0",
- "description": "aggregate boolean column no group by",
- "sql": "SELECT bool_and(bool_col), bool_or(bool_col) FROM {tbl}"
- },
{
"ignored": true,
"comment": "issue with converting data types: Unexpected RelDataTypeField: ANY for column: EXPR$0",
@@ -135,36 +709,29 @@
"sql": "SELECT sum(pow(int_col, 2)) FROM {tbl}"
},
{
+ "psql": "4.2.7",
+ "description": "aggregate boolean column and filter by string column",
"ignored": true,
- "comment": "sum empty returns [0] instead of [null] at the moment",
- "description": "sum empty input after filter",
- "sql": "SELECT sum(int_col) FROM {tbl} WHERE string_col IN ('foo', 'bar')"
- },
- {
- "description": "count empty input after filter",
- "sql": "SELECT count(*) FROM {tbl} WHERE string_col IN ('foo', 'bar')"
- },
- {
- "description": "count empty input after filter",
- "sql": "SELECT count(int_col) FROM {tbl} WHERE string_col IN ('foo', 'bar')"
+ "comments": "Cannot apply 'SUM' to arguments of type 'SUM(<BOOLEAN>)'",
+ "sql": "SELECT sum(bool_col) FROM {tbl} WHERE string_col > 'a'"
},
{
+ "psql": "4.2.7",
+ "comments": "parse error:Encountered HAVING",
"ignored": true,
- "comment": "sum empty returns [0] instead of [null] at the moment",
- "description": "sum empty input after filter with subquery",
- "sql": "SELECT sum(int_col) FROM {tbl} WHERE string_col IN ( SELECT string_col FROM {tbl} WHERE int_col BETWEEN 1 AND 0 GROUP BY string_col )"
- },
- {
- "description": "count empty input after filter with sub-query",
- "sql": "SELECT count(*) FROM {tbl} WHERE string_col IN ( SELECT string_col FROM {tbl} WHERE int_col BETWEEN 1 AND 0 GROUP BY string_col )"
+ "description": "aggregate query with having clause",
+ "sql": "select string_col, count(int_col) from {tbl} a group by string_col order by string_col having exists(select 1 from {tbl} b where count(int_col) = b.int_col);"
},
{
- "description": "count empty input after filter with sub-query",
- "sql": "SELECT count(int_col) FROM {tbl} WHERE string_col IN ( SELECT string_col FROM {tbl} WHERE int_col BETWEEN 1 AND 0 GROUP BY string_col )"
+ "psql": "4.2.7",
+ "comments": "parse error:Encountered HAVING",
+ "description": "aggregate query with having clause",
+ "ignored": true,
+ "sql": "select string_col, count(int_col) from {tbl} a group by string_col order by string_col having (select count(*) from {tbl} b where count(int_col) = b.int_col) > 0;"
}
]
},
- "aggregate_filter": {
+ "empty_return_type_test_h2": {
"tables": {
"tbl": {
"schema": [
@@ -174,109 +741,142 @@
{"name": "bool_col", "type": "BOOLEAN"}
],
"inputs": [
- [2, 300, "a", false],
+ [2, 300, "a", true],
[2, 400, "a", true],
- [3, 100, "b", true],
- [0.001, 1, "b", false],
+ [3, 100, "b", false],
+ [100, 1, "b", false],
[101, 1.01, "c", false],
- [150, 1.5, "c", true],
- [175, 1.75, "c", true],
- [-10000, 1.75, "c", false],
- [-2, 0.5, "c", false]
+ [150, 1.5, "c", false],
+ [175, 1.75, "c", true]
]
}
},
"queries": [
{
- "psql": "4.2.7",
- "description": "aggregate int column and filter by int column",
- "sql": "SELECT min(int_col) FROM {tbl} WHERE int_col < 100"
+ "ignored": true,
+ "comment": "sum empty returns [0] instead of [null] at the moment when leaf aggregation is performed by v1 engine.",
+ "description": "Return empty for sum",
+ "sql": "SELECT sum(int_col) FROM {tbl} WHERE string_col IN ('foo', 'bar')"
},
{
- "psql": "4.2.7",
- "description": "aggregate int column and filter by int column",
- "sql": "SELECT sum(1 / int_col) FROM {tbl} WHERE int_col > 0",
- "h2Sql": "SELECT sum(1.0 / int_col) FROM {tbl} WHERE int_col > 0"
+ "description": "Return empty for sum with subquery",
+ "sql": "SELECT sum(int_col) FROM {tbl} WHERE string_col IN ( SELECT string_col FROM {tbl} WHERE int_col BETWEEN 1 AND 0 GROUP BY string_col )"
},
{
- "psql": "4.2.7",
- "description": "aggregate double column and filter by double column",
- "sql": "SELECT sum(1 / double_col) FROM {tbl} WHERE double_col < 1"
+ "ignored": true,
+ "comment": "min empty returns [infinity] instead of [null] at the moment when leaf aggregation is performed by v1 engine.",
+ "description": "Return empty for min",
+ "sql": "SELECT min(int_col) FROM {tbl} WHERE string_col IN ('foo', 'bar')"
},
{
- "psql": "4.2.7",
- "description": "aggregate double column and filter by string column",
- "sql": "SELECT sum(double_col) FROM {tbl} WHERE string_col > 'a'"
+ "description": "Return empty for min with subquery",
+ "sql": "SELECT min(int_col) FROM {tbl} WHERE string_col IN ( SELECT string_col FROM {tbl} WHERE int_col BETWEEN 1 AND 0 GROUP BY string_col )"
},
{
- "psql": "4.2.7",
- "description": "aggregate boolean column and filter by string column",
"ignored": true,
- "comments": "Cannot apply 'SUM' to arguments of type 'SUM(<BOOLEAN>)",
- "sql": "SELECT sum(bool_col) FROM {tbl} WHERE string_col > 'a'"
+ "comment": "max empty returns [-infinity] instead of [null] at the moment when leaf aggregation is performed by v1 engine.",
+ "description": "Return empty for max",
+ "sql": "SELECT max(int_col) FROM {tbl} WHERE string_col IN ('foo', 'bar')"
+ },
+ {
+ "description": "Return empty for max with subquery",
+ "sql": "SELECT max(int_col) FROM {tbl} WHERE string_col IN ( SELECT string_col FROM {tbl} WHERE int_col BETWEEN 1 AND 0 GROUP BY string_col )"
+ },
+ {
+ "comment": "avg empty returns [NaN] instead of [null] at the moment when leaf aggregation is performed by v1 engine.",
+ "description": "Return empty for avg",
+ "sql": "SELECT avg(int_col) FROM {tbl} WHERE string_col IN ('foo', 'bar')"
+ },
+ {
+ "description": "Return empty for avg with subquery",
+ "sql": "SELECT avg(int_col) FROM {tbl} WHERE string_col IN ( SELECT string_col FROM {tbl} WHERE int_col BETWEEN 1 AND 0 GROUP BY string_col )"
+ },
+ {
+ "description": "Return empty for count",
+ "sql": "SELECT count(int_col) FROM {tbl} WHERE string_col IN ('foo', 'bar')"
+ },
+ {
+ "description": "Return empty for count with subquery",
+ "sql": "SELECT count(int_col) FROM {tbl} WHERE string_col IN ( SELECT string_col FROM {tbl} WHERE int_col BETWEEN 1 AND 0 GROUP BY string_col )"
+ },
+ {
+ "description": "Return empty for count(*)",
+ "sql": "SELECT count(*) FROM {tbl} WHERE string_col IN ('foo', 'bar')"
+ },
+ {
+ "description": "Return empty for count(*) with subquery",
+ "sql": "SELECT count(*) FROM {tbl} WHERE string_col IN ( SELECT string_col FROM {tbl} WHERE int_col BETWEEN 1 AND 0 GROUP BY string_col )"
},
{
- "psql": "4.2.7",
- "comments": "parse error:Encountered HAVING",
"ignored": true,
- "description": "aggregate query with having clause",
- "sql": "select string_col, count(int_col) from {tbl} a group by string_col order by string_col having exists(select 1 from {tbl} b where count(int_col) = b.int_col);"
+ "comment": "bool_and empty returns [true] instead of [null] at the moment when leaf aggregation is performed by v1 engine.",
+ "description": "Return empty for bool_and",
+ "sql": "SELECT bool_and(bool_col) FROM {tbl} WHERE string_col IN ('foo', 'bar')"
},
{
- "psql": "4.2.7",
- "comments": "parse error:Encountered HAVING",
- "description": "aggregate query with having clause",
"ignored": true,
- "sql": "select string_col, count(int_col) from {tbl} a group by string_col order by string_col having (select count(*) from {tbl} b where count(int_col) = b.int_col) > 0;"
- }
- ]
- },
- "additional_aggregate": {
- "tables": {
- "tbl": {
- "schema": [
- {"name": "int_col", "type": "INT"},
- {"name": "double_col", "type": "DOUBLE"},
- {"name": "string_col", "type": "STRING"},
- {"name": "bool_col", "type": "BOOLEAN"}
- ],
- "inputs": [
- [2, 300, "a", false],
- [2, 400, "a", true],
- [3, 100, "b", true],
- [0.001, 1, "b", false],
- [101, 1.01, "c", false],
- [150, 1.5, "c", true],
- [175, 1.75, "c", true],
- [-10000, 1.75, "c", false],
- [-2, 0.5, "c", false]
- ]
+ "comment": "bool_and empty returns [0 which gets converted to false] instead of [null] when leaf aggregation is performed by Intermediate Stage",
+ "description": "Return empty for bool_and with subquery",
+ "sql": "SELECT bool_and(bool_col) FROM {tbl} WHERE string_col IN ( SELECT string_col FROM {tbl} WHERE int_col BETWEEN 1 AND 0 GROUP BY string_col )"
+ },
+ {
+ "ignored": true,
+ "comment": "bool_or empty returns [false] instead of [null] at the moment when leaf aggregation is performed by v1 engine.",
+ "description": "Return empty for bool_or",
+ "sql": "SELECT bool_or(bool_col) FROM {tbl} WHERE string_col IN ('foo', 'bar')"
+ },
+ {
+ "ignored": true,
+ "comment": "bool_or empty returns [false] instead of [null] at the moment when leaf aggregation is performed by Intermediate Stage",
+ "description": "Return empty for bool_or with subquery",
+ "sql": "SELECT bool_or(bool_col) FROM {tbl} WHERE string_col IN ( SELECT string_col FROM {tbl} WHERE int_col BETWEEN 1 AND 0 GROUP BY string_col )"
+ },
+ {
+ "description": "Return empty for count distinct",
+ "sql": "SELECT count(distinct int_col) FROM {tbl} WHERE string_col IN ('foo', 'bar')"
+ },
+ {
+ "ignored": true,
+ "comments": "TODO(Sonam): Remove ignore check. Test fails with existing code because H2 expects 0. But we return null",
+ "description": "Return empty for count distinct with subquery",
+ "sql": "SELECT count(distinct int_col) FROM {tbl} WHERE string_col IN ( SELECT string_col FROM {tbl} WHERE int_col BETWEEN 1 AND 0 GROUP BY string_col )"
+ },
+ {
+ "ignored": true,
+ "comment": "DistinctCount agg function doesn't support null handling",
+ "description": "Return empty for distinctcount",
+ "sql": "SELECT distinctcount(int_col) FROM {tbl} WHERE string_col IN ('foo', 'bar')"
+ },
+ {
+ "ignored": true,
+ "comment": "DistinctCount agg function doesn't support null handling",
+ "description": "Return empty for distinctcount with subquery",
+ "sql": "SELECT distinctcount(int_col) FROM {tbl} WHERE string_col IN ( SELECT string_col FROM {tbl} WHERE int_col BETWEEN 1 AND 0 GROUP BY string_col )"
+ },
+ {
+ "ignored": true,
+ "comment": "FourthMoment agg function doesn't support null handling",
+ "description": "Return empty for skewness",
+ "sql": "SELECT skewness(double_col) FROM {tbl} WHERE string_col IN ('foo', 'bar')"
+ },
+ {
+ "ignored": true,
+ "comment": "FourthMoment agg function doesn't support null handling",
+ "description": "Return empty for skewness with subquery",
+ "sql": "SELECT skewness(double_col) FROM {tbl} WHERE string_col IN ( SELECT string_col FROM {tbl} WHERE int_col BETWEEN 1 AND 0 GROUP BY string_col )"
+ },
+ {
+ "ignored": true,
+ "comment": "FourthMoment agg function doesn't support null handling",
+ "description": "Return empty for kurtosis",
+ "sql": "SELECT kurtosis(double_col) FROM {tbl} WHERE string_col IN ('foo', 'bar')"
+ },
+ {
+ "ignored": true,
+ "comment": "FourthMoment agg function doesn't support null handling",
+ "description": "Return empty for kurtosis with subquery",
+ "sql": "SELECT kurtosis(double_col) FROM {tbl} WHERE string_col IN ( SELECT string_col FROM {tbl} WHERE int_col BETWEEN 1 AND 0 GROUP BY string_col )"
}
- },
- "queries": [
- { "sql": "SELECT min(int_col), min(int_col) FROM {tbl} WHERE int_col < 100" },
- { "sql": "SELECT min(int_col), count(int_col), count(*) FROM {tbl}" },
- { "sql": "SELECT string_col, min(int_col), count(int_col), count(*) FROM {tbl} GROUP BY string_col" },
- { "sql": "SELECT string_col, bool_col, min(int_col), count(int_col), count(*) FROM {tbl} GROUP BY string_col, bool_col" },
- { "sql": "SELECT string_col, bool_col, min(int_col), count(int_col), count(*) FROM {tbl} GROUP BY bool_col, string_col" },
- { "sql": "SELECT string_col, string_col AS alias, count(int_col), count(*) FROM {tbl} GROUP BY bool_col, string_col" },
- { "sql": "SELECT string_col, bool_col, bool_col AS alias, sum(int_col), count(int_col), count(*) FROM {tbl} GROUP BY bool_col, string_col" },
- { "sql": "SELECT sum(int_col), count(int_col) FROM {tbl} GROUP BY string_col" },
- { "sql": "SELECT string_col, min(int_col), count(int_col), count(*) FROM {tbl} GROUP BY string_col ORDER BY string_col" },
- { "sql": "SELECT string_col, bool_col, min(int_col), count(int_col), count(*) FROM {tbl} GROUP BY string_col, bool_col ORDER BY string_col" },
- { "sql": "SELECT string_col, bool_col, min(int_col), count(int_col), count(*) FROM {tbl} GROUP BY string_col, bool_col ORDER BY bool_col, string_col" },
- { "sql": "SELECT string_col, bool_col, min(int_col), count(int_col), count(*) FROM {tbl} GROUP BY bool_col, string_col ORDER BY bool_col, string_col" },
- { "sql": "SELECT string_col, string_col AS alias, count(int_col), count(*) FROM {tbl} GROUP BY bool_col, string_col ORDER BY string_col" },
- { "sql": "SELECT string_col, string_col AS alias, count(int_col), count(*) FROM {tbl} GROUP BY bool_col, string_col ORDER BY bool_col, alias"},
- { "sql": "SELECT string_col, bool_col, bool_col AS alias, sum(int_col), count(int_col), count(*) FROM {tbl} GROUP BY bool_col, string_col ORDER BY alias" },
- { "sql": "SELECT sum(int_col), count(int_col) FROM {tbl} GROUP BY string_col ORDER BY string_col" },
- { "sql": "SELECT CASE WHEN bool_col THEN int_col ELSE int_col - 5 END, count(int_col) FROM {tbl} GROUP BY CASE WHEN bool_col THEN int_col ELSE int_col - 5 END ORDER BY CASE WHEN bool_col THEN int_col ELSE int_col - 5 END" },
- { "sql": "SELECT upper(string_col), count(int_col) FROM {tbl} GROUP BY upper(string_col) ORDER BY upper(string_col)" },
- { "sql": "SELECT upper(string_col), count(int_col) FROM {tbl} GROUP BY upper(string_col) ORDER BY count(int_col)" },
- { "sql": "SELECT * FROM (SELECT string_col, bool_col, min(int_col) AS m, count(int_col), count(*) AS c FROM {tbl} GROUP BY string_col, bool_col ORDER BY string_col) WHERE c < m" },
- { "sql": "SELECT * FROM (SELECT string_col, bool_col, min(int_col) AS m, count(int_col), count(*) AS c FROM {tbl} GROUP BY string_col, bool_col ORDER BY bool_col, string_col) WHERE c < m" },
- { "sql": "SELECT upper(string_col), count(int_col) FROM {tbl} GROUP BY upper(string_col) HAVING sum(int_col) > 0 ORDER BY upper(string_col)" },
- { "sql": "SELECT upper(string_col), count(int_col) FROM {tbl} GROUP BY upper(string_col) HAVING sum(int_col) >= 0 AND count(int_col) >= 0 ORDER BY count(int_col)" }
]
},
"aggregate_with_hints": {
@@ -344,5 +944,53 @@
"sql": "SELECT /*+ skipLeafStageGroupByAggregation */ double_col, sum(int_col) FROM {tbl} WHERE int_col > 3 AND double_col > 1.0 GROUP BY double_col"
}
]
+ },
+ "additional_aggregate": {
+ "tables": {
+ "tbl": {
+ "schema": [
+ {"name": "int_col", "type": "INT"},
+ {"name": "double_col", "type": "DOUBLE"},
+ {"name": "string_col", "type": "STRING"},
+ {"name": "bool_col", "type": "BOOLEAN"}
+ ],
+ "inputs": [
+ [2, 300, "a", false],
+ [2, 400, "a", true],
+ [3, 100, "b", true],
+ [0.001, 1, "b", false],
+ [101, 1.01, "c", false],
+ [150, 1.5, "c", true],
+ [175, 1.75, "c", true],
+ [-10000, 1.75, "c", false],
+ [-2, 0.5, "c", false]
+ ]
+ }
+ },
+ "queries": [
+ { "sql": "SELECT min(int_col), min(int_col) FROM {tbl} WHERE int_col < 100" },
+ { "sql": "SELECT min(int_col), count(int_col), count(*) FROM {tbl}" },
+ { "sql": "SELECT string_col, min(int_col), count(int_col), count(*) FROM {tbl} GROUP BY string_col" },
+ { "sql": "SELECT string_col, bool_col, min(int_col), count(int_col), count(*) FROM {tbl} GROUP BY string_col, bool_col" },
+ { "sql": "SELECT string_col, bool_col, min(int_col), count(int_col), count(*) FROM {tbl} GROUP BY bool_col, string_col" },
+ { "sql": "SELECT string_col, string_col AS alias, count(int_col), count(*) FROM {tbl} GROUP BY bool_col, string_col" },
+ { "sql": "SELECT string_col, bool_col, bool_col AS alias, sum(int_col), count(int_col), count(*) FROM {tbl} GROUP BY bool_col, string_col" },
+ { "sql": "SELECT sum(int_col), count(int_col) FROM {tbl} GROUP BY string_col" },
+ { "sql": "SELECT string_col, min(int_col), count(int_col), count(*) FROM {tbl} GROUP BY string_col ORDER BY string_col" },
+ { "sql": "SELECT string_col, bool_col, min(int_col), count(int_col), count(*) FROM {tbl} GROUP BY string_col, bool_col ORDER BY string_col" },
+ { "sql": "SELECT string_col, bool_col, min(int_col), count(int_col), count(*) FROM {tbl} GROUP BY string_col, bool_col ORDER BY bool_col, string_col" },
+ { "sql": "SELECT string_col, bool_col, min(int_col), count(int_col), count(*) FROM {tbl} GROUP BY bool_col, string_col ORDER BY bool_col, string_col" },
+ { "sql": "SELECT string_col, string_col AS alias, count(int_col), count(*) FROM {tbl} GROUP BY bool_col, string_col ORDER BY string_col" },
+ { "sql": "SELECT string_col, string_col AS alias, count(int_col), count(*) FROM {tbl} GROUP BY bool_col, string_col ORDER BY bool_col, alias"},
+ { "sql": "SELECT string_col, bool_col, bool_col AS alias, sum(int_col), count(int_col), count(*) FROM {tbl} GROUP BY bool_col, string_col ORDER BY alias" },
+ { "sql": "SELECT sum(int_col), count(int_col) FROM {tbl} GROUP BY string_col ORDER BY string_col" },
+ { "sql": "SELECT CASE WHEN bool_col THEN int_col ELSE int_col - 5 END, count(int_col) FROM {tbl} GROUP BY CASE WHEN bool_col THEN int_col ELSE int_col - 5 END ORDER BY CASE WHEN bool_col THEN int_col ELSE int_col - 5 END" },
+ { "sql": "SELECT upper(string_col), count(int_col) FROM {tbl} GROUP BY upper(string_col) ORDER BY upper(string_col)" },
+ { "sql": "SELECT upper(string_col), count(int_col) FROM {tbl} GROUP BY upper(string_col) ORDER BY count(int_col)" },
+ { "sql": "SELECT * FROM (SELECT string_col, bool_col, min(int_col) AS m, count(int_col), count(*) AS c FROM {tbl} GROUP BY string_col, bool_col ORDER BY string_col) WHERE c < m" },
+ { "sql": "SELECT * FROM (SELECT string_col, bool_col, min(int_col) AS m, count(int_col), count(*) AS c FROM {tbl} GROUP BY string_col, bool_col ORDER BY bool_col, string_col) WHERE c < m" },
+ { "sql": "SELECT upper(string_col), count(int_col) FROM {tbl} GROUP BY upper(string_col) HAVING sum(int_col) > 0 ORDER BY upper(string_col)" },
+ { "sql": "SELECT upper(string_col), count(int_col) FROM {tbl} GROUP BY upper(string_col) HAVING sum(int_col) >= 0 AND count(int_col) >= 0 ORDER BY count(int_col)" }
+ ]
}
}
diff --git a/pinot-query-runtime/src/test/resources/queries/NullHandling.json b/pinot-query-runtime/src/test/resources/queries/NullHandling.json
index 36fad242e6..3c5a6a7add 100644
--- a/pinot-query-runtime/src/test/resources/queries/NullHandling.json
+++ b/pinot-query-runtime/src/test/resources/queries/NullHandling.json
@@ -17,11 +17,12 @@
{"name": "strCol1", "type": "STRING"},
{"name": "strCol2", "type": "STRING"},
{"name": "intCol1", "type": "INT"},
- {"name": "doubleCol1", "type": "DOUBLE"}
+ {"name": "doubleCol1", "type": "DOUBLE"},
+ {"name": "boolCol1", "type": "BOOLEAN"}
],
"inputs": [
- ["foo", "bob", 3, 3.1416],
- ["alice", "alice", 4, 2.7183]
+ ["foo", "bob", 3, 3.1416, true],
+ ["alice", "alice", 4, 2.7183, false]
]
}
},
@@ -36,7 +37,7 @@
},
{
"description": "LEFT JOIN and AGGREGATE",
- "sql": "SELECT COUNT({tbl2}.intCol1), MIN({tbl2}.intCol1), MAX({tbl2}.doubleCol1), SUM({tbl2}.doubleCol1) FROM {tbl1} LEFT OUTER JOIN {tbl2} ON {tbl1}.strCol1 = {tbl2}.strCol1"
+ "sql": "SELECT COUNT({tbl2}.intCol1), MIN({tbl2}.intCol1), MAX({tbl2}.doubleCol1), SUM({tbl2}.doubleCol1), AVG({tbl2}.doubleCol1), BOOL_AND({tbl2}.boolCol1), BOOL_OR({tbl2}.boolCol1) FROM {tbl1} LEFT OUTER JOIN {tbl2} ON {tbl1}.strCol1 = {tbl2}.strCol1"
},
{
"description": "LEFT JOIN and GROUP BY",
@@ -44,7 +45,7 @@
},
{
"description": "LEFT JOIN and GROUP BY with AGGREGATE",
- "sql": "SELECT {tbl1}.strCol2, COUNT({tbl2}.intCol1), MIN({tbl2}.intCol1), MAX({tbl2}.doubleCol1), SUM({tbl2}.doubleCol1) FROM {tbl1} LEFT OUTER JOIN {tbl2} ON {tbl1}.strCol1 = {tbl2}.strCol1 GROUP BY {tbl1}.strCol2"
+ "sql": "SELECT {tbl1}.strCol2, COUNT({tbl2}.intCol1), MIN({tbl2}.intCol1), MAX({tbl2}.doubleCol1), SUM({tbl2}.doubleCol1), AVG({tbl2}.doubleCol1) FROM {tbl1} LEFT OUTER JOIN {tbl2} ON {tbl1}.strCol1 = {tbl2}.strCol1 GROUP BY {tbl1}.strCol2"
},
{
"description": "LEFT JOIN and SORT (by default, H2 treats null as the smallest value, which is different from Postgres, thus we don't test the default ordering)",
diff --git a/pinot-query-runtime/src/test/resources/queries/WithStatements.json b/pinot-query-runtime/src/test/resources/queries/WithStatements.json
index 1b74c31bc9..401bf39e0b 100644
--- a/pinot-query-runtime/src/test/resources/queries/WithStatements.json
+++ b/pinot-query-runtime/src/test/resources/queries/WithStatements.json
@@ -18,13 +18,17 @@
"schema": [
{"name": "strCol1", "type": "STRING"},
{"name": "strCol2", "type": "STRING"},
- {"name": "intCol", "type": "INT"}
+ {"name": "intCol", "type": "INT"},
+ {"name": "doubleCol", "type": "DOUBLE"},
+ {"name": "floatCol", "type": "FLOAT"},
+ {"name": "longCol", "type": "LONG"},
+ {"name": "boolCol", "type": "BOOLEAN"}
],
"inputs": [
- ["a", "foo", 1],
- ["a", "bar", 2],
- ["b", "alice", 42],
- ["b", "bob", 196883]
+ ["a", "foo", 1, 100.0, 12.0, 10000, true],
+ ["a", "bar", 2, 175.12, 22.22, 300000, false],
+ ["b", "alice", 42, 175.12, 22.22, 300000, true],
+ ["b", "bob", 196883, 175.12, 22.22, 300000, true]
]
}
},
@@ -39,6 +43,16 @@
["b", "bob", 2]
]
},
+ {
+ "description": "single 'with' with all aggregations and group by",
+ "sql": "WITH w AS (SELECT strCol, SUM(intCol) AS c1, AVG(intCol) AS c2, MIN(intCol) AS c3, MAX(intCol) AS c4, COUNT(*) as c5, COUNT(DISTINCT(intCol)) as c6 FROM {tbl1} GROUP BY strCol) SELECT strCol, strCol2, c1, c2, c3, c4, c5, c6 FROM w JOIN {tbl2} ON w.strCol = {tbl2}.strCol1",
+ "outputs": [
+ ["a", "foo", 4, 2.0, 1, 3, 2, 2],
+ ["a", "bar", 4, 2.0, 1, 3, 2, 2],
+ ["b", "alice", 2, 2.0, 2.0, 2.0, 1, 1],
+ ["b", "bob", 2, 2.0, 2.0, 2.0, 1, 1]
+ ]
+ },
{
"description": "multi 'with' table",
"sql": "WITH agg1 AS ( SELECT strCol, sum(intCol) AS sumVal FROM {tbl1} GROUP BY strCol), agg2 AS (SELECT strCol1, avg(intCol) AS avgVal FROM {tbl2} GROUP BY strCol1) SELECT strCol, sumVal - avgVal FROM agg1, agg2 WHERE agg1.strCol = agg2.strCol1",
@@ -47,6 +61,54 @@
["b", -98460.5]
]
},
+ {
+ "description": "multi 'with' table with all aggregations on int column",
+ "sql": "WITH agg1 AS ( SELECT strCol, sum(intCol) AS sumVal FROM {tbl1} GROUP BY strCol), agg2 AS (SELECT strCol1, SUM(intCol) AS c1, AVG(intCol) AS c2, MIN(intCol) AS c3, MAX(intCol) AS c4, COUNT(*) as c5, COUNT(DISTINCT(intCol)) AS c6 FROM {tbl2} GROUP BY strCol1) SELECT strCol, sumVal - c2, c1, c3, c4, c5, c6 FROM agg1, agg2 WHERE agg1.strCol = agg2.strCol1",
+ "outputs": [
+ ["a", 2.5, 3.0, 1, 2, 2, 2],
+ ["b", -98460.5, 196925.0, 42.0, 196883.0, 2, 2]
+ ]
+ },
+ {
+ "description": "multi 'with' table with all aggregations on double column",
+ "sql": "WITH agg1 AS ( SELECT strCol, sum(intCol) AS sumVal FROM {tbl1} GROUP BY strCol), agg2 AS (SELECT strCol1, SUM(doubleCol) AS c1, AVG(doubleCol) AS c2, MIN(doubleCol) AS c3, MAX(doubleCol) AS c4, COUNT(*) as c5, COUNT(DISTINCT(doubleCol)) AS c6 FROM {tbl2} GROUP BY strCol1) SELECT strCol, sumVal - c2, c1, c3, c4, c5, c6 FROM agg1, agg2 WHERE agg1.strCol = agg2.strCol1",
+ "outputs": [
+ ["a", -133.56, 275.12, 100.0, 175.12, 2, 2],
+ ["b", -173.12, 350.24, 175.12, 175.12, 2, 1]
+ ]
+ },
+ {
+ "description": "multi 'with' table with all aggregations on float column",
+ "sql": "WITH agg1 AS ( SELECT strCol, sum(intCol) AS sumVal FROM {tbl1} GROUP BY strCol), agg2 AS (SELECT strCol1, SUM(floatCol) AS c1, AVG(floatCol) AS c2, MIN(floatCol) AS c3, MAX(floatCol) AS c4, COUNT(*) as c5, COUNT(DISTINCT(floatCol)) AS c6 FROM {tbl2} GROUP BY strCol1) SELECT strCol, sumVal - c2, c1, c3, c4, c5, c6 FROM agg1, agg2 WHERE agg1.strCol = agg2.strCol1",
+ "outputs": [
+ ["a", -13.109999656677246, 34.21999931335449, 12.0, 22.219999313354492, 2, 2],
+ ["b", -20.219999313354492, 44.439998626708984, 22.219999313354492, 22.219999313354492, 2, 1]
+ ]
+ },
+ {
+ "description": "multi 'with' table with all aggregations on long column",
+ "sql": "WITH agg1 AS ( SELECT strCol, sum(intCol) AS sumVal FROM {tbl1} GROUP BY strCol), agg2 AS (SELECT strCol1, SUM(longCol) AS c1, AVG(longCol) AS c2, MIN(longCol) AS c3, MAX(longCol) AS c4, COUNT(*) as c5, COUNT(DISTINCT(longCol)) AS c6 FROM {tbl2} GROUP BY strCol1) SELECT strCol, sumVal - c2, c1, c3, c4, c5, c6 FROM agg1, agg2 WHERE agg1.strCol = agg2.strCol1",
+ "outputs": [
+ ["a", -154996.0, 310000.0, 10000.0, 300000.0, 2, 2],
+ ["b", -299998.0, 600000.0, 300000.0, 300000.0, 2, 1]
+ ]
+ },
+ {
+ "description": "multi 'with' table with all aggregations on string column",
+ "sql": "WITH agg1 AS ( SELECT strCol, sum(intCol) AS sumVal FROM {tbl1} GROUP BY strCol), agg2 AS (SELECT strCol1, COUNT(strCol2) as c1, COUNT(DISTINCT(strCol2)) AS c2 FROM {tbl2} GROUP BY strCol1) SELECT strCol, c1, c2 FROM agg1, agg2 WHERE agg1.strCol = agg2.strCol1",
+ "outputs": [
+ ["a", 2, 2],
+ ["b", 2, 2]
+ ]
+ },
+ {
+ "description": "multi 'with' table with all aggregations on bool column",
+ "sql": "WITH agg1 AS ( SELECT strCol, sum(intCol) AS sumVal FROM {tbl1} GROUP BY strCol), agg2 AS (SELECT strCol1, bool_and(boolCol) AS c1, bool_or(boolCol) as c2, COUNT(boolCol) as c3, COUNT(DISTINCT(boolCol)) AS c4 FROM {tbl2} GROUP BY strCol1) SELECT strCol, c1, c2, c3, c4 FROM agg1, agg2 WHERE agg1.strCol = agg2.strCol1",
+ "outputs": [
+ ["a", false, true, 2, 2],
+ ["b", true, true, 2, 1]
+ ]
+ },
{
"description": "nested 'with' on agg table: (with a as ( ... ), select ... ",
"sql": "WITH agg1 AS (SELECT strCol1, strCol2, sum(intCol) AS sumVal FROM {tbl2} GROUP BY strCol1, strCol2) SELECT strCol1, avg(sumVal) AS avgVal FROM agg1 GROUP BY strCol1",
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org