You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2024/02/10 11:04:10 UTC
(arrow-datafusion) branch main updated: Use concat to simplify Nested Scalar creation (#9174)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new f97a2087c2 Use concat to simplify Nested Scalar creation (#9174)
f97a2087c2 is described below
commit f97a2087c28c9c134198b94f388cfde9da8a354c
Author: Jay Zhan <ja...@gmail.com>
AuthorDate: Sat Feb 10 19:04:04 2024 +0800
Use concat to simplify Nested Scalar creation (#9174)
* replace with concat
Signed-off-by: jayzhan211 <ja...@gmail.com>
* rewrite
Signed-off-by: jayzhan211 <ja...@gmail.com>
* remove map_err
Signed-off-by: jayzhan211 <ja...@gmail.com>
---------
Signed-off-by: jayzhan211 <ja...@gmail.com>
---
datafusion/common/src/scalar.rs | 185 ++++++++++++++--------------------------
1 file changed, 65 insertions(+), 120 deletions(-)
diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs
index 85b478bc0d..8f7968a373 100644
--- a/datafusion/common/src/scalar.rs
+++ b/datafusion/common/src/scalar.rs
@@ -53,7 +53,7 @@ use arrow::{
};
use arrow_array::cast::as_list_array;
use arrow_array::{ArrowNativeTypeOp, Scalar};
-use arrow_buffer::{Buffer, NullBuffer};
+use arrow_buffer::NullBuffer;
/// A dynamically typed, nullable single value, (the single-valued counter-part
/// to arrow's [`Array`])
@@ -1402,121 +1402,6 @@ impl ScalarValue {
}};
}
- fn build_struct_array(
- scalars: impl IntoIterator<Item = ScalarValue>,
- ) -> Result<ArrayRef> {
- let arrays = scalars
- .into_iter()
- .map(|s| s.to_array())
- .collect::<Result<Vec<_>>>()?;
-
- let first_struct = arrays[0].as_struct_opt();
- if first_struct.is_none() {
- return _internal_err!(
- "Inconsistent types in ScalarValue::iter_to_array. \
- Expected ScalarValue::Struct, got {:?}",
- arrays[0].clone()
- );
- }
-
- let mut valid = BooleanBufferBuilder::new(arrays.len());
-
- let first_struct = first_struct.unwrap();
- valid.append(first_struct.is_valid(0));
-
- let mut column_values: Vec<Vec<ScalarValue>> =
- vec![Vec::with_capacity(arrays.len()); first_struct.num_columns()];
-
- for (i, v) in first_struct.columns().iter().enumerate() {
- // ScalarValue::Struct contains a single element in each column.
- let sv = ScalarValue::try_from_array(v, 0)?;
- column_values[i].push(sv);
- }
-
- for arr in arrays.iter().skip(1) {
- if let Some(struct_array) = arr.as_struct_opt() {
- valid.append(struct_array.is_valid(0));
-
- for (i, v) in struct_array.columns().iter().enumerate() {
- // ScalarValue::Struct contains a single element in each column.
- let sv = ScalarValue::try_from_array(v, 0)?;
- column_values[i].push(sv);
- }
- } else {
- return _internal_err!(
- "Inconsistent types in ScalarValue::iter_to_array. \
- Expected ScalarValue::Struct, got {arr:?}"
- );
- }
- }
-
- let column_fields = first_struct.fields().to_vec();
-
- let mut data = vec![];
- for (field, values) in
- column_fields.into_iter().zip(column_values.into_iter())
- {
- let field = field.to_owned();
- let array = ScalarValue::iter_to_array(values.into_iter())?;
- data.push((field, array));
- }
-
- let bool_buffer = valid.finish();
- let buffer: Buffer = bool_buffer.values().into();
- Ok(Arc::new(StructArray::from((data, buffer))))
- }
-
- fn build_list_array(
- scalars: impl IntoIterator<Item = ScalarValue>,
- ) -> Result<ArrayRef> {
- let arrays = scalars
- .into_iter()
- .map(|s| s.to_array())
- .collect::<Result<Vec<_>>>()?;
-
- let capacity = Capacities::Array(
- arrays
- .iter()
- .filter_map(|arr| {
- if !arr.is_null(0) {
- Some(arr.len())
- } else {
- None
- }
- })
- .sum(),
- );
-
- // ScalarValue::List contains a single element ListArray.
- let nulls = arrays
- .iter()
- .map(|arr| arr.is_null(0))
- .collect::<Vec<bool>>();
- let arrays_data = arrays
- .iter()
- .filter(|arr| !arr.is_null(0))
- .map(|arr| arr.to_data())
- .collect::<Vec<_>>();
-
- let arrays_ref = arrays_data.iter().collect::<Vec<_>>();
- let mut mutable =
- MutableArrayData::with_capacities(arrays_ref, true, capacity);
-
- // ScalarValue::List contains a single element ListArray.
- let mut index = 0;
- for is_null in nulls.into_iter() {
- if is_null {
- mutable.extend_nulls(1);
- } else {
- // mutable array contains non-null elements
- mutable.extend(index, 0, 1);
- index += 1;
- }
- }
- let data = mutable.freeze();
- Ok(arrow_array::make_array(data))
- }
-
let array: ArrayRef = match &data_type {
DataType::Decimal128(precision, scale) => {
let decimal_array =
@@ -1591,10 +1476,32 @@ impl ScalarValue {
DataType::Interval(IntervalUnit::MonthDayNano) => {
build_array_primitive!(IntervalMonthDayNanoArray, IntervalMonthDayNano)
}
- DataType::Struct(_) => build_struct_array(scalars)?,
- DataType::List(_)
- | DataType::LargeList(_)
- | DataType::FixedSizeList(_, _) => build_list_array(scalars)?,
+ DataType::FixedSizeList(_, _) => {
+                // arrow::compute::concat requires all inputs to share exactly the same data type,
+                // which for FixedSizeList includes the list size. A null scalar may carry a different
+                // list size (e.g. 1), so rebuild each null entry with the field and size of the first non-null.
+ let mut arrays =
+ scalars.map(|s| s.to_array()).collect::<Result<Vec<_>>>()?;
+ let first_non_null_data_type = arrays
+ .iter()
+ .find(|sv| !sv.is_null(0))
+ .map(|sv| sv.data_type().to_owned());
+ if let Some(DataType::FixedSizeList(f, l)) = first_non_null_data_type {
+ for array in arrays.iter_mut() {
+ if array.is_null(0) {
+ *array =
+ Arc::new(FixedSizeListArray::new_null(f.clone(), l, 1));
+ }
+ }
+ }
+ let arrays = arrays.iter().map(|a| a.as_ref()).collect::<Vec<_>>();
+ arrow::compute::concat(arrays.as_slice())?
+ }
+ DataType::List(_) | DataType::LargeList(_) | DataType::Struct(_) => {
+ let arrays = scalars.map(|s| s.to_array()).collect::<Result<Vec<_>>>()?;
+ let arrays = arrays.iter().map(|a| a.as_ref()).collect::<Vec<_>>();
+ arrow::compute::concat(arrays.as_slice())?
+ }
DataType::Dictionary(key_type, value_type) => {
// create the values array
let value_scalars = scalars
@@ -3529,6 +3436,44 @@ mod tests {
.collect()
}
+ #[test]
+ fn test_iter_to_array_fixed_size_list() {
+ let field = Arc::new(Field::new("item", DataType::Int32, true));
+ let f1 = Arc::new(FixedSizeListArray::new(
+ field.clone(),
+ 3,
+ Arc::new(Int32Array::from(vec![1, 2, 3])),
+ None,
+ ));
+ let f2 = Arc::new(FixedSizeListArray::new(
+ field.clone(),
+ 3,
+ Arc::new(Int32Array::from(vec![4, 5, 6])),
+ None,
+ ));
+ let f_nulls = Arc::new(FixedSizeListArray::new_null(field, 1, 1));
+
+ let scalars = vec![
+ ScalarValue::FixedSizeList(f_nulls.clone()),
+ ScalarValue::FixedSizeList(f1),
+ ScalarValue::FixedSizeList(f2),
+ ScalarValue::FixedSizeList(f_nulls),
+ ];
+
+ let array = ScalarValue::iter_to_array(scalars).unwrap();
+
+ let expected = FixedSizeListArray::from_iter_primitive::<Int32Type, _, _>(
+ vec![
+ None,
+ Some(vec![Some(1), Some(2), Some(3)]),
+ Some(vec![Some(4), Some(5), Some(6)]),
+ None,
+ ],
+ 3,
+ );
+ assert_eq!(array.as_ref(), &expected);
+ }
+
#[test]
fn test_iter_to_array_struct() {
let s1 = StructArray::from(vec![