You are viewing a plain-text version of this content. (The canonical link to it was a hyperlink that is not preserved in this plain-text rendering.)
Posted to commits@arrow.apache.org by co...@apache.org on 2024/02/28 01:17:23 UTC
(arrow-datafusion-comet) branch main updated: build: Upgrade DF to 36.0.0 and arrow-rs 50.0.0 (#66)
This is an automated email from the ASF dual-hosted git repository.
comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new ee977c3 build: Upgrade DF to 36.0.0 and arrow-rs 50.0.0 (#66)
ee977c3 is described below
commit ee977c3d4277d57a657b97e05c077d66798e6457
Author: comphead <co...@users.noreply.github.com>
AuthorDate: Tue Feb 27 17:17:18 2024 -0800
build: Upgrade DF to 36.0.0 and arrow-rs 50.0.0 (#66)
* Upgrade DF and arrow-rs
* fix benches
* fix merge
* fix merge
* Update core/src/execution/datafusion/expressions/scalar_funcs.rs
Co-authored-by: Liang-Chi Hsieh <vi...@gmail.com>
* Update core/src/execution/datafusion/expressions/scalar_funcs.rs
Co-authored-by: Liang-Chi Hsieh <vi...@gmail.com>
---------
Co-authored-by: o_voievodin <o_...@apple.com>
Co-authored-by: Liang-Chi Hsieh <vi...@gmail.com>
---
core/Cargo.lock | 93 +++++++++++++++-------
core/Cargo.toml | 6 +-
core/benches/common.rs | 2 +
core/src/execution/datafusion/expressions/avg.rs | 12 +--
.../datafusion/expressions/avg_decimal.rs | 10 +--
.../datafusion/expressions/scalar_funcs.rs | 20 +++--
.../datafusion/expressions/sum_decimal.rs | 10 +--
core/src/execution/datafusion/planner.rs | 2 +
core/src/execution/operators/copy.rs | 5 +-
core/src/execution/operators/scan.rs | 4 +-
10 files changed, 103 insertions(+), 61 deletions(-)
diff --git a/core/Cargo.lock b/core/Cargo.lock
index 0f262c0..456d969 100644
--- a/core/Cargo.lock
+++ b/core/Cargo.lock
@@ -492,16 +492,16 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chrono"
-version = "0.4.31"
+version = "0.4.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38"
+checksum = "5bc015644b92d5890fab7489e49d21f879d5c990186827d42ec511919404f38b"
dependencies = [
"android-tzdata",
"iana-time-zone",
"js-sys",
"num-traits",
"wasm-bindgen",
- "windows-targets 0.48.5",
+ "windows-targets 0.52.0",
]
[[package]]
@@ -650,8 +650,8 @@ version = "7.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c64043d6c7b7a4c58e39e7efccfdea7b93d885a795d0c054a69dbbf4dd52686"
dependencies = [
- "strum",
- "strum_macros",
+ "strum 0.25.0",
+ "strum_macros 0.25.3",
"unicode-width",
]
@@ -833,9 +833,9 @@ dependencies = [
[[package]]
name = "datafusion"
-version = "35.0.0"
+version = "36.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4328f5467f76d890fe3f924362dbc3a838c6a733f762b32d87f9e0b7bef5fb49"
+checksum = "b2b360b692bf6c6d6e6b6dbaf41a3be0020daeceac0f406aed54c75331e50dbb"
dependencies = [
"ahash",
"arrow",
@@ -849,6 +849,7 @@ dependencies = [
"datafusion-common",
"datafusion-execution",
"datafusion-expr",
+ "datafusion-functions",
"datafusion-optimizer",
"datafusion-physical-expr",
"datafusion-physical-plan",
@@ -874,9 +875,9 @@ dependencies = [
[[package]]
name = "datafusion-common"
-version = "35.0.0"
+version = "36.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d29a7752143b446db4a2cccd9a6517293c6b97e8c39e520ca43ccd07135a4f7e"
+checksum = "37f343ccc298f440e25aa38ff82678291a7acc24061c7370ba6c0ff5cc811412"
dependencies = [
"ahash",
"arrow",
@@ -893,9 +894,9 @@ dependencies = [
[[package]]
name = "datafusion-execution"
-version = "35.0.0"
+version = "36.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2d447650af16e138c31237f53ddaef6dd4f92f0e2d3f2f35d190e16c214ca496"
+checksum = "3f9c93043081487e335399a21ebf8295626367a647ac5cb87d41d18afad7d0f7"
dependencies = [
"arrow",
"chrono",
@@ -914,9 +915,9 @@ dependencies = [
[[package]]
name = "datafusion-expr"
-version = "35.0.0"
+version = "36.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d8d19598e48a498850fb79f97a9719b1f95e7deb64a7a06f93f313e8fa1d524b"
+checksum = "e204d89909e678846b6a95f156aafc1ee5b36cb6c9e37ec2e1449b078a38c818"
dependencies = [
"ahash",
"arrow",
@@ -924,15 +925,30 @@ dependencies = [
"datafusion-common",
"paste",
"sqlparser",
- "strum",
- "strum_macros",
+ "strum 0.26.1",
+ "strum_macros 0.26.1",
+]
+
+[[package]]
+name = "datafusion-functions"
+version = "36.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "98f1c73f7801b2b8ba2297b3ad78ffcf6c1fc6b8171f502987eb9ad5cb244ee7"
+dependencies = [
+ "arrow",
+ "base64",
+ "datafusion-common",
+ "datafusion-execution",
+ "datafusion-expr",
+ "hex",
+ "log",
]
[[package]]
name = "datafusion-optimizer"
-version = "35.0.0"
+version = "36.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8b7feb0391f1fc75575acb95b74bfd276903dc37a5409fcebe160bc7ddff2010"
+checksum = "5ae27e07bf1f04d327be5c2a293470879801ab5535204dc3b16b062fda195496"
dependencies = [
"arrow",
"async-trait",
@@ -948,9 +964,9 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr"
-version = "35.0.0"
+version = "36.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e911bca609c89a54e8f014777449d8290327414d3e10c57a3e3c2122e38878d0"
+checksum = "dde620cd9ef76a3bca9c754fb68854bd2349c49f55baf97e08001f9e967f6d6b"
dependencies = [
"ahash",
"arrow",
@@ -958,11 +974,13 @@ dependencies = [
"arrow-buffer",
"arrow-ord",
"arrow-schema",
+ "arrow-string",
"base64",
"blake2",
"blake3",
"chrono",
"datafusion-common",
+ "datafusion-execution",
"datafusion-expr",
"half 2.1.0",
"hashbrown 0.14.3",
@@ -982,9 +1000,9 @@ dependencies = [
[[package]]
name = "datafusion-physical-plan"
-version = "35.0.0"
+version = "36.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e96b546b8a02e9c2ab35ac6420d511f12a4701950c1eb2e568c122b4fefb0be3"
+checksum = "9a4c75fba9ea99d64b2246cbd2fcae2e6fc973e6616b1015237a616036506dd4"
dependencies = [
"ahash",
"arrow",
@@ -1013,9 +1031,9 @@ dependencies = [
[[package]]
name = "datafusion-sql"
-version = "35.0.0"
+version = "36.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2d18d36f260bbbd63aafdb55339213a23d540d3419810575850ef0a798a6b768"
+checksum = "21474a95c3a62d113599d21b439fa15091b538bac06bd20be0bb2e7d22903c09"
dependencies = [
"arrow",
"arrow-schema",
@@ -2516,9 +2534,9 @@ checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b"
[[package]]
name = "sqlparser"
-version = "0.41.0"
+version = "0.43.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5cc2c25a6c66789625ef164b4c7d2e548d627902280c13710d33da8222169964"
+checksum = "f95c4bae5aba7cd30bd506f7140026ade63cff5afd778af8854026f9606bf5d4"
dependencies = [
"log",
"sqlparser_derive",
@@ -2558,8 +2576,14 @@ name = "strum"
version = "0.25.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125"
+
+[[package]]
+name = "strum"
+version = "0.26.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "723b93e8addf9aa965ebe2d11da6d7540fa2283fcea14b3371ff055f7ba13f5f"
dependencies = [
- "strum_macros",
+ "strum_macros 0.26.1",
]
[[package]]
@@ -2575,6 +2599,19 @@ dependencies = [
"syn 2.0.48",
]
+[[package]]
+name = "strum_macros"
+version = "0.26.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7a3417fc93d76740d974a01654a09777cb500428cc874ca9f45edfe0c4d4cd18"
+dependencies = [
+ "heck 0.4.1",
+ "proc-macro2",
+ "quote",
+ "rustversion",
+ "syn 2.0.48",
+]
+
[[package]]
name = "subtle"
version = "2.5.0"
@@ -2740,9 +2777,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
-version = "1.35.1"
+version = "1.36.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c89b4efa943be685f629b149f53829423f8f5531ea21249408e8e2f8671ec104"
+checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931"
dependencies = [
"backtrace",
"bytes",
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 14e2717..4dc5afe 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -66,9 +66,9 @@ itertools = "0.11.0"
chrono = { version = "0.4", default-features = false, features = ["clock"] }
chrono-tz = { version = "0.8" }
paste = "1.0.14"
-datafusion-common = { version = "35.0.0" }
-datafusion = { default-features = false, version = "35.0.0", features = ["unicode_expressions"] }
-datafusion-physical-expr = { version = "35.0.0", default-features = false , features = ["unicode_expressions"] }
+datafusion-common = { version = "36.0.0" }
+datafusion = { default-features = false, version = "36.0.0", features = ["unicode_expressions"] }
+datafusion-physical-expr = { version = "36.0.0", default-features = false , features = ["unicode_expressions"] }
unicode-segmentation = "^1.10.1"
once_cell = "1.18.0"
regex = "1.9.6"
diff --git a/core/benches/common.rs b/core/benches/common.rs
index 0597216..15952b8 100644
--- a/core/benches/common.rs
+++ b/core/benches/common.rs
@@ -45,6 +45,7 @@ pub fn create_int64_array(size: usize, null_density: f32, min: i64, max: i64) ->
.collect()
}
+#[allow(dead_code)]
pub fn create_primitive_array<T>(size: usize, null_density: f32) -> PrimitiveArray<T>
where
T: ArrowPrimitiveType,
@@ -64,6 +65,7 @@ where
/// Creates a dictionary with random keys and values, with value type `T`.
/// Note here the keys are the dictionary indices.
+#[allow(dead_code)]
pub fn create_dictionary_array<T>(
size: usize,
value_size: usize,
diff --git a/core/src/execution/datafusion/expressions/avg.rs b/core/src/execution/datafusion/expressions/avg.rs
index dc2b347..1e04ab0 100644
--- a/core/src/execution/datafusion/expressions/avg.rs
+++ b/core/src/execution/datafusion/expressions/avg.rs
@@ -24,11 +24,11 @@ use arrow_array::{
Array, ArrayRef, ArrowNumericType, Int64Array, PrimitiveArray,
};
use arrow_schema::{DataType, Field};
-use datafusion::logical_expr::{type_coercion::aggregates::avg_return_type, Accumulator};
-use datafusion_common::{not_impl_err, DataFusionError, Result, ScalarValue};
-use datafusion_physical_expr::{
- expressions::format_state_name, AggregateExpr, EmitTo, GroupsAccumulator, PhysicalExpr,
+use datafusion::logical_expr::{
+ type_coercion::aggregates::avg_return_type, Accumulator, EmitTo, GroupsAccumulator,
};
+use datafusion_common::{not_impl_err, DataFusionError, Result, ScalarValue};
+use datafusion_physical_expr::{expressions::format_state_name, AggregateExpr, PhysicalExpr};
use std::{any::Any, sync::Arc};
use arrow_array::ArrowNativeTypeOp;
@@ -146,7 +146,7 @@ pub struct AvgAccumulator {
}
impl Accumulator for AvgAccumulator {
- fn state(&self) -> Result<Vec<ScalarValue>> {
+ fn state(&mut self) -> Result<Vec<ScalarValue>> {
Ok(vec![
ScalarValue::Float64(self.sum),
ScalarValue::from(self.count),
@@ -175,7 +175,7 @@ impl Accumulator for AvgAccumulator {
Ok(())
}
- fn evaluate(&self) -> Result<ScalarValue> {
+ fn evaluate(&mut self) -> Result<ScalarValue> {
Ok(ScalarValue::Float64(
self.sum.map(|f| f / self.count as f64),
))
diff --git a/core/src/execution/datafusion/expressions/avg_decimal.rs b/core/src/execution/datafusion/expressions/avg_decimal.rs
index dc7bf15..6fb5581 100644
--- a/core/src/execution/datafusion/expressions/avg_decimal.rs
+++ b/core/src/execution/datafusion/expressions/avg_decimal.rs
@@ -24,11 +24,9 @@ use arrow_array::{
Array, ArrayRef, Decimal128Array, Int64Array, PrimitiveArray,
};
use arrow_schema::{DataType, Field};
-use datafusion::logical_expr::Accumulator;
+use datafusion::logical_expr::{Accumulator, EmitTo, GroupsAccumulator};
use datafusion_common::{not_impl_err, DataFusionError, Result, ScalarValue};
-use datafusion_physical_expr::{
- expressions::format_state_name, AggregateExpr, EmitTo, GroupsAccumulator, PhysicalExpr,
-};
+use datafusion_physical_expr::{expressions::format_state_name, AggregateExpr, PhysicalExpr};
use std::{any::Any, sync::Arc};
use arrow_array::ArrowNativeTypeOp;
@@ -214,7 +212,7 @@ impl AvgDecimalAccumulator {
}
impl Accumulator for AvgDecimalAccumulator {
- fn state(&self) -> Result<Vec<ScalarValue>> {
+ fn state(&mut self) -> Result<Vec<ScalarValue>> {
Ok(vec![
ScalarValue::Decimal128(self.sum, self.sum_precision, self.sum_scale),
ScalarValue::from(self.count),
@@ -266,7 +264,7 @@ impl Accumulator for AvgDecimalAccumulator {
Ok(())
}
- fn evaluate(&self) -> Result<ScalarValue> {
+ fn evaluate(&mut self) -> Result<ScalarValue> {
fn make_decimal128(value: Option<i128>, precision: u8, scale: i8) -> ScalarValue {
ScalarValue::Decimal128(value, precision, scale)
}
diff --git a/core/src/execution/datafusion/expressions/scalar_funcs.rs b/core/src/execution/datafusion/expressions/scalar_funcs.rs
index 8759566..8ff13e1 100644
--- a/core/src/execution/datafusion/expressions/scalar_funcs.rs
+++ b/core/src/execution/datafusion/expressions/scalar_funcs.rs
@@ -31,13 +31,11 @@ use datafusion::{
physical_plan::ColumnarValue,
};
use datafusion_common::{
- cast::as_generic_string_array, internal_err, DataFusionError, Result as DataFusionResult,
- ScalarValue,
+ cast::as_generic_string_array, exec_err, internal_err, DataFusionError,
+ Result as DataFusionResult, ScalarValue,
};
use datafusion_physical_expr::{
- execution_props::ExecutionProps,
- functions::{create_physical_fun, make_scalar_function},
- math_expressions,
+ execution_props::ExecutionProps, functions::create_physical_fun, math_expressions,
};
use num::{BigInt, Signed, ToPrimitive};
use unicode_segmentation::UnicodeSegmentation;
@@ -366,7 +364,12 @@ fn spark_round(
let (precision, scale) = get_precision_scale(data_type);
make_decimal_array(array, precision, scale, &f)
}
- _ => make_scalar_function(math_expressions::round)(args),
+ DataType::Float32 | DataType::Float64 => {
+ Ok(ColumnarValue::Array(math_expressions::round(&[
+ array.clone()
+ ])?))
+ }
+ dt => exec_err!("Not supported datatype for ROUND: {dt}"),
},
ColumnarValue::Scalar(a) => match a {
ScalarValue::Int64(a) if *point < 0 => {
@@ -386,7 +389,10 @@ fn spark_round(
let (precision, scale) = get_precision_scale(data_type);
make_decimal_scalar(a, precision, scale, &f)
}
- _ => make_scalar_function(math_expressions::round)(args),
+ ScalarValue::Float32(_) | ScalarValue::Float64(_) => Ok(ColumnarValue::Scalar(
+ ScalarValue::try_from_array(&math_expressions::round(&[a.to_array()?])?, 0)?,
+ )),
+ dt => exec_err!("Not supported datatype for ROUND: {dt}"),
},
}
}
diff --git a/core/src/execution/datafusion/expressions/sum_decimal.rs b/core/src/execution/datafusion/expressions/sum_decimal.rs
index a6da5f5..2afbbf0 100644
--- a/core/src/execution/datafusion/expressions/sum_decimal.rs
+++ b/core/src/execution/datafusion/expressions/sum_decimal.rs
@@ -24,11 +24,9 @@ use arrow_array::{
};
use arrow_data::decimal::validate_decimal_precision;
use arrow_schema::{DataType, Field};
-use datafusion::logical_expr::Accumulator;
+use datafusion::logical_expr::{Accumulator, EmitTo, GroupsAccumulator};
use datafusion_common::{Result as DFResult, ScalarValue};
-use datafusion_physical_expr::{
- aggregate::utils::down_cast_any_ref, AggregateExpr, EmitTo, GroupsAccumulator, PhysicalExpr,
-};
+use datafusion_physical_expr::{aggregate::utils::down_cast_any_ref, AggregateExpr, PhysicalExpr};
use std::{any::Any, ops::BitAnd, sync::Arc};
use crate::unlikely;
@@ -204,7 +202,7 @@ impl Accumulator for SumDecimalAccumulator {
Ok(())
}
- fn evaluate(&self) -> DFResult<ScalarValue> {
+ fn evaluate(&mut self) -> DFResult<ScalarValue> {
// For each group:
// 1. if `is_empty` is true, it means either there is no value or all values for the group
// are null, in this case we'll return null
@@ -224,7 +222,7 @@ impl Accumulator for SumDecimalAccumulator {
std::mem::size_of_val(self)
}
- fn state(&self) -> DFResult<Vec<ScalarValue>> {
+ fn state(&mut self) -> DFResult<Vec<ScalarValue>> {
let sum = if self.is_not_null {
ScalarValue::try_new_decimal128(self.sum, self.precision, self.scale)?
} else {
diff --git a/core/src/execution/datafusion/planner.rs b/core/src/execution/datafusion/planner.rs
index 66a29cb..f4a0cec 100644
--- a/core/src/execution/datafusion/planner.rs
+++ b/core/src/execution/datafusion/planner.rs
@@ -608,6 +608,7 @@ impl PhysicalPlanner {
vec![left, right],
data_type,
None,
+ false,
)))
}
_ => Ok(Arc::new(BinaryExpr::new(left, op, right))),
@@ -984,6 +985,7 @@ impl PhysicalPlanner {
args.to_vec(),
data_type,
None,
+ args.is_empty(),
));
Ok(scalar_expr)
diff --git a/core/src/execution/operators/copy.rs b/core/src/execution/operators/copy.rs
index c818d62..996db2b 100644
--- a/core/src/execution/operators/copy.rs
+++ b/core/src/execution/operators/copy.rs
@@ -28,7 +28,7 @@ use arrow_array::{ArrayRef, RecordBatch};
use arrow_schema::{DataType, Field, FieldRef, Schema, SchemaRef};
use datafusion::{execution::TaskContext, physical_expr::*, physical_plan::*};
-use datafusion_common::{DataFusionError, Result as DataFusionResult};
+use datafusion_common::{arrow_datafusion_err, DataFusionError, Result as DataFusionResult};
use super::copy_or_cast_array;
@@ -141,8 +141,7 @@ impl CopyStream {
.iter()
.map(|v| copy_or_cast_array(v))
.collect::<Result<Vec<ArrayRef>, _>>()?;
- RecordBatch::try_new(self.schema.clone(), vectors)
- .map_err(|err| DataFusionError::ArrowError(err, None))
+ RecordBatch::try_new(self.schema.clone(), vectors).map_err(|e| arrow_datafusion_err!(e))
}
}
diff --git a/core/src/execution/operators/scan.rs b/core/src/execution/operators/scan.rs
index 9f85de8..e31230c 100644
--- a/core/src/execution/operators/scan.rs
+++ b/core/src/execution/operators/scan.rs
@@ -43,7 +43,7 @@ use datafusion::{
physical_expr::*,
physical_plan::{ExecutionPlan, *},
};
-use datafusion_common::{DataFusionError, Result as DataFusionResult};
+use datafusion_common::{arrow_datafusion_err, DataFusionError, Result as DataFusionResult};
use jni::{
objects::{GlobalRef, JLongArray, JObject, ReleaseMode},
sys::jlongArray,
@@ -325,7 +325,7 @@ impl ScanStream {
let options = RecordBatchOptions::new().with_row_count(Some(num_rows));
RecordBatch::try_new_with_options(self.schema.clone(), new_columns, &options)
- .map_err(|err| DataFusionError::ArrowError(err, None))
+ .map_err(|e| arrow_datafusion_err!(e))
}
}