You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/05/15 14:19:58 UTC

[arrow-datafusion] branch main updated: feat: support bitwise and boolean aggregate functions (#6276)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 93ff57e6e0 feat: support bitwise and boolean aggregate functions (#6276)
93ff57e6e0 is described below

commit 93ff57e6e080b7cdf7e598784c81587dcd11416f
Author: Igor Izvekov <iz...@gmail.com>
AuthorDate: Mon May 15 17:19:51 2023 +0300

    feat: support bitwise and boolean aggregate functions (#6276)
    
    * feat: bitwise and boolean aggregate functions
    
    * feat: SQL implementation
    
    * feat: proto
    
    * fix: import modules in proto
    
    * fix: sqllogictests
    
    * feat: docs
    
    * fix: clippy
    
    * refactor: bit_and_or_xor.rs and bool_and_or.rs
    
    * feat: macro_rules for bitwise aggregate operations
    
    * feat: bitwise aggregate functions in pg_compat
---
 datafusion/common/src/scalar.rs                    | 108 +++
 .../tests/sqllogictests/test_files/aggregate.slt   |  96 +++
 .../test_files/pg_compat/pg_compat_simple.slt      |  26 +-
 .../test_files/pg_compat/pg_compat_window.slt      | 828 +++++++++---------
 datafusion/expr/src/aggregate_function.rs          |  27 +
 datafusion/expr/src/type_coercion/aggregates.rs    |  65 +-
 .../physical-expr/src/aggregate/bit_and_or_xor.rs  | 934 +++++++++++++++++++++
 .../physical-expr/src/aggregate/bool_and_or.rs     | 582 +++++++++++++
 datafusion/physical-expr/src/aggregate/build_in.rs | 150 +++-
 datafusion/physical-expr/src/aggregate/mod.rs      |   2 +
 .../physical-expr/src/aggregate/row_accumulator.rs |   3 +-
 datafusion/physical-expr/src/expressions/mod.rs    |   2 +
 datafusion/proto/proto/datafusion.proto            |   5 +
 datafusion/proto/src/generated/pbjson.rs           |  15 +
 datafusion/proto/src/generated/prost.rs            |  15 +
 datafusion/proto/src/logical_plan/from_proto.rs    |   5 +
 datafusion/proto/src/logical_plan/to_proto.rs      |  10 +
 datafusion/proto/src/physical_plan/to_proto.rs     |  13 +-
 datafusion/row/src/accessor.rs                     |  54 ++
 docs/source/user-guide/expressions.md              |   5 +
 docs/source/user-guide/sql/aggregate_functions.md  |  70 ++
 21 files changed, 2587 insertions(+), 428 deletions(-)

diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs
index bd2cb34515..16a88c283d 100644
--- a/datafusion/common/src/scalar.rs
+++ b/datafusion/common/src/scalar.rs
@@ -650,6 +650,21 @@ macro_rules! primitive_right {
             "Can not divide an uninitialized value to a non-floating point value",
         )))
     };
+    ($TERM:expr, &, $SCALAR:ident) => {
+        Ok(ScalarValue::$SCALAR(Some($TERM)))
+    };
+    ($TERM:expr, |, $SCALAR:ident) => {
+        Ok(ScalarValue::$SCALAR(Some($TERM)))
+    };
+    ($TERM:expr, ^, $SCALAR:ident) => {
+        Ok(ScalarValue::$SCALAR(Some($TERM)))
+    };
+    ($TERM:expr, &&, $SCALAR:ident) => {
+        Ok(ScalarValue::$SCALAR(Some($TERM)))
+    };
+    ($TERM:expr, ||, $SCALAR:ident) => {
+        Ok(ScalarValue::$SCALAR(Some($TERM)))
+    };
 }
 
 macro_rules! unsigned_subtraction_error {
@@ -756,6 +771,74 @@ macro_rules! impl_op {
             _ => impl_op_arithmetic!($LHS, $RHS, -)
         }
     };
+    ($LHS:expr, $RHS:expr, &) => {
+        impl_bit_op_arithmetic!($LHS, $RHS, &)
+    };
+    ($LHS:expr, $RHS:expr, |) => {
+        impl_bit_op_arithmetic!($LHS, $RHS, |)
+    };
+    ($LHS:expr, $RHS:expr, ^) => {
+        impl_bit_op_arithmetic!($LHS, $RHS, ^)
+    };
+    ($LHS:expr, $RHS:expr, &&) => {
+        impl_bool_op_arithmetic!($LHS, $RHS, &&)
+    };
+    ($LHS:expr, $RHS:expr, ||) => {
+        impl_bool_op_arithmetic!($LHS, $RHS, ||)
+    };
+}
+
+macro_rules! impl_bit_op_arithmetic {
+    ($LHS:expr, $RHS:expr, $OPERATION:tt) => {
+        match ($LHS, $RHS) {
+            (ScalarValue::UInt64(lhs), ScalarValue::UInt64(rhs)) => {
+                primitive_op!(lhs, rhs, UInt64, $OPERATION)
+            }
+            (ScalarValue::Int64(lhs), ScalarValue::Int64(rhs)) => {
+                primitive_op!(lhs, rhs, Int64, $OPERATION)
+            }
+            (ScalarValue::UInt32(lhs), ScalarValue::UInt32(rhs)) => {
+                primitive_op!(lhs, rhs, UInt32, $OPERATION)
+            }
+            (ScalarValue::Int32(lhs), ScalarValue::Int32(rhs)) => {
+                primitive_op!(lhs, rhs, Int32, $OPERATION)
+            }
+            (ScalarValue::UInt16(lhs), ScalarValue::UInt16(rhs)) => {
+                primitive_op!(lhs, rhs, UInt16, $OPERATION)
+            }
+            (ScalarValue::Int16(lhs), ScalarValue::Int16(rhs)) => {
+                primitive_op!(lhs, rhs, Int16, $OPERATION)
+            }
+            (ScalarValue::UInt8(lhs), ScalarValue::UInt8(rhs)) => {
+                primitive_op!(lhs, rhs, UInt8, $OPERATION)
+            }
+            (ScalarValue::Int8(lhs), ScalarValue::Int8(rhs)) => {
+                primitive_op!(lhs, rhs, Int8, $OPERATION)
+            }
+            _ => Err(DataFusionError::Internal(format!(
+                "Operator {} is not implemented for types {:?} and {:?}",
+                stringify!($OPERATION),
+                $LHS,
+                $RHS
+            ))),
+        }
+    };
+}
+
+macro_rules! impl_bool_op_arithmetic {
+    ($LHS:expr, $RHS:expr, $OPERATION:tt) => {
+        match ($LHS, $RHS) {
+            (ScalarValue::Boolean(lhs), ScalarValue::Boolean(rhs)) => {
+                primitive_op!(lhs, rhs, Boolean, $OPERATION)
+            }
+            _ => Err(DataFusionError::Internal(format!(
+                "Operator {} is not implemented for types {:?} and {:?}",
+                stringify!($OPERATION),
+                $LHS,
+                $RHS
+            ))),
+        }
+    };
 }
 
 macro_rules! impl_op_arithmetic {
@@ -1965,6 +2048,31 @@ impl ScalarValue {
         impl_checked_op!(self, rhs, checked_sub, -)
     }
 
+    pub fn and<T: Borrow<ScalarValue>>(&self, other: T) -> Result<ScalarValue> {
+        let rhs = other.borrow();
+        impl_op!(self, rhs, &&)
+    }
+
+    pub fn or<T: Borrow<ScalarValue>>(&self, other: T) -> Result<ScalarValue> {
+        let rhs = other.borrow();
+        impl_op!(self, rhs, ||)
+    }
+
+    pub fn bitand<T: Borrow<ScalarValue>>(&self, other: T) -> Result<ScalarValue> {
+        let rhs = other.borrow();
+        impl_op!(self, rhs, &)
+    }
+
+    pub fn bitor<T: Borrow<ScalarValue>>(&self, other: T) -> Result<ScalarValue> {
+        let rhs = other.borrow();
+        impl_op!(self, rhs, |)
+    }
+
+    pub fn bitxor<T: Borrow<ScalarValue>>(&self, other: T) -> Result<ScalarValue> {
+        let rhs = other.borrow();
+        impl_op!(self, rhs, ^)
+    }
+
     pub fn is_unsigned(&self) -> bool {
         matches!(
             self,
diff --git a/datafusion/core/tests/sqllogictests/test_files/aggregate.slt b/datafusion/core/tests/sqllogictests/test_files/aggregate.slt
index 0c0a2c49b4..47d7d031ce 100644
--- a/datafusion/core/tests/sqllogictests/test_files/aggregate.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/aggregate.slt
@@ -81,6 +81,24 @@ SELECT avg(c12) FROM aggregate_test_100
 ----
 0.508972509913
 
+# csv_query_bit_and
+query IIIII
+SELECT bit_and(c5), bit_and(c6), bit_and(c7), bit_and(c8), bit_and(c9) FROM aggregate_test_100
+----
+0 0 0 0 0
+
+# csv_query_bit_or
+query IIIII
+SELECT bit_or(c5), bit_or(c6), bit_or(c7), bit_or(c8), bit_or(c9) FROM aggregate_test_100
+----
+-1 -1 255 65535 4294967295
+
+# csv_query_bit_xor
+query IIIII
+SELECT bit_xor(c5), bit_xor(c6), bit_xor(c7), bit_xor(c8), bit_xor(c9) FROM aggregate_test_100
+----
+1632751011 5960911605712039654 148 54789 169634700
+
 # csv_query_covariance_1
 query R
 SELECT covar_pop(c2, c12) FROM aggregate_test_100
@@ -1372,6 +1390,27 @@ SELECT avg(column1), column2 from the_nulls group by column2 order by column2;
 NULL 1
 NULL 2
 
+# bit_and should be null
+query II
+SELECT bit_and(column1), column2 from the_nulls group by column2 order by column2;
+----
+NULL 1
+NULL 2
+
+# bit_or should be null
+query II
+SELECT bit_or(column1), column2 from the_nulls group by column2 order by column2;
+----
+NULL 1
+NULL 2
+
+# bit_xor should be null
+query II
+SELECT bit_xor(column1), column2 from the_nulls group by column2 order by column2;
+----
+NULL 1
+NULL 2
+
 # min should be null
 query II
 SELECT min(column1), column2 from the_nulls group by column2 order by column2;
@@ -1406,6 +1445,63 @@ as values
  (null, 'Row 2'),
  ('2021-01-01T05:11:10.432', 'Row 3');
 
+statement ok
+create table bit_aggregate_functions (
+  c1 SMALLINT NOT NULL,
+  c2 SMALLINT NOT NULL,
+  c3 SMALLINT,
+)
+as values
+  (5, 10, 11),
+  (33, 11, null),
+  (9, 12, null);
+
+# query_bit_and
+query III
+SELECT bit_and(c1), bit_and(c2), bit_and(c3) FROM bit_aggregate_functions
+----
+1 8 11
+
+# query_bit_or
+query III
+SELECT bit_or(c1), bit_or(c2), bit_or(c3) FROM bit_aggregate_functions
+----
+45 15 11
+
+# query_bit_xor
+query III
+SELECT bit_xor(c1), bit_xor(c2), bit_xor(c3) FROM bit_aggregate_functions
+----
+45 13 11
+
+statement ok
+create table bool_aggregate_functions (
+  c1 boolean not null,
+  c2 boolean not null,
+  c3 boolean not null,
+  c4 boolean not null,
+  c5 boolean,
+  c6 boolean,
+  c7 boolean,
+  c8 boolean,
+)
+as values
+  (true, true, false, false, true, true, null, null),
+  (true, false, true, false, false, null, false, null),
+  (true, true, false, false, null, true, false, null);
+
+# query_bool_and
+query BBBBBBBB
+SELECT bool_and(c1), bool_and(c2), bool_and(c3), bool_and(c4), bool_and(c5), bool_and(c6), bool_and(c7), bool_and(c8) FROM bool_aggregate_functions
+----
+true false false false false true false NULL
+
+# query_bool_or
+query BBBBBBBB
+SELECT bool_or(c1), bool_or(c2), bool_or(c3), bool_or(c4), bool_or(c5), bool_or(c6), bool_or(c7), bool_or(c8) FROM bool_aggregate_functions
+----
+true true true false true true false NULL
+
 statement ok
 create table t as
 select
diff --git a/datafusion/core/tests/sqllogictests/test_files/pg_compat/pg_compat_simple.slt b/datafusion/core/tests/sqllogictests/test_files/pg_compat/pg_compat_simple.slt
index 5e9eb07f77..b01ea73c80 100644
--- a/datafusion/core/tests/sqllogictests/test_files/pg_compat/pg_compat_simple.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/pg_compat/pg_compat_simple.slt
@@ -325,36 +325,42 @@ AS tbl(int_col, float_col, negative_col, summation);
 10 20 -30 4
 
 
-query IIRIII
+query IIRIIIIII
 SELECT
   count(*) AS count_all,
   count(c3) AS count_c3,
   avg(c3) AS avg,
   sum(c3) AS sum,
   max(c3) AS max,
-  min(c3) AS min
+  min(c3) AS min,
+  bit_and(c3) as bit_and,
+  bit_or(c3) as bit_or,
+  bit_xor(c3) as bit_xor
 FROM aggregate_test_100_by_sql;
 ----
-100 100 7.81 781 125 -117
+100 100 7.81 781 125 -117 0 -1 -61
 
 
-query IIRIII
+query IIRIIIIII
 select
   c2,
   sum(c3) sum_c3,
   avg(c3) avg_c3,
   max(c3) max_c3,
   min(c3) min_c3,
-  count(c3) count_c3
+  count(c3) count_c3,
+  bit_and(c3) bit_and_c3,
+  bit_or(c3) bit_or_c3,
+  bit_xor(c3) bit_xor_c3
 from aggregate_test_100_by_sql
 group by c2
 order by c2;
 ----
-1 367 16.681818181818 125 -99 22
-2 184 8.363636363636 122 -117 22
-3 395 20.789473684211 123 -101 19
-4 29 1.260869565217 123 -117 23
-5 -194 -13.857142857143 118 -101 14
+1 367 16.681818181818 125 -99 22 0 -1 -47
+2 184 8.363636363636 122 -117 22 0 -1 -2
+3 395 20.789473684211 123 -101 19 0 -1 101
+4 29 1.260869565217 123 -117 23 0 -1 15
+5 -194 -13.857142857143 118 -101 14 0 -1 -122
 
 
 query I
diff --git a/datafusion/core/tests/sqllogictests/test_files/pg_compat/pg_compat_window.slt b/datafusion/core/tests/sqllogictests/test_files/pg_compat/pg_compat_window.slt
index 7218b7d881..cec51d4720 100644
--- a/datafusion/core/tests/sqllogictests/test_files/pg_compat/pg_compat_window.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/pg_compat/pg_compat_window.slt
@@ -301,117 +301,120 @@ ORDER BY c9;
 4268716378 100 NULL 4229654142 28774375 4268716378 4268716378 4268716378 63044568 NULL
 
 
-query IIRIII
+query IIRIIIIII
 SELECT
   row_number() OVER () AS row_number,
   count(c3) OVER () AS count_c3,
   avg(c3) OVER () AS avg,
   sum(c3) OVER () AS sum,
   max(c3) OVER () AS max,
-  min(c3) OVER () AS min
+  min(c3) OVER () AS min,
+  bit_and(c3) OVER () AS bit_and,
+  bit_or(c3) OVER () AS bit_or,
+  bit_xor(c3) OVER () AS bit_xor
 FROM aggregate_test_100_by_sql
 ORDER BY row_number;
 ----
-1 100 7.81 781 125 -117
-2 100 7.81 781 125 -117
-3 100 7.81 781 125 -117
-4 100 7.81 781 125 -117
-5 100 7.81 781 125 -117
-6 100 7.81 781 125 -117
-7 100 7.81 781 125 -117
-8 100 7.81 781 125 -117
-9 100 7.81 781 125 -117
-10 100 7.81 781 125 -117
-11 100 7.81 781 125 -117
-12 100 7.81 781 125 -117
-13 100 7.81 781 125 -117
-14 100 7.81 781 125 -117
-15 100 7.81 781 125 -117
-16 100 7.81 781 125 -117
-17 100 7.81 781 125 -117
-18 100 7.81 781 125 -117
-19 100 7.81 781 125 -117
-20 100 7.81 781 125 -117
-21 100 7.81 781 125 -117
-22 100 7.81 781 125 -117
-23 100 7.81 781 125 -117
-24 100 7.81 781 125 -117
-25 100 7.81 781 125 -117
-26 100 7.81 781 125 -117
-27 100 7.81 781 125 -117
-28 100 7.81 781 125 -117
-29 100 7.81 781 125 -117
-30 100 7.81 781 125 -117
-31 100 7.81 781 125 -117
-32 100 7.81 781 125 -117
-33 100 7.81 781 125 -117
-34 100 7.81 781 125 -117
-35 100 7.81 781 125 -117
-36 100 7.81 781 125 -117
-37 100 7.81 781 125 -117
-38 100 7.81 781 125 -117
-39 100 7.81 781 125 -117
-40 100 7.81 781 125 -117
-41 100 7.81 781 125 -117
-42 100 7.81 781 125 -117
-43 100 7.81 781 125 -117
-44 100 7.81 781 125 -117
-45 100 7.81 781 125 -117
-46 100 7.81 781 125 -117
-47 100 7.81 781 125 -117
-48 100 7.81 781 125 -117
-49 100 7.81 781 125 -117
-50 100 7.81 781 125 -117
-51 100 7.81 781 125 -117
-52 100 7.81 781 125 -117
-53 100 7.81 781 125 -117
-54 100 7.81 781 125 -117
-55 100 7.81 781 125 -117
-56 100 7.81 781 125 -117
-57 100 7.81 781 125 -117
-58 100 7.81 781 125 -117
-59 100 7.81 781 125 -117
-60 100 7.81 781 125 -117
-61 100 7.81 781 125 -117
-62 100 7.81 781 125 -117
-63 100 7.81 781 125 -117
-64 100 7.81 781 125 -117
-65 100 7.81 781 125 -117
-66 100 7.81 781 125 -117
-67 100 7.81 781 125 -117
-68 100 7.81 781 125 -117
-69 100 7.81 781 125 -117
-70 100 7.81 781 125 -117
-71 100 7.81 781 125 -117
-72 100 7.81 781 125 -117
-73 100 7.81 781 125 -117
-74 100 7.81 781 125 -117
-75 100 7.81 781 125 -117
-76 100 7.81 781 125 -117
-77 100 7.81 781 125 -117
-78 100 7.81 781 125 -117
-79 100 7.81 781 125 -117
-80 100 7.81 781 125 -117
-81 100 7.81 781 125 -117
-82 100 7.81 781 125 -117
-83 100 7.81 781 125 -117
-84 100 7.81 781 125 -117
-85 100 7.81 781 125 -117
-86 100 7.81 781 125 -117
-87 100 7.81 781 125 -117
-88 100 7.81 781 125 -117
-89 100 7.81 781 125 -117
-90 100 7.81 781 125 -117
-91 100 7.81 781 125 -117
-92 100 7.81 781 125 -117
-93 100 7.81 781 125 -117
-94 100 7.81 781 125 -117
-95 100 7.81 781 125 -117
-96 100 7.81 781 125 -117
-97 100 7.81 781 125 -117
-98 100 7.81 781 125 -117
-99 100 7.81 781 125 -117
-100 100 7.81 781 125 -117
+1 100 7.81 781 125 -117 0 -1 -61
+2 100 7.81 781 125 -117 0 -1 -61
+3 100 7.81 781 125 -117 0 -1 -61
+4 100 7.81 781 125 -117 0 -1 -61
+5 100 7.81 781 125 -117 0 -1 -61
+6 100 7.81 781 125 -117 0 -1 -61
+7 100 7.81 781 125 -117 0 -1 -61
+8 100 7.81 781 125 -117 0 -1 -61
+9 100 7.81 781 125 -117 0 -1 -61
+10 100 7.81 781 125 -117 0 -1 -61
+11 100 7.81 781 125 -117 0 -1 -61
+12 100 7.81 781 125 -117 0 -1 -61
+13 100 7.81 781 125 -117 0 -1 -61
+14 100 7.81 781 125 -117 0 -1 -61
+15 100 7.81 781 125 -117 0 -1 -61
+16 100 7.81 781 125 -117 0 -1 -61
+17 100 7.81 781 125 -117 0 -1 -61
+18 100 7.81 781 125 -117 0 -1 -61
+19 100 7.81 781 125 -117 0 -1 -61
+20 100 7.81 781 125 -117 0 -1 -61
+21 100 7.81 781 125 -117 0 -1 -61
+22 100 7.81 781 125 -117 0 -1 -61
+23 100 7.81 781 125 -117 0 -1 -61
+24 100 7.81 781 125 -117 0 -1 -61
+25 100 7.81 781 125 -117 0 -1 -61
+26 100 7.81 781 125 -117 0 -1 -61
+27 100 7.81 781 125 -117 0 -1 -61
+28 100 7.81 781 125 -117 0 -1 -61
+29 100 7.81 781 125 -117 0 -1 -61
+30 100 7.81 781 125 -117 0 -1 -61
+31 100 7.81 781 125 -117 0 -1 -61
+32 100 7.81 781 125 -117 0 -1 -61
+33 100 7.81 781 125 -117 0 -1 -61
+34 100 7.81 781 125 -117 0 -1 -61
+35 100 7.81 781 125 -117 0 -1 -61
+36 100 7.81 781 125 -117 0 -1 -61
+37 100 7.81 781 125 -117 0 -1 -61
+38 100 7.81 781 125 -117 0 -1 -61
+39 100 7.81 781 125 -117 0 -1 -61
+40 100 7.81 781 125 -117 0 -1 -61
+41 100 7.81 781 125 -117 0 -1 -61
+42 100 7.81 781 125 -117 0 -1 -61
+43 100 7.81 781 125 -117 0 -1 -61
+44 100 7.81 781 125 -117 0 -1 -61
+45 100 7.81 781 125 -117 0 -1 -61
+46 100 7.81 781 125 -117 0 -1 -61
+47 100 7.81 781 125 -117 0 -1 -61
+48 100 7.81 781 125 -117 0 -1 -61
+49 100 7.81 781 125 -117 0 -1 -61
+50 100 7.81 781 125 -117 0 -1 -61
+51 100 7.81 781 125 -117 0 -1 -61
+52 100 7.81 781 125 -117 0 -1 -61
+53 100 7.81 781 125 -117 0 -1 -61
+54 100 7.81 781 125 -117 0 -1 -61
+55 100 7.81 781 125 -117 0 -1 -61
+56 100 7.81 781 125 -117 0 -1 -61
+57 100 7.81 781 125 -117 0 -1 -61
+58 100 7.81 781 125 -117 0 -1 -61
+59 100 7.81 781 125 -117 0 -1 -61
+60 100 7.81 781 125 -117 0 -1 -61
+61 100 7.81 781 125 -117 0 -1 -61
+62 100 7.81 781 125 -117 0 -1 -61
+63 100 7.81 781 125 -117 0 -1 -61
+64 100 7.81 781 125 -117 0 -1 -61
+65 100 7.81 781 125 -117 0 -1 -61
+66 100 7.81 781 125 -117 0 -1 -61
+67 100 7.81 781 125 -117 0 -1 -61
+68 100 7.81 781 125 -117 0 -1 -61
+69 100 7.81 781 125 -117 0 -1 -61
+70 100 7.81 781 125 -117 0 -1 -61
+71 100 7.81 781 125 -117 0 -1 -61
+72 100 7.81 781 125 -117 0 -1 -61
+73 100 7.81 781 125 -117 0 -1 -61
+74 100 7.81 781 125 -117 0 -1 -61
+75 100 7.81 781 125 -117 0 -1 -61
+76 100 7.81 781 125 -117 0 -1 -61
+77 100 7.81 781 125 -117 0 -1 -61
+78 100 7.81 781 125 -117 0 -1 -61
+79 100 7.81 781 125 -117 0 -1 -61
+80 100 7.81 781 125 -117 0 -1 -61
+81 100 7.81 781 125 -117 0 -1 -61
+82 100 7.81 781 125 -117 0 -1 -61
+83 100 7.81 781 125 -117 0 -1 -61
+84 100 7.81 781 125 -117 0 -1 -61
+85 100 7.81 781 125 -117 0 -1 -61
+86 100 7.81 781 125 -117 0 -1 -61
+87 100 7.81 781 125 -117 0 -1 -61
+88 100 7.81 781 125 -117 0 -1 -61
+89 100 7.81 781 125 -117 0 -1 -61
+90 100 7.81 781 125 -117 0 -1 -61
+91 100 7.81 781 125 -117 0 -1 -61
+92 100 7.81 781 125 -117 0 -1 -61
+93 100 7.81 781 125 -117 0 -1 -61
+94 100 7.81 781 125 -117 0 -1 -61
+95 100 7.81 781 125 -117 0 -1 -61
+96 100 7.81 781 125 -117 0 -1 -61
+97 100 7.81 781 125 -117 0 -1 -61
+98 100 7.81 781 125 -117 0 -1 -61
+99 100 7.81 781 125 -117 0 -1 -61
+100 100 7.81 781 125 -117 0 -1 -61
 
 query IIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIII
 SELECT
@@ -686,7 +689,7 @@ ORDER BY c8;
 64517 2809 63353 10 36599 56980 10
 
 
-query IIIRIII
+query IIIRIIIIII
 SELECT
   c9,
   row_number() OVER (ORDER BY c2, c9) AS row_number,
@@ -694,113 +697,116 @@ SELECT
   avg(c3) OVER (ORDER BY c2) AS avg_c3_by_c2,
   sum(c3) OVER (ORDER BY c2) AS sum_c3_by_c2,
   max(c3) OVER (ORDER BY c2) AS max_c3_by_c2,
-  min(c3) OVER (ORDER BY c2) AS min_c3_by_c2
+  min(c3) OVER (ORDER BY c2) AS min_c3_by_c2,
+  bit_and(c3) OVER (ORDER BY c2) AS bit_and_c3_by_c2,
+  bit_or(c3) OVER (ORDER BY c2) AS bit_or_c3_by_c2,
+  bit_xor(c3) OVER (ORDER BY c2) AS bit_xor_c3_by_c2
 FROM aggregate_test_100_by_sql
 ORDER BY row_number;
 ----
-225513085 1 6 16.681818181818 367 125 -99
-473294098 2 11 16.681818181818 367 125 -99
-520189543 3 12 16.681818181818 367 125 -99
-774637006 4 19 16.681818181818 367 125 -99
-879082834 5 21 16.681818181818 367 125 -99
-1454057357 6 34 16.681818181818 367 125 -99
-1842680163 7 40 16.681818181818 367 125 -99
-2125812933 8 46 16.681818181818 367 125 -99
-2610290479 9 56 16.681818181818 367 125 -99
-2669374863 10 57 16.681818181818 367 125 -99
-2712615025 11 59 16.681818181818 367 125 -99
-2830981072 12 62 16.681818181818 367 125 -99
-2861376515 13 64 16.681818181818 367 125 -99
-3275293996 14 72 16.681818181818 367 125 -99
-3276123488 15 73 16.681818181818 367 125 -99
-3542840110 16 82 16.681818181818 367 125 -99
-3625286410 17 87 16.681818181818 367 125 -99
-3766999078 18 90 16.681818181818 367 125 -99
-4015442341 19 94 16.681818181818 367 125 -99
-4076864659 20 96 16.681818181818 367 125 -99
-4216440507 21 98 16.681818181818 367 125 -99
-4229654142 22 99 16.681818181818 367 125 -99
-63044568 23 2 12.522727272727 551 125 -117
-141680161 24 4 12.522727272727 551 125 -117
-145294611 25 5 12.522727272727 551 125 -117
-598822671 26 16 12.522727272727 551 125 -117
-1000948272 27 24 12.522727272727 551 125 -117
-1098639440 28 27 12.522727272727 551 125 -117
-1157161427 29 28 12.522727272727 551 125 -117
-1289293657 30 31 12.522727272727 551 125 -117
-1491205016 31 35 12.522727272727 551 125 -117
-2013662838 32 43 12.522727272727 551 125 -117
-2293105904 33 48 12.522727272727 551 125 -117
-2525744318 34 54 12.522727272727 551 125 -117
-2705709344 35 58 12.522727272727 551 125 -117
-2844041986 36 63 12.522727272727 551 125 -117
-2939920218 37 66 12.522727272727 551 125 -117
-3188005828 38 70 12.522727272727 551 125 -117
-3314983189 39 74 12.522727272727 551 125 -117
-3398507249 40 77 12.522727272727 551 125 -117
-3455216719 41 78 12.522727272727 551 125 -117
-3717551163 42 88 12.522727272727 551 125 -117
-4061635107 43 95 12.522727272727 551 125 -117
-4144173353 44 97 12.522727272727 551 125 -117
-243203849 45 7 15.015873015873 946 125 -117
-431948861 46 9 15.015873015873 946 125 -117
-559847112 47 15 15.015873015873 946 125 -117
-754775609 48 18 15.015873015873 946 125 -117
-1088543984 49 26 15.015873015873 946 125 -117
-1362369177 50 32 15.015873015873 946 125 -117
-1538863055 51 37 15.015873015873 946 125 -117
-1824517658 52 39 15.015873015873 946 125 -117
-1995343206 53 42 15.015873015873 946 125 -117
-2093538928 54 45 15.015873015873 946 125 -117
-2214035726 55 47 15.015873015873 946 125 -117
-2592330556 56 55 15.015873015873 946 125 -117
-3105312559 57 68 15.015873015873 946 125 -117
-3473924576 58 80 15.015873015873 946 125 -117
-3577318119 59 85 15.015873015873 946 125 -117
-3759340273 60 89 15.015873015873 946 125 -117
-3862393166 61 91 15.015873015873 946 125 -117
-3959216334 62 92 15.015873015873 946 125 -117
-3998790955 63 93 15.015873015873 946 125 -117
-28774375 64 1 11.337209302326 975 125 -117
-326151275 65 8 11.337209302326 975 125 -117
-466439833 66 10 11.337209302326 975 125 -117
-538589788 67 13 11.337209302326 975 125 -117
-557517119 68 14 11.337209302326 975 125 -117
-811650497 69 20 11.337209302326 975 125 -117
-933879086 70 22 11.337209302326 975 125 -117
-1243785310 71 30 11.337209302326 975 125 -117
-1534194097 72 36 11.337209302326 975 125 -117
-1787652631 73 38 11.337209302326 975 125 -117
-1865307672 74 41 11.337209302326 975 125 -117
-2042457019 75 44 11.337209302326 975 125 -117
-2306130875 76 49 11.337209302326 975 125 -117
-2502326480 77 53 11.337209302326 975 125 -117
-2778168728 78 60 11.337209302326 975 125 -117
-2818832252 79 61 11.337209302326 975 125 -117
-3023531799 80 67 11.337209302326 975 125 -117
-3126475872 81 69 11.337209302326 975 125 -117
-3198969145 82 71 11.337209302326 975 125 -117
-3521368277 83 81 11.337209302326 975 125 -117
-3566741189 84 83 11.337209302326 975 125 -117
-3570297463 85 84 11.337209302326 975 125 -117
-3593959807 86 86 11.337209302326 975 125 -117
-141047417 87 3 7.81 781 125 -117
-662099130 88 17 7.81 781 125 -117
-974297360 89 23 7.81 781 125 -117
-1013876852 90 25 7.81 781 125 -117
-1229567292 91 29 7.81 781 125 -117
-1365198901 92 33 7.81 781 125 -117
-2307004493 93 50 7.81 781 125 -117
-2424630722 94 51 7.81 781 125 -117
-2496054700 95 52 7.81 781 125 -117
-2861911482 96 65 7.81 781 125 -117
-3342719438 97 75 7.81 781 125 -117
-3373581039 98 76 7.81 781 125 -117
-3457053821 99 79 7.81 781 125 -117
-4268716378 100 100 7.81 781 125 -117
+225513085 1 6 16.681818181818 367 125 -99 0 -1 -47
+473294098 2 11 16.681818181818 367 125 -99 0 -1 -47
+520189543 3 12 16.681818181818 367 125 -99 0 -1 -47
+774637006 4 19 16.681818181818 367 125 -99 0 -1 -47
+879082834 5 21 16.681818181818 367 125 -99 0 -1 -47
+1454057357 6 34 16.681818181818 367 125 -99 0 -1 -47
+1842680163 7 40 16.681818181818 367 125 -99 0 -1 -47
+2125812933 8 46 16.681818181818 367 125 -99 0 -1 -47
+2610290479 9 56 16.681818181818 367 125 -99 0 -1 -47
+2669374863 10 57 16.681818181818 367 125 -99 0 -1 -47
+2712615025 11 59 16.681818181818 367 125 -99 0 -1 -47
+2830981072 12 62 16.681818181818 367 125 -99 0 -1 -47
+2861376515 13 64 16.681818181818 367 125 -99 0 -1 -47
+3275293996 14 72 16.681818181818 367 125 -99 0 -1 -47
+3276123488 15 73 16.681818181818 367 125 -99 0 -1 -47
+3542840110 16 82 16.681818181818 367 125 -99 0 -1 -47
+3625286410 17 87 16.681818181818 367 125 -99 0 -1 -47
+3766999078 18 90 16.681818181818 367 125 -99 0 -1 -47
+4015442341 19 94 16.681818181818 367 125 -99 0 -1 -47
+4076864659 20 96 16.681818181818 367 125 -99 0 -1 -47
+4216440507 21 98 16.681818181818 367 125 -99 0 -1 -47
+4229654142 22 99 16.681818181818 367 125 -99 0 -1 -47
+63044568 23 2 12.522727272727 551 125 -117 0 -1 47
+141680161 24 4 12.522727272727 551 125 -117 0 -1 47
+145294611 25 5 12.522727272727 551 125 -117 0 -1 47
+598822671 26 16 12.522727272727 551 125 -117 0 -1 47
+1000948272 27 24 12.522727272727 551 125 -117 0 -1 47
+1098639440 28 27 12.522727272727 551 125 -117 0 -1 47
+1157161427 29 28 12.522727272727 551 125 -117 0 -1 47
+1289293657 30 31 12.522727272727 551 125 -117 0 -1 47
+1491205016 31 35 12.522727272727 551 125 -117 0 -1 47
+2013662838 32 43 12.522727272727 551 125 -117 0 -1 47
+2293105904 33 48 12.522727272727 551 125 -117 0 -1 47
+2525744318 34 54 12.522727272727 551 125 -117 0 -1 47
+2705709344 35 58 12.522727272727 551 125 -117 0 -1 47
+2844041986 36 63 12.522727272727 551 125 -117 0 -1 47
+2939920218 37 66 12.522727272727 551 125 -117 0 -1 47
+3188005828 38 70 12.522727272727 551 125 -117 0 -1 47
+3314983189 39 74 12.522727272727 551 125 -117 0 -1 47
+3398507249 40 77 12.522727272727 551 125 -117 0 -1 47
+3455216719 41 78 12.522727272727 551 125 -117 0 -1 47
+3717551163 42 88 12.522727272727 551 125 -117 0 -1 47
+4061635107 43 95 12.522727272727 551 125 -117 0 -1 47
+4144173353 44 97 12.522727272727 551 125 -117 0 -1 47
+243203849 45 7 15.015873015873 946 125 -117 0 -1 74
+431948861 46 9 15.015873015873 946 125 -117 0 -1 74
+559847112 47 15 15.015873015873 946 125 -117 0 -1 74
+754775609 48 18 15.015873015873 946 125 -117 0 -1 74
+1088543984 49 26 15.015873015873 946 125 -117 0 -1 74
+1362369177 50 32 15.015873015873 946 125 -117 0 -1 74
+1538863055 51 37 15.015873015873 946 125 -117 0 -1 74
+1824517658 52 39 15.015873015873 946 125 -117 0 -1 74
+1995343206 53 42 15.015873015873 946 125 -117 0 -1 74
+2093538928 54 45 15.015873015873 946 125 -117 0 -1 74
+2214035726 55 47 15.015873015873 946 125 -117 0 -1 74
+2592330556 56 55 15.015873015873 946 125 -117 0 -1 74
+3105312559 57 68 15.015873015873 946 125 -117 0 -1 74
+3473924576 58 80 15.015873015873 946 125 -117 0 -1 74
+3577318119 59 85 15.015873015873 946 125 -117 0 -1 74
+3759340273 60 89 15.015873015873 946 125 -117 0 -1 74
+3862393166 61 91 15.015873015873 946 125 -117 0 -1 74
+3959216334 62 92 15.015873015873 946 125 -117 0 -1 74
+3998790955 63 93 15.015873015873 946 125 -117 0 -1 74
+28774375 64 1 11.337209302326 975 125 -117 0 -1 69
+326151275 65 8 11.337209302326 975 125 -117 0 -1 69
+466439833 66 10 11.337209302326 975 125 -117 0 -1 69
+538589788 67 13 11.337209302326 975 125 -117 0 -1 69
+557517119 68 14 11.337209302326 975 125 -117 0 -1 69
+811650497 69 20 11.337209302326 975 125 -117 0 -1 69
+933879086 70 22 11.337209302326 975 125 -117 0 -1 69
+1243785310 71 30 11.337209302326 975 125 -117 0 -1 69
+1534194097 72 36 11.337209302326 975 125 -117 0 -1 69
+1787652631 73 38 11.337209302326 975 125 -117 0 -1 69
+1865307672 74 41 11.337209302326 975 125 -117 0 -1 69
+2042457019 75 44 11.337209302326 975 125 -117 0 -1 69
+2306130875 76 49 11.337209302326 975 125 -117 0 -1 69
+2502326480 77 53 11.337209302326 975 125 -117 0 -1 69
+2778168728 78 60 11.337209302326 975 125 -117 0 -1 69
+2818832252 79 61 11.337209302326 975 125 -117 0 -1 69
+3023531799 80 67 11.337209302326 975 125 -117 0 -1 69
+3126475872 81 69 11.337209302326 975 125 -117 0 -1 69
+3198969145 82 71 11.337209302326 975 125 -117 0 -1 69
+3521368277 83 81 11.337209302326 975 125 -117 0 -1 69
+3566741189 84 83 11.337209302326 975 125 -117 0 -1 69
+3570297463 85 84 11.337209302326 975 125 -117 0 -1 69
+3593959807 86 86 11.337209302326 975 125 -117 0 -1 69
+141047417 87 3 7.81 781 125 -117 0 -1 -61
+662099130 88 17 7.81 781 125 -117 0 -1 -61
+974297360 89 23 7.81 781 125 -117 0 -1 -61
+1013876852 90 25 7.81 781 125 -117 0 -1 -61
+1229567292 91 29 7.81 781 125 -117 0 -1 -61
+1365198901 92 33 7.81 781 125 -117 0 -1 -61
+2307004493 93 50 7.81 781 125 -117 0 -1 -61
+2424630722 94 51 7.81 781 125 -117 0 -1 -61
+2496054700 95 52 7.81 781 125 -117 0 -1 -61
+2861911482 96 65 7.81 781 125 -117 0 -1 -61
+3342719438 97 75 7.81 781 125 -117 0 -1 -61
+3373581039 98 76 7.81 781 125 -117 0 -1 -61
+3457053821 99 79 7.81 781 125 -117 0 -1 -61
+4268716378 100 100 7.81 781 125 -117 0 -1 -61
 
 
-query IIIRIII
+query IIIRIIIIII
 SELECT
   c9,
   row_number() OVER (PARTITION BY c2, c9) AS row_number,
@@ -808,113 +814,116 @@ SELECT
   avg(c3) OVER (PARTITION BY c2) AS avg_c3_by_c2,
   sum(c3) OVER (PARTITION BY c2) AS sum_c3_by_c2,
   max(c3) OVER (PARTITION BY c2) AS max_c3_by_c2,
-  min(c3) OVER (PARTITION BY c2) AS min_c3_by_c2
+  min(c3) OVER (PARTITION BY c2) AS min_c3_by_c2,
+  bit_and(c3) OVER (PARTITION BY c2) AS bit_and_c3_by_c2,
+  bit_or(c3) OVER (PARTITION BY c2) AS bit_or_c3_by_c2,
+  bit_xor(c3) OVER (PARTITION BY c2) AS bit_xor_c3_by_c2
 FROM aggregate_test_100_by_sql
 ORDER BY c9;
 ----
-28774375 1 23 1.260869565217 29 123 -117
-63044568 1 22 8.363636363636 184 122 -117
-141047417 1 14 -13.857142857143 -194 118 -101
-141680161 1 22 8.363636363636 184 122 -117
-145294611 1 22 8.363636363636 184 122 -117
-225513085 1 22 16.681818181818 367 125 -99
-243203849 1 19 20.789473684211 395 123 -101
-326151275 1 23 1.260869565217 29 123 -117
-431948861 1 19 20.789473684211 395 123 -101
-466439833 1 23 1.260869565217 29 123 -117
-473294098 1 22 16.681818181818 367 125 -99
-520189543 1 22 16.681818181818 367 125 -99
-538589788 1 23 1.260869565217 29 123 -117
-557517119 1 23 1.260869565217 29 123 -117
-559847112 1 19 20.789473684211 395 123 -101
-598822671 1 22 8.363636363636 184 122 -117
-662099130 1 14 -13.857142857143 -194 118 -101
-754775609 1 19 20.789473684211 395 123 -101
-774637006 1 22 16.681818181818 367 125 -99
-811650497 1 23 1.260869565217 29 123 -117
-879082834 1 22 16.681818181818 367 125 -99
-933879086 1 23 1.260869565217 29 123 -117
-974297360 1 14 -13.857142857143 -194 118 -101
-1000948272 1 22 8.363636363636 184 122 -117
-1013876852 1 14 -13.857142857143 -194 118 -101
-1088543984 1 19 20.789473684211 395 123 -101
-1098639440 1 22 8.363636363636 184 122 -117
-1157161427 1 22 8.363636363636 184 122 -117
-1229567292 1 14 -13.857142857143 -194 118 -101
-1243785310 1 23 1.260869565217 29 123 -117
-1289293657 1 22 8.363636363636 184 122 -117
-1362369177 1 19 20.789473684211 395 123 -101
-1365198901 1 14 -13.857142857143 -194 118 -101
-1454057357 1 22 16.681818181818 367 125 -99
-1491205016 1 22 8.363636363636 184 122 -117
-1534194097 1 23 1.260869565217 29 123 -117
-1538863055 1 19 20.789473684211 395 123 -101
-1787652631 1 23 1.260869565217 29 123 -117
-1824517658 1 19 20.789473684211 395 123 -101
-1842680163 1 22 16.681818181818 367 125 -99
-1865307672 1 23 1.260869565217 29 123 -117
-1995343206 1 19 20.789473684211 395 123 -101
-2013662838 1 22 8.363636363636 184 122 -117
-2042457019 1 23 1.260869565217 29 123 -117
-2093538928 1 19 20.789473684211 395 123 -101
-2125812933 1 22 16.681818181818 367 125 -99
-2214035726 1 19 20.789473684211 395 123 -101
-2293105904 1 22 8.363636363636 184 122 -117
-2306130875 1 23 1.260869565217 29 123 -117
-2307004493 1 14 -13.857142857143 -194 118 -101
-2424630722 1 14 -13.857142857143 -194 118 -101
-2496054700 1 14 -13.857142857143 -194 118 -101
-2502326480 1 23 1.260869565217 29 123 -117
-2525744318 1 22 8.363636363636 184 122 -117
-2592330556 1 19 20.789473684211 395 123 -101
-2610290479 1 22 16.681818181818 367 125 -99
-2669374863 1 22 16.681818181818 367 125 -99
-2705709344 1 22 8.363636363636 184 122 -117
-2712615025 1 22 16.681818181818 367 125 -99
-2778168728 1 23 1.260869565217 29 123 -117
-2818832252 1 23 1.260869565217 29 123 -117
-2830981072 1 22 16.681818181818 367 125 -99
-2844041986 1 22 8.363636363636 184 122 -117
-2861376515 1 22 16.681818181818 367 125 -99
-2861911482 1 14 -13.857142857143 -194 118 -101
-2939920218 1 22 8.363636363636 184 122 -117
-3023531799 1 23 1.260869565217 29 123 -117
-3105312559 1 19 20.789473684211 395 123 -101
-3126475872 1 23 1.260869565217 29 123 -117
-3188005828 1 22 8.363636363636 184 122 -117
-3198969145 1 23 1.260869565217 29 123 -117
-3275293996 1 22 16.681818181818 367 125 -99
-3276123488 1 22 16.681818181818 367 125 -99
-3314983189 1 22 8.363636363636 184 122 -117
-3342719438 1 14 -13.857142857143 -194 118 -101
-3373581039 1 14 -13.857142857143 -194 118 -101
-3398507249 1 22 8.363636363636 184 122 -117
-3455216719 1 22 8.363636363636 184 122 -117
-3457053821 1 14 -13.857142857143 -194 118 -101
-3473924576 1 19 20.789473684211 395 123 -101
-3521368277 1 23 1.260869565217 29 123 -117
-3542840110 1 22 16.681818181818 367 125 -99
-3566741189 1 23 1.260869565217 29 123 -117
-3570297463 1 23 1.260869565217 29 123 -117
-3577318119 1 19 20.789473684211 395 123 -101
-3593959807 1 23 1.260869565217 29 123 -117
-3625286410 1 22 16.681818181818 367 125 -99
-3717551163 1 22 8.363636363636 184 122 -117
-3759340273 1 19 20.789473684211 395 123 -101
-3766999078 1 22 16.681818181818 367 125 -99
-3862393166 1 19 20.789473684211 395 123 -101
-3959216334 1 19 20.789473684211 395 123 -101
-3998790955 1 19 20.789473684211 395 123 -101
-4015442341 1 22 16.681818181818 367 125 -99
-4061635107 1 22 8.363636363636 184 122 -117
-4076864659 1 22 16.681818181818 367 125 -99
-4144173353 1 22 8.363636363636 184 122 -117
-4216440507 1 22 16.681818181818 367 125 -99
-4229654142 1 22 16.681818181818 367 125 -99
-4268716378 1 14 -13.857142857143 -194 118 -101
+28774375 1 23 1.260869565217 29 123 -117 0 -1 15
+63044568 1 22 8.363636363636 184 122 -117 0 -1 -2
+141047417 1 14 -13.857142857143 -194 118 -101 0 -1 -122
+141680161 1 22 8.363636363636 184 122 -117 0 -1 -2
+145294611 1 22 8.363636363636 184 122 -117 0 -1 -2
+225513085 1 22 16.681818181818 367 125 -99 0 -1 -47
+243203849 1 19 20.789473684211 395 123 -101 0 -1 101
+326151275 1 23 1.260869565217 29 123 -117 0 -1 15
+431948861 1 19 20.789473684211 395 123 -101 0 -1 101
+466439833 1 23 1.260869565217 29 123 -117 0 -1 15
+473294098 1 22 16.681818181818 367 125 -99 0 -1 -47
+520189543 1 22 16.681818181818 367 125 -99 0 -1 -47
+538589788 1 23 1.260869565217 29 123 -117 0 -1 15
+557517119 1 23 1.260869565217 29 123 -117 0 -1 15
+559847112 1 19 20.789473684211 395 123 -101 0 -1 101
+598822671 1 22 8.363636363636 184 122 -117 0 -1 -2
+662099130 1 14 -13.857142857143 -194 118 -101 0 -1 -122
+754775609 1 19 20.789473684211 395 123 -101 0 -1 101
+774637006 1 22 16.681818181818 367 125 -99 0 -1 -47
+811650497 1 23 1.260869565217 29 123 -117 0 -1 15
+879082834 1 22 16.681818181818 367 125 -99 0 -1 -47
+933879086 1 23 1.260869565217 29 123 -117 0 -1 15
+974297360 1 14 -13.857142857143 -194 118 -101 0 -1 -122
+1000948272 1 22 8.363636363636 184 122 -117 0 -1 -2
+1013876852 1 14 -13.857142857143 -194 118 -101 0 -1 -122
+1088543984 1 19 20.789473684211 395 123 -101 0 -1 101
+1098639440 1 22 8.363636363636 184 122 -117 0 -1 -2
+1157161427 1 22 8.363636363636 184 122 -117 0 -1 -2
+1229567292 1 14 -13.857142857143 -194 118 -101 0 -1 -122
+1243785310 1 23 1.260869565217 29 123 -117 0 -1 15
+1289293657 1 22 8.363636363636 184 122 -117 0 -1 -2
+1362369177 1 19 20.789473684211 395 123 -101 0 -1 101
+1365198901 1 14 -13.857142857143 -194 118 -101 0 -1 -122
+1454057357 1 22 16.681818181818 367 125 -99 0 -1 -47
+1491205016 1 22 8.363636363636 184 122 -117 0 -1 -2
+1534194097 1 23 1.260869565217 29 123 -117 0 -1 15
+1538863055 1 19 20.789473684211 395 123 -101 0 -1 101
+1787652631 1 23 1.260869565217 29 123 -117 0 -1 15
+1824517658 1 19 20.789473684211 395 123 -101 0 -1 101
+1842680163 1 22 16.681818181818 367 125 -99 0 -1 -47
+1865307672 1 23 1.260869565217 29 123 -117 0 -1 15
+1995343206 1 19 20.789473684211 395 123 -101 0 -1 101
+2013662838 1 22 8.363636363636 184 122 -117 0 -1 -2
+2042457019 1 23 1.260869565217 29 123 -117 0 -1 15
+2093538928 1 19 20.789473684211 395 123 -101 0 -1 101
+2125812933 1 22 16.681818181818 367 125 -99 0 -1 -47
+2214035726 1 19 20.789473684211 395 123 -101 0 -1 101
+2293105904 1 22 8.363636363636 184 122 -117 0 -1 -2
+2306130875 1 23 1.260869565217 29 123 -117 0 -1 15
+2307004493 1 14 -13.857142857143 -194 118 -101 0 -1 -122
+2424630722 1 14 -13.857142857143 -194 118 -101 0 -1 -122
+2496054700 1 14 -13.857142857143 -194 118 -101 0 -1 -122
+2502326480 1 23 1.260869565217 29 123 -117 0 -1 15
+2525744318 1 22 8.363636363636 184 122 -117 0 -1 -2
+2592330556 1 19 20.789473684211 395 123 -101 0 -1 101
+2610290479 1 22 16.681818181818 367 125 -99 0 -1 -47
+2669374863 1 22 16.681818181818 367 125 -99 0 -1 -47
+2705709344 1 22 8.363636363636 184 122 -117 0 -1 -2
+2712615025 1 22 16.681818181818 367 125 -99 0 -1 -47
+2778168728 1 23 1.260869565217 29 123 -117 0 -1 15
+2818832252 1 23 1.260869565217 29 123 -117 0 -1 15
+2830981072 1 22 16.681818181818 367 125 -99 0 -1 -47
+2844041986 1 22 8.363636363636 184 122 -117 0 -1 -2
+2861376515 1 22 16.681818181818 367 125 -99 0 -1 -47
+2861911482 1 14 -13.857142857143 -194 118 -101 0 -1 -122
+2939920218 1 22 8.363636363636 184 122 -117 0 -1 -2
+3023531799 1 23 1.260869565217 29 123 -117 0 -1 15
+3105312559 1 19 20.789473684211 395 123 -101 0 -1 101
+3126475872 1 23 1.260869565217 29 123 -117 0 -1 15
+3188005828 1 22 8.363636363636 184 122 -117 0 -1 -2
+3198969145 1 23 1.260869565217 29 123 -117 0 -1 15
+3275293996 1 22 16.681818181818 367 125 -99 0 -1 -47
+3276123488 1 22 16.681818181818 367 125 -99 0 -1 -47
+3314983189 1 22 8.363636363636 184 122 -117 0 -1 -2
+3342719438 1 14 -13.857142857143 -194 118 -101 0 -1 -122
+3373581039 1 14 -13.857142857143 -194 118 -101 0 -1 -122
+3398507249 1 22 8.363636363636 184 122 -117 0 -1 -2
+3455216719 1 22 8.363636363636 184 122 -117 0 -1 -2
+3457053821 1 14 -13.857142857143 -194 118 -101 0 -1 -122
+3473924576 1 19 20.789473684211 395 123 -101 0 -1 101
+3521368277 1 23 1.260869565217 29 123 -117 0 -1 15
+3542840110 1 22 16.681818181818 367 125 -99 0 -1 -47
+3566741189 1 23 1.260869565217 29 123 -117 0 -1 15
+3570297463 1 23 1.260869565217 29 123 -117 0 -1 15
+3577318119 1 19 20.789473684211 395 123 -101 0 -1 101
+3593959807 1 23 1.260869565217 29 123 -117 0 -1 15
+3625286410 1 22 16.681818181818 367 125 -99 0 -1 -47
+3717551163 1 22 8.363636363636 184 122 -117 0 -1 -2
+3759340273 1 19 20.789473684211 395 123 -101 0 -1 101
+3766999078 1 22 16.681818181818 367 125 -99 0 -1 -47
+3862393166 1 19 20.789473684211 395 123 -101 0 -1 101
+3959216334 1 19 20.789473684211 395 123 -101 0 -1 101
+3998790955 1 19 20.789473684211 395 123 -101 0 -1 101
+4015442341 1 22 16.681818181818 367 125 -99 0 -1 -47
+4061635107 1 22 8.363636363636 184 122 -117 0 -1 -2
+4076864659 1 22 16.681818181818 367 125 -99 0 -1 -47
+4144173353 1 22 8.363636363636 184 122 -117 0 -1 -2
+4216440507 1 22 16.681818181818 367 125 -99 0 -1 -47
+4229654142 1 22 16.681818181818 367 125 -99 0 -1 -47
+4268716378 1 14 -13.857142857143 -194 118 -101 0 -1 -122
 
 
-query IIIRIII
+query IIIRIIIIII
 SELECT
   c9,
   row_number() OVER (PARTITION BY c2 ORDER BY c9) AS row_number,
@@ -922,110 +931,113 @@ SELECT
   avg(c3) OVER (PARTITION BY c2 ORDER BY c9) AS avg_c3_by_c2,
   sum(c3) OVER (PARTITION BY c2 ORDER BY c9) AS sum_c3_by_c2,
   max(c3) OVER (PARTITION BY c2 ORDER BY c9) AS max_c3_by_c2,
-  min(c3) OVER (PARTITION BY c2 ORDER BY c9) AS min_c3_by_c2
+  min(c3) OVER (PARTITION BY c2 ORDER BY c9) AS min_c3_by_c2,
+  bit_and(c3) OVER (PARTITION BY c2 ORDER BY c9) AS bit_and_c3_by_c2,
+  bit_or(c3) OVER (PARTITION BY c2 ORDER BY c9) AS bit_or_c3_by_c2,
+  bit_xor(c3) OVER (PARTITION BY c2 ORDER BY c9) AS bit_xor_c3_by_c2
 FROM aggregate_test_100_by_sql
 ORDER BY c9;
 ----
-28774375 1 1 30 30 30 30
-63044568 1 1 113 113 113 113
-141047417 1 1 36 36 36 36
-141680161 2 2 3.5 7 113 -106
-145294611 3 3 17.333333333333 52 113 -106
-225513085 1 1 -98 -98 -98 -98
-243203849 1 1 -101 -101 -101 -101
-326151275 2 2 38.5 77 47 30
-431948861 2 2 -43.5 -87 14 -101
-466439833 3 3 -8 -24 47 -101
-473294098 2 2 -14 -28 70 -98
-520189543 3 3 -17.333333333333 -52 70 -98
-538589788 4 4 12.25 49 73 -101
-557517119 5 5 -1.4 -7 73 -101
-559847112 3 3 -60.666666666667 -182 14 -101
-598822671 4 4 5.75 23 113 -106
-662099130 2 2 50 100 64 36
-754775609 4 4 -41.25 -165 17 -101
-774637006 4 4 -34.25 -137 70 -98
-811650497 6 6 8 48 73 -101
-879082834 5 5 -16 -80 70 -98
-933879086 7 7 9.285714285714 65 73 -101
-974297360 3 3 56 168 68 36
-1000948272 5 5 23.2 116 113 -106
-1013876852 4 4 34.25 137 68 -31
-1088543984 5 5 -18.4 -92 73 -101
-1098639440 6 6 24.5 147 113 -106
-1157161427 7 7 5.714285714286 40 113 -107
-1229567292 5 5 51 255 118 -31
-1243785310 8 8 -5.75 -46 73 -111
-1289293657 8 8 11.125 89 113 -107
-1362369177 6 6 -11.666666666667 -70 73 -101
-1365198901 6 6 32.666666666667 196 118 -59
-1454057357 6 6 -22.666666666667 -136 70 -98
-1491205016 9 9 10 90 113 -107
-1534194097 9 9 6.222222222222 56 102 -111
-1538863055 7 7 -7.571428571429 -53 73 -101
-1787652631 10 10 1.8 18 102 -111
-1824517658 8 8 -16.125 -129 73 -101
-1842680163 7 7 -11.714285714286 -82 70 -98
-1865307672 11 11 7.545454545455 83 102 -111
-1995343206 9 9 -22.333333333333 -201 73 -101
-2013662838 10 10 14.2 142 113 -107
-2042457019 12 12 15 180 102 -111
-2093538928 10 10 -12.4 -124 77 -101
-2125812933 8 8 -5.125 -41 70 -98
-2214035726 11 11 -10.090909090909 -111 77 -101
-2293105904 11 11 2.272727272727 25 113 -117
-2306130875 13 13 14.076923076923 183 102 -111
-2307004493 7 7 36.857142857143 258 118 -59
-2424630722 8 8 31.625 253 118 -59
-2496054700 9 9 16.888888888889 152 118 -101
-2502326480 14 14 9.214285714286 129 102 -111
-2525744318 12 12 -2.916666666667 -35 113 -117
-2592330556 12 12 1 12 123 -101
-2610290479 9 9 -0.555555555556 -5 70 -98
-2669374863 10 10 -1 -10 70 -98
-2705709344 13 13 2.153846153846 28 113 -117
-2712615025 11 11 2.545454545455 28 70 -98
-2778168728 15 15 5.066666666667 76 102 -111
-2818832252 16 16 -0.1875 -3 102 -111
-2830981072 12 12 12.333333333333 148 120 -98
-2844041986 14 14 -2.285714285714 -32 113 -117
-2861376515 13 13 10.769230769231 140 120 -98
-2861911482 10 10 6.6 66 118 -101
-2939920218 15 15 -5 -75 113 -117
-3023531799 17 17 -7.058823529412 -120 102 -117
-3105312559 13 13 6.384615384615 83 123 -101
-3126475872 18 18 -6.388888888889 -115 102 -117
-3188005828 16 16 1.375 22 113 -117
-3198969145 19 19 -2.157894736842 -41 102 -117
-3275293996 14 14 12.071428571429 169 120 -98
-3276123488 15 15 9.6 144 120 -98
-3314983189 17 17 4.352941176471 74 113 -117
-3342719438 11 11 -1.454545454545 -16 118 -101
-3373581039 12 12 -4.666666666667 -56 118 -101
-3398507249 18 18 5.722222222222 103 113 -117
-3455216719 19 19 9 171 113 -117
-3457053821 13 13 -7.692307692308 -100 118 -101
-3473924576 14 14 12.857142857143 180 123 -101
-3521368277 20 20 2.75 55 102 -117
-3542840110 16 16 4.5 72 120 -98
-3566741189 21 21 8.47619047619 178 123 -117
-3570297463 22 22 5.409090909091 119 123 -117
-3577318119 15 15 18.933333333333 284 123 -101
-3593959807 23 23 1.260869565217 29 123 -117
-3625286410 17 17 11.588235294118 197 125 -98
-3717551163 20 20 6.15 123 113 -117
-3759340273 16 16 24.75 396 123 -101
-3766999078 18 18 16.666666666667 300 125 -98
-3862393166 17 17 23.176470588235 394 123 -101
-3959216334 18 18 21.222222222222 382 123 -101
-3998790955 19 19 20.789473684211 395 123 -101
-4015442341 19 19 20.157894736842 383 125 -98
-4061635107 21 21 11.666666666667 245 122 -117
-4076864659 20 20 19.75 395 125 -98
-4144173353 22 22 8.363636363636 184 122 -117
-4216440507 21 21 14.095238095238 296 125 -99
-4229654142 22 22 16.681818181818 367 125 -99
-4268716378 14 14 -13.857142857143 -194 118 -101
+28774375 1 1 30 30 30 30 30 30 30
+63044568 1 1 113 113 113 113 113 113 113
+141047417 1 1 36 36 36 36 36 36 36
+141680161 2 2 3.5 7 113 -106 16 -9 -25
+145294611 3 3 17.333333333333 52 113 -106 0 -1 -54
+225513085 1 1 -98 -98 -98 -98 -98 -98 -98
+243203849 1 1 -101 -101 -101 -101 -101 -101 -101
+326151275 2 2 38.5 77 47 30 14 63 49
+431948861 2 2 -43.5 -87 14 -101 10 -97 -107
+466439833 3 3 -8 -24 47 -101 10 -65 -86
+473294098 2 2 -14 -28 70 -98 6 -34 -40
+520189543 3 3 -17.333333333333 -52 70 -98 0 -2 48
+538589788 4 4 12.25 49 73 -101 8 -1 -29
+557517119 5 5 -1.4 -7 73 -101 8 -1 43
+559847112 3 3 -60.666666666667 -182 14 -101 0 -65 52
+598822671 4 4 5.75 23 113 -106 0 -1 41
+662099130 2 2 50 100 64 36 0 100 100
+754775609 4 4 -41.25 -165 17 -101 0 -65 37
+774637006 4 4 -34.25 -137 70 -98 0 -1 -101
+811650497 6 6 8 48 73 -101 0 -1 28
+879082834 5 5 -16 -80 70 -98 0 -1 -94
+933879086 7 7 9.285714285714 65 73 -101 0 -1 13
+974297360 3 3 56 168 68 36 0 100 32
+1000948272 5 5 23.2 116 113 -106 0 -1 116
+1013876852 4 4 34.25 137 68 -31 0 -27 -63
+1088543984 5 5 -18.4 -92 73 -101 0 -1 108
+1098639440 6 6 24.5 147 113 -106 0 -1 107
+1157161427 7 7 5.714285714286 40 113 -107 0 -1 -2
+1229567292 5 5 51 255 118 -31 0 -9 -73
+1243785310 8 8 -5.75 -46 73 -111 0 -1 -100
+1289293657 8 8 11.125 89 113 -107 0 -1 -49
+1362369177 6 6 -11.666666666667 -70 73 -101 0 -1 122
+1365198901 6 6 32.666666666667 196 118 -59 0 -9 114
+1454057357 6 6 -22.666666666667 -136 70 -98 0 -1 106
+1491205016 9 9 10 90 113 -107 0 -1 -50
+1534194097 9 9 6.222222222222 56 102 -111 0 -1 -6
+1538863055 7 7 -7.571428571429 -53 73 -101 0 -1 107
+1787652631 10 10 1.8 18 102 -111 0 -1 32
+1824517658 8 8 -16.125 -129 73 -101 0 -1 -33
+1842680163 7 7 -11.714285714286 -82 70 -98 0 -1 92
+1865307672 11 11 7.545454545455 83 102 -111 0 -1 97
+1995343206 9 9 -22.333333333333 -201 73 -101 0 -1 103
+2013662838 10 10 14.2 142 113 -107 0 -1 -6
+2042457019 12 12 15 180 102 -111 0 -1 0
+2093538928 10 10 -12.4 -124 77 -101 0 -1 42
+2125812933 8 8 -5.125 -41 70 -98 0 -1 117
+2214035726 11 11 -10.090909090909 -111 77 -101 0 -1 39
+2293105904 11 11 2.272727272727 25 113 -117 0 -1 113
+2306130875 13 13 14.076923076923 183 102 -111 0 -1 3
+2307004493 7 7 36.857142857143 258 118 -59 0 -1 76
+2424630722 8 8 31.625 253 118 -59 0 -1 -73
+2496054700 9 9 16.888888888889 152 118 -101 0 -1 44
+2502326480 14 14 9.214285714286 129 102 -111 0 -1 -55
+2525744318 12 12 -2.916666666667 -35 113 -117 0 -1 -75
+2592330556 12 12 1 12 123 -101 0 -1 92
+2610290479 9 9 -0.555555555556 -5 70 -98 0 -1 81
+2669374863 10 10 -1 -10 70 -98 0 -1 -86
+2705709344 13 13 2.153846153846 28 113 -117 0 -1 -118
+2712615025 11 11 2.545454545455 28 70 -98 0 -1 -116
+2778168728 15 15 5.066666666667 76 102 -111 0 -1 2
+2818832252 16 16 -0.1875 -3 102 -111 0 -1 -77
+2830981072 12 12 12.333333333333 148 120 -98 0 -1 -12
+2844041986 14 14 -2.285714285714 -32 113 -117 0 -1 78
+2861376515 13 13 10.769230769231 140 120 -98 0 -1 12
+2861911482 10 10 6.6 66 118 -101 0 -1 -122
+2939920218 15 15 -5 -75 113 -117 0 -1 -101
+3023531799 17 17 -7.058823529412 -120 102 -117 0 -1 56
+3105312559 13 13 6.384615384615 83 123 -101 0 -1 27
+3126475872 18 18 -6.388888888889 -115 102 -117 0 -1 61
+3188005828 16 16 1.375 22 113 -117 0 -1 -6
+3198969145 19 19 -2.157894736842 -41 102 -117 0 -1 119
+3275293996 14 14 12.071428571429 169 120 -98 0 -1 17
+3276123488 15 15 9.6 144 120 -98 0 -1 -10
+3314983189 17 17 4.352941176471 74 113 -117 0 -1 -50
+3342719438 11 11 -1.454545454545 -16 118 -101 0 -1 40
+3373581039 12 12 -4.666666666667 -56 118 -101 0 -1 -16
+3398507249 18 18 5.722222222222 103 113 -117 0 -1 -45
+3455216719 19 19 9 171 113 -117 0 -1 -105
+3457053821 13 13 -7.692307692308 -100 118 -101 0 -1 36
+3473924576 14 14 12.857142857143 180 123 -101 0 -1 122
+3521368277 20 20 2.75 55 102 -117 0 -1 23
+3542840110 16 16 4.5 72 120 -98 0 -1 78
+3566741189 21 21 8.47619047619 178 123 -117 0 -1 108
+3570297463 22 22 5.409090909091 119 123 -117 0 -1 -87
+3577318119 15 15 18.933333333333 284 123 -101 0 -1 18
+3593959807 23 23 1.260869565217 29 123 -117 0 -1 15
+3625286410 17 17 11.588235294118 197 125 -98 0 -1 51
+3717551163 20 20 6.15 123 113 -117 0 -1 71
+3759340273 16 16 24.75 396 123 -101 0 -1 98
+3766999078 18 18 16.666666666667 300 125 -98 0 -1 84
+3862393166 17 17 23.176470588235 394 123 -101 0 -1 -100
+3959216334 18 18 21.222222222222 382 123 -101 0 -1 104
+3998790955 19 19 20.789473684211 395 123 -101 0 -1 101
+4015442341 19 19 20.157894736842 383 125 -98 0 -1 7
+4061635107 21 21 11.666666666667 245 122 -117 0 -1 61
+4076864659 20 20 19.75 395 125 -98 0 -1 11
+4144173353 22 22 8.363636363636 184 122 -117 0 -1 -2
+4216440507 21 21 14.095238095238 296 125 -99 0 -1 -106
+4229654142 22 22 16.681818181818 367 125 -99 0 -1 -47
+4268716378 14 14 -13.857142857143 -194 118 -101 0 -1 -122
 
 
 query IIIIIIIIII
diff --git a/datafusion/expr/src/aggregate_function.rs b/datafusion/expr/src/aggregate_function.rs
index dd27d9504b..7d5fa277de 100644
--- a/datafusion/expr/src/aggregate_function.rs
+++ b/datafusion/expr/src/aggregate_function.rs
@@ -64,6 +64,16 @@ pub enum AggregateFunction {
     ApproxMedian,
     /// Grouping
     Grouping,
+    /// Bit And
+    BitAnd,
+    /// Bit Or
+    BitOr,
+    /// Bit Xor
+    BitXor,
+    /// Bool And
+    BoolAnd,
+    /// Bool Or
+    BoolOr,
 }
 
 impl fmt::Display for AggregateFunction {
@@ -79,6 +89,11 @@ impl FromStr for AggregateFunction {
         Ok(match name {
             // general
             "avg" => AggregateFunction::Avg,
+            "bit_and" => AggregateFunction::BitAnd,
+            "bit_or" => AggregateFunction::BitOr,
+            "bit_xor" => AggregateFunction::BitXor,
+            "bool_and" => AggregateFunction::BoolAnd,
+            "bool_or" => AggregateFunction::BoolOr,
             "count" => AggregateFunction::Count,
             "max" => AggregateFunction::Max,
             "mean" => AggregateFunction::Avg,
@@ -140,6 +155,10 @@ pub fn return_type(
             Ok(coerced_data_types[0].clone())
         }
         AggregateFunction::Sum => sum_return_type(&coerced_data_types[0]),
+        AggregateFunction::BitAnd
+        | AggregateFunction::BitOr
+        | AggregateFunction::BitXor => Ok(coerced_data_types[0].clone()),
+        AggregateFunction::BoolAnd | AggregateFunction::BoolOr => Ok(DataType::Boolean),
         AggregateFunction::Variance => variance_return_type(&coerced_data_types[0]),
         AggregateFunction::VariancePop => variance_return_type(&coerced_data_types[0]),
         AggregateFunction::Covariance => covariance_return_type(&coerced_data_types[0]),
@@ -198,6 +217,14 @@ pub fn signature(fun: &AggregateFunction) -> Signature {
                 .collect::<Vec<_>>();
             Signature::uniform(1, valid, Volatility::Immutable)
         }
+        AggregateFunction::BitAnd
+        | AggregateFunction::BitOr
+        | AggregateFunction::BitXor => {
+            Signature::uniform(1, INTEGERS.to_vec(), Volatility::Immutable)
+        }
+        AggregateFunction::BoolAnd | AggregateFunction::BoolOr => {
+            Signature::uniform(1, vec![DataType::Boolean], Volatility::Immutable)
+        }
         AggregateFunction::Avg
         | AggregateFunction::Sum
         | AggregateFunction::Variance
diff --git a/datafusion/expr/src/type_coercion/aggregates.rs b/datafusion/expr/src/type_coercion/aggregates.rs
index 26498ed5b6..2cc6c322e6 100644
--- a/datafusion/expr/src/type_coercion/aggregates.rs
+++ b/datafusion/expr/src/type_coercion/aggregates.rs
@@ -27,6 +27,31 @@ use super::functions::can_coerce_from;
 
 pub static STRINGS: &[DataType] = &[DataType::Utf8, DataType::LargeUtf8];
 
+pub static SIGNED_INTEGERS: &[DataType] = &[
+    DataType::Int8,
+    DataType::Int16,
+    DataType::Int32,
+    DataType::Int64,
+];
+
+pub static UNSIGNED_INTEGERS: &[DataType] = &[
+    DataType::UInt8,
+    DataType::UInt16,
+    DataType::UInt32,
+    DataType::UInt64,
+];
+
+pub static INTEGERS: &[DataType] = &[
+    DataType::Int8,
+    DataType::Int16,
+    DataType::Int32,
+    DataType::Int64,
+    DataType::UInt8,
+    DataType::UInt16,
+    DataType::UInt32,
+    DataType::UInt64,
+];
+
 pub static NUMERICS: &[DataType] = &[
     DataType::Int8,
     DataType::Int16,
@@ -98,6 +123,30 @@ pub fn coerce_types(
             }
             Ok(input_types.to_vec())
         }
+        AggregateFunction::BitAnd
+        | AggregateFunction::BitOr
+        | AggregateFunction::BitXor => {
+            // Refer to https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
+            // PostgreSQL's bit_and/bit_or accept smallint, int, bigint, or bit arguments.
+            if !is_bit_and_or_xor_support_arg_type(&input_types[0]) {
+                return Err(DataFusionError::Plan(format!(
+                    "The function {:?} does not support inputs of type {:?}.",
+                    agg_fun, input_types[0]
+                )));
+            }
+            Ok(input_types.to_vec())
+        }
+        AggregateFunction::BoolAnd | AggregateFunction::BoolOr => {
+            // Refer to https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
+            // PostgreSQL's bool_and/bool_or accept only bool arguments.
+            if !is_bool_and_or_support_arg_type(&input_types[0]) {
+                return Err(DataFusionError::Plan(format!(
+                    "The function {:?} does not support inputs of type {:?}.",
+                    agg_fun, input_types[0]
+                )));
+            }
+            Ok(input_types.to_vec())
+        }
         AggregateFunction::Variance => {
             if !is_variance_support_arg_type(&input_types[0]) {
                 return Err(DataFusionError::Plan(format!(
@@ -298,12 +347,8 @@ fn get_min_max_result_type(input_types: &[DataType]) -> Result<Vec<DataType>> {
 /// function return type of a sum
 pub fn sum_return_type(arg_type: &DataType) -> Result<DataType> {
     match arg_type {
-        DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => {
-            Ok(DataType::Int64)
-        }
-        DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => {
-            Ok(DataType::UInt64)
-        }
+        arg_type if SIGNED_INTEGERS.contains(arg_type) => Ok(DataType::Int64),
+        arg_type if UNSIGNED_INTEGERS.contains(arg_type) => Ok(DataType::UInt64),
         // In the https://www.postgresql.org/docs/current/functions-aggregate.html doc,
         // the result type of floating-point is FLOAT64 with the double precision.
         DataType::Float64 | DataType::Float32 => Ok(DataType::Float64),
@@ -404,6 +449,14 @@ pub fn avg_sum_type(arg_type: &DataType) -> Result<DataType> {
     }
 }
 
+pub fn is_bit_and_or_xor_support_arg_type(arg_type: &DataType) -> bool {
+    NUMERICS.contains(arg_type)
+}
+
+pub fn is_bool_and_or_support_arg_type(arg_type: &DataType) -> bool {
+    matches!(arg_type, DataType::Boolean)
+}
+
 pub fn is_sum_support_arg_type(arg_type: &DataType) -> bool {
     match arg_type {
         DataType::Dictionary(_, dict_value_type) => {
diff --git a/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs b/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs
new file mode 100644
index 0000000000..eeb48035be
--- /dev/null
+++ b/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs
@@ -0,0 +1,934 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can be evaluated at runtime during query execution
+
+use std::any::Any;
+use std::convert::TryFrom;
+use std::sync::Arc;
+
+use crate::{AggregateExpr, PhysicalExpr};
+use arrow::datatypes::DataType;
+use arrow::{
+    array::{
+        ArrayRef, Int16Array, Int32Array, Int64Array, Int8Array, UInt16Array,
+        UInt32Array, UInt64Array, UInt8Array,
+    },
+    datatypes::Field,
+};
+use datafusion_common::{downcast_value, DataFusionError, Result, ScalarValue};
+use datafusion_expr::Accumulator;
+
+use crate::aggregate::row_accumulator::{
+    is_row_accumulator_support_dtype, RowAccumulator,
+};
+use crate::aggregate::utils::down_cast_any_ref;
+use crate::expressions::format_state_name;
+use arrow::array::Array;
+use arrow::array::PrimitiveArray;
+use arrow::datatypes::ArrowNativeTypeOp;
+use arrow::datatypes::ArrowNumericType;
+use datafusion_row::accessor::RowAccessor;
+use std::ops::BitAnd as BitAndImplementation;
+use std::ops::BitOr as BitOrImplementation;
+use std::ops::BitXor as BitXorImplementation;
+
+// TODO: remove this macro rules after implementation in arrow-rs
+// https://github.com/apache/arrow-rs/pull/4210
+//
+// Generates a function `$NAME` that reduces a primitive array with the method
+// `$OP` of the operator trait `$NATIVE`. `$DEFAULT` selects the seed of the
+// reduction: `-1` means the wrapping negation of one, any other value means
+// the default value of the array's primitive type.
+macro_rules! bit_operation {
+    ($NAME:ident, $OP:ident, $NATIVE:ident, $DEFAULT:expr, $DOC:expr) => {
+        #[doc = $DOC]
+        ///
+        /// Returns `None` if the array is empty or only contains null values.
+        fn $NAME<T>(array: &PrimitiveArray<T>) -> Option<T::Native>
+        where
+            T: ArrowNumericType,
+            T::Native: $NATIVE<Output = T::Native> + ArrowNativeTypeOp,
+        {
+            // Nothing to reduce when every slot is null (this covers the empty array too).
+            if array.null_count() == array.len() {
+                return None;
+            }
+
+            let seed = if $DEFAULT == -1 {
+                T::Native::ONE.neg_wrapping()
+            } else {
+                T::default_value()
+            };
+
+            let values: &[T::Native] = array.values();
+
+            let validity = match array.nulls() {
+                Some(validity) => validity,
+                None => {
+                    // No validity buffer: every value takes part in the reduction.
+                    let mut folded = seed;
+                    for value in values {
+                        folded = folded.$OP(*value);
+                    }
+                    return Some(folded);
+                }
+            };
+
+            let mut folded = seed;
+            let full_chunks = values.chunks_exact(64);
+            let tail = full_chunks.remainder();
+            let validity_chunks = validity.inner().bit_chunks();
+
+            // Walk the values 64 at a time next to the matching 64-bit validity word.
+            for (chunk, word) in full_chunks.zip(validity_chunks.iter()) {
+                // `bit` has value 1 << i for the i-th value of the chunk
+                let mut bit = 1;
+                for value in chunk {
+                    if (word & bit) != 0 {
+                        folded = folded.$OP(*value);
+                    }
+                    bit <<= 1;
+                }
+            }
+
+            // Values past the last full chunk are checked against the remainder bits.
+            let tail_word = validity_chunks.remainder_bits();
+            for (i, value) in tail.iter().enumerate() {
+                if tail_word & (1 << i) != 0 {
+                    folded = folded.$OP(*value);
+                }
+            }
+
+            Some(folded)
+        }
+    };
+}
+
+bit_operation!(
+    bit_and,
+    bitand,
+    BitAndImplementation,
+    -1,
+    "Returns the bitwise and of all non-null input values."
+);
+bit_operation!(
+    bit_or,
+    bitor,
+    BitOrImplementation,
+    0,
+    "Returns the bitwise or of all non-null input values."
+);
+bit_operation!(
+    bit_xor,
+    bitxor,
+    BitXorImplementation,
+    0,
+    "Returns the bitwise xor of all non-null input values."
+);
+
+// Downcasts `$VALUES` to `$ARRAYTYPE`, reduces it with the function `$OP` and
+// wraps the optional outcome in `ScalarValue::$SCALAR`, so nullability is kept.
+macro_rules! typed_bit_and_or_xor_batch {
+    ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{
+        let typed_array = downcast_value!($VALUES, $ARRAYTYPE);
+        Ok(ScalarValue::$SCALAR($OP(typed_array)))
+    }};
+}
+
+// Dispatches on the array's data type and reduces it with `$OP`, producing a
+// ScalarValue of the matching integer type. Any non-integer data type makes the
+// enclosing function return an internal error.
+macro_rules! bit_and_or_xor_batch {
+    ($VALUES:expr, $OP:ident) => {{
+        match $VALUES.data_type() {
+            // unsigned integers
+            DataType::UInt8 => {
+                typed_bit_and_or_xor_batch!($VALUES, UInt8Array, UInt8, $OP)
+            }
+            DataType::UInt16 => {
+                typed_bit_and_or_xor_batch!($VALUES, UInt16Array, UInt16, $OP)
+            }
+            DataType::UInt32 => {
+                typed_bit_and_or_xor_batch!($VALUES, UInt32Array, UInt32, $OP)
+            }
+            DataType::UInt64 => {
+                typed_bit_and_or_xor_batch!($VALUES, UInt64Array, UInt64, $OP)
+            }
+            // signed integers
+            DataType::Int8 => {
+                typed_bit_and_or_xor_batch!($VALUES, Int8Array, Int8, $OP)
+            }
+            DataType::Int16 => {
+                typed_bit_and_or_xor_batch!($VALUES, Int16Array, Int16, $OP)
+            }
+            DataType::Int32 => {
+                typed_bit_and_or_xor_batch!($VALUES, Int32Array, Int32, $OP)
+            }
+            DataType::Int64 => {
+                typed_bit_and_or_xor_batch!($VALUES, Int64Array, Int64, $OP)
+            }
+            e => {
+                return Err(DataFusionError::Internal(format!(
+                    "Bit and/Bit or/Bit xor is not expected to receive the type {e:?}"
+                )));
+            }
+        }
+    }};
+}
+
+/// Computes the bitwise AND of all non-null values of an integer array whose
+/// concrete type is only known at runtime, returning a `ScalarValue` of that type.
+fn bit_and_batch(array: &ArrayRef) -> Result<ScalarValue> {
+    bit_and_or_xor_batch!(array, bit_and)
+}
+
+/// Computes the bitwise OR of all non-null values of an integer array whose
+/// concrete type is only known at runtime, returning a `ScalarValue` of that type.
+fn bit_or_batch(array: &ArrayRef) -> Result<ScalarValue> {
+    bit_and_or_xor_batch!(array, bit_or)
+}
+
+/// Computes the bitwise XOR of all non-null values of an integer array whose
+/// concrete type is only known at runtime, returning a `ScalarValue` of that type.
+fn bit_xor_batch(array: &ArrayRef) -> Result<ScalarValue> {
+    bit_and_or_xor_batch!(array, bit_xor)
+}
+
+// Combines an optional scalar with the row state: when `$SCALAR` holds a value,
+// the accessor method named `<$OP>_<$TYPE>` is called for the field at `$INDEX`;
+// a `None` scalar is ignored.
+macro_rules! typed_bit_and_or_xor_v2 {
+    ($INDEX:ident, $ACC:ident, $SCALAR:expr, $TYPE:ident, $OP:ident) => {{
+        paste::item! {
+            if let Some(v) = $SCALAR {
+                $ACC.[<$OP _ $TYPE>]($INDEX, *v as $TYPE)
+            }
+        }
+    }};
+}
+
+// Dispatches on the integer variant of `$SCALAR` and applies `$OP` to the row
+// state at `$INDEX`. `ScalarValue::Null` is a no-op; every other variant makes
+// the enclosing function return an internal error.
+macro_rules! bit_and_or_xor_v2 {
+    ($INDEX:ident, $ACC:ident, $SCALAR:expr, $OP:ident) => {{
+        Ok(match $SCALAR {
+            // signed integers
+            ScalarValue::Int8(rhs) => {
+                typed_bit_and_or_xor_v2!($INDEX, $ACC, rhs, i8, $OP)
+            }
+            ScalarValue::Int16(rhs) => {
+                typed_bit_and_or_xor_v2!($INDEX, $ACC, rhs, i16, $OP)
+            }
+            ScalarValue::Int32(rhs) => {
+                typed_bit_and_or_xor_v2!($INDEX, $ACC, rhs, i32, $OP)
+            }
+            ScalarValue::Int64(rhs) => {
+                typed_bit_and_or_xor_v2!($INDEX, $ACC, rhs, i64, $OP)
+            }
+            // unsigned integers
+            ScalarValue::UInt8(rhs) => {
+                typed_bit_and_or_xor_v2!($INDEX, $ACC, rhs, u8, $OP)
+            }
+            ScalarValue::UInt16(rhs) => {
+                typed_bit_and_or_xor_v2!($INDEX, $ACC, rhs, u16, $OP)
+            }
+            ScalarValue::UInt32(rhs) => {
+                typed_bit_and_or_xor_v2!($INDEX, $ACC, rhs, u32, $OP)
+            }
+            ScalarValue::UInt64(rhs) => {
+                typed_bit_and_or_xor_v2!($INDEX, $ACC, rhs, u64, $OP)
+            }
+            ScalarValue::Null => {
+                // an untyped null contributes nothing to the state
+            }
+            e => {
+                return Err(DataFusionError::Internal(format!(
+                    "BIT AND/BIT OR/BIT XOR is not expected to receive scalars of incompatible types {:?}",
+                    e
+                )))
+            }
+        })
+    }};
+}
+
+/// Folds the scalar `s` into the row state at `index` by calling the accessor's
+/// `bitand_*` method that matches the scalar's integer type.
+///
+/// A `ScalarValue::Null` or an integer scalar holding `None` leaves the state
+/// untouched. Any non-integer scalar yields a `DataFusionError::Internal`.
+pub fn bit_and_row(
+    index: usize,
+    accessor: &mut RowAccessor,
+    s: &ScalarValue,
+) -> Result<()> {
+    bit_and_or_xor_v2!(index, accessor, s, bitand)
+}
+
+/// Folds the scalar `s` into the row state at `index` by calling the accessor's
+/// `bitor_*` method that matches the scalar's integer type.
+///
+/// A `ScalarValue::Null` or an integer scalar holding `None` leaves the state
+/// untouched. Any non-integer scalar yields a `DataFusionError::Internal`.
+pub fn bit_or_row(
+    index: usize,
+    accessor: &mut RowAccessor,
+    s: &ScalarValue,
+) -> Result<()> {
+    bit_and_or_xor_v2!(index, accessor, s, bitor)
+}
+
+/// Folds the scalar `s` into the row state at `index` by calling the accessor's
+/// `bitxor_*` method that matches the scalar's integer type.
+///
+/// A `ScalarValue::Null` or an integer scalar holding `None` leaves the state
+/// untouched. Any non-integer scalar yields a `DataFusionError::Internal`.
+pub fn bit_xor_row(
+    index: usize,
+    accessor: &mut RowAccessor,
+    s: &ScalarValue,
+) -> Result<()> {
+    bit_and_or_xor_v2!(index, accessor, s, bitxor)
+}
+
+/// Aggregate expression computing the bitwise AND (`BIT_AND`) of its input
+#[derive(Debug, Clone)]
+pub struct BitAnd {
+    /// Name of the output field
+    name: String,
+    /// Data type used for the output field, the state field and the accumulators
+    pub data_type: DataType,
+    /// Expression that produces the values to aggregate
+    expr: Arc<dyn PhysicalExpr>,
+    /// Nullability of the output and state fields
+    nullable: bool,
+}
+
+impl BitAnd {
+    /// Builds a BIT_AND aggregate over `expr`; its output field is called `name`,
+    /// has type `data_type` and is always marked nullable
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        let name = name.into();
+        Self {
+            expr,
+            data_type,
+            name,
+            nullable: true,
+        }
+    }
+}
+
+impl AggregateExpr for BitAnd {
+    /// Exposes `self` as `Any` so that callers can downcast to `BitAnd`
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Output field, built from the expression's name, data type and nullability
+    fn field(&self) -> Result<Field> {
+        let field = Field::new(&self.name, self.data_type.clone(), self.nullable);
+        Ok(field)
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        let accumulator = BitAndAccumulator::try_new(&self.data_type)?;
+        Ok(Box::new(accumulator))
+    }
+
+    /// A single state field that carries the partial result
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        let state = Field::new(
+            format_state_name(&self.name, "bit_and"),
+            self.data_type.clone(),
+            self.nullable,
+        );
+        Ok(vec![state])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![Arc::clone(&self.expr)]
+    }
+
+    fn name(&self) -> &str {
+        self.name.as_str()
+    }
+
+    /// Delegates to `is_row_accumulator_support_dtype` for the expression's data type
+    fn row_accumulator_supported(&self) -> bool {
+        is_row_accumulator_support_dtype(&self.data_type)
+    }
+
+    fn create_row_accumulator(
+        &self,
+        start_index: usize,
+    ) -> Result<Box<dyn RowAccumulator>> {
+        let accumulator =
+            BitAndRowAccumulator::new(start_index, self.data_type.clone());
+        Ok(Box::new(accumulator))
+    }
+
+    /// The reversed expression is a clone of the expression itself
+    fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
+        Some(Arc::new(self.clone()))
+    }
+}
+
+impl PartialEq<dyn Any> for BitAnd {
+    /// Equal only to another `BitAnd` with the same name, data type,
+    /// nullability and input expression
+    fn eq(&self, other: &dyn Any) -> bool {
+        match down_cast_any_ref(other).downcast_ref::<Self>() {
+            Some(x) => {
+                self.name == x.name
+                    && self.data_type == x.data_type
+                    && self.nullable == x.nullable
+                    && self.expr.eq(&x.expr)
+            }
+            None => false,
+        }
+    }
+}
+
+/// Accumulator that keeps the running BIT_AND result as a `ScalarValue`
+#[derive(Debug)]
+struct BitAndAccumulator {
+    bit_and: ScalarValue,
+}
+
+impl BitAndAccumulator {
+    /// Creates the accumulator with `ScalarValue::try_from(data_type)` as its
+    /// initial value; fails when that conversion fails
+    pub fn try_new(data_type: &DataType) -> Result<Self> {
+        let bit_and = ScalarValue::try_from(data_type)?;
+        Ok(Self { bit_and })
+    }
+}
+
+impl Accumulator for BitAndAccumulator {
+    /// The state is the running result itself
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        Ok(vec![self.bit_and.clone()])
+    }
+
+    /// Reduces the first input array and combines it with the running result
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        let batch_result = bit_and_batch(&values[0])?;
+        self.bit_and = self.bit_and.bitand(&batch_result)?;
+        Ok(())
+    }
+
+    /// Partial states are merged exactly like raw input values
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+        self.update_batch(states)
+    }
+
+    fn evaluate(&self) -> Result<ScalarValue> {
+        Ok(self.bit_and.clone())
+    }
+
+    fn size(&self) -> usize {
+        // size of the struct without the scalar, plus the size the scalar reports
+        let without_scalar =
+            std::mem::size_of_val(self) - std::mem::size_of_val(&self.bit_and);
+        without_scalar + self.bit_and.size()
+    }
+}
+
+/// Row-based BIT_AND accumulator; its state is the row field at `index`
+#[derive(Debug)]
+struct BitAndRowAccumulator {
+    /// Index of the state field inside the row
+    index: usize,
+    /// Data type used to read the state back as a scalar
+    datatype: DataType,
+}
+
+impl BitAndRowAccumulator {
+    /// Creates an accumulator bound to the row field at `index`
+    pub fn new(index: usize, datatype: DataType) -> Self {
+        Self { datatype, index }
+    }
+}
+
+impl RowAccumulator for BitAndRowAccumulator {
+    /// Reduces the first input array and folds the result into the row state
+    fn update_batch(
+        &mut self,
+        values: &[ArrayRef],
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        let batch_result = bit_and_batch(&values[0])?;
+        bit_and_row(self.index, accessor, &batch_result)?;
+        Ok(())
+    }
+
+    /// Only the first scalar is used
+    fn update_scalar_values(
+        &mut self,
+        values: &[ScalarValue],
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        self.update_scalar(&values[0], accessor)
+    }
+
+    fn update_scalar(
+        &mut self,
+        value: &ScalarValue,
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        bit_and_row(self.index, accessor, value)
+    }
+
+    /// Partial states are merged exactly like raw input values
+    fn merge_batch(
+        &mut self,
+        states: &[ArrayRef],
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        self.update_batch(states, accessor)
+    }
+
+    fn evaluate(&self, accessor: &RowAccessor) -> Result<ScalarValue> {
+        let result = accessor.get_as_scalar(&self.datatype, self.index);
+        Ok(result)
+    }
+
+    #[inline(always)]
+    fn state_index(&self) -> usize {
+        self.index
+    }
+}
+
+/// Aggregate expression computing the bitwise OR (`BIT_OR`) of its input
+#[derive(Debug, Clone)]
+pub struct BitOr {
+    /// Name of the output field
+    name: String,
+    /// Data type used for the output field, the state field and the accumulators
+    pub data_type: DataType,
+    /// Expression that produces the values to aggregate
+    expr: Arc<dyn PhysicalExpr>,
+    /// Nullability of the output and state fields
+    nullable: bool,
+}
+
+impl BitOr {
+    /// Builds a BIT_OR aggregate over `expr`; its output field is called `name`,
+    /// has type `data_type` and is always marked nullable
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        let name = name.into();
+        Self {
+            expr,
+            data_type,
+            name,
+            nullable: true,
+        }
+    }
+}
+
+impl AggregateExpr for BitOr {
+    /// Exposes `self` as `Any` so that callers can downcast to `BitOr`
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Output field, built from the expression's name, data type and nullability
+    fn field(&self) -> Result<Field> {
+        let field = Field::new(&self.name, self.data_type.clone(), self.nullable);
+        Ok(field)
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        let accumulator = BitOrAccumulator::try_new(&self.data_type)?;
+        Ok(Box::new(accumulator))
+    }
+
+    /// A single state field that carries the partial result
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        let state = Field::new(
+            format_state_name(&self.name, "bit_or"),
+            self.data_type.clone(),
+            self.nullable,
+        );
+        Ok(vec![state])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![Arc::clone(&self.expr)]
+    }
+
+    fn name(&self) -> &str {
+        self.name.as_str()
+    }
+
+    /// Delegates to `is_row_accumulator_support_dtype` for the expression's data type
+    fn row_accumulator_supported(&self) -> bool {
+        is_row_accumulator_support_dtype(&self.data_type)
+    }
+
+    fn create_row_accumulator(
+        &self,
+        start_index: usize,
+    ) -> Result<Box<dyn RowAccumulator>> {
+        let accumulator =
+            BitOrRowAccumulator::new(start_index, self.data_type.clone());
+        Ok(Box::new(accumulator))
+    }
+
+    /// The reversed expression is a clone of the expression itself
+    fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
+        Some(Arc::new(self.clone()))
+    }
+}
+
+impl PartialEq<dyn Any> for BitOr {
+    /// Equal only to another `BitOr` with the same name, data type,
+    /// nullability and input expression
+    fn eq(&self, other: &dyn Any) -> bool {
+        match down_cast_any_ref(other).downcast_ref::<Self>() {
+            Some(x) => {
+                self.name == x.name
+                    && self.data_type == x.data_type
+                    && self.nullable == x.nullable
+                    && self.expr.eq(&x.expr)
+            }
+            None => false,
+        }
+    }
+}
+
+/// Accumulator that keeps the running BIT_OR result as a `ScalarValue`
+#[derive(Debug)]
+struct BitOrAccumulator {
+    bit_or: ScalarValue,
+}
+
+impl BitOrAccumulator {
+    /// Creates the accumulator with `ScalarValue::try_from(data_type)` as its
+    /// initial value; fails when that conversion fails
+    pub fn try_new(data_type: &DataType) -> Result<Self> {
+        let bit_or = ScalarValue::try_from(data_type)?;
+        Ok(Self { bit_or })
+    }
+}
+
+impl Accumulator for BitOrAccumulator {
+    /// Reduces the first input array and combines it with the running result
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        let batch_result = bit_or_batch(&values[0])?;
+        self.bit_or = self.bit_or.bitor(&batch_result)?;
+        Ok(())
+    }
+
+    /// Partial states are merged exactly like raw input values
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+        self.update_batch(states)
+    }
+
+    /// The state is the running result itself
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        Ok(vec![self.bit_or.clone()])
+    }
+
+    fn evaluate(&self) -> Result<ScalarValue> {
+        Ok(self.bit_or.clone())
+    }
+
+    fn size(&self) -> usize {
+        // size of the struct without the scalar, plus the size the scalar reports
+        let without_scalar =
+            std::mem::size_of_val(self) - std::mem::size_of_val(&self.bit_or);
+        without_scalar + self.bit_or.size()
+    }
+}
+
+/// Row-based BIT_OR accumulator; its state is the row field at `index`
+#[derive(Debug)]
+struct BitOrRowAccumulator {
+    /// Index of the state field inside the row
+    index: usize,
+    /// Data type used to read the state back as a scalar
+    datatype: DataType,
+}
+
+impl BitOrRowAccumulator {
+    /// Creates an accumulator bound to the row field at `index`
+    pub fn new(index: usize, datatype: DataType) -> Self {
+        Self { datatype, index }
+    }
+}
+
+impl RowAccumulator for BitOrRowAccumulator {
+    /// Reduces the first input array and folds the result into the row state
+    fn update_batch(
+        &mut self,
+        values: &[ArrayRef],
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        let batch_result = bit_or_batch(&values[0])?;
+        bit_or_row(self.index, accessor, &batch_result)
+    }
+
+    /// Only the first scalar is used
+    fn update_scalar_values(
+        &mut self,
+        values: &[ScalarValue],
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        self.update_scalar(&values[0], accessor)
+    }
+
+    fn update_scalar(
+        &mut self,
+        value: &ScalarValue,
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        bit_or_row(self.index, accessor, value)
+    }
+
+    /// Partial states are merged exactly like raw input values
+    fn merge_batch(
+        &mut self,
+        states: &[ArrayRef],
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        self.update_batch(states, accessor)
+    }
+
+    fn evaluate(&self, accessor: &RowAccessor) -> Result<ScalarValue> {
+        let result = accessor.get_as_scalar(&self.datatype, self.index);
+        Ok(result)
+    }
+
+    #[inline(always)]
+    fn state_index(&self) -> usize {
+        self.index
+    }
+}
+
+/// Aggregate expression computing the bitwise XOR (`BIT_XOR`) of its input
+#[derive(Debug, Clone)]
+pub struct BitXor {
+    /// Name of the output field
+    name: String,
+    /// Data type used for the output field, the state field and the accumulators
+    pub data_type: DataType,
+    /// Expression that produces the values to aggregate
+    expr: Arc<dyn PhysicalExpr>,
+    /// Nullability of the output and state fields
+    nullable: bool,
+}
+
+impl BitXor {
+    /// Builds a BIT_XOR aggregate over `expr`; its output field is called `name`,
+    /// has type `data_type` and is always marked nullable
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        let name = name.into();
+        Self {
+            expr,
+            data_type,
+            name,
+            nullable: true,
+        }
+    }
+}
+
+impl AggregateExpr for BitXor {
+    /// Exposes `self` as `Any` so that callers can downcast to `BitXor`
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Output field, built from the expression's name, data type and nullability
+    fn field(&self) -> Result<Field> {
+        let field = Field::new(&self.name, self.data_type.clone(), self.nullable);
+        Ok(field)
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        let accumulator = BitXorAccumulator::try_new(&self.data_type)?;
+        Ok(Box::new(accumulator))
+    }
+
+    /// A single state field that carries the partial result
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        let state = Field::new(
+            format_state_name(&self.name, "bit_xor"),
+            self.data_type.clone(),
+            self.nullable,
+        );
+        Ok(vec![state])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![Arc::clone(&self.expr)]
+    }
+
+    fn name(&self) -> &str {
+        self.name.as_str()
+    }
+
+    /// Delegates to `is_row_accumulator_support_dtype` for the expression's data type
+    fn row_accumulator_supported(&self) -> bool {
+        is_row_accumulator_support_dtype(&self.data_type)
+    }
+
+    fn create_row_accumulator(
+        &self,
+        start_index: usize,
+    ) -> Result<Box<dyn RowAccumulator>> {
+        let accumulator =
+            BitXorRowAccumulator::new(start_index, self.data_type.clone());
+        Ok(Box::new(accumulator))
+    }
+
+    /// The reversed expression is a clone of the expression itself
+    fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
+        Some(Arc::new(self.clone()))
+    }
+}
+
+impl PartialEq<dyn Any> for BitXor {
+    /// Equal only to another `BitXor` with the same name, data type,
+    /// nullability and input expression
+    fn eq(&self, other: &dyn Any) -> bool {
+        match down_cast_any_ref(other).downcast_ref::<Self>() {
+            Some(x) => {
+                self.name == x.name
+                    && self.data_type == x.data_type
+                    && self.nullable == x.nullable
+                    && self.expr.eq(&x.expr)
+            }
+            None => false,
+        }
+    }
+}
+
+/// Accumulator that keeps the running BIT_XOR result as a `ScalarValue`
+#[derive(Debug)]
+struct BitXorAccumulator {
+    bit_xor: ScalarValue,
+}
+
+impl BitXorAccumulator {
+    /// Creates the accumulator with `ScalarValue::try_from(data_type)` as its
+    /// initial value; fails when that conversion fails
+    pub fn try_new(data_type: &DataType) -> Result<Self> {
+        let bit_xor = ScalarValue::try_from(data_type)?;
+        Ok(Self { bit_xor })
+    }
+}
+
+impl Accumulator for BitXorAccumulator {
+    /// Reduces the first input array and combines it with the running result
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        let batch_result = bit_xor_batch(&values[0])?;
+        self.bit_xor = self.bit_xor.bitxor(&batch_result)?;
+        Ok(())
+    }
+
+    /// Partial states are merged exactly like raw input values
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+        self.update_batch(states)
+    }
+
+    /// The state is the running result itself
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        Ok(vec![self.bit_xor.clone()])
+    }
+
+    fn evaluate(&self) -> Result<ScalarValue> {
+        Ok(self.bit_xor.clone())
+    }
+
+    fn size(&self) -> usize {
+        // size of the struct without the scalar, plus the size the scalar reports
+        let without_scalar =
+            std::mem::size_of_val(self) - std::mem::size_of_val(&self.bit_xor);
+        without_scalar + self.bit_xor.size()
+    }
+}
+
+/// Row-based BIT_XOR accumulator; its state is the row field at `index`
+#[derive(Debug)]
+struct BitXorRowAccumulator {
+    /// Index of the state field inside the row
+    index: usize,
+    /// Data type used to read the state back as a scalar
+    datatype: DataType,
+}
+
+impl BitXorRowAccumulator {
+    /// Creates an accumulator bound to the row field at `index`
+    pub fn new(index: usize, datatype: DataType) -> Self {
+        Self { datatype, index }
+    }
+}
+
+impl RowAccumulator for BitXorRowAccumulator {
+    /// Reduces the first input array and folds the result into the row state
+    fn update_batch(
+        &mut self,
+        values: &[ArrayRef],
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        let batch_result = bit_xor_batch(&values[0])?;
+        bit_xor_row(self.index, accessor, &batch_result)
+    }
+
+    /// Only the first scalar is used
+    fn update_scalar_values(
+        &mut self,
+        values: &[ScalarValue],
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        self.update_scalar(&values[0], accessor)
+    }
+
+    fn update_scalar(
+        &mut self,
+        value: &ScalarValue,
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        bit_xor_row(self.index, accessor, value)
+    }
+
+    /// Partial states are merged exactly like raw input values
+    fn merge_batch(
+        &mut self,
+        states: &[ArrayRef],
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        self.update_batch(states, accessor)
+    }
+
+    fn evaluate(&self, accessor: &RowAccessor) -> Result<ScalarValue> {
+        let result = accessor.get_as_scalar(&self.datatype, self.index);
+        Ok(result)
+    }
+
+    #[inline(always)]
+    fn state_index(&self) -> usize {
+        self.index
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::expressions::col;
+    use crate::expressions::tests::aggregate;
+    use crate::generic_test_op;
+    use arrow::datatypes::*;
+    use arrow::record_batch::RecordBatch;
+    use datafusion_common::Result;
+
+    // ---- BIT_AND ----
+
+    #[test]
+    fn bit_and_i32() -> Result<()> {
+        // 4 & 7 & 15 == 4
+        let input = Arc::new(Int32Array::from(vec![4, 7, 15])) as ArrayRef;
+        generic_test_op!(input, DataType::Int32, BitAnd, ScalarValue::from(4i32))
+    }
+
+    #[test]
+    fn bit_and_i32_with_nulls() -> Result<()> {
+        // nulls are skipped: 1 & 3 & 5 == 1
+        let input =
+            Arc::new(Int32Array::from(vec![Some(1), None, Some(3), Some(5)])) as ArrayRef;
+        generic_test_op!(input, DataType::Int32, BitAnd, ScalarValue::from(1i32))
+    }
+
+    #[test]
+    fn bit_and_i32_all_nulls() -> Result<()> {
+        // only nulls: the result is null as well
+        let input = Arc::new(Int32Array::from(vec![None, None])) as ArrayRef;
+        generic_test_op!(input, DataType::Int32, BitAnd, ScalarValue::Int32(None))
+    }
+
+    #[test]
+    fn bit_and_u32() -> Result<()> {
+        let input = Arc::new(UInt32Array::from(vec![4_u32, 7_u32, 15_u32])) as ArrayRef;
+        generic_test_op!(input, DataType::UInt32, BitAnd, ScalarValue::from(4u32))
+    }
+
+    // ---- BIT_OR ----
+
+    #[test]
+    fn bit_or_i32() -> Result<()> {
+        // 4 | 7 | 15 == 15
+        let input = Arc::new(Int32Array::from(vec![4, 7, 15])) as ArrayRef;
+        generic_test_op!(input, DataType::Int32, BitOr, ScalarValue::from(15i32))
+    }
+
+    #[test]
+    fn bit_or_i32_with_nulls() -> Result<()> {
+        // nulls are skipped: 1 | 3 | 5 == 7
+        let input =
+            Arc::new(Int32Array::from(vec![Some(1), None, Some(3), Some(5)])) as ArrayRef;
+        generic_test_op!(input, DataType::Int32, BitOr, ScalarValue::from(7i32))
+    }
+
+    #[test]
+    fn bit_or_i32_all_nulls() -> Result<()> {
+        // only nulls: the result is null as well
+        let input = Arc::new(Int32Array::from(vec![None, None])) as ArrayRef;
+        generic_test_op!(input, DataType::Int32, BitOr, ScalarValue::Int32(None))
+    }
+
+    #[test]
+    fn bit_or_u32() -> Result<()> {
+        let input = Arc::new(UInt32Array::from(vec![4_u32, 7_u32, 15_u32])) as ArrayRef;
+        generic_test_op!(input, DataType::UInt32, BitOr, ScalarValue::from(15u32))
+    }
+
+    // ---- BIT_XOR ----
+
+    #[test]
+    fn bit_xor_i32() -> Result<()> {
+        // 4 ^ 7 ^ 15 == 12
+        let input = Arc::new(Int32Array::from(vec![4, 7, 15])) as ArrayRef;
+        generic_test_op!(input, DataType::Int32, BitXor, ScalarValue::from(12i32))
+    }
+
+    #[test]
+    fn bit_xor_i32_with_nulls() -> Result<()> {
+        // nulls are skipped: 1 ^ 3 ^ 5 == 7
+        let input =
+            Arc::new(Int32Array::from(vec![Some(1), None, Some(3), Some(5)])) as ArrayRef;
+        generic_test_op!(input, DataType::Int32, BitXor, ScalarValue::from(7i32))
+    }
+
+    #[test]
+    fn bit_xor_i32_all_nulls() -> Result<()> {
+        // only nulls: the result is null as well
+        let input = Arc::new(Int32Array::from(vec![None, None])) as ArrayRef;
+        generic_test_op!(input, DataType::Int32, BitXor, ScalarValue::Int32(None))
+    }
+
+    #[test]
+    fn bit_xor_u32() -> Result<()> {
+        let input = Arc::new(UInt32Array::from(vec![4_u32, 7_u32, 15_u32])) as ArrayRef;
+        generic_test_op!(input, DataType::UInt32, BitXor, ScalarValue::from(12u32))
+    }
+}
diff --git a/datafusion/physical-expr/src/aggregate/bool_and_or.rs b/datafusion/physical-expr/src/aggregate/bool_and_or.rs
new file mode 100644
index 0000000000..15b6964f7a
--- /dev/null
+++ b/datafusion/physical-expr/src/aggregate/bool_and_or.rs
@@ -0,0 +1,582 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines the BOOL_AND and BOOL_OR aggregate expressions that can be evaluated at runtime during query execution
+
+use std::any::Any;
+use std::convert::TryFrom;
+use std::sync::Arc;
+
+use crate::{AggregateExpr, PhysicalExpr};
+use arrow::datatypes::DataType;
+use arrow::{
+    array::{ArrayRef, BooleanArray},
+    datatypes::Field,
+};
+use datafusion_common::{downcast_value, DataFusionError, Result, ScalarValue};
+use datafusion_expr::Accumulator;
+
+use crate::aggregate::row_accumulator::{
+    is_row_accumulator_support_dtype, RowAccumulator,
+};
+use crate::aggregate::utils::down_cast_any_ref;
+use crate::expressions::format_state_name;
+use arrow::array::Array;
+use datafusion_row::accessor::RowAccessor;
+
+fn bool_and(array: &BooleanArray) -> Option<bool> {
+    if array.null_count() == array.len() {
+        return None;
+    }
+    Some(array.false_count() == 0)
+}
+
+fn bool_or(array: &BooleanArray) -> Option<bool> {
+    if array.null_count() == array.len() {
+        return None;
+    }
+    Some(array.true_count() != 0)
+}
+
+// downcasts the values to the given array type, applies bool_and/bool_or, and wraps the (possibly null) result in a ScalarValue
+macro_rules! typed_bool_and_or_batch {
+    ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{
+        let array = downcast_value!($VALUES, $ARRAYTYPE);
+        let delta = $OP(array);
+        Ok(ScalarValue::$SCALAR(delta))
+    }};
+}
+
+// applies bool_and/bool_or to the array and returns a ScalarValue of the corresponding type; non-Boolean arrays yield an internal error.
+macro_rules! bool_and_or_batch {
+    ($VALUES:expr, $OP:ident) => {{
+        match $VALUES.data_type() {
+            DataType::Boolean => {
+                typed_bool_and_or_batch!($VALUES, BooleanArray, Boolean, $OP)
+            }
+            e => {
+                return Err(DataFusionError::Internal(format!(
+                    "Bool and/Bool or is not expected to receive the type {e:?}"
+                )));
+            }
+        }
+    }};
+}
+
+/// dynamically-typed bool_and(array) -> ScalarValue
+fn bool_and_batch(values: &ArrayRef) -> Result<ScalarValue> {
+    bool_and_or_batch!(values, bool_and)
+}
+
+/// dynamically-typed bool_or(array) -> ScalarValue
+fn bool_or_batch(values: &ArrayRef) -> Result<ScalarValue> {
+    bool_and_or_batch!(values, bool_or)
+}
+
+// folds an optional scalar into the accessor state at $INDEX with the given operation; a `None` scalar leaves the state untouched.
+macro_rules! typed_bool_and_or_v2 {
+    ($INDEX:ident, $ACC:ident, $SCALAR:expr, $TYPE:ident, $OP:ident) => {{
+        paste::item! {
+            match $SCALAR {
+                None => {}
+                Some(v) => $ACC.[<$OP _ $TYPE>]($INDEX, *v as $TYPE)
+            }
+        }
+    }};
+}
+
+macro_rules! bool_and_or_v2 {
+    ($INDEX:ident, $ACC:ident, $SCALAR:expr, $OP:ident) => {{
+        Ok(match $SCALAR {
+            ScalarValue::Boolean(rhs) => {
+                typed_bool_and_or_v2!($INDEX, $ACC, rhs, bool, $OP)
+            }
+            ScalarValue::Null => {
+                // do nothing
+            }
+            e => {
+                return Err(DataFusionError::Internal(format!(
+                    "BOOL AND/BOOL OR is not expected to receive scalars of incompatible types {:?}",
+                    e
+                )))
+            }
+        })
+    }};
+}
+
+pub fn bool_and_row(
+    index: usize,
+    accessor: &mut RowAccessor,
+    s: &ScalarValue,
+) -> Result<()> {
+    bool_and_or_v2!(index, accessor, s, bitand)
+}
+
+pub fn bool_or_row(
+    index: usize,
+    accessor: &mut RowAccessor,
+    s: &ScalarValue,
+) -> Result<()> {
+    bool_and_or_v2!(index, accessor, s, bitor)
+}
+
+/// BOOL_AND aggregate expression
+#[derive(Debug, Clone)]
+pub struct BoolAnd {
+    name: String,
+    pub data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+    nullable: bool,
+}
+
+impl BoolAnd {
+    /// Create a new BOOL_AND aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        Self {
+            name: name.into(),
+            expr,
+            data_type,
+            nullable: true,
+        }
+    }
+}
+
+impl AggregateExpr for BoolAnd {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(
+            &self.name,
+            self.data_type.clone(),
+            self.nullable,
+        ))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::new(BoolAndAccumulator::try_new(&self.data_type)?))
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![Field::new(
+            format_state_name(&self.name, "bool_and"),
+            self.data_type.clone(),
+            self.nullable,
+        )])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    fn row_accumulator_supported(&self) -> bool {
+        is_row_accumulator_support_dtype(&self.data_type)
+    }
+
+    fn create_row_accumulator(
+        &self,
+        start_index: usize,
+    ) -> Result<Box<dyn RowAccumulator>> {
+        Ok(Box::new(BoolAndRowAccumulator::new(
+            start_index,
+            self.data_type.clone(),
+        )))
+    }
+
+    fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
+        Some(Arc::new(self.clone()))
+    }
+
+    fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::new(BoolAndAccumulator::try_new(&self.data_type)?))
+    }
+}
+
+impl PartialEq<dyn Any> for BoolAnd {
+    fn eq(&self, other: &dyn Any) -> bool {
+        down_cast_any_ref(other)
+            .downcast_ref::<Self>()
+            .map(|x| {
+                self.name == x.name
+                    && self.data_type == x.data_type
+                    && self.nullable == x.nullable
+                    && self.expr.eq(&x.expr)
+            })
+            .unwrap_or(false)
+    }
+}
+
+#[derive(Debug)]
+struct BoolAndAccumulator {
+    bool_and: ScalarValue,
+}
+
+impl BoolAndAccumulator {
+    /// new bool_and accumulator
+    pub fn try_new(data_type: &DataType) -> Result<Self> {
+        Ok(Self {
+            bool_and: ScalarValue::try_from(data_type)?,
+        })
+    }
+}
+
+impl Accumulator for BoolAndAccumulator {
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        let values = &values[0];
+        let delta = &bool_and_batch(values)?;
+        self.bool_and = self.bool_and.and(delta)?;
+        Ok(())
+    }
+
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+        self.update_batch(states)
+    }
+
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        Ok(vec![self.bool_and.clone()])
+    }
+
+    fn evaluate(&self) -> Result<ScalarValue> {
+        Ok(self.bool_and.clone())
+    }
+
+    fn size(&self) -> usize {
+        std::mem::size_of_val(self) - std::mem::size_of_val(&self.bool_and)
+            + self.bool_and.size()
+    }
+}
+
+#[derive(Debug)]
+struct BoolAndRowAccumulator {
+    index: usize,
+    datatype: DataType,
+}
+
+impl BoolAndRowAccumulator {
+    pub fn new(index: usize, datatype: DataType) -> Self {
+        Self { index, datatype }
+    }
+}
+
+impl RowAccumulator for BoolAndRowAccumulator {
+    fn update_batch(
+        &mut self,
+        values: &[ArrayRef],
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        let values = &values[0];
+        let delta = &bool_and_batch(values)?;
+        bool_and_row(self.index, accessor, delta)
+    }
+
+    fn update_scalar_values(
+        &mut self,
+        values: &[ScalarValue],
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        let value = &values[0];
+        bool_and_row(self.index, accessor, value)
+    }
+
+    fn update_scalar(
+        &mut self,
+        value: &ScalarValue,
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        bool_and_row(self.index, accessor, value)
+    }
+
+    fn merge_batch(
+        &mut self,
+        states: &[ArrayRef],
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        self.update_batch(states, accessor)
+    }
+
+    fn evaluate(&self, accessor: &RowAccessor) -> Result<ScalarValue> {
+        Ok(accessor.get_as_scalar(&self.datatype, self.index))
+    }
+
+    #[inline(always)]
+    fn state_index(&self) -> usize {
+        self.index
+    }
+}
+
+/// BOOL_OR aggregate expression
+#[derive(Debug, Clone)]
+pub struct BoolOr {
+    name: String,
+    pub data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+    nullable: bool,
+}
+
+impl BoolOr {
+    /// Create a new BOOL_OR aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        Self {
+            name: name.into(),
+            expr,
+            data_type,
+            nullable: true,
+        }
+    }
+}
+
+impl AggregateExpr for BoolOr {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(
+            &self.name,
+            self.data_type.clone(),
+            self.nullable,
+        ))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::new(BoolOrAccumulator::try_new(&self.data_type)?))
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![Field::new(
+            format_state_name(&self.name, "bool_or"),
+            self.data_type.clone(),
+            self.nullable,
+        )])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    fn row_accumulator_supported(&self) -> bool {
+        is_row_accumulator_support_dtype(&self.data_type)
+    }
+
+    fn create_row_accumulator(
+        &self,
+        start_index: usize,
+    ) -> Result<Box<dyn RowAccumulator>> {
+        Ok(Box::new(BoolOrRowAccumulator::new(
+            start_index,
+            self.data_type.clone(),
+        )))
+    }
+
+    fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
+        Some(Arc::new(self.clone()))
+    }
+
+    fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::new(BoolOrAccumulator::try_new(&self.data_type)?))
+    }
+}
+
+impl PartialEq<dyn Any> for BoolOr {
+    fn eq(&self, other: &dyn Any) -> bool {
+        down_cast_any_ref(other)
+            .downcast_ref::<Self>()
+            .map(|x| {
+                self.name == x.name
+                    && self.data_type == x.data_type
+                    && self.nullable == x.nullable
+                    && self.expr.eq(&x.expr)
+            })
+            .unwrap_or(false)
+    }
+}
+
+#[derive(Debug)]
+struct BoolOrAccumulator {
+    bool_or: ScalarValue,
+}
+
+impl BoolOrAccumulator {
+    /// new bool_or accumulator
+    pub fn try_new(data_type: &DataType) -> Result<Self> {
+        Ok(Self {
+            bool_or: ScalarValue::try_from(data_type)?,
+        })
+    }
+}
+
+impl Accumulator for BoolOrAccumulator {
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        Ok(vec![self.bool_or.clone()])
+    }
+
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        let values = &values[0];
+        let delta = bool_or_batch(values)?;
+        self.bool_or = self.bool_or.or(&delta)?;
+        Ok(())
+    }
+
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+        self.update_batch(states)
+    }
+
+    fn evaluate(&self) -> Result<ScalarValue> {
+        Ok(self.bool_or.clone())
+    }
+
+    fn size(&self) -> usize {
+        std::mem::size_of_val(self) - std::mem::size_of_val(&self.bool_or)
+            + self.bool_or.size()
+    }
+}
+
+#[derive(Debug)]
+struct BoolOrRowAccumulator {
+    index: usize,
+    datatype: DataType,
+}
+
+impl BoolOrRowAccumulator {
+    pub fn new(index: usize, datatype: DataType) -> Self {
+        Self { index, datatype }
+    }
+}
+
+impl RowAccumulator for BoolOrRowAccumulator {
+    fn update_batch(
+        &mut self,
+        values: &[ArrayRef],
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        let values = &values[0];
+        let delta = &bool_or_batch(values)?;
+        bool_or_row(self.index, accessor, delta)?;
+        Ok(())
+    }
+
+    fn update_scalar_values(
+        &mut self,
+        values: &[ScalarValue],
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        let value = &values[0];
+        bool_or_row(self.index, accessor, value)
+    }
+
+    fn update_scalar(
+        &mut self,
+        value: &ScalarValue,
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        bool_or_row(self.index, accessor, value)
+    }
+
+    fn merge_batch(
+        &mut self,
+        states: &[ArrayRef],
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        self.update_batch(states, accessor)
+    }
+
+    fn evaluate(&self, accessor: &RowAccessor) -> Result<ScalarValue> {
+        Ok(accessor.get_as_scalar(&self.datatype, self.index))
+    }
+
+    #[inline(always)]
+    fn state_index(&self) -> usize {
+        self.index
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::expressions::col;
+    use crate::expressions::tests::aggregate;
+    use crate::generic_test_op;
+    use arrow::datatypes::*;
+    use arrow::record_batch::RecordBatch;
+    use arrow_array::BooleanArray;
+    use datafusion_common::Result;
+
+    #[test]
+    fn test_bool_and() -> Result<()> {
+        let a: ArrayRef = Arc::new(BooleanArray::from(vec![true, true, false]));
+        generic_test_op!(a, DataType::Boolean, BoolAnd, ScalarValue::from(false))
+    }
+
+    #[test]
+    fn bool_and_with_nulls() -> Result<()> {
+        let a: ArrayRef = Arc::new(BooleanArray::from(vec![
+            Some(true),
+            None,
+            Some(true),
+            Some(true),
+        ]));
+        generic_test_op!(a, DataType::Boolean, BoolAnd, ScalarValue::from(true))
+    }
+
+    #[test]
+    fn bool_and_all_nulls() -> Result<()> {
+        let a: ArrayRef = Arc::new(BooleanArray::from(vec![None, None]));
+        generic_test_op!(a, DataType::Boolean, BoolAnd, ScalarValue::Boolean(None))
+    }
+
+    #[test]
+    fn test_bool_or() -> Result<()> {
+        let a: ArrayRef = Arc::new(BooleanArray::from(vec![true, true, false]));
+        generic_test_op!(a, DataType::Boolean, BoolOr, ScalarValue::from(true))
+    }
+
+    #[test]
+    fn bool_or_with_nulls() -> Result<()> {
+        let a: ArrayRef = Arc::new(BooleanArray::from(vec![
+            Some(false),
+            None,
+            Some(false),
+            Some(false),
+        ]));
+        generic_test_op!(a, DataType::Boolean, BoolOr, ScalarValue::from(false))
+    }
+
+    #[test]
+    fn bool_or_all_nulls() -> Result<()> {
+        let a: ArrayRef = Arc::new(BooleanArray::from(vec![None, None]));
+        generic_test_op!(a, DataType::Boolean, BoolOr, ScalarValue::Boolean(None))
+    }
+}
diff --git a/datafusion/physical-expr/src/aggregate/build_in.rs b/datafusion/physical-expr/src/aggregate/build_in.rs
index d3b85d706d..2410f0147e 100644
--- a/datafusion/physical-expr/src/aggregate/build_in.rs
+++ b/datafusion/physical-expr/src/aggregate/build_in.rs
@@ -65,6 +65,56 @@ pub fn create_aggregate_expr(
             name,
             rt_type,
         )),
+        (AggregateFunction::BitAnd, false) => Arc::new(expressions::BitAnd::new(
+            input_phy_exprs[0].clone(),
+            name,
+            rt_type,
+        )),
+        (AggregateFunction::BitAnd, true) => {
+            return Err(DataFusionError::NotImplemented(
+                "BIT_AND(DISTINCT) aggregations are not available".to_string(),
+            ));
+        }
+        (AggregateFunction::BitOr, false) => Arc::new(expressions::BitOr::new(
+            input_phy_exprs[0].clone(),
+            name,
+            rt_type,
+        )),
+        (AggregateFunction::BitOr, true) => {
+            return Err(DataFusionError::NotImplemented(
+                "BIT_OR(DISTINCT) aggregations are not available".to_string(),
+            ));
+        }
+        (AggregateFunction::BitXor, false) => Arc::new(expressions::BitXor::new(
+            input_phy_exprs[0].clone(),
+            name,
+            rt_type,
+        )),
+        (AggregateFunction::BitXor, true) => {
+            return Err(DataFusionError::NotImplemented(
+                "BIT_XOR(DISTINCT) aggregations are not available".to_string(),
+            ));
+        }
+        (AggregateFunction::BoolAnd, false) => Arc::new(expressions::BoolAnd::new(
+            input_phy_exprs[0].clone(),
+            name,
+            rt_type,
+        )),
+        (AggregateFunction::BoolAnd, true) => {
+            return Err(DataFusionError::NotImplemented(
+                "BOOL_AND(DISTINCT) aggregations are not available".to_string(),
+            ));
+        }
+        (AggregateFunction::BoolOr, false) => Arc::new(expressions::BoolOr::new(
+            input_phy_exprs[0].clone(),
+            name,
+            rt_type,
+        )),
+        (AggregateFunction::BoolOr, true) => {
+            return Err(DataFusionError::NotImplemented(
+                "BOOL_OR(DISTINCT) aggregations are not available".to_string(),
+            ));
+        }
         (AggregateFunction::Sum, false) => {
             let cast_to_sum_type = rt_type != input_phy_types[0];
             Arc::new(expressions::Sum::new_with_pre_cast(
@@ -266,8 +316,8 @@ mod tests {
     use super::*;
     use crate::expressions::{
         try_cast, ApproxDistinct, ApproxMedian, ApproxPercentileCont, ArrayAgg, Avg,
-        Correlation, Count, Covariance, DistinctArrayAgg, DistinctCount, Max, Min,
-        Stddev, Sum, Variance,
+        BitAnd, BitOr, BitXor, BoolAnd, BoolOr, Correlation, Count, Covariance,
+        DistinctArrayAgg, DistinctCount, Max, Min, Stddev, Sum, Variance,
     };
     use arrow::datatypes::{DataType, Field};
     use datafusion_common::ScalarValue;
@@ -480,6 +530,102 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn test_bit_and_or_xor_expr() -> Result<()> {
+        let funcs = vec![
+            AggregateFunction::BitAnd,
+            AggregateFunction::BitOr,
+            AggregateFunction::BitXor,
+        ];
+        let data_types = vec![DataType::UInt64, DataType::Int64];
+        for fun in funcs {
+            for data_type in &data_types {
+                let input_schema =
+                    Schema::new(vec![Field::new("c1", data_type.clone(), true)]);
+                let input_phy_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(
+                    expressions::Column::new_with_schema("c1", &input_schema).unwrap(),
+                )];
+                let result_agg_phy_exprs = create_physical_agg_expr_for_test(
+                    &fun,
+                    false,
+                    &input_phy_exprs[0..1],
+                    &input_schema,
+                    "c1",
+                )?;
+                match fun {
+                    AggregateFunction::BitAnd => {
+                        assert!(result_agg_phy_exprs.as_any().is::<BitAnd>());
+                        assert_eq!("c1", result_agg_phy_exprs.name());
+                        assert_eq!(
+                            Field::new("c1", data_type.clone(), true),
+                            result_agg_phy_exprs.field().unwrap()
+                        );
+                    }
+                    AggregateFunction::BitOr => {
+                        assert!(result_agg_phy_exprs.as_any().is::<BitOr>());
+                        assert_eq!("c1", result_agg_phy_exprs.name());
+                        assert_eq!(
+                            Field::new("c1", data_type.clone(), true),
+                            result_agg_phy_exprs.field().unwrap()
+                        );
+                    }
+                    AggregateFunction::BitXor => {
+                        assert!(result_agg_phy_exprs.as_any().is::<BitXor>());
+                        assert_eq!("c1", result_agg_phy_exprs.name());
+                        assert_eq!(
+                            Field::new("c1", data_type.clone(), true),
+                            result_agg_phy_exprs.field().unwrap()
+                        );
+                    }
+                    _ => {}
+                };
+            }
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn test_bool_and_or_expr() -> Result<()> {
+        let funcs = vec![AggregateFunction::BoolAnd, AggregateFunction::BoolOr];
+        let data_types = vec![DataType::Boolean];
+        for fun in funcs {
+            for data_type in &data_types {
+                let input_schema =
+                    Schema::new(vec![Field::new("c1", data_type.clone(), true)]);
+                let input_phy_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(
+                    expressions::Column::new_with_schema("c1", &input_schema).unwrap(),
+                )];
+                let result_agg_phy_exprs = create_physical_agg_expr_for_test(
+                    &fun,
+                    false,
+                    &input_phy_exprs[0..1],
+                    &input_schema,
+                    "c1",
+                )?;
+                match fun {
+                    AggregateFunction::BoolAnd => {
+                        assert!(result_agg_phy_exprs.as_any().is::<BoolAnd>());
+                        assert_eq!("c1", result_agg_phy_exprs.name());
+                        assert_eq!(
+                            Field::new("c1", data_type.clone(), true),
+                            result_agg_phy_exprs.field().unwrap()
+                        );
+                    }
+                    AggregateFunction::BoolOr => {
+                        assert!(result_agg_phy_exprs.as_any().is::<BoolOr>());
+                        assert_eq!("c1", result_agg_phy_exprs.name());
+                        assert_eq!(
+                            Field::new("c1", data_type.clone(), true),
+                            result_agg_phy_exprs.field().unwrap()
+                        );
+                    }
+                    _ => {}
+                };
+            }
+        }
+        Ok(())
+    }
+
     #[test]
     fn test_sum_avg_expr() -> Result<()> {
         let funcs = vec![AggregateFunction::Sum, AggregateFunction::Avg];
diff --git a/datafusion/physical-expr/src/aggregate/mod.rs b/datafusion/physical-expr/src/aggregate/mod.rs
index b3e37a8f92..34302c5aaf 100644
--- a/datafusion/physical-expr/src/aggregate/mod.rs
+++ b/datafusion/physical-expr/src/aggregate/mod.rs
@@ -31,6 +31,8 @@ pub(crate) mod approx_percentile_cont_with_weight;
 pub(crate) mod array_agg;
 pub(crate) mod array_agg_distinct;
 pub(crate) mod average;
+pub(crate) mod bit_and_or_xor;
+pub(crate) mod bool_and_or;
 pub(crate) mod correlation;
 pub(crate) mod count;
 pub(crate) mod count_distinct;
diff --git a/datafusion/physical-expr/src/aggregate/row_accumulator.rs b/datafusion/physical-expr/src/aggregate/row_accumulator.rs
index 19e847b3e7..e528262922 100644
--- a/datafusion/physical-expr/src/aggregate/row_accumulator.rs
+++ b/datafusion/physical-expr/src/aggregate/row_accumulator.rs
@@ -83,7 +83,8 @@ pub trait RowAccumulator: Send + Sync + Debug {
 pub fn is_row_accumulator_support_dtype(data_type: &DataType) -> bool {
     matches!(
         data_type,
-        DataType::UInt8
+        DataType::Boolean
+            | DataType::UInt8
             | DataType::UInt16
             | DataType::UInt32
             | DataType::UInt64
diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs
index 135e24dc83..afe1ccd99f 100644
--- a/datafusion/physical-expr/src/expressions/mod.rs
+++ b/datafusion/physical-expr/src/expressions/mod.rs
@@ -47,6 +47,8 @@ pub use crate::aggregate::approx_percentile_cont_with_weight::ApproxPercentileCo
 pub use crate::aggregate::array_agg::ArrayAgg;
 pub use crate::aggregate::array_agg_distinct::DistinctArrayAgg;
 pub use crate::aggregate::average::{Avg, AvgAccumulator};
+pub use crate::aggregate::bit_and_or_xor::{BitAnd, BitOr, BitXor};
+pub use crate::aggregate::bool_and_or::{BoolAnd, BoolOr};
 pub use crate::aggregate::build_in::create_aggregate_expr;
 pub use crate::aggregate::correlation::Correlation;
 pub use crate::aggregate::count::Count;
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index f2a6176bf2..08e6360aa3 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -564,6 +564,11 @@ enum AggregateFunction {
   APPROX_PERCENTILE_CONT_WITH_WEIGHT = 16;
   GROUPING = 17;
   MEDIAN = 18;
+  BIT_AND = 19;
+  BIT_OR = 20;
+  BIT_XOR = 21;
+  BOOL_AND = 22;
+  BOOL_OR = 23;
 }
 
 message AggregateExprNode {
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index 405fa04bce..79f4f06d37 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -422,6 +422,11 @@ impl serde::Serialize for AggregateFunction {
             Self::ApproxPercentileContWithWeight => "APPROX_PERCENTILE_CONT_WITH_WEIGHT",
             Self::Grouping => "GROUPING",
             Self::Median => "MEDIAN",
+            Self::BitAnd => "BIT_AND",
+            Self::BitOr => "BIT_OR",
+            Self::BitXor => "BIT_XOR",
+            Self::BoolAnd => "BOOL_AND",
+            Self::BoolOr => "BOOL_OR",
         };
         serializer.serialize_str(variant)
     }
@@ -452,6 +457,11 @@ impl<'de> serde::Deserialize<'de> for AggregateFunction {
             "APPROX_PERCENTILE_CONT_WITH_WEIGHT",
             "GROUPING",
             "MEDIAN",
+            "BIT_AND",
+            "BIT_OR",
+            "BIT_XOR",
+            "BOOL_AND",
+            "BOOL_OR",
         ];
 
         struct GeneratedVisitor;
@@ -513,6 +523,11 @@ impl<'de> serde::Deserialize<'de> for AggregateFunction {
                     "APPROX_PERCENTILE_CONT_WITH_WEIGHT" => Ok(AggregateFunction::ApproxPercentileContWithWeight),
                     "GROUPING" => Ok(AggregateFunction::Grouping),
                     "MEDIAN" => Ok(AggregateFunction::Median),
+                    "BIT_AND" => Ok(AggregateFunction::BitAnd),
+                    "BIT_OR" => Ok(AggregateFunction::BitOr),
+                    "BIT_XOR" => Ok(AggregateFunction::BitXor),
+                    "BOOL_AND" => Ok(AggregateFunction::BoolAnd),
+                    "BOOL_OR" => Ok(AggregateFunction::BoolOr),
                     _ => Err(serde::de::Error::unknown_variant(value, FIELDS)),
                 }
             }
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index 58cc0b312a..0736b75084 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -2381,6 +2381,11 @@ pub enum AggregateFunction {
     ApproxPercentileContWithWeight = 16,
     Grouping = 17,
     Median = 18,
+    BitAnd = 19,
+    BitOr = 20,
+    BitXor = 21,
+    BoolAnd = 22,
+    BoolOr = 23,
 }
 impl AggregateFunction {
     /// String value of the enum field names used in the ProtoBuf definition.
@@ -2410,6 +2415,11 @@ impl AggregateFunction {
             }
             AggregateFunction::Grouping => "GROUPING",
             AggregateFunction::Median => "MEDIAN",
+            AggregateFunction::BitAnd => "BIT_AND",
+            AggregateFunction::BitOr => "BIT_OR",
+            AggregateFunction::BitXor => "BIT_XOR",
+            AggregateFunction::BoolAnd => "BOOL_AND",
+            AggregateFunction::BoolOr => "BOOL_OR",
         }
     }
     /// Creates an enum from field names used in the ProtoBuf definition.
@@ -2436,6 +2446,11 @@ impl AggregateFunction {
             }
             "GROUPING" => Some(Self::Grouping),
             "MEDIAN" => Some(Self::Median),
+            "BIT_AND" => Some(Self::BitAnd),
+            "BIT_OR" => Some(Self::BitOr),
+            "BIT_XOR" => Some(Self::BitXor),
+            "BOOL_AND" => Some(Self::BoolAnd),
+            "BOOL_OR" => Some(Self::BoolOr),
             _ => None,
         }
     }
diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs
index 5afd94b5a7..de4b03a069 100644
--- a/datafusion/proto/src/logical_plan/from_proto.rs
+++ b/datafusion/proto/src/logical_plan/from_proto.rs
@@ -505,6 +505,11 @@ impl From<protobuf::AggregateFunction> for AggregateFunction {
             protobuf::AggregateFunction::Max => Self::Max,
             protobuf::AggregateFunction::Sum => Self::Sum,
             protobuf::AggregateFunction::Avg => Self::Avg,
+            protobuf::AggregateFunction::BitAnd => Self::BitAnd,
+            protobuf::AggregateFunction::BitOr => Self::BitOr,
+            protobuf::AggregateFunction::BitXor => Self::BitXor,
+            protobuf::AggregateFunction::BoolAnd => Self::BoolAnd,
+            protobuf::AggregateFunction::BoolOr => Self::BoolOr,
             protobuf::AggregateFunction::Count => Self::Count,
             protobuf::AggregateFunction::ApproxDistinct => Self::ApproxDistinct,
             protobuf::AggregateFunction::ArrayAgg => Self::ArrayAgg,
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs
index e881051f27..0ffc893071 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -366,6 +366,11 @@ impl From<&AggregateFunction> for protobuf::AggregateFunction {
             AggregateFunction::Max => Self::Max,
             AggregateFunction::Sum => Self::Sum,
             AggregateFunction::Avg => Self::Avg,
+            AggregateFunction::BitAnd => Self::BitAnd,
+            AggregateFunction::BitOr => Self::BitOr,
+            AggregateFunction::BitXor => Self::BitXor,
+            AggregateFunction::BoolAnd => Self::BoolAnd,
+            AggregateFunction::BoolOr => Self::BoolOr,
             AggregateFunction::Count => Self::Count,
             AggregateFunction::ApproxDistinct => Self::ApproxDistinct,
             AggregateFunction::ArrayAgg => Self::ArrayAgg,
@@ -632,6 +637,11 @@ impl TryFrom<&Expr> for protobuf::LogicalExprNode {
                     AggregateFunction::Min => protobuf::AggregateFunction::Min,
                     AggregateFunction::Max => protobuf::AggregateFunction::Max,
                     AggregateFunction::Sum => protobuf::AggregateFunction::Sum,
+                    AggregateFunction::BitAnd => protobuf::AggregateFunction::BitAnd,
+                    AggregateFunction::BitOr => protobuf::AggregateFunction::BitOr,
+                    AggregateFunction::BitXor => protobuf::AggregateFunction::BitXor,
+                    AggregateFunction::BoolAnd => protobuf::AggregateFunction::BoolAnd,
+                    AggregateFunction::BoolOr => protobuf::AggregateFunction::BoolOr,
                     AggregateFunction::Avg => protobuf::AggregateFunction::Avg,
                     AggregateFunction::Count => protobuf::AggregateFunction::Count,
                     AggregateFunction::Variance => protobuf::AggregateFunction::Variance,
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs
index 9495c841be..90260b231f 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -37,7 +37,8 @@ use datafusion::physical_plan::file_format::FileScanConfig;
 use datafusion::physical_plan::expressions::{Count, DistinctCount, Literal};
 
 use datafusion::physical_plan::expressions::{
-    Avg, BinaryExpr, Column, LikeExpr, Max, Min, Sum,
+    Avg, BinaryExpr, BitAnd, BitOr, BitXor, BoolAnd, BoolOr, Column, LikeExpr, Max, Min,
+    Sum,
 };
 use datafusion::physical_plan::{AggregateExpr, PhysicalExpr};
 
@@ -70,6 +71,16 @@ impl TryFrom<Arc<dyn AggregateExpr>> for protobuf::PhysicalExprNode {
             Ok(AggregateFunction::Sum.into())
         } else if a.as_any().downcast_ref::<Count>().is_some() {
             Ok(AggregateFunction::Count.into())
+        } else if a.as_any().downcast_ref::<BitAnd>().is_some() {
+            Ok(AggregateFunction::BitAnd.into())
+        } else if a.as_any().downcast_ref::<BitOr>().is_some() {
+            Ok(AggregateFunction::BitOr.into())
+        } else if a.as_any().downcast_ref::<BitXor>().is_some() {
+            Ok(AggregateFunction::BitXor.into())
+        } else if a.as_any().downcast_ref::<BoolAnd>().is_some() {
+            Ok(AggregateFunction::BoolAnd.into())
+        } else if a.as_any().downcast_ref::<BoolOr>().is_some() {
+            Ok(AggregateFunction::BoolOr.into())
         } else if a.as_any().downcast_ref::<DistinctCount>().is_some() {
             distinct = true;
             Ok(AggregateFunction::Count.into())
diff --git a/datafusion/row/src/accessor.rs b/datafusion/row/src/accessor.rs
index 14a7ca264c..d4db66f367 100644
--- a/datafusion/row/src/accessor.rs
+++ b/datafusion/row/src/accessor.rs
@@ -23,6 +23,7 @@ use crate::{fn_get_idx, fn_get_idx_opt, fn_set_idx};
 use arrow::datatypes::{DataType, Schema};
 use arrow::util::bit_util::{get_bit_raw, set_bit_raw};
 use datafusion_common::ScalarValue;
+use std::ops::{BitAnd, BitOr, BitXor};
 use std::sync::Arc;
 
 //TODO: DRY with reader and writer
@@ -102,6 +103,24 @@ macro_rules! fn_max_min_idx {
     };
 }
 
+macro_rules! fn_bit_and_or_xor_idx {
+    ($NATIVE: ident, $OP: ident) => { // $NATIVE: primitive type name; $OP: method name (invoked below with bitand / bitor / bitxor)
+        paste::item! {
+            /// Fold `value` into the field at `idx` with the bitwise op; if the field is not yet valid, mark it non-null and store `value`.
+            #[inline(always)]
+            pub fn [<$OP _ $NATIVE>](&mut self, idx: usize, value: $NATIVE) { // generated name is `<op>_<type>`, e.g. `bitand_u8`
+                if self.is_valid_at(idx) {
+                    let v = value.$OP(self.[<get_ $NATIVE>](idx)); // combine the incoming value with the stored one
+                    self.[<set_ $NATIVE>](idx, v);
+                } else {
+                    self.set_non_null_at(idx); // first value for this field: flag it non-null before writing
+                    self.[<set_ $NATIVE>](idx, value);
+                }
+            }
+        }
+    };
+}
+
 macro_rules! fn_get_idx_scalar {
     ($NATIVE: ident, $SCALAR:ident) => {
         paste::item! {
@@ -264,6 +283,12 @@ impl<'a> RowAccessor<'a> {
         }
     }
 
+    fn set_bool(&mut self, idx: usize, value: bool) { // write `value` into the field at `idx`
+        self.assert_index_valid(idx);
+        let offset = self.field_offsets()[idx];
+        self.data[offset] = u8::from(value); // stored as a single byte: 0 for false, 1 for true
+    }
+
     fn set_u8(&mut self, idx: usize, value: u8) {
         self.assert_index_valid(idx);
         let offset = self.field_offsets()[idx];
@@ -325,4 +350,33 @@ impl<'a> RowAccessor<'a> {
     fn_max_min_idx!(f32, min);
     fn_max_min_idx!(f64, min);
     fn_max_min_idx!(i128, min);
+
+    fn_bit_and_or_xor_idx!(bool, bitand); // generates `bitand_bool`; every line below follows the same `<op>_<type>` naming
+    fn_bit_and_or_xor_idx!(u8, bitand);
+    fn_bit_and_or_xor_idx!(u16, bitand);
+    fn_bit_and_or_xor_idx!(u32, bitand);
+    fn_bit_and_or_xor_idx!(u64, bitand);
+    fn_bit_and_or_xor_idx!(i8, bitand);
+    fn_bit_and_or_xor_idx!(i16, bitand);
+    fn_bit_and_or_xor_idx!(i32, bitand);
+    fn_bit_and_or_xor_idx!(i64, bitand);
+
+    fn_bit_and_or_xor_idx!(bool, bitor); // `bitor_*` family, same type list as `bitand_*`
+    fn_bit_and_or_xor_idx!(u8, bitor);
+    fn_bit_and_or_xor_idx!(u16, bitor);
+    fn_bit_and_or_xor_idx!(u32, bitor);
+    fn_bit_and_or_xor_idx!(u64, bitor);
+    fn_bit_and_or_xor_idx!(i8, bitor);
+    fn_bit_and_or_xor_idx!(i16, bitor);
+    fn_bit_and_or_xor_idx!(i32, bitor);
+    fn_bit_and_or_xor_idx!(i64, bitor);
+
+    fn_bit_and_or_xor_idx!(u8, bitxor); // `bitxor_*` family: integer types only, no `bool` variant is generated
+    fn_bit_and_or_xor_idx!(u16, bitxor);
+    fn_bit_and_or_xor_idx!(u32, bitxor);
+    fn_bit_and_or_xor_idx!(u64, bitxor);
+    fn_bit_and_or_xor_idx!(i8, bitxor);
+    fn_bit_and_or_xor_idx!(i16, bitxor);
+    fn_bit_and_or_xor_idx!(i32, bitxor);
+    fn_bit_and_or_xor_idx!(i64, bitxor);
 }
diff --git a/docs/source/user-guide/expressions.md b/docs/source/user-guide/expressions.md
index e250dc4bb6..f2305cb908 100644
--- a/docs/source/user-guide/expressions.md
+++ b/docs/source/user-guide/expressions.md
@@ -219,6 +219,11 @@ Unlike to some databases the math functions in Datafusion works the same way as
 | approx_median(expr)                                               | Calculates an approximation of the median for `expr`.                                   |
 | approx_percentile_cont(expr, percentile)                          | Calculates an approximation of the specified `percentile` for `expr`.                   |
 | approx_percentile_cont_with_weight(expr, weight_expr, percentile) | Calculates an approximation of the specified `percentile` for `expr` and `weight_expr`. |
+| bit_and(expr)                                                     | Computes the bitwise AND of all non-null input values for `expr`.                       |
+| bit_or(expr)                                                      | Computes the bitwise OR of all non-null input values for `expr`.                        |
+| bit_xor(expr)                                                     | Computes the bitwise exclusive OR of all non-null input values for `expr`.              |
+| bool_and(expr)                                                    | Returns true if all non-null input values (`expr`) are true, otherwise false.           |
+| bool_or(expr)                                                     | Returns true if any non-null input value (`expr`) is true, otherwise false.             |
 | count(expr)                                                       | Returns the number of rows for `expr`.                                                  |
 | count_distinct                                                    | Creates an expression to represent the count(distinct) aggregate function               |
 | cube(exprs)                                                       | Creates a grouping set for all combination of `exprs`                                   |
diff --git a/docs/source/user-guide/sql/aggregate_functions.md b/docs/source/user-guide/sql/aggregate_functions.md
index 497cf78aad..68c02ef550 100644
--- a/docs/source/user-guide/sql/aggregate_functions.md
+++ b/docs/source/user-guide/sql/aggregate_functions.md
@@ -24,6 +24,11 @@ Aggregate functions operate on a set of values to compute a single result.
 ## General
 
 - [avg](#avg)
+- [bit_and](#bit_and)
+- [bit_or](#bit_or)
+- [bit_xor](#bit_xor)
+- [bool_and](#bool_and)
+- [bool_or](#bool_or)
 - [count](#count)
 - [max](#max)
 - [mean](#mean)
@@ -49,6 +54,71 @@ avg(expression)
 
 - `mean`
 
+### `bit_and`
+
+Computes the bitwise AND of all non-null input values.
+
+```
+bit_and(expression)
+```
+
+#### Arguments
+
+- **expression**: Expression to operate on.
+  Can be a constant, column, or function, and any combination of arithmetic operators.
+
+### `bit_or`
+
+Computes the bitwise OR of all non-null input values.
+
+```
+bit_or(expression)
+```
+
+#### Arguments
+
+- **expression**: Expression to operate on.
+  Can be a constant, column, or function, and any combination of arithmetic operators.
+
+### `bit_xor`
+
+Computes the bitwise exclusive OR of all non-null input values.
+
+```
+bit_xor(expression)
+```
+
+#### Arguments
+
+- **expression**: Expression to operate on.
+  Can be a constant, column, or function, and any combination of arithmetic operators.
+
+### `bool_and`
+
+Returns true if all non-null input values are true, otherwise false.
+
+```
+bool_and(expression)
+```
+
+#### Arguments
+
+- **expression**: Expression to operate on.
+  Can be a constant, column, or function, and any combination of arithmetic operators.
+
+### `bool_or`
+
+Returns true if any non-null input value is true, otherwise false.
+
+```
+bool_or(expression)
+```
+
+#### Arguments
+
+- **expression**: Expression to operate on.
+  Can be a constant, column, or function, and any combination of arithmetic operators.
+
 ### `count`
 
 Returns the number of rows in the specified column.