You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/20 11:49:27 UTC

[arrow-datafusion] branch master updated: improve error messages while downcasting uint and boolean array (#4261)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 712b9fdab  improve error messages while downcasting uint and boolean array (#4261)
712b9fdab is described below

commit 712b9fdabde0992c07f8203df83966295c3b8b9a
Author: Burak <bu...@gmail.com>
AuthorDate: Sun Nov 20 14:49:21 2022 +0300

     improve error messages while downcasting uint and boolean array (#4261)
---
 datafusion/common/src/cast.rs                      | 37 ++++++++++++++++++++--
 datafusion/common/src/scalar.rs                    | 24 +++++++-------
 datafusion/core/src/datasource/file_format/avro.rs | 12 +++----
 .../core/src/datasource/file_format/parquet.rs     | 12 +++----
 datafusion/core/src/datasource/listing/helpers.rs  | 13 ++++----
 .../file_format/parquet/row_filter.rs              | 18 ++++-------
 .../core/src/physical_plan/joins/hash_join.rs      |  8 ++---
 datafusion/physical-expr/src/expressions/binary.rs | 28 ++++++----------
 datafusion/physical-expr/src/expressions/case.rs   |  7 ++--
 .../physical-expr/src/expressions/in_list.rs       | 13 ++++----
 .../physical-expr/src/expressions/is_not_null.rs   |  7 ++--
 .../physical-expr/src/expressions/is_null.rs       |  7 ++--
 datafusion/physical-expr/src/expressions/not.rs    | 22 +++----------
 datafusion/physical-expr/src/expressions/nullif.rs | 11 +++----
 datafusion/physical-expr/src/functions.rs          | 15 ++++-----
 datafusion/physical-expr/src/hash_utils.rs         |  7 ++--
 datafusion/physical-expr/src/physical_expr.rs      |  7 ++--
 datafusion/physical-expr/src/window/rank.rs        |  4 +--
 datafusion/physical-expr/src/window/row_number.rs  |  6 ++--
 19 files changed, 128 insertions(+), 130 deletions(-)

diff --git a/datafusion/common/src/cast.rs b/datafusion/common/src/cast.rs
index 793274060..940168947 100644
--- a/datafusion/common/src/cast.rs
+++ b/datafusion/common/src/cast.rs
@@ -22,8 +22,8 @@
 
 use crate::DataFusionError;
 use arrow::array::{
-    Array, Date32Array, Decimal128Array, Float32Array, Float64Array, Int32Array,
-    Int64Array, StringArray, StructArray,
+    Array, BooleanArray, Date32Array, Decimal128Array, Float32Array, Float64Array,
+    Int32Array, Int64Array, StringArray, StructArray, UInt32Array, UInt64Array,
 };
 
 // Downcast ArrayRef to Date32Array
@@ -116,3 +116,36 @@ pub fn as_string_array(array: &dyn Array) -> Result<&StringArray, DataFusionErro
         ))
     })
 }
+
+// Downcast ArrayRef to UInt32Array
+pub fn as_uint32_array(array: &dyn Array) -> Result<&UInt32Array, DataFusionError> {
+    array.as_any().downcast_ref::<UInt32Array>().ok_or_else(|| {
+        DataFusionError::Internal(format!(
+            "Expected a UInt32Array, got: {}",
+            array.data_type()
+        ))
+    })
+}
+
+// Downcast ArrayRef to UInt64Array
+pub fn as_uint64_array(array: &dyn Array) -> Result<&UInt64Array, DataFusionError> {
+    array.as_any().downcast_ref::<UInt64Array>().ok_or_else(|| {
+        DataFusionError::Internal(format!(
+            "Expected a UInt64Array, got: {}",
+            array.data_type()
+        ))
+    })
+}
+
+// Downcast ArrayRef to BooleanArray
+pub fn as_boolean_array(array: &dyn Array) -> Result<&BooleanArray, DataFusionError> {
+    array
+        .as_any()
+        .downcast_ref::<BooleanArray>()
+        .ok_or_else(|| {
+            DataFusionError::Internal(format!(
+                "Expected a BooleanArray, got: {}",
+                array.data_type()
+            ))
+        })
+}
diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs
index 71cce82b2..36ea78580 100644
--- a/datafusion/common/src/scalar.rs
+++ b/datafusion/common/src/scalar.rs
@@ -2657,7 +2657,7 @@ mod tests {
     use arrow::compute::kernels;
     use arrow::datatypes::ArrowPrimitiveType;
 
-    use crate::cast::as_string_array;
+    use crate::cast::{as_string_array, as_uint32_array, as_uint64_array};
     use crate::from_slice::FromSlice;
 
     use super::*;
@@ -2792,35 +2792,37 @@ mod tests {
     }
 
     #[test]
-    fn scalar_value_to_array_u64() {
+    fn scalar_value_to_array_u64() -> Result<()> {
         let value = ScalarValue::UInt64(Some(13u64));
         let array = value.to_array();
-        let array = array.as_any().downcast_ref::<UInt64Array>().unwrap();
+        let array = as_uint64_array(&array)?;
         assert_eq!(array.len(), 1);
         assert!(!array.is_null(0));
         assert_eq!(array.value(0), 13);
 
         let value = ScalarValue::UInt64(None);
         let array = value.to_array();
-        let array = array.as_any().downcast_ref::<UInt64Array>().unwrap();
+        let array = as_uint64_array(&array)?;
         assert_eq!(array.len(), 1);
         assert!(array.is_null(0));
+        Ok(())
     }
 
     #[test]
-    fn scalar_value_to_array_u32() {
+    fn scalar_value_to_array_u32() -> Result<()> {
         let value = ScalarValue::UInt32(Some(13u32));
         let array = value.to_array();
-        let array = array.as_any().downcast_ref::<UInt32Array>().unwrap();
+        let array = as_uint32_array(&array)?;
         assert_eq!(array.len(), 1);
         assert!(!array.is_null(0));
         assert_eq!(array.value(0), 13);
 
         let value = ScalarValue::UInt32(None);
         let array = value.to_array();
-        let array = array.as_any().downcast_ref::<UInt32Array>().unwrap();
+        let array = as_uint32_array(&array)?;
         assert_eq!(array.len(), 1);
         assert!(array.is_null(0));
+        Ok(())
     }
 
     #[test]
@@ -2838,7 +2840,7 @@ mod tests {
     }
 
     #[test]
-    fn scalar_list_to_array() {
+    fn scalar_list_to_array() -> Result<()> {
         let list_array_ref = ScalarValue::List(
             Some(vec![
                 ScalarValue::UInt64(Some(100)),
@@ -2854,14 +2856,12 @@ mod tests {
         assert_eq!(list_array.values().len(), 3);
 
         let prim_array_ref = list_array.value(0);
-        let prim_array = prim_array_ref
-            .as_any()
-            .downcast_ref::<UInt64Array>()
-            .unwrap();
+        let prim_array = as_uint64_array(&prim_array_ref)?;
         assert_eq!(prim_array.len(), 3);
         assert_eq!(prim_array.value(0), 100);
         assert!(prim_array.is_null(1));
         assert_eq!(prim_array.value(2), 101);
+        Ok(())
     }
 
     /// Creates array directly and via ScalarValue and ensures they are the same
diff --git a/datafusion/core/src/datasource/file_format/avro.rs b/datafusion/core/src/datasource/file_format/avro.rs
index cb1b29fc3..c4dbf873b 100644
--- a/datafusion/core/src/datasource/file_format/avro.rs
+++ b/datafusion/core/src/datasource/file_format/avro.rs
@@ -92,8 +92,10 @@ mod tests {
     use crate::datasource::file_format::test_util::scan_format;
     use crate::physical_plan::collect;
     use crate::prelude::{SessionConfig, SessionContext};
-    use arrow::array::{BinaryArray, BooleanArray, TimestampMicrosecondArray};
-    use datafusion_common::cast::{as_float32_array, as_float64_array, as_int32_array};
+    use arrow::array::{BinaryArray, TimestampMicrosecondArray};
+    use datafusion_common::cast::{
+        as_boolean_array, as_float32_array, as_float64_array, as_int32_array,
+    };
     use futures::StreamExt;
 
     #[tokio::test]
@@ -197,11 +199,7 @@ mod tests {
         assert_eq!(1, batches[0].num_columns());
         assert_eq!(8, batches[0].num_rows());
 
-        let array = batches[0]
-            .column(0)
-            .as_any()
-            .downcast_ref::<BooleanArray>()
-            .unwrap();
+        let array = as_boolean_array(batches[0].column(0))?;
         let mut values: Vec<bool> = vec![];
         for i in 0..batches[0].num_rows() {
             values.push(array.value(i));
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index b579e8384..715a29190 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -586,12 +586,14 @@ mod tests {
     use crate::physical_plan::metrics::MetricValue;
     use crate::prelude::{SessionConfig, SessionContext};
     use arrow::array::{
-        Array, ArrayRef, BinaryArray, BooleanArray, StringArray, TimestampNanosecondArray,
+        Array, ArrayRef, BinaryArray, StringArray, TimestampNanosecondArray,
     };
     use arrow::record_batch::RecordBatch;
     use async_trait::async_trait;
     use bytes::Bytes;
-    use datafusion_common::cast::{as_float32_array, as_float64_array, as_int32_array};
+    use datafusion_common::cast::{
+        as_boolean_array, as_float32_array, as_float64_array, as_int32_array,
+    };
     use datafusion_common::ScalarValue;
     use futures::stream::BoxStream;
     use futures::StreamExt;
@@ -945,11 +947,7 @@ mod tests {
         assert_eq!(1, batches[0].num_columns());
         assert_eq!(8, batches[0].num_rows());
 
-        let array = batches[0]
-            .column(0)
-            .as_any()
-            .downcast_ref::<BooleanArray>()
-            .unwrap();
+        let array = as_boolean_array(batches[0].column(0))?;
         let mut values: Vec<bool> = vec![];
         for i in 0..batches[0].num_rows() {
             values.push(array.value(i));
diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs
index 2fdae289f..f1a34e665 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -22,7 +22,7 @@ use std::sync::Arc;
 use arrow::{
     array::{
         Array, ArrayBuilder, ArrayRef, Date64Array, Date64Builder, StringBuilder,
-        UInt64Array, UInt64Builder,
+        UInt64Builder,
     },
     datatypes::{DataType, Field, Schema},
     record_batch::RecordBatch,
@@ -38,7 +38,10 @@ use crate::{
 
 use super::PartitionedFile;
 use crate::datasource::listing::ListingTableUrl;
-use datafusion_common::{cast::as_string_array, Column, DataFusionError};
+use datafusion_common::{
+    cast::{as_string_array, as_uint64_array},
+    Column, DataFusionError,
+};
 use datafusion_expr::{
     expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion},
     Expr, Volatility,
@@ -300,11 +303,7 @@ fn batches_to_paths(batches: &[RecordBatch]) -> Result<Vec<PartitionedFile>> {
         .iter()
         .flat_map(|batch| {
             let key_array = as_string_array(batch.column(0)).unwrap();
-            let length_array = batch
-                .column(1)
-                .as_any()
-                .downcast_ref::<UInt64Array>()
-                .unwrap();
+            let length_array = as_uint64_array(batch.column(1)).unwrap();
             let modified_array = batch
                 .column(2)
                 .as_any()
diff --git a/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs b/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs
index eb8ca6ce5..5b193c4f8 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs
@@ -19,6 +19,7 @@ use arrow::array::{Array, BooleanArray};
 use arrow::datatypes::{DataType, Schema};
 use arrow::error::{ArrowError, Result as ArrowResult};
 use arrow::record_batch::RecordBatch;
+use datafusion_common::cast::as_boolean_array;
 use datafusion_common::{Column, DataFusionError, Result, ScalarValue, ToDFSchema};
 use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter, RewriteRecursion};
 use std::collections::BTreeSet;
@@ -134,17 +135,12 @@ impl ArrowPredicate for DatafusionArrowPredicate {
             .map(|v| v.into_array(batch.num_rows()))
         {
             Ok(array) => {
-                if let Some(mask) = array.as_any().downcast_ref::<BooleanArray>() {
-                    let bool_arr = BooleanArray::from(mask.data().clone());
-                    let num_filtered = bool_arr.len() - bool_arr.true_count();
-                    self.rows_filtered.add(num_filtered);
-                    timer.stop();
-                    Ok(bool_arr)
-                } else {
-                    Err(ArrowError::ComputeError(
-                        "Unexpected result of predicate evaluation, expected BooleanArray".to_owned(),
-                    ))
-                }
+                let mask = as_boolean_array(&array)?;
+                let bool_arr = BooleanArray::from(mask.data().clone());
+                let num_filtered = bool_arr.len() - bool_arr.true_count();
+                self.rows_filtered.add(num_filtered);
+                timer.stop();
+                Ok(bool_arr)
             }
             Err(e) => Err(ArrowError::ComputeError(format!(
                 "Error evaluating filter predicate: {:?}",
diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs
index f0974c6a9..86448ff58 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join.rs
@@ -40,7 +40,7 @@ use std::{time::Instant, vec};
 
 use futures::{ready, Stream, StreamExt, TryStreamExt};
 
-use arrow::array::{as_boolean_array, new_null_array, Array};
+use arrow::array::{new_null_array, Array};
 use arrow::datatypes::{ArrowNativeType, DataType};
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow::error::Result as ArrowResult;
@@ -52,7 +52,7 @@ use arrow::array::{
     UInt8Array,
 };
 
-use datafusion_common::cast::as_string_array;
+use datafusion_common::cast::{as_boolean_array, as_string_array};
 
 use hashbrown::raw::RawTable;
 
@@ -1027,7 +1027,7 @@ fn apply_join_filter(
                 .expression()
                 .evaluate(&intermediate_batch)?
                 .into_array(intermediate_batch.num_rows());
-            let mask = as_boolean_array(&filter_result);
+            let mask = as_boolean_array(&filter_result)?;
 
             let left_filtered = PrimitiveArray::<UInt64Type>::from(
                 compute::filter(&left_indices, mask)?.data().clone(),
@@ -1050,7 +1050,7 @@ fn apply_join_filter(
                 .expression()
                 .evaluate_selection(&intermediate_batch, &has_match)?
                 .into_array(intermediate_batch.num_rows());
-            let mask = as_boolean_array(&filter_result);
+            let mask = as_boolean_array(&filter_result)?;
 
             let mut left_rebuilt = UInt64Builder::with_capacity(0);
             let mut right_rebuilt = UInt32Builder::with_capacity(0);
diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs
index df33cd0c9..e067e34b5 100644
--- a/datafusion/physical-expr/src/expressions/binary.rs
+++ b/datafusion/physical-expr/src/expressions/binary.rs
@@ -75,7 +75,7 @@ use arrow::record_batch::RecordBatch;
 
 use crate::physical_expr::down_cast_any_ref;
 use crate::{AnalysisContext, ExprBoundaries, PhysicalExpr};
-use datafusion_common::cast::as_decimal128_array;
+use datafusion_common::cast::{as_boolean_array, as_decimal128_array};
 use datafusion_common::ScalarValue;
 use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::type_coercion::binary::binary_operator_data_type;
@@ -472,14 +472,8 @@ macro_rules! binary_array_op {
 /// Invoke a boolean kernel on a pair of arrays
 macro_rules! boolean_op {
     ($LEFT:expr, $RIGHT:expr, $OP:ident) => {{
-        let ll = $LEFT
-            .as_any()
-            .downcast_ref::<BooleanArray>()
-            .expect("boolean_op failed to downcast array");
-        let rr = $RIGHT
-            .as_any()
-            .downcast_ref::<BooleanArray>()
-            .expect("boolean_op failed to downcast array");
+        let ll = as_boolean_array($LEFT).expect("boolean_op failed to downcast array");
+        let rr = as_boolean_array($RIGHT).expect("boolean_op failed to downcast array");
         Ok(Arc::new($OP(&ll, &rr)?))
     }};
 }
@@ -1003,7 +997,7 @@ impl BinaryExpr {
             Operator::Modulo => binary_primitive_array_op!(left, right, modulus),
             Operator::And => {
                 if left_data_type == &DataType::Boolean {
-                    boolean_op!(left, right, and_kleene)
+                    boolean_op!(&left, &right, and_kleene)
                 } else {
                     Err(DataFusionError::Internal(format!(
                         "Cannot evaluate binary expression {:?} with types {:?} and {:?}",
@@ -1015,7 +1009,7 @@ impl BinaryExpr {
             }
             Operator::Or => {
                 if left_data_type == &DataType::Boolean {
-                    boolean_op!(left, right, or_kleene)
+                    boolean_op!(&left, &right, or_kleene)
                 } else {
                     Err(DataFusionError::Internal(format!(
                         "Cannot evaluate binary expression {:?} with types {:?} and {:?}",
@@ -1110,10 +1104,8 @@ mod tests {
         assert_eq!(result.len(), 5);
 
         let expected = vec![false, false, true, true, true];
-        let result = result
-            .as_any()
-            .downcast_ref::<BooleanArray>()
-            .expect("failed to downcast to BooleanArray");
+        let result =
+            as_boolean_array(&result).expect("failed to downcast to BooleanArray");
         for (i, &expected_item) in expected.iter().enumerate().take(5) {
             assert_eq!(result.value(i), expected_item);
         }
@@ -1156,10 +1148,8 @@ mod tests {
         assert_eq!(result.len(), 5);
 
         let expected = vec![true, true, false, true, false];
-        let result = result
-            .as_any()
-            .downcast_ref::<BooleanArray>()
-            .expect("failed to downcast to BooleanArray");
+        let result =
+            as_boolean_array(&result).expect("failed to downcast to BooleanArray");
         for (i, &expected_item) in expected.iter().enumerate().take(5) {
             assert_eq!(result.value(i), expected_item);
         }
diff --git a/datafusion/physical-expr/src/expressions/case.rs b/datafusion/physical-expr/src/expressions/case.rs
index af3dfba7e..ddf1d6bae 100644
--- a/datafusion/physical-expr/src/expressions/case.rs
+++ b/datafusion/physical-expr/src/expressions/case.rs
@@ -26,7 +26,7 @@ use arrow::compute::kernels::zip::zip;
 use arrow::compute::{and, eq_dyn, is_null, not, or, or_kleene};
 use arrow::datatypes::{DataType, Schema};
 use arrow::record_batch::RecordBatch;
-use datafusion_common::{DataFusionError, Result};
+use datafusion_common::{cast::as_boolean_array, DataFusionError, Result};
 use datafusion_expr::ColumnarValue;
 
 use itertools::Itertools;
@@ -195,10 +195,7 @@ impl CaseExpr {
                 _ => when_value,
             };
             let when_value = when_value.into_array(batch.num_rows());
-            let when_value = when_value
-                .as_ref()
-                .as_any()
-                .downcast_ref::<BooleanArray>()
+            let when_value = as_boolean_array(&when_value)
                 .expect("WHEN expression did not return a BooleanArray");
 
             let then_value = self.when_then_expr[i]
diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr/src/expressions/in_list.rs
index 8aff51d84..63fe2292a 100644
--- a/datafusion/physical-expr/src/expressions/in_list.rs
+++ b/datafusion/physical-expr/src/expressions/in_list.rs
@@ -32,7 +32,10 @@ use arrow::datatypes::*;
 use arrow::record_batch::RecordBatch;
 use arrow::util::bit_iterator::BitIndexIterator;
 use arrow::{downcast_dictionary_array, downcast_primitive_array};
-use datafusion_common::{cast::as_string_array, DataFusionError, Result, ScalarValue};
+use datafusion_common::{
+    cast::{as_boolean_array, as_string_array},
+    DataFusionError, Result, ScalarValue,
+};
 use datafusion_expr::ColumnarValue;
 use hashbrown::hash_map::RawEntryMut;
 use hashbrown::HashMap;
@@ -171,7 +174,7 @@ fn make_set(array: &dyn Array) -> Result<Box<dyn Set>> {
     Ok(downcast_primitive_array! {
         array => Box::new(ArraySet::new(array, make_hash_set(array))),
         DataType::Boolean => {
-            let array = as_boolean_array(array);
+            let array = as_boolean_array(array)?;
             Box::new(ArraySet::new(array, make_hash_set(array)))
         },
         DataType::Decimal128(_, _) => {
@@ -424,10 +427,8 @@ mod tests {
             let (cast_expr, cast_list_exprs) = in_list_cast($COL, $LIST, $SCHEMA)?;
             let expr = in_list(cast_expr, cast_list_exprs, $NEGATED, $SCHEMA).unwrap();
             let result = expr.evaluate(&$BATCH)?.into_array($BATCH.num_rows());
-            let result = result
-                .as_any()
-                .downcast_ref::<BooleanArray>()
-                .expect("failed to downcast to BooleanArray");
+            let result =
+                as_boolean_array(&result).expect("failed to downcast to BooleanArray");
             let expected = &BooleanArray::from($EXPECTED);
             assert_eq!(expected, result);
         }};
diff --git a/datafusion/physical-expr/src/expressions/is_not_null.rs b/datafusion/physical-expr/src/expressions/is_not_null.rs
index b5003ac23..32e53e0c1 100644
--- a/datafusion/physical-expr/src/expressions/is_not_null.rs
+++ b/datafusion/physical-expr/src/expressions/is_not_null.rs
@@ -115,6 +115,7 @@ mod tests {
         datatypes::*,
         record_batch::RecordBatch,
     };
+    use datafusion_common::cast::as_boolean_array;
     use std::sync::Arc;
 
     #[test]
@@ -126,10 +127,8 @@ mod tests {
 
         // expression: "a is not null"
         let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
-        let result = result
-            .as_any()
-            .downcast_ref::<BooleanArray>()
-            .expect("failed to downcast to BooleanArray");
+        let result =
+            as_boolean_array(&result).expect("failed to downcast to BooleanArray");
 
         let expected = &BooleanArray::from(vec![true, false]);
 
diff --git a/datafusion/physical-expr/src/expressions/is_null.rs b/datafusion/physical-expr/src/expressions/is_null.rs
index 976df2bc0..85e111440 100644
--- a/datafusion/physical-expr/src/expressions/is_null.rs
+++ b/datafusion/physical-expr/src/expressions/is_null.rs
@@ -116,6 +116,7 @@ mod tests {
         datatypes::*,
         record_batch::RecordBatch,
     };
+    use datafusion_common::cast::as_boolean_array;
     use std::sync::Arc;
 
     #[test]
@@ -128,10 +129,8 @@ mod tests {
         let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)])?;
 
         let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
-        let result = result
-            .as_any()
-            .downcast_ref::<BooleanArray>()
-            .expect("failed to downcast to BooleanArray");
+        let result =
+            as_boolean_array(&result).expect("failed to downcast to BooleanArray");
 
         let expected = &BooleanArray::from(vec![false, true]);
 
diff --git a/datafusion/physical-expr/src/expressions/not.rs b/datafusion/physical-expr/src/expressions/not.rs
index 00f1670af..bf935aa97 100644
--- a/datafusion/physical-expr/src/expressions/not.rs
+++ b/datafusion/physical-expr/src/expressions/not.rs
@@ -23,11 +23,9 @@ use std::sync::Arc;
 
 use crate::physical_expr::down_cast_any_ref;
 use crate::PhysicalExpr;
-use arrow::array::BooleanArray;
 use arrow::datatypes::{DataType, Schema};
 use arrow::record_batch::RecordBatch;
-use datafusion_common::ScalarValue;
-use datafusion_common::{DataFusionError, Result};
+use datafusion_common::{cast::as_boolean_array, DataFusionError, Result, ScalarValue};
 use datafusion_expr::ColumnarValue;
 
 /// Not expression
@@ -73,15 +71,7 @@ impl PhysicalExpr for NotExpr {
         let evaluate_arg = self.arg.evaluate(batch)?;
         match evaluate_arg {
             ColumnarValue::Array(array) => {
-                let array =
-                    array
-                        .as_any()
-                        .downcast_ref::<BooleanArray>()
-                        .ok_or_else(|| {
-                            DataFusionError::Internal(
-                                "boolean_op failed to downcast array".to_owned(),
-                            )
-                        })?;
+                let array = as_boolean_array(&array)?;
                 Ok(ColumnarValue::Array(Arc::new(
                     arrow::compute::kernels::boolean::not(array)?,
                 )))
@@ -135,7 +125,7 @@ pub fn not(arg: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
 mod tests {
     use super::*;
     use crate::expressions::col;
-    use arrow::datatypes::*;
+    use arrow::{array::BooleanArray, datatypes::*};
     use datafusion_common::Result;
 
     #[test]
@@ -153,10 +143,8 @@ mod tests {
             RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(input)])?;
 
         let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
-        let result = result
-            .as_any()
-            .downcast_ref::<BooleanArray>()
-            .expect("failed to downcast to BooleanArray");
+        let result =
+            as_boolean_array(&result).expect("failed to downcast to BooleanArray");
         assert_eq!(result, expected);
 
         Ok(())
diff --git a/datafusion/physical-expr/src/expressions/nullif.rs b/datafusion/physical-expr/src/expressions/nullif.rs
index 2ef40272a..312246d35 100644
--- a/datafusion/physical-expr/src/expressions/nullif.rs
+++ b/datafusion/physical-expr/src/expressions/nullif.rs
@@ -22,7 +22,7 @@ use arrow::array::*;
 use arrow::compute::eq_dyn;
 use arrow::compute::kernels::boolean::nullif;
 use arrow::datatypes::DataType;
-use datafusion_common::{DataFusionError, Result};
+use datafusion_common::{cast::as_boolean_array, DataFusionError, Result};
 use datafusion_expr::ColumnarValue;
 
 use super::binary::array_eq_scalar;
@@ -34,10 +34,7 @@ macro_rules! compute_bool_array_op {
             .as_any()
             .downcast_ref::<$DT>()
             .expect("compute_op failed to downcast array");
-        let rr = $RIGHT
-            .as_any()
-            .downcast_ref::<BooleanArray>()
-            .expect("compute_op failed to downcast array");
+        let rr = as_boolean_array($RIGHT).expect("compute_op failed to downcast array");
         Ok(Arc::new($OP(&ll, &rr)?) as ArrayRef)
     }};
 }
@@ -82,7 +79,7 @@ pub fn nullif_func(args: &[ColumnarValue]) -> Result<ColumnarValue> {
         (ColumnarValue::Array(lhs), ColumnarValue::Scalar(rhs)) => {
             let cond_array = array_eq_scalar(lhs, rhs)?;
 
-            let array = primitive_bool_array_op!(lhs, *cond_array, nullif)?;
+            let array = primitive_bool_array_op!(lhs, &cond_array, nullif)?;
 
             Ok(ColumnarValue::Array(array))
         }
@@ -91,7 +88,7 @@ pub fn nullif_func(args: &[ColumnarValue]) -> Result<ColumnarValue> {
             let cond_array = eq_dyn(lhs, rhs)?;
 
             // Now, invoke nullif on the result
-            let array = primitive_bool_array_op!(lhs, cond_array, nullif)?;
+            let array = primitive_bool_array_op!(lhs, &cond_array, nullif)?;
             Ok(ColumnarValue::Array(array))
         }
         _ => Err(DataFusionError::NotImplemented(
diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs
index 7678e7dc5..c84ee24c5 100644
--- a/datafusion/physical-expr/src/functions.rs
+++ b/datafusion/physical-expr/src/functions.rs
@@ -784,6 +784,7 @@ mod tests {
         datatypes::Field,
         record_batch::RecordBatch,
     };
+    use datafusion_common::cast::as_uint64_array;
     use datafusion_common::{Result, ScalarValue};
 
     /// $FUNC function to test
@@ -2943,16 +2944,12 @@ mod tests {
     }
 
     fn unpack_uint64_array(col: Result<ColumnarValue>) -> Result<Vec<u64>> {
-        match col? {
-            ColumnarValue::Array(array) => Ok(array
-                .as_any()
-                .downcast_ref::<UInt64Array>()
-                .unwrap()
-                .values()
-                .to_vec()),
-            ColumnarValue::Scalar(_) => Err(DataFusionError::Internal(
+        if let ColumnarValue::Array(array) = col? {
+            Ok(as_uint64_array(&array)?.values().to_vec())
+        } else {
+            Err(DataFusionError::Internal(
                 "Unexpected scalar created by a test function".to_string(),
-            )),
+            ))
         }
     }
 
diff --git a/datafusion/physical-expr/src/hash_utils.rs b/datafusion/physical-expr/src/hash_utils.rs
index 4e697f84f..d6cde1e7e 100644
--- a/datafusion/physical-expr/src/hash_utils.rs
+++ b/datafusion/physical-expr/src/hash_utils.rs
@@ -22,7 +22,10 @@ use arrow::array::*;
 use arrow::datatypes::*;
 use arrow::{downcast_dictionary_array, downcast_primitive_array};
 use arrow_buffer::i256;
-use datafusion_common::{cast::as_string_array, DataFusionError, Result};
+use datafusion_common::{
+    cast::{as_boolean_array, as_string_array},
+    DataFusionError, Result,
+};
 use std::sync::Arc;
 
 // Combines two hashes into one hash
@@ -211,7 +214,7 @@ pub fn create_hashes<'a>(
         downcast_primitive_array! {
             array => hash_array(array, random_state, hashes_buffer, multi_col),
             DataType::Null => hash_null(random_state, hashes_buffer, multi_col),
-            DataType::Boolean => hash_array(as_boolean_array(array), random_state, hashes_buffer, multi_col),
+            DataType::Boolean => hash_array(as_boolean_array(array)?, random_state, hashes_buffer, multi_col),
             DataType::Utf8 => hash_array(as_string_array(array)?, random_state, hashes_buffer, multi_col),
             DataType::LargeUtf8 => hash_array(as_largestring_array(array), random_state, hashes_buffer, multi_col),
             DataType::Binary => hash_array(as_generic_binary_array::<i32>(array), random_state, hashes_buffer, multi_col),
diff --git a/datafusion/physical-expr/src/physical_expr.rs b/datafusion/physical-expr/src/physical_expr.rs
index 653f2f854..02b6eeb4e 100644
--- a/datafusion/physical-expr/src/physical_expr.rs
+++ b/datafusion/physical-expr/src/physical_expr.rs
@@ -273,7 +273,10 @@ mod tests {
 
     use super::*;
     use arrow::array::Int32Array;
-    use datafusion_common::{cast::as_int32_array, Result};
+    use datafusion_common::{
+        cast::{as_boolean_array, as_int32_array},
+        Result,
+    };
 
     #[test]
     fn scatter_int() -> Result<()> {
@@ -335,7 +338,7 @@ mod tests {
             Some(false),
         ]);
         let result = scatter(&mask, truthy.as_ref())?;
-        let result = result.as_any().downcast_ref::<BooleanArray>().unwrap();
+        let result = as_boolean_array(&result)?;
 
         assert_eq!(&expected, result);
         Ok(())
diff --git a/datafusion/physical-expr/src/window/rank.rs b/datafusion/physical-expr/src/window/rank.rs
index da6734543..ec9aca532 100644
--- a/datafusion/physical-expr/src/window/rank.rs
+++ b/datafusion/physical-expr/src/window/rank.rs
@@ -167,7 +167,7 @@ impl PartitionEvaluator for RankEvaluator {
 mod tests {
     use super::*;
     use arrow::{array::*, datatypes::*};
-    use datafusion_common::cast::as_float64_array;
+    use datafusion_common::cast::{as_float64_array, as_uint64_array};
 
     fn test_with_rank(expr: &Rank, expected: Vec<u64>) -> Result<()> {
         test_i32_result(
@@ -217,7 +217,7 @@ mod tests {
             .create_evaluator(&batch)?
             .evaluate_with_rank(vec![0..8], ranks)?;
         assert_eq!(1, result.len());
-        let result = result[0].as_any().downcast_ref::<UInt64Array>().unwrap();
+        let result = as_uint64_array(&result[0])?;
         let result = result.values();
         assert_eq!(expected, result);
         Ok(())
diff --git a/datafusion/physical-expr/src/window/row_number.rs b/datafusion/physical-expr/src/window/row_number.rs
index fb3bc7704..11f4f620d 100644
--- a/datafusion/physical-expr/src/window/row_number.rs
+++ b/datafusion/physical-expr/src/window/row_number.rs
@@ -86,7 +86,7 @@ mod tests {
     use super::*;
     use arrow::record_batch::RecordBatch;
     use arrow::{array::*, datatypes::*};
-    use datafusion_common::Result;
+    use datafusion_common::{cast::as_uint64_array, Result};
 
     #[test]
     fn row_number_all_null() -> Result<()> {
@@ -98,7 +98,7 @@ mod tests {
         let row_number = RowNumber::new("row_number".to_owned());
         let result = row_number.create_evaluator(&batch)?.evaluate(vec![0..8])?;
         assert_eq!(1, result.len());
-        let result = result[0].as_any().downcast_ref::<UInt64Array>().unwrap();
+        let result = as_uint64_array(&result[0])?;
         let result = result.values();
         assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], result);
         Ok(())
@@ -114,7 +114,7 @@ mod tests {
         let row_number = RowNumber::new("row_number".to_owned());
         let result = row_number.create_evaluator(&batch)?.evaluate(vec![0..8])?;
         assert_eq!(1, result.len());
-        let result = result[0].as_any().downcast_ref::<UInt64Array>().unwrap();
+        let result = as_uint64_array(&result[0])?;
         let result = result.values();
         assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], result);
         Ok(())