Posted to github@arrow.apache.org by "izveigor (via GitHub)" <gi...@apache.org> on 2023/05/07 19:32:14 UTC

[GitHub] [arrow-datafusion] izveigor opened a new pull request, #6276: feat: support bitwise and boolean aggregate functions

izveigor opened a new pull request, #6276:
URL: https://github.com/apache/arrow-datafusion/pull/6276

   # Which issue does this PR close?
   
   Closes #6275
   
   # Rationale for this change
   
   
   # What changes are included in this PR?
   
   
   # Are these changes tested?
   Yes
   
   # Are there any user-facing changes?
   Yes


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #6276: feat: support bitwise and boolean aggregate functions

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #6276:
URL: https://github.com/apache/arrow-datafusion/pull/6276#discussion_r1191082495


##########
datafusion/physical-expr/src/aggregate/bool_and_or.rs:
##########
@@ -0,0 +1,643 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can be evaluated at runtime during query execution
+
+use std::any::Any;
+use std::convert::TryFrom;
+use std::sync::Arc;
+
+use crate::{AggregateExpr, PhysicalExpr};
+use arrow::datatypes::DataType;
+use arrow::{
+    array::{ArrayRef, BooleanArray},
+    datatypes::Field,
+};
+use datafusion_common::{downcast_value, DataFusionError, Result, ScalarValue};
+use datafusion_expr::Accumulator;
+
+use crate::aggregate::row_accumulator::{
+    is_row_accumulator_support_dtype, RowAccumulator,
+};
+use crate::aggregate::utils::down_cast_any_ref;
+use crate::expressions::format_state_name;
+use arrow::array::Array;
+use datafusion_row::accessor::RowAccessor;
+use std::ops::BitAnd as BitAndImplementation;
+use std::ops::BitOr as BitOrImplementation;
+
+fn bool_and(array: &BooleanArray) -> Option<bool> {
+    if array.null_count() == array.len() {
+        return None;
+    }
+    Some(array.false_count() == 0)
+}
+
+fn bool_or(array: &BooleanArray) -> Option<bool> {
+    if array.null_count() == array.len() {
+        return None;
+    }
+    Some(array.true_count() != 0)
+}
+
+// BOOL_AND/BOOL_OR aggregation can take Dictionary-encoded input but always produces unpacked
+// (i.e. non-Dictionary) output, so we need to adjust the output data type to reflect this.
+// The aggregate produces unpacked output because there is only one BOOL_AND/BOOL_OR value
+// per group; there is no need to keep it Dictionary-encoded.
+fn bool_and_or_aggregate_data_type(input_type: DataType) -> DataType {
+    if let DataType::Dictionary(_, value_type) = input_type {
+        *value_type
+    } else {
+        input_type
+    }
+}
+
+// returns the new value after bool_and/bool_or with the new values, taking nullability into account
+macro_rules! typed_bool_and_or_batch {
+    ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{
+        let array = downcast_value!($VALUES, $ARRAYTYPE);
+        let delta = $OP(array);
+        Ok(ScalarValue::$SCALAR(delta))
+    }};
+}
+
+// bool_and/bool_or the array and returns a ScalarValue of its corresponding type.
+macro_rules! bool_and_or_batch {
+    ($VALUES:expr, $OP:ident) => {{
+        match $VALUES.data_type() {
+            DataType::Boolean => {
+                typed_bool_and_or_batch!($VALUES, BooleanArray, Boolean, $OP)
+            }
+            e => {
+                return Err(DataFusionError::Internal(format!(
+                    "Bool and/Bool or is not expected to receive the type {e:?}"
+                )));
+            }
+        }
+    }};
+}
+
+/// dynamically-typed bool_and(array) -> ScalarValue
+fn bool_and_batch(values: &ArrayRef) -> Result<ScalarValue> {
+    bool_and_or_batch!(values, bool_and)
+}
+
+/// dynamically-typed bool_or(array) -> ScalarValue
+fn bool_or_batch(values: &ArrayRef) -> Result<ScalarValue> {
+    bool_and_or_batch!(values, bool_or)
+}
+
+// bool_and/bool_or of two scalar values.
+macro_rules! typed_bool_and_or {
+    ($VALUE:expr, $DELTA:expr, $SCALAR:ident, $OP:ident) => {{
+        ScalarValue::$SCALAR(match ($VALUE, $DELTA) {
+            (None, None) => None,
+            (Some(a), None) => Some(*a),
+            (None, Some(b)) => Some(*b),
+            (Some(a), Some(b)) => Some((*a).$OP(*b)),
+        })
+    }};
+}
+
+// bool_and/bool_or of a scalar value into the row accumulator state at the given index.
+macro_rules! typed_bool_and_or_v2 {
+    ($INDEX:ident, $ACC:ident, $SCALAR:expr, $TYPE:ident, $OP:ident) => {{
+        paste::item! {
+            match $SCALAR {
+                None => {}
+                Some(v) => $ACC.[<$OP _ $TYPE>]($INDEX, *v as $TYPE)
+            }
+        }
+    }};
+}
+
+// bool_and/bool_or of two scalar values of the same type
+macro_rules! bool_and_or {
+    ($VALUE:expr, $DELTA:expr, $OP:ident) => {{
+        Ok(match ($VALUE, $DELTA) {
+            (ScalarValue::Boolean(lhs), ScalarValue::Boolean(rhs)) => {
+                typed_bool_and_or!(lhs, rhs, Boolean, $OP)
+            }
+            e => {
+                return Err(DataFusionError::Internal(format!(
+                    "BOOL AND/BOOL OR is not expected to receive scalars of incompatible types {:?}",
+                    e
+                )))
+            }
+        })
+    }};
+}
+
+macro_rules! bool_and_or_v2 {
+    ($INDEX:ident, $ACC:ident, $SCALAR:expr, $OP:ident) => {{
+        Ok(match $SCALAR {
+            ScalarValue::Boolean(rhs) => {
+                typed_bool_and_or_v2!($INDEX, $ACC, rhs, bool, $OP)
+            }
+            ScalarValue::Null => {
+                // do nothing
+            }
+            e => {
+                return Err(DataFusionError::Internal(format!(
+                    "BOOL AND/BOOL OR is not expected to receive scalars of incompatible types {:?}",
+                    e
+                )))
+            }
+        })
+    }};
+}
+
+/// the bool_and of two scalar values
+pub fn booland(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {
+    bool_and_or!(lhs, rhs, bitand)
+}
+
+pub fn bool_and_row(
+    index: usize,
+    accessor: &mut RowAccessor,
+    s: &ScalarValue,
+) -> Result<()> {
+    bool_and_or_v2!(index, accessor, s, bitand)
+}
+
+/// the bool_or of two scalar values
+pub fn boolor(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {
+    bool_and_or!(lhs, rhs, bitor)
+}
+
+pub fn bool_or_row(
+    index: usize,
+    accessor: &mut RowAccessor,
+    s: &ScalarValue,
+) -> Result<()> {
+    bool_and_or_v2!(index, accessor, s, bitor)
+}
+
+/// BOOL_AND aggregate expression
+#[derive(Debug, Clone)]
+pub struct BoolAnd {
+    name: String,
+    pub data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+    nullable: bool,
+}
+
+impl BoolAnd {
+    /// Create a new BOOL_AND aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        Self {
+            name: name.into(),
+            expr,
+            data_type: bool_and_or_aggregate_data_type(data_type),
+            nullable: true,
+        }
+    }
+}
+
+impl AggregateExpr for BoolAnd {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(
+            &self.name,
+            self.data_type.clone(),
+            self.nullable,
+        ))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::new(BoolAndAccumulator::try_new(&self.data_type)?))
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![Field::new(
+            format_state_name(&self.name, "bool_and"),
+            self.data_type.clone(),
+            self.nullable,
+        )])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    fn row_accumulator_supported(&self) -> bool {
+        is_row_accumulator_support_dtype(&self.data_type)
+    }
+
+    fn supports_bounded_execution(&self) -> bool {
+        true
+    }
+
+    fn create_row_accumulator(
+        &self,
+        start_index: usize,
+    ) -> Result<Box<dyn RowAccumulator>> {
+        Ok(Box::new(BoolAndRowAccumulator::new(
+            start_index,
+            self.data_type.clone(),
+        )))
+    }
+
+    fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
+        Some(Arc::new(self.clone()))
+    }
+
+    fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::new(BoolAndAccumulator::try_new(&self.data_type)?))
+    }
+}
+
+impl PartialEq<dyn Any> for BoolAnd {
+    fn eq(&self, other: &dyn Any) -> bool {
+        down_cast_any_ref(other)
+            .downcast_ref::<Self>()
+            .map(|x| {
+                self.name == x.name
+                    && self.data_type == x.data_type
+                    && self.nullable == x.nullable
+                    && self.expr.eq(&x.expr)
+            })
+            .unwrap_or(false)
+    }
+}
+
+#[derive(Debug)]
+struct BoolAndAccumulator {
+    bool_and: ScalarValue,
+}
+
+impl BoolAndAccumulator {
+    /// new bool_and accumulator
+    pub fn try_new(data_type: &DataType) -> Result<Self> {
+        Ok(Self {
+            bool_and: ScalarValue::try_from(data_type)?,
+        })
+    }
+}
+
+impl Accumulator for BoolAndAccumulator {
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        let values = &values[0];
+        let delta = &bool_and_batch(values)?;
+        self.bool_and = booland(&self.bool_and, delta)?;
+        Ok(())
+    }
+
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+        self.update_batch(states)
+    }
+
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        Ok(vec![self.bool_and.clone()])
+    }
+
+    fn evaluate(&self) -> Result<ScalarValue> {
+        Ok(self.bool_and.clone())
+    }
+
+    fn size(&self) -> usize {
+        std::mem::size_of_val(self) - std::mem::size_of_val(&self.bool_and)
+            + self.bool_and.size()
+    }
+}
+
+#[derive(Debug)]
+struct BoolAndRowAccumulator {
+    index: usize,
+    datatype: DataType,
+}
+
+impl BoolAndRowAccumulator {
+    pub fn new(index: usize, datatype: DataType) -> Self {
+        Self { index, datatype }
+    }
+}
+
+impl RowAccumulator for BoolAndRowAccumulator {
+    fn update_batch(
+        &mut self,
+        values: &[ArrayRef],
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        let values = &values[0];
+        let delta = &bool_and_batch(values)?;
+        bool_and_row(self.index, accessor, delta)
+    }
+
+    fn update_scalar_values(
+        &mut self,
+        values: &[ScalarValue],
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        let value = &values[0];
+        bool_and_row(self.index, accessor, value)
+    }
+
+    fn update_scalar(
+        &mut self,
+        value: &ScalarValue,
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        bool_and_row(self.index, accessor, value)
+    }
+
+    fn merge_batch(
+        &mut self,
+        states: &[ArrayRef],
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        self.update_batch(states, accessor)
+    }
+
+    fn evaluate(&self, accessor: &RowAccessor) -> Result<ScalarValue> {
+        Ok(accessor.get_as_scalar(&self.datatype, self.index))
+    }
+
+    #[inline(always)]
+    fn state_index(&self) -> usize {
+        self.index
+    }
+}
+
+/// BOOL_OR aggregate expression
+#[derive(Debug, Clone)]
+pub struct BoolOr {
+    name: String,
+    pub data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+    nullable: bool,
+}
+
+impl BoolOr {
+    /// Create a new BOOL_OR aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        Self {
+            name: name.into(),
+            expr,
+            data_type: bool_and_or_aggregate_data_type(data_type),
+            nullable: true,
+        }
+    }
+}
+
+impl AggregateExpr for BoolOr {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(
+            &self.name,
+            self.data_type.clone(),
+            self.nullable,
+        ))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::new(BoolOrAccumulator::try_new(&self.data_type)?))
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![Field::new(
+            format_state_name(&self.name, "bool_or"),
+            self.data_type.clone(),
+            self.nullable,
+        )])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    fn row_accumulator_supported(&self) -> bool {
+        is_row_accumulator_support_dtype(&self.data_type)
+    }
+
+    fn supports_bounded_execution(&self) -> bool {
+        true

Review Comment:
   I don't think this supports bounded execution either. To support bounded execution, the accumulator must implement the `retract_batch` method. In theory, I don't think `retract_batch` can be implemented for bitwise `and` or bitwise `or`, because neither operation can be undone from the accumulated value alone. It can be implemented for bitwise `xor`, since XOR is its own inverse ([see discussion](https://security.stackexchange.com/questions/139717/the-reason-of-using-xor-operation-in-cryptographic-algorithms#:~:text=XOR%20is%20an%20operation%20that,or%200%20in%20the%20operands.)). Hence, I think we should not override `supports_bounded_execution` for the `and` and `or` cases (it defaults to false). We can support it for `xor`, but that can be done in a follow-up PR. As far as this PR is concerned, deleting the `supports_bounded_execution` implementation is enough, I think.
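The point about retraction can be illustrated with a minimal, standalone sketch. It does not use DataFusion's `Accumulator` trait; `XorState` and `and_all` are hypothetical names introduced only for this example, and `u32` stands in for any integer type.

```rust
/// Hypothetical running state for a bitwise XOR aggregate over a sliding window.
/// This is an illustration only, not the accumulator from the PR.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct XorState {
    pub acc: u32,
}

impl XorState {
    /// Folds in the rows that enter the window.
    pub fn update_batch(&mut self, values: &[u32]) {
        for v in values {
            self.acc ^= *v;
        }
    }

    /// Removes the rows that leave the window. XOR is its own inverse
    /// (a ^ b ^ b == a), so retracting applies the same operation as updating.
    pub fn retract_batch(&mut self, values: &[u32]) {
        for v in values {
            self.acc ^= *v;
        }
    }
}

/// Bitwise AND of all values, starting from the all-ones identity.
pub fn and_all(values: &[u32]) -> u32 {
    values.iter().fold(u32::MAX, |acc, v| acc & *v)
}
```

After updating `XorState` with `[0b1010, 0b0110, 0b0001]` and retracting `[0b1010]`, the state equals `0b0110 ^ 0b0001`. For AND, the windows `[0b1100, 0b1010]` and `[0b1000, 0b1010]` both aggregate to `0b1000`, yet removing `0b1010` should leave `0b1100` in one case and `0b1000` in the other, so the accumulated value alone cannot be retracted.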





[GitHub] [arrow-datafusion] izveigor commented on a diff in pull request #6276: feat: support bitwise and boolean aggregate functions

Posted by "izveigor (via GitHub)" <gi...@apache.org>.
izveigor commented on code in PR #6276:
URL: https://github.com/apache/arrow-datafusion/pull/6276#discussion_r1192751625


##########
datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs:
##########
@@ -0,0 +1,1103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can be evaluated at runtime during query execution
+
+use std::any::Any;
+use std::convert::TryFrom;
+use std::sync::Arc;
+
+use crate::{AggregateExpr, PhysicalExpr};
+use arrow::datatypes::DataType;
+use arrow::{
+    array::{
+        ArrayRef, Int16Array, Int32Array, Int64Array, Int8Array, UInt16Array,
+        UInt32Array, UInt64Array, UInt8Array,
+    },
+    datatypes::Field,
+};
+use datafusion_common::{downcast_value, DataFusionError, Result, ScalarValue};
+use datafusion_expr::Accumulator;
+
+use crate::aggregate::row_accumulator::{
+    is_row_accumulator_support_dtype, RowAccumulator,
+};
+use crate::aggregate::utils::down_cast_any_ref;
+use crate::expressions::format_state_name;
+use arrow::array::Array;
+use arrow::array::PrimitiveArray;
+use arrow::datatypes::ArrowNativeTypeOp;
+use arrow::datatypes::ArrowNumericType;
+use datafusion_row::accessor::RowAccessor;
+use std::ops::BitAnd as BitAndImplementation;
+use std::ops::BitOr as BitOrImplementation;
+use std::ops::BitXor as BitXorImplementation;
+
+fn bit_and<T>(array: &PrimitiveArray<T>) -> Option<T::Native>
+where
+    T: ArrowNumericType,
+    T::Native: BitAndImplementation<Output = T::Native> + ArrowNativeTypeOp,
+{
+    let null_count = array.null_count();
+
+    if null_count == array.len() {
+        return None;
+    }
+
+    let data: &[T::Native] = array.values();
+
+    match array.nulls() {
+        None => {
+            let bit_and = data
+                .iter()
+                .fold(T::Native::ONE.neg_wrapping(), |accumulator, value| {
+                    accumulator & *value
+                });
+
+            Some(bit_and)
+        }
+        Some(nulls) => {
+            let mut bit_and = T::Native::ONE.neg_wrapping();
+            let data_chunks = data.chunks_exact(64);
+            let remainder = data_chunks.remainder();
+
+            let bit_chunks = nulls.inner().bit_chunks();
+            data_chunks
+                .zip(bit_chunks.iter())
+                .for_each(|(chunk, mask)| {
+                    // index_mask has value 1 << i in the loop
+                    let mut index_mask = 1;
+                    chunk.iter().for_each(|value| {
+                        if (mask & index_mask) != 0 {
+                            bit_and = bit_and & *value;
+                        }
+                        index_mask <<= 1;
+                    });
+                });
+
+            let remainder_bits = bit_chunks.remainder_bits();
+
+            remainder.iter().enumerate().for_each(|(i, value)| {
+                if remainder_bits & (1 << i) != 0 {
+                    bit_and = bit_and & *value;
+                }
+            });
+
+            Some(bit_and)
+        }
+    }
+}
+
+fn bit_or<T>(array: &PrimitiveArray<T>) -> Option<T::Native>
+where
+    T: ArrowNumericType,
+    T::Native: BitOrImplementation<Output = T::Native> + ArrowNativeTypeOp,
+{
+    let null_count = array.null_count();
+
+    if null_count == array.len() {
+        return None;
+    }
+
+    let data: &[T::Native] = array.values();
+
+    match array.nulls() {
+        None => {
+            let bit_or = data.iter().fold(T::default_value(), |accumulator, value| {
+                accumulator | *value
+            });
+
+            Some(bit_or)
+        }
+        Some(nulls) => {
+            let mut bit_or = T::default_value();
+            let data_chunks = data.chunks_exact(64);
+            let remainder = data_chunks.remainder();
+
+            let bit_chunks = nulls.inner().bit_chunks();
+            data_chunks
+                .zip(bit_chunks.iter())
+                .for_each(|(chunk, mask)| {
+                    // index_mask has value 1 << i in the loop
+                    let mut index_mask = 1;
+                    chunk.iter().for_each(|value| {
+                        if (mask & index_mask) != 0 {
+                            bit_or = bit_or | *value;
+                        }
+                        index_mask <<= 1;
+                    });
+                });
+
+            let remainder_bits = bit_chunks.remainder_bits();
+
+            remainder.iter().enumerate().for_each(|(i, value)| {
+                if remainder_bits & (1 << i) != 0 {
+                    bit_or = bit_or | *value;
+                }
+            });
+
+            Some(bit_or)
+        }
+    }
+}
+
+fn bit_xor<T>(array: &PrimitiveArray<T>) -> Option<T::Native>
+where
+    T: ArrowNumericType,
+    T::Native: BitXorImplementation<Output = T::Native> + ArrowNativeTypeOp,
+{
+    let null_count = array.null_count();
+
+    if null_count == array.len() {
+        return None;
+    }
+
+    let data: &[T::Native] = array.values();
+
+    match array.nulls() {
+        None => {
+            let bit_xor = data.iter().fold(T::default_value(), |accumulator, value| {
+                accumulator ^ *value
+            });
+
+            Some(bit_xor)
+        }
+        Some(nulls) => {
+            let mut bit_xor = T::default_value();
+            let data_chunks = data.chunks_exact(64);
+            let remainder = data_chunks.remainder();
+
+            let bit_chunks = nulls.inner().bit_chunks();
+            data_chunks
+                .zip(bit_chunks.iter())
+                .for_each(|(chunk, mask)| {
+                    // index_mask has value 1 << i in the loop
+                    let mut index_mask = 1;
+                    chunk.iter().for_each(|value| {
+                        if (mask & index_mask) != 0 {
+                            bit_xor = bit_xor ^ *value;
+                        }
+                        index_mask <<= 1;
+                    });
+                });
+
+            let remainder_bits = bit_chunks.remainder_bits();
+
+            remainder.iter().enumerate().for_each(|(i, value)| {
+                if remainder_bits & (1 << i) != 0 {
+                    bit_xor = bit_xor ^ *value;
+                }
+            });
+
+            Some(bit_xor)
+        }
+    }
+}
+
+// BIT_AND/BIT_OR/BIT_XOR aggregation can take Dictionary-encoded input but always produces unpacked
+// (i.e. non-Dictionary) output, so we need to adjust the output data type to reflect this.
+// The aggregate produces unpacked output because there is only one BIT_AND/BIT_OR/BIT_XOR value
+// per group; there is no need to keep it Dictionary-encoded.
+fn bit_and_or_xor_aggregate_data_type(input_type: DataType) -> DataType {
+    if let DataType::Dictionary(_, value_type) = input_type {
+        *value_type
+    } else {
+        input_type
+    }
+}
+
+// returns the new value after bit_and/bit_or/bit_xor with the new values, taking nullability into account
+macro_rules! typed_bit_and_or_xor_batch {
+    ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{
+        let array = downcast_value!($VALUES, $ARRAYTYPE);
+        let delta = $OP(array);
+        Ok(ScalarValue::$SCALAR(delta))
+    }};
+}
+
+// bit_and/bit_or/bit_xor the array and returns a ScalarValue of its corresponding type.
+macro_rules! bit_and_or_xor_batch {
+    ($VALUES:expr, $OP:ident) => {{
+        match $VALUES.data_type() {

Review Comment:
   I think the usual way is preferable, because the other aggregate functions use it. But I think the idea could be pursued in a separate PR covering all aggregate functions.





[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6276: feat: support bitwise and boolean aggregate functions

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6276:
URL: https://github.com/apache/arrow-datafusion/pull/6276#discussion_r1192445618


##########
datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs:
##########
@@ -0,0 +1,1103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can be evaluated at runtime during query execution
+
+use std::any::Any;
+use std::convert::TryFrom;
+use std::sync::Arc;
+
+use crate::{AggregateExpr, PhysicalExpr};
+use arrow::datatypes::DataType;
+use arrow::{
+    array::{
+        ArrayRef, Int16Array, Int32Array, Int64Array, Int8Array, UInt16Array,
+        UInt32Array, UInt64Array, UInt8Array,
+    },
+    datatypes::Field,
+};
+use datafusion_common::{downcast_value, DataFusionError, Result, ScalarValue};
+use datafusion_expr::Accumulator;
+
+use crate::aggregate::row_accumulator::{
+    is_row_accumulator_support_dtype, RowAccumulator,
+};
+use crate::aggregate::utils::down_cast_any_ref;
+use crate::expressions::format_state_name;
+use arrow::array::Array;
+use arrow::array::PrimitiveArray;
+use arrow::datatypes::ArrowNativeTypeOp;
+use arrow::datatypes::ArrowNumericType;
+use datafusion_row::accessor::RowAccessor;
+use std::ops::BitAnd as BitAndImplementation;
+use std::ops::BitOr as BitOrImplementation;
+use std::ops::BitXor as BitXorImplementation;
+
+fn bit_and<T>(array: &PrimitiveArray<T>) -> Option<T::Native>
+where
+    T: ArrowNumericType,
+    T::Native: BitAndImplementation<Output = T::Native> + ArrowNativeTypeOp,
+{
+    let null_count = array.null_count();
+
+    if null_count == array.len() {
+        return None;
+    }
+
+    let data: &[T::Native] = array.values();
+
+    match array.nulls() {
+        None => {
+            let bit_and = data
+                .iter()
+                .fold(T::Native::ONE.neg_wrapping(), |accumulator, value| {

Review Comment:
   TIL https://docs.rs/num-traits/latest/num_traits/ops/wrapping/trait.WrappingNeg.html 👍 
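For readers unfamiliar with the trick, here is a small sketch of why negating one with wrapping arithmetic yields the identity element for bitwise AND. It uses the standard library's `wrapping_neg` as a stand-in for the `neg_wrapping` call in the diff; the function names are hypothetical.

```rust
/// All-ones bit pattern for `i8`: -1 in two's complement.
pub fn all_ones_i8() -> i8 {
    1i8.wrapping_neg()
}

/// All-ones bit pattern for `u8`: 0 - 1 wraps around to 255.
pub fn all_ones_u8() -> u8 {
    1u8.wrapping_neg()
}

/// Bitwise AND over a slice, seeded with the all-ones value so that the
/// first element passes through unchanged.
pub fn bit_and_u8(values: &[u8]) -> u8 {
    values.iter().fold(all_ones_u8(), |acc, v| acc & *v)
}
```

Because every bit of the seed is set, `seed & x == x` for any `x`, which makes it a safe starting value for the fold for both signed and unsigned integer types.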



##########
datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs:
##########
@@ -0,0 +1,1103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can be evaluated at runtime during query execution
+
+use std::any::Any;
+use std::convert::TryFrom;
+use std::sync::Arc;
+
+use crate::{AggregateExpr, PhysicalExpr};
+use arrow::datatypes::DataType;
+use arrow::{
+    array::{
+        ArrayRef, Int16Array, Int32Array, Int64Array, Int8Array, UInt16Array,
+        UInt32Array, UInt64Array, UInt8Array,
+    },
+    datatypes::Field,
+};
+use datafusion_common::{downcast_value, DataFusionError, Result, ScalarValue};
+use datafusion_expr::Accumulator;
+
+use crate::aggregate::row_accumulator::{
+    is_row_accumulator_support_dtype, RowAccumulator,
+};
+use crate::aggregate::utils::down_cast_any_ref;
+use crate::expressions::format_state_name;
+use arrow::array::Array;
+use arrow::array::PrimitiveArray;
+use arrow::datatypes::ArrowNativeTypeOp;
+use arrow::datatypes::ArrowNumericType;
+use datafusion_row::accessor::RowAccessor;
+use std::ops::BitAnd as BitAndImplementation;
+use std::ops::BitOr as BitOrImplementation;
+use std::ops::BitXor as BitXorImplementation;
+
+fn bit_and<T>(array: &PrimitiveArray<T>) -> Option<T::Native>
+where
+    T: ArrowNumericType,
+    T::Native: BitAndImplementation<Output = T::Native> + ArrowNativeTypeOp,
+{
+    let null_count = array.null_count();
+
+    if null_count == array.len() {
+        return None;
+    }
+
+    let data: &[T::Native] = array.values();
+
+    match array.nulls() {
+        None => {
+            let bit_and = data
+                .iter()
+                .fold(T::Native::ONE.neg_wrapping(), |accumulator, value| {
+                    accumulator & *value
+                });
+
+            Some(bit_and)
+        }
+        Some(nulls) => {
+            let mut bit_and = T::Native::ONE.neg_wrapping();
+            let data_chunks = data.chunks_exact(64);
+            let remainder = data_chunks.remainder();
+
+            let bit_chunks = nulls.inner().bit_chunks();
+            data_chunks
+                .zip(bit_chunks.iter())
+                .for_each(|(chunk, mask)| {
+                    // index_mask has value 1 << i in the loop
+                    let mut index_mask = 1;
+                    chunk.iter().for_each(|value| {
+                        if (mask & index_mask) != 0 {
+                            bit_and = bit_and & *value;
+                        }
+                        index_mask <<= 1;
+                    });
+                });
+
+            let remainder_bits = bit_chunks.remainder_bits();
+
+            remainder.iter().enumerate().for_each(|(i, value)| {
+                if remainder_bits & (1 << i) != 0 {
+                    bit_and = bit_and & *value;
+                }
+            });
+
+            Some(bit_and)
+        }
+    }
+}
+
+fn bit_or<T>(array: &PrimitiveArray<T>) -> Option<T::Native>
+where
+    T: ArrowNumericType,
+    T::Native: BitOrImplementation<Output = T::Native> + ArrowNativeTypeOp,
+{
+    let null_count = array.null_count();
+
+    if null_count == array.len() {
+        return None;
+    }
+
+    let data: &[T::Native] = array.values();
+
+    match array.nulls() {

Review Comment:
   This loop and the ones in `bit_and` are *very* similar (specifically the iteration over data chunks, the bit mask, and the remainder). The only differences seem to be the initial value and how the accumulator is updated for each value.
   
   Could you please try and refactor them to avoid the duplication?
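
   One possible shape for such a refactor, as a minimal sketch using only the standard library: a single helper takes the initial value and the combining operation as parameters, and the three functions become one-line wrappers. The name `fold_valid` and the `&[u64]` validity words are hypothetical stand-ins for the Arrow null buffer (bit `i % 64` of word `i / 64` is assumed to be set when element `i` is valid), and `chunks(64)` is used in place of `chunks_exact(64)` plus a separate remainder pass.

```rust
/// Folds the valid (non-null) entries of `data` with `op`, starting from `init`.
/// Returns `None` when there is no valid entry, mirroring the
/// `null_count == array.len()` early return in the diff.
/// `validity` is a hypothetical stand-in for the Arrow null bitmap.
fn fold_valid<T: Copy>(
    data: &[T],
    validity: Option<&[u64]>,
    init: T,
    op: impl Fn(T, T) -> T,
) -> Option<T> {
    match validity {
        // No bitmap: every entry is valid.
        None => {
            if data.is_empty() {
                return None;
            }
            Some(data.iter().fold(init, |acc, v| op(acc, *v)))
        }
        // Bitmap present: one 64-bit mask word per chunk of up to 64 values.
        Some(words) => {
            let mut acc = init;
            let mut seen_valid = false;
            for (chunk, mask) in data.chunks(64).zip(words.iter()) {
                for (i, value) in chunk.iter().enumerate() {
                    if mask & (1u64 << i) != 0 {
                        acc = op(acc, *value);
                        seen_valid = true;
                    }
                }
            }
            if seen_valid {
                Some(acc)
            } else {
                None
            }
        }
    }
}

/// Identity for AND is all bits set.
fn bit_and(data: &[i32], validity: Option<&[u64]>) -> Option<i32> {
    fold_valid(data, validity, !0, |a, b| a & b)
}

/// Identity for OR is zero.
fn bit_or(data: &[i32], validity: Option<&[u64]>) -> Option<i32> {
    fold_valid(data, validity, 0, |a, b| a | b)
}

/// Identity for XOR is zero.
fn bit_xor(data: &[i32], validity: Option<&[u64]>) -> Option<i32> {
    fold_valid(data, validity, 0, |a, b| a ^ b)
}
```

   In the PR itself the helper would presumably take a `&PrimitiveArray<T>` and read the mask words from `array.nulls()`; only the initial value and the closure would differ between the three callers.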



##########
datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs:
##########
@@ -0,0 +1,1103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+use std::convert::TryFrom;
+use std::sync::Arc;
+
+use crate::{AggregateExpr, PhysicalExpr};
+use arrow::datatypes::DataType;
+use arrow::{
+    array::{
+        ArrayRef, Int16Array, Int32Array, Int64Array, Int8Array, UInt16Array,
+        UInt32Array, UInt64Array, UInt8Array,
+    },
+    datatypes::Field,
+};
+use datafusion_common::{downcast_value, DataFusionError, Result, ScalarValue};
+use datafusion_expr::Accumulator;
+
+use crate::aggregate::row_accumulator::{
+    is_row_accumulator_support_dtype, RowAccumulator,
+};
+use crate::aggregate::utils::down_cast_any_ref;
+use crate::expressions::format_state_name;
+use arrow::array::Array;
+use arrow::array::PrimitiveArray;
+use arrow::datatypes::ArrowNativeTypeOp;
+use arrow::datatypes::ArrowNumericType;
+use datafusion_row::accessor::RowAccessor;
+use std::ops::BitAnd as BitAndImplementation;
+use std::ops::BitOr as BitOrImplementation;
+use std::ops::BitXor as BitXorImplementation;
+
+fn bit_and<T>(array: &PrimitiveArray<T>) -> Option<T::Native>
+where
+    T: ArrowNumericType,
+    T::Native: BitAndImplementation<Output = T::Native> + ArrowNativeTypeOp,
+{
+    let null_count = array.null_count();
+
+    if null_count == array.len() {
+        return None;
+    }
+
+    let data: &[T::Native] = array.values();
+
+    match array.nulls() {
+        None => {
+            let bit_and = data
+                .iter()
+                .fold(T::Native::ONE.neg_wrapping(), |accumulator, value| {
+                    accumulator & *value
+                });
+
+            Some(bit_and)
+        }
+        Some(nulls) => {
+            let mut bit_and = T::Native::ONE.neg_wrapping();
+            let data_chunks = data.chunks_exact(64);
+            let remainder = data_chunks.remainder();
+
+            let bit_chunks = nulls.inner().bit_chunks();
+            data_chunks
+                .zip(bit_chunks.iter())
+                .for_each(|(chunk, mask)| {
+                    // index_mask has value 1 << i in the loop
+                    let mut index_mask = 1;
+                    chunk.iter().for_each(|value| {
+                        if (mask & index_mask) != 0 {
+                            bit_and = bit_and & *value;
+                        }
+                        index_mask <<= 1;
+                    });
+                });
+
+            let remainder_bits = bit_chunks.remainder_bits();
+
+            remainder.iter().enumerate().for_each(|(i, value)| {
+                if remainder_bits & (1 << i) != 0 {
+                    bit_and = bit_and & *value;
+                }
+            });
+
+            Some(bit_and)
+        }
+    }
+}
+
+fn bit_or<T>(array: &PrimitiveArray<T>) -> Option<T::Native>
+where
+    T: ArrowNumericType,
+    T::Native: BitOrImplementation<Output = T::Native> + ArrowNativeTypeOp,
+{
+    let null_count = array.null_count();
+
+    if null_count == array.len() {
+        return None;
+    }
+
+    let data: &[T::Native] = array.values();
+
+    match array.nulls() {
+        None => {
+            let bit_or = data.iter().fold(T::default_value(), |accumulator, value| {
+                accumulator | *value
+            });
+
+            Some(bit_or)
+        }
+        Some(nulls) => {
+            let mut bit_or = T::default_value();
+            let data_chunks = data.chunks_exact(64);
+            let remainder = data_chunks.remainder();
+
+            let bit_chunks = nulls.inner().bit_chunks();
+            data_chunks
+                .zip(bit_chunks.iter())
+                .for_each(|(chunk, mask)| {
+                    // index_mask has value 1 << i in the loop
+                    let mut index_mask = 1;
+                    chunk.iter().for_each(|value| {
+                        if (mask & index_mask) != 0 {
+                            bit_or = bit_or | *value;
+                        }
+                        index_mask <<= 1;
+                    });
+                });
+
+            let remainder_bits = bit_chunks.remainder_bits();
+
+            remainder.iter().enumerate().for_each(|(i, value)| {
+                if remainder_bits & (1 << i) != 0 {
+                    bit_or = bit_or | *value;
+                }
+            });
+
+            Some(bit_or)
+        }
+    }
+}
+
+fn bit_xor<T>(array: &PrimitiveArray<T>) -> Option<T::Native>
+where
+    T: ArrowNumericType,
+    T::Native: BitXorImplementation<Output = T::Native> + ArrowNativeTypeOp,
+{
+    let null_count = array.null_count();
+
+    if null_count == array.len() {
+        return None;
+    }
+
+    let data: &[T::Native] = array.values();
+
+    match array.nulls() {
+        None => {
+            let bit_xor = data.iter().fold(T::default_value(), |accumulator, value| {
+                accumulator ^ *value
+            });
+
+            Some(bit_xor)
+        }
+        Some(nulls) => {
+            let mut bit_xor = T::default_value();
+            let data_chunks = data.chunks_exact(64);
+            let remainder = data_chunks.remainder();
+
+            let bit_chunks = nulls.inner().bit_chunks();
+            data_chunks
+                .zip(bit_chunks.iter())
+                .for_each(|(chunk, mask)| {
+                    // index_mask has value 1 << i in the loop
+                    let mut index_mask = 1;
+                    chunk.iter().for_each(|value| {
+                        if (mask & index_mask) != 0 {
+                            bit_xor = bit_xor ^ *value;
+                        }
+                        index_mask <<= 1;
+                    });
+                });
+
+            let remainder_bits = bit_chunks.remainder_bits();
+
+            remainder.iter().enumerate().for_each(|(i, value)| {
+                if remainder_bits & (1 << i) != 0 {
+                    bit_xor = bit_xor ^ *value;
+                }
+            });
+
+            Some(bit_xor)
+        }
+    }
+}
+
+// Bit and/Bit or/Bit xor aggregation can take Dictionary encode input but always produces unpacked
+// (aka non Dictionary) output. We need to adjust the output data type to reflect this.
+// The reason bit and/bit or/bit xor aggregate produces unpacked output because there is only one
+// bit and/bit or/bit xor value per group; there is no needs to keep them Dictionary encode
+fn bit_and_or_xor_aggregate_data_type(input_type: DataType) -> DataType {
+    if let DataType::Dictionary(_, value_type) = input_type {
+        *value_type
+    } else {
+        input_type
+    }
+}
+
+// returns the new value after bit_and/bit_or/bit_xor with the new values, taking nullability into account
+macro_rules! typed_bit_and_or_xor_batch {
+    ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{
+        let array = downcast_value!($VALUES, $ARRAYTYPE);
+        let delta = $OP(array);
+        Ok(ScalarValue::$SCALAR(delta))
+    }};
+}
+
+// bit_and/bit_or/bit_xor the array and returns a ScalarValue of its corresponding type.
+macro_rules! bit_and_or_xor_batch {
+    ($VALUES:expr, $OP:ident) => {{
+        match $VALUES.data_type() {

Review Comment:
   I think you could probably use https://docs.rs/arrow/latest/arrow/macro.downcast_primitive_array.html to avoid some of this replication.



##########
datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs:
##########
@@ -0,0 +1,1103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+use std::convert::TryFrom;
+use std::sync::Arc;
+
+use crate::{AggregateExpr, PhysicalExpr};
+use arrow::datatypes::DataType;
+use arrow::{
+    array::{
+        ArrayRef, Int16Array, Int32Array, Int64Array, Int8Array, UInt16Array,
+        UInt32Array, UInt64Array, UInt8Array,
+    },
+    datatypes::Field,
+};
+use datafusion_common::{downcast_value, DataFusionError, Result, ScalarValue};
+use datafusion_expr::Accumulator;
+
+use crate::aggregate::row_accumulator::{
+    is_row_accumulator_support_dtype, RowAccumulator,
+};
+use crate::aggregate::utils::down_cast_any_ref;
+use crate::expressions::format_state_name;
+use arrow::array::Array;
+use arrow::array::PrimitiveArray;
+use arrow::datatypes::ArrowNativeTypeOp;
+use arrow::datatypes::ArrowNumericType;
+use datafusion_row::accessor::RowAccessor;
+use std::ops::BitAnd as BitAndImplementation;
+use std::ops::BitOr as BitOrImplementation;
+use std::ops::BitXor as BitXorImplementation;
+
+fn bit_and<T>(array: &PrimitiveArray<T>) -> Option<T::Native>
+where
+    T: ArrowNumericType,
+    T::Native: BitAndImplementation<Output = T::Native> + ArrowNativeTypeOp,
+{
+    let null_count = array.null_count();
+
+    if null_count == array.len() {
+        return None;
+    }
+
+    let data: &[T::Native] = array.values();
+
+    match array.nulls() {
+        None => {
+            let bit_and = data
+                .iter()
+                .fold(T::Native::ONE.neg_wrapping(), |accumulator, value| {
+                    accumulator & *value
+                });
+
+            Some(bit_and)
+        }
+        Some(nulls) => {
+            let mut bit_and = T::Native::ONE.neg_wrapping();
+            let data_chunks = data.chunks_exact(64);
+            let remainder = data_chunks.remainder();
+
+            let bit_chunks = nulls.inner().bit_chunks();
+            data_chunks
+                .zip(bit_chunks.iter())
+                .for_each(|(chunk, mask)| {
+                    // index_mask has value 1 << i in the loop
+                    let mut index_mask = 1;
+                    chunk.iter().for_each(|value| {
+                        if (mask & index_mask) != 0 {
+                            bit_and = bit_and & *value;
+                        }
+                        index_mask <<= 1;
+                    });
+                });
+
+            let remainder_bits = bit_chunks.remainder_bits();
+
+            remainder.iter().enumerate().for_each(|(i, value)| {
+                if remainder_bits & (1 << i) != 0 {
+                    bit_and = bit_and & *value;
+                }
+            });
+
+            Some(bit_and)
+        }
+    }
+}
+
+fn bit_or<T>(array: &PrimitiveArray<T>) -> Option<T::Native>
+where
+    T: ArrowNumericType,
+    T::Native: BitOrImplementation<Output = T::Native> + ArrowNativeTypeOp,
+{
+    let null_count = array.null_count();
+
+    if null_count == array.len() {
+        return None;
+    }
+
+    let data: &[T::Native] = array.values();
+
+    match array.nulls() {
+        None => {
+            let bit_or = data.iter().fold(T::default_value(), |accumulator, value| {
+                accumulator | *value
+            });
+
+            Some(bit_or)
+        }
+        Some(nulls) => {
+            let mut bit_or = T::default_value();
+            let data_chunks = data.chunks_exact(64);
+            let remainder = data_chunks.remainder();
+
+            let bit_chunks = nulls.inner().bit_chunks();
+            data_chunks
+                .zip(bit_chunks.iter())
+                .for_each(|(chunk, mask)| {
+                    // index_mask has value 1 << i in the loop
+                    let mut index_mask = 1;
+                    chunk.iter().for_each(|value| {
+                        if (mask & index_mask) != 0 {
+                            bit_or = bit_or | *value;
+                        }
+                        index_mask <<= 1;
+                    });
+                });
+
+            let remainder_bits = bit_chunks.remainder_bits();
+
+            remainder.iter().enumerate().for_each(|(i, value)| {
+                if remainder_bits & (1 << i) != 0 {
+                    bit_or = bit_or | *value;
+                }
+            });
+
+            Some(bit_or)
+        }
+    }
+}
+
+fn bit_xor<T>(array: &PrimitiveArray<T>) -> Option<T::Native>
+where
+    T: ArrowNumericType,
+    T::Native: BitXorImplementation<Output = T::Native> + ArrowNativeTypeOp,
+{
+    let null_count = array.null_count();
+
+    if null_count == array.len() {
+        return None;
+    }
+
+    let data: &[T::Native] = array.values();
+
+    match array.nulls() {
+        None => {
+            let bit_xor = data.iter().fold(T::default_value(), |accumulator, value| {
+                accumulator ^ *value
+            });
+
+            Some(bit_xor)
+        }
+        Some(nulls) => {
+            let mut bit_xor = T::default_value();
+            let data_chunks = data.chunks_exact(64);
+            let remainder = data_chunks.remainder();
+
+            let bit_chunks = nulls.inner().bit_chunks();
+            data_chunks
+                .zip(bit_chunks.iter())
+                .for_each(|(chunk, mask)| {
+                    // index_mask has value 1 << i in the loop
+                    let mut index_mask = 1;
+                    chunk.iter().for_each(|value| {
+                        if (mask & index_mask) != 0 {
+                            bit_xor = bit_xor ^ *value;
+                        }
+                        index_mask <<= 1;
+                    });
+                });
+
+            let remainder_bits = bit_chunks.remainder_bits();
+
+            remainder.iter().enumerate().for_each(|(i, value)| {
+                if remainder_bits & (1 << i) != 0 {
+                    bit_xor = bit_xor ^ *value;
+                }
+            });
+
+            Some(bit_xor)
+        }
+    }
+}
+
+// Bit and/Bit or/Bit xor aggregation can take Dictionary encode input but always produces unpacked
+// (aka non Dictionary) output. We need to adjust the output data type to reflect this.
+// The reason bit and/bit or/bit xor aggregate produces unpacked output because there is only one
+// bit and/bit or/bit xor value per group; there is no needs to keep them Dictionary encode
+fn bit_and_or_xor_aggregate_data_type(input_type: DataType) -> DataType {
+    if let DataType::Dictionary(_, value_type) = input_type {
+        *value_type
+    } else {
+        input_type
+    }
+}
+
+// returns the new value after bit_and/bit_or/bit_xor with the new values, taking nullability into account
+macro_rules! typed_bit_and_or_xor_batch {
+    ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{
+        let array = downcast_value!($VALUES, $ARRAYTYPE);
+        let delta = $OP(array);
+        Ok(ScalarValue::$SCALAR(delta))
+    }};
+}
+
+// bit_and/bit_or/bit_xor the array and returns a ScalarValue of its corresponding type.
+macro_rules! bit_and_or_xor_batch {
+    ($VALUES:expr, $OP:ident) => {{
+        match $VALUES.data_type() {
+            DataType::Int64 => {
+                typed_bit_and_or_xor_batch!($VALUES, Int64Array, Int64, $OP)
+            }
+            DataType::Int32 => {
+                typed_bit_and_or_xor_batch!($VALUES, Int32Array, Int32, $OP)
+            }
+            DataType::Int16 => {
+                typed_bit_and_or_xor_batch!($VALUES, Int16Array, Int16, $OP)
+            }
+            DataType::Int8 => typed_bit_and_or_xor_batch!($VALUES, Int8Array, Int8, $OP),
+            DataType::UInt64 => {
+                typed_bit_and_or_xor_batch!($VALUES, UInt64Array, UInt64, $OP)
+            }
+            DataType::UInt32 => {
+                typed_bit_and_or_xor_batch!($VALUES, UInt32Array, UInt32, $OP)
+            }
+            DataType::UInt16 => {
+                typed_bit_and_or_xor_batch!($VALUES, UInt16Array, UInt16, $OP)
+            }
+            DataType::UInt8 => {
+                typed_bit_and_or_xor_batch!($VALUES, UInt8Array, UInt8, $OP)
+            }
+            e => {
+                return Err(DataFusionError::Internal(format!(
+                    "Bit and/Bit or/Bit xor is not expected to receive the type {e:?}"
+                )));
+            }
+        }
+    }};
+}
+
+/// dynamically-typed bit_and(array) -> ScalarValue
+fn bit_and_batch(values: &ArrayRef) -> Result<ScalarValue> {
+    bit_and_or_xor_batch!(values, bit_and)
+}
+
+/// dynamically-typed bit_or(array) -> ScalarValue
+fn bit_or_batch(values: &ArrayRef) -> Result<ScalarValue> {
+    bit_and_or_xor_batch!(values, bit_or)
+}
+
+/// dynamically-typed bit_xor(array) -> ScalarValue
+fn bit_xor_batch(values: &ArrayRef) -> Result<ScalarValue> {
+    bit_and_or_xor_batch!(values, bit_xor)
+}
+
+// bit_and/bit_or/bit_xor of two scalar values.
+macro_rules! typed_bit_and_or_xor {
+    ($VALUE:expr, $DELTA:expr, $SCALAR:ident, $OP:ident) => {{
+        ScalarValue::$SCALAR(match ($VALUE, $DELTA) {
+            (None, None) => None,
+            (Some(a), None) => Some(*a),
+            (None, Some(b)) => Some(*b),
+            (Some(a), Some(b)) => Some((*a).$OP(*b)),
+        })
+    }};
+}
+
+// bit_and/bit_or/bit_xor of two scalar values.
+macro_rules! typed_bit_and_or_xor_v2 {
+    ($INDEX:ident, $ACC:ident, $SCALAR:expr, $TYPE:ident, $OP:ident) => {{
+        paste::item! {
+            match $SCALAR {
+                None => {}
+                Some(v) => $ACC.[<$OP _ $TYPE>]($INDEX, *v as $TYPE)
+            }
+        }
+    }};
+}
+
+// bit_and/bit_or/bit_xor of two scalar values of the same type
+macro_rules! bit_and_or_xor {
+    ($VALUE:expr, $DELTA:expr, $OP:ident) => {{
+        Ok(match ($VALUE, $DELTA) {
+            (ScalarValue::UInt64(lhs), ScalarValue::UInt64(rhs)) => {
+                typed_bit_and_or_xor!(lhs, rhs, UInt64, $OP)
+            }
+            (ScalarValue::UInt32(lhs), ScalarValue::UInt32(rhs)) => {
+                typed_bit_and_or_xor!(lhs, rhs, UInt32, $OP)
+            }
+            (ScalarValue::UInt16(lhs), ScalarValue::UInt16(rhs)) => {
+                typed_bit_and_or_xor!(lhs, rhs, UInt16, $OP)
+            }
+            (ScalarValue::UInt8(lhs), ScalarValue::UInt8(rhs)) => {
+                typed_bit_and_or_xor!(lhs, rhs, UInt8, $OP)
+            }
+            (ScalarValue::Int64(lhs), ScalarValue::Int64(rhs)) => {
+                typed_bit_and_or_xor!(lhs, rhs, Int64, $OP)
+            }
+            (ScalarValue::Int32(lhs), ScalarValue::Int32(rhs)) => {
+                typed_bit_and_or_xor!(lhs, rhs, Int32, $OP)
+            }
+            (ScalarValue::Int16(lhs), ScalarValue::Int16(rhs)) => {
+                typed_bit_and_or_xor!(lhs, rhs, Int16, $OP)
+            }
+            (ScalarValue::Int8(lhs), ScalarValue::Int8(rhs)) => {
+                typed_bit_and_or_xor!(lhs, rhs, Int8, $OP)
+            }
+            e => {
+                return Err(DataFusionError::Internal(format!(
+                    "BIT AND/BIT OR/BIT XOR is not expected to receive scalars of incompatible types {:?}",
+                    e
+                )))
+            }
+        })
+    }};
+}
+
+macro_rules! bit_and_or_xor_v2 {
+    ($INDEX:ident, $ACC:ident, $SCALAR:expr, $OP:ident) => {{
+        Ok(match $SCALAR {
+            ScalarValue::UInt64(rhs) => {
+                typed_bit_and_or_xor_v2!($INDEX, $ACC, rhs, u64, $OP)
+            }
+            ScalarValue::UInt32(rhs) => {
+                typed_bit_and_or_xor_v2!($INDEX, $ACC, rhs, u32, $OP)
+            }
+            ScalarValue::UInt16(rhs) => {
+                typed_bit_and_or_xor_v2!($INDEX, $ACC, rhs, u16, $OP)
+            }
+            ScalarValue::UInt8(rhs) => {
+                typed_bit_and_or_xor_v2!($INDEX, $ACC, rhs, u8, $OP)
+            }
+            ScalarValue::Int64(rhs) => {
+                typed_bit_and_or_xor_v2!($INDEX, $ACC, rhs, i64, $OP)
+            }
+            ScalarValue::Int32(rhs) => {
+                typed_bit_and_or_xor_v2!($INDEX, $ACC, rhs, i32, $OP)
+            }
+            ScalarValue::Int16(rhs) => {
+                typed_bit_and_or_xor_v2!($INDEX, $ACC, rhs, i16, $OP)
+            }
+            ScalarValue::Int8(rhs) => {
+                typed_bit_and_or_xor_v2!($INDEX, $ACC, rhs, i8, $OP)
+            }
+            ScalarValue::Null => {
+                // do nothing
+            }
+            e => {
+                return Err(DataFusionError::Internal(format!(
+                    "BIT AND/BIT OR/BIT XOR is not expected to receive scalars of incompatible types {:?}",
+                    e
+                )))
+            }
+        })
+    }};
+}
+
+/// the bit_and of two scalar values
+pub fn compute_bit_and(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {
+    bit_and_or_xor!(lhs, rhs, bitand)
+}
+
+pub fn bit_and_row(
+    index: usize,
+    accessor: &mut RowAccessor,
+    s: &ScalarValue,
+) -> Result<()> {
+    bit_and_or_xor_v2!(index, accessor, s, bitand)
+}
+
+/// the bit_or of two scalar values
+pub fn compute_bit_or(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {
+    bit_and_or_xor!(lhs, rhs, bitor)
+}
+
+pub fn bit_or_row(
+    index: usize,
+    accessor: &mut RowAccessor,
+    s: &ScalarValue,
+) -> Result<()> {
+    bit_and_or_xor_v2!(index, accessor, s, bitor)
+}
+
+/// the bit_xor of two scalar values
+pub fn compute_bit_xor(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {
+    bit_and_or_xor!(lhs, rhs, bitxor)
+}
+
+pub fn bit_xor_row(
+    index: usize,
+    accessor: &mut RowAccessor,
+    s: &ScalarValue,
+) -> Result<()> {
+    bit_and_or_xor_v2!(index, accessor, s, bitxor)
+}
+
+/// BIT_AND aggregate expression
+#[derive(Debug, Clone)]
+pub struct BitAnd {
+    name: String,
+    pub data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+    nullable: bool,
+}
+
+impl BitAnd {
+    /// Create a new BIT_AND aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        Self {
+            name: name.into(),
+            expr,
+            data_type: bit_and_or_xor_aggregate_data_type(data_type),
+            nullable: true,
+        }
+    }
+}
+
+impl AggregateExpr for BitAnd {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(
+            &self.name,
+            self.data_type.clone(),
+            self.nullable,
+        ))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::new(BitAndAccumulator::try_new(&self.data_type)?))
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![Field::new(
+            format_state_name(&self.name, "bit_and"),
+            self.data_type.clone(),
+            self.nullable,
+        )])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    fn row_accumulator_supported(&self) -> bool {
+        is_row_accumulator_support_dtype(&self.data_type)
+    }
+
+    fn supports_bounded_execution(&self) -> bool {
+        true
+    }
+
+    fn create_row_accumulator(
+        &self,
+        start_index: usize,
+    ) -> Result<Box<dyn RowAccumulator>> {
+        Ok(Box::new(BitAndRowAccumulator::new(
+            start_index,
+            self.data_type.clone(),
+        )))
+    }
+
+    fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
+        Some(Arc::new(self.clone()))
+    }
+
+    fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {

Review Comment:
   I don't think `BitAnd` and the other accumulators in this file will correctly implement sliding accumulators (this is another place where `supports_bounded_execution` should return false, as suggested by @mustafasrepo).
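
   One way to read this concern, sketched with plain `u8` values rather than the PR's accumulators: a sliding frame has to undo the contribution of values that leave it (the `retract_batch` step, as I understand the `Accumulator` trait). XOR is its own inverse, so it can be retracted; AND and OR lose information, so the frame has to be recomputed or extra state kept. The function names below are hypothetical, and `width` is assumed to be at least 1.

```rust
/// Running XOR over a sliding frame of `width` values.
/// XOR is its own inverse, so a value leaving the frame is undone by
/// XOR-ing it into the accumulator a second time.
fn sliding_xor(values: &[u8], width: usize) -> Vec<u8> {
    assert!(width > 0, "width must be at least 1");
    let mut out = Vec::new();
    let mut acc = 0u8;
    for (i, v) in values.iter().enumerate() {
        // A new value enters the frame.
        acc ^= *v;
        // The value `width` positions back leaves the frame: retract it.
        if i >= width {
            acc ^= values[i - width];
        }
        // Emit once the frame is full.
        if i + 1 >= width {
            out.push(acc);
        }
    }
    out
}

/// AND has no inverse: once a bit has been cleared, the accumulator alone
/// cannot tell whether it should come back when a value leaves the frame.
/// This sketch therefore recomputes every frame from scratch.
fn sliding_and(values: &[u8], width: usize) -> Vec<u8> {
    assert!(width > 0, "width must be at least 1");
    values
        .windows(width)
        .map(|frame| frame.iter().fold(!0u8, |acc, v| acc & *v))
        .collect()
}
```

   For instance, the frames `[1, 2]` and `[1, 0]` both AND to `0`, yet after the leading `1` leaves, the correct results are `2` and `0` respectively, so no retract step based only on the running value and the departing value can be right for both.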



##########
datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs:
##########
@@ -0,0 +1,1103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+use std::convert::TryFrom;
+use std::sync::Arc;
+
+use crate::{AggregateExpr, PhysicalExpr};
+use arrow::datatypes::DataType;
+use arrow::{
+    array::{
+        ArrayRef, Int16Array, Int32Array, Int64Array, Int8Array, UInt16Array,
+        UInt32Array, UInt64Array, UInt8Array,
+    },
+    datatypes::Field,
+};
+use datafusion_common::{downcast_value, DataFusionError, Result, ScalarValue};
+use datafusion_expr::Accumulator;
+
+use crate::aggregate::row_accumulator::{
+    is_row_accumulator_support_dtype, RowAccumulator,
+};
+use crate::aggregate::utils::down_cast_any_ref;
+use crate::expressions::format_state_name;
+use arrow::array::Array;
+use arrow::array::PrimitiveArray;
+use arrow::datatypes::ArrowNativeTypeOp;
+use arrow::datatypes::ArrowNumericType;
+use datafusion_row::accessor::RowAccessor;
+use std::ops::BitAnd as BitAndImplementation;
+use std::ops::BitOr as BitOrImplementation;
+use std::ops::BitXor as BitXorImplementation;
+
+fn bit_and<T>(array: &PrimitiveArray<T>) -> Option<T::Native>
+where
+    T: ArrowNumericType,
+    T::Native: BitAndImplementation<Output = T::Native> + ArrowNativeTypeOp,
+{
+    let null_count = array.null_count();
+
+    if null_count == array.len() {
+        return None;
+    }
+
+    let data: &[T::Native] = array.values();
+
+    match array.nulls() {
+        None => {
+            let bit_and = data
+                .iter()
+                .fold(T::Native::ONE.neg_wrapping(), |accumulator, value| {
+                    accumulator & *value
+                });
+
+            Some(bit_and)
+        }
+        Some(nulls) => {
+            let mut bit_and = T::Native::ONE.neg_wrapping();
+            let data_chunks = data.chunks_exact(64);
+            let remainder = data_chunks.remainder();
+
+            let bit_chunks = nulls.inner().bit_chunks();
+            data_chunks
+                .zip(bit_chunks.iter())
+                .for_each(|(chunk, mask)| {
+                    // index_mask has value 1 << i in the loop
+                    let mut index_mask = 1;
+                    chunk.iter().for_each(|value| {
+                        if (mask & index_mask) != 0 {
+                            bit_and = bit_and & *value;
+                        }
+                        index_mask <<= 1;
+                    });
+                });
+
+            let remainder_bits = bit_chunks.remainder_bits();
+
+            remainder.iter().enumerate().for_each(|(i, value)| {
+                if remainder_bits & (1 << i) != 0 {
+                    bit_and = bit_and & *value;
+                }
+            });
+
+            Some(bit_and)
+        }
+    }
+}
+
+fn bit_or<T>(array: &PrimitiveArray<T>) -> Option<T::Native>
+where
+    T: ArrowNumericType,
+    T::Native: BitOrImplementation<Output = T::Native> + ArrowNativeTypeOp,
+{
+    let null_count = array.null_count();
+
+    if null_count == array.len() {
+        return None;
+    }
+
+    let data: &[T::Native] = array.values();
+
+    match array.nulls() {
+        None => {
+            let bit_or = data.iter().fold(T::default_value(), |accumulator, value| {
+                accumulator | *value
+            });
+
+            Some(bit_or)
+        }
+        Some(nulls) => {
+            let mut bit_or = T::default_value();
+            let data_chunks = data.chunks_exact(64);
+            let remainder = data_chunks.remainder();
+
+            let bit_chunks = nulls.inner().bit_chunks();
+            data_chunks
+                .zip(bit_chunks.iter())
+                .for_each(|(chunk, mask)| {
+                    // index_mask has value 1 << i in the loop
+                    let mut index_mask = 1;
+                    chunk.iter().for_each(|value| {
+                        if (mask & index_mask) != 0 {
+                            bit_or = bit_or | *value;
+                        }
+                        index_mask <<= 1;
+                    });
+                });
+
+            let remainder_bits = bit_chunks.remainder_bits();
+
+            remainder.iter().enumerate().for_each(|(i, value)| {
+                if remainder_bits & (1 << i) != 0 {
+                    bit_or = bit_or | *value;
+                }
+            });
+
+            Some(bit_or)
+        }
+    }
+}
+
+fn bit_xor<T>(array: &PrimitiveArray<T>) -> Option<T::Native>
+where
+    T: ArrowNumericType,
+    T::Native: BitXorImplementation<Output = T::Native> + ArrowNativeTypeOp,
+{
+    let null_count = array.null_count();
+
+    if null_count == array.len() {
+        return None;
+    }
+
+    let data: &[T::Native] = array.values();
+
+    match array.nulls() {
+        None => {
+            let bit_xor = data.iter().fold(T::default_value(), |accumulator, value| {
+                accumulator ^ *value
+            });
+
+            Some(bit_xor)
+        }
+        Some(nulls) => {
+            let mut bit_xor = T::default_value();
+            let data_chunks = data.chunks_exact(64);
+            let remainder = data_chunks.remainder();
+
+            let bit_chunks = nulls.inner().bit_chunks();
+            data_chunks
+                .zip(bit_chunks.iter())
+                .for_each(|(chunk, mask)| {
+                    // index_mask has value 1 << i in the loop
+                    let mut index_mask = 1;
+                    chunk.iter().for_each(|value| {
+                        if (mask & index_mask) != 0 {
+                            bit_xor = bit_xor ^ *value;
+                        }
+                        index_mask <<= 1;
+                    });
+                });
+
+            let remainder_bits = bit_chunks.remainder_bits();
+
+            remainder.iter().enumerate().for_each(|(i, value)| {
+                if remainder_bits & (1 << i) != 0 {
+                    bit_xor = bit_xor ^ *value;
+                }
+            });
+
+            Some(bit_xor)
+        }
+    }
+}
+
+// Bit and/Bit or/Bit xor aggregation can take Dictionary-encoded input but always produces unpacked
+// (i.e. non-Dictionary) output, so the output data type must be adjusted to reflect this.
+// The aggregate produces unpacked output because there is only one bit and/bit or/bit xor value
+// per group; there is no need to keep it Dictionary-encoded.
+fn bit_and_or_xor_aggregate_data_type(input_type: DataType) -> DataType {

Review Comment:
   Again, I'm not sure whether this is actually the case or whether it is left over from copy/paste. I didn't see any tests for dictionary-encoded values.
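
To make the question concrete, here is a minimal standalone sketch of what the quoted helper does to a type. `ToyDataType` and `aggregate_output_type` are hypothetical stand-ins written for this note, not arrow's `DataType` or the function in the diff. It only shows the type adjustment; it does not show that dictionary arrays are actually aggregated. The batch macro quoted later in this thread matches only the eight integer types, so in this diff a dictionary-encoded array would appear to reach the error arm.

```rust
// Toy stand-in for arrow's `DataType`; only the variants needed for the sketch.
#[derive(Debug, Clone, PartialEq)]
enum ToyDataType {
    Int32,
    UInt8,
    // (key type, value type), mirroring the shape of a dictionary type.
    Dictionary(Box<ToyDataType>, Box<ToyDataType>),
}

// Returns the value type for dictionary input and the input type otherwise.
fn aggregate_output_type(input: ToyDataType) -> ToyDataType {
    match input {
        ToyDataType::Dictionary(_, value_type) => *value_type,
        other => other,
    }
}
```

A test for dictionary-encoded values would need to cover the aggregation itself, not just this type mapping.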



##########
datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs:
##########
@@ -0,0 +1,1103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can be evaluated at runtime during query execution
+
+use std::any::Any;
+use std::convert::TryFrom;
+use std::sync::Arc;
+
+use crate::{AggregateExpr, PhysicalExpr};
+use arrow::datatypes::DataType;
+use arrow::{
+    array::{
+        ArrayRef, Int16Array, Int32Array, Int64Array, Int8Array, UInt16Array,
+        UInt32Array, UInt64Array, UInt8Array,
+    },
+    datatypes::Field,
+};
+use datafusion_common::{downcast_value, DataFusionError, Result, ScalarValue};
+use datafusion_expr::Accumulator;
+
+use crate::aggregate::row_accumulator::{
+    is_row_accumulator_support_dtype, RowAccumulator,
+};
+use crate::aggregate::utils::down_cast_any_ref;
+use crate::expressions::format_state_name;
+use arrow::array::Array;
+use arrow::array::PrimitiveArray;
+use arrow::datatypes::ArrowNativeTypeOp;
+use arrow::datatypes::ArrowNumericType;
+use datafusion_row::accessor::RowAccessor;
+use std::ops::BitAnd as BitAndImplementation;
+use std::ops::BitOr as BitOrImplementation;
+use std::ops::BitXor as BitXorImplementation;
+
+fn bit_and<T>(array: &PrimitiveArray<T>) -> Option<T::Native>
+where
+    T: ArrowNumericType,
+    T::Native: BitAndImplementation<Output = T::Native> + ArrowNativeTypeOp,
+{
+    let null_count = array.null_count();
+
+    if null_count == array.len() {
+        return None;
+    }
+
+    let data: &[T::Native] = array.values();
+
+    match array.nulls() {
+        None => {
+            let bit_and = data
+                .iter()
+                .fold(T::Native::ONE.neg_wrapping(), |accumulator, value| {
+                    accumulator & *value
+                });
+
+            Some(bit_and)
+        }
+        Some(nulls) => {
+            let mut bit_and = T::Native::ONE.neg_wrapping();
+            let data_chunks = data.chunks_exact(64);
+            let remainder = data_chunks.remainder();
+
+            let bit_chunks = nulls.inner().bit_chunks();
+            data_chunks
+                .zip(bit_chunks.iter())
+                .for_each(|(chunk, mask)| {
+                    // index_mask has value 1 << i in the loop
+                    let mut index_mask = 1;
+                    chunk.iter().for_each(|value| {
+                        if (mask & index_mask) != 0 {
+                            bit_and = bit_and & *value;
+                        }
+                        index_mask <<= 1;
+                    });
+                });
+
+            let remainder_bits = bit_chunks.remainder_bits();
+
+            remainder.iter().enumerate().for_each(|(i, value)| {
+                if remainder_bits & (1 << i) != 0 {
+                    bit_and = bit_and & *value;
+                }
+            });
+
+            Some(bit_and)
+        }
+    }
+}
+
+fn bit_or<T>(array: &PrimitiveArray<T>) -> Option<T::Native>
+where
+    T: ArrowNumericType,
+    T::Native: BitOrImplementation<Output = T::Native> + ArrowNativeTypeOp,
+{
+    let null_count = array.null_count();
+
+    if null_count == array.len() {
+        return None;
+    }
+
+    let data: &[T::Native] = array.values();
+
+    match array.nulls() {
+        None => {
+            let bit_or = data.iter().fold(T::default_value(), |accumulator, value| {
+                accumulator | *value
+            });
+
+            Some(bit_or)
+        }
+        Some(nulls) => {
+            let mut bit_or = T::default_value();
+            let data_chunks = data.chunks_exact(64);
+            let remainder = data_chunks.remainder();
+
+            let bit_chunks = nulls.inner().bit_chunks();
+            data_chunks
+                .zip(bit_chunks.iter())
+                .for_each(|(chunk, mask)| {
+                    // index_mask has value 1 << i in the loop
+                    let mut index_mask = 1;
+                    chunk.iter().for_each(|value| {
+                        if (mask & index_mask) != 0 {
+                            bit_or = bit_or | *value;
+                        }
+                        index_mask <<= 1;
+                    });
+                });
+
+            let remainder_bits = bit_chunks.remainder_bits();
+
+            remainder.iter().enumerate().for_each(|(i, value)| {
+                if remainder_bits & (1 << i) != 0 {
+                    bit_or = bit_or | *value;
+                }
+            });
+
+            Some(bit_or)
+        }
+    }
+}
+
+fn bit_xor<T>(array: &PrimitiveArray<T>) -> Option<T::Native>
+where
+    T: ArrowNumericType,
+    T::Native: BitXorImplementation<Output = T::Native> + ArrowNativeTypeOp,
+{
+    let null_count = array.null_count();
+
+    if null_count == array.len() {
+        return None;
+    }
+
+    let data: &[T::Native] = array.values();
+
+    match array.nulls() {
+        None => {
+            let bit_xor = data.iter().fold(T::default_value(), |accumulator, value| {
+                accumulator ^ *value
+            });
+
+            Some(bit_xor)
+        }
+        Some(nulls) => {
+            let mut bit_xor = T::default_value();
+            let data_chunks = data.chunks_exact(64);
+            let remainder = data_chunks.remainder();
+
+            let bit_chunks = nulls.inner().bit_chunks();
+            data_chunks
+                .zip(bit_chunks.iter())
+                .for_each(|(chunk, mask)| {
+                    // index_mask has value 1 << i in the loop
+                    let mut index_mask = 1;
+                    chunk.iter().for_each(|value| {
+                        if (mask & index_mask) != 0 {
+                            bit_xor = bit_xor ^ *value;
+                        }
+                        index_mask <<= 1;
+                    });
+                });
+
+            let remainder_bits = bit_chunks.remainder_bits();
+
+            remainder.iter().enumerate().for_each(|(i, value)| {
+                if remainder_bits & (1 << i) != 0 {
+                    bit_xor = bit_xor ^ *value;
+                }
+            });
+
+            Some(bit_xor)
+        }
+    }
+}
+
+// Bit and/Bit or/Bit xor aggregation can take Dictionary-encoded input but always produces unpacked
+// (i.e. non-Dictionary) output, so the output data type must be adjusted to reflect this.
+// The aggregate produces unpacked output because there is only one bit and/bit or/bit xor value
+// per group; there is no need to keep it Dictionary-encoded.
+fn bit_and_or_xor_aggregate_data_type(input_type: DataType) -> DataType {
+    if let DataType::Dictionary(_, value_type) = input_type {
+        *value_type
+    } else {
+        input_type
+    }
+}
+
+// returns the new value after bit_and/bit_or/bit_xor with the new values, taking nullability into account
+macro_rules! typed_bit_and_or_xor_batch {
+    ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{
+        let array = downcast_value!($VALUES, $ARRAYTYPE);
+        let delta = $OP(array);
+        Ok(ScalarValue::$SCALAR(delta))
+    }};
+}
+
+// bit_and/bit_or/bit_xor the array and returns a ScalarValue of its corresponding type.
+macro_rules! bit_and_or_xor_batch {
+    ($VALUES:expr, $OP:ident) => {{
+        match $VALUES.data_type() {
+            DataType::Int64 => {
+                typed_bit_and_or_xor_batch!($VALUES, Int64Array, Int64, $OP)
+            }
+            DataType::Int32 => {
+                typed_bit_and_or_xor_batch!($VALUES, Int32Array, Int32, $OP)
+            }
+            DataType::Int16 => {
+                typed_bit_and_or_xor_batch!($VALUES, Int16Array, Int16, $OP)
+            }
+            DataType::Int8 => typed_bit_and_or_xor_batch!($VALUES, Int8Array, Int8, $OP),
+            DataType::UInt64 => {
+                typed_bit_and_or_xor_batch!($VALUES, UInt64Array, UInt64, $OP)
+            }
+            DataType::UInt32 => {
+                typed_bit_and_or_xor_batch!($VALUES, UInt32Array, UInt32, $OP)
+            }
+            DataType::UInt16 => {
+                typed_bit_and_or_xor_batch!($VALUES, UInt16Array, UInt16, $OP)
+            }
+            DataType::UInt8 => {
+                typed_bit_and_or_xor_batch!($VALUES, UInt8Array, UInt8, $OP)
+            }
+            e => {
+                return Err(DataFusionError::Internal(format!(
+                    "Bit and/Bit or/Bit xor is not expected to receive the type {e:?}"
+                )));
+            }
+        }
+    }};
+}
+
+/// dynamically-typed bit_and(array) -> ScalarValue
+fn bit_and_batch(values: &ArrayRef) -> Result<ScalarValue> {
+    bit_and_or_xor_batch!(values, bit_and)
+}
+
+/// dynamically-typed bit_or(array) -> ScalarValue
+fn bit_or_batch(values: &ArrayRef) -> Result<ScalarValue> {
+    bit_and_or_xor_batch!(values, bit_or)
+}
+
+/// dynamically-typed bit_xor(array) -> ScalarValue
+fn bit_xor_batch(values: &ArrayRef) -> Result<ScalarValue> {
+    bit_and_or_xor_batch!(values, bit_xor)
+}
+
+// bit_and/bit_or/bit_xor of two scalar values.
+macro_rules! typed_bit_and_or_xor {

Review Comment:
   I recommend making this a method on `ScalarValue` so that it is more discoverable.
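
As a rough illustration of the suggestion, the sketch below puts the null-aware merge on the scalar type itself. `ToyScalar`, its `bit_and` method, and the `merge` helper are hypothetical names for this note; the real `ScalarValue` has many more variants and its own error type. The null handling follows the `(None, None)` / `(Some, None)` / `(None, Some)` / `(Some, Some)` cases of the typed macros quoted in this thread.

```rust
// Toy stand-in for `ScalarValue`, limited to two integer variants.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ToyScalar {
    Int32(Option<i32>),
    UInt8(Option<u8>),
}

// Null-aware merge: a null side is ignored, two nulls stay null.
fn merge<T>(a: Option<T>, b: Option<T>, op: impl Fn(T, T) -> T) -> Option<T> {
    match (a, b) {
        (None, None) => None,
        (Some(a), None) => Some(a),
        (None, Some(b)) => Some(b),
        (Some(a), Some(b)) => Some(op(a, b)),
    }
}

impl ToyScalar {
    // Bitwise AND of two scalars of the same variant; mismatched variants are an error.
    fn bit_and(&self, other: &ToyScalar) -> Result<ToyScalar, String> {
        match (self, other) {
            (ToyScalar::Int32(a), ToyScalar::Int32(b)) => {
                Ok(ToyScalar::Int32(merge(*a, *b, |x, y| x & y)))
            }
            (ToyScalar::UInt8(a), ToyScalar::UInt8(b)) => {
                Ok(ToyScalar::UInt8(merge(*a, *b, |x, y| x & y)))
            }
            (a, b) => Err(format!("incompatible scalar types: {a:?} and {b:?}")),
        }
    }
}
```

With a method like this, callers would write `lhs.bit_and(&rhs)?` instead of invoking a file-local macro.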





[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #6276: feat: support bitwise and boolean aggregate functions

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #6276:
URL: https://github.com/apache/arrow-datafusion/pull/6276#discussion_r1191082495


##########
datafusion/physical-expr/src/aggregate/bool_and_or.rs:
##########
@@ -0,0 +1,643 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can be evaluated at runtime during query execution
+
+use std::any::Any;
+use std::convert::TryFrom;
+use std::sync::Arc;
+
+use crate::{AggregateExpr, PhysicalExpr};
+use arrow::datatypes::DataType;
+use arrow::{
+    array::{ArrayRef, BooleanArray},
+    datatypes::Field,
+};
+use datafusion_common::{downcast_value, DataFusionError, Result, ScalarValue};
+use datafusion_expr::Accumulator;
+
+use crate::aggregate::row_accumulator::{
+    is_row_accumulator_support_dtype, RowAccumulator,
+};
+use crate::aggregate::utils::down_cast_any_ref;
+use crate::expressions::format_state_name;
+use arrow::array::Array;
+use datafusion_row::accessor::RowAccessor;
+use std::ops::BitAnd as BitAndImplementation;
+use std::ops::BitOr as BitOrImplementation;
+
+fn bool_and(array: &BooleanArray) -> Option<bool> {
+    if array.null_count() == array.len() {
+        return None;
+    }
+    Some(array.false_count() == 0)
+}
+
+fn bool_or(array: &BooleanArray) -> Option<bool> {
+    if array.null_count() == array.len() {
+        return None;
+    }
+    Some(array.true_count() != 0)
+}
+
+// Bool and/Bool or aggregation can take Dictionary-encoded input but always produces unpacked
+// (i.e. non-Dictionary) output, so the output data type must be adjusted to reflect this.
+// The aggregate produces unpacked output because there is only one bool and/bool or value
+// per group; there is no need to keep it Dictionary-encoded.
+fn bool_and_or_aggregate_data_type(input_type: DataType) -> DataType {
+    if let DataType::Dictionary(_, value_type) = input_type {
+        *value_type
+    } else {
+        input_type
+    }
+}
+
+// returns the new value after bool_and/bool_or with the new values, taking nullability into account
+macro_rules! typed_bool_and_or_batch {
+    ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{
+        let array = downcast_value!($VALUES, $ARRAYTYPE);
+        let delta = $OP(array);
+        Ok(ScalarValue::$SCALAR(delta))
+    }};
+}
+
+// bool_and/bool_or the array and returns a ScalarValue of its corresponding type.
+macro_rules! bool_and_or_batch {
+    ($VALUES:expr, $OP:ident) => {{
+        match $VALUES.data_type() {
+            DataType::Boolean => {
+                typed_bool_and_or_batch!($VALUES, BooleanArray, Boolean, $OP)
+            }
+            e => {
+                return Err(DataFusionError::Internal(format!(
+                    "Bool and/Bool or is not expected to receive the type {e:?}"
+                )));
+            }
+        }
+    }};
+}
+
+/// dynamically-typed bool_and(array) -> ScalarValue
+fn bool_and_batch(values: &ArrayRef) -> Result<ScalarValue> {
+    bool_and_or_batch!(values, bool_and)
+}
+
+/// dynamically-typed bool_or(array) -> ScalarValue
+fn bool_or_batch(values: &ArrayRef) -> Result<ScalarValue> {
+    bool_and_or_batch!(values, bool_or)
+}
+
+// bool_and/bool_or of two scalar values.
+macro_rules! typed_bool_and_or {
+    ($VALUE:expr, $DELTA:expr, $SCALAR:ident, $OP:ident) => {{
+        ScalarValue::$SCALAR(match ($VALUE, $DELTA) {
+            (None, None) => None,
+            (Some(a), None) => Some(*a),
+            (None, Some(b)) => Some(*b),
+            (Some(a), Some(b)) => Some((*a).$OP(*b)),
+        })
+    }};
+}
+
+// bool_and/bool_or of two scalar values.
+macro_rules! typed_bool_and_or_v2 {
+    ($INDEX:ident, $ACC:ident, $SCALAR:expr, $TYPE:ident, $OP:ident) => {{
+        paste::item! {
+            match $SCALAR {
+                None => {}
+                Some(v) => $ACC.[<$OP _ $TYPE>]($INDEX, *v as $TYPE)
+            }
+        }
+    }};
+}
+
+// bool_and/bool_or of two scalar values of the same type
+macro_rules! bool_and_or {
+    ($VALUE:expr, $DELTA:expr, $OP:ident) => {{
+        Ok(match ($VALUE, $DELTA) {
+            (ScalarValue::Boolean(lhs), ScalarValue::Boolean(rhs)) => {
+                typed_bool_and_or!(lhs, rhs, Boolean, $OP)
+            }
+            e => {
+                return Err(DataFusionError::Internal(format!(
+                    "BOOL AND/BOOL OR is not expected to receive scalars of incompatible types {:?}",
+                    e
+                )))
+            }
+        })
+    }};
+}
+
+macro_rules! bool_and_or_v2 {
+    ($INDEX:ident, $ACC:ident, $SCALAR:expr, $OP:ident) => {{
+        Ok(match $SCALAR {
+            ScalarValue::Boolean(rhs) => {
+                typed_bool_and_or_v2!($INDEX, $ACC, rhs, bool, $OP)
+            }
+            ScalarValue::Null => {
+                // do nothing
+            }
+            e => {
+                return Err(DataFusionError::Internal(format!(
+                    "BOOL AND/BOOL OR is not expected to receive scalars of incompatible types {:?}",
+                    e
+                )))
+            }
+        })
+    }};
+}
+
+/// the bool_and of two scalar values
+pub fn booland(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {
+    bool_and_or!(lhs, rhs, bitand)
+}
+
+pub fn bool_and_row(
+    index: usize,
+    accessor: &mut RowAccessor,
+    s: &ScalarValue,
+) -> Result<()> {
+    bool_and_or_v2!(index, accessor, s, bitand)
+}
+
+/// the bool_or of two scalar values
+pub fn boolor(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {
+    bool_and_or!(lhs, rhs, bitor)
+}
+
+pub fn bool_or_row(
+    index: usize,
+    accessor: &mut RowAccessor,
+    s: &ScalarValue,
+) -> Result<()> {
+    bool_and_or_v2!(index, accessor, s, bitor)
+}
+
+/// BOOL_AND aggregate expression
+#[derive(Debug, Clone)]
+pub struct BoolAnd {
+    name: String,
+    pub data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+    nullable: bool,
+}
+
+impl BoolAnd {
+    /// Create a new BOOL_AND aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        Self {
+            name: name.into(),
+            expr,
+            data_type: bool_and_or_aggregate_data_type(data_type),
+            nullable: true,
+        }
+    }
+}
+
+impl AggregateExpr for BoolAnd {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(
+            &self.name,
+            self.data_type.clone(),
+            self.nullable,
+        ))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::new(BoolAndAccumulator::try_new(&self.data_type)?))
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![Field::new(
+            format_state_name(&self.name, "bool_and"),
+            self.data_type.clone(),
+            self.nullable,
+        )])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    fn row_accumulator_supported(&self) -> bool {
+        is_row_accumulator_support_dtype(&self.data_type)
+    }
+
+    fn supports_bounded_execution(&self) -> bool {
+        true
+    }
+
+    fn create_row_accumulator(
+        &self,
+        start_index: usize,
+    ) -> Result<Box<dyn RowAccumulator>> {
+        Ok(Box::new(BoolAndRowAccumulator::new(
+            start_index,
+            self.data_type.clone(),
+        )))
+    }
+
+    fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
+        Some(Arc::new(self.clone()))
+    }
+
+    fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::new(BoolAndAccumulator::try_new(&self.data_type)?))
+    }
+}
+
+impl PartialEq<dyn Any> for BoolAnd {
+    fn eq(&self, other: &dyn Any) -> bool {
+        down_cast_any_ref(other)
+            .downcast_ref::<Self>()
+            .map(|x| {
+                self.name == x.name
+                    && self.data_type == x.data_type
+                    && self.nullable == x.nullable
+                    && self.expr.eq(&x.expr)
+            })
+            .unwrap_or(false)
+    }
+}
+
+#[derive(Debug)]
+struct BoolAndAccumulator {
+    bool_and: ScalarValue,
+}
+
+impl BoolAndAccumulator {
+    /// new bool_and accumulator
+    pub fn try_new(data_type: &DataType) -> Result<Self> {
+        Ok(Self {
+            bool_and: ScalarValue::try_from(data_type)?,
+        })
+    }
+}
+
+impl Accumulator for BoolAndAccumulator {
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        let values = &values[0];
+        let delta = &bool_and_batch(values)?;
+        self.bool_and = booland(&self.bool_and, delta)?;
+        Ok(())
+    }
+
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+        self.update_batch(states)
+    }
+
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        Ok(vec![self.bool_and.clone()])
+    }
+
+    fn evaluate(&self) -> Result<ScalarValue> {
+        Ok(self.bool_and.clone())
+    }
+
+    fn size(&self) -> usize {
+        std::mem::size_of_val(self) - std::mem::size_of_val(&self.bool_and)
+            + self.bool_and.size()
+    }
+}
+
+#[derive(Debug)]
+struct BoolAndRowAccumulator {
+    index: usize,
+    datatype: DataType,
+}
+
+impl BoolAndRowAccumulator {
+    pub fn new(index: usize, datatype: DataType) -> Self {
+        Self { index, datatype }
+    }
+}
+
+impl RowAccumulator for BoolAndRowAccumulator {
+    fn update_batch(
+        &mut self,
+        values: &[ArrayRef],
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        let values = &values[0];
+        let delta = &bool_and_batch(values)?;
+        bool_and_row(self.index, accessor, delta)
+    }
+
+    fn update_scalar_values(
+        &mut self,
+        values: &[ScalarValue],
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        let value = &values[0];
+        bool_and_row(self.index, accessor, value)
+    }
+
+    fn update_scalar(
+        &mut self,
+        value: &ScalarValue,
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        bool_and_row(self.index, accessor, value)
+    }
+
+    fn merge_batch(
+        &mut self,
+        states: &[ArrayRef],
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        self.update_batch(states, accessor)
+    }
+
+    fn evaluate(&self, accessor: &RowAccessor) -> Result<ScalarValue> {
+        Ok(accessor.get_as_scalar(&self.datatype, self.index))
+    }
+
+    #[inline(always)]
+    fn state_index(&self) -> usize {
+        self.index
+    }
+}
+
+/// BOOL_OR aggregate expression
+#[derive(Debug, Clone)]
+pub struct BoolOr {
+    name: String,
+    pub data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+    nullable: bool,
+}
+
+impl BoolOr {
+    /// Create a new BOOL_OR aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        Self {
+            name: name.into(),
+            expr,
+            data_type: bool_and_or_aggregate_data_type(data_type),
+            nullable: true,
+        }
+    }
+}
+
+impl AggregateExpr for BoolOr {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(
+            &self.name,
+            self.data_type.clone(),
+            self.nullable,
+        ))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::new(BoolOrAccumulator::try_new(&self.data_type)?))
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![Field::new(
+            format_state_name(&self.name, "bool_or"),
+            self.data_type.clone(),
+            self.nullable,
+        )])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    fn row_accumulator_supported(&self) -> bool {
+        is_row_accumulator_support_dtype(&self.data_type)
+    }
+
+    fn supports_bounded_execution(&self) -> bool {
+        true

Review Comment:
   I don't think it supports bounded execution either. To support bounded execution, the accumulator must implement the `retract_batch` method. (I don't think there is a way, even in theory, to implement `retract_batch` for bitwise operators.) Hence, I think we should not implement `supports_bounded_execution` here (it defaults to false).
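
A small standalone sketch of the underlying problem, assuming the accumulator keeps only the single aggregated value as its state, as the accumulators in this diff do. The functions `bool_or` and `bit_and` below are plain slice folds written for this note, not the kernels from the PR. Two windows can produce the same state yet require different results once the oldest row leaves, so the state plus the retracted value is not enough to recover the answer.

```rust
// Aggregate-only state for BOOL_OR over a window of non-null values.
fn bool_or(window: &[bool]) -> bool {
    window.iter().any(|v| *v)
}

// Aggregate-only state for BIT_AND over a window of non-null values.
fn bit_and(window: &[u8]) -> u8 {
    // 0xFF is the identity for AND on u8.
    window.iter().fold(0xFF, |acc, v| acc & *v)
}
```

For example, `[true, false]` and `[true, true]` both aggregate to `true`, but after the leading `true` leaves the window the correct results are `false` and `true`. Likewise `[0b0011, 0b0111]` and `[0b0011, 0b0011]` both aggregate to `0b0011`, while the remaining windows give `0b0111` and `0b0011`.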





[GitHub] [arrow-datafusion] alamb merged pull request #6276: feat: support bitwise and boolean aggregate functions

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb merged PR #6276:
URL: https://github.com/apache/arrow-datafusion/pull/6276




[GitHub] [arrow-datafusion] izveigor commented on pull request #6276: feat: support bitwise and boolean aggregate functions

Posted by "izveigor (via GitHub)" <gi...@apache.org>.
izveigor commented on PR #6276:
URL: https://github.com/apache/arrow-datafusion/pull/6276#issuecomment-1540812983

   @alamb, @tustvold, @mingmwang I wonder if you have time to review this PR.




[GitHub] [arrow-datafusion] izveigor commented on pull request #6276: feat: support bitwise and boolean aggregate functions

Posted by "izveigor (via GitHub)" <gi...@apache.org>.
izveigor commented on PR #6276:
URL: https://github.com/apache/arrow-datafusion/pull/6276#issuecomment-1546247029

   @alamb I have updated the PR based on your comments.




[GitHub] [arrow-datafusion] izveigor commented on a diff in pull request #6276: feat: support bitwise and boolean aggregate functions

Posted by "izveigor (via GitHub)" <gi...@apache.org>.
izveigor commented on code in PR #6276:
URL: https://github.com/apache/arrow-datafusion/pull/6276#discussion_r1192748600


##########
datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs:
##########
@@ -0,0 +1,1103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can be evaluated at runtime during query execution
+
+use std::any::Any;
+use std::convert::TryFrom;
+use std::sync::Arc;
+
+use crate::{AggregateExpr, PhysicalExpr};
+use arrow::datatypes::DataType;
+use arrow::{
+    array::{
+        ArrayRef, Int16Array, Int32Array, Int64Array, Int8Array, UInt16Array,
+        UInt32Array, UInt64Array, UInt8Array,
+    },
+    datatypes::Field,
+};
+use datafusion_common::{downcast_value, DataFusionError, Result, ScalarValue};
+use datafusion_expr::Accumulator;
+
+use crate::aggregate::row_accumulator::{
+    is_row_accumulator_support_dtype, RowAccumulator,
+};
+use crate::aggregate::utils::down_cast_any_ref;
+use crate::expressions::format_state_name;
+use arrow::array::Array;
+use arrow::array::PrimitiveArray;
+use arrow::datatypes::ArrowNativeTypeOp;
+use arrow::datatypes::ArrowNumericType;
+use datafusion_row::accessor::RowAccessor;
+use std::ops::BitAnd as BitAndImplementation;
+use std::ops::BitOr as BitOrImplementation;
+use std::ops::BitXor as BitXorImplementation;
+
+fn bit_and<T>(array: &PrimitiveArray<T>) -> Option<T::Native>

Review Comment:
   Done! https://github.com/apache/arrow-rs/pull/4210
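
For readers following the thread without the arrow types at hand, the null-aware fold used by the quoted `bit_and` can be sketched with plain slices. This is an illustration only, not the code from either repository: `bit_and_valid` is a hypothetical name, the values are fixed to `u32`, and the validity bitmap is assumed to be a slice of `u64` words holding one bit per value, least-significant bit first, with at least one word per 64 values.

```rust
// Bitwise AND over the entries of `values` whose validity bit is set.
// Returns None when no entry is valid, matching the all-null early return in the diff.
fn bit_and_valid(values: &[u32], validity: &[u64]) -> Option<u32> {
    let mut acc = u32::MAX; // identity for AND: all bits set
    let mut seen_valid = false;
    // Each u64 word of the bitmap covers one chunk of up to 64 values.
    for (chunk, mask) in values.chunks(64).zip(validity.iter()) {
        for (bit, value) in chunk.iter().enumerate() {
            // Bit `bit` of the word says whether this value is non-null.
            if (*mask >> bit) & 1 == 1 {
                acc &= *value;
                seen_valid = true;
            }
        }
    }
    if seen_valid {
        Some(acc)
    } else {
        None
    }
}
```

Unlike the quoted function, this sketch tracks `seen_valid` itself instead of comparing a precomputed null count with the array length.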





[GitHub] [arrow-datafusion] alamb commented on pull request #6276: feat: support bitwise and boolean aggregate functions

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #6276:
URL: https://github.com/apache/arrow-datafusion/pull/6276#issuecomment-1541029167

   Thanks @izveigor  -- I will try and review this PR in the next day or two




[GitHub] [arrow-datafusion] izveigor commented on a diff in pull request #6276: feat: support bitwise and boolean aggregate functions

Posted by "izveigor (via GitHub)" <gi...@apache.org>.
izveigor commented on code in PR #6276:
URL: https://github.com/apache/arrow-datafusion/pull/6276#discussion_r1192468116


##########
datafusion/physical-expr/src/aggregate/bool_and_or.rs:
##########
@@ -0,0 +1,643 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+use std::convert::TryFrom;
+use std::sync::Arc;
+
+use crate::{AggregateExpr, PhysicalExpr};
+use arrow::datatypes::DataType;
+use arrow::{
+    array::{ArrayRef, BooleanArray},
+    datatypes::Field,
+};
+use datafusion_common::{downcast_value, DataFusionError, Result, ScalarValue};
+use datafusion_expr::Accumulator;
+
+use crate::aggregate::row_accumulator::{
+    is_row_accumulator_support_dtype, RowAccumulator,
+};
+use crate::aggregate::utils::down_cast_any_ref;
+use crate::expressions::format_state_name;
+use arrow::array::Array;
+use datafusion_row::accessor::RowAccessor;
+use std::ops::BitAnd as BitAndImplementation;
+use std::ops::BitOr as BitOrImplementation;
+
+fn bool_and(array: &BooleanArray) -> Option<bool> {
+    if array.null_count() == array.len() {
+        return None;
+    }
+    Some(array.false_count() == 0)
+}
+
+fn bool_or(array: &BooleanArray) -> Option<bool> {
+    if array.null_count() == array.len() {
+        return None;
+    }
+    Some(array.true_count() != 0)
+}
+
+// Bool and/Bool or aggregation can take Dictionary encode input but always produces unpacked
+// (aka non Dictionary) output. We need to adjust the output data type to reflect this.
+// The reason bool and/bool or aggregate produces unpacked output because there is only one
+// bool and/bool or value per group; there is no needs to keep them Dictionary encode
+fn bool_and_or_aggregate_data_type(input_type: DataType) -> DataType {
+    if let DataType::Dictionary(_, value_type) = input_type {
+        *value_type
+    } else {
+        input_type
+    }
+}
+
+// returns the new value after bool_and/bool_or with the new values, taking nullability into account
+macro_rules! typed_bool_and_or_batch {
+    ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{
+        let array = downcast_value!($VALUES, $ARRAYTYPE);
+        let delta = $OP(array);
+        Ok(ScalarValue::$SCALAR(delta))
+    }};
+}
+
+// bool_and/bool_or the array and returns a ScalarValue of its corresponding type.
+macro_rules! bool_and_or_batch {
+    ($VALUES:expr, $OP:ident) => {{
+        match $VALUES.data_type() {
+            DataType::Boolean => {
+                typed_bool_and_or_batch!($VALUES, BooleanArray, Boolean, $OP)
+            }
+            e => {
+                return Err(DataFusionError::Internal(format!(
+                    "Bool and/Bool or is not expected to receive the type {e:?}"
+                )));
+            }
+        }
+    }};
+}
+
+/// dynamically-typed bool_and(array) -> ScalarValue
+fn bool_and_batch(values: &ArrayRef) -> Result<ScalarValue> {
+    bool_and_or_batch!(values, bool_and)
+}
+
+/// dynamically-typed bool_or(array) -> ScalarValue
+fn bool_or_batch(values: &ArrayRef) -> Result<ScalarValue> {
+    bool_and_or_batch!(values, bool_or)
+}
+
+// bool_and/bool_or of two scalar values.
+macro_rules! typed_bool_and_or {
+    ($VALUE:expr, $DELTA:expr, $SCALAR:ident, $OP:ident) => {{
+        ScalarValue::$SCALAR(match ($VALUE, $DELTA) {
+            (None, None) => None,
+            (Some(a), None) => Some(*a),
+            (None, Some(b)) => Some(*b),
+            (Some(a), Some(b)) => Some((*a).$OP(*b)),
+        })
+    }};
+}
+
+// bool_and/bool_or of two scalar values.
+macro_rules! typed_bool_and_or_v2 {
+    ($INDEX:ident, $ACC:ident, $SCALAR:expr, $TYPE:ident, $OP:ident) => {{
+        paste::item! {
+            match $SCALAR {
+                None => {}
+                Some(v) => $ACC.[<$OP _ $TYPE>]($INDEX, *v as $TYPE)
+            }
+        }
+    }};
+}
+
+// bool_and/bool_or of two scalar values of the same type
+macro_rules! bool_and_or {
+    ($VALUE:expr, $DELTA:expr, $OP:ident) => {{
+        Ok(match ($VALUE, $DELTA) {
+            (ScalarValue::Boolean(lhs), ScalarValue::Boolean(rhs)) => {
+                typed_bool_and_or!(lhs, rhs, Boolean, $OP)
+            }
+            e => {
+                return Err(DataFusionError::Internal(format!(
+                    "BOOL AND/BOOL OR is not expected to receive scalars of incompatible types {:?}",
+                    e
+                )))
+            }
+        })
+    }};
+}
+
+macro_rules! bool_and_or_v2 {
+    ($INDEX:ident, $ACC:ident, $SCALAR:expr, $OP:ident) => {{
+        Ok(match $SCALAR {
+            ScalarValue::Boolean(rhs) => {
+                typed_bool_and_or_v2!($INDEX, $ACC, rhs, bool, $OP)
+            }
+            ScalarValue::Null => {
+                // do nothing
+            }
+            e => {
+                return Err(DataFusionError::Internal(format!(
+                    "BOOL AND/BOOL OR is not expected to receive scalars of incompatible types {:?}",
+                    e
+                )))
+            }
+        })
+    }};
+}
+
+/// the bool_and of two scalar values
+pub fn booland(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {
+    bool_and_or!(lhs, rhs, bitand)
+}
+
+pub fn bool_and_row(
+    index: usize,
+    accessor: &mut RowAccessor,
+    s: &ScalarValue,
+) -> Result<()> {
+    bool_and_or_v2!(index, accessor, s, bitand)
+}
+
+/// the bool_or of two scalar values
+pub fn boolor(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {
+    bool_and_or!(lhs, rhs, bitor)
+}
+
+pub fn bool_or_row(
+    index: usize,
+    accessor: &mut RowAccessor,
+    s: &ScalarValue,
+) -> Result<()> {
+    bool_and_or_v2!(index, accessor, s, bitor)
+}
+
+/// BOOL_AND aggregate expression
+#[derive(Debug, Clone)]
+pub struct BoolAnd {
+    name: String,
+    pub data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+    nullable: bool,
+}
+
+impl BoolAnd {
+    /// Create a new BOOL_AND aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        Self {
+            name: name.into(),
+            expr,
+            data_type: bool_and_or_aggregate_data_type(data_type),
+            nullable: true,
+        }
+    }
+}
+
+impl AggregateExpr for BoolAnd {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(
+            &self.name,
+            self.data_type.clone(),
+            self.nullable,
+        ))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::new(BoolAndAccumulator::try_new(&self.data_type)?))
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![Field::new(
+            format_state_name(&self.name, "bool_and"),
+            self.data_type.clone(),
+            self.nullable,
+        )])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    fn row_accumulator_supported(&self) -> bool {
+        is_row_accumulator_support_dtype(&self.data_type)
+    }
+
+    fn supports_bounded_execution(&self) -> bool {
+        true
+    }
+
+    fn create_row_accumulator(
+        &self,
+        start_index: usize,
+    ) -> Result<Box<dyn RowAccumulator>> {
+        Ok(Box::new(BoolAndRowAccumulator::new(
+            start_index,
+            self.data_type.clone(),
+        )))
+    }
+
+    fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
+        Some(Arc::new(self.clone()))
+    }
+
+    fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::new(BoolAndAccumulator::try_new(&self.data_type)?))
+    }
+}
+
+impl PartialEq<dyn Any> for BoolAnd {
+    fn eq(&self, other: &dyn Any) -> bool {
+        down_cast_any_ref(other)
+            .downcast_ref::<Self>()
+            .map(|x| {
+                self.name == x.name
+                    && self.data_type == x.data_type
+                    && self.nullable == x.nullable
+                    && self.expr.eq(&x.expr)
+            })
+            .unwrap_or(false)
+    }
+}
+
+#[derive(Debug)]
+struct BoolAndAccumulator {
+    bool_and: ScalarValue,
+}
+
+impl BoolAndAccumulator {
+    /// new bool_and accumulator
+    pub fn try_new(data_type: &DataType) -> Result<Self> {
+        Ok(Self {
+            bool_and: ScalarValue::try_from(data_type)?,
+        })
+    }
+}
+
+impl Accumulator for BoolAndAccumulator {
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        let values = &values[0];
+        let delta = &bool_and_batch(values)?;
+        self.bool_and = booland(&self.bool_and, delta)?;
+        Ok(())
+    }
+
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+        self.update_batch(states)
+    }
+
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        Ok(vec![self.bool_and.clone()])
+    }
+
+    fn evaluate(&self) -> Result<ScalarValue> {
+        Ok(self.bool_and.clone())
+    }
+
+    fn size(&self) -> usize {
+        std::mem::size_of_val(self) - std::mem::size_of_val(&self.bool_and)
+            + self.bool_and.size()
+    }
+}
+
+#[derive(Debug)]
+struct BoolAndRowAccumulator {
+    index: usize,
+    datatype: DataType,
+}
+
+impl BoolAndRowAccumulator {
+    pub fn new(index: usize, datatype: DataType) -> Self {
+        Self { index, datatype }
+    }
+}
+
+impl RowAccumulator for BoolAndRowAccumulator {
+    fn update_batch(
+        &mut self,
+        values: &[ArrayRef],
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        let values = &values[0];
+        let delta = &bool_and_batch(values)?;
+        bool_and_row(self.index, accessor, delta)
+    }
+
+    fn update_scalar_values(
+        &mut self,
+        values: &[ScalarValue],
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        let value = &values[0];
+        bool_and_row(self.index, accessor, value)
+    }
+
+    fn update_scalar(
+        &mut self,
+        value: &ScalarValue,
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        bool_and_row(self.index, accessor, value)
+    }
+
+    fn merge_batch(
+        &mut self,
+        states: &[ArrayRef],
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        self.update_batch(states, accessor)
+    }
+
+    fn evaluate(&self, accessor: &RowAccessor) -> Result<ScalarValue> {
+        Ok(accessor.get_as_scalar(&self.datatype, self.index))
+    }
+
+    #[inline(always)]
+    fn state_index(&self) -> usize {
+        self.index
+    }
+}
+
+/// BOOL_OR aggregate expression
+#[derive(Debug, Clone)]
+pub struct BoolOr {
+    name: String,
+    pub data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+    nullable: bool,
+}
+
+impl BoolOr {
+    /// Create a new BOOL_OR aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        Self {
+            name: name.into(),
+            expr,
+            data_type: bool_and_or_aggregate_data_type(data_type),
+            nullable: true,
+        }
+    }
+}
+
+impl AggregateExpr for BoolOr {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(
+            &self.name,
+            self.data_type.clone(),
+            self.nullable,
+        ))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::new(BoolOrAccumulator::try_new(&self.data_type)?))
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![Field::new(
+            format_state_name(&self.name, "bool_or"),
+            self.data_type.clone(),
+            self.nullable,
+        )])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    fn row_accumulator_supported(&self) -> bool {
+        is_row_accumulator_support_dtype(&self.data_type)
+    }
+
+    fn supports_bounded_execution(&self) -> bool {
+        true

Review Comment:
   I also want to mention that this situation also applies to the DISTINCT cases: only BIT_XOR(DISTINCT) makes sense, because the other DISTINCT cases produce the same results as their non-DISTINCT forms.
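
The point about DISTINCT can be checked with a minimal sketch, assuming plain `i32` inputs. The helper names (`bit_and_all`, `bit_or_all`, `bit_xor_all`, `distinct`) are hypothetical and are not taken from the PR. AND and OR are idempotent, so duplicate values cannot change the result, while XOR cancels equal pairs and therefore depends on duplicates.

```rust
use std::collections::BTreeSet;

/// Hypothetical helper: bitwise AND of all values, `None` for empty input.
fn bit_and_all(values: &[i32]) -> Option<i32> {
    values.iter().copied().reduce(|a, b| a & b)
}

/// Hypothetical helper: bitwise OR of all values, `None` for empty input.
fn bit_or_all(values: &[i32]) -> Option<i32> {
    values.iter().copied().reduce(|a, b| a | b)
}

/// Hypothetical helper: bitwise XOR of all values, `None` for empty input.
fn bit_xor_all(values: &[i32]) -> Option<i32> {
    values.iter().copied().reduce(|a, b| a ^ b)
}

/// Removes duplicates, standing in for what a DISTINCT aggregate sees.
fn distinct(values: &[i32]) -> Vec<i32> {
    values
        .iter()
        .copied()
        .collect::<BTreeSet<_>>()
        .into_iter()
        .collect()
}
```

For the input `[5, 5, 9]`, the AND and OR results are unchanged by deduplication, whereas XOR of `[5, 5, 9]` and XOR of `[5, 9]` differ.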





[GitHub] [arrow-datafusion] izveigor commented on a diff in pull request #6276: feat: support bitwise and boolean aggregate functions

Posted by "izveigor (via GitHub)" <gi...@apache.org>.
izveigor commented on code in PR #6276:
URL: https://github.com/apache/arrow-datafusion/pull/6276#discussion_r1192748197


##########
datafusion/core/tests/sqllogictests/test_files/aggregate.slt:
##########
@@ -1390,6 +1429,63 @@ as values
  ('2021-01-01T05:11:10.432', 'Row 3');
 
 
+statement ok

Review Comment:
   I think it is a nice idea for bitwise aggregate functions.





[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6276: feat: support bitwise and boolean aggregate functions

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6276:
URL: https://github.com/apache/arrow-datafusion/pull/6276#discussion_r1191024150


##########
datafusion/expr/src/type_coercion/aggregates.rs:
##########
@@ -395,6 +440,17 @@ pub fn avg_sum_type(arg_type: &DataType) -> Result<DataType> {
     }
 }
 
+pub fn is_bit_and_or_xor_support_arg_type(arg_type: &DataType) -> bool {
+    matches!(
+        arg_type,
+        arg_type if NUMERICS.contains(arg_type)
+    )

Review Comment:
   Is there any reason to use `matches!` here? It seems like this could just be `NUMERICS.contains(arg_type)` 🤔 
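
A minimal sketch of the equivalence, assuming a stand-in `DataType` enum and a stand-in `NUMERICS` slice rather than the real arrow and DataFusion definitions. The function names `with_matches` and `with_contains` are hypothetical. A `matches!` whose pattern is a bare binding with a guard reduces to the guard itself.

```rust
/// Stand-in for the real `DataType`; only a few variants, for illustration.
#[derive(Debug, PartialEq)]
enum DataType {
    Int8,
    Int16,
    Utf8,
}

/// Stand-in for the real `NUMERICS` list (assumed to be a slice of types).
static NUMERICS: &[DataType] = &[DataType::Int8, DataType::Int16];

/// The shape used in the diff: a catch-all binding plus a guard.
fn with_matches(arg_type: &DataType) -> bool {
    matches!(
        arg_type,
        arg_type if NUMERICS.contains(arg_type)
    )
}

/// The simplified form: the guard expression on its own.
fn with_contains(arg_type: &DataType) -> bool {
    NUMERICS.contains(arg_type)
}
```

Both functions return the same value for every input, since the binding pattern always matches and only the guard decides the outcome.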



##########
datafusion/core/tests/sqllogictests/test_files/aggregate.slt:
##########
@@ -1390,6 +1429,63 @@ as values
  ('2021-01-01T05:11:10.432', 'Row 3');
 
 
+statement ok

Review Comment:
   Given that Postgres also runs these queries, what do you think about also adding them to a `pg_compat` test in https://github.com/apache/arrow-datafusion/tree/main/datafusion/core/tests/sqllogictests/test_files/pg_compat?



##########
datafusion/core/tests/sqllogictests/test_files/aggregate.slt:
##########
@@ -1390,6 +1429,63 @@ as values
  ('2021-01-01T05:11:10.432', 'Row 3');
 
 
+statement ok
+create table bit_aggregate_functions (
+  c1 SMALLINT NOT NULL,
+  c2 SMALLINT NOT NULL,
+  c3 SMALLINT,
+)
+as values
+  (5, 10, 11),
+  (33, 11, null),
+  (9, 12, null);
+
+# query_bit_and
+query III
+SELECT bit_and(c1), bit_and(c2), bit_and(c3) FROM bit_aggregate_functions
+----
+1 8 11

Review Comment:
   I double checked this is consistent with postgres 👍 
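
The expected row `1 8 11` can also be verified by hand. The following is a sketch only, using `Option<i16>` as a stand-in for nullable SMALLINT values; `bit_and_non_null` is a hypothetical helper, not a function from the PR. Null inputs are skipped, so `c3` reduces to its single non-null value.

```rust
/// Hypothetical helper: bitwise AND over the non-null values.
/// Returns `None` when there are no non-null values at all.
fn bit_and_non_null(values: &[Option<i16>]) -> Option<i16> {
    values
        .iter()
        .flatten() // drop the `None` (null) entries
        .copied()
        .reduce(|a, b| a & b)
}
```

In binary, 5 & 33 & 9 is 000101 & 100001 & 001001 = 000001, and 10 & 11 & 12 is 1010 & 1011 & 1100 = 1000.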



##########
docs/source/user-guide/expressions.md:
##########
@@ -216,6 +216,11 @@ Unlike to some databases the math functions in Datafusion works the same way as
 | approx_median(expr)                                               | Calculates an approximation of the median for `expr`.                                   |
 | approx_percentile_cont(expr, percentile)                          | Calculates an approximation of the specified `percentile` for `expr`.                   |
 | approx_percentile_cont_with_weight(expr, weight_expr, percentile) | Calculates an approximation of the specified `percentile` for `expr` and `weight_expr`. |
+| bit_and(expr)                                                     | Computes the bitwise AND of all non-null input values for `expr`.                       |

Review Comment:
   Thank you for also updating the documentation



##########
datafusion/physical-expr/src/aggregate/bool_and_or.rs:
##########
@@ -0,0 +1,643 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+use std::convert::TryFrom;
+use std::sync::Arc;
+
+use crate::{AggregateExpr, PhysicalExpr};
+use arrow::datatypes::DataType;
+use arrow::{
+    array::{ArrayRef, BooleanArray},
+    datatypes::Field,
+};
+use datafusion_common::{downcast_value, DataFusionError, Result, ScalarValue};
+use datafusion_expr::Accumulator;
+
+use crate::aggregate::row_accumulator::{
+    is_row_accumulator_support_dtype, RowAccumulator,
+};
+use crate::aggregate::utils::down_cast_any_ref;
+use crate::expressions::format_state_name;
+use arrow::array::Array;
+use datafusion_row::accessor::RowAccessor;
+use std::ops::BitAnd as BitAndImplementation;
+use std::ops::BitOr as BitOrImplementation;
+
+fn bool_and(array: &BooleanArray) -> Option<bool> {
+    if array.null_count() == array.len() {
+        return None;
+    }
+    Some(array.false_count() == 0)
+}
+
+fn bool_or(array: &BooleanArray) -> Option<bool> {
+    if array.null_count() == array.len() {
+        return None;
+    }
+    Some(array.true_count() != 0)
+}
+
+// Bool and/Bool or aggregation can take Dictionary encode input but always produces unpacked
+// (aka non Dictionary) output. We need to adjust the output data type to reflect this.
+// The reason bool and/bool or aggregate produces unpacked output because there is only one
+// bool and/bool or value per group; there is no needs to keep them Dictionary encode
+fn bool_and_or_aggregate_data_type(input_type: DataType) -> DataType {
+    if let DataType::Dictionary(_, value_type) = input_type {
+        *value_type
+    } else {
+        input_type
+    }
+}
+
+// returns the new value after bool_and/bool_or with the new values, taking nullability into account
+macro_rules! typed_bool_and_or_batch {
+    ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{
+        let array = downcast_value!($VALUES, $ARRAYTYPE);
+        let delta = $OP(array);
+        Ok(ScalarValue::$SCALAR(delta))
+    }};
+}
+
+// bool_and/bool_or the array and returns a ScalarValue of its corresponding type.
+macro_rules! bool_and_or_batch {
+    ($VALUES:expr, $OP:ident) => {{
+        match $VALUES.data_type() {
+            DataType::Boolean => {
+                typed_bool_and_or_batch!($VALUES, BooleanArray, Boolean, $OP)
+            }
+            e => {
+                return Err(DataFusionError::Internal(format!(
+                    "Bool and/Bool or is not expected to receive the type {e:?}"
+                )));
+            }
+        }
+    }};
+}
+
+/// dynamically-typed bool_and(array) -> ScalarValue
+fn bool_and_batch(values: &ArrayRef) -> Result<ScalarValue> {
+    bool_and_or_batch!(values, bool_and)
+}
+
+/// dynamically-typed bool_or(array) -> ScalarValue
+fn bool_or_batch(values: &ArrayRef) -> Result<ScalarValue> {
+    bool_and_or_batch!(values, bool_or)
+}
+
+// bool_and/bool_or of two scalar values.
+macro_rules! typed_bool_and_or {
+    ($VALUE:expr, $DELTA:expr, $SCALAR:ident, $OP:ident) => {{

Review Comment:
   Given that you added `ScalarValue::bitand` etc., maybe you could also add `ScalarValue::and`? I think the overall code complexity would be the same, but then the logic wouldn't be hidden in macros inside the aggregator.
   
   A similar comment applies to `ScalarValue::or` etc.
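
As a rough sketch of the logic such methods would carry, assuming plain `Option<bool>` as a stand-in for `ScalarValue::Boolean`: the names `and_nullable` and `or_nullable` are hypothetical, and the null handling mirrors the `typed_bool_and_or` macro quoted above (a null operand is skipped, which is aggregate behavior rather than SQL three-valued logic).

```rust
/// Hypothetical stand-in for a `ScalarValue::and`-style method.
/// A `None` operand is ignored, matching the macro in the diff.
fn and_nullable(lhs: Option<bool>, rhs: Option<bool>) -> Option<bool> {
    match (lhs, rhs) {
        (None, None) => None,
        (Some(a), None) => Some(a),
        (None, Some(b)) => Some(b),
        (Some(a), Some(b)) => Some(a & b),
    }
}

/// Hypothetical stand-in for a `ScalarValue::or`-style method.
fn or_nullable(lhs: Option<bool>, rhs: Option<bool>) -> Option<bool> {
    match (lhs, rhs) {
        (None, None) => None,
        (Some(a), None) => Some(a),
        (None, Some(b)) => Some(b),
        (Some(a), Some(b)) => Some(a | b),
    }
}
```

Written as ordinary functions, the four null cases are visible at the call site instead of being generated by a macro.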



##########
datafusion/physical-expr/src/aggregate/bool_and_or.rs:
##########
@@ -0,0 +1,643 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+use std::convert::TryFrom;
+use std::sync::Arc;
+
+use crate::{AggregateExpr, PhysicalExpr};
+use arrow::datatypes::DataType;
+use arrow::{
+    array::{ArrayRef, BooleanArray},
+    datatypes::Field,
+};
+use datafusion_common::{downcast_value, DataFusionError, Result, ScalarValue};
+use datafusion_expr::Accumulator;
+
+use crate::aggregate::row_accumulator::{
+    is_row_accumulator_support_dtype, RowAccumulator,
+};
+use crate::aggregate::utils::down_cast_any_ref;
+use crate::expressions::format_state_name;
+use arrow::array::Array;
+use datafusion_row::accessor::RowAccessor;
+use std::ops::BitAnd as BitAndImplementation;
+use std::ops::BitOr as BitOrImplementation;
+
+fn bool_and(array: &BooleanArray) -> Option<bool> {

Review Comment:
   👍  these are good implementations



##########
datafusion/physical-expr/src/aggregate/bool_and_or.rs:
##########
@@ -0,0 +1,643 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+use std::convert::TryFrom;
+use std::sync::Arc;
+
+use crate::{AggregateExpr, PhysicalExpr};
+use arrow::datatypes::DataType;
+use arrow::{
+    array::{ArrayRef, BooleanArray},
+    datatypes::Field,
+};
+use datafusion_common::{downcast_value, DataFusionError, Result, ScalarValue};
+use datafusion_expr::Accumulator;
+
+use crate::aggregate::row_accumulator::{
+    is_row_accumulator_support_dtype, RowAccumulator,
+};
+use crate::aggregate::utils::down_cast_any_ref;
+use crate::expressions::format_state_name;
+use arrow::array::Array;
+use datafusion_row::accessor::RowAccessor;
+use std::ops::BitAnd as BitAndImplementation;
+use std::ops::BitOr as BitOrImplementation;
+
+fn bool_and(array: &BooleanArray) -> Option<bool> {
+    if array.null_count() == array.len() {
+        return None;
+    }
+    Some(array.false_count() == 0)
+}
+
+fn bool_or(array: &BooleanArray) -> Option<bool> {
+    if array.null_count() == array.len() {
+        return None;
+    }
+    Some(array.true_count() != 0)
+}
+
+// Bool and/Bool or aggregation can take Dictionary encode input but always produces unpacked
+// (aka non Dictionary) output. We need to adjust the output data type to reflect this.
+// The reason bool and/bool or aggregate produces unpacked output because there is only one
+// bool and/bool or value per group; there is no needs to keep them Dictionary encode
+fn bool_and_or_aggregate_data_type(input_type: DataType) -> DataType {
+    if let DataType::Dictionary(_, value_type) = input_type {
+        *value_type
+    } else {
+        input_type
+    }
+}
+
+// returns the new value after bool_and/bool_or with the new values, taking nullability into account
+macro_rules! typed_bool_and_or_batch {
+    ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{
+        let array = downcast_value!($VALUES, $ARRAYTYPE);
+        let delta = $OP(array);
+        Ok(ScalarValue::$SCALAR(delta))
+    }};
+}
+
+// bool_and/bool_or the array and returns a ScalarValue of its corresponding type.
+macro_rules! bool_and_or_batch {
+    ($VALUES:expr, $OP:ident) => {{
+        match $VALUES.data_type() {
+            DataType::Boolean => {
+                typed_bool_and_or_batch!($VALUES, BooleanArray, Boolean, $OP)
+            }
+            e => {
+                return Err(DataFusionError::Internal(format!(
+                    "Bool and/Bool or is not expected to receive the type {e:?}"
+                )));
+            }
+        }
+    }};
+}
+
+/// dynamically-typed bool_and(array) -> ScalarValue
+fn bool_and_batch(values: &ArrayRef) -> Result<ScalarValue> {
+    bool_and_or_batch!(values, bool_and)
+}
+
+/// dynamically-typed bool_or(array) -> ScalarValue
+fn bool_or_batch(values: &ArrayRef) -> Result<ScalarValue> {
+    bool_and_or_batch!(values, bool_or)
+}
+
+// bool_and/bool_or of two scalar values.
+macro_rules! typed_bool_and_or {
+    ($VALUE:expr, $DELTA:expr, $SCALAR:ident, $OP:ident) => {{
+        ScalarValue::$SCALAR(match ($VALUE, $DELTA) {
+            (None, None) => None,
+            (Some(a), None) => Some(*a),
+            (None, Some(b)) => Some(*b),
+            (Some(a), Some(b)) => Some((*a).$OP(*b)),
+        })
+    }};
+}
+
+// bool_and/bool_or of two scalar values.
+macro_rules! typed_bool_and_or_v2 {
+    ($INDEX:ident, $ACC:ident, $SCALAR:expr, $TYPE:ident, $OP:ident) => {{
+        paste::item! {
+            match $SCALAR {
+                None => {}
+                Some(v) => $ACC.[<$OP _ $TYPE>]($INDEX, *v as $TYPE)
+            }
+        }
+    }};
+}
+
+// bool_and/bool_or of two scalar values of the same type
+macro_rules! bool_and_or {
+    ($VALUE:expr, $DELTA:expr, $OP:ident) => {{
+        Ok(match ($VALUE, $DELTA) {
+            (ScalarValue::Boolean(lhs), ScalarValue::Boolean(rhs)) => {
+                typed_bool_and_or!(lhs, rhs, Boolean, $OP)
+            }
+            e => {
+                return Err(DataFusionError::Internal(format!(
+                    "BOOL AND/BOOL OR is not expected to receive scalars of incompatible types {:?}",
+                    e
+                )))
+            }
+        })
+    }};
+}
+
+macro_rules! bool_and_or_v2 {
+    ($INDEX:ident, $ACC:ident, $SCALAR:expr, $OP:ident) => {{
+        Ok(match $SCALAR {
+            ScalarValue::Boolean(rhs) => {
+                typed_bool_and_or_v2!($INDEX, $ACC, rhs, bool, $OP)
+            }
+            ScalarValue::Null => {
+                // do nothing
+            }
+            e => {
+                return Err(DataFusionError::Internal(format!(
+                    "BOOL AND/BOOL OR is not expected to receive scalars of incompatible types {:?}",
+                    e
+                )))
+            }
+        })
+    }};
+}
+
+/// the bool_and of two scalar values
+pub fn booland(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {
+    bool_and_or!(lhs, rhs, bitand)
+}
+
+pub fn bool_and_row(
+    index: usize,
+    accessor: &mut RowAccessor,
+    s: &ScalarValue,
+) -> Result<()> {
+    bool_and_or_v2!(index, accessor, s, bitand)
+}
+
+/// the bool_or of two scalar values
+pub fn boolor(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {
+    bool_and_or!(lhs, rhs, bitor)
+}
+
+pub fn bool_or_row(
+    index: usize,
+    accessor: &mut RowAccessor,
+    s: &ScalarValue,
+) -> Result<()> {
+    bool_and_or_v2!(index, accessor, s, bitor)
+}
+
+/// BOOL_AND aggregate expression
+#[derive(Debug, Clone)]
+pub struct BoolAnd {
+    name: String,
+    pub data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+    nullable: bool,
+}
+
+impl BoolAnd {
+    /// Create a new BOOL_AND aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        Self {
+            name: name.into(),
+            expr,
+            data_type: bool_and_or_aggregate_data_type(data_type),
+            nullable: true,
+        }
+    }
+}
+
+impl AggregateExpr for BoolAnd {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(
+            &self.name,
+            self.data_type.clone(),
+            self.nullable,
+        ))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::new(BoolAndAccumulator::try_new(&self.data_type)?))
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![Field::new(
+            format_state_name(&self.name, "bool_and"),
+            self.data_type.clone(),
+            self.nullable,
+        )])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    fn row_accumulator_supported(&self) -> bool {
+        is_row_accumulator_support_dtype(&self.data_type)
+    }
+
+    fn supports_bounded_execution(&self) -> bool {
+        true

Review Comment:
   As I mention below, I am not sure this aggregate supports bounded execution



##########
datafusion/physical-expr/src/aggregate/bool_and_or.rs:
##########
@@ -0,0 +1,643 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+use std::convert::TryFrom;
+use std::sync::Arc;
+
+use crate::{AggregateExpr, PhysicalExpr};
+use arrow::datatypes::DataType;
+use arrow::{
+    array::{ArrayRef, BooleanArray},
+    datatypes::Field,
+};
+use datafusion_common::{downcast_value, DataFusionError, Result, ScalarValue};
+use datafusion_expr::Accumulator;
+
+use crate::aggregate::row_accumulator::{
+    is_row_accumulator_support_dtype, RowAccumulator,
+};
+use crate::aggregate::utils::down_cast_any_ref;
+use crate::expressions::format_state_name;
+use arrow::array::Array;
+use datafusion_row::accessor::RowAccessor;
+use std::ops::BitAnd as BitAndImplementation;
+use std::ops::BitOr as BitOrImplementation;
+
+fn bool_and(array: &BooleanArray) -> Option<bool> {
+    if array.null_count() == array.len() {
+        return None;
+    }
+    Some(array.false_count() == 0)
+}
+
+fn bool_or(array: &BooleanArray) -> Option<bool> {
+    if array.null_count() == array.len() {
+        return None;
+    }
+    Some(array.true_count() != 0)
+}
+
+// Bool and/Bool or aggregation can take Dictionary-encoded input but always produces unpacked

Review Comment:
   Dictionary-encoding booleans seems like a bad idea to me (the encoded form would perform worse than the plain one). I think we should not support this case. If we want to keep this code, can we please add a test?
   
   You can make such an array like 
   
   ```
   select arrow_cast(bool_col, 'Dictionary(Int8, Boolean)')
   ```
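
   To make the kind of test I have in mind concrete, here is a minimal sketch in plain Rust. It does not use the arrow or DataFusion APIs: `DictBools` is a hypothetical stand-in for a dictionary-encoded boolean column, and `bool_and` here is a simplified version over `Option<bool>` rows, not the function in this PR. The point is only that aggregating the encoded column should give the same answer as aggregating the unpacked values.

   ```rust
   /// Hypothetical stand-in for a dictionary-encoded boolean column:
   /// `keys[i]` indexes into `values`, and `None` marks a null row.
   struct DictBools {
       keys: Vec<Option<usize>>,
       values: Vec<bool>,
   }

   impl DictBools {
       /// Unpack the column into one `Option<bool>` per row.
       fn decode(&self) -> Vec<Option<bool>> {
           self.keys.iter().map(|k| k.map(|i| self.values[i])).collect()
       }
   }

   /// BOOL_AND that skips nulls and returns `None` when every row is null.
   fn bool_and(rows: &[Option<bool>]) -> Option<bool> {
       rows.iter().flatten().copied().reduce(|a, b| a & b)
   }
   ```

   A real test would presumably go through SQL with `arrow_cast` as above and compare the result against the same query on the uncast column.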
   
   



##########
datafusion/physical-expr/src/aggregate/bool_and_or.rs:
##########
@@ -0,0 +1,643 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+use std::convert::TryFrom;
+use std::sync::Arc;
+
+use crate::{AggregateExpr, PhysicalExpr};
+use arrow::datatypes::DataType;
+use arrow::{
+    array::{ArrayRef, BooleanArray},
+    datatypes::Field,
+};
+use datafusion_common::{downcast_value, DataFusionError, Result, ScalarValue};
+use datafusion_expr::Accumulator;
+
+use crate::aggregate::row_accumulator::{
+    is_row_accumulator_support_dtype, RowAccumulator,
+};
+use crate::aggregate::utils::down_cast_any_ref;
+use crate::expressions::format_state_name;
+use arrow::array::Array;
+use datafusion_row::accessor::RowAccessor;
+use std::ops::BitAnd as BitAndImplementation;
+use std::ops::BitOr as BitOrImplementation;
+
+fn bool_and(array: &BooleanArray) -> Option<bool> {
+    if array.null_count() == array.len() {
+        return None;
+    }
+    Some(array.false_count() == 0)
+}
+
+fn bool_or(array: &BooleanArray) -> Option<bool> {
+    if array.null_count() == array.len() {
+        return None;
+    }
+    Some(array.true_count() != 0)
+}
+
+// Bool and/Bool or aggregation can take Dictionary-encoded input but always produces unpacked
+// (i.e. non-Dictionary) output, so we need to adjust the output data type to reflect this.
+// The output is unpacked because there is only one bool and/bool or value per group,
+// so there is no need to keep it Dictionary-encoded.
+fn bool_and_or_aggregate_data_type(input_type: DataType) -> DataType {
+    if let DataType::Dictionary(_, value_type) = input_type {
+        *value_type
+    } else {
+        input_type
+    }
+}
+
+// returns the new value after bool_and/bool_or with the new values, taking nullability into account
+macro_rules! typed_bool_and_or_batch {
+    ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{
+        let array = downcast_value!($VALUES, $ARRAYTYPE);
+        let delta = $OP(array);
+        Ok(ScalarValue::$SCALAR(delta))
+    }};
+}
+
+// bool_and/bool_or the array and returns a ScalarValue of its corresponding type.
+macro_rules! bool_and_or_batch {
+    ($VALUES:expr, $OP:ident) => {{
+        match $VALUES.data_type() {
+            DataType::Boolean => {
+                typed_bool_and_or_batch!($VALUES, BooleanArray, Boolean, $OP)
+            }
+            e => {
+                return Err(DataFusionError::Internal(format!(
+                    "Bool and/Bool or is not expected to receive the type {e:?}"
+                )));
+            }
+        }
+    }};
+}
+
+/// dynamically-typed bool_and(array) -> ScalarValue
+fn bool_and_batch(values: &ArrayRef) -> Result<ScalarValue> {
+    bool_and_or_batch!(values, bool_and)
+}
+
+/// dynamically-typed bool_or(array) -> ScalarValue
+fn bool_or_batch(values: &ArrayRef) -> Result<ScalarValue> {
+    bool_and_or_batch!(values, bool_or)
+}
+
+// bool_and/bool_or of two scalar values.
+macro_rules! typed_bool_and_or {
+    ($VALUE:expr, $DELTA:expr, $SCALAR:ident, $OP:ident) => {{
+        ScalarValue::$SCALAR(match ($VALUE, $DELTA) {
+            (None, None) => None,
+            (Some(a), None) => Some(*a),
+            (None, Some(b)) => Some(*b),
+            (Some(a), Some(b)) => Some((*a).$OP(*b)),
+        })
+    }};
+}
+
+// applies bool_and/bool_or of a scalar value to the row accessor slot at the given index.
+macro_rules! typed_bool_and_or_v2 {
+    ($INDEX:ident, $ACC:ident, $SCALAR:expr, $TYPE:ident, $OP:ident) => {{
+        paste::item! {
+            match $SCALAR {
+                None => {}
+                Some(v) => $ACC.[<$OP _ $TYPE>]($INDEX, *v as $TYPE)
+            }
+        }
+    }};
+}
+
+// bool_and/bool_or of two scalar values of the same type
+macro_rules! bool_and_or {
+    ($VALUE:expr, $DELTA:expr, $OP:ident) => {{
+        Ok(match ($VALUE, $DELTA) {
+            (ScalarValue::Boolean(lhs), ScalarValue::Boolean(rhs)) => {
+                typed_bool_and_or!(lhs, rhs, Boolean, $OP)
+            }
+            e => {
+                return Err(DataFusionError::Internal(format!(
+                    "BOOL AND/BOOL OR is not expected to receive scalars of incompatible types {:?}",
+                    e
+                )))
+            }
+        })
+    }};
+}
+
+macro_rules! bool_and_or_v2 {
+    ($INDEX:ident, $ACC:ident, $SCALAR:expr, $OP:ident) => {{
+        Ok(match $SCALAR {
+            ScalarValue::Boolean(rhs) => {
+                typed_bool_and_or_v2!($INDEX, $ACC, rhs, bool, $OP)
+            }
+            ScalarValue::Null => {
+                // do nothing
+            }
+            e => {
+                return Err(DataFusionError::Internal(format!(
+                    "BOOL AND/BOOL OR is not expected to receive scalars of incompatible types {:?}",
+                    e
+                )))
+            }
+        })
+    }};
+}
+
+/// the bool_and of two scalar values
+pub fn booland(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {
+    bool_and_or!(lhs, rhs, bitand)
+}
+
+pub fn bool_and_row(
+    index: usize,
+    accessor: &mut RowAccessor,
+    s: &ScalarValue,
+) -> Result<()> {
+    bool_and_or_v2!(index, accessor, s, bitand)
+}
+
+/// the bool_or of two scalar values
+pub fn boolor(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {
+    bool_and_or!(lhs, rhs, bitor)
+}
+
+pub fn bool_or_row(
+    index: usize,
+    accessor: &mut RowAccessor,
+    s: &ScalarValue,
+) -> Result<()> {
+    bool_and_or_v2!(index, accessor, s, bitor)
+}
+
+/// BOOL_AND aggregate expression
+#[derive(Debug, Clone)]
+pub struct BoolAnd {
+    name: String,
+    pub data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+    nullable: bool,
+}
+
+impl BoolAnd {
+    /// Create a new BOOL_AND aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        Self {
+            name: name.into(),
+            expr,
+            data_type: bool_and_or_aggregate_data_type(data_type),
+            nullable: true,
+        }
+    }
+}
+
+impl AggregateExpr for BoolAnd {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(
+            &self.name,
+            self.data_type.clone(),
+            self.nullable,
+        ))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::new(BoolAndAccumulator::try_new(&self.data_type)?))
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![Field::new(
+            format_state_name(&self.name, "bool_and"),
+            self.data_type.clone(),
+            self.nullable,
+        )])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    fn row_accumulator_supported(&self) -> bool {
+        is_row_accumulator_support_dtype(&self.data_type)
+    }
+
+    fn supports_bounded_execution(&self) -> bool {
+        true
+    }
+
+    fn create_row_accumulator(
+        &self,
+        start_index: usize,
+    ) -> Result<Box<dyn RowAccumulator>> {
+        Ok(Box::new(BoolAndRowAccumulator::new(
+            start_index,
+            self.data_type.clone(),
+        )))
+    }
+
+    fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
+        Some(Arc::new(self.clone()))
+    }
+
+    fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::new(BoolAndAccumulator::try_new(&self.data_type)?))
+    }
+}
+
+impl PartialEq<dyn Any> for BoolAnd {
+    fn eq(&self, other: &dyn Any) -> bool {
+        down_cast_any_ref(other)
+            .downcast_ref::<Self>()
+            .map(|x| {
+                self.name == x.name
+                    && self.data_type == x.data_type
+                    && self.nullable == x.nullable
+                    && self.expr.eq(&x.expr)
+            })
+            .unwrap_or(false)
+    }
+}
+
+#[derive(Debug)]
+struct BoolAndAccumulator {
+    bool_and: ScalarValue,
+}
+
+impl BoolAndAccumulator {
+    /// new bool_and accumulator
+    pub fn try_new(data_type: &DataType) -> Result<Self> {
+        Ok(Self {
+            bool_and: ScalarValue::try_from(data_type)?,
+        })
+    }
+}
+
+impl Accumulator for BoolAndAccumulator {
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        let values = &values[0];
+        let delta = &bool_and_batch(values)?;
+        self.bool_and = booland(&self.bool_and, delta)?;
+        Ok(())
+    }
+
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+        self.update_batch(states)
+    }
+
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        Ok(vec![self.bool_and.clone()])
+    }
+
+    fn evaluate(&self) -> Result<ScalarValue> {
+        Ok(self.bool_and.clone())
+    }
+
+    fn size(&self) -> usize {
+        std::mem::size_of_val(self) - std::mem::size_of_val(&self.bool_and)
+            + self.bool_and.size()
+    }
+}
+
+#[derive(Debug)]
+struct BoolAndRowAccumulator {
+    index: usize,
+    datatype: DataType,
+}
+
+impl BoolAndRowAccumulator {
+    pub fn new(index: usize, datatype: DataType) -> Self {
+        Self { index, datatype }
+    }
+}
+
+impl RowAccumulator for BoolAndRowAccumulator {
+    fn update_batch(
+        &mut self,
+        values: &[ArrayRef],
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        let values = &values[0];
+        let delta = &bool_and_batch(values)?;
+        bool_and_row(self.index, accessor, delta)
+    }
+
+    fn update_scalar_values(
+        &mut self,
+        values: &[ScalarValue],
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        let value = &values[0];
+        bool_and_row(self.index, accessor, value)
+    }
+
+    fn update_scalar(
+        &mut self,
+        value: &ScalarValue,
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        bool_and_row(self.index, accessor, value)
+    }
+
+    fn merge_batch(
+        &mut self,
+        states: &[ArrayRef],
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        self.update_batch(states, accessor)
+    }
+
+    fn evaluate(&self, accessor: &RowAccessor) -> Result<ScalarValue> {
+        Ok(accessor.get_as_scalar(&self.datatype, self.index))
+    }
+
+    #[inline(always)]
+    fn state_index(&self) -> usize {
+        self.index
+    }
+}
+
+/// BOOL_OR aggregate expression
+#[derive(Debug, Clone)]
+pub struct BoolOr {
+    name: String,
+    pub data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+    nullable: bool,
+}
+
+impl BoolOr {
+    /// Create a new BOOL_OR aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        Self {
+            name: name.into(),
+            expr,
+            data_type: bool_and_or_aggregate_data_type(data_type),
+            nullable: true,
+        }
+    }
+}
+
+impl AggregateExpr for BoolOr {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(
+            &self.name,
+            self.data_type.clone(),
+            self.nullable,
+        ))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::new(BoolOrAccumulator::try_new(&self.data_type)?))
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![Field::new(
+            format_state_name(&self.name, "bool_or"),
+            self.data_type.clone(),
+            self.nullable,
+        )])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    fn row_accumulator_supported(&self) -> bool {
+        is_row_accumulator_support_dtype(&self.data_type)
+    }
+
+    fn supports_bounded_execution(&self) -> bool {
+        true

Review Comment:
   I am not sure this aggregator actually supports bounded execution (which is used in sliding window functions). To do so, I think it would perhaps need to remember how many `true` values it has seen.
   
   @mustafasrepo or @ozankabak  can you double check this?
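
   To illustrate the concern, here is a minimal sketch in plain Rust of state that could be retracted. `SlidingBoolOr` and its methods are hypothetical names, not DataFusion APIs. The idea is that a single boolean flag cannot "forget" a `true` once the row that produced it leaves the window frame, whereas counters can be decremented.

   ```rust
   /// Hypothetical sliding BOOL_OR state: it keeps counts instead of one flag,
   /// so rows can be removed again when the window frame advances.
   #[derive(Debug, Default)]
   struct SlidingBoolOr {
       true_count: usize,
       non_null_count: usize,
   }

   impl SlidingBoolOr {
       /// Add the rows entering the window frame (nulls are skipped).
       fn update(&mut self, rows: &[Option<bool>]) {
           for v in rows.iter().flatten() {
               self.non_null_count += 1;
               if *v {
                   self.true_count += 1;
               }
           }
       }

       /// Remove rows leaving the frame; they must have been added earlier.
       fn retract(&mut self, rows: &[Option<bool>]) {
           for v in rows.iter().flatten() {
               self.non_null_count -= 1;
               if *v {
                   self.true_count -= 1;
               }
           }
       }

       /// `None` if the frame holds no non-null rows, else whether any is true.
       fn evaluate(&self) -> Option<bool> {
           if self.non_null_count == 0 {
               None
           } else {
               Some(self.true_count > 0)
           }
       }
   }
   ```

   BOOL_AND would work the same way with a count of `false` values. As far as I understand, the accumulator in this PR keeps only a single `ScalarValue`, which is why I doubt it can be used for sliding windows as is.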



##########
datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs:
##########
@@ -0,0 +1,1103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can be evaluated at runtime during query execution
+
+use std::any::Any;
+use std::convert::TryFrom;
+use std::sync::Arc;
+
+use crate::{AggregateExpr, PhysicalExpr};
+use arrow::datatypes::DataType;
+use arrow::{
+    array::{
+        ArrayRef, Int16Array, Int32Array, Int64Array, Int8Array, UInt16Array,
+        UInt32Array, UInt64Array, UInt8Array,
+    },
+    datatypes::Field,
+};
+use datafusion_common::{downcast_value, DataFusionError, Result, ScalarValue};
+use datafusion_expr::Accumulator;
+
+use crate::aggregate::row_accumulator::{
+    is_row_accumulator_support_dtype, RowAccumulator,
+};
+use crate::aggregate::utils::down_cast_any_ref;
+use crate::expressions::format_state_name;
+use arrow::array::Array;
+use arrow::array::PrimitiveArray;
+use arrow::datatypes::ArrowNativeTypeOp;
+use arrow::datatypes::ArrowNumericType;
+use datafusion_row::accessor::RowAccessor;
+use std::ops::BitAnd as BitAndImplementation;
+use std::ops::BitOr as BitOrImplementation;
+use std::ops::BitXor as BitXorImplementation;
+
+fn bit_and<T>(array: &PrimitiveArray<T>) -> Option<T::Native>

Review Comment:
   I think we should strive to move these kernels into arrow-rs eventually. 
   
   Can you please file a ticket in arrow-rs to do so (or I can do so if you prefer) and leave a comment here with a reference to that ticket?
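
   For reference in that ticket, the semantics I would expect from such a kernel can be sketched in plain Rust. This is an illustration over a slice of `Option<T>`, not the arrow-rs `PrimitiveArray` API and not the implementation in this PR: nulls are skipped, and the result is null only if every input is null.

   ```rust
   use std::ops::BitAnd;

   /// Bitwise AND over the non-null values; `None` if every value is null.
   fn bit_and<T>(values: &[Option<T>]) -> Option<T>
   where
       T: Copy + BitAnd<Output = T>,
   {
       values.iter().flatten().copied().reduce(|a, b| a & b)
   }
   ```

   The OR and XOR variants would differ only in the operator passed to `reduce`.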


