You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/11/11 14:40:26 UTC

(arrow-datafusion) branch main updated: chore: remove panics in datafusion-common::scalar (#7901)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new e642cc2a94 chore: remove panics in datafusion-common::scalar (#7901)
e642cc2a94 is described below

commit e642cc2a94f38518d765d25c8113523aedc29198
Author: Junjun Dong <ju...@gmail.com>
AuthorDate: Sat Nov 11 06:40:20 2023 -0800

    chore: remove panics in datafusion-common::scalar (#7901)
---
 datafusion/common/src/pyarrow.rs                   |   2 +-
 datafusion/common/src/scalar.rs                    | 598 ++++++++++++---------
 datafusion/core/benches/scalar.rs                  |  10 +-
 datafusion/core/src/datasource/listing/helpers.rs  |   5 +-
 .../datasource/physical_plan/file_scan_config.rs   |  12 +-
 .../datasource/physical_plan/parquet/row_filter.rs |   2 +-
 .../datasource/physical_plan/parquet/row_groups.rs |   4 +-
 datafusion/expr/src/columnar_value.rs              |  14 +-
 datafusion/expr/src/window_state.rs                |   2 +-
 .../optimizer/src/unwrap_cast_in_comparison.rs     |   8 +-
 .../physical-expr/src/aggregate/correlation.rs     |  12 +-
 .../physical-expr/src/aggregate/covariance.rs      |  12 +-
 .../physical-expr/src/aggregate/first_last.rs      |  10 +-
 datafusion/physical-expr/src/aggregate/stddev.rs   |  12 +-
 datafusion/physical-expr/src/aggregate/utils.rs    |   4 +-
 datafusion/physical-expr/src/aggregate/variance.rs |  12 +-
 .../physical-expr/src/conditional_expressions.rs   |   2 +-
 .../physical-expr/src/datetime_expressions.rs      |   2 +-
 datafusion/physical-expr/src/expressions/binary.rs |  49 +-
 datafusion/physical-expr/src/expressions/case.rs   |  64 ++-
 datafusion/physical-expr/src/expressions/cast.rs   |  12 +-
 datafusion/physical-expr/src/expressions/datum.rs  |  14 +-
 .../src/expressions/get_indexed_field.rs           |  33 +-
 .../physical-expr/src/expressions/in_list.rs       |  11 +-
 .../physical-expr/src/expressions/is_not_null.rs   |   5 +-
 .../physical-expr/src/expressions/is_null.rs       |   5 +-
 datafusion/physical-expr/src/expressions/like.rs   |   5 +-
 .../physical-expr/src/expressions/literal.rs       |   5 +-
 datafusion/physical-expr/src/expressions/mod.rs    |  12 +-
 .../physical-expr/src/expressions/negative.rs      |   2 +-
 datafusion/physical-expr/src/expressions/not.rs    |   5 +-
 datafusion/physical-expr/src/expressions/nullif.rs |  20 +-
 .../physical-expr/src/expressions/try_cast.rs      |  12 +-
 datafusion/physical-expr/src/functions.rs          |  63 ++-
 .../src/intervals/interval_aritmetic.rs            |   2 +-
 datafusion/physical-expr/src/math_expressions.rs   |   3 +-
 datafusion/physical-expr/src/planner.rs            |   2 +-
 datafusion/physical-expr/src/struct_expressions.rs |  15 +-
 .../src/window/built_in_window_function_expr.rs    |   6 +-
 datafusion/physical-expr/src/window/lead_lag.rs    |   1 +
 datafusion/physical-expr/src/window/window_expr.rs |   6 +-
 datafusion/physical-plan/src/aggregates/mod.rs     |  29 +-
 .../physical-plan/src/aggregates/no_grouping.rs    |   6 +-
 datafusion/physical-plan/src/filter.rs             |   2 +-
 datafusion/physical-plan/src/joins/cross_join.rs   |   2 +-
 datafusion/physical-plan/src/joins/hash_join.rs    |   8 +-
 .../physical-plan/src/joins/hash_join_utils.rs     |   2 +-
 .../physical-plan/src/joins/symmetric_hash_join.rs |   6 +-
 datafusion/physical-plan/src/joins/utils.rs        |   2 +-
 datafusion/physical-plan/src/projection.rs         |   6 +-
 datafusion/physical-plan/src/repartition/mod.rs    |   4 +-
 datafusion/physical-plan/src/sorts/stream.rs       |   4 +-
 datafusion/physical-plan/src/topk/mod.rs           |   2 +-
 datafusion/physical-plan/src/unnest.rs             |   2 +-
 54 files changed, 723 insertions(+), 427 deletions(-)

diff --git a/datafusion/common/src/pyarrow.rs b/datafusion/common/src/pyarrow.rs
index 59a8b811e3..aa01539193 100644
--- a/datafusion/common/src/pyarrow.rs
+++ b/datafusion/common/src/pyarrow.rs
@@ -54,7 +54,7 @@ impl FromPyArrow for ScalarValue {
 
 impl ToPyArrow for ScalarValue {
     fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
-        let array = self.to_array();
+        let array = self.to_array()?;
         // convert to pyarrow array using C data interface
         let pyarray = array.to_data().to_pyarrow(py)?;
         let pyscalar = pyarray.call_method1(py, "__getitem__", (0,))?;
diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs
index 0d701eaad2..cdcc9aa4fb 100644
--- a/datafusion/common/src/scalar.rs
+++ b/datafusion/common/src/scalar.rs
@@ -330,9 +330,9 @@ impl PartialOrd for ScalarValue {
                         let arr2 = list_arr2.value(i);
 
                         let lt_res =
-                            arrow::compute::kernels::cmp::lt(&arr1, &arr2).unwrap();
+                            arrow::compute::kernels::cmp::lt(&arr1, &arr2).ok()?;
                         let eq_res =
-                            arrow::compute::kernels::cmp::eq(&arr1, &arr2).unwrap();
+                            arrow::compute::kernels::cmp::eq(&arr1, &arr2).ok()?;
 
                         for j in 0..lt_res.len() {
                             if lt_res.is_valid(j) && lt_res.value(j) {
@@ -431,6 +431,10 @@ macro_rules! hash_float_value {
 hash_float_value!((f64, u64), (f32, u32));
 
 // manual implementation of `Hash`
+//
+// # Panics
+//
+// Panics if there is an error when creating hash values for rows
 impl std::hash::Hash for ScalarValue {
     fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
         use ScalarValue::*;
@@ -506,15 +510,19 @@ impl std::hash::Hash for ScalarValue {
     }
 }
 
-/// return a reference to the values array and the index into it for a
+/// Return a reference to the values array and the index into it for a
 /// dictionary array
+///
+/// # Errors
+///
+/// Errors if the array cannot be downcasted to DictionaryArray
 #[inline]
 pub fn get_dict_value<K: ArrowDictionaryKeyType>(
     array: &dyn Array,
     index: usize,
-) -> (&ArrayRef, Option<usize>) {
-    let dict_array = as_dictionary_array::<K>(array).unwrap();
-    (dict_array.values(), dict_array.key(index))
+) -> Result<(&ArrayRef, Option<usize>)> {
+    let dict_array = as_dictionary_array::<K>(array)?;
+    Ok((dict_array.values(), dict_array.key(index)))
 }
 
 /// Create a dictionary array representing `value` repeated `size`
@@ -522,9 +530,9 @@ pub fn get_dict_value<K: ArrowDictionaryKeyType>(
 fn dict_from_scalar<K: ArrowDictionaryKeyType>(
     value: &ScalarValue,
     size: usize,
-) -> ArrayRef {
+) -> Result<ArrayRef> {
     // values array is one element long (the value)
-    let values_array = value.to_array_of_size(1);
+    let values_array = value.to_array_of_size(1)?;
 
     // Create a key array with `size` elements, each of 0
     let key_array: PrimitiveArray<K> = std::iter::repeat(Some(K::default_value()))
@@ -536,11 +544,9 @@ fn dict_from_scalar<K: ArrowDictionaryKeyType>(
     // Note: this path could be made faster by using the ArrayData
     // APIs and skipping validation, if it every comes up in
     // performance traces.
-    Arc::new(
-        DictionaryArray::<K>::try_new(key_array, values_array)
-            // should always be valid by construction above
-            .expect("Can not construct dictionary array"),
-    )
+    Ok(Arc::new(
+        DictionaryArray::<K>::try_new(key_array, values_array)?, // should always be valid by construction above
+    ))
 }
 
 /// Create a dictionary array representing all the values in values
@@ -579,24 +585,44 @@ fn dict_from_values<K: ArrowDictionaryKeyType>(
 
 macro_rules! typed_cast_tz {
     ($array:expr, $index:expr, $ARRAYTYPE:ident, $SCALAR:ident, $TZ:expr) => {{
-        let array = $array.as_any().downcast_ref::<$ARRAYTYPE>().unwrap();
-        ScalarValue::$SCALAR(
+        use std::any::type_name;
+        let array = $array
+            .as_any()
+            .downcast_ref::<$ARRAYTYPE>()
+            .ok_or_else(|| {
+                DataFusionError::Internal(format!(
+                    "could not cast value to {}",
+                    type_name::<$ARRAYTYPE>()
+                ))
+            })?;
+        Ok::<ScalarValue, DataFusionError>(ScalarValue::$SCALAR(
             match array.is_null($index) {
                 true => None,
                 false => Some(array.value($index).into()),
             },
             $TZ.clone(),
-        )
+        ))
     }};
 }
 
 macro_rules! typed_cast {
     ($array:expr, $index:expr, $ARRAYTYPE:ident, $SCALAR:ident) => {{
-        let array = $array.as_any().downcast_ref::<$ARRAYTYPE>().unwrap();
-        ScalarValue::$SCALAR(match array.is_null($index) {
-            true => None,
-            false => Some(array.value($index).into()),
-        })
+        use std::any::type_name;
+        let array = $array
+            .as_any()
+            .downcast_ref::<$ARRAYTYPE>()
+            .ok_or_else(|| {
+                DataFusionError::Internal(format!(
+                    "could not cast value to {}",
+                    type_name::<$ARRAYTYPE>()
+                ))
+            })?;
+        Ok::<ScalarValue, DataFusionError>(ScalarValue::$SCALAR(
+            match array.is_null($index) {
+                true => None,
+                false => Some(array.value($index).into()),
+            },
+        ))
     }};
 }
 
@@ -628,12 +654,21 @@ macro_rules! build_timestamp_array_from_option {
 
 macro_rules! eq_array_primitive {
     ($array:expr, $index:expr, $ARRAYTYPE:ident, $VALUE:expr) => {{
-        let array = $array.as_any().downcast_ref::<$ARRAYTYPE>().unwrap();
+        use std::any::type_name;
+        let array = $array
+            .as_any()
+            .downcast_ref::<$ARRAYTYPE>()
+            .ok_or_else(|| {
+                DataFusionError::Internal(format!(
+                    "could not cast value to {}",
+                    type_name::<$ARRAYTYPE>()
+                ))
+            })?;
         let is_valid = array.is_valid($index);
-        match $VALUE {
+        Ok::<bool, DataFusionError>(match $VALUE {
             Some(val) => is_valid && &array.value($index) == val,
             None => !is_valid,
-        }
+        })
     }};
 }
 
@@ -935,7 +970,7 @@ impl ScalarValue {
     /// NB: operating on `ScalarValue` directly is not efficient, performance sensitive code
     /// should operate on Arrays directly, using vectorized array kernels
     pub fn add<T: Borrow<ScalarValue>>(&self, other: T) -> Result<ScalarValue> {
-        let r = add_wrapping(&self.to_scalar(), &other.borrow().to_scalar())?;
+        let r = add_wrapping(&self.to_scalar()?, &other.borrow().to_scalar()?)?;
         Self::try_from_array(r.as_ref(), 0)
     }
     /// Checked addition of `ScalarValue`
@@ -943,7 +978,7 @@ impl ScalarValue {
     /// NB: operating on `ScalarValue` directly is not efficient, performance sensitive code
     /// should operate on Arrays directly, using vectorized array kernels
     pub fn add_checked<T: Borrow<ScalarValue>>(&self, other: T) -> Result<ScalarValue> {
-        let r = add(&self.to_scalar(), &other.borrow().to_scalar())?;
+        let r = add(&self.to_scalar()?, &other.borrow().to_scalar()?)?;
         Self::try_from_array(r.as_ref(), 0)
     }
 
@@ -952,7 +987,7 @@ impl ScalarValue {
     /// NB: operating on `ScalarValue` directly is not efficient, performance sensitive code
     /// should operate on Arrays directly, using vectorized array kernels
     pub fn sub<T: Borrow<ScalarValue>>(&self, other: T) -> Result<ScalarValue> {
-        let r = sub_wrapping(&self.to_scalar(), &other.borrow().to_scalar())?;
+        let r = sub_wrapping(&self.to_scalar()?, &other.borrow().to_scalar()?)?;
         Self::try_from_array(r.as_ref(), 0)
     }
 
@@ -961,7 +996,7 @@ impl ScalarValue {
     /// NB: operating on `ScalarValue` directly is not efficient, performance sensitive code
     /// should operate on Arrays directly, using vectorized array kernels
     pub fn sub_checked<T: Borrow<ScalarValue>>(&self, other: T) -> Result<ScalarValue> {
-        let r = sub(&self.to_scalar(), &other.borrow().to_scalar())?;
+        let r = sub(&self.to_scalar()?, &other.borrow().to_scalar()?)?;
         Self::try_from_array(r.as_ref(), 0)
     }
 
@@ -1050,7 +1085,11 @@ impl ScalarValue {
     }
 
     /// Converts a scalar value into an 1-row array.
-    pub fn to_array(&self) -> ArrayRef {
+    ///
+    /// # Errors
+    ///
+    /// Errors if the ScalarValue cannot be converted into a 1-row array
+    pub fn to_array(&self) -> Result<ArrayRef> {
         self.to_array_of_size(1)
     }
 
@@ -1059,6 +1098,10 @@ impl ScalarValue {
     ///
     /// This can be used to call arrow compute kernels such as `lt`
     ///
+    /// # Errors
+    ///
+    /// Errors if the ScalarValue cannot be converted into a 1-row array
+    ///
     /// # Example
     /// ```
     /// use datafusion_common::ScalarValue;
@@ -1069,7 +1112,7 @@ impl ScalarValue {
     ///
     /// let result = arrow::compute::kernels::cmp::lt(
     ///   &arr,
-    ///   &five.to_scalar(),
+    ///   &five.to_scalar().unwrap(),
     /// ).unwrap();
     ///
     /// let expected = BooleanArray::from(vec![
@@ -1082,8 +1125,8 @@ impl ScalarValue {
     /// assert_eq!(&result, &expected);
     /// ```
     /// [`Datum`]: arrow_array::Datum
-    pub fn to_scalar(&self) -> Scalar<ArrayRef> {
-        Scalar::new(self.to_array_of_size(1))
+    pub fn to_scalar(&self) -> Result<Scalar<ArrayRef>> {
+        Ok(Scalar::new(self.to_array_of_size(1)?))
     }
 
     /// Converts an iterator of references [`ScalarValue`] into an [`ArrayRef`]
@@ -1093,6 +1136,10 @@ impl ScalarValue {
     /// Returns an error if the iterator is empty or if the
     /// [`ScalarValue`]s are not all the same type
     ///
+    /// # Panics
+    ///
+    /// Panics if `self` is a dictionary with invalid key type
+    ///
     /// # Example
     /// ```
     /// use datafusion_common::ScalarValue;
@@ -1199,28 +1246,29 @@ impl ScalarValue {
 
         macro_rules! build_array_list_primitive {
             ($ARRAY_TY:ident, $SCALAR_TY:ident, $NATIVE_TYPE:ident) => {{
-                Arc::new(ListArray::from_iter_primitive::<$ARRAY_TY, _, _>(
+                Ok::<ArrayRef, DataFusionError>(Arc::new(ListArray::from_iter_primitive::<$ARRAY_TY, _, _>(
                     scalars.into_iter().map(|x| match x {
                         ScalarValue::List(arr) => {
                             // `ScalarValue::List` contains a single element `ListArray`.
                             let list_arr = as_list_array(&arr);
                             if list_arr.is_null(0) {
-                                None
+                                Ok(None)
                             } else {
                                 let primitive_arr =
                                     list_arr.values().as_primitive::<$ARRAY_TY>();
-                                Some(
+                                Ok(Some(
                                     primitive_arr.into_iter().collect::<Vec<Option<_>>>(),
-                                )
+                                ))
                             }
                         }
-                        sv => panic!(
+                        sv => _internal_err!(
                             "Inconsistent types in ScalarValue::iter_to_array. \
                                 Expected {:?}, got {:?}",
                             data_type, sv
                         ),
-                    }),
-                ))
+                    })
+                    .collect::<Result<Vec<_>>>()?,
+                )))
             }};
         }
 
@@ -1273,7 +1321,7 @@ impl ScalarValue {
                     ScalarValue::iter_to_decimal256_array(scalars, *precision, *scale)?;
                 Arc::new(decimal_array)
             }
-            DataType::Null => ScalarValue::iter_to_null_array(scalars),
+            DataType::Null => ScalarValue::iter_to_null_array(scalars)?,
             DataType::Boolean => build_array_primitive!(BooleanArray, Boolean),
             DataType::Float32 => build_array_primitive!(Float32Array, Float32),
             DataType::Float64 => build_array_primitive!(Float64Array, Float64),
@@ -1337,34 +1385,34 @@ impl ScalarValue {
                 build_array_primitive!(IntervalMonthDayNanoArray, IntervalMonthDayNano)
             }
             DataType::List(fields) if fields.data_type() == &DataType::Int8 => {
-                build_array_list_primitive!(Int8Type, Int8, i8)
+                build_array_list_primitive!(Int8Type, Int8, i8)?
             }
             DataType::List(fields) if fields.data_type() == &DataType::Int16 => {
-                build_array_list_primitive!(Int16Type, Int16, i16)
+                build_array_list_primitive!(Int16Type, Int16, i16)?
             }
             DataType::List(fields) if fields.data_type() == &DataType::Int32 => {
-                build_array_list_primitive!(Int32Type, Int32, i32)
+                build_array_list_primitive!(Int32Type, Int32, i32)?
             }
             DataType::List(fields) if fields.data_type() == &DataType::Int64 => {
-                build_array_list_primitive!(Int64Type, Int64, i64)
+                build_array_list_primitive!(Int64Type, Int64, i64)?
             }
             DataType::List(fields) if fields.data_type() == &DataType::UInt8 => {
-                build_array_list_primitive!(UInt8Type, UInt8, u8)
+                build_array_list_primitive!(UInt8Type, UInt8, u8)?
             }
             DataType::List(fields) if fields.data_type() == &DataType::UInt16 => {
-                build_array_list_primitive!(UInt16Type, UInt16, u16)
+                build_array_list_primitive!(UInt16Type, UInt16, u16)?
             }
             DataType::List(fields) if fields.data_type() == &DataType::UInt32 => {
-                build_array_list_primitive!(UInt32Type, UInt32, u32)
+                build_array_list_primitive!(UInt32Type, UInt32, u32)?
             }
             DataType::List(fields) if fields.data_type() == &DataType::UInt64 => {
-                build_array_list_primitive!(UInt64Type, UInt64, u64)
+                build_array_list_primitive!(UInt64Type, UInt64, u64)?
             }
             DataType::List(fields) if fields.data_type() == &DataType::Float32 => {
-                build_array_list_primitive!(Float32Type, Float32, f32)
+                build_array_list_primitive!(Float32Type, Float32, f32)?
             }
             DataType::List(fields) if fields.data_type() == &DataType::Float64 => {
-                build_array_list_primitive!(Float64Type, Float64, f64)
+                build_array_list_primitive!(Float64Type, Float64, f64)?
             }
             DataType::List(fields) if fields.data_type() == &DataType::Utf8 => {
                 build_array_list_string!(StringBuilder, as_string_array)
@@ -1432,7 +1480,7 @@ impl ScalarValue {
                             if &inner_key_type == key_type {
                                 Ok(*scalar)
                             } else {
-                                panic!("Expected inner key type of {key_type} but found: {inner_key_type}, value was ({scalar:?})");
+                                _internal_err!("Expected inner key type of {key_type} but found: {inner_key_type}, value was ({scalar:?})")
                             }
                         }
                         _ => {
@@ -1504,15 +1552,19 @@ impl ScalarValue {
         Ok(array)
     }
 
-    fn iter_to_null_array(scalars: impl IntoIterator<Item = ScalarValue>) -> ArrayRef {
-        let length =
-            scalars
-                .into_iter()
-                .fold(0usize, |r, element: ScalarValue| match element {
-                    ScalarValue::Null => r + 1,
-                    _ => unreachable!(),
-                });
-        new_null_array(&DataType::Null, length)
+    fn iter_to_null_array(
+        scalars: impl IntoIterator<Item = ScalarValue>,
+    ) -> Result<ArrayRef> {
+        let length = scalars.into_iter().try_fold(
+            0usize,
+            |r, element: ScalarValue| match element {
+                ScalarValue::Null => Ok::<usize, DataFusionError>(r + 1),
+                s => {
+                    _internal_err!("Expected ScalarValue::Null element. Received {s:?}")
+                }
+            },
+        )?;
+        Ok(new_null_array(&DataType::Null, length))
     }
 
     fn iter_to_decimal_array(
@@ -1523,10 +1575,12 @@ impl ScalarValue {
         let array = scalars
             .into_iter()
             .map(|element: ScalarValue| match element {
-                ScalarValue::Decimal128(v1, _, _) => v1,
-                _ => unreachable!(),
+                ScalarValue::Decimal128(v1, _, _) => Ok(v1),
+                s => {
+                    _internal_err!("Expected ScalarValue::Null element. Received {s:?}")
+                }
             })
-            .collect::<Decimal128Array>()
+            .collect::<Result<Decimal128Array>>()?
             .with_precision_and_scale(precision, scale)?;
         Ok(array)
     }
@@ -1539,10 +1593,14 @@ impl ScalarValue {
         let array = scalars
             .into_iter()
             .map(|element: ScalarValue| match element {
-                ScalarValue::Decimal256(v1, _, _) => v1,
-                _ => unreachable!(),
+                ScalarValue::Decimal256(v1, _, _) => Ok(v1),
+                s => {
+                    _internal_err!(
+                        "Expected ScalarValue::Decimal256 element. Received {s:?}"
+                    )
+                }
             })
-            .collect::<Decimal256Array>()
+            .collect::<Result<Decimal256Array>>()?
             .with_precision_and_scale(precision, scale)?;
         Ok(array)
     }
@@ -1607,17 +1665,17 @@ impl ScalarValue {
         precision: u8,
         scale: i8,
         size: usize,
-    ) -> Decimal128Array {
+    ) -> Result<Decimal128Array> {
         match value {
             Some(val) => Decimal128Array::from(vec![val; size])
                 .with_precision_and_scale(precision, scale)
-                .unwrap(),
+                .map_err(DataFusionError::ArrowError),
             None => {
                 let mut builder = Decimal128Array::builder(size)
                     .with_precision_and_scale(precision, scale)
-                    .unwrap();
+                    .map_err(DataFusionError::ArrowError)?;
                 builder.append_nulls(size);
-                builder.finish()
+                Ok(builder.finish())
             }
         }
     }
@@ -1627,12 +1685,12 @@ impl ScalarValue {
         precision: u8,
         scale: i8,
         size: usize,
-    ) -> Decimal256Array {
+    ) -> Result<Decimal256Array> {
         std::iter::repeat(value)
             .take(size)
             .collect::<Decimal256Array>()
             .with_precision_and_scale(precision, scale)
-            .unwrap()
+            .map_err(DataFusionError::ArrowError)
     }
 
     /// Converts `Vec<ScalarValue>` where each element has type corresponding to
@@ -1671,13 +1729,21 @@ impl ScalarValue {
     }
 
     /// Converts a scalar value into an array of `size` rows.
-    pub fn to_array_of_size(&self, size: usize) -> ArrayRef {
-        match self {
+    ///
+    /// # Errors
+    ///
+    /// Errors if `self` is
+    /// - a decimal that fails be converted to a decimal array of size
+    /// - a `Fixedsizelist` that is not supported yet
+    /// - a `List` that fails to be concatenated into an array of size
+    /// - a `Dictionary` that fails be converted to a dictionary array of size
+    pub fn to_array_of_size(&self, size: usize) -> Result<ArrayRef> {
+        Ok(match self {
             ScalarValue::Decimal128(e, precision, scale) => Arc::new(
-                ScalarValue::build_decimal_array(*e, *precision, *scale, size),
+                ScalarValue::build_decimal_array(*e, *precision, *scale, size)?,
             ),
             ScalarValue::Decimal256(e, precision, scale) => Arc::new(
-                ScalarValue::build_decimal256_array(*e, *precision, *scale, size),
+                ScalarValue::build_decimal256_array(*e, *precision, *scale, size)?,
             ),
             ScalarValue::Boolean(e) => {
                 Arc::new(BooleanArray::from(vec![*e; size])) as ArrayRef
@@ -1790,13 +1856,14 @@ impl ScalarValue {
                 ),
             },
             ScalarValue::Fixedsizelist(..) => {
-                unimplemented!("FixedSizeList is not supported yet")
+                return _not_impl_err!("FixedSizeList is not supported yet")
             }
             ScalarValue::List(arr) => {
                 let arrays = std::iter::repeat(arr.as_ref())
                     .take(size)
                     .collect::<Vec<_>>();
-                arrow::compute::concat(arrays.as_slice()).unwrap()
+                arrow::compute::concat(arrays.as_slice())
+                    .map_err(DataFusionError::ArrowError)?
             }
             ScalarValue::Date32(e) => {
                 build_array_from_option!(Date32, Date32Array, e, size)
@@ -1891,13 +1958,13 @@ impl ScalarValue {
             ),
             ScalarValue::Struct(values, fields) => match values {
                 Some(values) => {
-                    let field_values: Vec<_> = fields
+                    let field_values = fields
                         .iter()
                         .zip(values.iter())
                         .map(|(field, value)| {
-                            (field.clone(), value.to_array_of_size(size))
+                            Ok((field.clone(), value.to_array_of_size(size)?))
                         })
-                        .collect();
+                        .collect::<Result<Vec<_>>>()?;
 
                     Arc::new(StructArray::from(field_values))
                 }
@@ -1909,19 +1976,19 @@ impl ScalarValue {
             ScalarValue::Dictionary(key_type, v) => {
                 // values array is one element long (the value)
                 match key_type.as_ref() {
-                    DataType::Int8 => dict_from_scalar::<Int8Type>(v, size),
-                    DataType::Int16 => dict_from_scalar::<Int16Type>(v, size),
-                    DataType::Int32 => dict_from_scalar::<Int32Type>(v, size),
-                    DataType::Int64 => dict_from_scalar::<Int64Type>(v, size),
-                    DataType::UInt8 => dict_from_scalar::<UInt8Type>(v, size),
-                    DataType::UInt16 => dict_from_scalar::<UInt16Type>(v, size),
-                    DataType::UInt32 => dict_from_scalar::<UInt32Type>(v, size),
-                    DataType::UInt64 => dict_from_scalar::<UInt64Type>(v, size),
+                    DataType::Int8 => dict_from_scalar::<Int8Type>(v, size)?,
+                    DataType::Int16 => dict_from_scalar::<Int16Type>(v, size)?,
+                    DataType::Int32 => dict_from_scalar::<Int32Type>(v, size)?,
+                    DataType::Int64 => dict_from_scalar::<Int64Type>(v, size)?,
+                    DataType::UInt8 => dict_from_scalar::<UInt8Type>(v, size)?,
+                    DataType::UInt16 => dict_from_scalar::<UInt16Type>(v, size)?,
+                    DataType::UInt32 => dict_from_scalar::<UInt32Type>(v, size)?,
+                    DataType::UInt64 => dict_from_scalar::<UInt64Type>(v, size)?,
                     _ => unreachable!("Invalid dictionary keys type: {:?}", key_type),
                 }
             }
             ScalarValue::Null => new_null_array(&DataType::Null, size),
-        }
+        })
     }
 
     fn get_decimal_value_from_array(
@@ -2037,23 +2104,25 @@ impl ScalarValue {
                     array, index, *precision, *scale,
                 )?
             }
-            DataType::Boolean => typed_cast!(array, index, BooleanArray, Boolean),
-            DataType::Float64 => typed_cast!(array, index, Float64Array, Float64),
-            DataType::Float32 => typed_cast!(array, index, Float32Array, Float32),
-            DataType::UInt64 => typed_cast!(array, index, UInt64Array, UInt64),
-            DataType::UInt32 => typed_cast!(array, index, UInt32Array, UInt32),
-            DataType::UInt16 => typed_cast!(array, index, UInt16Array, UInt16),
-            DataType::UInt8 => typed_cast!(array, index, UInt8Array, UInt8),
-            DataType::Int64 => typed_cast!(array, index, Int64Array, Int64),
-            DataType::Int32 => typed_cast!(array, index, Int32Array, Int32),
-            DataType::Int16 => typed_cast!(array, index, Int16Array, Int16),
-            DataType::Int8 => typed_cast!(array, index, Int8Array, Int8),
-            DataType::Binary => typed_cast!(array, index, BinaryArray, Binary),
+            DataType::Boolean => typed_cast!(array, index, BooleanArray, Boolean)?,
+            DataType::Float64 => typed_cast!(array, index, Float64Array, Float64)?,
+            DataType::Float32 => typed_cast!(array, index, Float32Array, Float32)?,
+            DataType::UInt64 => typed_cast!(array, index, UInt64Array, UInt64)?,
+            DataType::UInt32 => typed_cast!(array, index, UInt32Array, UInt32)?,
+            DataType::UInt16 => typed_cast!(array, index, UInt16Array, UInt16)?,
+            DataType::UInt8 => typed_cast!(array, index, UInt8Array, UInt8)?,
+            DataType::Int64 => typed_cast!(array, index, Int64Array, Int64)?,
+            DataType::Int32 => typed_cast!(array, index, Int32Array, Int32)?,
+            DataType::Int16 => typed_cast!(array, index, Int16Array, Int16)?,
+            DataType::Int8 => typed_cast!(array, index, Int8Array, Int8)?,
+            DataType::Binary => typed_cast!(array, index, BinaryArray, Binary)?,
             DataType::LargeBinary => {
-                typed_cast!(array, index, LargeBinaryArray, LargeBinary)
+                typed_cast!(array, index, LargeBinaryArray, LargeBinary)?
+            }
+            DataType::Utf8 => typed_cast!(array, index, StringArray, Utf8)?,
+            DataType::LargeUtf8 => {
+                typed_cast!(array, index, LargeStringArray, LargeUtf8)?
             }
-            DataType::Utf8 => typed_cast!(array, index, StringArray, Utf8),
-            DataType::LargeUtf8 => typed_cast!(array, index, LargeStringArray, LargeUtf8),
             DataType::List(_) => {
                 let list_array = as_list_array(array);
                 let nested_array = list_array.value(index);
@@ -2071,70 +2140,58 @@ impl ScalarValue {
 
                 ScalarValue::List(arr)
             }
-            DataType::Date32 => {
-                typed_cast!(array, index, Date32Array, Date32)
-            }
-            DataType::Date64 => {
-                typed_cast!(array, index, Date64Array, Date64)
-            }
+            DataType::Date32 => typed_cast!(array, index, Date32Array, Date32)?,
+            DataType::Date64 => typed_cast!(array, index, Date64Array, Date64)?,
             DataType::Time32(TimeUnit::Second) => {
-                typed_cast!(array, index, Time32SecondArray, Time32Second)
+                typed_cast!(array, index, Time32SecondArray, Time32Second)?
             }
             DataType::Time32(TimeUnit::Millisecond) => {
-                typed_cast!(array, index, Time32MillisecondArray, Time32Millisecond)
+                typed_cast!(array, index, Time32MillisecondArray, Time32Millisecond)?
             }
             DataType::Time64(TimeUnit::Microsecond) => {
-                typed_cast!(array, index, Time64MicrosecondArray, Time64Microsecond)
+                typed_cast!(array, index, Time64MicrosecondArray, Time64Microsecond)?
             }
             DataType::Time64(TimeUnit::Nanosecond) => {
-                typed_cast!(array, index, Time64NanosecondArray, Time64Nanosecond)
-            }
-            DataType::Timestamp(TimeUnit::Second, tz_opt) => {
-                typed_cast_tz!(
-                    array,
-                    index,
-                    TimestampSecondArray,
-                    TimestampSecond,
-                    tz_opt
-                )
-            }
-            DataType::Timestamp(TimeUnit::Millisecond, tz_opt) => {
-                typed_cast_tz!(
-                    array,
-                    index,
-                    TimestampMillisecondArray,
-                    TimestampMillisecond,
-                    tz_opt
-                )
-            }
-            DataType::Timestamp(TimeUnit::Microsecond, tz_opt) => {
-                typed_cast_tz!(
-                    array,
-                    index,
-                    TimestampMicrosecondArray,
-                    TimestampMicrosecond,
-                    tz_opt
-                )
-            }
-            DataType::Timestamp(TimeUnit::Nanosecond, tz_opt) => {
-                typed_cast_tz!(
-                    array,
-                    index,
-                    TimestampNanosecondArray,
-                    TimestampNanosecond,
-                    tz_opt
-                )
+                typed_cast!(array, index, Time64NanosecondArray, Time64Nanosecond)?
             }
+            DataType::Timestamp(TimeUnit::Second, tz_opt) => typed_cast_tz!(
+                array,
+                index,
+                TimestampSecondArray,
+                TimestampSecond,
+                tz_opt
+            )?,
+            DataType::Timestamp(TimeUnit::Millisecond, tz_opt) => typed_cast_tz!(
+                array,
+                index,
+                TimestampMillisecondArray,
+                TimestampMillisecond,
+                tz_opt
+            )?,
+            DataType::Timestamp(TimeUnit::Microsecond, tz_opt) => typed_cast_tz!(
+                array,
+                index,
+                TimestampMicrosecondArray,
+                TimestampMicrosecond,
+                tz_opt
+            )?,
+            DataType::Timestamp(TimeUnit::Nanosecond, tz_opt) => typed_cast_tz!(
+                array,
+                index,
+                TimestampNanosecondArray,
+                TimestampNanosecond,
+                tz_opt
+            )?,
             DataType::Dictionary(key_type, _) => {
                 let (values_array, values_index) = match key_type.as_ref() {
-                    DataType::Int8 => get_dict_value::<Int8Type>(array, index),
-                    DataType::Int16 => get_dict_value::<Int16Type>(array, index),
-                    DataType::Int32 => get_dict_value::<Int32Type>(array, index),
-                    DataType::Int64 => get_dict_value::<Int64Type>(array, index),
-                    DataType::UInt8 => get_dict_value::<UInt8Type>(array, index),
-                    DataType::UInt16 => get_dict_value::<UInt16Type>(array, index),
-                    DataType::UInt32 => get_dict_value::<UInt32Type>(array, index),
-                    DataType::UInt64 => get_dict_value::<UInt64Type>(array, index),
+                    DataType::Int8 => get_dict_value::<Int8Type>(array, index)?,
+                    DataType::Int16 => get_dict_value::<Int16Type>(array, index)?,
+                    DataType::Int32 => get_dict_value::<Int32Type>(array, index)?,
+                    DataType::Int64 => get_dict_value::<Int64Type>(array, index)?,
+                    DataType::UInt8 => get_dict_value::<UInt8Type>(array, index)?,
+                    DataType::UInt16 => get_dict_value::<UInt16Type>(array, index)?,
+                    DataType::UInt32 => get_dict_value::<UInt32Type>(array, index)?,
+                    DataType::UInt64 => get_dict_value::<UInt64Type>(array, index)?,
                     _ => unreachable!("Invalid dictionary keys type: {:?}", key_type),
                 };
                 // look up the index in the values dictionary
@@ -2173,31 +2230,29 @@ impl ScalarValue {
                 )
             }
             DataType::Interval(IntervalUnit::DayTime) => {
-                typed_cast!(array, index, IntervalDayTimeArray, IntervalDayTime)
+                typed_cast!(array, index, IntervalDayTimeArray, IntervalDayTime)?
             }
             DataType::Interval(IntervalUnit::YearMonth) => {
-                typed_cast!(array, index, IntervalYearMonthArray, IntervalYearMonth)
-            }
-            DataType::Interval(IntervalUnit::MonthDayNano) => {
-                typed_cast!(
-                    array,
-                    index,
-                    IntervalMonthDayNanoArray,
-                    IntervalMonthDayNano
-                )
+                typed_cast!(array, index, IntervalYearMonthArray, IntervalYearMonth)?
             }
+            DataType::Interval(IntervalUnit::MonthDayNano) => typed_cast!(
+                array,
+                index,
+                IntervalMonthDayNanoArray,
+                IntervalMonthDayNano
+            )?,
 
             DataType::Duration(TimeUnit::Second) => {
-                typed_cast!(array, index, DurationSecondArray, DurationSecond)
+                typed_cast!(array, index, DurationSecondArray, DurationSecond)?
             }
             DataType::Duration(TimeUnit::Millisecond) => {
-                typed_cast!(array, index, DurationMillisecondArray, DurationMillisecond)
+                typed_cast!(array, index, DurationMillisecondArray, DurationMillisecond)?
             }
             DataType::Duration(TimeUnit::Microsecond) => {
-                typed_cast!(array, index, DurationMicrosecondArray, DurationMicrosecond)
+                typed_cast!(array, index, DurationMicrosecondArray, DurationMicrosecond)?
             }
             DataType::Duration(TimeUnit::Nanosecond) => {
-                typed_cast!(array, index, DurationNanosecondArray, DurationNanosecond)
+                typed_cast!(array, index, DurationNanosecondArray, DurationNanosecond)?
             }
 
             other => {
@@ -2215,7 +2270,7 @@ impl ScalarValue {
             safe: false,
             format_options: Default::default(),
         };
-        let cast_arr = cast_with_options(&value.to_array(), target_type, &cast_options)?;
+        let cast_arr = cast_with_options(&value.to_array()?, target_type, &cast_options)?;
         ScalarValue::try_from_array(&cast_arr, 0)
     }
 
@@ -2273,9 +2328,21 @@ impl ScalarValue {
     ///
     /// This function has a few narrow usescases such as hash table key
     /// comparisons where comparing a single row at a time is necessary.
+    ///
+    /// # Errors
+    ///
+    /// Errors if
+    /// - it fails to downcast `array` to the data type of `self`
+    /// - `self` is a `Fixedsizelist`
+    /// - `self` is a `List`
+    /// - `self` is a `Struct`
+    ///
+    /// # Panics
+    ///
+    /// Panics if `self` is a dictionary with invalid key type
     #[inline]
-    pub fn eq_array(&self, array: &ArrayRef, index: usize) -> bool {
-        match self {
+    pub fn eq_array(&self, array: &ArrayRef, index: usize) -> Result<bool> {
+        Ok(match self {
             ScalarValue::Decimal128(v, precision, scale) => {
                 ScalarValue::eq_array_decimal(
                     array,
@@ -2283,8 +2350,7 @@ impl ScalarValue {
                     v.as_ref(),
                     *precision,
                     *scale,
-                )
-                .unwrap()
+                )?
             }
             ScalarValue::Decimal256(v, precision, scale) => {
                 ScalarValue::eq_array_decimal256(
@@ -2293,119 +2359,132 @@ impl ScalarValue {
                     v.as_ref(),
                     *precision,
                     *scale,
-                )
-                .unwrap()
+                )?
             }
             ScalarValue::Boolean(val) => {
-                eq_array_primitive!(array, index, BooleanArray, val)
+                eq_array_primitive!(array, index, BooleanArray, val)?
             }
             ScalarValue::Float32(val) => {
-                eq_array_primitive!(array, index, Float32Array, val)
+                eq_array_primitive!(array, index, Float32Array, val)?
             }
             ScalarValue::Float64(val) => {
-                eq_array_primitive!(array, index, Float64Array, val)
+                eq_array_primitive!(array, index, Float64Array, val)?
+            }
+            ScalarValue::Int8(val) => eq_array_primitive!(array, index, Int8Array, val)?,
+            ScalarValue::Int16(val) => {
+                eq_array_primitive!(array, index, Int16Array, val)?
+            }
+            ScalarValue::Int32(val) => {
+                eq_array_primitive!(array, index, Int32Array, val)?
+            }
+            ScalarValue::Int64(val) => {
+                eq_array_primitive!(array, index, Int64Array, val)?
+            }
+            ScalarValue::UInt8(val) => {
+                eq_array_primitive!(array, index, UInt8Array, val)?
             }
-            ScalarValue::Int8(val) => eq_array_primitive!(array, index, Int8Array, val),
-            ScalarValue::Int16(val) => eq_array_primitive!(array, index, Int16Array, val),
-            ScalarValue::Int32(val) => eq_array_primitive!(array, index, Int32Array, val),
-            ScalarValue::Int64(val) => eq_array_primitive!(array, index, Int64Array, val),
-            ScalarValue::UInt8(val) => eq_array_primitive!(array, index, UInt8Array, val),
             ScalarValue::UInt16(val) => {
-                eq_array_primitive!(array, index, UInt16Array, val)
+                eq_array_primitive!(array, index, UInt16Array, val)?
             }
             ScalarValue::UInt32(val) => {
-                eq_array_primitive!(array, index, UInt32Array, val)
+                eq_array_primitive!(array, index, UInt32Array, val)?
             }
             ScalarValue::UInt64(val) => {
-                eq_array_primitive!(array, index, UInt64Array, val)
+                eq_array_primitive!(array, index, UInt64Array, val)?
+            }
+            ScalarValue::Utf8(val) => {
+                eq_array_primitive!(array, index, StringArray, val)?
             }
-            ScalarValue::Utf8(val) => eq_array_primitive!(array, index, StringArray, val),
             ScalarValue::LargeUtf8(val) => {
-                eq_array_primitive!(array, index, LargeStringArray, val)
+                eq_array_primitive!(array, index, LargeStringArray, val)?
             }
             ScalarValue::Binary(val) => {
-                eq_array_primitive!(array, index, BinaryArray, val)
+                eq_array_primitive!(array, index, BinaryArray, val)?
             }
             ScalarValue::FixedSizeBinary(_, val) => {
-                eq_array_primitive!(array, index, FixedSizeBinaryArray, val)
+                eq_array_primitive!(array, index, FixedSizeBinaryArray, val)?
             }
             ScalarValue::LargeBinary(val) => {
-                eq_array_primitive!(array, index, LargeBinaryArray, val)
+                eq_array_primitive!(array, index, LargeBinaryArray, val)?
+            }
+            ScalarValue::Fixedsizelist(..) => {
+                return _not_impl_err!("FixedSizeList is not supported yet")
             }
-            ScalarValue::Fixedsizelist(..) => unimplemented!(),
-            ScalarValue::List(_) => unimplemented!("ListArr"),
+            ScalarValue::List(_) => return _not_impl_err!("List is not supported yet"),
             ScalarValue::Date32(val) => {
-                eq_array_primitive!(array, index, Date32Array, val)
+                eq_array_primitive!(array, index, Date32Array, val)?
             }
             ScalarValue::Date64(val) => {
-                eq_array_primitive!(array, index, Date64Array, val)
+                eq_array_primitive!(array, index, Date64Array, val)?
             }
             ScalarValue::Time32Second(val) => {
-                eq_array_primitive!(array, index, Time32SecondArray, val)
+                eq_array_primitive!(array, index, Time32SecondArray, val)?
             }
             ScalarValue::Time32Millisecond(val) => {
-                eq_array_primitive!(array, index, Time32MillisecondArray, val)
+                eq_array_primitive!(array, index, Time32MillisecondArray, val)?
             }
             ScalarValue::Time64Microsecond(val) => {
-                eq_array_primitive!(array, index, Time64MicrosecondArray, val)
+                eq_array_primitive!(array, index, Time64MicrosecondArray, val)?
             }
             ScalarValue::Time64Nanosecond(val) => {
-                eq_array_primitive!(array, index, Time64NanosecondArray, val)
+                eq_array_primitive!(array, index, Time64NanosecondArray, val)?
             }
             ScalarValue::TimestampSecond(val, _) => {
-                eq_array_primitive!(array, index, TimestampSecondArray, val)
+                eq_array_primitive!(array, index, TimestampSecondArray, val)?
             }
             ScalarValue::TimestampMillisecond(val, _) => {
-                eq_array_primitive!(array, index, TimestampMillisecondArray, val)
+                eq_array_primitive!(array, index, TimestampMillisecondArray, val)?
             }
             ScalarValue::TimestampMicrosecond(val, _) => {
-                eq_array_primitive!(array, index, TimestampMicrosecondArray, val)
+                eq_array_primitive!(array, index, TimestampMicrosecondArray, val)?
             }
             ScalarValue::TimestampNanosecond(val, _) => {
-                eq_array_primitive!(array, index, TimestampNanosecondArray, val)
+                eq_array_primitive!(array, index, TimestampNanosecondArray, val)?
             }
             ScalarValue::IntervalYearMonth(val) => {
-                eq_array_primitive!(array, index, IntervalYearMonthArray, val)
+                eq_array_primitive!(array, index, IntervalYearMonthArray, val)?
             }
             ScalarValue::IntervalDayTime(val) => {
-                eq_array_primitive!(array, index, IntervalDayTimeArray, val)
+                eq_array_primitive!(array, index, IntervalDayTimeArray, val)?
             }
             ScalarValue::IntervalMonthDayNano(val) => {
-                eq_array_primitive!(array, index, IntervalMonthDayNanoArray, val)
+                eq_array_primitive!(array, index, IntervalMonthDayNanoArray, val)?
             }
             ScalarValue::DurationSecond(val) => {
-                eq_array_primitive!(array, index, DurationSecondArray, val)
+                eq_array_primitive!(array, index, DurationSecondArray, val)?
             }
             ScalarValue::DurationMillisecond(val) => {
-                eq_array_primitive!(array, index, DurationMillisecondArray, val)
+                eq_array_primitive!(array, index, DurationMillisecondArray, val)?
             }
             ScalarValue::DurationMicrosecond(val) => {
-                eq_array_primitive!(array, index, DurationMicrosecondArray, val)
+                eq_array_primitive!(array, index, DurationMicrosecondArray, val)?
             }
             ScalarValue::DurationNanosecond(val) => {
-                eq_array_primitive!(array, index, DurationNanosecondArray, val)
+                eq_array_primitive!(array, index, DurationNanosecondArray, val)?
+            }
+            ScalarValue::Struct(_, _) => {
+                return _not_impl_err!("Struct is not supported yet")
             }
-            ScalarValue::Struct(_, _) => unimplemented!(),
             ScalarValue::Dictionary(key_type, v) => {
                 let (values_array, values_index) = match key_type.as_ref() {
-                    DataType::Int8 => get_dict_value::<Int8Type>(array, index),
-                    DataType::Int16 => get_dict_value::<Int16Type>(array, index),
-                    DataType::Int32 => get_dict_value::<Int32Type>(array, index),
-                    DataType::Int64 => get_dict_value::<Int64Type>(array, index),
-                    DataType::UInt8 => get_dict_value::<UInt8Type>(array, index),
-                    DataType::UInt16 => get_dict_value::<UInt16Type>(array, index),
-                    DataType::UInt32 => get_dict_value::<UInt32Type>(array, index),
-                    DataType::UInt64 => get_dict_value::<UInt64Type>(array, index),
+                    DataType::Int8 => get_dict_value::<Int8Type>(array, index)?,
+                    DataType::Int16 => get_dict_value::<Int16Type>(array, index)?,
+                    DataType::Int32 => get_dict_value::<Int32Type>(array, index)?,
+                    DataType::Int64 => get_dict_value::<Int64Type>(array, index)?,
+                    DataType::UInt8 => get_dict_value::<UInt8Type>(array, index)?,
+                    DataType::UInt16 => get_dict_value::<UInt16Type>(array, index)?,
+                    DataType::UInt32 => get_dict_value::<UInt32Type>(array, index)?,
+                    DataType::UInt64 => get_dict_value::<UInt64Type>(array, index)?,
                     _ => unreachable!("Invalid dictionary keys type: {:?}", key_type),
                 };
                 // was the value in the array non null?
                 match values_index {
-                    Some(values_index) => v.eq_array(values_array, values_index),
+                    Some(values_index) => v.eq_array(values_array, values_index)?,
                     None => v.is_null(),
                 }
             }
             ScalarValue::Null => array.is_null(index),
-        }
+        })
     }
 
     /// Estimate size if bytes including `Self`. For values with internal containers such as `String`
@@ -2785,6 +2864,11 @@ macro_rules! format_option {
     }};
 }
 
+// Implement Display trait for ScalarValue
+//
+// # Panics
+//
+// Panics if there is an error when creating a visual representation of columns via `arrow::util::pretty`
 impl fmt::Display for ScalarValue {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         match self {
@@ -3031,7 +3115,9 @@ mod tests {
         ])]);
 
         let sv = ScalarValue::List(Arc::new(arr));
-        let actual_arr = sv.to_array_of_size(2);
+        let actual_arr = sv
+            .to_array_of_size(2)
+            .expect("Failed to convert to array of size");
         let actual_list_arr = as_list_array(&actual_arr);
 
         let arr = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
@@ -3238,8 +3324,8 @@ mod tests {
     {
         let scalar_result = left.add_checked(&right);
 
-        let left_array = left.to_array();
-        let right_array = right.to_array();
+        let left_array = left.to_array().expect("Failed to convert to array");
+        let right_array = right.to_array().expect("Failed to convert to array");
         let arrow_left_array = left_array.as_primitive::<T>();
         let arrow_right_array = right_array.as_primitive::<T>();
         let arrow_result = kernels::numeric::add(arrow_left_array, arrow_right_array);
@@ -3287,22 +3373,30 @@ mod tests {
         }
 
         // decimal scalar to array
-        let array = decimal_value.to_array();
+        let array = decimal_value
+            .to_array()
+            .expect("Failed to convert to array");
         let array = as_decimal128_array(&array)?;
         assert_eq!(1, array.len());
         assert_eq!(DataType::Decimal128(10, 1), array.data_type().clone());
         assert_eq!(123i128, array.value(0));
 
         // decimal scalar to array with size
-        let array = decimal_value.to_array_of_size(10);
+        let array = decimal_value
+            .to_array_of_size(10)
+            .expect("Failed to convert to array of size");
         let array_decimal = as_decimal128_array(&array)?;
         assert_eq!(10, array.len());
         assert_eq!(DataType::Decimal128(10, 1), array.data_type().clone());
         assert_eq!(123i128, array_decimal.value(0));
         assert_eq!(123i128, array_decimal.value(9));
         // test eq array
-        assert!(decimal_value.eq_array(&array, 1));
-        assert!(decimal_value.eq_array(&array, 5));
+        assert!(decimal_value
+            .eq_array(&array, 1)
+            .expect("Failed to compare arrays"));
+        assert!(decimal_value
+            .eq_array(&array, 5)
+            .expect("Failed to compare arrays"));
         // test try from array
         assert_eq!(
             decimal_value,
@@ -3349,13 +3443,16 @@ mod tests {
 
         assert!(ScalarValue::try_new_decimal128(1, 10, 2)
             .unwrap()
-            .eq_array(&array, 0));
+            .eq_array(&array, 0)
+            .expect("Failed to compare arrays"));
         assert!(ScalarValue::try_new_decimal128(2, 10, 2)
             .unwrap()
-            .eq_array(&array, 1));
+            .eq_array(&array, 1)
+            .expect("Failed to compare arrays"));
         assert!(ScalarValue::try_new_decimal128(3, 10, 2)
             .unwrap()
-            .eq_array(&array, 2));
+            .eq_array(&array, 2)
+            .expect("Failed to compare arrays"));
         assert_eq!(
             ScalarValue::Decimal128(None, 10, 2),
             ScalarValue::try_from_array(&array, 3).unwrap()
@@ -3442,14 +3539,14 @@ mod tests {
     #[test]
     fn scalar_value_to_array_u64() -> Result<()> {
         let value = ScalarValue::UInt64(Some(13u64));
-        let array = value.to_array();
+        let array = value.to_array().expect("Failed to convert to array");
         let array = as_uint64_array(&array)?;
         assert_eq!(array.len(), 1);
         assert!(!array.is_null(0));
         assert_eq!(array.value(0), 13);
 
         let value = ScalarValue::UInt64(None);
-        let array = value.to_array();
+        let array = value.to_array().expect("Failed to convert to array");
         let array = as_uint64_array(&array)?;
         assert_eq!(array.len(), 1);
         assert!(array.is_null(0));
@@ -3459,14 +3556,14 @@ mod tests {
     #[test]
     fn scalar_value_to_array_u32() -> Result<()> {
         let value = ScalarValue::UInt32(Some(13u32));
-        let array = value.to_array();
+        let array = value.to_array().expect("Failed to convert to array");
         let array = as_uint32_array(&array)?;
         assert_eq!(array.len(), 1);
         assert!(!array.is_null(0));
         assert_eq!(array.value(0), 13);
 
         let value = ScalarValue::UInt32(None);
-        let array = value.to_array();
+        let array = value.to_array().expect("Failed to convert to array");
         let array = as_uint32_array(&array)?;
         assert_eq!(array.len(), 1);
         assert!(array.is_null(0));
@@ -4025,7 +4122,9 @@ mod tests {
 
             for (index, scalar) in scalars.into_iter().enumerate() {
                 assert!(
-                    scalar.eq_array(&array, index),
+                    scalar
+                        .eq_array(&array, index)
+                        .expect("Failed to compare arrays"),
                     "Expected {scalar:?} to be equal to {array:?} at index {index}"
                 );
 
@@ -4033,7 +4132,7 @@ mod tests {
                 for other_index in 0..array.len() {
                     if index != other_index {
                         assert!(
-                            !scalar.eq_array(&array, other_index),
+                            !scalar.eq_array(&array, other_index).expect("Failed to compare arrays"),
                             "Expected {scalar:?} to be NOT equal to {array:?} at index {other_index}"
                         );
                     }
@@ -4136,7 +4235,9 @@ mod tests {
         );
 
         // Convert to length-2 array
-        let array = scalar.to_array_of_size(2);
+        let array = scalar
+            .to_array_of_size(2)
+            .expect("Failed to convert to array of size");
 
         let expected = Arc::new(StructArray::from(vec![
             (
@@ -4570,7 +4671,7 @@ mod tests {
             DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
         );
 
-        let array = scalar.to_array();
+        let array = scalar.to_array().expect("Failed to convert to array");
         assert_eq!(array.len(), 1);
         assert_eq!(
             array.data_type(),
@@ -4607,7 +4708,7 @@ mod tests {
     // mimics how casting work on scalar values by `casting` `scalar` to `desired_type`
     fn check_scalar_cast(scalar: ScalarValue, desired_type: DataType) {
         // convert from scalar --> Array to call cast
-        let scalar_array = scalar.to_array();
+        let scalar_array = scalar.to_array().expect("Failed to convert to array");
         // cast the actual value
         let cast_array = kernels::cast::cast(&scalar_array, &desired_type).unwrap();
 
@@ -4616,7 +4717,9 @@ mod tests {
         assert_eq!(cast_scalar.data_type(), desired_type);
 
         // Some time later the "cast" scalar is turned back into an array:
-        let array = cast_scalar.to_array_of_size(10);
+        let array = cast_scalar
+            .to_array_of_size(10)
+            .expect("Failed to convert to array of size");
 
         // The datatype should be "Dictionary" but is actually Utf8!!!
         assert_eq!(array.data_type(), &desired_type)
@@ -5065,7 +5168,8 @@ mod tests {
         let arrays = scalars
             .iter()
             .map(ScalarValue::to_array)
-            .collect::<Vec<_>>();
+            .collect::<Result<Vec<_>>>()
+            .expect("Failed to convert to array");
         let arrays = arrays.iter().map(|a| a.as_ref()).collect::<Vec<_>>();
         let array = concat(&arrays).unwrap();
         check_array(array);
diff --git a/datafusion/core/benches/scalar.rs b/datafusion/core/benches/scalar.rs
index 30f21a964d..540f7212e9 100644
--- a/datafusion/core/benches/scalar.rs
+++ b/datafusion/core/benches/scalar.rs
@@ -22,7 +22,15 @@ fn criterion_benchmark(c: &mut Criterion) {
     c.bench_function("to_array_of_size 100000", |b| {
         let scalar = ScalarValue::Int32(Some(100));
 
-        b.iter(|| assert_eq!(scalar.to_array_of_size(100000).null_count(), 0))
+        b.iter(|| {
+            assert_eq!(
+                scalar
+                    .to_array_of_size(100000)
+                    .expect("Failed to convert to array of size")
+                    .null_count(),
+                0
+            )
+        })
     });
 }
 
diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs
index d6a0add9b2..986e54ebbe 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -276,7 +276,10 @@ async fn prune_partitions(
     // Applies `filter` to `batch` returning `None` on error
     let do_filter = |filter| -> Option<ArrayRef> {
         let expr = create_physical_expr(filter, &df_schema, &schema, &props).ok()?;
-        Some(expr.evaluate(&batch).ok()?.into_array(partitions.len()))
+        expr.evaluate(&batch)
+            .ok()?
+            .into_array(partitions.len())
+            .ok()
     };
 
     //.Compute the conjunction of the filters, ignoring errors
diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
index 3efb0df9df..68e996391c 100644
--- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
@@ -336,7 +336,7 @@ impl PartitionColumnProjector {
                     &mut self.key_buffer_cache,
                     partition_value.as_ref(),
                     file_batch.num_rows(),
-                ),
+                )?,
             )
         }
 
@@ -396,11 +396,11 @@ fn create_dict_array<T>(
     dict_val: &ScalarValue,
     len: usize,
     data_type: DataType,
-) -> ArrayRef
+) -> Result<ArrayRef>
 where
     T: ArrowNativeType,
 {
-    let dict_vals = dict_val.to_array();
+    let dict_vals = dict_val.to_array()?;
 
     let sliced_key_buffer = buffer_gen.get_buffer(len);
 
@@ -409,16 +409,16 @@ where
         .len(len)
         .add_buffer(sliced_key_buffer);
     builder = builder.add_child_data(dict_vals.to_data());
-    Arc::new(DictionaryArray::<UInt16Type>::from(
+    Ok(Arc::new(DictionaryArray::<UInt16Type>::from(
         builder.build().unwrap(),
-    ))
+    )))
 }
 
 fn create_output_array(
     key_buffer_cache: &mut ZeroBufferGenerators,
     val: &ScalarValue,
     len: usize,
-) -> ArrayRef {
+) -> Result<ArrayRef> {
     if let ScalarValue::Dictionary(key_type, dict_val) = &val {
         match key_type.as_ref() {
             DataType::Int8 => {
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs
index 0f4b09caed..5fe0a0a13a 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs
@@ -126,7 +126,7 @@ impl ArrowPredicate for DatafusionArrowPredicate {
         match self
             .physical_expr
             .evaluate(&batch)
-            .map(|v| v.into_array(batch.num_rows()))
+            .and_then(|v| v.into_array(batch.num_rows()))
         {
             Ok(array) => {
                 let bool_arr = as_boolean_array(&array)?.clone();
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
index 91bceed916..dc6ef50bc1 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -405,7 +405,7 @@ macro_rules! get_min_max_values {
             .flatten()
             // column either didn't have statistics at all or didn't have min/max values
             .or_else(|| Some(null_scalar.clone()))
-            .map(|s| s.to_array())
+            .and_then(|s| s.to_array().ok())
     }}
 }
 
@@ -425,7 +425,7 @@ macro_rules! get_null_count_values {
             },
         );
 
-        Some(value.to_array())
+        value.to_array().ok()
     }};
 }
 
diff --git a/datafusion/expr/src/columnar_value.rs b/datafusion/expr/src/columnar_value.rs
index c72aae69c8..7a28839281 100644
--- a/datafusion/expr/src/columnar_value.rs
+++ b/datafusion/expr/src/columnar_value.rs
@@ -20,7 +20,7 @@
 use arrow::array::ArrayRef;
 use arrow::array::NullArray;
 use arrow::datatypes::DataType;
-use datafusion_common::ScalarValue;
+use datafusion_common::{Result, ScalarValue};
 use std::sync::Arc;
 
 /// Represents the result of evaluating an expression: either a single
@@ -47,11 +47,15 @@ impl ColumnarValue {
 
     /// Convert a columnar value into an ArrayRef. [`Self::Scalar`] is
     /// converted by repeating the same scalar multiple times.
-    pub fn into_array(self, num_rows: usize) -> ArrayRef {
-        match self {
+    ///
+    /// # Errors
+    ///
+    /// Errors if `self` is a Scalar that fails to be converted into an array of size
+    pub fn into_array(self, num_rows: usize) -> Result<ArrayRef> {
+        Ok(match self {
             ColumnarValue::Array(array) => array,
-            ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows),
-        }
+            ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows)?,
+        })
     }
 
     /// null columnar values are implemented as a null array in order to pass batch
diff --git a/datafusion/expr/src/window_state.rs b/datafusion/expr/src/window_state.rs
index 4ea9ecea5f..de88396d9b 100644
--- a/datafusion/expr/src/window_state.rs
+++ b/datafusion/expr/src/window_state.rs
@@ -98,7 +98,7 @@ impl WindowAggState {
     }
 
     pub fn new(out_type: &DataType) -> Result<Self> {
-        let empty_out_col = ScalarValue::try_from(out_type)?.to_array_of_size(0);
+        let empty_out_col = ScalarValue::try_from(out_type)?.to_array_of_size(0)?;
         Ok(Self {
             window_frame_range: Range { start: 0, end: 0 },
             window_frame_ctx: None,
diff --git a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs
index 468981a5fb..907c12b7af 100644
--- a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs
+++ b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs
@@ -1089,8 +1089,12 @@ mod tests {
                 // Verify that calling the arrow
                 // cast kernel yields the same results
                 // input array
-                let literal_array = literal.to_array_of_size(1);
-                let expected_array = expected_value.to_array_of_size(1);
+                let literal_array = literal
+                    .to_array_of_size(1)
+                    .expect("Failed to convert to array of size");
+                let expected_array = expected_value
+                    .to_array_of_size(1)
+                    .expect("Failed to convert to array of size");
                 let cast_array = cast_with_options(
                     &literal_array,
                     &target_type,
diff --git a/datafusion/physical-expr/src/aggregate/correlation.rs b/datafusion/physical-expr/src/aggregate/correlation.rs
index 475bfa4ce0..61f2db5c8e 100644
--- a/datafusion/physical-expr/src/aggregate/correlation.rs
+++ b/datafusion/physical-expr/src/aggregate/correlation.rs
@@ -505,13 +505,17 @@ mod tests {
 
         let values1 = expr1
             .iter()
-            .map(|e| e.evaluate(batch1))
-            .map(|r| r.map(|v| v.into_array(batch1.num_rows())))
+            .map(|e| {
+                e.evaluate(batch1)
+                    .and_then(|v| v.into_array(batch1.num_rows()))
+            })
             .collect::<Result<Vec<_>>>()?;
         let values2 = expr2
             .iter()
-            .map(|e| e.evaluate(batch2))
-            .map(|r| r.map(|v| v.into_array(batch2.num_rows())))
+            .map(|e| {
+                e.evaluate(batch2)
+                    .and_then(|v| v.into_array(batch2.num_rows()))
+            })
             .collect::<Result<Vec<_>>>()?;
         accum1.update_batch(&values1)?;
         accum2.update_batch(&values2)?;
diff --git a/datafusion/physical-expr/src/aggregate/covariance.rs b/datafusion/physical-expr/src/aggregate/covariance.rs
index 5e589d4e39..0f838eb6fa 100644
--- a/datafusion/physical-expr/src/aggregate/covariance.rs
+++ b/datafusion/physical-expr/src/aggregate/covariance.rs
@@ -754,13 +754,17 @@ mod tests {
 
         let values1 = expr1
             .iter()
-            .map(|e| e.evaluate(batch1))
-            .map(|r| r.map(|v| v.into_array(batch1.num_rows())))
+            .map(|e| {
+                e.evaluate(batch1)
+                    .and_then(|v| v.into_array(batch1.num_rows()))
+            })
             .collect::<Result<Vec<_>>>()?;
         let values2 = expr2
             .iter()
-            .map(|e| e.evaluate(batch2))
-            .map(|r| r.map(|v| v.into_array(batch2.num_rows())))
+            .map(|e| {
+                e.evaluate(batch2)
+                    .and_then(|v| v.into_array(batch2.num_rows()))
+            })
             .collect::<Result<Vec<_>>>()?;
         accum1.update_batch(&values1)?;
         accum2.update_batch(&values2)?;
diff --git a/datafusion/physical-expr/src/aggregate/first_last.rs b/datafusion/physical-expr/src/aggregate/first_last.rs
index a4e0a6dc49..0dc27dede8 100644
--- a/datafusion/physical-expr/src/aggregate/first_last.rs
+++ b/datafusion/physical-expr/src/aggregate/first_last.rs
@@ -587,7 +587,10 @@ mod tests {
         let mut states = vec![];
 
         for idx in 0..state1.len() {
-            states.push(concat(&[&state1[idx].to_array(), &state2[idx].to_array()])?);
+            states.push(concat(&[
+                &state1[idx].to_array()?,
+                &state2[idx].to_array()?,
+            ])?);
         }
 
         let mut first_accumulator =
@@ -614,7 +617,10 @@ mod tests {
         let mut states = vec![];
 
         for idx in 0..state1.len() {
-            states.push(concat(&[&state1[idx].to_array(), &state2[idx].to_array()])?);
+            states.push(concat(&[
+                &state1[idx].to_array()?,
+                &state2[idx].to_array()?,
+            ])?);
         }
 
         let mut last_accumulator =
diff --git a/datafusion/physical-expr/src/aggregate/stddev.rs b/datafusion/physical-expr/src/aggregate/stddev.rs
index 330507d6ff..64e19ef502 100644
--- a/datafusion/physical-expr/src/aggregate/stddev.rs
+++ b/datafusion/physical-expr/src/aggregate/stddev.rs
@@ -445,13 +445,17 @@ mod tests {
 
         let values1 = expr1
             .iter()
-            .map(|e| e.evaluate(batch1))
-            .map(|r| r.map(|v| v.into_array(batch1.num_rows())))
+            .map(|e| {
+                e.evaluate(batch1)
+                    .and_then(|v| v.into_array(batch1.num_rows()))
+            })
             .collect::<Result<Vec<_>>>()?;
         let values2 = expr2
             .iter()
-            .map(|e| e.evaluate(batch2))
-            .map(|r| r.map(|v| v.into_array(batch2.num_rows())))
+            .map(|e| {
+                e.evaluate(batch2)
+                    .and_then(|v| v.into_array(batch2.num_rows()))
+            })
             .collect::<Result<Vec<_>>>()?;
         accum1.update_batch(&values1)?;
         accum2.update_batch(&values2)?;
diff --git a/datafusion/physical-expr/src/aggregate/utils.rs b/datafusion/physical-expr/src/aggregate/utils.rs
index da3a527132..e5421ef5ab 100644
--- a/datafusion/physical-expr/src/aggregate/utils.rs
+++ b/datafusion/physical-expr/src/aggregate/utils.rs
@@ -36,11 +36,11 @@ use std::sync::Arc;
 pub fn get_accum_scalar_values_as_arrays(
     accum: &dyn Accumulator,
 ) -> Result<Vec<ArrayRef>> {
-    Ok(accum
+    accum
         .state()?
         .iter()
         .map(|s| s.to_array_of_size(1))
-        .collect::<Vec<_>>())
+        .collect::<Result<Vec<_>>>()
 }
 
 /// Computes averages for `Decimal128`/`Decimal256` values, checking for overflow
diff --git a/datafusion/physical-expr/src/aggregate/variance.rs b/datafusion/physical-expr/src/aggregate/variance.rs
index a720dd833a..d82c5ad562 100644
--- a/datafusion/physical-expr/src/aggregate/variance.rs
+++ b/datafusion/physical-expr/src/aggregate/variance.rs
@@ -519,13 +519,17 @@ mod tests {
 
         let values1 = expr1
             .iter()
-            .map(|e| e.evaluate(batch1))
-            .map(|r| r.map(|v| v.into_array(batch1.num_rows())))
+            .map(|e| {
+                e.evaluate(batch1)
+                    .and_then(|v| v.into_array(batch1.num_rows()))
+            })
             .collect::<Result<Vec<_>>>()?;
         let values2 = expr2
             .iter()
-            .map(|e| e.evaluate(batch2))
-            .map(|r| r.map(|v| v.into_array(batch2.num_rows())))
+            .map(|e| {
+                e.evaluate(batch2)
+                    .and_then(|v| v.into_array(batch2.num_rows()))
+            })
             .collect::<Result<Vec<_>>>()?;
         accum1.update_batch(&values1)?;
         accum2.update_batch(&values2)?;
diff --git a/datafusion/physical-expr/src/conditional_expressions.rs b/datafusion/physical-expr/src/conditional_expressions.rs
index 37adb2d71c..a9a25ffe2e 100644
--- a/datafusion/physical-expr/src/conditional_expressions.rs
+++ b/datafusion/physical-expr/src/conditional_expressions.rs
@@ -54,7 +54,7 @@ pub fn coalesce(args: &[ColumnarValue]) -> Result<ColumnarValue> {
                     if value.is_null() {
                         continue;
                     } else {
-                        let last_value = value.to_array_of_size(size);
+                        let last_value = value.to_array_of_size(size)?;
                         current_value =
                             zip(&remainder, &last_value, current_value.as_ref())?;
                         break;
diff --git a/datafusion/physical-expr/src/datetime_expressions.rs b/datafusion/physical-expr/src/datetime_expressions.rs
index 3b61e7f48d..5b597de78a 100644
--- a/datafusion/physical-expr/src/datetime_expressions.rs
+++ b/datafusion/physical-expr/src/datetime_expressions.rs
@@ -852,7 +852,7 @@ pub fn date_part(args: &[ColumnarValue]) -> Result<ColumnarValue> {
 
     let array = match array {
         ColumnarValue::Array(array) => array.clone(),
-        ColumnarValue::Scalar(scalar) => scalar.to_array(),
+        ColumnarValue::Scalar(scalar) => scalar.to_array()?,
     };
 
     let arr = match date_part.to_lowercase().as_str() {
diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs
index 63fa98011f..0a05a479e5 100644
--- a/datafusion/physical-expr/src/expressions/binary.rs
+++ b/datafusion/physical-expr/src/expressions/binary.rs
@@ -304,8 +304,8 @@ impl PhysicalExpr for BinaryExpr {
 
         // if both arrays or both literals - extract arrays and continue execution
         let (left, right) = (
-            lhs.into_array(batch.num_rows()),
-            rhs.into_array(batch.num_rows()),
+            lhs.into_array(batch.num_rows())?,
+            rhs.into_array(batch.num_rows())?,
         );
         self.evaluate_with_resolved_args(left, &left_data_type, right, &right_data_type)
             .map(ColumnarValue::Array)
@@ -597,7 +597,10 @@ mod tests {
         let batch =
             RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)])?;
 
-        let result = lt.evaluate(&batch)?.into_array(batch.num_rows());
+        let result = lt
+            .evaluate(&batch)?
+            .into_array(batch.num_rows())
+            .expect("Failed to convert to array");
         assert_eq!(result.len(), 5);
 
         let expected = [false, false, true, true, true];
@@ -641,7 +644,10 @@ mod tests {
 
         assert_eq!("a@0 < b@1 OR a@0 = b@1", format!("{expr}"));
 
-        let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
+        let result = expr
+            .evaluate(&batch)?
+            .into_array(batch.num_rows())
+            .expect("Failed to convert to array");
         assert_eq!(result.len(), 5);
 
         let expected = [true, true, false, true, false];
@@ -685,7 +691,7 @@ mod tests {
             assert_eq!(expression.data_type(&schema)?, $C_TYPE);
 
             // compute
-            let result = expression.evaluate(&batch)?.into_array(batch.num_rows());
+            let result = expression.evaluate(&batch)?.into_array(batch.num_rows()).expect("Failed to convert to array");
 
             // verify that the array's data_type is correct
             assert_eq!(*result.data_type(), $C_TYPE);
@@ -2138,7 +2144,10 @@ mod tests {
         let arithmetic_op =
             binary_op(col("a", &schema)?, op, col("b", &schema)?, &schema)?;
         let batch = RecordBatch::try_new(schema, data)?;
-        let result = arithmetic_op.evaluate(&batch)?.into_array(batch.num_rows());
+        let result = arithmetic_op
+            .evaluate(&batch)?
+            .into_array(batch.num_rows())
+            .expect("Failed to convert to array");
 
         assert_eq!(result.as_ref(), &expected);
         Ok(())
@@ -2154,7 +2163,10 @@ mod tests {
         let lit = Arc::new(Literal::new(literal));
         let arithmetic_op = binary_op(col("a", &schema)?, op, lit, &schema)?;
         let batch = RecordBatch::try_new(schema, data)?;
-        let result = arithmetic_op.evaluate(&batch)?.into_array(batch.num_rows());
+        let result = arithmetic_op
+            .evaluate(&batch)?
+            .into_array(batch.num_rows())
+            .expect("Failed to convert to array");
 
         assert_eq!(&result, &expected);
         Ok(())
@@ -2170,7 +2182,10 @@ mod tests {
         let op = binary_op(col("a", schema)?, op, col("b", schema)?, schema)?;
         let data: Vec<ArrayRef> = vec![left.clone(), right.clone()];
         let batch = RecordBatch::try_new(schema.clone(), data)?;
-        let result = op.evaluate(&batch)?.into_array(batch.num_rows());
+        let result = op
+            .evaluate(&batch)?
+            .into_array(batch.num_rows())
+            .expect("Failed to convert to array");
 
         assert_eq!(result.as_ref(), &expected);
         Ok(())
@@ -2187,7 +2202,10 @@ mod tests {
         let scalar = lit(scalar.clone());
         let op = binary_op(scalar, op, col("a", schema)?, schema)?;
         let batch = RecordBatch::try_new(Arc::clone(schema), vec![Arc::clone(arr)])?;
-        let result = op.evaluate(&batch)?.into_array(batch.num_rows());
+        let result = op
+            .evaluate(&batch)?
+            .into_array(batch.num_rows())
+            .expect("Failed to convert to array");
         assert_eq!(result.as_ref(), expected);
 
         Ok(())
@@ -2204,7 +2222,10 @@ mod tests {
         let scalar = lit(scalar.clone());
         let op = binary_op(col("a", schema)?, op, scalar, schema)?;
         let batch = RecordBatch::try_new(Arc::clone(schema), vec![Arc::clone(arr)])?;
-        let result = op.evaluate(&batch)?.into_array(batch.num_rows());
+        let result = op
+            .evaluate(&batch)?
+            .into_array(batch.num_rows())
+            .expect("Failed to convert to array");
         assert_eq!(result.as_ref(), expected);
 
         Ok(())
@@ -2776,7 +2797,8 @@ mod tests {
         let result = expr
             .evaluate(&batch)
             .expect("evaluation")
-            .into_array(batch.num_rows());
+            .into_array(batch.num_rows())
+            .expect("Failed to convert to array");
 
         let expected: Int32Array = input
             .into_iter()
@@ -3255,7 +3277,10 @@ mod tests {
         let arithmetic_op = binary_op(col("a", schema)?, op, col("b", schema)?, schema)?;
         let data: Vec<ArrayRef> = vec![left.clone(), right.clone()];
         let batch = RecordBatch::try_new(schema.clone(), data)?;
-        let result = arithmetic_op.evaluate(&batch)?.into_array(batch.num_rows());
+        let result = arithmetic_op
+            .evaluate(&batch)?
+            .into_array(batch.num_rows())
+            .expect("Failed to convert to array");
 
         assert_eq!(result.as_ref(), expected.as_ref());
         Ok(())
diff --git a/datafusion/physical-expr/src/expressions/case.rs b/datafusion/physical-expr/src/expressions/case.rs
index a2395c4a0c..5fcfd61d90 100644
--- a/datafusion/physical-expr/src/expressions/case.rs
+++ b/datafusion/physical-expr/src/expressions/case.rs
@@ -126,7 +126,7 @@ impl CaseExpr {
         let return_type = self.data_type(&batch.schema())?;
         let expr = self.expr.as_ref().unwrap();
         let base_value = expr.evaluate(batch)?;
-        let base_value = base_value.into_array(batch.num_rows());
+        let base_value = base_value.into_array(batch.num_rows())?;
         let base_nulls = is_null(base_value.as_ref())?;
 
         // start with nulls as default output
@@ -137,7 +137,7 @@ impl CaseExpr {
             let when_value = self.when_then_expr[i]
                 .0
                 .evaluate_selection(batch, &remainder)?;
-            let when_value = when_value.into_array(batch.num_rows());
+            let when_value = when_value.into_array(batch.num_rows())?;
             // build boolean array representing which rows match the "when" value
             let when_match = eq(&when_value, &base_value)?;
             // Treat nulls as false
@@ -153,7 +153,7 @@ impl CaseExpr {
                 ColumnarValue::Scalar(value) if value.is_null() => {
                     new_null_array(&return_type, batch.num_rows())
                 }
-                _ => then_value.into_array(batch.num_rows()),
+                _ => then_value.into_array(batch.num_rows())?,
             };
 
             current_value =
@@ -170,7 +170,7 @@ impl CaseExpr {
             remainder = or(&base_nulls, &remainder)?;
             let else_ = expr
                 .evaluate_selection(batch, &remainder)?
-                .into_array(batch.num_rows());
+                .into_array(batch.num_rows())?;
             current_value = zip(&remainder, else_.as_ref(), current_value.as_ref())?;
         }
 
@@ -194,7 +194,7 @@ impl CaseExpr {
             let when_value = self.when_then_expr[i]
                 .0
                 .evaluate_selection(batch, &remainder)?;
-            let when_value = when_value.into_array(batch.num_rows());
+            let when_value = when_value.into_array(batch.num_rows())?;
             let when_value = as_boolean_array(&when_value).map_err(|e| {
                 DataFusionError::Context(
                     "WHEN expression did not return a BooleanArray".to_string(),
@@ -214,7 +214,7 @@ impl CaseExpr {
                 ColumnarValue::Scalar(value) if value.is_null() => {
                     new_null_array(&return_type, batch.num_rows())
                 }
-                _ => then_value.into_array(batch.num_rows()),
+                _ => then_value.into_array(batch.num_rows())?,
             };
 
             current_value =
@@ -231,7 +231,7 @@ impl CaseExpr {
                 .unwrap_or_else(|_| e.clone());
             let else_ = expr
                 .evaluate_selection(batch, &remainder)?
-                .into_array(batch.num_rows());
+                .into_array(batch.num_rows())?;
             current_value = zip(&remainder, else_.as_ref(), current_value.as_ref())?;
         }
 
@@ -425,7 +425,10 @@ mod tests {
             None,
             schema.as_ref(),
         )?;
-        let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
+        let result = expr
+            .evaluate(&batch)?
+            .into_array(batch.num_rows())
+            .expect("Failed to convert to array");
         let result = as_int32_array(&result)?;
 
         let expected = &Int32Array::from(vec![Some(123), None, None, Some(456)]);
@@ -453,7 +456,10 @@ mod tests {
             Some(else_value),
             schema.as_ref(),
         )?;
-        let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
+        let result = expr
+            .evaluate(&batch)?
+            .into_array(batch.num_rows())
+            .expect("Failed to convert to array");
         let result = as_int32_array(&result)?;
 
         let expected =
@@ -485,7 +491,10 @@ mod tests {
             Some(else_value),
             schema.as_ref(),
         )?;
-        let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
+        let result = expr
+            .evaluate(&batch)?
+            .into_array(batch.num_rows())
+            .expect("Failed to convert to array");
         let result =
             as_float64_array(&result).expect("failed to downcast to Float64Array");
 
@@ -523,7 +532,10 @@ mod tests {
             None,
             schema.as_ref(),
         )?;
-        let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
+        let result = expr
+            .evaluate(&batch)?
+            .into_array(batch.num_rows())
+            .expect("Failed to convert to array");
         let result = as_int32_array(&result)?;
 
         let expected = &Int32Array::from(vec![Some(123), None, None, Some(456)]);
@@ -551,7 +563,10 @@ mod tests {
             Some(else_value),
             schema.as_ref(),
         )?;
-        let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
+        let result = expr
+            .evaluate(&batch)?
+            .into_array(batch.num_rows())
+            .expect("Failed to convert to array");
         let result = as_int32_array(&result)?;
 
         let expected =
@@ -583,7 +598,10 @@ mod tests {
             Some(x),
             schema.as_ref(),
         )?;
-        let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
+        let result = expr
+            .evaluate(&batch)?
+            .into_array(batch.num_rows())
+            .expect("Failed to convert to array");
         let result =
             as_float64_array(&result).expect("failed to downcast to Float64Array");
 
@@ -629,7 +647,10 @@ mod tests {
             Some(else_value),
             schema.as_ref(),
         )?;
-        let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
+        let result = expr
+            .evaluate(&batch)?
+            .into_array(batch.num_rows())
+            .expect("Failed to convert to array");
         let result = as_int32_array(&result)?;
 
         let expected =
@@ -661,7 +682,10 @@ mod tests {
             Some(else_value),
             schema.as_ref(),
         )?;
-        let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
+        let result = expr
+            .evaluate(&batch)?
+            .into_array(batch.num_rows())
+            .expect("Failed to convert to array");
         let result =
             as_float64_array(&result).expect("failed to downcast to Float64Array");
 
@@ -693,7 +717,10 @@ mod tests {
             None,
             schema.as_ref(),
         )?;
-        let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
+        let result = expr
+            .evaluate(&batch)?
+            .into_array(batch.num_rows())
+            .expect("Failed to convert to array");
         let result =
             as_float64_array(&result).expect("failed to downcast to Float64Array");
 
@@ -721,7 +748,10 @@ mod tests {
             None,
             schema.as_ref(),
         )?;
-        let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
+        let result = expr
+            .evaluate(&batch)?
+            .into_array(batch.num_rows())
+            .expect("Failed to convert to array");
         let result =
             as_float64_array(&result).expect("failed to downcast to Float64Array");
 
diff --git a/datafusion/physical-expr/src/expressions/cast.rs b/datafusion/physical-expr/src/expressions/cast.rs
index 5d56af3646..780e042156 100644
--- a/datafusion/physical-expr/src/expressions/cast.rs
+++ b/datafusion/physical-expr/src/expressions/cast.rs
@@ -178,7 +178,7 @@ pub fn cast_column(
             kernels::cast::cast_with_options(array, cast_type, &cast_options)?,
         )),
         ColumnarValue::Scalar(scalar) => {
-            let scalar_array = scalar.to_array();
+            let scalar_array = scalar.to_array()?;
             let cast_array = kernels::cast::cast_with_options(
                 &scalar_array,
                 cast_type,
@@ -263,7 +263,10 @@ mod tests {
             assert_eq!(expression.data_type(&schema)?, $TYPE);
 
             // compute
-            let result = expression.evaluate(&batch)?.into_array(batch.num_rows());
+            let result = expression
+                .evaluate(&batch)?
+                .into_array(batch.num_rows())
+                .expect("Failed to convert to array");
 
             // verify that the array's data_type is correct
             assert_eq!(*result.data_type(), $TYPE);
@@ -312,7 +315,10 @@ mod tests {
             assert_eq!(expression.data_type(&schema)?, $TYPE);
 
             // compute
-            let result = expression.evaluate(&batch)?.into_array(batch.num_rows());
+            let result = expression
+                .evaluate(&batch)?
+                .into_array(batch.num_rows())
+                .expect("Failed to convert to array");
 
             // verify that the array's data_type is correct
             assert_eq!(*result.data_type(), $TYPE);
diff --git a/datafusion/physical-expr/src/expressions/datum.rs b/datafusion/physical-expr/src/expressions/datum.rs
index f57cbbd4ff..2bb79922cf 100644
--- a/datafusion/physical-expr/src/expressions/datum.rs
+++ b/datafusion/physical-expr/src/expressions/datum.rs
@@ -34,14 +34,14 @@ pub(crate) fn apply(
         (ColumnarValue::Array(left), ColumnarValue::Array(right)) => {
             Ok(ColumnarValue::Array(f(&left.as_ref(), &right.as_ref())?))
         }
-        (ColumnarValue::Scalar(left), ColumnarValue::Array(right)) => {
-            Ok(ColumnarValue::Array(f(&left.to_scalar(), &right.as_ref())?))
-        }
-        (ColumnarValue::Array(left), ColumnarValue::Scalar(right)) => {
-            Ok(ColumnarValue::Array(f(&left.as_ref(), &right.to_scalar())?))
-        }
+        (ColumnarValue::Scalar(left), ColumnarValue::Array(right)) => Ok(
+            ColumnarValue::Array(f(&left.to_scalar()?, &right.as_ref())?),
+        ),
+        (ColumnarValue::Array(left), ColumnarValue::Scalar(right)) => Ok(
+            ColumnarValue::Array(f(&left.as_ref(), &right.to_scalar()?)?),
+        ),
         (ColumnarValue::Scalar(left), ColumnarValue::Scalar(right)) => {
-            let array = f(&left.to_scalar(), &right.to_scalar())?;
+            let array = f(&left.to_scalar()?, &right.to_scalar()?)?;
             let scalar = ScalarValue::try_from_array(array.as_ref(), 0)?;
             Ok(ColumnarValue::Scalar(scalar))
         }
diff --git a/datafusion/physical-expr/src/expressions/get_indexed_field.rs b/datafusion/physical-expr/src/expressions/get_indexed_field.rs
index df79e28358..7d5f16c454 100644
--- a/datafusion/physical-expr/src/expressions/get_indexed_field.rs
+++ b/datafusion/physical-expr/src/expressions/get_indexed_field.rs
@@ -183,7 +183,7 @@ impl PhysicalExpr for GetIndexedFieldExpr {
     }
 
     fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
-        let array = self.arg.evaluate(batch)?.into_array(batch.num_rows());
+        let array = self.arg.evaluate(batch)?.into_array(batch.num_rows())?;
         match &self.field {
             GetFieldAccessExpr::NamedStructField{name} => match (array.data_type(), name) {
                 (DataType::Map(_, _), ScalarValue::Utf8(Some(k))) => {
@@ -210,7 +210,7 @@ impl PhysicalExpr for GetIndexedFieldExpr {
                                          with utf8 indexes. Tried {dt:?} with {name:?} index"),
             },
             GetFieldAccessExpr::ListIndex{key} => {
-            let key = key.evaluate(batch)?.into_array(batch.num_rows());
+            let key = key.evaluate(batch)?.into_array(batch.num_rows())?;
             match (array.data_type(), key.data_type()) {
                 (DataType::List(_), DataType::Int64) => Ok(ColumnarValue::Array(array_element(&[
                     array, key
@@ -224,8 +224,8 @@ impl PhysicalExpr for GetIndexedFieldExpr {
                         }
                 },
             GetFieldAccessExpr::ListRange{start, stop} => {
-                let start = start.evaluate(batch)?.into_array(batch.num_rows());
-                let stop = stop.evaluate(batch)?.into_array(batch.num_rows());
+                let start = start.evaluate(batch)?.into_array(batch.num_rows())?;
+                let stop = stop.evaluate(batch)?.into_array(batch.num_rows())?;
                 match (array.data_type(), start.data_type(), stop.data_type()) {
                     (DataType::List(_), DataType::Int64, DataType::Int64) => Ok(ColumnarValue::Array(array_slice(&[
                         array, start, stop
@@ -326,7 +326,10 @@ mod tests {
         // only one row should be processed
         let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(struct_array)])?;
         let expr = Arc::new(GetIndexedFieldExpr::new_field(expr, "a"));
-        let result = expr.evaluate(&batch)?.into_array(1);
+        let result = expr
+            .evaluate(&batch)?
+            .into_array(1)
+            .expect("Failed to convert to array");
         let result =
             as_boolean_array(&result).expect("failed to downcast to BooleanArray");
         assert_eq!(boolean, result.clone());
@@ -383,7 +386,10 @@ mod tests {
             vec![Arc::new(list_col), Arc::new(key_col)],
         )?;
         let expr = Arc::new(GetIndexedFieldExpr::new_index(expr, key));
-        let result = expr.evaluate(&batch)?.into_array(1);
+        let result = expr
+            .evaluate(&batch)?
+            .into_array(1)
+            .expect("Failed to convert to array");
         let result = as_string_array(&result).expect("failed to downcast to ListArray");
         let expected = StringArray::from(expected_list);
         assert_eq!(expected, result.clone());
@@ -419,7 +425,10 @@ mod tests {
             vec![Arc::new(list_col), Arc::new(start_col), Arc::new(stop_col)],
         )?;
         let expr = Arc::new(GetIndexedFieldExpr::new_range(expr, start, stop));
-        let result = expr.evaluate(&batch)?.into_array(1);
+        let result = expr
+            .evaluate(&batch)?
+            .into_array(1)
+            .expect("Failed to convert to array");
         let result = as_list_array(&result).expect("failed to downcast to ListArray");
         let (expected, _, _) =
             build_list_arguments(expected_list, vec![None], vec![None]);
@@ -440,7 +449,10 @@ mod tests {
             vec![Arc::new(list_builder.finish()), key_array],
         )?;
         let expr = Arc::new(GetIndexedFieldExpr::new_index(expr, key));
-        let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
+        let result = expr
+            .evaluate(&batch)?
+            .into_array(batch.num_rows())
+            .expect("Failed to convert to array");
         assert!(result.is_null(0));
         Ok(())
     }
@@ -461,7 +473,10 @@ mod tests {
             vec![Arc::new(list_builder.finish()), Arc::new(key_array)],
         )?;
         let expr = Arc::new(GetIndexedFieldExpr::new_index(expr, key));
-        let result = expr.evaluate(&batch)?.into_array(1);
+        let result = expr
+            .evaluate(&batch)?
+            .into_array(1)
+            .expect("Failed to convert to array");
         assert!(result.is_null(0));
         Ok(())
     }
diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr/src/expressions/in_list.rs
index 8d55fb70bd..625b01ec9a 100644
--- a/datafusion/physical-expr/src/expressions/in_list.rs
+++ b/datafusion/physical-expr/src/expressions/in_list.rs
@@ -351,15 +351,15 @@ impl PhysicalExpr for InListExpr {
     fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
         let value = self.expr.evaluate(batch)?;
         let r = match &self.static_filter {
-            Some(f) => f.contains(value.into_array(1).as_ref(), self.negated)?,
+            Some(f) => f.contains(value.into_array(1)?.as_ref(), self.negated)?,
             None => {
-                let value = value.into_array(batch.num_rows());
+                let value = value.into_array(batch.num_rows())?;
                 let found = self.list.iter().map(|expr| expr.evaluate(batch)).try_fold(
                     BooleanArray::new(BooleanBuffer::new_unset(batch.num_rows()), None),
                     |result, expr| -> Result<BooleanArray> {
                         Ok(or_kleene(
                             &result,
-                            &eq(&value, &expr?.into_array(batch.num_rows()))?,
+                            &eq(&value, &expr?.into_array(batch.num_rows())?)?,
                         )?)
                     },
                 )?;
@@ -501,7 +501,10 @@ mod tests {
         ($BATCH:expr, $LIST:expr, $NEGATED:expr, $EXPECTED:expr, $COL:expr, $SCHEMA:expr) => {{
             let (cast_expr, cast_list_exprs) = in_list_cast($COL, $LIST, $SCHEMA)?;
             let expr = in_list(cast_expr, cast_list_exprs, $NEGATED, $SCHEMA).unwrap();
-            let result = expr.evaluate(&$BATCH)?.into_array($BATCH.num_rows());
+            let result = expr
+                .evaluate(&$BATCH)?
+                .into_array($BATCH.num_rows())
+                .expect("Failed to convert to array");
             let result =
                 as_boolean_array(&result).expect("failed to downcast to BooleanArray");
             let expected = &BooleanArray::from($EXPECTED);
diff --git a/datafusion/physical-expr/src/expressions/is_not_null.rs b/datafusion/physical-expr/src/expressions/is_not_null.rs
index da717a517f..2e6a2bec9c 100644
--- a/datafusion/physical-expr/src/expressions/is_not_null.rs
+++ b/datafusion/physical-expr/src/expressions/is_not_null.rs
@@ -132,7 +132,10 @@ mod tests {
         let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)])?;
 
         // expression: "a is not null"
-        let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
+        let result = expr
+            .evaluate(&batch)?
+            .into_array(batch.num_rows())
+            .expect("Failed to convert to array");
         let result =
             as_boolean_array(&result).expect("failed to downcast to BooleanArray");
 
diff --git a/datafusion/physical-expr/src/expressions/is_null.rs b/datafusion/physical-expr/src/expressions/is_null.rs
index ee7897edd4..3ad4058dd6 100644
--- a/datafusion/physical-expr/src/expressions/is_null.rs
+++ b/datafusion/physical-expr/src/expressions/is_null.rs
@@ -134,7 +134,10 @@ mod tests {
         let expr = is_null(col("a", &schema)?).unwrap();
         let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)])?;
 
-        let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
+        let result = expr
+            .evaluate(&batch)?
+            .into_array(batch.num_rows())
+            .expect("Failed to convert to array");
         let result =
             as_boolean_array(&result).expect("failed to downcast to BooleanArray");
 
diff --git a/datafusion/physical-expr/src/expressions/like.rs b/datafusion/physical-expr/src/expressions/like.rs
index e833eabbff..37452e2784 100644
--- a/datafusion/physical-expr/src/expressions/like.rs
+++ b/datafusion/physical-expr/src/expressions/like.rs
@@ -201,7 +201,10 @@ mod test {
             )?;
 
             // compute
-            let result = expression.evaluate(&batch)?.into_array(batch.num_rows());
+            let result = expression
+                .evaluate(&batch)?
+                .into_array(batch.num_rows())
+                .expect("Failed to convert to array");
             let result =
                 as_boolean_array(&result).expect("failed to downcast to BooleanArray");
             let expected = &BooleanArray::from($VEC);
diff --git a/datafusion/physical-expr/src/expressions/literal.rs b/datafusion/physical-expr/src/expressions/literal.rs
index 91cb23d586..cd3b51f091 100644
--- a/datafusion/physical-expr/src/expressions/literal.rs
+++ b/datafusion/physical-expr/src/expressions/literal.rs
@@ -131,7 +131,10 @@ mod tests {
         let literal_expr = lit(42i32);
         assert_eq!("42", format!("{literal_expr}"));
 
-        let literal_array = literal_expr.evaluate(&batch)?.into_array(batch.num_rows());
+        let literal_array = literal_expr
+            .evaluate(&batch)?
+            .into_array(batch.num_rows())
+            .expect("Failed to convert to array");
         let literal_array = as_int32_array(&literal_array)?;
 
         // note that the contents of the literal array are unrelated to the batch contents except for the length of the array
diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs
index c44b3cf01d..1919cac979 100644
--- a/datafusion/physical-expr/src/expressions/mod.rs
+++ b/datafusion/physical-expr/src/expressions/mod.rs
@@ -247,8 +247,10 @@ pub(crate) mod tests {
         let expr = agg.expressions();
         let values = expr
             .iter()
-            .map(|e| e.evaluate(batch))
-            .map(|r| r.map(|v| v.into_array(batch.num_rows())))
+            .map(|e| {
+                e.evaluate(batch)
+                    .and_then(|v| v.into_array(batch.num_rows()))
+            })
             .collect::<Result<Vec<_>>>()?;
         accum.update_batch(&values)?;
         accum.evaluate()
@@ -262,8 +264,10 @@ pub(crate) mod tests {
         let expr = agg.expressions();
         let values = expr
             .iter()
-            .map(|e| e.evaluate(batch))
-            .map(|r| r.map(|v| v.into_array(batch.num_rows())))
+            .map(|e| {
+                e.evaluate(batch)
+                    .and_then(|v| v.into_array(batch.num_rows()))
+            })
             .collect::<Result<Vec<_>>>()?;
         let indices = vec![0; batch.num_rows()];
         accum.update_batch(&values, &indices, None, 1)?;
diff --git a/datafusion/physical-expr/src/expressions/negative.rs b/datafusion/physical-expr/src/expressions/negative.rs
index 86b000e76a..65b3479411 100644
--- a/datafusion/physical-expr/src/expressions/negative.rs
+++ b/datafusion/physical-expr/src/expressions/negative.rs
@@ -195,7 +195,7 @@ mod tests {
             let expected = &paste!{[<$DATA_TY Array>]::from(arr_expected)};
             let batch =
                 RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(input)])?;
-            let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
+            let result = expr.evaluate(&batch)?.into_array(batch.num_rows()).expect("Failed to convert to array");
             let result =
                 as_primitive_array(&result).expect(format!("failed to downcast to {:?}Array", $DATA_TY).as_str());
             assert_eq!(result, expected);
diff --git a/datafusion/physical-expr/src/expressions/not.rs b/datafusion/physical-expr/src/expressions/not.rs
index c154fad100..4ceccc6932 100644
--- a/datafusion/physical-expr/src/expressions/not.rs
+++ b/datafusion/physical-expr/src/expressions/not.rs
@@ -150,7 +150,10 @@ mod tests {
         let batch =
             RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(input)])?;
 
-        let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
+        let result = expr
+            .evaluate(&batch)?
+            .into_array(batch.num_rows())
+            .expect("Failed to convert to array");
         let result =
             as_boolean_array(&result).expect("failed to downcast to BooleanArray");
         assert_eq!(result, expected);
diff --git a/datafusion/physical-expr/src/expressions/nullif.rs b/datafusion/physical-expr/src/expressions/nullif.rs
index 7bbe9d73d4..252bd10c3e 100644
--- a/datafusion/physical-expr/src/expressions/nullif.rs
+++ b/datafusion/physical-expr/src/expressions/nullif.rs
@@ -37,7 +37,7 @@ pub fn nullif_func(args: &[ColumnarValue]) -> Result<ColumnarValue> {
 
     match (lhs, rhs) {
         (ColumnarValue::Array(lhs), ColumnarValue::Scalar(rhs)) => {
-            let rhs = rhs.to_scalar();
+            let rhs = rhs.to_scalar()?;
             let array = nullif(lhs, &eq(&lhs, &rhs)?)?;
 
             Ok(ColumnarValue::Array(array))
@@ -47,7 +47,7 @@ pub fn nullif_func(args: &[ColumnarValue]) -> Result<ColumnarValue> {
             Ok(ColumnarValue::Array(array))
         }
         (ColumnarValue::Scalar(lhs), ColumnarValue::Array(rhs)) => {
-            let lhs = lhs.to_array_of_size(rhs.len());
+            let lhs = lhs.to_array_of_size(rhs.len())?;
             let array = nullif(&lhs, &eq(&lhs, &rhs)?)?;
             Ok(ColumnarValue::Array(array))
         }
@@ -89,7 +89,7 @@ mod tests {
         let lit_array = ColumnarValue::Scalar(ScalarValue::Int32(Some(2i32)));
 
         let result = nullif_func(&[a, lit_array])?;
-        let result = result.into_array(0);
+        let result = result.into_array(0).expect("Failed to convert to array");
 
         let expected = Arc::new(Int32Array::from(vec![
             Some(1),
@@ -115,7 +115,7 @@ mod tests {
         let lit_array = ColumnarValue::Scalar(ScalarValue::Int32(Some(1i32)));
 
         let result = nullif_func(&[a, lit_array])?;
-        let result = result.into_array(0);
+        let result = result.into_array(0).expect("Failed to convert to array");
 
         let expected = Arc::new(Int32Array::from(vec![
             None,
@@ -140,7 +140,7 @@ mod tests {
         let lit_array = ColumnarValue::Scalar(ScalarValue::Boolean(Some(false)));
 
         let result = nullif_func(&[a, lit_array])?;
-        let result = result.into_array(0);
+        let result = result.into_array(0).expect("Failed to convert to array");
 
         let expected =
             Arc::new(BooleanArray::from(vec![Some(true), None, None])) as ArrayRef;
@@ -157,7 +157,7 @@ mod tests {
         let lit_array = ColumnarValue::Scalar(ScalarValue::Utf8(Some("bar".to_string())));
 
         let result = nullif_func(&[a, lit_array])?;
-        let result = result.into_array(0);
+        let result = result.into_array(0).expect("Failed to convert to array");
 
         let expected = Arc::new(StringArray::from(vec![
             Some("foo"),
@@ -178,7 +178,7 @@ mod tests {
         let lit_array = ColumnarValue::Scalar(ScalarValue::Int32(Some(2i32)));
 
         let result = nullif_func(&[lit_array, a])?;
-        let result = result.into_array(0);
+        let result = result.into_array(0).expect("Failed to convert to array");
 
         let expected = Arc::new(Int32Array::from(vec![
             Some(2),
@@ -198,7 +198,7 @@ mod tests {
         let b_eq = ColumnarValue::Scalar(ScalarValue::Int32(Some(2i32)));
 
         let result_eq = nullif_func(&[a_eq, b_eq])?;
-        let result_eq = result_eq.into_array(1);
+        let result_eq = result_eq.into_array(1).expect("Failed to convert to array");
 
         let expected_eq = Arc::new(Int32Array::from(vec![None])) as ArrayRef;
 
@@ -208,7 +208,9 @@ mod tests {
         let b_neq = ColumnarValue::Scalar(ScalarValue::Int32(Some(1i32)));
 
         let result_neq = nullif_func(&[a_neq, b_neq])?;
-        let result_neq = result_neq.into_array(1);
+        let result_neq = result_neq
+            .into_array(1)
+            .expect("Failed to convert to array");
 
         let expected_neq = Arc::new(Int32Array::from(vec![Some(2i32)])) as ArrayRef;
         assert_eq!(expected_neq.as_ref(), result_neq.as_ref());
diff --git a/datafusion/physical-expr/src/expressions/try_cast.rs b/datafusion/physical-expr/src/expressions/try_cast.rs
index cba026c565..dea7f9f86a 100644
--- a/datafusion/physical-expr/src/expressions/try_cast.rs
+++ b/datafusion/physical-expr/src/expressions/try_cast.rs
@@ -89,7 +89,7 @@ impl PhysicalExpr for TryCastExpr {
                 Ok(ColumnarValue::Array(cast))
             }
             ColumnarValue::Scalar(scalar) => {
-                let array = scalar.to_array();
+                let array = scalar.to_array()?;
                 let cast_array = cast_with_options(&array, &self.cast_type, &options)?;
                 let cast_scalar = ScalarValue::try_from_array(&cast_array, 0)?;
                 Ok(ColumnarValue::Scalar(cast_scalar))
@@ -187,7 +187,10 @@ mod tests {
             assert_eq!(expression.data_type(&schema)?, $TYPE);
 
             // compute
-            let result = expression.evaluate(&batch)?.into_array(batch.num_rows());
+            let result = expression
+                .evaluate(&batch)?
+                .into_array(batch.num_rows())
+                .expect("Failed to convert to array");
 
             // verify that the array's data_type is correct
             assert_eq!(*result.data_type(), $TYPE);
@@ -235,7 +238,10 @@ mod tests {
             assert_eq!(expression.data_type(&schema)?, $TYPE);
 
             // compute
-            let result = expression.evaluate(&batch)?.into_array(batch.num_rows());
+            let result = expression
+                .evaluate(&batch)?
+                .into_array(batch.num_rows())
+                .expect("Failed to convert to array");
 
             // verify that the array's data_type is correct
             assert_eq!(*result.data_type(), $TYPE);
diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs
index c973232c75..9185ade313 100644
--- a/datafusion/physical-expr/src/functions.rs
+++ b/datafusion/physical-expr/src/functions.rs
@@ -239,7 +239,7 @@ where
                 };
                 arg.clone().into_array(expansion_len)
             })
-            .collect::<Vec<ArrayRef>>();
+            .collect::<Result<Vec<_>>>()?;
 
         let result = (inner)(&args);
 
@@ -937,7 +937,7 @@ mod tests {
             match expected {
                 Ok(expected) => {
                     let result = expr.evaluate(&batch)?;
-                    let result = result.into_array(batch.num_rows());
+                    let result = result.into_array(batch.num_rows()).expect("Failed to convert to array");
                     let result = result.as_any().downcast_ref::<$ARRAY_TYPE>().unwrap();
 
                     // value is correct
@@ -2906,7 +2906,10 @@ mod tests {
 
         // evaluate works
         let batch = RecordBatch::try_new(Arc::new(schema.clone()), columns)?;
-        let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
+        let result = expr
+            .evaluate(&batch)?
+            .into_array(batch.num_rows())
+            .expect("Failed to convert to array");
 
         // downcast works
         let result = as_list_array(&result)?;
@@ -2945,7 +2948,10 @@ mod tests {
 
         // evaluate works
         let batch = RecordBatch::try_new(Arc::new(schema.clone()), columns)?;
-        let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
+        let result = expr
+            .evaluate(&batch)?
+            .into_array(batch.num_rows())
+            .expect("Failed to convert to array");
 
         // downcast works
         let result = as_list_array(&result)?;
@@ -3017,8 +3023,11 @@ mod tests {
         let adapter_func = make_scalar_function(dummy_function);
 
         let scalar_arg = ColumnarValue::Scalar(ScalarValue::Int64(Some(1)));
-        let array_arg =
-            ColumnarValue::Array(ScalarValue::Int64(Some(1)).to_array_of_size(5));
+        let array_arg = ColumnarValue::Array(
+            ScalarValue::Int64(Some(1))
+                .to_array_of_size(5)
+                .expect("Failed to convert to array of size"),
+        );
         let result = unpack_uint64_array(adapter_func(&[array_arg, scalar_arg]))?;
         assert_eq!(result, vec![5, 5]);
 
@@ -3030,8 +3039,11 @@ mod tests {
         let adapter_func = make_scalar_function_with_hints(dummy_function, vec![]);
 
         let scalar_arg = ColumnarValue::Scalar(ScalarValue::Int64(Some(1)));
-        let array_arg =
-            ColumnarValue::Array(ScalarValue::Int64(Some(1)).to_array_of_size(5));
+        let array_arg = ColumnarValue::Array(
+            ScalarValue::Int64(Some(1))
+                .to_array_of_size(5)
+                .expect("Failed to convert to array of size"),
+        );
         let result = unpack_uint64_array(adapter_func(&[array_arg, scalar_arg]))?;
         assert_eq!(result, vec![5, 5]);
 
@@ -3046,8 +3058,11 @@ mod tests {
         );
 
         let scalar_arg = ColumnarValue::Scalar(ScalarValue::Int64(Some(1)));
-        let array_arg =
-            ColumnarValue::Array(ScalarValue::Int64(Some(1)).to_array_of_size(5));
+        let array_arg = ColumnarValue::Array(
+            ScalarValue::Int64(Some(1))
+                .to_array_of_size(5)
+                .expect("Failed to convert to array of size"),
+        );
         let result = unpack_uint64_array(adapter_func(&[array_arg, scalar_arg]))?;
         assert_eq!(result, vec![5, 1]);
 
@@ -3056,8 +3071,11 @@ mod tests {
 
     #[test]
     fn test_make_scalar_function_with_hints_on_arrays() -> Result<()> {
-        let array_arg =
-            ColumnarValue::Array(ScalarValue::Int64(Some(1)).to_array_of_size(5));
+        let array_arg = ColumnarValue::Array(
+            ScalarValue::Int64(Some(1))
+                .to_array_of_size(5)
+                .expect("Failed to convert to array of size"),
+        );
         let adapter_func = make_scalar_function_with_hints(
             dummy_function,
             vec![Hint::Pad, Hint::AcceptsSingular],
@@ -3077,8 +3095,11 @@ mod tests {
         );
 
         let scalar_arg = ColumnarValue::Scalar(ScalarValue::Int64(Some(1)));
-        let array_arg =
-            ColumnarValue::Array(ScalarValue::Int64(Some(1)).to_array_of_size(5));
+        let array_arg = ColumnarValue::Array(
+            ScalarValue::Int64(Some(1))
+                .to_array_of_size(5)
+                .expect("Failed to convert to array of size"),
+        );
         let result = unpack_uint64_array(adapter_func(&[
             array_arg,
             scalar_arg.clone(),
@@ -3097,8 +3118,11 @@ mod tests {
         );
 
         let scalar_arg = ColumnarValue::Scalar(ScalarValue::Int64(Some(1)));
-        let array_arg =
-            ColumnarValue::Array(ScalarValue::Int64(Some(1)).to_array_of_size(5));
+        let array_arg = ColumnarValue::Array(
+            ScalarValue::Int64(Some(1))
+                .to_array_of_size(5)
+                .expect("Failed to convert to array of size"),
+        );
         let result = unpack_uint64_array(adapter_func(&[
             array_arg.clone(),
             scalar_arg.clone(),
@@ -3125,8 +3149,11 @@ mod tests {
         );
 
         let scalar_arg = ColumnarValue::Scalar(ScalarValue::Int64(Some(1)));
-        let array_arg =
-            ColumnarValue::Array(ScalarValue::Int64(Some(1)).to_array_of_size(5));
+        let array_arg = ColumnarValue::Array(
+            ScalarValue::Int64(Some(1))
+                .to_array_of_size(5)
+                .expect("Failed to convert to array of size"),
+        );
         let result = unpack_uint64_array(adapter_func(&[array_arg, scalar_arg]))?;
         assert_eq!(result, vec![5, 1]);
 
diff --git a/datafusion/physical-expr/src/intervals/interval_aritmetic.rs b/datafusion/physical-expr/src/intervals/interval_aritmetic.rs
index 1ea9b2d9ae..4b81adfbb1 100644
--- a/datafusion/physical-expr/src/intervals/interval_aritmetic.rs
+++ b/datafusion/physical-expr/src/intervals/interval_aritmetic.rs
@@ -750,7 +750,7 @@ fn cast_scalar_value(
     data_type: &DataType,
     cast_options: &CastOptions,
 ) -> Result<ScalarValue> {
-    let cast_array = cast_with_options(&value.to_array(), data_type, cast_options)?;
+    let cast_array = cast_with_options(&value.to_array()?, data_type, cast_options)?;
     ScalarValue::try_from_array(&cast_array, 0)
 }
 
diff --git a/datafusion/physical-expr/src/math_expressions.rs b/datafusion/physical-expr/src/math_expressions.rs
index 0b7bc34014..af66862aec 100644
--- a/datafusion/physical-expr/src/math_expressions.rs
+++ b/datafusion/physical-expr/src/math_expressions.rs
@@ -769,7 +769,8 @@ mod tests {
         let args = vec![ColumnarValue::Array(Arc::new(NullArray::new(1)))];
         let array = random(&args)
             .expect("failed to initialize function random")
-            .into_array(1);
+            .into_array(1)
+            .expect("Failed to convert to array");
         let floats =
             as_float64_array(&array).expect("failed to initialize function random");
 
diff --git a/datafusion/physical-expr/src/planner.rs b/datafusion/physical-expr/src/planner.rs
index 64c1d0be04..f318cd3b0f 100644
--- a/datafusion/physical-expr/src/planner.rs
+++ b/datafusion/physical-expr/src/planner.rs
@@ -472,7 +472,7 @@ mod tests {
             ]))],
         )?;
         let result = p.evaluate(&batch)?;
-        let result = result.into_array(4);
+        let result = result.into_array(4).expect("Failed to convert to array");
 
         assert_eq!(
             &result,
diff --git a/datafusion/physical-expr/src/struct_expressions.rs b/datafusion/physical-expr/src/struct_expressions.rs
index baa29d668e..0eed1d16fb 100644
--- a/datafusion/physical-expr/src/struct_expressions.rs
+++ b/datafusion/physical-expr/src/struct_expressions.rs
@@ -67,13 +67,15 @@ fn array_struct(args: &[ArrayRef]) -> Result<ArrayRef> {
 
 /// put values in a struct array.
 pub fn struct_expr(values: &[ColumnarValue]) -> Result<ColumnarValue> {
-    let arrays: Vec<ArrayRef> = values
+    let arrays = values
         .iter()
-        .map(|x| match x {
-            ColumnarValue::Array(array) => array.clone(),
-            ColumnarValue::Scalar(scalar) => scalar.to_array().clone(),
+        .map(|x| {
+            Ok(match x {
+                ColumnarValue::Array(array) => array.clone(),
+                ColumnarValue::Scalar(scalar) => scalar.to_array()?.clone(),
+            })
         })
-        .collect();
+        .collect::<Result<Vec<ArrayRef>>>()?;
     Ok(ColumnarValue::Array(array_struct(arrays.as_slice())?))
 }
 
@@ -93,7 +95,8 @@ mod tests {
         ];
         let struc = struct_expr(&args)
             .expect("failed to initialize function struct")
-            .into_array(1);
+            .into_array(1)
+            .expect("Failed to convert to array");
         let result =
             as_struct_array(&struc).expect("failed to initialize function struct");
         assert_eq!(
diff --git a/datafusion/physical-expr/src/window/built_in_window_function_expr.rs b/datafusion/physical-expr/src/window/built_in_window_function_expr.rs
index 66ffa990b7..7aa4f6536a 100644
--- a/datafusion/physical-expr/src/window/built_in_window_function_expr.rs
+++ b/datafusion/physical-expr/src/window/built_in_window_function_expr.rs
@@ -60,8 +60,10 @@ pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug {
     fn evaluate_args(&self, batch: &RecordBatch) -> Result<Vec<ArrayRef>> {
         self.expressions()
             .iter()
-            .map(|e| e.evaluate(batch))
-            .map(|r| r.map(|v| v.into_array(batch.num_rows())))
+            .map(|e| {
+                e.evaluate(batch)
+                    .and_then(|v| v.into_array(batch.num_rows()))
+            })
             .collect()
     }
 
diff --git a/datafusion/physical-expr/src/window/lead_lag.rs b/datafusion/physical-expr/src/window/lead_lag.rs
index f55f1600b9..d22660d41e 100644
--- a/datafusion/physical-expr/src/window/lead_lag.rs
+++ b/datafusion/physical-expr/src/window/lead_lag.rs
@@ -139,6 +139,7 @@ fn create_empty_array(
     let array = value
         .as_ref()
         .map(|scalar| scalar.to_array_of_size(size))
+        .transpose()?
         .unwrap_or_else(|| new_null_array(data_type, size));
     if array.data_type() != data_type {
         cast(&array, data_type).map_err(DataFusionError::ArrowError)
diff --git a/datafusion/physical-expr/src/window/window_expr.rs b/datafusion/physical-expr/src/window/window_expr.rs
index 9b0a02d329..b282e35797 100644
--- a/datafusion/physical-expr/src/window/window_expr.rs
+++ b/datafusion/physical-expr/src/window/window_expr.rs
@@ -82,8 +82,10 @@ pub trait WindowExpr: Send + Sync + Debug {
     fn evaluate_args(&self, batch: &RecordBatch) -> Result<Vec<ArrayRef>> {
         self.expressions()
             .iter()
-            .map(|e| e.evaluate(batch))
-            .map(|r| r.map(|v| v.into_array(batch.num_rows())))
+            .map(|e| {
+                e.evaluate(batch)
+                    .and_then(|v| v.into_array(batch.num_rows()))
+            })
             .collect()
     }
 
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs
index 4052d6aef0..3ac8129297 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -1064,10 +1064,11 @@ fn finalize_aggregation(
             // build the vector of states
             let a = accumulators
                 .iter()
-                .map(|accumulator| accumulator.state())
-                .map(|value| {
-                    value.map(|e| {
-                        e.iter().map(|v| v.to_array()).collect::<Vec<ArrayRef>>()
+                .map(|accumulator| {
+                    accumulator.state().and_then(|e| {
+                        e.iter()
+                            .map(|v| v.to_array())
+                            .collect::<Result<Vec<ArrayRef>>>()
                     })
                 })
                 .collect::<Result<Vec<_>>>()?;
@@ -1080,7 +1081,7 @@ fn finalize_aggregation(
             // merge the state to the final value
             accumulators
                 .iter()
-                .map(|accumulator| accumulator.evaluate().map(|v| v.to_array()))
+                .map(|accumulator| accumulator.evaluate().and_then(|v| v.to_array()))
                 .collect::<Result<Vec<ArrayRef>>>()
         }
     }
@@ -1092,9 +1093,11 @@ fn evaluate(
     batch: &RecordBatch,
 ) -> Result<Vec<ArrayRef>> {
     expr.iter()
-        .map(|expr| expr.evaluate(batch))
-        .map(|r| r.map(|v| v.into_array(batch.num_rows())))
-        .collect::<Result<Vec<_>>>()
+        .map(|expr| {
+            expr.evaluate(batch)
+                .and_then(|v| v.into_array(batch.num_rows()))
+        })
+        .collect()
 }
 
 /// Evaluates expressions against a record batch.
@@ -1114,9 +1117,11 @@ fn evaluate_optional(
     expr.iter()
         .map(|expr| {
             expr.as_ref()
-                .map(|expr| expr.evaluate(batch))
+                .map(|expr| {
+                    expr.evaluate(batch)
+                        .and_then(|v| v.into_array(batch.num_rows()))
+                })
                 .transpose()
-                .map(|r| r.map(|v| v.into_array(batch.num_rows())))
         })
         .collect::<Result<Vec<_>>>()
 }
@@ -1140,7 +1145,7 @@ pub(crate) fn evaluate_group_by(
         .iter()
         .map(|(expr, _)| {
             let value = expr.evaluate(batch)?;
-            Ok(value.into_array(batch.num_rows()))
+            value.into_array(batch.num_rows())
         })
         .collect::<Result<Vec<_>>>()?;
 
@@ -1149,7 +1154,7 @@ pub(crate) fn evaluate_group_by(
         .iter()
         .map(|(expr, _)| {
             let value = expr.evaluate(batch)?;
-            Ok(value.into_array(batch.num_rows()))
+            value.into_array(batch.num_rows())
         })
         .collect::<Result<Vec<_>>>()?;
 
diff --git a/datafusion/physical-plan/src/aggregates/no_grouping.rs b/datafusion/physical-plan/src/aggregates/no_grouping.rs
index 32c0bbc78a..90eb488a2e 100644
--- a/datafusion/physical-plan/src/aggregates/no_grouping.rs
+++ b/datafusion/physical-plan/src/aggregates/no_grouping.rs
@@ -217,8 +217,10 @@ fn aggregate_batch(
             // 1.3
             let values = &expr
                 .iter()
-                .map(|e| e.evaluate(&batch))
-                .map(|r| r.map(|v| v.into_array(batch.num_rows())))
+                .map(|e| {
+                    e.evaluate(&batch)
+                        .and_then(|v| v.into_array(batch.num_rows()))
+                })
                 .collect::<Result<Vec<_>>>()?;
 
             // 1.4
diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs
index 0c44b367e5..d560a219f2 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -300,7 +300,7 @@ pub(crate) fn batch_filter(
 ) -> Result<RecordBatch> {
     predicate
         .evaluate(batch)
-        .map(|v| v.into_array(batch.num_rows()))
+        .and_then(|v| v.into_array(batch.num_rows()))
         .and_then(|array| {
             Ok(as_boolean_array(&array)?)
                 // apply filter array to record batch
diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs
index 102f0c42e9..4c928d44ca 100644
--- a/datafusion/physical-plan/src/joins/cross_join.rs
+++ b/datafusion/physical-plan/src/joins/cross_join.rs
@@ -344,7 +344,7 @@ fn build_batch(
         .iter()
         .map(|arr| {
             let scalar = ScalarValue::try_from_array(arr, left_index)?;
-            Ok(scalar.to_array_of_size(batch.num_rows()))
+            scalar.to_array_of_size(batch.num_rows())
         })
         .collect::<Result<Vec<_>>>()?;
 
diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs
index 1a2db87d98..546a929bf9 100644
--- a/datafusion/physical-plan/src/joins/hash_join.rs
+++ b/datafusion/physical-plan/src/joins/hash_join.rs
@@ -713,7 +713,7 @@ where
     // evaluate the keys
     let keys_values = on
         .iter()
-        .map(|c| Ok(c.evaluate(batch)?.into_array(batch.num_rows())))
+        .map(|c| c.evaluate(batch)?.into_array(batch.num_rows()))
         .collect::<Result<Vec<_>>>()?;
 
     // calculate the hash values
@@ -857,13 +857,13 @@ pub fn build_equal_condition_join_indices<T: JoinHashMapType>(
 ) -> Result<(UInt64Array, UInt32Array)> {
     let keys_values = probe_on
         .iter()
-        .map(|c| Ok(c.evaluate(probe_batch)?.into_array(probe_batch.num_rows())))
+        .map(|c| c.evaluate(probe_batch)?.into_array(probe_batch.num_rows()))
         .collect::<Result<Vec<_>>>()?;
     let build_join_values = build_on
         .iter()
         .map(|c| {
-            Ok(c.evaluate(build_input_buffer)?
-                .into_array(build_input_buffer.num_rows()))
+            c.evaluate(build_input_buffer)?
+                .into_array(build_input_buffer.num_rows())
         })
         .collect::<Result<Vec<_>>>()?;
     hashes_buffer.clear();
diff --git a/datafusion/physical-plan/src/joins/hash_join_utils.rs b/datafusion/physical-plan/src/joins/hash_join_utils.rs
index c134b23d78..5ebf370b6d 100644
--- a/datafusion/physical-plan/src/joins/hash_join_utils.rs
+++ b/datafusion/physical-plan/src/joins/hash_join_utils.rs
@@ -607,7 +607,7 @@ pub fn update_filter_expr_interval(
         .origin_sorted_expr()
         .expr
         .evaluate(batch)?
-        .into_array(1);
+        .into_array(1)?;
     // Convert the array to a ScalarValue:
     let value = ScalarValue::try_from_array(&array, 0)?;
     // Create a ScalarValue representing positive or negative infinity for the same data type:
diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
index 1306a48744..51561f5dab 100644
--- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
+++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
@@ -626,7 +626,9 @@ impl Stream for SymmetricHashJoinStream {
 /// # Returns
 ///
 /// A [Result] object that contains the pruning length. The function will return
-/// an error if there is an issue evaluating the build side filter expression.
+/// an error if
+/// - there is an issue evaluating the build side filter expression;
+/// - there is an issue converting the build side filter expression into an array
 fn determine_prune_length(
     buffer: &RecordBatch,
     build_side_filter_expr: &SortedFilterExpr,
@@ -637,7 +639,7 @@ fn determine_prune_length(
     let batch_arr = origin_sorted_expr
         .expr
         .evaluate(buffer)?
-        .into_array(buffer.num_rows());
+        .into_array(buffer.num_rows())?;
 
     // Get the lower or upper interval based on the sort direction
     let target = if origin_sorted_expr.options.descending {
diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs
index 5efeedfe65..f93f08255e 100644
--- a/datafusion/physical-plan/src/joins/utils.rs
+++ b/datafusion/physical-plan/src/joins/utils.rs
@@ -778,7 +778,7 @@ pub(crate) fn apply_join_filter_to_indices(
     let filter_result = filter
         .expression()
         .evaluate(&intermediate_batch)?
-        .into_array(intermediate_batch.num_rows());
+        .into_array(intermediate_batch.num_rows())?;
     let mask = as_boolean_array(&filter_result)?;
 
     let left_filtered = compute::filter(&build_indices, mask)?;
diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs
index bbf0d6d4b3..b8e2d0e425 100644
--- a/datafusion/physical-plan/src/projection.rs
+++ b/datafusion/physical-plan/src/projection.rs
@@ -310,8 +310,10 @@ impl ProjectionStream {
         let arrays = self
             .expr
             .iter()
-            .map(|expr| expr.evaluate(batch))
-            .map(|r| r.map(|v| v.into_array(batch.num_rows())))
+            .map(|expr| {
+                expr.evaluate(batch)
+                    .and_then(|v| v.into_array(batch.num_rows()))
+            })
             .collect::<Result<Vec<_>>>()?;
 
         if arrays.is_empty() {
diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs
index 66f7037e5c..9836e057ff 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -169,9 +169,7 @@ impl BatchPartitioner {
 
                     let arrays = exprs
                         .iter()
-                        .map(|expr| {
-                            Ok(expr.evaluate(&batch)?.into_array(batch.num_rows()))
-                        })
+                        .map(|expr| expr.evaluate(&batch)?.into_array(batch.num_rows()))
                         .collect::<Result<Vec<_>>>()?;
 
                     hash_buffer.clear();
diff --git a/datafusion/physical-plan/src/sorts/stream.rs b/datafusion/physical-plan/src/sorts/stream.rs
index 4cabdc6e17..135b4fbdec 100644
--- a/datafusion/physical-plan/src/sorts/stream.rs
+++ b/datafusion/physical-plan/src/sorts/stream.rs
@@ -118,7 +118,7 @@ impl RowCursorStream {
         let cols = self
             .column_expressions
             .iter()
-            .map(|expr| Ok(expr.evaluate(batch)?.into_array(batch.num_rows())))
+            .map(|expr| expr.evaluate(batch)?.into_array(batch.num_rows()))
             .collect::<Result<Vec<_>>>()?;
 
         let rows = self.converter.convert_columns(&cols)?;
@@ -181,7 +181,7 @@ impl<T: CursorArray> FieldCursorStream<T> {
 
     fn convert_batch(&mut self, batch: &RecordBatch) -> Result<ArrayValues<T::Values>> {
         let value = self.sort.expr.evaluate(batch)?;
-        let array = value.into_array(batch.num_rows());
+        let array = value.into_array(batch.num_rows())?;
         let array = array.as_any().downcast_ref::<T>().expect("field values");
         Ok(ArrayValues::new(self.sort.options, array))
     }
diff --git a/datafusion/physical-plan/src/topk/mod.rs b/datafusion/physical-plan/src/topk/mod.rs
index 4638c0dcf2..9120566273 100644
--- a/datafusion/physical-plan/src/topk/mod.rs
+++ b/datafusion/physical-plan/src/topk/mod.rs
@@ -153,7 +153,7 @@ impl TopK {
             .iter()
             .map(|expr| {
                 let value = expr.expr.evaluate(&batch)?;
-                Ok(value.into_array(batch.num_rows()))
+                value.into_array(batch.num_rows())
             })
             .collect::<Result<Vec<_>>>()?;
 
diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs
index c9f3fb76c2..af4a81626c 100644
--- a/datafusion/physical-plan/src/unnest.rs
+++ b/datafusion/physical-plan/src/unnest.rs
@@ -242,7 +242,7 @@ fn build_batch(
     column: &Column,
     options: &UnnestOptions,
 ) -> Result<RecordBatch> {
-    let list_array = column.evaluate(batch)?.into_array(batch.num_rows());
+    let list_array = column.evaluate(batch)?.into_array(batch.num_rows())?;
     match list_array.data_type() {
         DataType::List(_) => {
             let list_array = list_array.as_any().downcast_ref::<ListArray>().unwrap();