You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/11/11 14:40:26 UTC
(arrow-datafusion) branch main updated: chore: remove panics in datafusion-common::scalar (#7901)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new e642cc2a94 chore: remove panics in datafusion-common::scalar (#7901)
e642cc2a94 is described below
commit e642cc2a94f38518d765d25c8113523aedc29198
Author: Junjun Dong <ju...@gmail.com>
AuthorDate: Sat Nov 11 06:40:20 2023 -0800
chore: remove panics in datafusion-common::scalar (#7901)
---
datafusion/common/src/pyarrow.rs | 2 +-
datafusion/common/src/scalar.rs | 598 ++++++++++++---------
datafusion/core/benches/scalar.rs | 10 +-
datafusion/core/src/datasource/listing/helpers.rs | 5 +-
.../datasource/physical_plan/file_scan_config.rs | 12 +-
.../datasource/physical_plan/parquet/row_filter.rs | 2 +-
.../datasource/physical_plan/parquet/row_groups.rs | 4 +-
datafusion/expr/src/columnar_value.rs | 14 +-
datafusion/expr/src/window_state.rs | 2 +-
.../optimizer/src/unwrap_cast_in_comparison.rs | 8 +-
.../physical-expr/src/aggregate/correlation.rs | 12 +-
.../physical-expr/src/aggregate/covariance.rs | 12 +-
.../physical-expr/src/aggregate/first_last.rs | 10 +-
datafusion/physical-expr/src/aggregate/stddev.rs | 12 +-
datafusion/physical-expr/src/aggregate/utils.rs | 4 +-
datafusion/physical-expr/src/aggregate/variance.rs | 12 +-
.../physical-expr/src/conditional_expressions.rs | 2 +-
.../physical-expr/src/datetime_expressions.rs | 2 +-
datafusion/physical-expr/src/expressions/binary.rs | 49 +-
datafusion/physical-expr/src/expressions/case.rs | 64 ++-
datafusion/physical-expr/src/expressions/cast.rs | 12 +-
datafusion/physical-expr/src/expressions/datum.rs | 14 +-
.../src/expressions/get_indexed_field.rs | 33 +-
.../physical-expr/src/expressions/in_list.rs | 11 +-
.../physical-expr/src/expressions/is_not_null.rs | 5 +-
.../physical-expr/src/expressions/is_null.rs | 5 +-
datafusion/physical-expr/src/expressions/like.rs | 5 +-
.../physical-expr/src/expressions/literal.rs | 5 +-
datafusion/physical-expr/src/expressions/mod.rs | 12 +-
.../physical-expr/src/expressions/negative.rs | 2 +-
datafusion/physical-expr/src/expressions/not.rs | 5 +-
datafusion/physical-expr/src/expressions/nullif.rs | 20 +-
.../physical-expr/src/expressions/try_cast.rs | 12 +-
datafusion/physical-expr/src/functions.rs | 63 ++-
.../src/intervals/interval_aritmetic.rs | 2 +-
datafusion/physical-expr/src/math_expressions.rs | 3 +-
datafusion/physical-expr/src/planner.rs | 2 +-
datafusion/physical-expr/src/struct_expressions.rs | 15 +-
.../src/window/built_in_window_function_expr.rs | 6 +-
datafusion/physical-expr/src/window/lead_lag.rs | 1 +
datafusion/physical-expr/src/window/window_expr.rs | 6 +-
datafusion/physical-plan/src/aggregates/mod.rs | 29 +-
.../physical-plan/src/aggregates/no_grouping.rs | 6 +-
datafusion/physical-plan/src/filter.rs | 2 +-
datafusion/physical-plan/src/joins/cross_join.rs | 2 +-
datafusion/physical-plan/src/joins/hash_join.rs | 8 +-
.../physical-plan/src/joins/hash_join_utils.rs | 2 +-
.../physical-plan/src/joins/symmetric_hash_join.rs | 6 +-
datafusion/physical-plan/src/joins/utils.rs | 2 +-
datafusion/physical-plan/src/projection.rs | 6 +-
datafusion/physical-plan/src/repartition/mod.rs | 4 +-
datafusion/physical-plan/src/sorts/stream.rs | 4 +-
datafusion/physical-plan/src/topk/mod.rs | 2 +-
datafusion/physical-plan/src/unnest.rs | 2 +-
54 files changed, 723 insertions(+), 427 deletions(-)
diff --git a/datafusion/common/src/pyarrow.rs b/datafusion/common/src/pyarrow.rs
index 59a8b811e3..aa01539193 100644
--- a/datafusion/common/src/pyarrow.rs
+++ b/datafusion/common/src/pyarrow.rs
@@ -54,7 +54,7 @@ impl FromPyArrow for ScalarValue {
impl ToPyArrow for ScalarValue {
fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
- let array = self.to_array();
+ let array = self.to_array()?;
// convert to pyarrow array using C data interface
let pyarray = array.to_data().to_pyarrow(py)?;
let pyscalar = pyarray.call_method1(py, "__getitem__", (0,))?;
diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs
index 0d701eaad2..cdcc9aa4fb 100644
--- a/datafusion/common/src/scalar.rs
+++ b/datafusion/common/src/scalar.rs
@@ -330,9 +330,9 @@ impl PartialOrd for ScalarValue {
let arr2 = list_arr2.value(i);
let lt_res =
- arrow::compute::kernels::cmp::lt(&arr1, &arr2).unwrap();
+ arrow::compute::kernels::cmp::lt(&arr1, &arr2).ok()?;
let eq_res =
- arrow::compute::kernels::cmp::eq(&arr1, &arr2).unwrap();
+ arrow::compute::kernels::cmp::eq(&arr1, &arr2).ok()?;
for j in 0..lt_res.len() {
if lt_res.is_valid(j) && lt_res.value(j) {
@@ -431,6 +431,10 @@ macro_rules! hash_float_value {
hash_float_value!((f64, u64), (f32, u32));
// manual implementation of `Hash`
+//
+// # Panics
+//
+// Panics if there is an error when creating hash values for rows
impl std::hash::Hash for ScalarValue {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
use ScalarValue::*;
@@ -506,15 +510,19 @@ impl std::hash::Hash for ScalarValue {
}
}
-/// return a reference to the values array and the index into it for a
+/// Return a reference to the values array and the index into it for a
/// dictionary array
+///
+/// # Errors
+///
+/// Errors if the array cannot be downcasted to DictionaryArray
#[inline]
pub fn get_dict_value<K: ArrowDictionaryKeyType>(
array: &dyn Array,
index: usize,
-) -> (&ArrayRef, Option<usize>) {
- let dict_array = as_dictionary_array::<K>(array).unwrap();
- (dict_array.values(), dict_array.key(index))
+) -> Result<(&ArrayRef, Option<usize>)> {
+ let dict_array = as_dictionary_array::<K>(array)?;
+ Ok((dict_array.values(), dict_array.key(index)))
}
/// Create a dictionary array representing `value` repeated `size`
@@ -522,9 +530,9 @@ pub fn get_dict_value<K: ArrowDictionaryKeyType>(
fn dict_from_scalar<K: ArrowDictionaryKeyType>(
value: &ScalarValue,
size: usize,
-) -> ArrayRef {
+) -> Result<ArrayRef> {
// values array is one element long (the value)
- let values_array = value.to_array_of_size(1);
+ let values_array = value.to_array_of_size(1)?;
// Create a key array with `size` elements, each of 0
let key_array: PrimitiveArray<K> = std::iter::repeat(Some(K::default_value()))
@@ -536,11 +544,9 @@ fn dict_from_scalar<K: ArrowDictionaryKeyType>(
// Note: this path could be made faster by using the ArrayData
// APIs and skipping validation, if it every comes up in
// performance traces.
- Arc::new(
- DictionaryArray::<K>::try_new(key_array, values_array)
- // should always be valid by construction above
- .expect("Can not construct dictionary array"),
- )
+ Ok(Arc::new(
+ DictionaryArray::<K>::try_new(key_array, values_array)?, // should always be valid by construction above
+ ))
}
/// Create a dictionary array representing all the values in values
@@ -579,24 +585,44 @@ fn dict_from_values<K: ArrowDictionaryKeyType>(
macro_rules! typed_cast_tz {
($array:expr, $index:expr, $ARRAYTYPE:ident, $SCALAR:ident, $TZ:expr) => {{
- let array = $array.as_any().downcast_ref::<$ARRAYTYPE>().unwrap();
- ScalarValue::$SCALAR(
+ use std::any::type_name;
+ let array = $array
+ .as_any()
+ .downcast_ref::<$ARRAYTYPE>()
+ .ok_or_else(|| {
+ DataFusionError::Internal(format!(
+ "could not cast value to {}",
+ type_name::<$ARRAYTYPE>()
+ ))
+ })?;
+ Ok::<ScalarValue, DataFusionError>(ScalarValue::$SCALAR(
match array.is_null($index) {
true => None,
false => Some(array.value($index).into()),
},
$TZ.clone(),
- )
+ ))
}};
}
macro_rules! typed_cast {
($array:expr, $index:expr, $ARRAYTYPE:ident, $SCALAR:ident) => {{
- let array = $array.as_any().downcast_ref::<$ARRAYTYPE>().unwrap();
- ScalarValue::$SCALAR(match array.is_null($index) {
- true => None,
- false => Some(array.value($index).into()),
- })
+ use std::any::type_name;
+ let array = $array
+ .as_any()
+ .downcast_ref::<$ARRAYTYPE>()
+ .ok_or_else(|| {
+ DataFusionError::Internal(format!(
+ "could not cast value to {}",
+ type_name::<$ARRAYTYPE>()
+ ))
+ })?;
+ Ok::<ScalarValue, DataFusionError>(ScalarValue::$SCALAR(
+ match array.is_null($index) {
+ true => None,
+ false => Some(array.value($index).into()),
+ },
+ ))
}};
}
@@ -628,12 +654,21 @@ macro_rules! build_timestamp_array_from_option {
macro_rules! eq_array_primitive {
($array:expr, $index:expr, $ARRAYTYPE:ident, $VALUE:expr) => {{
- let array = $array.as_any().downcast_ref::<$ARRAYTYPE>().unwrap();
+ use std::any::type_name;
+ let array = $array
+ .as_any()
+ .downcast_ref::<$ARRAYTYPE>()
+ .ok_or_else(|| {
+ DataFusionError::Internal(format!(
+ "could not cast value to {}",
+ type_name::<$ARRAYTYPE>()
+ ))
+ })?;
let is_valid = array.is_valid($index);
- match $VALUE {
+ Ok::<bool, DataFusionError>(match $VALUE {
Some(val) => is_valid && &array.value($index) == val,
None => !is_valid,
- }
+ })
}};
}
@@ -935,7 +970,7 @@ impl ScalarValue {
/// NB: operating on `ScalarValue` directly is not efficient, performance sensitive code
/// should operate on Arrays directly, using vectorized array kernels
pub fn add<T: Borrow<ScalarValue>>(&self, other: T) -> Result<ScalarValue> {
- let r = add_wrapping(&self.to_scalar(), &other.borrow().to_scalar())?;
+ let r = add_wrapping(&self.to_scalar()?, &other.borrow().to_scalar()?)?;
Self::try_from_array(r.as_ref(), 0)
}
/// Checked addition of `ScalarValue`
@@ -943,7 +978,7 @@ impl ScalarValue {
/// NB: operating on `ScalarValue` directly is not efficient, performance sensitive code
/// should operate on Arrays directly, using vectorized array kernels
pub fn add_checked<T: Borrow<ScalarValue>>(&self, other: T) -> Result<ScalarValue> {
- let r = add(&self.to_scalar(), &other.borrow().to_scalar())?;
+ let r = add(&self.to_scalar()?, &other.borrow().to_scalar()?)?;
Self::try_from_array(r.as_ref(), 0)
}
@@ -952,7 +987,7 @@ impl ScalarValue {
/// NB: operating on `ScalarValue` directly is not efficient, performance sensitive code
/// should operate on Arrays directly, using vectorized array kernels
pub fn sub<T: Borrow<ScalarValue>>(&self, other: T) -> Result<ScalarValue> {
- let r = sub_wrapping(&self.to_scalar(), &other.borrow().to_scalar())?;
+ let r = sub_wrapping(&self.to_scalar()?, &other.borrow().to_scalar()?)?;
Self::try_from_array(r.as_ref(), 0)
}
@@ -961,7 +996,7 @@ impl ScalarValue {
/// NB: operating on `ScalarValue` directly is not efficient, performance sensitive code
/// should operate on Arrays directly, using vectorized array kernels
pub fn sub_checked<T: Borrow<ScalarValue>>(&self, other: T) -> Result<ScalarValue> {
- let r = sub(&self.to_scalar(), &other.borrow().to_scalar())?;
+ let r = sub(&self.to_scalar()?, &other.borrow().to_scalar()?)?;
Self::try_from_array(r.as_ref(), 0)
}
@@ -1050,7 +1085,11 @@ impl ScalarValue {
}
/// Converts a scalar value into an 1-row array.
- pub fn to_array(&self) -> ArrayRef {
+ ///
+ /// # Errors
+ ///
+ /// Errors if the ScalarValue cannot be converted into a 1-row array
+ pub fn to_array(&self) -> Result<ArrayRef> {
self.to_array_of_size(1)
}
@@ -1059,6 +1098,10 @@ impl ScalarValue {
///
/// This can be used to call arrow compute kernels such as `lt`
///
+ /// # Errors
+ ///
+ /// Errors if the ScalarValue cannot be converted into a 1-row array
+ ///
/// # Example
/// ```
/// use datafusion_common::ScalarValue;
@@ -1069,7 +1112,7 @@ impl ScalarValue {
///
/// let result = arrow::compute::kernels::cmp::lt(
/// &arr,
- /// &five.to_scalar(),
+ /// &five.to_scalar().unwrap(),
/// ).unwrap();
///
/// let expected = BooleanArray::from(vec![
@@ -1082,8 +1125,8 @@ impl ScalarValue {
/// assert_eq!(&result, &expected);
/// ```
/// [`Datum`]: arrow_array::Datum
- pub fn to_scalar(&self) -> Scalar<ArrayRef> {
- Scalar::new(self.to_array_of_size(1))
+ pub fn to_scalar(&self) -> Result<Scalar<ArrayRef>> {
+ Ok(Scalar::new(self.to_array_of_size(1)?))
}
/// Converts an iterator of references [`ScalarValue`] into an [`ArrayRef`]
@@ -1093,6 +1136,10 @@ impl ScalarValue {
/// Returns an error if the iterator is empty or if the
/// [`ScalarValue`]s are not all the same type
///
+ /// # Panics
+ ///
+ /// Panics if `self` is a dictionary with invalid key type
+ ///
/// # Example
/// ```
/// use datafusion_common::ScalarValue;
@@ -1199,28 +1246,29 @@ impl ScalarValue {
macro_rules! build_array_list_primitive {
($ARRAY_TY:ident, $SCALAR_TY:ident, $NATIVE_TYPE:ident) => {{
- Arc::new(ListArray::from_iter_primitive::<$ARRAY_TY, _, _>(
+ Ok::<ArrayRef, DataFusionError>(Arc::new(ListArray::from_iter_primitive::<$ARRAY_TY, _, _>(
scalars.into_iter().map(|x| match x {
ScalarValue::List(arr) => {
// `ScalarValue::List` contains a single element `ListArray`.
let list_arr = as_list_array(&arr);
if list_arr.is_null(0) {
- None
+ Ok(None)
} else {
let primitive_arr =
list_arr.values().as_primitive::<$ARRAY_TY>();
- Some(
+ Ok(Some(
primitive_arr.into_iter().collect::<Vec<Option<_>>>(),
- )
+ ))
}
}
- sv => panic!(
+ sv => _internal_err!(
"Inconsistent types in ScalarValue::iter_to_array. \
Expected {:?}, got {:?}",
data_type, sv
),
- }),
- ))
+ })
+ .collect::<Result<Vec<_>>>()?,
+ )))
}};
}
@@ -1273,7 +1321,7 @@ impl ScalarValue {
ScalarValue::iter_to_decimal256_array(scalars, *precision, *scale)?;
Arc::new(decimal_array)
}
- DataType::Null => ScalarValue::iter_to_null_array(scalars),
+ DataType::Null => ScalarValue::iter_to_null_array(scalars)?,
DataType::Boolean => build_array_primitive!(BooleanArray, Boolean),
DataType::Float32 => build_array_primitive!(Float32Array, Float32),
DataType::Float64 => build_array_primitive!(Float64Array, Float64),
@@ -1337,34 +1385,34 @@ impl ScalarValue {
build_array_primitive!(IntervalMonthDayNanoArray, IntervalMonthDayNano)
}
DataType::List(fields) if fields.data_type() == &DataType::Int8 => {
- build_array_list_primitive!(Int8Type, Int8, i8)
+ build_array_list_primitive!(Int8Type, Int8, i8)?
}
DataType::List(fields) if fields.data_type() == &DataType::Int16 => {
- build_array_list_primitive!(Int16Type, Int16, i16)
+ build_array_list_primitive!(Int16Type, Int16, i16)?
}
DataType::List(fields) if fields.data_type() == &DataType::Int32 => {
- build_array_list_primitive!(Int32Type, Int32, i32)
+ build_array_list_primitive!(Int32Type, Int32, i32)?
}
DataType::List(fields) if fields.data_type() == &DataType::Int64 => {
- build_array_list_primitive!(Int64Type, Int64, i64)
+ build_array_list_primitive!(Int64Type, Int64, i64)?
}
DataType::List(fields) if fields.data_type() == &DataType::UInt8 => {
- build_array_list_primitive!(UInt8Type, UInt8, u8)
+ build_array_list_primitive!(UInt8Type, UInt8, u8)?
}
DataType::List(fields) if fields.data_type() == &DataType::UInt16 => {
- build_array_list_primitive!(UInt16Type, UInt16, u16)
+ build_array_list_primitive!(UInt16Type, UInt16, u16)?
}
DataType::List(fields) if fields.data_type() == &DataType::UInt32 => {
- build_array_list_primitive!(UInt32Type, UInt32, u32)
+ build_array_list_primitive!(UInt32Type, UInt32, u32)?
}
DataType::List(fields) if fields.data_type() == &DataType::UInt64 => {
- build_array_list_primitive!(UInt64Type, UInt64, u64)
+ build_array_list_primitive!(UInt64Type, UInt64, u64)?
}
DataType::List(fields) if fields.data_type() == &DataType::Float32 => {
- build_array_list_primitive!(Float32Type, Float32, f32)
+ build_array_list_primitive!(Float32Type, Float32, f32)?
}
DataType::List(fields) if fields.data_type() == &DataType::Float64 => {
- build_array_list_primitive!(Float64Type, Float64, f64)
+ build_array_list_primitive!(Float64Type, Float64, f64)?
}
DataType::List(fields) if fields.data_type() == &DataType::Utf8 => {
build_array_list_string!(StringBuilder, as_string_array)
@@ -1432,7 +1480,7 @@ impl ScalarValue {
if &inner_key_type == key_type {
Ok(*scalar)
} else {
- panic!("Expected inner key type of {key_type} but found: {inner_key_type}, value was ({scalar:?})");
+ _internal_err!("Expected inner key type of {key_type} but found: {inner_key_type}, value was ({scalar:?})")
}
}
_ => {
@@ -1504,15 +1552,19 @@ impl ScalarValue {
Ok(array)
}
- fn iter_to_null_array(scalars: impl IntoIterator<Item = ScalarValue>) -> ArrayRef {
- let length =
- scalars
- .into_iter()
- .fold(0usize, |r, element: ScalarValue| match element {
- ScalarValue::Null => r + 1,
- _ => unreachable!(),
- });
- new_null_array(&DataType::Null, length)
+ fn iter_to_null_array(
+ scalars: impl IntoIterator<Item = ScalarValue>,
+ ) -> Result<ArrayRef> {
+ let length = scalars.into_iter().try_fold(
+ 0usize,
+ |r, element: ScalarValue| match element {
+ ScalarValue::Null => Ok::<usize, DataFusionError>(r + 1),
+ s => {
+ _internal_err!("Expected ScalarValue::Null element. Received {s:?}")
+ }
+ },
+ )?;
+ Ok(new_null_array(&DataType::Null, length))
}
fn iter_to_decimal_array(
@@ -1523,10 +1575,12 @@ impl ScalarValue {
let array = scalars
.into_iter()
.map(|element: ScalarValue| match element {
- ScalarValue::Decimal128(v1, _, _) => v1,
- _ => unreachable!(),
+ ScalarValue::Decimal128(v1, _, _) => Ok(v1),
+ s => {
+ _internal_err!("Expected ScalarValue::Null element. Received {s:?}")
+ }
})
- .collect::<Decimal128Array>()
+ .collect::<Result<Decimal128Array>>()?
.with_precision_and_scale(precision, scale)?;
Ok(array)
}
@@ -1539,10 +1593,14 @@ impl ScalarValue {
let array = scalars
.into_iter()
.map(|element: ScalarValue| match element {
- ScalarValue::Decimal256(v1, _, _) => v1,
- _ => unreachable!(),
+ ScalarValue::Decimal256(v1, _, _) => Ok(v1),
+ s => {
+ _internal_err!(
+ "Expected ScalarValue::Decimal256 element. Received {s:?}"
+ )
+ }
})
- .collect::<Decimal256Array>()
+ .collect::<Result<Decimal256Array>>()?
.with_precision_and_scale(precision, scale)?;
Ok(array)
}
@@ -1607,17 +1665,17 @@ impl ScalarValue {
precision: u8,
scale: i8,
size: usize,
- ) -> Decimal128Array {
+ ) -> Result<Decimal128Array> {
match value {
Some(val) => Decimal128Array::from(vec![val; size])
.with_precision_and_scale(precision, scale)
- .unwrap(),
+ .map_err(DataFusionError::ArrowError),
None => {
let mut builder = Decimal128Array::builder(size)
.with_precision_and_scale(precision, scale)
- .unwrap();
+ .map_err(DataFusionError::ArrowError)?;
builder.append_nulls(size);
- builder.finish()
+ Ok(builder.finish())
}
}
}
@@ -1627,12 +1685,12 @@ impl ScalarValue {
precision: u8,
scale: i8,
size: usize,
- ) -> Decimal256Array {
+ ) -> Result<Decimal256Array> {
std::iter::repeat(value)
.take(size)
.collect::<Decimal256Array>()
.with_precision_and_scale(precision, scale)
- .unwrap()
+ .map_err(DataFusionError::ArrowError)
}
/// Converts `Vec<ScalarValue>` where each element has type corresponding to
@@ -1671,13 +1729,21 @@ impl ScalarValue {
}
/// Converts a scalar value into an array of `size` rows.
- pub fn to_array_of_size(&self, size: usize) -> ArrayRef {
- match self {
+ ///
+ /// # Errors
+ ///
+ /// Errors if `self` is
+ /// - a decimal that fails be converted to a decimal array of size
+ /// - a `Fixedsizelist` that is not supported yet
+ /// - a `List` that fails to be concatenated into an array of size
+ /// - a `Dictionary` that fails be converted to a dictionary array of size
+ pub fn to_array_of_size(&self, size: usize) -> Result<ArrayRef> {
+ Ok(match self {
ScalarValue::Decimal128(e, precision, scale) => Arc::new(
- ScalarValue::build_decimal_array(*e, *precision, *scale, size),
+ ScalarValue::build_decimal_array(*e, *precision, *scale, size)?,
),
ScalarValue::Decimal256(e, precision, scale) => Arc::new(
- ScalarValue::build_decimal256_array(*e, *precision, *scale, size),
+ ScalarValue::build_decimal256_array(*e, *precision, *scale, size)?,
),
ScalarValue::Boolean(e) => {
Arc::new(BooleanArray::from(vec![*e; size])) as ArrayRef
@@ -1790,13 +1856,14 @@ impl ScalarValue {
),
},
ScalarValue::Fixedsizelist(..) => {
- unimplemented!("FixedSizeList is not supported yet")
+ return _not_impl_err!("FixedSizeList is not supported yet")
}
ScalarValue::List(arr) => {
let arrays = std::iter::repeat(arr.as_ref())
.take(size)
.collect::<Vec<_>>();
- arrow::compute::concat(arrays.as_slice()).unwrap()
+ arrow::compute::concat(arrays.as_slice())
+ .map_err(DataFusionError::ArrowError)?
}
ScalarValue::Date32(e) => {
build_array_from_option!(Date32, Date32Array, e, size)
@@ -1891,13 +1958,13 @@ impl ScalarValue {
),
ScalarValue::Struct(values, fields) => match values {
Some(values) => {
- let field_values: Vec<_> = fields
+ let field_values = fields
.iter()
.zip(values.iter())
.map(|(field, value)| {
- (field.clone(), value.to_array_of_size(size))
+ Ok((field.clone(), value.to_array_of_size(size)?))
})
- .collect();
+ .collect::<Result<Vec<_>>>()?;
Arc::new(StructArray::from(field_values))
}
@@ -1909,19 +1976,19 @@ impl ScalarValue {
ScalarValue::Dictionary(key_type, v) => {
// values array is one element long (the value)
match key_type.as_ref() {
- DataType::Int8 => dict_from_scalar::<Int8Type>(v, size),
- DataType::Int16 => dict_from_scalar::<Int16Type>(v, size),
- DataType::Int32 => dict_from_scalar::<Int32Type>(v, size),
- DataType::Int64 => dict_from_scalar::<Int64Type>(v, size),
- DataType::UInt8 => dict_from_scalar::<UInt8Type>(v, size),
- DataType::UInt16 => dict_from_scalar::<UInt16Type>(v, size),
- DataType::UInt32 => dict_from_scalar::<UInt32Type>(v, size),
- DataType::UInt64 => dict_from_scalar::<UInt64Type>(v, size),
+ DataType::Int8 => dict_from_scalar::<Int8Type>(v, size)?,
+ DataType::Int16 => dict_from_scalar::<Int16Type>(v, size)?,
+ DataType::Int32 => dict_from_scalar::<Int32Type>(v, size)?,
+ DataType::Int64 => dict_from_scalar::<Int64Type>(v, size)?,
+ DataType::UInt8 => dict_from_scalar::<UInt8Type>(v, size)?,
+ DataType::UInt16 => dict_from_scalar::<UInt16Type>(v, size)?,
+ DataType::UInt32 => dict_from_scalar::<UInt32Type>(v, size)?,
+ DataType::UInt64 => dict_from_scalar::<UInt64Type>(v, size)?,
_ => unreachable!("Invalid dictionary keys type: {:?}", key_type),
}
}
ScalarValue::Null => new_null_array(&DataType::Null, size),
- }
+ })
}
fn get_decimal_value_from_array(
@@ -2037,23 +2104,25 @@ impl ScalarValue {
array, index, *precision, *scale,
)?
}
- DataType::Boolean => typed_cast!(array, index, BooleanArray, Boolean),
- DataType::Float64 => typed_cast!(array, index, Float64Array, Float64),
- DataType::Float32 => typed_cast!(array, index, Float32Array, Float32),
- DataType::UInt64 => typed_cast!(array, index, UInt64Array, UInt64),
- DataType::UInt32 => typed_cast!(array, index, UInt32Array, UInt32),
- DataType::UInt16 => typed_cast!(array, index, UInt16Array, UInt16),
- DataType::UInt8 => typed_cast!(array, index, UInt8Array, UInt8),
- DataType::Int64 => typed_cast!(array, index, Int64Array, Int64),
- DataType::Int32 => typed_cast!(array, index, Int32Array, Int32),
- DataType::Int16 => typed_cast!(array, index, Int16Array, Int16),
- DataType::Int8 => typed_cast!(array, index, Int8Array, Int8),
- DataType::Binary => typed_cast!(array, index, BinaryArray, Binary),
+ DataType::Boolean => typed_cast!(array, index, BooleanArray, Boolean)?,
+ DataType::Float64 => typed_cast!(array, index, Float64Array, Float64)?,
+ DataType::Float32 => typed_cast!(array, index, Float32Array, Float32)?,
+ DataType::UInt64 => typed_cast!(array, index, UInt64Array, UInt64)?,
+ DataType::UInt32 => typed_cast!(array, index, UInt32Array, UInt32)?,
+ DataType::UInt16 => typed_cast!(array, index, UInt16Array, UInt16)?,
+ DataType::UInt8 => typed_cast!(array, index, UInt8Array, UInt8)?,
+ DataType::Int64 => typed_cast!(array, index, Int64Array, Int64)?,
+ DataType::Int32 => typed_cast!(array, index, Int32Array, Int32)?,
+ DataType::Int16 => typed_cast!(array, index, Int16Array, Int16)?,
+ DataType::Int8 => typed_cast!(array, index, Int8Array, Int8)?,
+ DataType::Binary => typed_cast!(array, index, BinaryArray, Binary)?,
DataType::LargeBinary => {
- typed_cast!(array, index, LargeBinaryArray, LargeBinary)
+ typed_cast!(array, index, LargeBinaryArray, LargeBinary)?
+ }
+ DataType::Utf8 => typed_cast!(array, index, StringArray, Utf8)?,
+ DataType::LargeUtf8 => {
+ typed_cast!(array, index, LargeStringArray, LargeUtf8)?
}
- DataType::Utf8 => typed_cast!(array, index, StringArray, Utf8),
- DataType::LargeUtf8 => typed_cast!(array, index, LargeStringArray, LargeUtf8),
DataType::List(_) => {
let list_array = as_list_array(array);
let nested_array = list_array.value(index);
@@ -2071,70 +2140,58 @@ impl ScalarValue {
ScalarValue::List(arr)
}
- DataType::Date32 => {
- typed_cast!(array, index, Date32Array, Date32)
- }
- DataType::Date64 => {
- typed_cast!(array, index, Date64Array, Date64)
- }
+ DataType::Date32 => typed_cast!(array, index, Date32Array, Date32)?,
+ DataType::Date64 => typed_cast!(array, index, Date64Array, Date64)?,
DataType::Time32(TimeUnit::Second) => {
- typed_cast!(array, index, Time32SecondArray, Time32Second)
+ typed_cast!(array, index, Time32SecondArray, Time32Second)?
}
DataType::Time32(TimeUnit::Millisecond) => {
- typed_cast!(array, index, Time32MillisecondArray, Time32Millisecond)
+ typed_cast!(array, index, Time32MillisecondArray, Time32Millisecond)?
}
DataType::Time64(TimeUnit::Microsecond) => {
- typed_cast!(array, index, Time64MicrosecondArray, Time64Microsecond)
+ typed_cast!(array, index, Time64MicrosecondArray, Time64Microsecond)?
}
DataType::Time64(TimeUnit::Nanosecond) => {
- typed_cast!(array, index, Time64NanosecondArray, Time64Nanosecond)
- }
- DataType::Timestamp(TimeUnit::Second, tz_opt) => {
- typed_cast_tz!(
- array,
- index,
- TimestampSecondArray,
- TimestampSecond,
- tz_opt
- )
- }
- DataType::Timestamp(TimeUnit::Millisecond, tz_opt) => {
- typed_cast_tz!(
- array,
- index,
- TimestampMillisecondArray,
- TimestampMillisecond,
- tz_opt
- )
- }
- DataType::Timestamp(TimeUnit::Microsecond, tz_opt) => {
- typed_cast_tz!(
- array,
- index,
- TimestampMicrosecondArray,
- TimestampMicrosecond,
- tz_opt
- )
- }
- DataType::Timestamp(TimeUnit::Nanosecond, tz_opt) => {
- typed_cast_tz!(
- array,
- index,
- TimestampNanosecondArray,
- TimestampNanosecond,
- tz_opt
- )
+ typed_cast!(array, index, Time64NanosecondArray, Time64Nanosecond)?
}
+ DataType::Timestamp(TimeUnit::Second, tz_opt) => typed_cast_tz!(
+ array,
+ index,
+ TimestampSecondArray,
+ TimestampSecond,
+ tz_opt
+ )?,
+ DataType::Timestamp(TimeUnit::Millisecond, tz_opt) => typed_cast_tz!(
+ array,
+ index,
+ TimestampMillisecondArray,
+ TimestampMillisecond,
+ tz_opt
+ )?,
+ DataType::Timestamp(TimeUnit::Microsecond, tz_opt) => typed_cast_tz!(
+ array,
+ index,
+ TimestampMicrosecondArray,
+ TimestampMicrosecond,
+ tz_opt
+ )?,
+ DataType::Timestamp(TimeUnit::Nanosecond, tz_opt) => typed_cast_tz!(
+ array,
+ index,
+ TimestampNanosecondArray,
+ TimestampNanosecond,
+ tz_opt
+ )?,
DataType::Dictionary(key_type, _) => {
let (values_array, values_index) = match key_type.as_ref() {
- DataType::Int8 => get_dict_value::<Int8Type>(array, index),
- DataType::Int16 => get_dict_value::<Int16Type>(array, index),
- DataType::Int32 => get_dict_value::<Int32Type>(array, index),
- DataType::Int64 => get_dict_value::<Int64Type>(array, index),
- DataType::UInt8 => get_dict_value::<UInt8Type>(array, index),
- DataType::UInt16 => get_dict_value::<UInt16Type>(array, index),
- DataType::UInt32 => get_dict_value::<UInt32Type>(array, index),
- DataType::UInt64 => get_dict_value::<UInt64Type>(array, index),
+ DataType::Int8 => get_dict_value::<Int8Type>(array, index)?,
+ DataType::Int16 => get_dict_value::<Int16Type>(array, index)?,
+ DataType::Int32 => get_dict_value::<Int32Type>(array, index)?,
+ DataType::Int64 => get_dict_value::<Int64Type>(array, index)?,
+ DataType::UInt8 => get_dict_value::<UInt8Type>(array, index)?,
+ DataType::UInt16 => get_dict_value::<UInt16Type>(array, index)?,
+ DataType::UInt32 => get_dict_value::<UInt32Type>(array, index)?,
+ DataType::UInt64 => get_dict_value::<UInt64Type>(array, index)?,
_ => unreachable!("Invalid dictionary keys type: {:?}", key_type),
};
// look up the index in the values dictionary
@@ -2173,31 +2230,29 @@ impl ScalarValue {
)
}
DataType::Interval(IntervalUnit::DayTime) => {
- typed_cast!(array, index, IntervalDayTimeArray, IntervalDayTime)
+ typed_cast!(array, index, IntervalDayTimeArray, IntervalDayTime)?
}
DataType::Interval(IntervalUnit::YearMonth) => {
- typed_cast!(array, index, IntervalYearMonthArray, IntervalYearMonth)
- }
- DataType::Interval(IntervalUnit::MonthDayNano) => {
- typed_cast!(
- array,
- index,
- IntervalMonthDayNanoArray,
- IntervalMonthDayNano
- )
+ typed_cast!(array, index, IntervalYearMonthArray, IntervalYearMonth)?
}
+ DataType::Interval(IntervalUnit::MonthDayNano) => typed_cast!(
+ array,
+ index,
+ IntervalMonthDayNanoArray,
+ IntervalMonthDayNano
+ )?,
DataType::Duration(TimeUnit::Second) => {
- typed_cast!(array, index, DurationSecondArray, DurationSecond)
+ typed_cast!(array, index, DurationSecondArray, DurationSecond)?
}
DataType::Duration(TimeUnit::Millisecond) => {
- typed_cast!(array, index, DurationMillisecondArray, DurationMillisecond)
+ typed_cast!(array, index, DurationMillisecondArray, DurationMillisecond)?
}
DataType::Duration(TimeUnit::Microsecond) => {
- typed_cast!(array, index, DurationMicrosecondArray, DurationMicrosecond)
+ typed_cast!(array, index, DurationMicrosecondArray, DurationMicrosecond)?
}
DataType::Duration(TimeUnit::Nanosecond) => {
- typed_cast!(array, index, DurationNanosecondArray, DurationNanosecond)
+ typed_cast!(array, index, DurationNanosecondArray, DurationNanosecond)?
}
other => {
@@ -2215,7 +2270,7 @@ impl ScalarValue {
safe: false,
format_options: Default::default(),
};
- let cast_arr = cast_with_options(&value.to_array(), target_type, &cast_options)?;
+ let cast_arr = cast_with_options(&value.to_array()?, target_type, &cast_options)?;
ScalarValue::try_from_array(&cast_arr, 0)
}
@@ -2273,9 +2328,21 @@ impl ScalarValue {
///
/// This function has a few narrow usescases such as hash table key
/// comparisons where comparing a single row at a time is necessary.
+ ///
+ /// # Errors
+ ///
+ /// Errors if
+ /// - it fails to downcast `array` to the data type of `self`
+ /// - `self` is a `Fixedsizelist`
+ /// - `self` is a `List`
+ /// - `self` is a `Struct`
+ ///
+ /// # Panics
+ ///
+ /// Panics if `self` is a dictionary with invalid key type
#[inline]
- pub fn eq_array(&self, array: &ArrayRef, index: usize) -> bool {
- match self {
+ pub fn eq_array(&self, array: &ArrayRef, index: usize) -> Result<bool> {
+ Ok(match self {
ScalarValue::Decimal128(v, precision, scale) => {
ScalarValue::eq_array_decimal(
array,
@@ -2283,8 +2350,7 @@ impl ScalarValue {
v.as_ref(),
*precision,
*scale,
- )
- .unwrap()
+ )?
}
ScalarValue::Decimal256(v, precision, scale) => {
ScalarValue::eq_array_decimal256(
@@ -2293,119 +2359,132 @@ impl ScalarValue {
v.as_ref(),
*precision,
*scale,
- )
- .unwrap()
+ )?
}
ScalarValue::Boolean(val) => {
- eq_array_primitive!(array, index, BooleanArray, val)
+ eq_array_primitive!(array, index, BooleanArray, val)?
}
ScalarValue::Float32(val) => {
- eq_array_primitive!(array, index, Float32Array, val)
+ eq_array_primitive!(array, index, Float32Array, val)?
}
ScalarValue::Float64(val) => {
- eq_array_primitive!(array, index, Float64Array, val)
+ eq_array_primitive!(array, index, Float64Array, val)?
+ }
+ ScalarValue::Int8(val) => eq_array_primitive!(array, index, Int8Array, val)?,
+ ScalarValue::Int16(val) => {
+ eq_array_primitive!(array, index, Int16Array, val)?
+ }
+ ScalarValue::Int32(val) => {
+ eq_array_primitive!(array, index, Int32Array, val)?
+ }
+ ScalarValue::Int64(val) => {
+ eq_array_primitive!(array, index, Int64Array, val)?
+ }
+ ScalarValue::UInt8(val) => {
+ eq_array_primitive!(array, index, UInt8Array, val)?
}
- ScalarValue::Int8(val) => eq_array_primitive!(array, index, Int8Array, val),
- ScalarValue::Int16(val) => eq_array_primitive!(array, index, Int16Array, val),
- ScalarValue::Int32(val) => eq_array_primitive!(array, index, Int32Array, val),
- ScalarValue::Int64(val) => eq_array_primitive!(array, index, Int64Array, val),
- ScalarValue::UInt8(val) => eq_array_primitive!(array, index, UInt8Array, val),
ScalarValue::UInt16(val) => {
- eq_array_primitive!(array, index, UInt16Array, val)
+ eq_array_primitive!(array, index, UInt16Array, val)?
}
ScalarValue::UInt32(val) => {
- eq_array_primitive!(array, index, UInt32Array, val)
+ eq_array_primitive!(array, index, UInt32Array, val)?
}
ScalarValue::UInt64(val) => {
- eq_array_primitive!(array, index, UInt64Array, val)
+ eq_array_primitive!(array, index, UInt64Array, val)?
+ }
+ ScalarValue::Utf8(val) => {
+ eq_array_primitive!(array, index, StringArray, val)?
}
- ScalarValue::Utf8(val) => eq_array_primitive!(array, index, StringArray, val),
ScalarValue::LargeUtf8(val) => {
- eq_array_primitive!(array, index, LargeStringArray, val)
+ eq_array_primitive!(array, index, LargeStringArray, val)?
}
ScalarValue::Binary(val) => {
- eq_array_primitive!(array, index, BinaryArray, val)
+ eq_array_primitive!(array, index, BinaryArray, val)?
}
ScalarValue::FixedSizeBinary(_, val) => {
- eq_array_primitive!(array, index, FixedSizeBinaryArray, val)
+ eq_array_primitive!(array, index, FixedSizeBinaryArray, val)?
}
ScalarValue::LargeBinary(val) => {
- eq_array_primitive!(array, index, LargeBinaryArray, val)
+ eq_array_primitive!(array, index, LargeBinaryArray, val)?
+ }
+ ScalarValue::Fixedsizelist(..) => {
+ return _not_impl_err!("FixedSizeList is not supported yet")
}
- ScalarValue::Fixedsizelist(..) => unimplemented!(),
- ScalarValue::List(_) => unimplemented!("ListArr"),
+ ScalarValue::List(_) => return _not_impl_err!("List is not supported yet"),
ScalarValue::Date32(val) => {
- eq_array_primitive!(array, index, Date32Array, val)
+ eq_array_primitive!(array, index, Date32Array, val)?
}
ScalarValue::Date64(val) => {
- eq_array_primitive!(array, index, Date64Array, val)
+ eq_array_primitive!(array, index, Date64Array, val)?
}
ScalarValue::Time32Second(val) => {
- eq_array_primitive!(array, index, Time32SecondArray, val)
+ eq_array_primitive!(array, index, Time32SecondArray, val)?
}
ScalarValue::Time32Millisecond(val) => {
- eq_array_primitive!(array, index, Time32MillisecondArray, val)
+ eq_array_primitive!(array, index, Time32MillisecondArray, val)?
}
ScalarValue::Time64Microsecond(val) => {
- eq_array_primitive!(array, index, Time64MicrosecondArray, val)
+ eq_array_primitive!(array, index, Time64MicrosecondArray, val)?
}
ScalarValue::Time64Nanosecond(val) => {
- eq_array_primitive!(array, index, Time64NanosecondArray, val)
+ eq_array_primitive!(array, index, Time64NanosecondArray, val)?
}
ScalarValue::TimestampSecond(val, _) => {
- eq_array_primitive!(array, index, TimestampSecondArray, val)
+ eq_array_primitive!(array, index, TimestampSecondArray, val)?
}
ScalarValue::TimestampMillisecond(val, _) => {
- eq_array_primitive!(array, index, TimestampMillisecondArray, val)
+ eq_array_primitive!(array, index, TimestampMillisecondArray, val)?
}
ScalarValue::TimestampMicrosecond(val, _) => {
- eq_array_primitive!(array, index, TimestampMicrosecondArray, val)
+ eq_array_primitive!(array, index, TimestampMicrosecondArray, val)?
}
ScalarValue::TimestampNanosecond(val, _) => {
- eq_array_primitive!(array, index, TimestampNanosecondArray, val)
+ eq_array_primitive!(array, index, TimestampNanosecondArray, val)?
}
ScalarValue::IntervalYearMonth(val) => {
- eq_array_primitive!(array, index, IntervalYearMonthArray, val)
+ eq_array_primitive!(array, index, IntervalYearMonthArray, val)?
}
ScalarValue::IntervalDayTime(val) => {
- eq_array_primitive!(array, index, IntervalDayTimeArray, val)
+ eq_array_primitive!(array, index, IntervalDayTimeArray, val)?
}
ScalarValue::IntervalMonthDayNano(val) => {
- eq_array_primitive!(array, index, IntervalMonthDayNanoArray, val)
+ eq_array_primitive!(array, index, IntervalMonthDayNanoArray, val)?
}
ScalarValue::DurationSecond(val) => {
- eq_array_primitive!(array, index, DurationSecondArray, val)
+ eq_array_primitive!(array, index, DurationSecondArray, val)?
}
ScalarValue::DurationMillisecond(val) => {
- eq_array_primitive!(array, index, DurationMillisecondArray, val)
+ eq_array_primitive!(array, index, DurationMillisecondArray, val)?
}
ScalarValue::DurationMicrosecond(val) => {
- eq_array_primitive!(array, index, DurationMicrosecondArray, val)
+ eq_array_primitive!(array, index, DurationMicrosecondArray, val)?
}
ScalarValue::DurationNanosecond(val) => {
- eq_array_primitive!(array, index, DurationNanosecondArray, val)
+ eq_array_primitive!(array, index, DurationNanosecondArray, val)?
+ }
+ ScalarValue::Struct(_, _) => {
+ return _not_impl_err!("Struct is not supported yet")
}
- ScalarValue::Struct(_, _) => unimplemented!(),
ScalarValue::Dictionary(key_type, v) => {
let (values_array, values_index) = match key_type.as_ref() {
- DataType::Int8 => get_dict_value::<Int8Type>(array, index),
- DataType::Int16 => get_dict_value::<Int16Type>(array, index),
- DataType::Int32 => get_dict_value::<Int32Type>(array, index),
- DataType::Int64 => get_dict_value::<Int64Type>(array, index),
- DataType::UInt8 => get_dict_value::<UInt8Type>(array, index),
- DataType::UInt16 => get_dict_value::<UInt16Type>(array, index),
- DataType::UInt32 => get_dict_value::<UInt32Type>(array, index),
- DataType::UInt64 => get_dict_value::<UInt64Type>(array, index),
+ DataType::Int8 => get_dict_value::<Int8Type>(array, index)?,
+ DataType::Int16 => get_dict_value::<Int16Type>(array, index)?,
+ DataType::Int32 => get_dict_value::<Int32Type>(array, index)?,
+ DataType::Int64 => get_dict_value::<Int64Type>(array, index)?,
+ DataType::UInt8 => get_dict_value::<UInt8Type>(array, index)?,
+ DataType::UInt16 => get_dict_value::<UInt16Type>(array, index)?,
+ DataType::UInt32 => get_dict_value::<UInt32Type>(array, index)?,
+ DataType::UInt64 => get_dict_value::<UInt64Type>(array, index)?,
_ => unreachable!("Invalid dictionary keys type: {:?}", key_type),
};
// was the value in the array non null?
match values_index {
- Some(values_index) => v.eq_array(values_array, values_index),
+ Some(values_index) => v.eq_array(values_array, values_index)?,
None => v.is_null(),
}
}
ScalarValue::Null => array.is_null(index),
- }
+ })
}
/// Estimate size if bytes including `Self`. For values with internal containers such as `String`
@@ -2785,6 +2864,11 @@ macro_rules! format_option {
}};
}
+// Implement Display trait for ScalarValue
+//
+// # Panics
+//
+// Panics if there is an error when creating a visual representation of columns via `arrow::util::pretty`
impl fmt::Display for ScalarValue {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
@@ -3031,7 +3115,9 @@ mod tests {
])]);
let sv = ScalarValue::List(Arc::new(arr));
- let actual_arr = sv.to_array_of_size(2);
+ let actual_arr = sv
+ .to_array_of_size(2)
+ .expect("Failed to convert to array of size");
let actual_list_arr = as_list_array(&actual_arr);
let arr = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
@@ -3238,8 +3324,8 @@ mod tests {
{
let scalar_result = left.add_checked(&right);
- let left_array = left.to_array();
- let right_array = right.to_array();
+ let left_array = left.to_array().expect("Failed to convert to array");
+ let right_array = right.to_array().expect("Failed to convert to array");
let arrow_left_array = left_array.as_primitive::<T>();
let arrow_right_array = right_array.as_primitive::<T>();
let arrow_result = kernels::numeric::add(arrow_left_array, arrow_right_array);
@@ -3287,22 +3373,30 @@ mod tests {
}
// decimal scalar to array
- let array = decimal_value.to_array();
+ let array = decimal_value
+ .to_array()
+ .expect("Failed to convert to array");
let array = as_decimal128_array(&array)?;
assert_eq!(1, array.len());
assert_eq!(DataType::Decimal128(10, 1), array.data_type().clone());
assert_eq!(123i128, array.value(0));
// decimal scalar to array with size
- let array = decimal_value.to_array_of_size(10);
+ let array = decimal_value
+ .to_array_of_size(10)
+ .expect("Failed to convert to array of size");
let array_decimal = as_decimal128_array(&array)?;
assert_eq!(10, array.len());
assert_eq!(DataType::Decimal128(10, 1), array.data_type().clone());
assert_eq!(123i128, array_decimal.value(0));
assert_eq!(123i128, array_decimal.value(9));
// test eq array
- assert!(decimal_value.eq_array(&array, 1));
- assert!(decimal_value.eq_array(&array, 5));
+ assert!(decimal_value
+ .eq_array(&array, 1)
+ .expect("Failed to compare arrays"));
+ assert!(decimal_value
+ .eq_array(&array, 5)
+ .expect("Failed to compare arrays"));
// test try from array
assert_eq!(
decimal_value,
@@ -3349,13 +3443,16 @@ mod tests {
assert!(ScalarValue::try_new_decimal128(1, 10, 2)
.unwrap()
- .eq_array(&array, 0));
+ .eq_array(&array, 0)
+ .expect("Failed to compare arrays"));
assert!(ScalarValue::try_new_decimal128(2, 10, 2)
.unwrap()
- .eq_array(&array, 1));
+ .eq_array(&array, 1)
+ .expect("Failed to compare arrays"));
assert!(ScalarValue::try_new_decimal128(3, 10, 2)
.unwrap()
- .eq_array(&array, 2));
+ .eq_array(&array, 2)
+ .expect("Failed to compare arrays"));
assert_eq!(
ScalarValue::Decimal128(None, 10, 2),
ScalarValue::try_from_array(&array, 3).unwrap()
@@ -3442,14 +3539,14 @@ mod tests {
#[test]
fn scalar_value_to_array_u64() -> Result<()> {
let value = ScalarValue::UInt64(Some(13u64));
- let array = value.to_array();
+ let array = value.to_array().expect("Failed to convert to array");
let array = as_uint64_array(&array)?;
assert_eq!(array.len(), 1);
assert!(!array.is_null(0));
assert_eq!(array.value(0), 13);
let value = ScalarValue::UInt64(None);
- let array = value.to_array();
+ let array = value.to_array().expect("Failed to convert to array");
let array = as_uint64_array(&array)?;
assert_eq!(array.len(), 1);
assert!(array.is_null(0));
@@ -3459,14 +3556,14 @@ mod tests {
#[test]
fn scalar_value_to_array_u32() -> Result<()> {
let value = ScalarValue::UInt32(Some(13u32));
- let array = value.to_array();
+ let array = value.to_array().expect("Failed to convert to array");
let array = as_uint32_array(&array)?;
assert_eq!(array.len(), 1);
assert!(!array.is_null(0));
assert_eq!(array.value(0), 13);
let value = ScalarValue::UInt32(None);
- let array = value.to_array();
+ let array = value.to_array().expect("Failed to convert to array");
let array = as_uint32_array(&array)?;
assert_eq!(array.len(), 1);
assert!(array.is_null(0));
@@ -4025,7 +4122,9 @@ mod tests {
for (index, scalar) in scalars.into_iter().enumerate() {
assert!(
- scalar.eq_array(&array, index),
+ scalar
+ .eq_array(&array, index)
+ .expect("Failed to compare arrays"),
"Expected {scalar:?} to be equal to {array:?} at index {index}"
);
@@ -4033,7 +4132,7 @@ mod tests {
for other_index in 0..array.len() {
if index != other_index {
assert!(
- !scalar.eq_array(&array, other_index),
+ !scalar.eq_array(&array, other_index).expect("Failed to compare arrays"),
"Expected {scalar:?} to be NOT equal to {array:?} at index {other_index}"
);
}
@@ -4136,7 +4235,9 @@ mod tests {
);
// Convert to length-2 array
- let array = scalar.to_array_of_size(2);
+ let array = scalar
+ .to_array_of_size(2)
+ .expect("Failed to convert to array of size");
let expected = Arc::new(StructArray::from(vec![
(
@@ -4570,7 +4671,7 @@ mod tests {
DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
);
- let array = scalar.to_array();
+ let array = scalar.to_array().expect("Failed to convert to array");
assert_eq!(array.len(), 1);
assert_eq!(
array.data_type(),
@@ -4607,7 +4708,7 @@ mod tests {
// mimics how casting work on scalar values by `casting` `scalar` to `desired_type`
fn check_scalar_cast(scalar: ScalarValue, desired_type: DataType) {
// convert from scalar --> Array to call cast
- let scalar_array = scalar.to_array();
+ let scalar_array = scalar.to_array().expect("Failed to convert to array");
// cast the actual value
let cast_array = kernels::cast::cast(&scalar_array, &desired_type).unwrap();
@@ -4616,7 +4717,9 @@ mod tests {
assert_eq!(cast_scalar.data_type(), desired_type);
// Some time later the "cast" scalar is turned back into an array:
- let array = cast_scalar.to_array_of_size(10);
+ let array = cast_scalar
+ .to_array_of_size(10)
+ .expect("Failed to convert to array of size");
// The datatype should be "Dictionary" but is actually Utf8!!!
assert_eq!(array.data_type(), &desired_type)
@@ -5065,7 +5168,8 @@ mod tests {
let arrays = scalars
.iter()
.map(ScalarValue::to_array)
- .collect::<Vec<_>>();
+ .collect::<Result<Vec<_>>>()
+ .expect("Failed to convert to array");
let arrays = arrays.iter().map(|a| a.as_ref()).collect::<Vec<_>>();
let array = concat(&arrays).unwrap();
check_array(array);
diff --git a/datafusion/core/benches/scalar.rs b/datafusion/core/benches/scalar.rs
index 30f21a964d..540f7212e9 100644
--- a/datafusion/core/benches/scalar.rs
+++ b/datafusion/core/benches/scalar.rs
@@ -22,7 +22,15 @@ fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("to_array_of_size 100000", |b| {
let scalar = ScalarValue::Int32(Some(100));
- b.iter(|| assert_eq!(scalar.to_array_of_size(100000).null_count(), 0))
+ b.iter(|| {
+ assert_eq!(
+ scalar
+ .to_array_of_size(100000)
+ .expect("Failed to convert to array of size")
+ .null_count(),
+ 0
+ )
+ })
});
}
diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs
index d6a0add9b2..986e54ebbe 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -276,7 +276,10 @@ async fn prune_partitions(
// Applies `filter` to `batch` returning `None` on error
let do_filter = |filter| -> Option<ArrayRef> {
let expr = create_physical_expr(filter, &df_schema, &schema, &props).ok()?;
- Some(expr.evaluate(&batch).ok()?.into_array(partitions.len()))
+ expr.evaluate(&batch)
+ .ok()?
+ .into_array(partitions.len())
+ .ok()
};
// Compute the conjunction of the filters, ignoring errors
diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
index 3efb0df9df..68e996391c 100644
--- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
@@ -336,7 +336,7 @@ impl PartitionColumnProjector {
&mut self.key_buffer_cache,
partition_value.as_ref(),
file_batch.num_rows(),
- ),
+ )?,
)
}
@@ -396,11 +396,11 @@ fn create_dict_array<T>(
dict_val: &ScalarValue,
len: usize,
data_type: DataType,
-) -> ArrayRef
+) -> Result<ArrayRef>
where
T: ArrowNativeType,
{
- let dict_vals = dict_val.to_array();
+ let dict_vals = dict_val.to_array()?;
let sliced_key_buffer = buffer_gen.get_buffer(len);
@@ -409,16 +409,16 @@ where
.len(len)
.add_buffer(sliced_key_buffer);
builder = builder.add_child_data(dict_vals.to_data());
- Arc::new(DictionaryArray::<UInt16Type>::from(
+ Ok(Arc::new(DictionaryArray::<UInt16Type>::from(
builder.build().unwrap(),
- ))
+ )))
}
fn create_output_array(
key_buffer_cache: &mut ZeroBufferGenerators,
val: &ScalarValue,
len: usize,
-) -> ArrayRef {
+) -> Result<ArrayRef> {
if let ScalarValue::Dictionary(key_type, dict_val) = &val {
match key_type.as_ref() {
DataType::Int8 => {
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs
index 0f4b09caed..5fe0a0a13a 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs
@@ -126,7 +126,7 @@ impl ArrowPredicate for DatafusionArrowPredicate {
match self
.physical_expr
.evaluate(&batch)
- .map(|v| v.into_array(batch.num_rows()))
+ .and_then(|v| v.into_array(batch.num_rows()))
{
Ok(array) => {
let bool_arr = as_boolean_array(&array)?.clone();
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
index 91bceed916..dc6ef50bc1 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -405,7 +405,7 @@ macro_rules! get_min_max_values {
.flatten()
// column either didn't have statistics at all or didn't have min/max values
.or_else(|| Some(null_scalar.clone()))
- .map(|s| s.to_array())
+ .and_then(|s| s.to_array().ok())
}}
}
@@ -425,7 +425,7 @@ macro_rules! get_null_count_values {
},
);
- Some(value.to_array())
+ value.to_array().ok()
}};
}
diff --git a/datafusion/expr/src/columnar_value.rs b/datafusion/expr/src/columnar_value.rs
index c72aae69c8..7a28839281 100644
--- a/datafusion/expr/src/columnar_value.rs
+++ b/datafusion/expr/src/columnar_value.rs
@@ -20,7 +20,7 @@
use arrow::array::ArrayRef;
use arrow::array::NullArray;
use arrow::datatypes::DataType;
-use datafusion_common::ScalarValue;
+use datafusion_common::{Result, ScalarValue};
use std::sync::Arc;
/// Represents the result of evaluating an expression: either a single
@@ -47,11 +47,15 @@ impl ColumnarValue {
/// Convert a columnar value into an ArrayRef. [`Self::Scalar`] is
/// converted by repeating the same scalar multiple times.
- pub fn into_array(self, num_rows: usize) -> ArrayRef {
- match self {
+ ///
+ /// # Errors
+ ///
+ /// Errors if `self` is a Scalar that fails to be converted into an array of size
+ pub fn into_array(self, num_rows: usize) -> Result<ArrayRef> {
+ Ok(match self {
ColumnarValue::Array(array) => array,
- ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows),
- }
+ ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows)?,
+ })
}
/// null columnar values are implemented as a null array in order to pass batch
diff --git a/datafusion/expr/src/window_state.rs b/datafusion/expr/src/window_state.rs
index 4ea9ecea5f..de88396d9b 100644
--- a/datafusion/expr/src/window_state.rs
+++ b/datafusion/expr/src/window_state.rs
@@ -98,7 +98,7 @@ impl WindowAggState {
}
pub fn new(out_type: &DataType) -> Result<Self> {
- let empty_out_col = ScalarValue::try_from(out_type)?.to_array_of_size(0);
+ let empty_out_col = ScalarValue::try_from(out_type)?.to_array_of_size(0)?;
Ok(Self {
window_frame_range: Range { start: 0, end: 0 },
window_frame_ctx: None,
diff --git a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs
index 468981a5fb..907c12b7af 100644
--- a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs
+++ b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs
@@ -1089,8 +1089,12 @@ mod tests {
// Verify that calling the arrow
// cast kernel yields the same results
// input array
- let literal_array = literal.to_array_of_size(1);
- let expected_array = expected_value.to_array_of_size(1);
+ let literal_array = literal
+ .to_array_of_size(1)
+ .expect("Failed to convert to array of size");
+ let expected_array = expected_value
+ .to_array_of_size(1)
+ .expect("Failed to convert to array of size");
let cast_array = cast_with_options(
&literal_array,
&target_type,
diff --git a/datafusion/physical-expr/src/aggregate/correlation.rs b/datafusion/physical-expr/src/aggregate/correlation.rs
index 475bfa4ce0..61f2db5c8e 100644
--- a/datafusion/physical-expr/src/aggregate/correlation.rs
+++ b/datafusion/physical-expr/src/aggregate/correlation.rs
@@ -505,13 +505,17 @@ mod tests {
let values1 = expr1
.iter()
- .map(|e| e.evaluate(batch1))
- .map(|r| r.map(|v| v.into_array(batch1.num_rows())))
+ .map(|e| {
+ e.evaluate(batch1)
+ .and_then(|v| v.into_array(batch1.num_rows()))
+ })
.collect::<Result<Vec<_>>>()?;
let values2 = expr2
.iter()
- .map(|e| e.evaluate(batch2))
- .map(|r| r.map(|v| v.into_array(batch2.num_rows())))
+ .map(|e| {
+ e.evaluate(batch2)
+ .and_then(|v| v.into_array(batch2.num_rows()))
+ })
.collect::<Result<Vec<_>>>()?;
accum1.update_batch(&values1)?;
accum2.update_batch(&values2)?;
diff --git a/datafusion/physical-expr/src/aggregate/covariance.rs b/datafusion/physical-expr/src/aggregate/covariance.rs
index 5e589d4e39..0f838eb6fa 100644
--- a/datafusion/physical-expr/src/aggregate/covariance.rs
+++ b/datafusion/physical-expr/src/aggregate/covariance.rs
@@ -754,13 +754,17 @@ mod tests {
let values1 = expr1
.iter()
- .map(|e| e.evaluate(batch1))
- .map(|r| r.map(|v| v.into_array(batch1.num_rows())))
+ .map(|e| {
+ e.evaluate(batch1)
+ .and_then(|v| v.into_array(batch1.num_rows()))
+ })
.collect::<Result<Vec<_>>>()?;
let values2 = expr2
.iter()
- .map(|e| e.evaluate(batch2))
- .map(|r| r.map(|v| v.into_array(batch2.num_rows())))
+ .map(|e| {
+ e.evaluate(batch2)
+ .and_then(|v| v.into_array(batch2.num_rows()))
+ })
.collect::<Result<Vec<_>>>()?;
accum1.update_batch(&values1)?;
accum2.update_batch(&values2)?;
diff --git a/datafusion/physical-expr/src/aggregate/first_last.rs b/datafusion/physical-expr/src/aggregate/first_last.rs
index a4e0a6dc49..0dc27dede8 100644
--- a/datafusion/physical-expr/src/aggregate/first_last.rs
+++ b/datafusion/physical-expr/src/aggregate/first_last.rs
@@ -587,7 +587,10 @@ mod tests {
let mut states = vec![];
for idx in 0..state1.len() {
- states.push(concat(&[&state1[idx].to_array(), &state2[idx].to_array()])?);
+ states.push(concat(&[
+ &state1[idx].to_array()?,
+ &state2[idx].to_array()?,
+ ])?);
}
let mut first_accumulator =
@@ -614,7 +617,10 @@ mod tests {
let mut states = vec![];
for idx in 0..state1.len() {
- states.push(concat(&[&state1[idx].to_array(), &state2[idx].to_array()])?);
+ states.push(concat(&[
+ &state1[idx].to_array()?,
+ &state2[idx].to_array()?,
+ ])?);
}
let mut last_accumulator =
diff --git a/datafusion/physical-expr/src/aggregate/stddev.rs b/datafusion/physical-expr/src/aggregate/stddev.rs
index 330507d6ff..64e19ef502 100644
--- a/datafusion/physical-expr/src/aggregate/stddev.rs
+++ b/datafusion/physical-expr/src/aggregate/stddev.rs
@@ -445,13 +445,17 @@ mod tests {
let values1 = expr1
.iter()
- .map(|e| e.evaluate(batch1))
- .map(|r| r.map(|v| v.into_array(batch1.num_rows())))
+ .map(|e| {
+ e.evaluate(batch1)
+ .and_then(|v| v.into_array(batch1.num_rows()))
+ })
.collect::<Result<Vec<_>>>()?;
let values2 = expr2
.iter()
- .map(|e| e.evaluate(batch2))
- .map(|r| r.map(|v| v.into_array(batch2.num_rows())))
+ .map(|e| {
+ e.evaluate(batch2)
+ .and_then(|v| v.into_array(batch2.num_rows()))
+ })
.collect::<Result<Vec<_>>>()?;
accum1.update_batch(&values1)?;
accum2.update_batch(&values2)?;
diff --git a/datafusion/physical-expr/src/aggregate/utils.rs b/datafusion/physical-expr/src/aggregate/utils.rs
index da3a527132..e5421ef5ab 100644
--- a/datafusion/physical-expr/src/aggregate/utils.rs
+++ b/datafusion/physical-expr/src/aggregate/utils.rs
@@ -36,11 +36,11 @@ use std::sync::Arc;
pub fn get_accum_scalar_values_as_arrays(
accum: &dyn Accumulator,
) -> Result<Vec<ArrayRef>> {
- Ok(accum
+ accum
.state()?
.iter()
.map(|s| s.to_array_of_size(1))
- .collect::<Vec<_>>())
+ .collect::<Result<Vec<_>>>()
}
/// Computes averages for `Decimal128`/`Decimal256` values, checking for overflow
diff --git a/datafusion/physical-expr/src/aggregate/variance.rs b/datafusion/physical-expr/src/aggregate/variance.rs
index a720dd833a..d82c5ad562 100644
--- a/datafusion/physical-expr/src/aggregate/variance.rs
+++ b/datafusion/physical-expr/src/aggregate/variance.rs
@@ -519,13 +519,17 @@ mod tests {
let values1 = expr1
.iter()
- .map(|e| e.evaluate(batch1))
- .map(|r| r.map(|v| v.into_array(batch1.num_rows())))
+ .map(|e| {
+ e.evaluate(batch1)
+ .and_then(|v| v.into_array(batch1.num_rows()))
+ })
.collect::<Result<Vec<_>>>()?;
let values2 = expr2
.iter()
- .map(|e| e.evaluate(batch2))
- .map(|r| r.map(|v| v.into_array(batch2.num_rows())))
+ .map(|e| {
+ e.evaluate(batch2)
+ .and_then(|v| v.into_array(batch2.num_rows()))
+ })
.collect::<Result<Vec<_>>>()?;
accum1.update_batch(&values1)?;
accum2.update_batch(&values2)?;
diff --git a/datafusion/physical-expr/src/conditional_expressions.rs b/datafusion/physical-expr/src/conditional_expressions.rs
index 37adb2d71c..a9a25ffe2e 100644
--- a/datafusion/physical-expr/src/conditional_expressions.rs
+++ b/datafusion/physical-expr/src/conditional_expressions.rs
@@ -54,7 +54,7 @@ pub fn coalesce(args: &[ColumnarValue]) -> Result<ColumnarValue> {
if value.is_null() {
continue;
} else {
- let last_value = value.to_array_of_size(size);
+ let last_value = value.to_array_of_size(size)?;
current_value =
zip(&remainder, &last_value, current_value.as_ref())?;
break;
diff --git a/datafusion/physical-expr/src/datetime_expressions.rs b/datafusion/physical-expr/src/datetime_expressions.rs
index 3b61e7f48d..5b597de78a 100644
--- a/datafusion/physical-expr/src/datetime_expressions.rs
+++ b/datafusion/physical-expr/src/datetime_expressions.rs
@@ -852,7 +852,7 @@ pub fn date_part(args: &[ColumnarValue]) -> Result<ColumnarValue> {
let array = match array {
ColumnarValue::Array(array) => array.clone(),
- ColumnarValue::Scalar(scalar) => scalar.to_array(),
+ ColumnarValue::Scalar(scalar) => scalar.to_array()?,
};
let arr = match date_part.to_lowercase().as_str() {
diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs
index 63fa98011f..0a05a479e5 100644
--- a/datafusion/physical-expr/src/expressions/binary.rs
+++ b/datafusion/physical-expr/src/expressions/binary.rs
@@ -304,8 +304,8 @@ impl PhysicalExpr for BinaryExpr {
// if both arrays or both literals - extract arrays and continue execution
let (left, right) = (
- lhs.into_array(batch.num_rows()),
- rhs.into_array(batch.num_rows()),
+ lhs.into_array(batch.num_rows())?,
+ rhs.into_array(batch.num_rows())?,
);
self.evaluate_with_resolved_args(left, &left_data_type, right, &right_data_type)
.map(ColumnarValue::Array)
@@ -597,7 +597,10 @@ mod tests {
let batch =
RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)])?;
- let result = lt.evaluate(&batch)?.into_array(batch.num_rows());
+ let result = lt
+ .evaluate(&batch)?
+ .into_array(batch.num_rows())
+ .expect("Failed to convert to array");
assert_eq!(result.len(), 5);
let expected = [false, false, true, true, true];
@@ -641,7 +644,10 @@ mod tests {
assert_eq!("a@0 < b@1 OR a@0 = b@1", format!("{expr}"));
- let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
+ let result = expr
+ .evaluate(&batch)?
+ .into_array(batch.num_rows())
+ .expect("Failed to convert to array");
assert_eq!(result.len(), 5);
let expected = [true, true, false, true, false];
@@ -685,7 +691,7 @@ mod tests {
assert_eq!(expression.data_type(&schema)?, $C_TYPE);
// compute
- let result = expression.evaluate(&batch)?.into_array(batch.num_rows());
+ let result = expression.evaluate(&batch)?.into_array(batch.num_rows()).expect("Failed to convert to array");
// verify that the array's data_type is correct
assert_eq!(*result.data_type(), $C_TYPE);
@@ -2138,7 +2144,10 @@ mod tests {
let arithmetic_op =
binary_op(col("a", &schema)?, op, col("b", &schema)?, &schema)?;
let batch = RecordBatch::try_new(schema, data)?;
- let result = arithmetic_op.evaluate(&batch)?.into_array(batch.num_rows());
+ let result = arithmetic_op
+ .evaluate(&batch)?
+ .into_array(batch.num_rows())
+ .expect("Failed to convert to array");
assert_eq!(result.as_ref(), &expected);
Ok(())
@@ -2154,7 +2163,10 @@ mod tests {
let lit = Arc::new(Literal::new(literal));
let arithmetic_op = binary_op(col("a", &schema)?, op, lit, &schema)?;
let batch = RecordBatch::try_new(schema, data)?;
- let result = arithmetic_op.evaluate(&batch)?.into_array(batch.num_rows());
+ let result = arithmetic_op
+ .evaluate(&batch)?
+ .into_array(batch.num_rows())
+ .expect("Failed to convert to array");
assert_eq!(&result, &expected);
Ok(())
@@ -2170,7 +2182,10 @@ mod tests {
let op = binary_op(col("a", schema)?, op, col("b", schema)?, schema)?;
let data: Vec<ArrayRef> = vec![left.clone(), right.clone()];
let batch = RecordBatch::try_new(schema.clone(), data)?;
- let result = op.evaluate(&batch)?.into_array(batch.num_rows());
+ let result = op
+ .evaluate(&batch)?
+ .into_array(batch.num_rows())
+ .expect("Failed to convert to array");
assert_eq!(result.as_ref(), &expected);
Ok(())
@@ -2187,7 +2202,10 @@ mod tests {
let scalar = lit(scalar.clone());
let op = binary_op(scalar, op, col("a", schema)?, schema)?;
let batch = RecordBatch::try_new(Arc::clone(schema), vec![Arc::clone(arr)])?;
- let result = op.evaluate(&batch)?.into_array(batch.num_rows());
+ let result = op
+ .evaluate(&batch)?
+ .into_array(batch.num_rows())
+ .expect("Failed to convert to array");
assert_eq!(result.as_ref(), expected);
Ok(())
@@ -2204,7 +2222,10 @@ mod tests {
let scalar = lit(scalar.clone());
let op = binary_op(col("a", schema)?, op, scalar, schema)?;
let batch = RecordBatch::try_new(Arc::clone(schema), vec![Arc::clone(arr)])?;
- let result = op.evaluate(&batch)?.into_array(batch.num_rows());
+ let result = op
+ .evaluate(&batch)?
+ .into_array(batch.num_rows())
+ .expect("Failed to convert to array");
assert_eq!(result.as_ref(), expected);
Ok(())
@@ -2776,7 +2797,8 @@ mod tests {
let result = expr
.evaluate(&batch)
.expect("evaluation")
- .into_array(batch.num_rows());
+ .into_array(batch.num_rows())
+ .expect("Failed to convert to array");
let expected: Int32Array = input
.into_iter()
@@ -3255,7 +3277,10 @@ mod tests {
let arithmetic_op = binary_op(col("a", schema)?, op, col("b", schema)?, schema)?;
let data: Vec<ArrayRef> = vec![left.clone(), right.clone()];
let batch = RecordBatch::try_new(schema.clone(), data)?;
- let result = arithmetic_op.evaluate(&batch)?.into_array(batch.num_rows());
+ let result = arithmetic_op
+ .evaluate(&batch)?
+ .into_array(batch.num_rows())
+ .expect("Failed to convert to array");
assert_eq!(result.as_ref(), expected.as_ref());
Ok(())
diff --git a/datafusion/physical-expr/src/expressions/case.rs b/datafusion/physical-expr/src/expressions/case.rs
index a2395c4a0c..5fcfd61d90 100644
--- a/datafusion/physical-expr/src/expressions/case.rs
+++ b/datafusion/physical-expr/src/expressions/case.rs
@@ -126,7 +126,7 @@ impl CaseExpr {
let return_type = self.data_type(&batch.schema())?;
let expr = self.expr.as_ref().unwrap();
let base_value = expr.evaluate(batch)?;
- let base_value = base_value.into_array(batch.num_rows());
+ let base_value = base_value.into_array(batch.num_rows())?;
let base_nulls = is_null(base_value.as_ref())?;
// start with nulls as default output
@@ -137,7 +137,7 @@ impl CaseExpr {
let when_value = self.when_then_expr[i]
.0
.evaluate_selection(batch, &remainder)?;
- let when_value = when_value.into_array(batch.num_rows());
+ let when_value = when_value.into_array(batch.num_rows())?;
// build boolean array representing which rows match the "when" value
let when_match = eq(&when_value, &base_value)?;
// Treat nulls as false
@@ -153,7 +153,7 @@ impl CaseExpr {
ColumnarValue::Scalar(value) if value.is_null() => {
new_null_array(&return_type, batch.num_rows())
}
- _ => then_value.into_array(batch.num_rows()),
+ _ => then_value.into_array(batch.num_rows())?,
};
current_value =
@@ -170,7 +170,7 @@ impl CaseExpr {
remainder = or(&base_nulls, &remainder)?;
let else_ = expr
.evaluate_selection(batch, &remainder)?
- .into_array(batch.num_rows());
+ .into_array(batch.num_rows())?;
current_value = zip(&remainder, else_.as_ref(), current_value.as_ref())?;
}
@@ -194,7 +194,7 @@ impl CaseExpr {
let when_value = self.when_then_expr[i]
.0
.evaluate_selection(batch, &remainder)?;
- let when_value = when_value.into_array(batch.num_rows());
+ let when_value = when_value.into_array(batch.num_rows())?;
let when_value = as_boolean_array(&when_value).map_err(|e| {
DataFusionError::Context(
"WHEN expression did not return a BooleanArray".to_string(),
@@ -214,7 +214,7 @@ impl CaseExpr {
ColumnarValue::Scalar(value) if value.is_null() => {
new_null_array(&return_type, batch.num_rows())
}
- _ => then_value.into_array(batch.num_rows()),
+ _ => then_value.into_array(batch.num_rows())?,
};
current_value =
@@ -231,7 +231,7 @@ impl CaseExpr {
.unwrap_or_else(|_| e.clone());
let else_ = expr
.evaluate_selection(batch, &remainder)?
- .into_array(batch.num_rows());
+ .into_array(batch.num_rows())?;
current_value = zip(&remainder, else_.as_ref(), current_value.as_ref())?;
}
@@ -425,7 +425,10 @@ mod tests {
None,
schema.as_ref(),
)?;
- let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
+ let result = expr
+ .evaluate(&batch)?
+ .into_array(batch.num_rows())
+ .expect("Failed to convert to array");
let result = as_int32_array(&result)?;
let expected = &Int32Array::from(vec![Some(123), None, None, Some(456)]);
@@ -453,7 +456,10 @@ mod tests {
Some(else_value),
schema.as_ref(),
)?;
- let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
+ let result = expr
+ .evaluate(&batch)?
+ .into_array(batch.num_rows())
+ .expect("Failed to convert to array");
let result = as_int32_array(&result)?;
let expected =
@@ -485,7 +491,10 @@ mod tests {
Some(else_value),
schema.as_ref(),
)?;
- let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
+ let result = expr
+ .evaluate(&batch)?
+ .into_array(batch.num_rows())
+ .expect("Failed to convert to array");
let result =
as_float64_array(&result).expect("failed to downcast to Float64Array");
@@ -523,7 +532,10 @@ mod tests {
None,
schema.as_ref(),
)?;
- let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
+ let result = expr
+ .evaluate(&batch)?
+ .into_array(batch.num_rows())
+ .expect("Failed to convert to array");
let result = as_int32_array(&result)?;
let expected = &Int32Array::from(vec![Some(123), None, None, Some(456)]);
@@ -551,7 +563,10 @@ mod tests {
Some(else_value),
schema.as_ref(),
)?;
- let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
+ let result = expr
+ .evaluate(&batch)?
+ .into_array(batch.num_rows())
+ .expect("Failed to convert to array");
let result = as_int32_array(&result)?;
let expected =
@@ -583,7 +598,10 @@ mod tests {
Some(x),
schema.as_ref(),
)?;
- let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
+ let result = expr
+ .evaluate(&batch)?
+ .into_array(batch.num_rows())
+ .expect("Failed to convert to array");
let result =
as_float64_array(&result).expect("failed to downcast to Float64Array");
@@ -629,7 +647,10 @@ mod tests {
Some(else_value),
schema.as_ref(),
)?;
- let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
+ let result = expr
+ .evaluate(&batch)?
+ .into_array(batch.num_rows())
+ .expect("Failed to convert to array");
let result = as_int32_array(&result)?;
let expected =
@@ -661,7 +682,10 @@ mod tests {
Some(else_value),
schema.as_ref(),
)?;
- let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
+ let result = expr
+ .evaluate(&batch)?
+ .into_array(batch.num_rows())
+ .expect("Failed to convert to array");
let result =
as_float64_array(&result).expect("failed to downcast to Float64Array");
@@ -693,7 +717,10 @@ mod tests {
None,
schema.as_ref(),
)?;
- let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
+ let result = expr
+ .evaluate(&batch)?
+ .into_array(batch.num_rows())
+ .expect("Failed to convert to array");
let result =
as_float64_array(&result).expect("failed to downcast to Float64Array");
@@ -721,7 +748,10 @@ mod tests {
None,
schema.as_ref(),
)?;
- let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
+ let result = expr
+ .evaluate(&batch)?
+ .into_array(batch.num_rows())
+ .expect("Failed to convert to array");
let result =
as_float64_array(&result).expect("failed to downcast to Float64Array");
diff --git a/datafusion/physical-expr/src/expressions/cast.rs b/datafusion/physical-expr/src/expressions/cast.rs
index 5d56af3646..780e042156 100644
--- a/datafusion/physical-expr/src/expressions/cast.rs
+++ b/datafusion/physical-expr/src/expressions/cast.rs
@@ -178,7 +178,7 @@ pub fn cast_column(
kernels::cast::cast_with_options(array, cast_type, &cast_options)?,
)),
ColumnarValue::Scalar(scalar) => {
- let scalar_array = scalar.to_array();
+ let scalar_array = scalar.to_array()?;
let cast_array = kernels::cast::cast_with_options(
&scalar_array,
cast_type,
@@ -263,7 +263,10 @@ mod tests {
assert_eq!(expression.data_type(&schema)?, $TYPE);
// compute
- let result = expression.evaluate(&batch)?.into_array(batch.num_rows());
+ let result = expression
+ .evaluate(&batch)?
+ .into_array(batch.num_rows())
+ .expect("Failed to convert to array");
// verify that the array's data_type is correct
assert_eq!(*result.data_type(), $TYPE);
@@ -312,7 +315,10 @@ mod tests {
assert_eq!(expression.data_type(&schema)?, $TYPE);
// compute
- let result = expression.evaluate(&batch)?.into_array(batch.num_rows());
+ let result = expression
+ .evaluate(&batch)?
+ .into_array(batch.num_rows())
+ .expect("Failed to convert to array");
// verify that the array's data_type is correct
assert_eq!(*result.data_type(), $TYPE);
diff --git a/datafusion/physical-expr/src/expressions/datum.rs b/datafusion/physical-expr/src/expressions/datum.rs
index f57cbbd4ff..2bb79922cf 100644
--- a/datafusion/physical-expr/src/expressions/datum.rs
+++ b/datafusion/physical-expr/src/expressions/datum.rs
@@ -34,14 +34,14 @@ pub(crate) fn apply(
(ColumnarValue::Array(left), ColumnarValue::Array(right)) => {
Ok(ColumnarValue::Array(f(&left.as_ref(), &right.as_ref())?))
}
- (ColumnarValue::Scalar(left), ColumnarValue::Array(right)) => {
- Ok(ColumnarValue::Array(f(&left.to_scalar(), &right.as_ref())?))
- }
- (ColumnarValue::Array(left), ColumnarValue::Scalar(right)) => {
- Ok(ColumnarValue::Array(f(&left.as_ref(), &right.to_scalar())?))
- }
+ (ColumnarValue::Scalar(left), ColumnarValue::Array(right)) => Ok(
+ ColumnarValue::Array(f(&left.to_scalar()?, &right.as_ref())?),
+ ),
+ (ColumnarValue::Array(left), ColumnarValue::Scalar(right)) => Ok(
+ ColumnarValue::Array(f(&left.as_ref(), &right.to_scalar()?)?),
+ ),
(ColumnarValue::Scalar(left), ColumnarValue::Scalar(right)) => {
- let array = f(&left.to_scalar(), &right.to_scalar())?;
+ let array = f(&left.to_scalar()?, &right.to_scalar()?)?;
let scalar = ScalarValue::try_from_array(array.as_ref(), 0)?;
Ok(ColumnarValue::Scalar(scalar))
}
diff --git a/datafusion/physical-expr/src/expressions/get_indexed_field.rs b/datafusion/physical-expr/src/expressions/get_indexed_field.rs
index df79e28358..7d5f16c454 100644
--- a/datafusion/physical-expr/src/expressions/get_indexed_field.rs
+++ b/datafusion/physical-expr/src/expressions/get_indexed_field.rs
@@ -183,7 +183,7 @@ impl PhysicalExpr for GetIndexedFieldExpr {
}
fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
- let array = self.arg.evaluate(batch)?.into_array(batch.num_rows());
+ let array = self.arg.evaluate(batch)?.into_array(batch.num_rows())?;
match &self.field {
GetFieldAccessExpr::NamedStructField{name} => match (array.data_type(), name) {
(DataType::Map(_, _), ScalarValue::Utf8(Some(k))) => {
@@ -210,7 +210,7 @@ impl PhysicalExpr for GetIndexedFieldExpr {
with utf8 indexes. Tried {dt:?} with {name:?} index"),
},
GetFieldAccessExpr::ListIndex{key} => {
- let key = key.evaluate(batch)?.into_array(batch.num_rows());
+ let key = key.evaluate(batch)?.into_array(batch.num_rows())?;
match (array.data_type(), key.data_type()) {
(DataType::List(_), DataType::Int64) => Ok(ColumnarValue::Array(array_element(&[
array, key
@@ -224,8 +224,8 @@ impl PhysicalExpr for GetIndexedFieldExpr {
}
},
GetFieldAccessExpr::ListRange{start, stop} => {
- let start = start.evaluate(batch)?.into_array(batch.num_rows());
- let stop = stop.evaluate(batch)?.into_array(batch.num_rows());
+ let start = start.evaluate(batch)?.into_array(batch.num_rows())?;
+ let stop = stop.evaluate(batch)?.into_array(batch.num_rows())?;
match (array.data_type(), start.data_type(), stop.data_type()) {
(DataType::List(_), DataType::Int64, DataType::Int64) => Ok(ColumnarValue::Array(array_slice(&[
array, start, stop
@@ -326,7 +326,10 @@ mod tests {
// only one row should be processed
let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(struct_array)])?;
let expr = Arc::new(GetIndexedFieldExpr::new_field(expr, "a"));
- let result = expr.evaluate(&batch)?.into_array(1);
+ let result = expr
+ .evaluate(&batch)?
+ .into_array(1)
+ .expect("Failed to convert to array");
let result =
as_boolean_array(&result).expect("failed to downcast to BooleanArray");
assert_eq!(boolean, result.clone());
@@ -383,7 +386,10 @@ mod tests {
vec![Arc::new(list_col), Arc::new(key_col)],
)?;
let expr = Arc::new(GetIndexedFieldExpr::new_index(expr, key));
- let result = expr.evaluate(&batch)?.into_array(1);
+ let result = expr
+ .evaluate(&batch)?
+ .into_array(1)
+ .expect("Failed to convert to array");
let result = as_string_array(&result).expect("failed to downcast to ListArray");
let expected = StringArray::from(expected_list);
assert_eq!(expected, result.clone());
@@ -419,7 +425,10 @@ mod tests {
vec![Arc::new(list_col), Arc::new(start_col), Arc::new(stop_col)],
)?;
let expr = Arc::new(GetIndexedFieldExpr::new_range(expr, start, stop));
- let result = expr.evaluate(&batch)?.into_array(1);
+ let result = expr
+ .evaluate(&batch)?
+ .into_array(1)
+ .expect("Failed to convert to array");
let result = as_list_array(&result).expect("failed to downcast to ListArray");
let (expected, _, _) =
build_list_arguments(expected_list, vec![None], vec![None]);
@@ -440,7 +449,10 @@ mod tests {
vec![Arc::new(list_builder.finish()), key_array],
)?;
let expr = Arc::new(GetIndexedFieldExpr::new_index(expr, key));
- let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
+ let result = expr
+ .evaluate(&batch)?
+ .into_array(batch.num_rows())
+ .expect("Failed to convert to array");
assert!(result.is_null(0));
Ok(())
}
@@ -461,7 +473,10 @@ mod tests {
vec![Arc::new(list_builder.finish()), Arc::new(key_array)],
)?;
let expr = Arc::new(GetIndexedFieldExpr::new_index(expr, key));
- let result = expr.evaluate(&batch)?.into_array(1);
+ let result = expr
+ .evaluate(&batch)?
+ .into_array(1)
+ .expect("Failed to convert to array");
assert!(result.is_null(0));
Ok(())
}
diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr/src/expressions/in_list.rs
index 8d55fb70bd..625b01ec9a 100644
--- a/datafusion/physical-expr/src/expressions/in_list.rs
+++ b/datafusion/physical-expr/src/expressions/in_list.rs
@@ -351,15 +351,15 @@ impl PhysicalExpr for InListExpr {
fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
let value = self.expr.evaluate(batch)?;
let r = match &self.static_filter {
- Some(f) => f.contains(value.into_array(1).as_ref(), self.negated)?,
+ Some(f) => f.contains(value.into_array(1)?.as_ref(), self.negated)?,
None => {
- let value = value.into_array(batch.num_rows());
+ let value = value.into_array(batch.num_rows())?;
let found = self.list.iter().map(|expr| expr.evaluate(batch)).try_fold(
BooleanArray::new(BooleanBuffer::new_unset(batch.num_rows()), None),
|result, expr| -> Result<BooleanArray> {
Ok(or_kleene(
&result,
- &eq(&value, &expr?.into_array(batch.num_rows()))?,
+ &eq(&value, &expr?.into_array(batch.num_rows())?)?,
)?)
},
)?;
@@ -501,7 +501,10 @@ mod tests {
($BATCH:expr, $LIST:expr, $NEGATED:expr, $EXPECTED:expr, $COL:expr, $SCHEMA:expr) => {{
let (cast_expr, cast_list_exprs) = in_list_cast($COL, $LIST, $SCHEMA)?;
let expr = in_list(cast_expr, cast_list_exprs, $NEGATED, $SCHEMA).unwrap();
- let result = expr.evaluate(&$BATCH)?.into_array($BATCH.num_rows());
+ let result = expr
+ .evaluate(&$BATCH)?
+ .into_array($BATCH.num_rows())
+ .expect("Failed to convert to array");
let result =
as_boolean_array(&result).expect("failed to downcast to BooleanArray");
let expected = &BooleanArray::from($EXPECTED);
diff --git a/datafusion/physical-expr/src/expressions/is_not_null.rs b/datafusion/physical-expr/src/expressions/is_not_null.rs
index da717a517f..2e6a2bec9c 100644
--- a/datafusion/physical-expr/src/expressions/is_not_null.rs
+++ b/datafusion/physical-expr/src/expressions/is_not_null.rs
@@ -132,7 +132,10 @@ mod tests {
let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)])?;
// expression: "a is not null"
- let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
+ let result = expr
+ .evaluate(&batch)?
+ .into_array(batch.num_rows())
+ .expect("Failed to convert to array");
let result =
as_boolean_array(&result).expect("failed to downcast to BooleanArray");
diff --git a/datafusion/physical-expr/src/expressions/is_null.rs b/datafusion/physical-expr/src/expressions/is_null.rs
index ee7897edd4..3ad4058dd6 100644
--- a/datafusion/physical-expr/src/expressions/is_null.rs
+++ b/datafusion/physical-expr/src/expressions/is_null.rs
@@ -134,7 +134,10 @@ mod tests {
let expr = is_null(col("a", &schema)?).unwrap();
let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)])?;
- let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
+ let result = expr
+ .evaluate(&batch)?
+ .into_array(batch.num_rows())
+ .expect("Failed to convert to array");
let result =
as_boolean_array(&result).expect("failed to downcast to BooleanArray");
diff --git a/datafusion/physical-expr/src/expressions/like.rs b/datafusion/physical-expr/src/expressions/like.rs
index e833eabbff..37452e2784 100644
--- a/datafusion/physical-expr/src/expressions/like.rs
+++ b/datafusion/physical-expr/src/expressions/like.rs
@@ -201,7 +201,10 @@ mod test {
)?;
// compute
- let result = expression.evaluate(&batch)?.into_array(batch.num_rows());
+ let result = expression
+ .evaluate(&batch)?
+ .into_array(batch.num_rows())
+ .expect("Failed to convert to array");
let result =
as_boolean_array(&result).expect("failed to downcast to BooleanArray");
let expected = &BooleanArray::from($VEC);
diff --git a/datafusion/physical-expr/src/expressions/literal.rs b/datafusion/physical-expr/src/expressions/literal.rs
index 91cb23d586..cd3b51f091 100644
--- a/datafusion/physical-expr/src/expressions/literal.rs
+++ b/datafusion/physical-expr/src/expressions/literal.rs
@@ -131,7 +131,10 @@ mod tests {
let literal_expr = lit(42i32);
assert_eq!("42", format!("{literal_expr}"));
- let literal_array = literal_expr.evaluate(&batch)?.into_array(batch.num_rows());
+ let literal_array = literal_expr
+ .evaluate(&batch)?
+ .into_array(batch.num_rows())
+ .expect("Failed to convert to array");
let literal_array = as_int32_array(&literal_array)?;
// note that the contents of the literal array are unrelated to the batch contents except for the length of the array
diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs
index c44b3cf01d..1919cac979 100644
--- a/datafusion/physical-expr/src/expressions/mod.rs
+++ b/datafusion/physical-expr/src/expressions/mod.rs
@@ -247,8 +247,10 @@ pub(crate) mod tests {
let expr = agg.expressions();
let values = expr
.iter()
- .map(|e| e.evaluate(batch))
- .map(|r| r.map(|v| v.into_array(batch.num_rows())))
+ .map(|e| {
+ e.evaluate(batch)
+ .and_then(|v| v.into_array(batch.num_rows()))
+ })
.collect::<Result<Vec<_>>>()?;
accum.update_batch(&values)?;
accum.evaluate()
@@ -262,8 +264,10 @@ pub(crate) mod tests {
let expr = agg.expressions();
let values = expr
.iter()
- .map(|e| e.evaluate(batch))
- .map(|r| r.map(|v| v.into_array(batch.num_rows())))
+ .map(|e| {
+ e.evaluate(batch)
+ .and_then(|v| v.into_array(batch.num_rows()))
+ })
.collect::<Result<Vec<_>>>()?;
let indices = vec![0; batch.num_rows()];
accum.update_batch(&values, &indices, None, 1)?;
diff --git a/datafusion/physical-expr/src/expressions/negative.rs b/datafusion/physical-expr/src/expressions/negative.rs
index 86b000e76a..65b3479411 100644
--- a/datafusion/physical-expr/src/expressions/negative.rs
+++ b/datafusion/physical-expr/src/expressions/negative.rs
@@ -195,7 +195,7 @@ mod tests {
let expected = &paste!{[<$DATA_TY Array>]::from(arr_expected)};
let batch =
RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(input)])?;
- let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
+ let result = expr.evaluate(&batch)?.into_array(batch.num_rows()).expect("Failed to convert to array");
let result =
as_primitive_array(&result).expect(format!("failed to downcast to {:?}Array", $DATA_TY).as_str());
assert_eq!(result, expected);
diff --git a/datafusion/physical-expr/src/expressions/not.rs b/datafusion/physical-expr/src/expressions/not.rs
index c154fad100..4ceccc6932 100644
--- a/datafusion/physical-expr/src/expressions/not.rs
+++ b/datafusion/physical-expr/src/expressions/not.rs
@@ -150,7 +150,10 @@ mod tests {
let batch =
RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(input)])?;
- let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
+ let result = expr
+ .evaluate(&batch)?
+ .into_array(batch.num_rows())
+ .expect("Failed to convert to array");
let result =
as_boolean_array(&result).expect("failed to downcast to BooleanArray");
assert_eq!(result, expected);
diff --git a/datafusion/physical-expr/src/expressions/nullif.rs b/datafusion/physical-expr/src/expressions/nullif.rs
index 7bbe9d73d4..252bd10c3e 100644
--- a/datafusion/physical-expr/src/expressions/nullif.rs
+++ b/datafusion/physical-expr/src/expressions/nullif.rs
@@ -37,7 +37,7 @@ pub fn nullif_func(args: &[ColumnarValue]) -> Result<ColumnarValue> {
match (lhs, rhs) {
(ColumnarValue::Array(lhs), ColumnarValue::Scalar(rhs)) => {
- let rhs = rhs.to_scalar();
+ let rhs = rhs.to_scalar()?;
let array = nullif(lhs, &eq(&lhs, &rhs)?)?;
Ok(ColumnarValue::Array(array))
@@ -47,7 +47,7 @@ pub fn nullif_func(args: &[ColumnarValue]) -> Result<ColumnarValue> {
Ok(ColumnarValue::Array(array))
}
(ColumnarValue::Scalar(lhs), ColumnarValue::Array(rhs)) => {
- let lhs = lhs.to_array_of_size(rhs.len());
+ let lhs = lhs.to_array_of_size(rhs.len())?;
let array = nullif(&lhs, &eq(&lhs, &rhs)?)?;
Ok(ColumnarValue::Array(array))
}
@@ -89,7 +89,7 @@ mod tests {
let lit_array = ColumnarValue::Scalar(ScalarValue::Int32(Some(2i32)));
let result = nullif_func(&[a, lit_array])?;
- let result = result.into_array(0);
+ let result = result.into_array(0).expect("Failed to convert to array");
let expected = Arc::new(Int32Array::from(vec![
Some(1),
@@ -115,7 +115,7 @@ mod tests {
let lit_array = ColumnarValue::Scalar(ScalarValue::Int32(Some(1i32)));
let result = nullif_func(&[a, lit_array])?;
- let result = result.into_array(0);
+ let result = result.into_array(0).expect("Failed to convert to array");
let expected = Arc::new(Int32Array::from(vec![
None,
@@ -140,7 +140,7 @@ mod tests {
let lit_array = ColumnarValue::Scalar(ScalarValue::Boolean(Some(false)));
let result = nullif_func(&[a, lit_array])?;
- let result = result.into_array(0);
+ let result = result.into_array(0).expect("Failed to convert to array");
let expected =
Arc::new(BooleanArray::from(vec![Some(true), None, None])) as ArrayRef;
@@ -157,7 +157,7 @@ mod tests {
let lit_array = ColumnarValue::Scalar(ScalarValue::Utf8(Some("bar".to_string())));
let result = nullif_func(&[a, lit_array])?;
- let result = result.into_array(0);
+ let result = result.into_array(0).expect("Failed to convert to array");
let expected = Arc::new(StringArray::from(vec![
Some("foo"),
@@ -178,7 +178,7 @@ mod tests {
let lit_array = ColumnarValue::Scalar(ScalarValue::Int32(Some(2i32)));
let result = nullif_func(&[lit_array, a])?;
- let result = result.into_array(0);
+ let result = result.into_array(0).expect("Failed to convert to array");
let expected = Arc::new(Int32Array::from(vec![
Some(2),
@@ -198,7 +198,7 @@ mod tests {
let b_eq = ColumnarValue::Scalar(ScalarValue::Int32(Some(2i32)));
let result_eq = nullif_func(&[a_eq, b_eq])?;
- let result_eq = result_eq.into_array(1);
+ let result_eq = result_eq.into_array(1).expect("Failed to convert to array");
let expected_eq = Arc::new(Int32Array::from(vec![None])) as ArrayRef;
@@ -208,7 +208,9 @@ mod tests {
let b_neq = ColumnarValue::Scalar(ScalarValue::Int32(Some(1i32)));
let result_neq = nullif_func(&[a_neq, b_neq])?;
- let result_neq = result_neq.into_array(1);
+ let result_neq = result_neq
+ .into_array(1)
+ .expect("Failed to convert to array");
let expected_neq = Arc::new(Int32Array::from(vec![Some(2i32)])) as ArrayRef;
assert_eq!(expected_neq.as_ref(), result_neq.as_ref());
diff --git a/datafusion/physical-expr/src/expressions/try_cast.rs b/datafusion/physical-expr/src/expressions/try_cast.rs
index cba026c565..dea7f9f86a 100644
--- a/datafusion/physical-expr/src/expressions/try_cast.rs
+++ b/datafusion/physical-expr/src/expressions/try_cast.rs
@@ -89,7 +89,7 @@ impl PhysicalExpr for TryCastExpr {
Ok(ColumnarValue::Array(cast))
}
ColumnarValue::Scalar(scalar) => {
- let array = scalar.to_array();
+ let array = scalar.to_array()?;
let cast_array = cast_with_options(&array, &self.cast_type, &options)?;
let cast_scalar = ScalarValue::try_from_array(&cast_array, 0)?;
Ok(ColumnarValue::Scalar(cast_scalar))
@@ -187,7 +187,10 @@ mod tests {
assert_eq!(expression.data_type(&schema)?, $TYPE);
// compute
- let result = expression.evaluate(&batch)?.into_array(batch.num_rows());
+ let result = expression
+ .evaluate(&batch)?
+ .into_array(batch.num_rows())
+ .expect("Failed to convert to array");
// verify that the array's data_type is correct
assert_eq!(*result.data_type(), $TYPE);
@@ -235,7 +238,10 @@ mod tests {
assert_eq!(expression.data_type(&schema)?, $TYPE);
// compute
- let result = expression.evaluate(&batch)?.into_array(batch.num_rows());
+ let result = expression
+ .evaluate(&batch)?
+ .into_array(batch.num_rows())
+ .expect("Failed to convert to array");
// verify that the array's data_type is correct
assert_eq!(*result.data_type(), $TYPE);
diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs
index c973232c75..9185ade313 100644
--- a/datafusion/physical-expr/src/functions.rs
+++ b/datafusion/physical-expr/src/functions.rs
@@ -239,7 +239,7 @@ where
};
arg.clone().into_array(expansion_len)
})
- .collect::<Vec<ArrayRef>>();
+ .collect::<Result<Vec<_>>>()?;
let result = (inner)(&args);
@@ -937,7 +937,7 @@ mod tests {
match expected {
Ok(expected) => {
let result = expr.evaluate(&batch)?;
- let result = result.into_array(batch.num_rows());
+ let result = result.into_array(batch.num_rows()).expect("Failed to convert to array");
let result = result.as_any().downcast_ref::<$ARRAY_TYPE>().unwrap();
// value is correct
@@ -2906,7 +2906,10 @@ mod tests {
// evaluate works
let batch = RecordBatch::try_new(Arc::new(schema.clone()), columns)?;
- let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
+ let result = expr
+ .evaluate(&batch)?
+ .into_array(batch.num_rows())
+ .expect("Failed to convert to array");
// downcast works
let result = as_list_array(&result)?;
@@ -2945,7 +2948,10 @@ mod tests {
// evaluate works
let batch = RecordBatch::try_new(Arc::new(schema.clone()), columns)?;
- let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
+ let result = expr
+ .evaluate(&batch)?
+ .into_array(batch.num_rows())
+ .expect("Failed to convert to array");
// downcast works
let result = as_list_array(&result)?;
@@ -3017,8 +3023,11 @@ mod tests {
let adapter_func = make_scalar_function(dummy_function);
let scalar_arg = ColumnarValue::Scalar(ScalarValue::Int64(Some(1)));
- let array_arg =
- ColumnarValue::Array(ScalarValue::Int64(Some(1)).to_array_of_size(5));
+ let array_arg = ColumnarValue::Array(
+ ScalarValue::Int64(Some(1))
+ .to_array_of_size(5)
+ .expect("Failed to convert to array of size"),
+ );
let result = unpack_uint64_array(adapter_func(&[array_arg, scalar_arg]))?;
assert_eq!(result, vec![5, 5]);
@@ -3030,8 +3039,11 @@ mod tests {
let adapter_func = make_scalar_function_with_hints(dummy_function, vec![]);
let scalar_arg = ColumnarValue::Scalar(ScalarValue::Int64(Some(1)));
- let array_arg =
- ColumnarValue::Array(ScalarValue::Int64(Some(1)).to_array_of_size(5));
+ let array_arg = ColumnarValue::Array(
+ ScalarValue::Int64(Some(1))
+ .to_array_of_size(5)
+ .expect("Failed to convert to array of size"),
+ );
let result = unpack_uint64_array(adapter_func(&[array_arg, scalar_arg]))?;
assert_eq!(result, vec![5, 5]);
@@ -3046,8 +3058,11 @@ mod tests {
);
let scalar_arg = ColumnarValue::Scalar(ScalarValue::Int64(Some(1)));
- let array_arg =
- ColumnarValue::Array(ScalarValue::Int64(Some(1)).to_array_of_size(5));
+ let array_arg = ColumnarValue::Array(
+ ScalarValue::Int64(Some(1))
+ .to_array_of_size(5)
+ .expect("Failed to convert to array of size"),
+ );
let result = unpack_uint64_array(adapter_func(&[array_arg, scalar_arg]))?;
assert_eq!(result, vec![5, 1]);
@@ -3056,8 +3071,11 @@ mod tests {
#[test]
fn test_make_scalar_function_with_hints_on_arrays() -> Result<()> {
- let array_arg =
- ColumnarValue::Array(ScalarValue::Int64(Some(1)).to_array_of_size(5));
+ let array_arg = ColumnarValue::Array(
+ ScalarValue::Int64(Some(1))
+ .to_array_of_size(5)
+ .expect("Failed to convert to array of size"),
+ );
let adapter_func = make_scalar_function_with_hints(
dummy_function,
vec![Hint::Pad, Hint::AcceptsSingular],
@@ -3077,8 +3095,11 @@ mod tests {
);
let scalar_arg = ColumnarValue::Scalar(ScalarValue::Int64(Some(1)));
- let array_arg =
- ColumnarValue::Array(ScalarValue::Int64(Some(1)).to_array_of_size(5));
+ let array_arg = ColumnarValue::Array(
+ ScalarValue::Int64(Some(1))
+ .to_array_of_size(5)
+ .expect("Failed to convert to array of size"),
+ );
let result = unpack_uint64_array(adapter_func(&[
array_arg,
scalar_arg.clone(),
@@ -3097,8 +3118,11 @@ mod tests {
);
let scalar_arg = ColumnarValue::Scalar(ScalarValue::Int64(Some(1)));
- let array_arg =
- ColumnarValue::Array(ScalarValue::Int64(Some(1)).to_array_of_size(5));
+ let array_arg = ColumnarValue::Array(
+ ScalarValue::Int64(Some(1))
+ .to_array_of_size(5)
+ .expect("Failed to convert to array of size"),
+ );
let result = unpack_uint64_array(adapter_func(&[
array_arg.clone(),
scalar_arg.clone(),
@@ -3125,8 +3149,11 @@ mod tests {
);
let scalar_arg = ColumnarValue::Scalar(ScalarValue::Int64(Some(1)));
- let array_arg =
- ColumnarValue::Array(ScalarValue::Int64(Some(1)).to_array_of_size(5));
+ let array_arg = ColumnarValue::Array(
+ ScalarValue::Int64(Some(1))
+ .to_array_of_size(5)
+ .expect("Failed to convert to array of size"),
+ );
let result = unpack_uint64_array(adapter_func(&[array_arg, scalar_arg]))?;
assert_eq!(result, vec![5, 1]);
diff --git a/datafusion/physical-expr/src/intervals/interval_aritmetic.rs b/datafusion/physical-expr/src/intervals/interval_aritmetic.rs
index 1ea9b2d9ae..4b81adfbb1 100644
--- a/datafusion/physical-expr/src/intervals/interval_aritmetic.rs
+++ b/datafusion/physical-expr/src/intervals/interval_aritmetic.rs
@@ -750,7 +750,7 @@ fn cast_scalar_value(
data_type: &DataType,
cast_options: &CastOptions,
) -> Result<ScalarValue> {
- let cast_array = cast_with_options(&value.to_array(), data_type, cast_options)?;
+ let cast_array = cast_with_options(&value.to_array()?, data_type, cast_options)?;
ScalarValue::try_from_array(&cast_array, 0)
}
diff --git a/datafusion/physical-expr/src/math_expressions.rs b/datafusion/physical-expr/src/math_expressions.rs
index 0b7bc34014..af66862aec 100644
--- a/datafusion/physical-expr/src/math_expressions.rs
+++ b/datafusion/physical-expr/src/math_expressions.rs
@@ -769,7 +769,8 @@ mod tests {
let args = vec![ColumnarValue::Array(Arc::new(NullArray::new(1)))];
let array = random(&args)
.expect("failed to initialize function random")
- .into_array(1);
+ .into_array(1)
+ .expect("Failed to convert to array");
let floats =
as_float64_array(&array).expect("failed to initialize function random");
diff --git a/datafusion/physical-expr/src/planner.rs b/datafusion/physical-expr/src/planner.rs
index 64c1d0be04..f318cd3b0f 100644
--- a/datafusion/physical-expr/src/planner.rs
+++ b/datafusion/physical-expr/src/planner.rs
@@ -472,7 +472,7 @@ mod tests {
]))],
)?;
let result = p.evaluate(&batch)?;
- let result = result.into_array(4);
+ let result = result.into_array(4).expect("Failed to convert to array");
assert_eq!(
&result,
diff --git a/datafusion/physical-expr/src/struct_expressions.rs b/datafusion/physical-expr/src/struct_expressions.rs
index baa29d668e..0eed1d16fb 100644
--- a/datafusion/physical-expr/src/struct_expressions.rs
+++ b/datafusion/physical-expr/src/struct_expressions.rs
@@ -67,13 +67,15 @@ fn array_struct(args: &[ArrayRef]) -> Result<ArrayRef> {
/// put values in a struct array.
pub fn struct_expr(values: &[ColumnarValue]) -> Result<ColumnarValue> {
- let arrays: Vec<ArrayRef> = values
+ let arrays = values
.iter()
- .map(|x| match x {
- ColumnarValue::Array(array) => array.clone(),
- ColumnarValue::Scalar(scalar) => scalar.to_array().clone(),
+ .map(|x| {
+ Ok(match x {
+ ColumnarValue::Array(array) => array.clone(),
+ ColumnarValue::Scalar(scalar) => scalar.to_array()?.clone(),
+ })
})
- .collect();
+ .collect::<Result<Vec<ArrayRef>>>()?;
Ok(ColumnarValue::Array(array_struct(arrays.as_slice())?))
}
@@ -93,7 +95,8 @@ mod tests {
];
let struc = struct_expr(&args)
.expect("failed to initialize function struct")
- .into_array(1);
+ .into_array(1)
+ .expect("Failed to convert to array");
let result =
as_struct_array(&struc).expect("failed to initialize function struct");
assert_eq!(
diff --git a/datafusion/physical-expr/src/window/built_in_window_function_expr.rs b/datafusion/physical-expr/src/window/built_in_window_function_expr.rs
index 66ffa990b7..7aa4f6536a 100644
--- a/datafusion/physical-expr/src/window/built_in_window_function_expr.rs
+++ b/datafusion/physical-expr/src/window/built_in_window_function_expr.rs
@@ -60,8 +60,10 @@ pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug {
fn evaluate_args(&self, batch: &RecordBatch) -> Result<Vec<ArrayRef>> {
self.expressions()
.iter()
- .map(|e| e.evaluate(batch))
- .map(|r| r.map(|v| v.into_array(batch.num_rows())))
+ .map(|e| {
+ e.evaluate(batch)
+ .and_then(|v| v.into_array(batch.num_rows()))
+ })
.collect()
}
diff --git a/datafusion/physical-expr/src/window/lead_lag.rs b/datafusion/physical-expr/src/window/lead_lag.rs
index f55f1600b9..d22660d41e 100644
--- a/datafusion/physical-expr/src/window/lead_lag.rs
+++ b/datafusion/physical-expr/src/window/lead_lag.rs
@@ -139,6 +139,7 @@ fn create_empty_array(
let array = value
.as_ref()
.map(|scalar| scalar.to_array_of_size(size))
+ .transpose()?
.unwrap_or_else(|| new_null_array(data_type, size));
if array.data_type() != data_type {
cast(&array, data_type).map_err(DataFusionError::ArrowError)
diff --git a/datafusion/physical-expr/src/window/window_expr.rs b/datafusion/physical-expr/src/window/window_expr.rs
index 9b0a02d329..b282e35797 100644
--- a/datafusion/physical-expr/src/window/window_expr.rs
+++ b/datafusion/physical-expr/src/window/window_expr.rs
@@ -82,8 +82,10 @@ pub trait WindowExpr: Send + Sync + Debug {
fn evaluate_args(&self, batch: &RecordBatch) -> Result<Vec<ArrayRef>> {
self.expressions()
.iter()
- .map(|e| e.evaluate(batch))
- .map(|r| r.map(|v| v.into_array(batch.num_rows())))
+ .map(|e| {
+ e.evaluate(batch)
+ .and_then(|v| v.into_array(batch.num_rows()))
+ })
.collect()
}
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs
index 4052d6aef0..3ac8129297 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -1064,10 +1064,11 @@ fn finalize_aggregation(
// build the vector of states
let a = accumulators
.iter()
- .map(|accumulator| accumulator.state())
- .map(|value| {
- value.map(|e| {
- e.iter().map(|v| v.to_array()).collect::<Vec<ArrayRef>>()
+ .map(|accumulator| {
+ accumulator.state().and_then(|e| {
+ e.iter()
+ .map(|v| v.to_array())
+ .collect::<Result<Vec<ArrayRef>>>()
})
})
.collect::<Result<Vec<_>>>()?;
@@ -1080,7 +1081,7 @@ fn finalize_aggregation(
// merge the state to the final value
accumulators
.iter()
- .map(|accumulator| accumulator.evaluate().map(|v| v.to_array()))
+ .map(|accumulator| accumulator.evaluate().and_then(|v| v.to_array()))
.collect::<Result<Vec<ArrayRef>>>()
}
}
@@ -1092,9 +1093,11 @@ fn evaluate(
batch: &RecordBatch,
) -> Result<Vec<ArrayRef>> {
expr.iter()
- .map(|expr| expr.evaluate(batch))
- .map(|r| r.map(|v| v.into_array(batch.num_rows())))
- .collect::<Result<Vec<_>>>()
+ .map(|expr| {
+ expr.evaluate(batch)
+ .and_then(|v| v.into_array(batch.num_rows()))
+ })
+ .collect()
}
/// Evaluates expressions against a record batch.
@@ -1114,9 +1117,11 @@ fn evaluate_optional(
expr.iter()
.map(|expr| {
expr.as_ref()
- .map(|expr| expr.evaluate(batch))
+ .map(|expr| {
+ expr.evaluate(batch)
+ .and_then(|v| v.into_array(batch.num_rows()))
+ })
.transpose()
- .map(|r| r.map(|v| v.into_array(batch.num_rows())))
})
.collect::<Result<Vec<_>>>()
}
@@ -1140,7 +1145,7 @@ pub(crate) fn evaluate_group_by(
.iter()
.map(|(expr, _)| {
let value = expr.evaluate(batch)?;
- Ok(value.into_array(batch.num_rows()))
+ value.into_array(batch.num_rows())
})
.collect::<Result<Vec<_>>>()?;
@@ -1149,7 +1154,7 @@ pub(crate) fn evaluate_group_by(
.iter()
.map(|(expr, _)| {
let value = expr.evaluate(batch)?;
- Ok(value.into_array(batch.num_rows()))
+ value.into_array(batch.num_rows())
})
.collect::<Result<Vec<_>>>()?;
diff --git a/datafusion/physical-plan/src/aggregates/no_grouping.rs b/datafusion/physical-plan/src/aggregates/no_grouping.rs
index 32c0bbc78a..90eb488a2e 100644
--- a/datafusion/physical-plan/src/aggregates/no_grouping.rs
+++ b/datafusion/physical-plan/src/aggregates/no_grouping.rs
@@ -217,8 +217,10 @@ fn aggregate_batch(
// 1.3
let values = &expr
.iter()
- .map(|e| e.evaluate(&batch))
- .map(|r| r.map(|v| v.into_array(batch.num_rows())))
+ .map(|e| {
+ e.evaluate(&batch)
+ .and_then(|v| v.into_array(batch.num_rows()))
+ })
.collect::<Result<Vec<_>>>()?;
// 1.4
diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs
index 0c44b367e5..d560a219f2 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -300,7 +300,7 @@ pub(crate) fn batch_filter(
) -> Result<RecordBatch> {
predicate
.evaluate(batch)
- .map(|v| v.into_array(batch.num_rows()))
+ .and_then(|v| v.into_array(batch.num_rows()))
.and_then(|array| {
Ok(as_boolean_array(&array)?)
// apply filter array to record batch
diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs
index 102f0c42e9..4c928d44ca 100644
--- a/datafusion/physical-plan/src/joins/cross_join.rs
+++ b/datafusion/physical-plan/src/joins/cross_join.rs
@@ -344,7 +344,7 @@ fn build_batch(
.iter()
.map(|arr| {
let scalar = ScalarValue::try_from_array(arr, left_index)?;
- Ok(scalar.to_array_of_size(batch.num_rows()))
+ scalar.to_array_of_size(batch.num_rows())
})
.collect::<Result<Vec<_>>>()?;
diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs
index 1a2db87d98..546a929bf9 100644
--- a/datafusion/physical-plan/src/joins/hash_join.rs
+++ b/datafusion/physical-plan/src/joins/hash_join.rs
@@ -713,7 +713,7 @@ where
// evaluate the keys
let keys_values = on
.iter()
- .map(|c| Ok(c.evaluate(batch)?.into_array(batch.num_rows())))
+ .map(|c| c.evaluate(batch)?.into_array(batch.num_rows()))
.collect::<Result<Vec<_>>>()?;
// calculate the hash values
@@ -857,13 +857,13 @@ pub fn build_equal_condition_join_indices<T: JoinHashMapType>(
) -> Result<(UInt64Array, UInt32Array)> {
let keys_values = probe_on
.iter()
- .map(|c| Ok(c.evaluate(probe_batch)?.into_array(probe_batch.num_rows())))
+ .map(|c| c.evaluate(probe_batch)?.into_array(probe_batch.num_rows()))
.collect::<Result<Vec<_>>>()?;
let build_join_values = build_on
.iter()
.map(|c| {
- Ok(c.evaluate(build_input_buffer)?
- .into_array(build_input_buffer.num_rows()))
+ c.evaluate(build_input_buffer)?
+ .into_array(build_input_buffer.num_rows())
})
.collect::<Result<Vec<_>>>()?;
hashes_buffer.clear();
diff --git a/datafusion/physical-plan/src/joins/hash_join_utils.rs b/datafusion/physical-plan/src/joins/hash_join_utils.rs
index c134b23d78..5ebf370b6d 100644
--- a/datafusion/physical-plan/src/joins/hash_join_utils.rs
+++ b/datafusion/physical-plan/src/joins/hash_join_utils.rs
@@ -607,7 +607,7 @@ pub fn update_filter_expr_interval(
.origin_sorted_expr()
.expr
.evaluate(batch)?
- .into_array(1);
+ .into_array(1)?;
// Convert the array to a ScalarValue:
let value = ScalarValue::try_from_array(&array, 0)?;
// Create a ScalarValue representing positive or negative infinity for the same data type:
diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
index 1306a48744..51561f5dab 100644
--- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
+++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
@@ -626,7 +626,9 @@ impl Stream for SymmetricHashJoinStream {
/// # Returns
///
/// A [Result] object that contains the pruning length. The function will return
-/// an error if there is an issue evaluating the build side filter expression.
+/// an error if
+/// - there is an issue evaluating the build side filter expression;
+/// - there is an issue converting the build side filter expression into an array
fn determine_prune_length(
buffer: &RecordBatch,
build_side_filter_expr: &SortedFilterExpr,
@@ -637,7 +639,7 @@ fn determine_prune_length(
let batch_arr = origin_sorted_expr
.expr
.evaluate(buffer)?
- .into_array(buffer.num_rows());
+ .into_array(buffer.num_rows())?;
// Get the lower or upper interval based on the sort direction
let target = if origin_sorted_expr.options.descending {
diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs
index 5efeedfe65..f93f08255e 100644
--- a/datafusion/physical-plan/src/joins/utils.rs
+++ b/datafusion/physical-plan/src/joins/utils.rs
@@ -778,7 +778,7 @@ pub(crate) fn apply_join_filter_to_indices(
let filter_result = filter
.expression()
.evaluate(&intermediate_batch)?
- .into_array(intermediate_batch.num_rows());
+ .into_array(intermediate_batch.num_rows())?;
let mask = as_boolean_array(&filter_result)?;
let left_filtered = compute::filter(&build_indices, mask)?;
diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs
index bbf0d6d4b3..b8e2d0e425 100644
--- a/datafusion/physical-plan/src/projection.rs
+++ b/datafusion/physical-plan/src/projection.rs
@@ -310,8 +310,10 @@ impl ProjectionStream {
let arrays = self
.expr
.iter()
- .map(|expr| expr.evaluate(batch))
- .map(|r| r.map(|v| v.into_array(batch.num_rows())))
+ .map(|expr| {
+ expr.evaluate(batch)
+ .and_then(|v| v.into_array(batch.num_rows()))
+ })
.collect::<Result<Vec<_>>>()?;
if arrays.is_empty() {
diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs
index 66f7037e5c..9836e057ff 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -169,9 +169,7 @@ impl BatchPartitioner {
let arrays = exprs
.iter()
- .map(|expr| {
- Ok(expr.evaluate(&batch)?.into_array(batch.num_rows()))
- })
+ .map(|expr| expr.evaluate(&batch)?.into_array(batch.num_rows()))
.collect::<Result<Vec<_>>>()?;
hash_buffer.clear();
diff --git a/datafusion/physical-plan/src/sorts/stream.rs b/datafusion/physical-plan/src/sorts/stream.rs
index 4cabdc6e17..135b4fbdec 100644
--- a/datafusion/physical-plan/src/sorts/stream.rs
+++ b/datafusion/physical-plan/src/sorts/stream.rs
@@ -118,7 +118,7 @@ impl RowCursorStream {
let cols = self
.column_expressions
.iter()
- .map(|expr| Ok(expr.evaluate(batch)?.into_array(batch.num_rows())))
+ .map(|expr| expr.evaluate(batch)?.into_array(batch.num_rows()))
.collect::<Result<Vec<_>>>()?;
let rows = self.converter.convert_columns(&cols)?;
@@ -181,7 +181,7 @@ impl<T: CursorArray> FieldCursorStream<T> {
fn convert_batch(&mut self, batch: &RecordBatch) -> Result<ArrayValues<T::Values>> {
let value = self.sort.expr.evaluate(batch)?;
- let array = value.into_array(batch.num_rows());
+ let array = value.into_array(batch.num_rows())?;
let array = array.as_any().downcast_ref::<T>().expect("field values");
Ok(ArrayValues::new(self.sort.options, array))
}
diff --git a/datafusion/physical-plan/src/topk/mod.rs b/datafusion/physical-plan/src/topk/mod.rs
index 4638c0dcf2..9120566273 100644
--- a/datafusion/physical-plan/src/topk/mod.rs
+++ b/datafusion/physical-plan/src/topk/mod.rs
@@ -153,7 +153,7 @@ impl TopK {
.iter()
.map(|expr| {
let value = expr.expr.evaluate(&batch)?;
- Ok(value.into_array(batch.num_rows()))
+ value.into_array(batch.num_rows())
})
.collect::<Result<Vec<_>>>()?;
diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs
index c9f3fb76c2..af4a81626c 100644
--- a/datafusion/physical-plan/src/unnest.rs
+++ b/datafusion/physical-plan/src/unnest.rs
@@ -242,7 +242,7 @@ fn build_batch(
column: &Column,
options: &UnnestOptions,
) -> Result<RecordBatch> {
- let list_array = column.evaluate(batch)?.into_array(batch.num_rows());
+ let list_array = column.evaluate(batch)?.into_array(batch.num_rows())?;
match list_array.data_type() {
DataType::List(_) => {
let list_array = list_array.as_any().downcast_ref::<ListArray>().unwrap();