You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2022/12/01 01:24:18 UTC
[pinot] branch master updated: support BOOL_AND and BOOL_OR aggregate functions (#9848)
This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 0a6d84391d support BOOL_AND and BOOL_OR aggregate functions (#9848)
0a6d84391d is described below
commit 0a6d84391dcbc9924b2b593e53121f981f1561db
Author: Almog Gavra <al...@gmail.com>
AuthorDate: Wed Nov 30 17:24:11 2022 -0800
support BOOL_AND and BOOL_OR aggregate functions (#9848)
* support BOOL_AND and BOOL_OR aggregate functions
* inline null handling and add LongResultHolders
* fix null handling and add null tests
* change Long to Int and a few other minor changes
---
.../query/aggregation/AggregationResultHolder.java | 13 ++
.../aggregation/DoubleAggregationResultHolder.java | 15 ++
...ltHolder.java => IntAggregateResultHolder.java} | 44 ++--
.../aggregation/ObjectAggregationResultHolder.java | 16 ++
.../function/AggregationFunctionFactory.java | 4 +
.../function/BaseBooleanAggregationFunction.java | 252 ++++++++++++++++++++
.../BooleanAndAggregationFunction.java} | 42 ++--
.../BooleanOrAggregationFunction.java} | 42 ++--
.../groupby/DoubleGroupByResultHolder.java | 10 +
.../aggregation/groupby/GroupByResultHolder.java | 19 ++
...sultHolder.java => IntGroupByResultHolder.java} | 33 ++-
.../groupby/ObjectGroupByResultHolder.java | 12 +
.../function/AggregationFunctionFactoryTest.java | 14 ++
.../pinot/queries/BooleanAggQueriesTest.java | 257 +++++++++++++++++++++
.../PinotAggregateExchangeNodeInsertRule.java | 2 +-
.../sql/fun/PinotBoolAndAggregateFunction.java | 39 ++--
.../sql/fun/PinotBoolOrAggregateFunction.java | 39 ++--
.../apache/calcite/sql/fun/PinotOperatorTable.java | 3 +
.../query/runtime/operator/AggregateOperator.java | 14 ++
.../src/test/resources/queries/Aggregates.json | 175 ++++----------
.../pinot/segment/spi/AggregationFunctionType.java | 6 +-
21 files changed, 776 insertions(+), 275 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/AggregationResultHolder.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/AggregationResultHolder.java
index 33e975c0ff..29a3b7e93d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/AggregationResultHolder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/AggregationResultHolder.java
@@ -30,6 +30,12 @@ public interface AggregationResultHolder {
*/
void setValue(double value);
+ /**
+ * Set the 'primitive int' aggregation result.
+ * @param value
+ */
+ void setValue(int value);
+
/**
* Set the aggregation result value.
* @param value
@@ -42,6 +48,13 @@ public interface AggregationResultHolder {
*/
double getDoubleResult();
+ /**
+ * Returns the 'primitive int' aggregation result.
+ *
+ * @return
+ */
+ int getIntResult();
+
/**
* Returns the result of aggregation.
* @return
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/DoubleAggregationResultHolder.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/DoubleAggregationResultHolder.java
index 9cff121797..7962b246d7 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/DoubleAggregationResultHolder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/DoubleAggregationResultHolder.java
@@ -42,6 +42,11 @@ public class DoubleAggregationResultHolder implements AggregationResultHolder {
_value = value;
}
+ @Override
+ public void setValue(int value) {
+ throw new RuntimeException("Method 'setValue' (with int value) not supported for class " + getClass().getName());
+ }
+
/**
* {@inheritDoc}
* Value for this class is 'primitive double', so this method is not implemented.
@@ -61,6 +66,16 @@ public class DoubleAggregationResultHolder implements AggregationResultHolder {
return _value;
}
+ /**
+ * {@inheritDoc}
+ *
+ * @return
+ */
+ @Override
+ public int getIntResult() {
+ throw new RuntimeException("Method 'getIntResult' not supported for class " + getClass().getName());
+ }
+
/**
* {@inheritDoc}
* Result for this class is 'primitive double', so this method is not implemented.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/DoubleAggregationResultHolder.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/IntAggregateResultHolder.java
similarity index 66%
copy from pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/DoubleAggregationResultHolder.java
copy to pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/IntAggregateResultHolder.java
index 9cff121797..78d29a204b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/DoubleAggregationResultHolder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/IntAggregateResultHolder.java
@@ -16,56 +16,42 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.pinot.core.query.aggregation;
-/**
- * AggregationResultHolder interface implementation for result type 'primitive double'.
- *
- */
-public class DoubleAggregationResultHolder implements AggregationResultHolder {
- double _value;
+public class IntAggregateResultHolder implements AggregationResultHolder {
+
+ int _value;
- /**
- * Constructor for the class.
- * @param defaultValue
- */
- public DoubleAggregationResultHolder(double defaultValue) {
+ public IntAggregateResultHolder(int defaultValue) {
_value = defaultValue;
}
- /**
- * {@inheritDoc}
- * @param value
- */
@Override
public void setValue(double value) {
+ throw new RuntimeException("Method 'setValue' (with double value) not supported for class " + getClass().getName());
+ }
+
+ @Override
+ public void setValue(int value) {
_value = value;
}
- /**
- * {@inheritDoc}
- * Value for this class is 'primitive double', so this method is not implemented.
- * @param value
- */
@Override
public void setValue(Object value) {
throw new RuntimeException("Method 'setValue' (with object value) not supported for class " + getClass().getName());
}
- /**
- * {@inheritDoc}
- * @return
- */
@Override
public double getDoubleResult() {
+ throw new RuntimeException("Method 'getDoubleResult' not supported for class " + getClass().getName());
+ }
+
+ @Override
+ public int getIntResult() {
return _value;
}
- /**
- * {@inheritDoc}
- * Result for this class is 'primitive double', so this method is not implemented.
- * @return
- */
@Override
public <T> T getResult() {
throw new RuntimeException("Method 'getResult' not supported for class " + getClass().getName());
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/ObjectAggregationResultHolder.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/ObjectAggregationResultHolder.java
index fdca760d13..817ea086f5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/ObjectAggregationResultHolder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/ObjectAggregationResultHolder.java
@@ -42,6 +42,12 @@ public class ObjectAggregationResultHolder implements AggregationResultHolder {
_value = value;
}
+
+ @Override
+ public void setValue(int value) {
+ _value = value;
+ }
+
/**
* {@inheritDoc}
* @return
@@ -51,6 +57,16 @@ public class ObjectAggregationResultHolder implements AggregationResultHolder {
throw new RuntimeException("Method 'getDoubleResult' not supported for class " + getClass().getName());
}
+ /**
+ * {@inheritDoc}
+ *
+ * @return
+ */
+ @Override
+ public int getIntResult() {
+ throw new RuntimeException("Method 'getIntResult' not supported for class " + getClass().getName());
+ }
+
/**
* {@inheritDoc}
* @return
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index 3eb3a4f5ad..4e1bda2025 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -273,6 +273,10 @@ public class AggregationFunctionFactory {
return new CovarianceAggregationFunction(arguments, false);
case COVARSAMP:
return new CovarianceAggregationFunction(arguments, true);
+ case BOOLAND:
+ return new BooleanAndAggregationFunction(firstArgument, queryContext.isNullHandlingEnabled());
+ case BOOLOR:
+ return new BooleanOrAggregationFunction(firstArgument, queryContext.isNullHandlingEnabled());
default:
throw new IllegalArgumentException();
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/BaseBooleanAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/BaseBooleanAggregationFunction.java
new file mode 100644
index 0000000000..c295d4838f
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/BaseBooleanAggregationFunction.java
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.IntAggregateResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.IntGroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.RoaringBitmap;
+
+
+// TODO: change this to extend BaseSingleInputAggregationFunction<Boolean, Boolean> when we get proper
+// handling of booleans in serialization - today this would fail because ColumnDataType#convert assumes
+// that the boolean is encoded as its stored type (an integer)
+public abstract class BaseBooleanAggregationFunction extends BaseSingleInputAggregationFunction<Integer, Integer> {
+
+ private final BooleanMerge _merger;
+ private final boolean _nullHandlingEnabled;
+
+ protected enum BooleanMerge {
+ AND {
+ @Override
+ int merge(int left, int right) {
+ return left & right;
+ }
+
+ @Override
+ boolean isTerminal(int agg) {
+ return agg == 0;
+ }
+
+ @Override
+ int getDefaultValue() {
+ return 1;
+ }
+ },
+ OR {
+ @Override
+ int merge(int left, int right) {
+ return left | right;
+ }
+
+ @Override
+ boolean isTerminal(int agg) {
+ return agg > 0;
+ }
+
+ @Override
+ int getDefaultValue() {
+ return 0;
+ }
+ };
+
+ abstract int merge(int left, int right);
+
+ abstract boolean isTerminal(int agg);
+
+ abstract int getDefaultValue();
+ }
+
+ protected BaseBooleanAggregationFunction(ExpressionContext expression, boolean nullHandlingEnabled,
+ BooleanMerge merger) {
+ super(expression);
+ _nullHandlingEnabled = nullHandlingEnabled;
+ _merger = merger;
+ }
+
+ @Override
+ public AggregationResultHolder createAggregationResultHolder() {
+ return _nullHandlingEnabled
+ ? new ObjectAggregationResultHolder()
+ : new IntAggregateResultHolder(_merger.getDefaultValue());
+ }
+
+ @Override
+ public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+ return _nullHandlingEnabled
+ ? new ObjectGroupByResultHolder(initialCapacity, maxCapacity)
+ : new IntGroupByResultHolder(initialCapacity, maxCapacity, _merger.getDefaultValue());
+ }
+
+  /**
+   * Folds one block of BOOLEAN values into the running (non group-by) aggregate.
+   *
+   * <p>Values are read through {@code getIntValuesSV()} and combined with the configured merger. When null
+   * handling is enabled, positions present in the block's null bitmap are skipped and the holder is only
+   * written if at least one non-null value was merged, so a holder that was never written stays unset.
+   *
+   * @param length number of leading entries of the block to aggregate
+   * @param aggregationResultHolder holder carrying the running aggregate across blocks
+   * @param blockValSetMap block value sets keyed by expression; the entry for this function's expression is used
+   * @throws IllegalArgumentException if the block's value type is not BOOLEAN
+   */
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    if (blockValSet.getValueType() != FieldSpec.DataType.BOOLEAN) {
+      // Offending data type first, function name second, matching the message template.
+      throw new IllegalArgumentException(
+          String.format("Unsupported data type %s for %s", blockValSet.getValueType(), getType().getName()));
+    }
+
+    int[] bools = blockValSet.getIntValuesSV();
+    if (_nullHandlingEnabled) {
+      int agg = getInt(aggregationResultHolder.getResult());
+
+      // early terminate on a per-block level to allow the
+      // loop below to be more tightly optimized (avoid a branch)
+      if (_merger.isTerminal(agg)) {
+        return;
+      }
+
+      RoaringBitmap nullBitmap = blockValSet.getNullBitmap();
+      if (nullBitmap == null) {
+        nullBitmap = new RoaringBitmap();
+      } else if (nullBitmap.getCardinality() > length) {
+        // NOTE(review): '>' only holds if the bitmap has more entries than rows; '>=' may have been
+        // intended to short-circuit all-null blocks - confirm against the null bitmap contract.
+        return;
+      }
+
+      boolean merged = false;
+      for (int i = 0; i < length; i++) {
+        if (!nullBitmap.contains(i)) {
+          agg = _merger.merge(agg, bools[i]);
+          merged = true;
+        }
+      }
+      // Write back once, and only if a non-null value was merged, so all-null blocks leave the holder untouched.
+      if (merged) {
+        aggregationResultHolder.setValue((Object) agg);
+      }
+    } else {
+      int agg = aggregationResultHolder.getIntResult();
+
+      // early terminate on a per-block level to allow the
+      // loop below to be more tightly optimized (avoid a branch)
+      if (_merger.isTerminal(agg)) {
+        return;
+      }
+
+      for (int i = 0; i < length; i++) {
+        agg = _merger.merge(agg, bools[i]);
+      }
+      // Single write-back per block instead of one holder write per row.
+      aggregationResultHolder.setValue(agg);
+    }
+  }
+
+  /**
+   * Folds one block of BOOLEAN values into per-group aggregates, where each row belongs to exactly one group.
+   *
+   * <p>When null handling is enabled, rows present in the block's null bitmap are skipped and results are
+   * stored through the object-typed holder methods; otherwise the primitive int holder methods are used.
+   *
+   * @param length number of leading entries of the block to aggregate
+   * @param groupKeyArray group key for each row of the block
+   * @param groupByResultHolder holder carrying the running aggregate of every group
+   * @param blockValSetMap block value sets keyed by expression; the entry for this function's expression is used
+   * @throws IllegalArgumentException if the block's value type is not BOOLEAN
+   */
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    if (blockValSet.getValueType() != FieldSpec.DataType.BOOLEAN) {
+      // Offending data type first, function name second, matching the message template.
+      throw new IllegalArgumentException(
+          String.format("Unsupported data type %s for %s", blockValSet.getValueType(), getType().getName()));
+    }
+
+    int[] bools = blockValSet.getIntValuesSV();
+    if (_nullHandlingEnabled) {
+      RoaringBitmap nullBitmap = blockValSet.getNullBitmap();
+      if (nullBitmap == null) {
+        nullBitmap = new RoaringBitmap();
+      } else if (nullBitmap.getCardinality() > length) {
+        // NOTE(review): '>' only holds if the bitmap has more entries than rows; '>=' may have been
+        // intended to short-circuit all-null blocks - confirm against the null bitmap contract.
+        return;
+      }
+
+      for (int i = 0; i < length; i++) {
+        if (!nullBitmap.contains(i)) {
+          int groupByKey = groupKeyArray[i];
+          // A group with no stored result yet starts from the merger's default value (see getInt).
+          int agg = getInt(groupByResultHolder.getResult(groupByKey));
+          groupByResultHolder.setValueForKey(groupByKey, (Object) _merger.merge(agg, bools[i]));
+        }
+      }
+    } else {
+      for (int i = 0; i < length; i++) {
+        int groupByKey = groupKeyArray[i];
+        int agg = groupByResultHolder.getIntResult(groupByKey);
+        groupByResultHolder.setValueForKey(groupByKey, _merger.merge(agg, bools[i]));
+      }
+    }
+  }
+
+  /**
+   * Folds one block of BOOLEAN values into per-group aggregates, where each row may belong to several groups.
+   *
+   * <p>Mirrors {@code aggregateGroupBySV}: the value type is validated, and when null handling is enabled the
+   * object-typed holder methods are used (the holder created for that mode does not support
+   * {@code getIntResult}) and rows present in the block's null bitmap are skipped.
+   *
+   * @param length number of leading entries of the block to aggregate
+   * @param groupKeysArray group keys for each row of the block
+   * @param groupByResultHolder holder carrying the running aggregate of every group
+   * @param blockValSetMap block value sets keyed by expression; the entry for this function's expression is used
+   * @throws IllegalArgumentException if the block's value type is not BOOLEAN
+   */
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    if (blockValSet.getValueType() != FieldSpec.DataType.BOOLEAN) {
+      throw new IllegalArgumentException(
+          String.format("Unsupported data type %s for %s", blockValSet.getValueType(), getType().getName()));
+    }
+
+    int[] bools = blockValSet.getIntValuesSV();
+    if (_nullHandlingEnabled) {
+      RoaringBitmap nullBitmap = blockValSet.getNullBitmap();
+      if (nullBitmap == null) {
+        nullBitmap = new RoaringBitmap();
+      }
+
+      for (int i = 0; i < length; i++) {
+        if (!nullBitmap.contains(i)) {
+          for (int groupKey : groupKeysArray[i]) {
+            // A group with no stored result yet starts from the merger's default value (see getInt).
+            int agg = getInt(groupByResultHolder.getResult(groupKey));
+            groupByResultHolder.setValueForKey(groupKey, (Object) _merger.merge(agg, bools[i]));
+          }
+        }
+      }
+    } else {
+      for (int i = 0; i < length; i++) {
+        for (int groupKey : groupKeysArray[i]) {
+          int agg = groupByResultHolder.getIntResult(groupKey);
+          groupByResultHolder.setValueForKey(groupKey, _merger.merge(agg, bools[i]));
+        }
+      }
+    }
+  }
+
+ @Override
+ public Integer extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+ if (_nullHandlingEnabled) {
+ return aggregationResultHolder.getResult();
+ } else {
+ return aggregationResultHolder.getIntResult();
+ }
+ }
+
+ @Override
+ public Integer extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
+ if (_nullHandlingEnabled) {
+ return groupByResultHolder.getResult(groupKey);
+ } else {
+ return groupByResultHolder.getIntResult(groupKey);
+ }
+ }
+
+ @Override
+ public Integer merge(Integer intermediateResult1, Integer intermediateResult2) {
+ if (_nullHandlingEnabled) {
+ if (intermediateResult1 == null) {
+ return intermediateResult2;
+ } else if (intermediateResult2 == null) {
+ return intermediateResult1;
+ }
+ }
+
+ return _merger.merge(intermediateResult1, intermediateResult2);
+ }
+
+ @Override
+ public DataSchema.ColumnDataType getIntermediateResultColumnType() {
+ return DataSchema.ColumnDataType.BOOLEAN;
+ }
+
+ @Override
+ public DataSchema.ColumnDataType getFinalResultColumnType() {
+ return DataSchema.ColumnDataType.BOOLEAN;
+ }
+
+ @Override
+ public Integer extractFinalResult(Integer intermediateResult) {
+ return intermediateResult;
+ }
+
+ private int getInt(Integer val) {
+ return val == null ? _merger.getDefaultValue() : val;
+ }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/AggregationResultHolder.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/BooleanAndAggregationFunction.java
similarity index 55%
copy from pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/AggregationResultHolder.java
copy to pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/BooleanAndAggregationFunction.java
index 33e975c0ff..c385e1dcfb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/AggregationResultHolder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/BooleanAndAggregationFunction.java
@@ -16,35 +16,25 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.core.query.aggregation;
-/**
- * Interface for ResultHolder to hold the result of aggregation.
- *
- */
-public interface AggregationResultHolder {
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
- /**
- * Set the 'primitive double' aggregation result.
- * @param value
- */
- void setValue(double value);
+public class BooleanAndAggregationFunction extends BaseBooleanAggregationFunction {
- /**
- * Set the aggregation result value.
- * @param value
- */
- void setValue(Object value);
+ public BooleanAndAggregationFunction(ExpressionContext expression) {
+ this(expression, false);
+ }
- /**
- * Returns the 'primitive double' aggregation result.
- * @return
- */
- double getDoubleResult();
+ public BooleanAndAggregationFunction(ExpressionContext expression, boolean nullHandlingEnabled) {
+ super(expression, nullHandlingEnabled, BooleanMerge.AND);
+ }
- /**
- * Returns the result of aggregation.
- * @return
- */
- <T> T getResult();
+ @Override
+ public AggregationFunctionType getType() {
+ return AggregationFunctionType.BOOLAND;
+ }
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/AggregationResultHolder.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/BooleanOrAggregationFunction.java
similarity index 55%
copy from pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/AggregationResultHolder.java
copy to pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/BooleanOrAggregationFunction.java
index 33e975c0ff..6ee96fad49 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/AggregationResultHolder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/BooleanOrAggregationFunction.java
@@ -16,35 +16,25 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.core.query.aggregation;
-/**
- * Interface for ResultHolder to hold the result of aggregation.
- *
- */
-public interface AggregationResultHolder {
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
- /**
- * Set the 'primitive double' aggregation result.
- * @param value
- */
- void setValue(double value);
+public class BooleanOrAggregationFunction extends BaseBooleanAggregationFunction {
- /**
- * Set the aggregation result value.
- * @param value
- */
- void setValue(Object value);
+ public BooleanOrAggregationFunction(ExpressionContext expression) {
+ this(expression, false);
+ }
- /**
- * Returns the 'primitive double' aggregation result.
- * @return
- */
- double getDoubleResult();
+  /**
+   * Creates a BOOL_OR aggregation over the given expression.
+   *
+   * <p>Public, like the equivalent two-argument {@code BooleanAndAggregationFunction} constructor, so both
+   * functions can be constructed the same way from outside this package.
+   *
+   * @param expression expression whose values are aggregated
+   * @param nullHandlingEnabled forwarded to the base class together with the OR merger
+   */
+  public BooleanOrAggregationFunction(ExpressionContext expression, boolean nullHandlingEnabled) {
+    super(expression, nullHandlingEnabled, BooleanMerge.OR);
+  }
- /**
- * Returns the result of aggregation.
- * @return
- */
- <T> T getResult();
+ @Override
+ public AggregationFunctionType getType() {
+ return AggregationFunctionType.BOOLOR;
+ }
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DoubleGroupByResultHolder.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DoubleGroupByResultHolder.java
index 254c774046..60e1b8bb47 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DoubleGroupByResultHolder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DoubleGroupByResultHolder.java
@@ -80,6 +80,11 @@ public class DoubleGroupByResultHolder implements GroupByResultHolder {
}
}
+ @Override
+ public int getIntResult(int groupKey) {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public <T> T getResult(int groupKey) {
throw new UnsupportedOperationException();
@@ -92,6 +97,11 @@ public class DoubleGroupByResultHolder implements GroupByResultHolder {
}
}
+ @Override
+ public void setValueForKey(int groupKey, int value) {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public void setValueForKey(int groupKey, Object newValue) {
throw new UnsupportedOperationException();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/GroupByResultHolder.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/GroupByResultHolder.java
index 7271ce138e..64017ec97a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/GroupByResultHolder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/GroupByResultHolder.java
@@ -31,6 +31,14 @@ public interface GroupByResultHolder {
*/
void setValueForKey(int groupKey, double value);
+ /**
+ * Stores the given value (of type int) for the given groupKey.
+ *
+ * @param groupKey
+ * @param value
+ */
+ void setValueForKey(int groupKey, int value);
+
/**
* Store the given value (of type ResultType) for the given groupKey.
* @param groupKey
@@ -49,6 +57,17 @@ public interface GroupByResultHolder {
*/
double getDoubleResult(int groupKey);
+ /**
+ * Returns the result (int) for the given group by key.
+ * If the group key does not exist in the result holder, returns
+ * the defaultValue it was initialized with (default value of the aggregation
+ * function it is holding the result for).
+ *
+ * @param groupKey
+ * @return
+ */
+ int getIntResult(int groupKey);
+
/**
* Returns the result (ResultType) for the given group key.
* If the group key does not exist in the result holder, returns the
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DoubleGroupByResultHolder.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/IntGroupByResultHolder.java
similarity index 80%
copy from pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DoubleGroupByResultHolder.java
copy to pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/IntGroupByResultHolder.java
index 254c774046..3ae29f4580 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DoubleGroupByResultHolder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/IntGroupByResultHolder.java
@@ -16,21 +16,20 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.pinot.core.query.aggregation.groupby;
import com.google.common.base.Preconditions;
import java.util.Arrays;
-/**
- * Result Holder implemented using DoubleArray.
- */
-public class DoubleGroupByResultHolder implements GroupByResultHolder {
+public class IntGroupByResultHolder implements GroupByResultHolder {
+
private final int _maxCapacity;
- private final double _defaultValue;
+ private final int _defaultValue;
private int _resultHolderCapacity;
- private double[] _resultArray;
+ private int[] _resultArray;
/**
* Constructor for the class.
@@ -39,13 +38,13 @@ public class DoubleGroupByResultHolder implements GroupByResultHolder {
* @param maxCapacity Maximum capacity of the result holder
* @param defaultValue Default value of un-initialized results
*/
- public DoubleGroupByResultHolder(int initialCapacity, int maxCapacity, double defaultValue) {
+ public IntGroupByResultHolder(int initialCapacity, int maxCapacity, int defaultValue) {
_maxCapacity = maxCapacity;
_defaultValue = defaultValue;
_resultHolderCapacity = initialCapacity;
- _resultArray = new double[initialCapacity];
- if (defaultValue != 0.0) {
+ _resultArray = new int[initialCapacity];
+ if (defaultValue != 0) {
Arrays.fill(_resultArray, defaultValue);
}
}
@@ -61,11 +60,11 @@ public class DoubleGroupByResultHolder implements GroupByResultHolder {
// Cap the growth to maximum possible number of group keys
_resultHolderCapacity = Math.min(_resultHolderCapacity, _maxCapacity);
- double[] current = _resultArray;
- _resultArray = new double[_resultHolderCapacity];
+ int[] current = _resultArray;
+ _resultArray = new int[_resultHolderCapacity];
System.arraycopy(current, 0, _resultArray, 0, copyLength);
- if (_defaultValue != 0.0) {
+ if (_defaultValue != 0) {
Arrays.fill(_resultArray, copyLength, _resultHolderCapacity, _defaultValue);
}
}
@@ -73,6 +72,11 @@ public class DoubleGroupByResultHolder implements GroupByResultHolder {
@Override
public double getDoubleResult(int groupKey) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getIntResult(int groupKey) {
if (groupKey == GroupKeyGenerator.INVALID_ID) {
return _defaultValue;
} else {
@@ -87,6 +91,11 @@ public class DoubleGroupByResultHolder implements GroupByResultHolder {
@Override
public void setValueForKey(int groupKey, double newValue) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setValueForKey(int groupKey, int newValue) {
if (groupKey != GroupKeyGenerator.INVALID_ID) {
_resultArray[groupKey] = newValue;
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/ObjectGroupByResultHolder.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/ObjectGroupByResultHolder.java
index 1f807a9e7c..ebf5ec7b60 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/ObjectGroupByResultHolder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/ObjectGroupByResultHolder.java
@@ -65,6 +65,11 @@ public class ObjectGroupByResultHolder implements GroupByResultHolder {
throw new UnsupportedOperationException();
}
+ @Override
+ public int getIntResult(int groupKey) {
+ throw new UnsupportedOperationException();
+ }
+
@Override
@SuppressWarnings("unchecked")
public <T> T getResult(int groupKey) {
@@ -82,6 +87,13 @@ public class ObjectGroupByResultHolder implements GroupByResultHolder {
}
}
+ @Override
+ public void setValueForKey(int groupKey, int newValue) {
+ if (groupKey != GroupKeyGenerator.INVALID_ID) {
+ _resultArray[groupKey] = newValue;
+ }
+ }
+
@SuppressWarnings("unchecked")
@Override
public void setValueForKey(int groupKey, Object newValue) {
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
index d40f17fce2..144949a0d5 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
@@ -444,6 +444,20 @@ public class AggregationFunctionFactoryTest {
assertEquals(aggregationFunction.getType(), AggregationFunctionType.PERCENTILETDIGESTMV);
assertEquals(aggregationFunction.getColumnName(), "percentileTDigest95.0MV_column");
assertEquals(aggregationFunction.getResultColumnName(), "percentiletdigestmv(column, 95.0)");
+
+ function = getFunction("bool_and");
+ aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT);
+ assertTrue(aggregationFunction instanceof BooleanAndAggregationFunction);
+ assertEquals(aggregationFunction.getType(), AggregationFunctionType.BOOLAND);
+ assertEquals(aggregationFunction.getColumnName(), "boolAnd_column");
+ assertEquals(aggregationFunction.getResultColumnName(), "booland(column)");
+
+ function = getFunction("bool_or");
+ aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT);
+ assertTrue(aggregationFunction instanceof BooleanOrAggregationFunction);
+ assertEquals(aggregationFunction.getType(), AggregationFunctionType.BOOLOR);
+ assertEquals(aggregationFunction.getColumnName(), "boolOr_column");
+ assertEquals(aggregationFunction.getResultColumnName(), "boolor(column)");
}
private FunctionContext getFunction(String functionName) {
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/BooleanAggQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/BooleanAggQueriesTest.java
new file mode 100644
index 0000000000..3e87d7446b
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/BooleanAggQueriesTest.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.queries;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock;
+import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
+import org.apache.pinot.core.operator.query.AggregationOperator;
+import org.apache.pinot.core.operator.query.GroupByOperator;
+import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
+import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import org.apache.pinot.core.util.GapfillUtils;
+import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.sql.parsers.CalciteSqlParser;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+
+public class BooleanAggQueriesTest extends BaseQueriesTest {
+
+ private static final int NUM_RECORDS = 16;
+
+ private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "BooleanAggQueriesTest");
+ private static final String RAW_TABLE_NAME = "testTable";
+ private static final String SEGMENT_NAME = "testSegment";
+
+ private static final String BOOLEAN_COLUMN = "boolColumn";
+ private static final String GROUP_BY_COLUMN = "groupByColumn";
+
+ private static final Schema SCHEMA =
+ new Schema.SchemaBuilder()
+ .addSingleValueDimension(BOOLEAN_COLUMN, FieldSpec.DataType.BOOLEAN)
+ .addSingleValueDimension(GROUP_BY_COLUMN, FieldSpec.DataType.STRING).build();
+
+ private static final TableConfig TABLE_CONFIG =
+ new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+
+ private IndexSegment _indexSegment;
+ private List<IndexSegment> _indexSegments;
+
+ @DataProvider
+ public static Object[][] nullHandling() { // one boolean per row; consumed by the tests declaring dataProvider = "nullHandling"
+ return new Object[][] {
+ new Object[]{true},
+ new Object[]{false},
+ };
+ }
+
+ @Override
+ protected String getFilter() { // always returns the empty string
+ return "";
+ }
+
+ @Override
+ protected IndexSegment getIndexSegment() { // the single segment assigned in setUp()
+ return _indexSegment;
+ }
+
+ @Override
+ protected List<IndexSegment> getIndexSegments() { // the list assigned in setUp(): the same segment listed twice
+ return _indexSegments;
+ }
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ FileUtils.deleteDirectory(INDEX_DIR); // remove any index directory left over from a previous run
+
+ Object[][] recordContents = new Object[][] { // each row is {boolColumn value, groupByColumn value}
+ new Object[]{true, "allTrue"},
+ new Object[]{true, "allTrue"},
+ new Object[]{true, "allTrue"},
+ new Object[]{false, "allFalse"},
+ new Object[]{false, "allFalse"},
+ new Object[]{false, "allFalse"},
+ new Object[]{true, "mixedOne"},
+ new Object[]{true, "mixedOne"},
+ new Object[]{false, "mixedOne"},
+ new Object[]{false, "mixedTwo"},
+ new Object[]{true, "mixedTwo"},
+ new Object[]{false, "mixedTwo"},
+ new Object[]{null, "withNulls"}, // null boolean alongside one true and one false in the same group
+ new Object[]{true, "withNulls"},
+ new Object[]{false, "withNulls"},
+ new Object[]{null, "onlyNulls"}, // group whose only boolean value is null
+ };
+
+ List<GenericRow> records = new ArrayList<>(NUM_RECORDS); // NUM_RECORDS (16) equals the number of rows above
+ for (Object[] record : recordContents) {
+ GenericRow genericRow = new GenericRow();
+ genericRow.putValue(BOOLEAN_COLUMN, record[0]);
+ genericRow.putValue(GROUP_BY_COLUMN, record[1]);
+ records.add(genericRow);
+ }
+
+ ImmutableSegment immutableSegment = setUpSingleSegment(records, SEGMENT_NAME);
+ _indexSegment = immutableSegment;
+ _indexSegments = Arrays.asList(immutableSegment, immutableSegment); // same segment instance listed twice
+ }
+
+ private ImmutableSegment setUpSingleSegment(List<GenericRow> recordSet, String segmentName)
+ throws Exception {
+ SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
+ segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
+ segmentGeneratorConfig.setSegmentName(segmentName);
+ segmentGeneratorConfig.setOutDir(INDEX_DIR.getPath());
+ segmentGeneratorConfig.setNullHandlingEnabled(true); // always on at segment build time; the per-query flag is set separately in getOperator()
+
+ SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+ driver.init(segmentGeneratorConfig, new GenericRowRecordReader(recordSet));
+ driver.build();
+
+ return ImmutableSegmentLoader.load(new File(INDEX_DIR, segmentName), ReadMode.mmap); // loads from INDEX_DIR/segmentName, the out dir and segment name configured above
+ }
+
+ protected <T extends Operator> T getOperator(String query, boolean enableNullHandling) { // compiles the SQL and runs the segment plan with the given null-handling option
+ PinotQuery pinotQuery = CalciteSqlParser.compileToPinotQuery(query);
+
+ pinotQuery.setQueryOptions( // the options map holds exactly one entry: the null-handling flag
+ ImmutableMap.of(
+ CommonConstants.Broker.Request.QueryOptionKey.ENABLE_NULL_HANDLING,
+ String.valueOf(enableNullHandling)));
+
+ PinotQuery serverPinotQuery = GapfillUtils.stripGapfill(pinotQuery);
+ QueryContext queryContext = QueryContextConverterUtils.getQueryContext(serverPinotQuery);
+ return (T) PLAN_MAKER.makeSegmentPlanNode(getIndexSegment(), queryContext).run(); // unchecked cast: the caller chooses T; the plan is built on getIndexSegment() only
+ }
+
+ @Test(dataProvider = "nullHandling")
+ public void testBooleanAnd(boolean isNullHandlingEnabled) {
+ // Given:
+ String query = "SELECT BOOL_AND(boolColumn) FROM testTable GROUP BY groupByColumn";
+ GroupByOperator operator = getOperator(query, isNullHandlingEnabled);
+
+ // When:
+ GroupByResultsBlock result = operator.nextBlock();
+
+ // Then:
+ QueriesTestUtils.testInnerSegmentExecutionStatistics(
+ // NOTE(review): getOperator() plans against a single segment, so NUM_RECORDS * 2 presumably counts the two columns read per row - confirm
+ operator.getExecutionStatistics(), NUM_RECORDS, 0, NUM_RECORDS * 2, NUM_RECORDS);
+ AggregationGroupByResult aggregates = result.getAggregationGroupByResult();
+ Iterator<GroupKeyGenerator.GroupKey> keys = aggregates.getGroupKeyIterator();
+ while (keys.hasNext()) {
+ GroupKeyGenerator.GroupKey key = keys.next();
+ switch ((String) key._keys[0]) { // the only group-by key is the groupByColumn value
+ case "allFalse":
+ case "mixedOne":
+ case "mixedTwo":
+ case "withNulls": // every group listed here contains at least one false row, so 0 is expected
+ Assert.assertEquals(aggregates.getResultForGroupId(0, key._groupId), 0);
+ break;
+ case "allTrue":
+ Assert.assertEquals(aggregates.getResultForGroupId(0, key._groupId), 1);
+ break;
+ case "onlyNulls": // expected result is null when null handling is enabled, 0 otherwise
+ Assert.assertEquals(aggregates.getResultForGroupId(0, key._groupId), isNullHandlingEnabled ? null : 0);
+ break;
+ default:
+ throw new IllegalStateException("Unexpected grouping: " + key._keys[0]);
+ }
+ }
+ }
+
+ @Test(dataProvider = "nullHandling")
+ public void testBooleanOr(boolean isNullHandlingEnabled) {
+ // Given:
+ String query = "SELECT BOOL_OR(boolColumn) FROM testTable GROUP BY groupByColumn";
+ GroupByOperator operator = getOperator(query, isNullHandlingEnabled);
+
+ // When:
+ GroupByResultsBlock result = operator.nextBlock();
+
+ // Then:
+ QueriesTestUtils.testInnerSegmentExecutionStatistics(
+ // NOTE(review): getOperator() plans against a single segment, so NUM_RECORDS * 2 presumably counts the two columns read per row - confirm
+ operator.getExecutionStatistics(), NUM_RECORDS, 0, NUM_RECORDS * 2, NUM_RECORDS);
+ AggregationGroupByResult aggregates = result.getAggregationGroupByResult();
+ Iterator<GroupKeyGenerator.GroupKey> keys = aggregates.getGroupKeyIterator();
+ while (keys.hasNext()) {
+ GroupKeyGenerator.GroupKey key = keys.next();
+ switch ((String) key._keys[0]) { // the only group-by key is the groupByColumn value
+ case "mixedOne":
+ case "mixedTwo":
+ case "allTrue":
+ case "withNulls": // every group listed here contains at least one true row, so 1 is expected
+ Assert.assertEquals(aggregates.getResultForGroupId(0, key._groupId), 1);
+ break;
+ case "allFalse":
+ Assert.assertEquals(aggregates.getResultForGroupId(0, key._groupId), 0);
+ break;
+ case "onlyNulls": // expected result is null when null handling is enabled, 0 otherwise
+ Assert.assertEquals(aggregates.getResultForGroupId(0, key._groupId), isNullHandlingEnabled ? null : 0);
+ break;
+ default:
+ throw new IllegalStateException("Unexpected grouping: " + key._keys[0]);
+ }
+ }
+ }
+
+ @Test(dataProvider = "nullHandling")
+ public void testBooleanAndNoGroupBy(boolean isNullHandlingEnabled) {
+ // Given:
+ String query = "SELECT BOOL_AND(boolColumn) FROM testTable WHERE groupByColumn = 'allTrue'";
+ AggregationOperator operator = getOperator(query, isNullHandlingEnabled);
+
+ // When:
+ AggregationResultsBlock result = operator.nextBlock();
+
+ // Then:
+ List<Object> aggregates = result.getResults();
+ Assert.assertEquals(aggregates.size(), 1); // a single aggregation in the SELECT list
+ Assert.assertEquals(aggregates.get(0), 1); // the same value is expected for both null-handling settings
+ }
+}
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java
index 5545798b48..6b826ab04b 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java
@@ -70,7 +70,7 @@ public class PinotAggregateExchangeNodeInsertRule extends RelOptRule {
public static final PinotAggregateExchangeNodeInsertRule INSTANCE =
new PinotAggregateExchangeNodeInsertRule(PinotRuleUtils.PINOT_REL_FACTORY);
private static final Set<SqlKind> SUPPORTED_AGG_KIND = ImmutableSet.of(
- SqlKind.SUM, SqlKind.SUM0, SqlKind.MIN, SqlKind.MAX, SqlKind.COUNT);
+ SqlKind.SUM, SqlKind.SUM0, SqlKind.MIN, SqlKind.MAX, SqlKind.COUNT, SqlKind.OTHER_FUNCTION);
public PinotAggregateExchangeNodeInsertRule(RelBuilderFactory factory) {
super(operand(LogicalAggregate.class, any()), factory, null);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/AggregationResultHolder.java b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java
similarity index 56%
copy from pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/AggregationResultHolder.java
copy to pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java
index 33e975c0ff..3610ce0c4d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/AggregationResultHolder.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java
@@ -16,35 +16,22 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.core.query.aggregation;
-/**
- * Interface for ResultHolder to hold the result of aggregation.
- *
- */
-public interface AggregationResultHolder {
+package org.apache.calcite.sql.fun;
- /**
- * Set the 'primitive double' aggregation result.
- * @param value
- */
- void setValue(double value);
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlFunctionCategory;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.type.OperandTypes;
+import org.apache.calcite.sql.type.ReturnTypes;
+import org.apache.calcite.util.Optionality;
- /**
- * Set the aggregation result value.
- * @param value
- */
- void setValue(Object value);
- /**
- * Returns the 'primitive double' aggregation result.
- * @return
- */
- double getDoubleResult();
+public class PinotBoolAndAggregateFunction extends SqlAggFunction {
- /**
- * Returns the result of aggregation.
- * @return
- */
- <T> T getResult();
+ public PinotBoolAndAggregateFunction() { // declares BOOL_AND: kind OTHER_FUNCTION, BOOLEAN operand, BOOLEAN return type
+ super("BOOL_AND", null, SqlKind.OTHER_FUNCTION, ReturnTypes.BOOLEAN,
+ null, OperandTypes.BOOLEAN, SqlFunctionCategory.USER_DEFINED_FUNCTION,
+ false, false, Optionality.FORBIDDEN); // NOTE(review): presumably requiresOrder, requiresOver, requiresGroupOrder - confirm against SqlAggFunction
+ }
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/AggregationResultHolder.java b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolOrAggregateFunction.java
similarity index 56%
copy from pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/AggregationResultHolder.java
copy to pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolOrAggregateFunction.java
index 33e975c0ff..3d336c1070 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/AggregationResultHolder.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolOrAggregateFunction.java
@@ -16,35 +16,22 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.core.query.aggregation;
-/**
- * Interface for ResultHolder to hold the result of aggregation.
- *
- */
-public interface AggregationResultHolder {
+package org.apache.calcite.sql.fun;
- /**
- * Set the 'primitive double' aggregation result.
- * @param value
- */
- void setValue(double value);
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlFunctionCategory;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.type.OperandTypes;
+import org.apache.calcite.sql.type.ReturnTypes;
+import org.apache.calcite.util.Optionality;
- /**
- * Set the aggregation result value.
- * @param value
- */
- void setValue(Object value);
- /**
- * Returns the 'primitive double' aggregation result.
- * @return
- */
- double getDoubleResult();
+public class PinotBoolOrAggregateFunction extends SqlAggFunction {
- /**
- * Returns the result of aggregation.
- * @return
- */
- <T> T getResult();
+ public PinotBoolOrAggregateFunction() { // declares BOOL_OR: kind OTHER_FUNCTION, BOOLEAN operand, BOOLEAN return type
+ super("BOOL_OR", null, SqlKind.OTHER_FUNCTION, ReturnTypes.BOOLEAN,
+ null, OperandTypes.BOOLEAN, SqlFunctionCategory.USER_DEFINED_FUNCTION,
+ false, false, Optionality.FORBIDDEN); // NOTE(review): presumably requiresOrder, requiresOver, requiresGroupOrder - confirm against SqlAggFunction
+ }
}
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotOperatorTable.java b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotOperatorTable.java
index ff37a2fbff..3d2ba325bf 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotOperatorTable.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotOperatorTable.java
@@ -21,6 +21,7 @@ package org.apache.calcite.sql.fun;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
+import org.apache.calcite.sql.SqlAggFunction;
import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.validate.SqlNameMatchers;
@@ -44,6 +45,8 @@ public class PinotOperatorTable extends SqlStdOperatorTable {
private static @MonotonicNonNull PinotOperatorTable _instance;
public static final SqlFunction COALESCE = new PinotSqlCoalesceFunction();
+ public static final SqlAggFunction BOOL_AND = new PinotBoolAndAggregateFunction();
+ public static final SqlAggFunction BOOL_OR = new PinotBoolOrAggregateFunction();
// TODO: clean up lazy init by using Suppliers.memorized(this::computeInstance) and make getter wrapped around
// supplier instance. this should replace all lazy init static objects in the codebase
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
index 7389a70106..87bea8df1c 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
@@ -208,6 +208,14 @@ public class AggregateOperator extends BaseOperator<TransferableBlock> {
return ((Number) left).doubleValue() + 1;
}
+ private static Boolean mergeBoolAnd(Object left, Object right) { // logical AND of the two operands, each cast to Boolean
+ return ((Boolean) left) && ((Boolean) right); // short-circuits: right is only cast and unboxed when left is true
+ }
+
+ private static Boolean mergeBoolOr(Object left, Object right) { // logical OR of the two operands, each cast to Boolean
+ return ((Boolean) left) || ((Boolean) right); // short-circuits: right is only cast and unboxed when left is false
+ }
+
private static Key extraRowKey(Object[] row, List<RexExpression> groupSet) {
Object[] keyElements = new Object[groupSet.size()];
for (int i = 0; i < groupSet.size(); i++) {
@@ -233,6 +241,12 @@ public class AggregateOperator extends BaseOperator<TransferableBlock> {
.put("$MAX", AggregateOperator::mergeMax)
.put("$MAX0", AggregateOperator::mergeMax)
.put("COUNT", AggregateOperator::mergeCount)
+ .put("BOOL_AND", AggregateOperator::mergeBoolAnd)
+ .put("$BOOL_AND", AggregateOperator::mergeBoolAnd)
+ .put("$BOOL_AND0", AggregateOperator::mergeBoolAnd)
+ .put("BOOL_OR", AggregateOperator::mergeBoolOr)
+ .put("$BOOL_OR", AggregateOperator::mergeBoolOr)
+ .put("$BOOL_OR0", AggregateOperator::mergeBoolOr)
.build();
final int _inputRef;
diff --git a/pinot-query-runtime/src/test/resources/queries/Aggregates.json b/pinot-query-runtime/src/test/resources/queries/Aggregates.json
index 25986b8501..b82be2cb5e 100644
--- a/pinot-query-runtime/src/test/resources/queries/Aggregates.json
+++ b/pinot-query-runtime/src/test/resources/queries/Aggregates.json
@@ -3,66 +3,19 @@
"tables": {
"tbl": {
"schema": [
- {
- "name": "int_col",
- "type": "INT"
- },
- {
- "name": "double_col",
- "type": "DOUBLE"
- },
- {
- "name": "string_col",
- "type": "STRING"
- },
- {
- "name": "bool_col",
- "type": "BOOLEAN"
- }
+ { "name": "int_col", "type": "INT" },
+ { "name": "double_col", "type": "DOUBLE" },
+ { "name": "string_col", "type": "STRING" },
+ { "name": "bool_col", "type": "BOOLEAN" }
],
"inputs": [
- [
- 2,
- 300,
- "a",
- true
- ],
- [
- 2,
- 400,
- "a",
- false
- ],
- [
- 3,
- 100,
- "b",
- true
- ],
- [
- 100,
- 1,
- "b",
- false
- ],
- [
- 101,
- 1.01,
- "c",
- false
- ],
- [
- 150,
- 1.5,
- "c",
- false
- ],
- [
- 175,
- 1.75,
- "c",
- true
- ]
+ [ 2, 300, "a", true ],
+ [ 2, 400, "a", true ],
+ [ 3, 100, "b", false ],
+ [ 100, 1, "b", false ],
+ [ 101, 1.01, "c", false ],
+ [ 150, 1.5, "c", false ],
+ [ 175, 1.75, "c", true ]
]
}
},
@@ -164,6 +117,29 @@
"comments": "class java.lang.Double cannot be cast to class java.lang.Boolean",
"description": "aggregate boolean column",
"sql": "SELECT min(bool_col) FROM {tbl}"
+ },
+ {
+ "psql": "9.21.0",
+ "description": "aggregate boolean column",
+ "sql": "SELECT bool_and(bool_col), bool_or(bool_col) FROM {tbl} GROUP BY string_col"
+ },
+ {
+ "psql": "9.21.0",
+ "description": "aggregate boolean column no group by",
+ "sql": "SELECT bool_and(bool_col), bool_or(bool_col) FROM {tbl}"
+ },
+ {
+ "ignored": true,
+ "comment": "issue with converting data types: Unexpected RelDataTypeField: ANY for column: EXPR$0",
+ "psql": "9.21.0",
+ "description": "aggregate boolean column no group by with inner function",
+ "sql": "SELECT bool_and(startsWith(string_col, 'a')), bool_or(startsWith(string_col, 'a')) FROM {tbl}"
+ },
+ {
+ "ignored": true,
+ "comment": "issue with converting data types: Unexpected RelDataTypeField: ANY for column: EXPR$0",
+ "description": "sum with inner function",
+ "sql": "SELECT sum(pow(int_col, 2)) FROM {tbl}"
}
]
},
@@ -171,78 +147,21 @@
"tables": {
"tbl": {
"schema": [
- {
- "name": "int_col",
- "type": "INT"
- },
- {
- "name": "double_col",
- "type": "DOUBLE"
- },
- {
- "name": "string_col",
- "type": "STRING"
- },
- {
- "name": "bool_col",
- "type": "BOOLEAN"
- }
+ { "name": "int_col", "type": "INT" },
+ { "name": "double_col", "type": "DOUBLE" },
+ { "name": "string_col", "type": "STRING" },
+ { "name": "bool_col", "type": "BOOLEAN" }
],
"inputs": [
- [
- 2,
- 300,
- "a",
- false
- ],
- [
- 2,
- 400,
- "a",
- true
- ],
- [
- 3,
- 100,
- "b",
- true
- ],
- [
- 0.001,
- 1,
- "b",
- false
- ],
- [
- 101,
- 1.01,
- "c",
- false
- ],
- [
- 150,
- 1.5,
- "c",
- true
- ],
- [
- 175,
- 1.75,
- "c",
- true
- ],
- [
- -10000,
- 1.75,
- "c",
- false
- ],
- [
- -2,
- 0.5,
- "c",
- false
- ]
+ [ 2, 300, "a", false ],
+ [ 2, 400, "a", true ],
+ [ 3, 100, "b", true ],
+ [ 0.001, 1, "b", false ],
+ [ 101, 1.01, "c", false ],
+ [ 150, 1.5, "c", true ],
+ [ 175, 1.75, "c", true ],
+ [ -10000, 1.75, "c", false ],
+ [ -2, 0.5, "c", false ]
]
}
},
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
index 48a2199918..19d2f4d6e5 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
@@ -79,7 +79,11 @@ public enum AggregationFunctionType {
PERCENTILERAWESTMV("percentileRawEstMV"),
PERCENTILETDIGESTMV("percentileTDigestMV"),
PERCENTILERAWTDIGESTMV("percentileRawTDigestMV"),
- DISTINCT("distinct");
+ DISTINCT("distinct"),
+
+ // boolean aggregate functions
+ BOOLAND("boolAnd"),
+ BOOLOR("boolOr");
private static final Set<String> NAMES = Arrays.stream(values()).flatMap(func -> Stream.of(func.name(),
func.getName(), func.getName().toLowerCase())).collect(Collectors.toSet());
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org