You are viewing a plain-text version of this content. The canonical link to the original message was not preserved in this copy.
Posted to commits@pinot.apache.org by ro...@apache.org on 2022/12/01 01:24:18 UTC

[pinot] branch master updated: support BOOL_AND and BOOL_OR aggregate functions (#9848)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a6d84391d support BOOL_AND and BOOL_OR aggregate functions (#9848)
0a6d84391d is described below

commit 0a6d84391dcbc9924b2b593e53121f981f1561db
Author: Almog Gavra <al...@gmail.com>
AuthorDate: Wed Nov 30 17:24:11 2022 -0800

    support BOOL_AND and BOOL_OR aggregate functions (#9848)
    
    * support BOOL_AND and BOOL_OR aggregate functions
    * inline null handling and add LongResultHolders
    * fix null handling and add null tests
    * change Long to Int and a few other minor changes
---
 .../query/aggregation/AggregationResultHolder.java |  13 ++
 .../aggregation/DoubleAggregationResultHolder.java |  15 ++
 ...ltHolder.java => IntAggregateResultHolder.java} |  44 ++--
 .../aggregation/ObjectAggregationResultHolder.java |  16 ++
 .../function/AggregationFunctionFactory.java       |   4 +
 .../function/BaseBooleanAggregationFunction.java   | 252 ++++++++++++++++++++
 .../BooleanAndAggregationFunction.java}            |  42 ++--
 .../BooleanOrAggregationFunction.java}             |  42 ++--
 .../groupby/DoubleGroupByResultHolder.java         |  10 +
 .../aggregation/groupby/GroupByResultHolder.java   |  19 ++
 ...sultHolder.java => IntGroupByResultHolder.java} |  33 ++-
 .../groupby/ObjectGroupByResultHolder.java         |  12 +
 .../function/AggregationFunctionFactoryTest.java   |  14 ++
 .../pinot/queries/BooleanAggQueriesTest.java       | 257 +++++++++++++++++++++
 .../PinotAggregateExchangeNodeInsertRule.java      |   2 +-
 .../sql/fun/PinotBoolAndAggregateFunction.java     |  39 ++--
 .../sql/fun/PinotBoolOrAggregateFunction.java      |  39 ++--
 .../apache/calcite/sql/fun/PinotOperatorTable.java |   3 +
 .../query/runtime/operator/AggregateOperator.java  |  14 ++
 .../src/test/resources/queries/Aggregates.json     | 175 ++++----------
 .../pinot/segment/spi/AggregationFunctionType.java |   6 +-
 21 files changed, 776 insertions(+), 275 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/AggregationResultHolder.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/AggregationResultHolder.java
index 33e975c0ff..29a3b7e93d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/AggregationResultHolder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/AggregationResultHolder.java
@@ -30,6 +30,12 @@ public interface AggregationResultHolder {
    */
   void setValue(double value);
 
+  /**
+   * Set the 'primitive int' aggregation result.
+   * @param value
+   */
+  void setValue(int value);
+
   /**
    * Set the aggregation result value.
    * @param value
@@ -42,6 +48,13 @@ public interface AggregationResultHolder {
    */
   double getDoubleResult();
 
+  /**
+   * Returns the 'primitive int' aggregation result.
+   *
+   * @return
+   */
+  int getIntResult();
+
   /**
    * Returns the result of aggregation.
    * @return
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/DoubleAggregationResultHolder.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/DoubleAggregationResultHolder.java
index 9cff121797..7962b246d7 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/DoubleAggregationResultHolder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/DoubleAggregationResultHolder.java
@@ -42,6 +42,11 @@ public class DoubleAggregationResultHolder implements AggregationResultHolder {
     _value = value;
   }
 
+  @Override
+  public void setValue(int value) {
+    throw new RuntimeException("Method 'setValue' (with int value) not supported for class " + getClass().getName());
+  }
+
   /**
    * {@inheritDoc}
    * Value for this class is 'primitive double', so this method is not implemented.
@@ -61,6 +66,16 @@ public class DoubleAggregationResultHolder implements AggregationResultHolder {
     return _value;
   }
 
+  /**
+   * {@inheritDoc}
+   *
+   * @return
+   */
+  @Override
+  public int getIntResult() {
+    throw new RuntimeException("Method 'getIntResult' not supported for class " + getClass().getName());
+  }
+
   /**
    * {@inheritDoc}
    * Result for this class is 'primitive double', so this method is not implemented.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/DoubleAggregationResultHolder.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/IntAggregateResultHolder.java
similarity index 66%
copy from pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/DoubleAggregationResultHolder.java
copy to pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/IntAggregateResultHolder.java
index 9cff121797..78d29a204b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/DoubleAggregationResultHolder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/IntAggregateResultHolder.java
@@ -16,56 +16,42 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.pinot.core.query.aggregation;
 
-/**
- * AggregationResultHolder interface implementation for result type 'primitive double'.
- *
- */
-public class DoubleAggregationResultHolder implements AggregationResultHolder {
-  double _value;
+public class IntAggregateResultHolder implements AggregationResultHolder {
+
+  int _value;
 
-  /**
-   * Constructor for the class.
-   * @param defaultValue
-   */
-  public DoubleAggregationResultHolder(double defaultValue) {
+  public IntAggregateResultHolder(int defaultValue) {
     _value = defaultValue;
   }
 
-  /**
-   * {@inheritDoc}
-   * @param value
-   */
   @Override
   public void setValue(double value) {
+    throw new RuntimeException("Method 'setValue' (with double value) not supported for class " + getClass().getName());
+  }
+
+  @Override
+  public void setValue(int value) {
     _value = value;
   }
 
-  /**
-   * {@inheritDoc}
-   * Value for this class is 'primitive double', so this method is not implemented.
-   * @param value
-   */
   @Override
   public void setValue(Object value) {
     throw new RuntimeException("Method 'setValue' (with object value) not supported for class " + getClass().getName());
   }
 
-  /**
-   * {@inheritDoc}
-   * @return
-   */
   @Override
   public double getDoubleResult() {
+    throw new RuntimeException("Method 'getDoubleResult' not supported for class " + getClass().getName());
+  }
+
+  @Override
+  public int getIntResult() {
     return _value;
   }
 
-  /**
-   * {@inheritDoc}
-   * Result for this class is 'primitive double', so this method is not implemented.
-   * @return
-   */
   @Override
   public <T> T getResult() {
     throw new RuntimeException("Method 'getResult' not supported for class " + getClass().getName());
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/ObjectAggregationResultHolder.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/ObjectAggregationResultHolder.java
index fdca760d13..817ea086f5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/ObjectAggregationResultHolder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/ObjectAggregationResultHolder.java
@@ -42,6 +42,12 @@ public class ObjectAggregationResultHolder implements AggregationResultHolder {
     _value = value;
   }
 
+
+  @Override
+  public void setValue(int value) {
+    _value = value;
+  }
+
   /**
    * {@inheritDoc}
    * @return
@@ -51,6 +57,16 @@ public class ObjectAggregationResultHolder implements AggregationResultHolder {
     throw new RuntimeException("Method 'getDoubleResult' not supported for class " + getClass().getName());
   }
 
+  /**
+   * {@inheritDoc}
+   *
+   * @return
+   */
+  @Override
+  public int getIntResult() {
+    throw new RuntimeException("Method 'getIntResult' not supported for class " + getClass().getName());
+  }
+
   /**
    * {@inheritDoc}
    * @return
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index 3eb3a4f5ad..4e1bda2025 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -273,6 +273,10 @@ public class AggregationFunctionFactory {
             return new CovarianceAggregationFunction(arguments, false);
           case COVARSAMP:
             return new CovarianceAggregationFunction(arguments, true);
+          case BOOLAND:
+            return new BooleanAndAggregationFunction(firstArgument, queryContext.isNullHandlingEnabled());
+          case BOOLOR:
+            return new BooleanOrAggregationFunction(firstArgument, queryContext.isNullHandlingEnabled());
           default:
             throw new IllegalArgumentException();
         }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/BaseBooleanAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/BaseBooleanAggregationFunction.java
new file mode 100644
index 0000000000..c295d4838f
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/BaseBooleanAggregationFunction.java
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.IntAggregateResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.IntGroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.RoaringBitmap;
+
+
+// TODO: change this to implement BaseSingleInputAggregationFunction<Boolean, Boolean> when we get proper
+// handling of booleans in serialization - today this would fail because ColumnDataType#convert assumes
+// that the boolean is encoded as its stored type (an integer)
+public abstract class BaseBooleanAggregationFunction extends BaseSingleInputAggregationFunction<Integer, Integer> {
+
+  private final BooleanMerge _merger;
+  private final boolean _nullHandlingEnabled;
+
+  protected enum BooleanMerge {
+    AND {
+      @Override
+      int merge(int left, int right) {
+        return left & right;
+      }
+
+      @Override
+      boolean isTerminal(int agg) {
+        return agg == 0;
+      }
+
+      @Override
+      int getDefaultValue() {
+        return 1;
+      }
+    },
+    OR {
+      @Override
+      int merge(int left, int right) {
+        return left | right;
+      }
+
+      @Override
+      boolean isTerminal(int agg) {
+        return agg > 0;
+      }
+
+      @Override
+      int getDefaultValue() {
+        return 0;
+      }
+    };
+
+    abstract int merge(int left, int right);
+
+    abstract boolean isTerminal(int agg);
+
+    abstract int getDefaultValue();
+  }
+
+  protected BaseBooleanAggregationFunction(ExpressionContext expression, boolean nullHandlingEnabled,
+      BooleanMerge merger) {
+    super(expression);
+    _nullHandlingEnabled = nullHandlingEnabled;
+    _merger = merger;
+  }
+
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return _nullHandlingEnabled
+        ? new ObjectAggregationResultHolder()
+        : new IntAggregateResultHolder(_merger.getDefaultValue());
+  }
+
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    return _nullHandlingEnabled
+        ? new ObjectGroupByResultHolder(initialCapacity, maxCapacity)
+        : new IntGroupByResultHolder(initialCapacity, maxCapacity, _merger.getDefaultValue());
+  }
+
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    if (blockValSet.getValueType() != FieldSpec.DataType.BOOLEAN) {
+      throw new IllegalArgumentException(
+          String.format("Unsupported data type %s for %s", getType().getName(), blockValSet.getValueType()));
+    }
+
+    int[] bools = blockValSet.getIntValuesSV();
+    if (_nullHandlingEnabled) {
+      int agg = getInt(aggregationResultHolder.getResult());
+
+      // early terminate on a per-block level to allow the
+      // loop below to be more tightly optimized (avoid a branch)
+      if (_merger.isTerminal(agg)) {
+        return;
+      }
+
+      RoaringBitmap nullBitmap = blockValSet.getNullBitmap();
+      if (nullBitmap == null) {
+        nullBitmap = new RoaringBitmap();
+      } else if (nullBitmap.getCardinality() > length) {
+        return;
+      }
+
+      for (int i = 0; i < length; i++) {
+        if (!nullBitmap.contains(i)) {
+          agg = _merger.merge(agg, bools[i]);
+          aggregationResultHolder.setValue((Object) agg);
+        }
+      }
+    } else {
+      int agg = aggregationResultHolder.getIntResult();
+
+      // early terminate on a per-block level to allow the
+      // loop below to be more tightly optimized (avoid a branch)
+      if (_merger.isTerminal(agg)) {
+        return;
+      }
+
+      for (int i = 0; i < length; i++) {
+        agg = _merger.merge(agg, bools[i]);
+        aggregationResultHolder.setValue(agg);
+      }
+    }
+  }
+
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    if (blockValSet.getValueType() != FieldSpec.DataType.BOOLEAN) {
+      throw new IllegalArgumentException(
+          String.format("Unsupported data type %s for %s", getType().getName(), blockValSet.getValueType()));
+    }
+
+    int[] bools = blockValSet.getIntValuesSV();
+    if (_nullHandlingEnabled) {
+      RoaringBitmap nullBitmap = blockValSet.getNullBitmap();
+      if (nullBitmap == null) {
+        nullBitmap = new RoaringBitmap();
+      } else if (nullBitmap.getCardinality() > length) {
+        return;
+      }
+
+      for (int i = 0; i < length; i++) {
+        if (!nullBitmap.contains(i)) {
+          int groupByKey = groupKeyArray[i];
+          int agg = getInt(groupByResultHolder.getResult(groupByKey));
+          groupByResultHolder.setValueForKey(groupByKey, (Object) _merger.merge(agg, bools[i]));
+        }
+      }
+    } else {
+      for (int i = 0; i < length; i++) {
+        int groupByKey = groupKeyArray[i];
+        int agg = groupByResultHolder.getIntResult(groupByKey);
+        groupByResultHolder.setValueForKey(groupByKey, _merger.merge(agg, bools[i]));
+      }
+    }
+  }
+
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    int[] valueArray = blockValSetMap.get(_expression).getIntValuesSV();
+
+    for (int i = 0; i < length; i++) {
+      for (int groupKey : groupKeysArray[i]) {
+        int agg = groupByResultHolder.getIntResult(groupKey);
+        groupByResultHolder.setValueForKey(groupKey, _merger.merge(agg, valueArray[i]));
+      }
+    }
+  }
+
+  @Override
+  public Integer extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+    if (_nullHandlingEnabled) {
+      return aggregationResultHolder.getResult();
+    } else {
+      return aggregationResultHolder.getIntResult();
+    }
+  }
+
+  @Override
+  public Integer extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
+    if (_nullHandlingEnabled) {
+      return groupByResultHolder.getResult(groupKey);
+    } else {
+      return groupByResultHolder.getIntResult(groupKey);
+    }
+  }
+
+  @Override
+  public Integer merge(Integer intermediateResult1, Integer intermediateResult2) {
+    if (_nullHandlingEnabled) {
+      if (intermediateResult1 == null) {
+        return intermediateResult2;
+      } else if (intermediateResult2 == null) {
+        return intermediateResult1;
+      }
+    }
+
+    return _merger.merge(intermediateResult1, intermediateResult2);
+  }
+
+  @Override
+  public DataSchema.ColumnDataType getIntermediateResultColumnType() {
+    return DataSchema.ColumnDataType.BOOLEAN;
+  }
+
+  @Override
+  public DataSchema.ColumnDataType getFinalResultColumnType() {
+    return DataSchema.ColumnDataType.BOOLEAN;
+  }
+
+  @Override
+  public Integer extractFinalResult(Integer intermediateResult) {
+    return intermediateResult;
+  }
+
+  private int getInt(Integer val) {
+    return val == null ? _merger.getDefaultValue() : val;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/AggregationResultHolder.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/BooleanAndAggregationFunction.java
similarity index 55%
copy from pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/AggregationResultHolder.java
copy to pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/BooleanAndAggregationFunction.java
index 33e975c0ff..c385e1dcfb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/AggregationResultHolder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/BooleanAndAggregationFunction.java
@@ -16,35 +16,25 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.query.aggregation;
 
-/**
- * Interface for ResultHolder to hold the result of aggregation.
- *
- */
-public interface AggregationResultHolder {
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
 
-  /**
-   * Set the 'primitive double' aggregation result.
-   * @param value
-   */
-  void setValue(double value);
+public class BooleanAndAggregationFunction extends BaseBooleanAggregationFunction {
 
-  /**
-   * Set the aggregation result value.
-   * @param value
-   */
-  void setValue(Object value);
+  public BooleanAndAggregationFunction(ExpressionContext expression) {
+    this(expression, false);
+  }
 
-  /**
-   * Returns the 'primitive double' aggregation result.
-   * @return
-   */
-  double getDoubleResult();
+  public BooleanAndAggregationFunction(ExpressionContext expression, boolean nullHandlingEnabled) {
+    super(expression, nullHandlingEnabled, BooleanMerge.AND);
+  }
 
-  /**
-   * Returns the result of aggregation.
-   * @return
-   */
-  <T> T getResult();
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.BOOLAND;
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/AggregationResultHolder.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/BooleanOrAggregationFunction.java
similarity index 55%
copy from pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/AggregationResultHolder.java
copy to pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/BooleanOrAggregationFunction.java
index 33e975c0ff..6ee96fad49 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/AggregationResultHolder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/BooleanOrAggregationFunction.java
@@ -16,35 +16,25 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.query.aggregation;
 
-/**
- * Interface for ResultHolder to hold the result of aggregation.
- *
- */
-public interface AggregationResultHolder {
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
 
-  /**
-   * Set the 'primitive double' aggregation result.
-   * @param value
-   */
-  void setValue(double value);
+public class BooleanOrAggregationFunction extends BaseBooleanAggregationFunction {
 
-  /**
-   * Set the aggregation result value.
-   * @param value
-   */
-  void setValue(Object value);
+  public BooleanOrAggregationFunction(ExpressionContext expression) {
+    this(expression, false);
+  }
 
-  /**
-   * Returns the 'primitive double' aggregation result.
-   * @return
-   */
-  double getDoubleResult();
+  protected BooleanOrAggregationFunction(ExpressionContext expression, boolean nullHandlingEnabled) {
+    super(expression, nullHandlingEnabled, BooleanMerge.OR);
+  }
 
-  /**
-   * Returns the result of aggregation.
-   * @return
-   */
-  <T> T getResult();
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.BOOLOR;
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DoubleGroupByResultHolder.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DoubleGroupByResultHolder.java
index 254c774046..60e1b8bb47 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DoubleGroupByResultHolder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DoubleGroupByResultHolder.java
@@ -80,6 +80,11 @@ public class DoubleGroupByResultHolder implements GroupByResultHolder {
     }
   }
 
+  @Override
+  public int getIntResult(int groupKey) {
+    throw new UnsupportedOperationException();
+  }
+
   @Override
   public <T> T getResult(int groupKey) {
     throw new UnsupportedOperationException();
@@ -92,6 +97,11 @@ public class DoubleGroupByResultHolder implements GroupByResultHolder {
     }
   }
 
+  @Override
+  public void setValueForKey(int groupKey, int value) {
+    throw new UnsupportedOperationException();
+  }
+
   @Override
   public void setValueForKey(int groupKey, Object newValue) {
     throw new UnsupportedOperationException();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/GroupByResultHolder.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/GroupByResultHolder.java
index 7271ce138e..64017ec97a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/GroupByResultHolder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/GroupByResultHolder.java
@@ -31,6 +31,14 @@ public interface GroupByResultHolder {
    */
   void setValueForKey(int groupKey, double value);
 
+  /**
+   * Stores the given value (of type int) for the given groupKey.
+   *
+   * @param groupKey
+   * @param value
+   */
+  void setValueForKey(int groupKey, int value);
+
   /**
    * Store the given value (of type ResultType) for the given groupKey.
    * @param groupKey
@@ -49,6 +57,17 @@ public interface GroupByResultHolder {
    */
   double getDoubleResult(int groupKey);
 
+  /**
+   * Returns the result (int) for the given group by key.
+   * If the group key does not exist in the result holder, returns
+   * the defaultValue it was initialized with (default value of the aggregation
+   * function it is holding the result for).
+   *
+   * @param groupKey
+   * @return
+   */
+  int getIntResult(int groupKey);
+
   /**
    * Returns the result (ResultType) for the given group key.
    * If the group key does not exist in the result holder, returns the
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DoubleGroupByResultHolder.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/IntGroupByResultHolder.java
similarity index 80%
copy from pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DoubleGroupByResultHolder.java
copy to pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/IntGroupByResultHolder.java
index 254c774046..3ae29f4580 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DoubleGroupByResultHolder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/IntGroupByResultHolder.java
@@ -16,21 +16,20 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.pinot.core.query.aggregation.groupby;
 
 import com.google.common.base.Preconditions;
 import java.util.Arrays;
 
 
-/**
- * Result Holder implemented using DoubleArray.
- */
-public class DoubleGroupByResultHolder implements GroupByResultHolder {
+public class IntGroupByResultHolder implements GroupByResultHolder {
+
   private final int _maxCapacity;
-  private final double _defaultValue;
+  private final int _defaultValue;
 
   private int _resultHolderCapacity;
-  private double[] _resultArray;
+  private int[] _resultArray;
 
   /**
    * Constructor for the class.
@@ -39,13 +38,13 @@ public class DoubleGroupByResultHolder implements GroupByResultHolder {
    * @param maxCapacity Maximum capacity of the result holder
    * @param defaultValue Default value of un-initialized results
    */
-  public DoubleGroupByResultHolder(int initialCapacity, int maxCapacity, double defaultValue) {
+  public IntGroupByResultHolder(int initialCapacity, int maxCapacity, int defaultValue) {
     _maxCapacity = maxCapacity;
     _defaultValue = defaultValue;
 
     _resultHolderCapacity = initialCapacity;
-    _resultArray = new double[initialCapacity];
-    if (defaultValue != 0.0) {
+    _resultArray = new int[initialCapacity];
+    if (defaultValue != 0) {
       Arrays.fill(_resultArray, defaultValue);
     }
   }
@@ -61,11 +60,11 @@ public class DoubleGroupByResultHolder implements GroupByResultHolder {
       // Cap the growth to maximum possible number of group keys
       _resultHolderCapacity = Math.min(_resultHolderCapacity, _maxCapacity);
 
-      double[] current = _resultArray;
-      _resultArray = new double[_resultHolderCapacity];
+      int[] current = _resultArray;
+      _resultArray = new int[_resultHolderCapacity];
       System.arraycopy(current, 0, _resultArray, 0, copyLength);
 
-      if (_defaultValue != 0.0) {
+      if (_defaultValue != 0) {
         Arrays.fill(_resultArray, copyLength, _resultHolderCapacity, _defaultValue);
       }
     }
@@ -73,6 +72,11 @@ public class DoubleGroupByResultHolder implements GroupByResultHolder {
 
   @Override
   public double getDoubleResult(int groupKey) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int getIntResult(int groupKey) {
     if (groupKey == GroupKeyGenerator.INVALID_ID) {
       return _defaultValue;
     } else {
@@ -87,6 +91,11 @@ public class DoubleGroupByResultHolder implements GroupByResultHolder {
 
   @Override
   public void setValueForKey(int groupKey, double newValue) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setValueForKey(int groupKey, int newValue) {
     if (groupKey != GroupKeyGenerator.INVALID_ID) {
       _resultArray[groupKey] = newValue;
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/ObjectGroupByResultHolder.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/ObjectGroupByResultHolder.java
index 1f807a9e7c..ebf5ec7b60 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/ObjectGroupByResultHolder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/ObjectGroupByResultHolder.java
@@ -65,6 +65,11 @@ public class ObjectGroupByResultHolder implements GroupByResultHolder {
     throw new UnsupportedOperationException();
   }
 
+  @Override
+  public int getIntResult(int groupKey) {
+    throw new UnsupportedOperationException();
+  }
+
   @Override
   @SuppressWarnings("unchecked")
   public <T> T getResult(int groupKey) {
@@ -82,6 +87,13 @@ public class ObjectGroupByResultHolder implements GroupByResultHolder {
     }
   }
 
+  @Override
+  public void setValueForKey(int groupKey, int newValue) {
+    if (groupKey != GroupKeyGenerator.INVALID_ID) {
+      _resultArray[groupKey] = newValue;
+    }
+  }
+
   @SuppressWarnings("unchecked")
   @Override
   public void setValueForKey(int groupKey, Object newValue) {
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
index d40f17fce2..144949a0d5 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
@@ -444,6 +444,20 @@ public class AggregationFunctionFactoryTest {
     assertEquals(aggregationFunction.getType(), AggregationFunctionType.PERCENTILETDIGESTMV);
     assertEquals(aggregationFunction.getColumnName(), "percentileTDigest95.0MV_column");
     assertEquals(aggregationFunction.getResultColumnName(), "percentiletdigestmv(column, 95.0)");
+
+    function = getFunction("bool_and");
+    aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT);
+    assertTrue(aggregationFunction instanceof BooleanAndAggregationFunction);
+    assertEquals(aggregationFunction.getType(), AggregationFunctionType.BOOLAND);
+    assertEquals(aggregationFunction.getColumnName(), "boolAnd_column");
+    assertEquals(aggregationFunction.getResultColumnName(), "booland(column)");
+
+    function = getFunction("bool_or");
+    aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT);
+    assertTrue(aggregationFunction instanceof BooleanOrAggregationFunction);
+    assertEquals(aggregationFunction.getType(), AggregationFunctionType.BOOLOR);
+    assertEquals(aggregationFunction.getColumnName(), "boolOr_column");
+    assertEquals(aggregationFunction.getResultColumnName(), "boolor(column)");
   }
 
   private FunctionContext getFunction(String functionName) {
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/BooleanAggQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/BooleanAggQueriesTest.java
new file mode 100644
index 0000000000..3e87d7446b
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/BooleanAggQueriesTest.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.queries;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock;
+import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
+import org.apache.pinot.core.operator.query.AggregationOperator;
+import org.apache.pinot.core.operator.query.GroupByOperator;
+import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
+import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import org.apache.pinot.core.util.GapfillUtils;
+import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.sql.parsers.CalciteSqlParser;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+
+public class BooleanAggQueriesTest extends BaseQueriesTest {
+
+  private static final int NUM_RECORDS = 16;
+
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "BooleanAggQueriesTest");
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String SEGMENT_NAME = "testSegment";
+
+  private static final String BOOLEAN_COLUMN = "boolColumn";
+  private static final String GROUP_BY_COLUMN = "groupByColumn";
+
+  private static final Schema SCHEMA =
+      new Schema.SchemaBuilder()
+          .addSingleValueDimension(BOOLEAN_COLUMN, FieldSpec.DataType.BOOLEAN)
+          .addSingleValueDimension(GROUP_BY_COLUMN, FieldSpec.DataType.STRING).build();
+
+  private static final TableConfig TABLE_CONFIG =
+      new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+
+  private IndexSegment _indexSegment;
+  private List<IndexSegment> _indexSegments;
+
+  @DataProvider
+  public static Object[][] nullHandling() {
+    return new Object[][] {
+        new Object[]{true}, // run the test with the null-handling query option enabled
+        new Object[]{false}, // run the test with the null-handling query option disabled
+    };
+  }
+
+  @Override
+  protected String getFilter() {
+    return "";
+  }
+
+  @Override
+  protected IndexSegment getIndexSegment() {
+    return _indexSegment;
+  }
+
+  @Override
+  protected List<IndexSegment> getIndexSegments() {
+    return _indexSegments;
+  }
+
+  @BeforeClass
+  public void setUp()
+    throws Exception {
+    FileUtils.deleteDirectory(INDEX_DIR); // remove any index directory left by a previous run
+
+    Object[][] recordContents = new Object[][] { // each row: {boolColumn value, groupByColumn value}
+        new Object[]{true, "allTrue"},
+        new Object[]{true, "allTrue"},
+        new Object[]{true, "allTrue"},
+        new Object[]{false, "allFalse"},
+        new Object[]{false, "allFalse"},
+        new Object[]{false, "allFalse"},
+        new Object[]{true, "mixedOne"},
+        new Object[]{true, "mixedOne"},
+        new Object[]{false, "mixedOne"},
+        new Object[]{false, "mixedTwo"},
+        new Object[]{true, "mixedTwo"},
+        new Object[]{false, "mixedTwo"},
+        new Object[]{null, "withNulls"}, // null boolean next to one true and one false row
+        new Object[]{true, "withNulls"},
+        new Object[]{false, "withNulls"},
+        new Object[]{null, "onlyNulls"}, // group whose single row is a null boolean
+    };
+
+    List<GenericRow> records = new ArrayList<>(NUM_RECORDS); // NUM_RECORDS matches the 16 rows above
+    for (Object[] record : recordContents) {
+      GenericRow genericRow = new GenericRow();
+      genericRow.putValue(BOOLEAN_COLUMN, record[0]);
+      genericRow.putValue(GROUP_BY_COLUMN, record[1]);
+      records.add(genericRow);
+    }
+
+    ImmutableSegment immutableSegment = setUpSingleSegment(records, SEGMENT_NAME);
+    _indexSegment = immutableSegment;
+    _indexSegments = Arrays.asList(immutableSegment, immutableSegment); // the same segment, listed twice
+  }
+
+  private ImmutableSegment setUpSingleSegment(List<GenericRow> recordSet, String segmentName)
+      throws Exception {
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
+    segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
+    segmentGeneratorConfig.setSegmentName(segmentName);
+    segmentGeneratorConfig.setOutDir(INDEX_DIR.getPath());
+    segmentGeneratorConfig.setNullHandlingEnabled(true); // always on at build time, independent of the query option
+
+    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig, new GenericRowRecordReader(recordSet));
+    driver.build();
+
+    return ImmutableSegmentLoader.load(new File(INDEX_DIR, segmentName), ReadMode.mmap); // load from the out dir
+  }
+
+  protected <T extends Operator> T getOperator(String query, boolean enableNullHandling) {
+    PinotQuery pinotQuery = CalciteSqlParser.compileToPinotQuery(query);
+
+    pinotQuery.setQueryOptions( // the only option set here is the null-handling flag
+        ImmutableMap.of(
+            CommonConstants.Broker.Request.QueryOptionKey.ENABLE_NULL_HANDLING,
+            String.valueOf(enableNullHandling)));
+
+    PinotQuery serverPinotQuery = GapfillUtils.stripGapfill(pinotQuery);
+    QueryContext queryContext = QueryContextConverterUtils.getQueryContext(serverPinotQuery);
+    return (T) PLAN_MAKER.makeSegmentPlanNode(getIndexSegment(), queryContext).run(); // unchecked cast to T
+  }
+
+  @Test(dataProvider = "nullHandling")
+  public void testBooleanAnd(boolean isNullHandlingEnabled) {
+    // Given: a BOOL_AND group-by query planned with the null-handling option under test
+    String query = "SELECT BOOL_AND(boolColumn) FROM testTable GROUP BY groupByColumn";
+    GroupByOperator operator = getOperator(query, isNullHandlingEnabled);
+
+    // When: the operator is asked for its next block
+    GroupByResultsBlock result = operator.nextBlock();
+
+    // Then: execution statistics and each group's aggregate match the fixture built in setUp()
+    QueriesTestUtils.testInnerSegmentExecutionStatistics(
+        // NOTE(review): getOperator() runs a single segment, so the "* 2" presumably counts the two columns read
+        operator.getExecutionStatistics(), NUM_RECORDS, 0, NUM_RECORDS * 2, NUM_RECORDS);
+    AggregationGroupByResult aggregates = result.getAggregationGroupByResult();
+    Iterator<GroupKeyGenerator.GroupKey> keys = aggregates.getGroupKeyIterator();
+    while (keys.hasNext()) {
+      GroupKeyGenerator.GroupKey key = keys.next();
+      switch ((String) key._keys[0]) {
+        case "allFalse":
+        case "mixedOne":
+        case "mixedTwo":
+        case "withNulls": // contains a false row, so the expected result is 0 in both modes
+          Assert.assertEquals(aggregates.getResultForGroupId(0, key._groupId), 0);
+          break;
+        case "allTrue":
+          Assert.assertEquals(aggregates.getResultForGroupId(0, key._groupId), 1);
+          break;
+        case "onlyNulls": // only a null row: expect null with null handling enabled, otherwise 0
+          Assert.assertEquals(aggregates.getResultForGroupId(0, key._groupId), isNullHandlingEnabled ? null : 0);
+          break;
+        default:
+          throw new IllegalStateException("Unexpected grouping: " + key._keys[0]);
+      }
+    }
+  }
+
+  @Test(dataProvider = "nullHandling")
+  public void testBooleanOr(boolean isNullHandlingEnabled) {
+    // Given: a BOOL_OR group-by query planned with the null-handling option under test
+    String query = "SELECT BOOL_OR(boolColumn) FROM testTable GROUP BY groupByColumn";
+    GroupByOperator operator = getOperator(query, isNullHandlingEnabled);
+
+    // When: the operator is asked for its next block
+    GroupByResultsBlock result = operator.nextBlock();
+
+    // Then: execution statistics and each group's aggregate match the fixture built in setUp()
+    QueriesTestUtils.testInnerSegmentExecutionStatistics(
+        // NOTE(review): getOperator() runs a single segment, so the "* 2" presumably counts the two columns read
+        operator.getExecutionStatistics(), NUM_RECORDS, 0, NUM_RECORDS * 2, NUM_RECORDS);
+    AggregationGroupByResult aggregates = result.getAggregationGroupByResult();
+    Iterator<GroupKeyGenerator.GroupKey> keys = aggregates.getGroupKeyIterator();
+    while (keys.hasNext()) {
+      GroupKeyGenerator.GroupKey key = keys.next();
+      switch ((String) key._keys[0]) {
+        case "mixedOne":
+        case "mixedTwo":
+        case "allTrue":
+        case "withNulls": // contains a true row, so the expected result is 1 in both modes
+          Assert.assertEquals(aggregates.getResultForGroupId(0, key._groupId), 1);
+          break;
+        case "allFalse":
+          Assert.assertEquals(aggregates.getResultForGroupId(0, key._groupId), 0);
+          break;
+        case "onlyNulls": // only a null row: expect null with null handling enabled, otherwise 0
+          Assert.assertEquals(aggregates.getResultForGroupId(0, key._groupId), isNullHandlingEnabled ? null : 0);
+          break;
+        default:
+          throw new IllegalStateException("Unexpected grouping: " + key._keys[0]);
+      }
+    }
+  }
+
+  @Test(dataProvider = "nullHandling")
+  public void testBooleanAndNoGroupBy(boolean isNullHandlingEnabled) {
+    // Given: BOOL_AND without GROUP BY, filtered to the rows of the "allTrue" group
+    String query = "SELECT BOOL_AND(boolColumn) FROM testTable WHERE groupByColumn = 'allTrue'";
+    AggregationOperator operator = getOperator(query, isNullHandlingEnabled);
+
+    // When: the operator is asked for its next block
+    AggregationResultsBlock result = operator.nextBlock();
+
+    // Then: exactly one aggregate is returned and it equals 1
+    List<Object> aggregates = result.getResults();
+    Assert.assertEquals(aggregates.size(), 1);
+    Assert.assertEquals(aggregates.get(0), 1);
+  }
+}
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java
index 5545798b48..6b826ab04b 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java
@@ -70,7 +70,7 @@ public class PinotAggregateExchangeNodeInsertRule extends RelOptRule {
   public static final PinotAggregateExchangeNodeInsertRule INSTANCE =
       new PinotAggregateExchangeNodeInsertRule(PinotRuleUtils.PINOT_REL_FACTORY);
   private static final Set<SqlKind> SUPPORTED_AGG_KIND = ImmutableSet.of(
-      SqlKind.SUM, SqlKind.SUM0, SqlKind.MIN, SqlKind.MAX, SqlKind.COUNT);
+      SqlKind.SUM, SqlKind.SUM0, SqlKind.MIN, SqlKind.MAX, SqlKind.COUNT, SqlKind.OTHER_FUNCTION);
 
   public PinotAggregateExchangeNodeInsertRule(RelBuilderFactory factory) {
     super(operand(LogicalAggregate.class, any()), factory, null);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/AggregationResultHolder.java b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java
similarity index 56%
copy from pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/AggregationResultHolder.java
copy to pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java
index 33e975c0ff..3610ce0c4d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/AggregationResultHolder.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java
@@ -16,35 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.query.aggregation;
 
-/**
- * Interface for ResultHolder to hold the result of aggregation.
- *
- */
-public interface AggregationResultHolder {
+package org.apache.calcite.sql.fun;
 
-  /**
-   * Set the 'primitive double' aggregation result.
-   * @param value
-   */
-  void setValue(double value);
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlFunctionCategory;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.type.OperandTypes;
+import org.apache.calcite.sql.type.ReturnTypes;
+import org.apache.calcite.util.Optionality;
 
-  /**
-   * Set the aggregation result value.
-   * @param value
-   */
-  void setValue(Object value);
 
-  /**
-   * Returns the 'primitive double' aggregation result.
-   * @return
-   */
-  double getDoubleResult();
+public class PinotBoolAndAggregateFunction extends SqlAggFunction {
 
-  /**
-   * Returns the result of aggregation.
-   * @return
-   */
-  <T> T getResult();
+  public PinotBoolAndAggregateFunction() {
+    super("BOOL_AND", null, SqlKind.OTHER_FUNCTION, ReturnTypes.BOOLEAN,
+        null, OperandTypes.BOOLEAN, SqlFunctionCategory.USER_DEFINED_FUNCTION,
+        false, false, Optionality.FORBIDDEN);
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/AggregationResultHolder.java b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolOrAggregateFunction.java
similarity index 56%
copy from pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/AggregationResultHolder.java
copy to pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolOrAggregateFunction.java
index 33e975c0ff..3d336c1070 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/AggregationResultHolder.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolOrAggregateFunction.java
@@ -16,35 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.query.aggregation;
 
-/**
- * Interface for ResultHolder to hold the result of aggregation.
- *
- */
-public interface AggregationResultHolder {
+package org.apache.calcite.sql.fun;
 
-  /**
-   * Set the 'primitive double' aggregation result.
-   * @param value
-   */
-  void setValue(double value);
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlFunctionCategory;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.type.OperandTypes;
+import org.apache.calcite.sql.type.ReturnTypes;
+import org.apache.calcite.util.Optionality;
 
-  /**
-   * Set the aggregation result value.
-   * @param value
-   */
-  void setValue(Object value);
 
-  /**
-   * Returns the 'primitive double' aggregation result.
-   * @return
-   */
-  double getDoubleResult();
+public class PinotBoolOrAggregateFunction extends SqlAggFunction {
 
-  /**
-   * Returns the result of aggregation.
-   * @return
-   */
-  <T> T getResult();
+  public PinotBoolOrAggregateFunction() {
+    super("BOOL_OR", null, SqlKind.OTHER_FUNCTION, ReturnTypes.BOOLEAN,
+        null, OperandTypes.BOOLEAN, SqlFunctionCategory.USER_DEFINED_FUNCTION,
+        false, false, Optionality.FORBIDDEN);
+  }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotOperatorTable.java b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotOperatorTable.java
index ff37a2fbff..3d2ba325bf 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotOperatorTable.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotOperatorTable.java
@@ -21,6 +21,7 @@ package org.apache.calcite.sql.fun;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.calcite.sql.SqlAggFunction;
 import org.apache.calcite.sql.SqlFunction;
 import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.validate.SqlNameMatchers;
@@ -44,6 +45,8 @@ public class PinotOperatorTable extends SqlStdOperatorTable {
   private static @MonotonicNonNull PinotOperatorTable _instance;
 
   public static final SqlFunction COALESCE = new PinotSqlCoalesceFunction();
+  public static final SqlAggFunction BOOL_AND = new PinotBoolAndAggregateFunction();
+  public static final SqlAggFunction BOOL_OR = new PinotBoolOrAggregateFunction();
 
   // TODO: clean up lazy init by using Suppliers.memorized(this::computeInstance) and make getter wrapped around
   // supplier instance. this should replace all lazy init static objects in the codebase
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
index 7389a70106..87bea8df1c 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
@@ -208,6 +208,14 @@ public class AggregateOperator extends BaseOperator<TransferableBlock> {
     return ((Number) left).doubleValue() + 1;
   }
 
+  private static Boolean mergeBoolAnd(Object left, Object right) {
+    return ((Boolean) left) && ((Boolean) right); // unboxing: an evaluated null operand throws NPE
+  }
+
+  private static Boolean mergeBoolOr(Object left, Object right) {
+    return ((Boolean) left) || ((Boolean) right); // unboxing: an evaluated null operand throws NPE
+  }
+
   private static Key extraRowKey(Object[] row, List<RexExpression> groupSet) {
     Object[] keyElements = new Object[groupSet.size()];
     for (int i = 0; i < groupSet.size(); i++) {
@@ -233,6 +241,12 @@ public class AggregateOperator extends BaseOperator<TransferableBlock> {
         .put("$MAX", AggregateOperator::mergeMax)
         .put("$MAX0", AggregateOperator::mergeMax)
         .put("COUNT", AggregateOperator::mergeCount)
+        .put("BOOL_AND", AggregateOperator::mergeBoolAnd)
+        .put("$BOOL_AND", AggregateOperator::mergeBoolAnd)
+        .put("$BOOL_AND0", AggregateOperator::mergeBoolAnd)
+        .put("BOOL_OR", AggregateOperator::mergeBoolOr)
+        .put("$BOOL_OR", AggregateOperator::mergeBoolOr)
+        .put("$BOOL_OR0", AggregateOperator::mergeBoolOr)
         .build();
 
     final int _inputRef;
diff --git a/pinot-query-runtime/src/test/resources/queries/Aggregates.json b/pinot-query-runtime/src/test/resources/queries/Aggregates.json
index 25986b8501..b82be2cb5e 100644
--- a/pinot-query-runtime/src/test/resources/queries/Aggregates.json
+++ b/pinot-query-runtime/src/test/resources/queries/Aggregates.json
@@ -3,66 +3,19 @@
     "tables": {
       "tbl": {
         "schema": [
-          {
-            "name": "int_col",
-            "type": "INT"
-          },
-          {
-            "name": "double_col",
-            "type": "DOUBLE"
-          },
-          {
-            "name": "string_col",
-            "type": "STRING"
-          },
-          {
-            "name": "bool_col",
-            "type": "BOOLEAN"
-          }
+          { "name": "int_col", "type": "INT" },
+          { "name": "double_col", "type": "DOUBLE" },
+          { "name": "string_col", "type": "STRING" },
+          { "name": "bool_col", "type": "BOOLEAN" }
         ],
         "inputs": [
-          [
-            2,
-            300,
-            "a",
-            true
-          ],
-          [
-            2,
-            400,
-            "a",
-            false
-          ],
-          [
-            3,
-            100,
-            "b",
-            true
-          ],
-          [
-            100,
-            1,
-            "b",
-            false
-          ],
-          [
-            101,
-            1.01,
-            "c",
-            false
-          ],
-          [
-            150,
-            1.5,
-            "c",
-            false
-          ],
-          [
-            175,
-            1.75,
-            "c",
-            true
-          ]
+          [ 2, 300, "a", true ],
+          [ 2, 400, "a", true ],
+          [ 3, 100, "b", false ],
+          [ 100, 1, "b", false ],
+          [ 101, 1.01, "c", false ],
+          [ 150, 1.5, "c", false ],
+          [ 175, 1.75, "c", true ]
         ]
       }
     },
@@ -164,6 +117,29 @@
         "comments": "class java.lang.Double cannot be cast to class java.lang.Boolean",
         "description": "aggregate boolean column",
         "sql": "SELECT min(bool_col) FROM {tbl}"
+      },
+      {
+        "psql": "9.21.0",
+        "description": "aggregate boolean column",
+        "sql": "SELECT bool_and(bool_col), bool_or(bool_col) FROM {tbl} GROUP BY string_col"
+      },
+      {
+        "psql": "9.21.0",
+        "description": "aggregate boolean column no group by",
+        "sql": "SELECT bool_and(bool_col), bool_or(bool_col) FROM {tbl}"
+      },
+      {
+        "ignored": true,
+        "comment": "issue with converting data types:  Unexpected RelDataTypeField: ANY for column: EXPR$0",
+        "psql": "9.21.0",
+        "description": "aggregate boolean column no group by with inner function",
+        "sql": "SELECT bool_and(startsWith(string_col, 'a')), bool_or(startsWith(string_col, 'a')) FROM {tbl}"
+      },
+      {
+        "ignored": true,
+        "comment": "issue with converting data types:  Unexpected RelDataTypeField: ANY for column: EXPR$0",
+        "description": "sum with inner function",
+        "sql": "SELECT sum(pow(int_col, 2)) FROM {tbl}"
       }
     ]
   },
@@ -171,78 +147,21 @@
     "tables": {
       "tbl": {
         "schema": [
-          {
-            "name": "int_col",
-            "type": "INT"
-          },
-          {
-            "name": "double_col",
-            "type": "DOUBLE"
-          },
-          {
-            "name": "string_col",
-            "type": "STRING"
-          },
-          {
-            "name": "bool_col",
-            "type": "BOOLEAN"
-          }
+          { "name": "int_col", "type": "INT" },
+          { "name": "double_col", "type": "DOUBLE" },
+          { "name": "string_col", "type": "STRING" },
+          { "name": "bool_col", "type": "BOOLEAN" }
         ],
         "inputs": [
-          [
-            2,
-            300,
-            "a",
-            false
-          ],
-          [
-            2,
-            400,
-            "a",
-            true
-          ],
-          [
-            3,
-            100,
-            "b",
-            true
-          ],
-          [
-            0.001,
-            1,
-            "b",
-            false
-          ],
-          [
-            101,
-            1.01,
-            "c",
-            false
-          ],
-          [
-            150,
-            1.5,
-            "c",
-            true
-          ],
-          [
-            175,
-            1.75,
-            "c",
-            true
-          ],
-          [
-            -10000,
-            1.75,
-            "c",
-            false
-          ],
-          [
-            -2,
-            0.5,
-            "c",
-            false
-          ]
+          [ 2, 300, "a", false ],
+          [ 2, 400, "a", true ],
+          [ 3, 100, "b", true ],
+          [ 0.001, 1, "b", false ],
+          [ 101, 1.01, "c", false ],
+          [ 150, 1.5, "c", true ],
+          [ 175, 1.75, "c", true ],
+          [ -10000, 1.75, "c", false ],
+          [ -2, 0.5, "c", false ]
         ]
       }
     },
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
index 48a2199918..19d2f4d6e5 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
@@ -79,7 +79,11 @@ public enum AggregationFunctionType {
   PERCENTILERAWESTMV("percentileRawEstMV"),
   PERCENTILETDIGESTMV("percentileTDigestMV"),
   PERCENTILERAWTDIGESTMV("percentileRawTDigestMV"),
-  DISTINCT("distinct");
+  DISTINCT("distinct"),
+
+  // boolean aggregate functions
+  BOOLAND("boolAnd"),
+  BOOLOR("boolOr");
 
   private static final Set<String> NAMES = Arrays.stream(values()).flatMap(func -> Stream.of(func.name(),
       func.getName(), func.getName().toLowerCase())).collect(Collectors.toSet());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org