You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/11/04 19:24:18 UTC
(arrow-datafusion) branch main updated: General approach for Array replace (#8050)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 3469c4e09a General approach for Array replace (#8050)
3469c4e09a is described below
commit 3469c4e09a3d32381949dd0c0f626f406c00c6ad
Author: Jay Zhan <ja...@gmail.com>
AuthorDate: Sun Nov 5 03:24:13 2023 +0800
General approach for Array replace (#8050)
* checkpoint
Signed-off-by: jayzhan211 <ja...@gmail.com>
* optimize non-list
Signed-off-by: jayzhan211 <ja...@gmail.com>
* replace list ver
Signed-off-by: jayzhan211 <ja...@gmail.com>
* cleanup
Signed-off-by: jayzhan211 <ja...@gmail.com>
* rename
Signed-off-by: jayzhan211 <ja...@gmail.com>
* cleanup
Signed-off-by: jayzhan211 <ja...@gmail.com>
---------
Signed-off-by: jayzhan211 <ja...@gmail.com>
---
Cargo.toml | 1 +
datafusion-cli/Cargo.lock | 1 +
datafusion/physical-expr/Cargo.toml | 1 +
datafusion/physical-expr/src/array_expressions.rs | 293 ++++++++--------------
4 files changed, 102 insertions(+), 194 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 1a2f4a84af..6558642d4a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -53,6 +53,7 @@ arrow = { version = "48.0.0", features = ["prettyprint"] }
arrow-array = { version = "48.0.0", default-features = false, features = ["chrono-tz"] }
arrow-buffer = { version = "48.0.0", default-features = false }
arrow-flight = { version = "48.0.0", features = ["flight-sql-experimental"] }
+arrow-ord = { version = "48.0.0", default-features = false }
arrow-schema = { version = "48.0.0", default-features = false }
async-trait = "0.1.73"
bigdecimal = "0.4.1"
diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index dc828f018f..a5eafa68cf 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -1241,6 +1241,7 @@ dependencies = [
"arrow",
"arrow-array",
"arrow-buffer",
+ "arrow-ord",
"arrow-schema",
"base64",
"blake2",
diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml
index 4be625e384..4496e72152 100644
--- a/datafusion/physical-expr/Cargo.toml
+++ b/datafusion/physical-expr/Cargo.toml
@@ -44,6 +44,7 @@ ahash = { version = "0.8", default-features = false, features = ["runtime-rng"]
arrow = { workspace = true }
arrow-array = { workspace = true }
arrow-buffer = { workspace = true }
+arrow-ord = { workspace = true }
arrow-schema = { workspace = true }
base64 = { version = "0.21", optional = true }
blake2 = { version = "^0.10.2", optional = true }
diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs
index 687502e79f..e296e9c96f 100644
--- a/datafusion/physical-expr/src/array_expressions.rs
+++ b/datafusion/physical-expr/src/array_expressions.rs
@@ -31,7 +31,8 @@ use datafusion_common::cast::{
};
use datafusion_common::utils::array_into_list_array;
use datafusion_common::{
- exec_err, internal_err, not_impl_err, plan_err, DataFusionError, Result,
+ exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err,
+ DataFusionError, Result,
};
use itertools::Itertools;
@@ -1221,217 +1222,121 @@ array_removement_function!(
"Array_remove_all SQL function"
);
-macro_rules! general_replace {
- ($ARRAY:expr, $FROM:expr, $TO:expr, $MAX:expr, $ARRAY_TYPE:ident) => {{
- let mut offsets: Vec<i32> = vec![0];
- let mut values =
- downcast_arg!(new_empty_array($FROM.data_type()), $ARRAY_TYPE).clone();
-
- let from_array = downcast_arg!($FROM, $ARRAY_TYPE);
- let to_array = downcast_arg!($TO, $ARRAY_TYPE);
- for (((arr, from), to), max) in $ARRAY
- .iter()
- .zip(from_array.iter())
- .zip(to_array.iter())
- .zip($MAX.iter())
- {
- let last_offset: i32 = offsets.last().copied().ok_or_else(|| {
- DataFusionError::Internal(format!("offsets should not be empty"))
- })?;
- match arr {
- Some(arr) => {
- let child_array = downcast_arg!(arr, $ARRAY_TYPE);
- let mut counter = 0;
- let max = if max < Some(1) { 1 } else { max.unwrap() };
-
- let replaced_array = child_array
- .iter()
- .map(|el| {
- if counter != max && el == from {
- counter += 1;
- to
+fn general_replace(args: &[ArrayRef], arr_n: Vec<i64>) -> Result<ArrayRef> {
+ let list_array = as_list_array(&args[0])?;
+ let from_array = &args[1];
+ let to_array = &args[2];
+
+ let mut offsets: Vec<i32> = vec![0];
+ let data_type = list_array.value_type();
+ let mut values = new_empty_array(&data_type);
+
+ for (row_index, (arr, n)) in list_array.iter().zip(arr_n.iter()).enumerate() {
+ let last_offset: i32 = offsets
+ .last()
+ .copied()
+ .ok_or_else(|| internal_datafusion_err!("offsets should not be empty"))?;
+ match arr {
+ Some(arr) => {
+ let indices = UInt32Array::from(vec![row_index as u32]);
+ let from_arr = arrow::compute::take(from_array, &indices, None)?;
+
+ let eq_array = match from_arr.data_type() {
+ // arrow_ord::cmp::eq does not support ListArray, so we need to compare it by loop
+ DataType::List(_) => {
+ let from_a = as_list_array(&from_arr)?.value(0);
+ let list_arr = as_list_array(&arr)?;
+
+ let mut bool_values = vec![];
+ for arr in list_arr.iter() {
+ if let Some(a) = arr {
+ bool_values.push(Some(a.eq(&from_a)));
} else {
- el
+ return internal_err!(
+ "Null value is not supported in array_replace"
+ );
}
- })
- .collect::<$ARRAY_TYPE>();
-
- values = downcast_arg!(
- compute::concat(&[&values, &replaced_array])?.clone(),
- $ARRAY_TYPE
- )
- .clone();
- offsets.push(last_offset + replaced_array.len() as i32);
- }
- None => {
- offsets.push(last_offset);
- }
- }
- }
-
- let field = Arc::new(Field::new("item", $FROM.data_type().clone(), true));
-
- Arc::new(ListArray::try_new(
- field,
- OffsetBuffer::new(offsets.into()),
- Arc::new(values),
- None,
- )?)
- }};
-}
-
-macro_rules! general_replace_list {
- ($ARRAY:expr, $FROM:expr, $TO:expr, $MAX:expr, $ARRAY_TYPE:ident) => {{
- let mut offsets: Vec<i32> = vec![0];
- let mut values =
- downcast_arg!(new_empty_array($FROM.data_type()), ListArray).clone();
-
- let from_array = downcast_arg!($FROM, ListArray);
- let to_array = downcast_arg!($TO, ListArray);
- for (((arr, from), to), max) in $ARRAY
- .iter()
- .zip(from_array.iter())
- .zip(to_array.iter())
- .zip($MAX.iter())
- {
- let last_offset: i32 = offsets.last().copied().ok_or_else(|| {
- DataFusionError::Internal(format!("offsets should not be empty"))
- })?;
- match arr {
- Some(arr) => {
- let child_array = downcast_arg!(arr, ListArray);
- let mut counter = 0;
- let max = if max < Some(1) { 1 } else { max.unwrap() };
+ }
+ BooleanArray::from(bool_values)
+ }
+ _ => {
+ let from_arr = Scalar::new(from_arr);
+ arrow_ord::cmp::eq(&arr, &from_arr)?
+ }
+ };
- let replaced_vec = child_array
- .iter()
- .map(|el| {
- if counter != max && el == from {
- counter += 1;
- to.clone().unwrap()
- } else {
- el.clone().unwrap()
+ // Use MutableArrayData to build the replaced array
+ // First array is the original array, second array is the element to replace with.
+ let arrays = vec![arr, to_array.clone()];
+ let arrays_data = arrays
+ .iter()
+ .map(|a| a.to_data())
+ .collect::<Vec<ArrayData>>();
+ let arrays_data = arrays_data.iter().collect::<Vec<&ArrayData>>();
+
+ let arrays = arrays
+ .iter()
+ .map(|arr| arr.as_ref())
+ .collect::<Vec<&dyn Array>>();
+ let capacity = Capacities::Array(arrays.iter().map(|a| a.len()).sum());
+
+ let mut mutable =
+ MutableArrayData::with_capacities(arrays_data, false, capacity);
+
+ let mut counter = 0;
+ for (i, to_replace) in eq_array.iter().enumerate() {
+ if let Some(to_replace) = to_replace {
+ if to_replace {
+ mutable.extend(1, row_index, row_index + 1);
+ counter += 1;
+ if counter == *n {
+ // extend the rest of the array
+ mutable.extend(0, i + 1, eq_array.len());
+ break;
}
- })
- .collect::<Vec<_>>();
-
- let mut i: i32 = 0;
- let mut replaced_offsets = vec![i];
- replaced_offsets.extend(
- replaced_vec
- .clone()
- .into_iter()
- .map(|a| {
- i += a.len() as i32;
- i
- })
- .collect::<Vec<_>>(),
- );
-
- let mut replaced_values = downcast_arg!(
- new_empty_array(&from_array.value_type()),
- $ARRAY_TYPE
- )
- .clone();
- for replaced_list in replaced_vec {
- replaced_values = downcast_arg!(
- compute::concat(&[&replaced_values, &replaced_list])?,
- $ARRAY_TYPE
- )
- .clone();
+ } else {
+ mutable.extend(0, i, i + 1);
+ }
+ } else {
+ return internal_err!("eq_array should not contain None");
}
+ }
- let field = Arc::new(Field::new(
- "item",
- from_array.value_type().clone(),
- true,
- ));
- let replaced_array = ListArray::try_new(
- field,
- OffsetBuffer::new(replaced_offsets.clone().into()),
- Arc::new(replaced_values),
- None,
- )?;
+ let data = mutable.freeze();
+ let replaced_array = arrow_array::make_array(data);
- values = downcast_arg!(
- compute::concat(&[&values, &replaced_array,])?.clone(),
- ListArray
- )
- .clone();
- offsets.push(last_offset + replaced_array.len() as i32);
- }
- None => {
- offsets.push(last_offset);
- }
+ let v = arrow::compute::concat(&[&values, &replaced_array])?;
+ values = v;
+ offsets.push(last_offset + replaced_array.len() as i32);
+ }
+ None => {
+ offsets.push(last_offset);
}
}
+ }
- let field = Arc::new(Field::new("item", $FROM.data_type().clone(), true));
-
- Arc::new(ListArray::try_new(
- field,
- OffsetBuffer::new(offsets.into()),
- Arc::new(values),
- None,
- )?)
- }};
-}
-
-macro_rules! array_replacement_function {
- ($FUNC:ident, $MAX_FUNC:expr, $DOC:expr) => {
- #[doc = $DOC]
- pub fn $FUNC(args: &[ArrayRef]) -> Result<ArrayRef> {
- let arr = as_list_array(&args[0])?;
- let from = &args[1];
- let to = &args[2];
- let max = $MAX_FUNC(args)?;
-
- check_datatypes(stringify!($FUNC), &[arr.values(), from, to])?;
- let res = match arr.value_type() {
- DataType::List(field) => {
- macro_rules! array_function {
- ($ARRAY_TYPE:ident) => {
- general_replace_list!(arr, from, to, max, $ARRAY_TYPE)
- };
- }
- call_array_function!(field.data_type(), true)
- }
- data_type => {
- macro_rules! array_function {
- ($ARRAY_TYPE:ident) => {
- general_replace!(arr, from, to, max, $ARRAY_TYPE)
- };
- }
- call_array_function!(data_type, false)
- }
- };
-
- Ok(res)
- }
- };
+ Ok(Arc::new(ListArray::try_new(
+ Arc::new(Field::new("item", data_type, true)),
+ OffsetBuffer::new(offsets.into()),
+ values,
+ None,
+ )?))
}
-fn replace_one(args: &[ArrayRef]) -> Result<Int64Array> {
- Ok(Int64Array::from_value(1, args[0].len()))
+pub fn array_replace(args: &[ArrayRef]) -> Result<ArrayRef> {
+ general_replace(args, vec![1; args[0].len()])
}
-fn replace_n(args: &[ArrayRef]) -> Result<Int64Array> {
- as_int64_array(&args[3]).cloned()
+pub fn array_replace_n(args: &[ArrayRef]) -> Result<ArrayRef> {
+ let arr = as_int64_array(&args[3])?;
+ let arr_n = arr.values().to_vec();
+ general_replace(args, arr_n)
}
-fn replace_all(args: &[ArrayRef]) -> Result<Int64Array> {
- Ok(Int64Array::from_value(i64::MAX, args[0].len()))
+pub fn array_replace_all(args: &[ArrayRef]) -> Result<ArrayRef> {
+ general_replace(args, vec![i64::MAX; args[0].len()])
}
-// array replacement functions
-array_replacement_function!(array_replace, replace_one, "Array_replace SQL function");
-array_replacement_function!(array_replace_n, replace_n, "Array_replace_n SQL function");
-array_replacement_function!(
- array_replace_all,
- replace_all,
- "Array_replace_all SQL function"
-);
-
macro_rules! to_string {
($ARG:expr, $ARRAY:expr, $DELIMITER:expr, $NULL_STRING:expr, $WITH_NULL_STRING:expr, $ARRAY_TYPE:ident) => {{
let arr = downcast_arg!($ARRAY, $ARRAY_TYPE);