You are viewing a plain-text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this text.
Posted to commits@arrow.apache.org by co...@apache.org on 2024/02/28 01:17:23 UTC

(arrow-datafusion-comet) branch main updated: build: Upgrade DF to 36.0.0 and arrow-rs 50.0.0 (#66)

This is an automated email from the ASF dual-hosted git repository.

comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new ee977c3  build: Upgrade DF to 36.0.0 and arrow-rs 50.0.0 (#66)
ee977c3 is described below

commit ee977c3d4277d57a657b97e05c077d66798e6457
Author: comphead <co...@users.noreply.github.com>
AuthorDate: Tue Feb 27 17:17:18 2024 -0800

    build: Upgrade DF to 36.0.0 and arrow-rs 50.0.0 (#66)
    
    * Upgrade DF and arrow-rs
    
    * fix benches
    
    * fix merge
    
    * fix merge
    
    * Update core/src/execution/datafusion/expressions/scalar_funcs.rs
    
    Co-authored-by: Liang-Chi Hsieh <vi...@gmail.com>
    
    * Update core/src/execution/datafusion/expressions/scalar_funcs.rs
    
    Co-authored-by: Liang-Chi Hsieh <vi...@gmail.com>
    
    ---------
    
    Co-authored-by: o_voievodin <o_...@apple.com>
    Co-authored-by: Liang-Chi Hsieh <vi...@gmail.com>
---
 core/Cargo.lock                                    | 93 +++++++++++++++-------
 core/Cargo.toml                                    |  6 +-
 core/benches/common.rs                             |  2 +
 core/src/execution/datafusion/expressions/avg.rs   | 12 +--
 .../datafusion/expressions/avg_decimal.rs          | 10 +--
 .../datafusion/expressions/scalar_funcs.rs         | 20 +++--
 .../datafusion/expressions/sum_decimal.rs          | 10 +--
 core/src/execution/datafusion/planner.rs           |  2 +
 core/src/execution/operators/copy.rs               |  5 +-
 core/src/execution/operators/scan.rs               |  4 +-
 10 files changed, 103 insertions(+), 61 deletions(-)

diff --git a/core/Cargo.lock b/core/Cargo.lock
index 0f262c0..456d969 100644
--- a/core/Cargo.lock
+++ b/core/Cargo.lock
@@ -492,16 +492,16 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
 
 [[package]]
 name = "chrono"
-version = "0.4.31"
+version = "0.4.34"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38"
+checksum = "5bc015644b92d5890fab7489e49d21f879d5c990186827d42ec511919404f38b"
 dependencies = [
  "android-tzdata",
  "iana-time-zone",
  "js-sys",
  "num-traits",
  "wasm-bindgen",
- "windows-targets 0.48.5",
+ "windows-targets 0.52.0",
 ]
 
 [[package]]
@@ -650,8 +650,8 @@ version = "7.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "7c64043d6c7b7a4c58e39e7efccfdea7b93d885a795d0c054a69dbbf4dd52686"
 dependencies = [
- "strum",
- "strum_macros",
+ "strum 0.25.0",
+ "strum_macros 0.25.3",
  "unicode-width",
 ]
 
@@ -833,9 +833,9 @@ dependencies = [
 
 [[package]]
 name = "datafusion"
-version = "35.0.0"
+version = "36.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4328f5467f76d890fe3f924362dbc3a838c6a733f762b32d87f9e0b7bef5fb49"
+checksum = "b2b360b692bf6c6d6e6b6dbaf41a3be0020daeceac0f406aed54c75331e50dbb"
 dependencies = [
  "ahash",
  "arrow",
@@ -849,6 +849,7 @@ dependencies = [
  "datafusion-common",
  "datafusion-execution",
  "datafusion-expr",
+ "datafusion-functions",
  "datafusion-optimizer",
  "datafusion-physical-expr",
  "datafusion-physical-plan",
@@ -874,9 +875,9 @@ dependencies = [
 
 [[package]]
 name = "datafusion-common"
-version = "35.0.0"
+version = "36.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d29a7752143b446db4a2cccd9a6517293c6b97e8c39e520ca43ccd07135a4f7e"
+checksum = "37f343ccc298f440e25aa38ff82678291a7acc24061c7370ba6c0ff5cc811412"
 dependencies = [
  "ahash",
  "arrow",
@@ -893,9 +894,9 @@ dependencies = [
 
 [[package]]
 name = "datafusion-execution"
-version = "35.0.0"
+version = "36.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2d447650af16e138c31237f53ddaef6dd4f92f0e2d3f2f35d190e16c214ca496"
+checksum = "3f9c93043081487e335399a21ebf8295626367a647ac5cb87d41d18afad7d0f7"
 dependencies = [
  "arrow",
  "chrono",
@@ -914,9 +915,9 @@ dependencies = [
 
 [[package]]
 name = "datafusion-expr"
-version = "35.0.0"
+version = "36.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d8d19598e48a498850fb79f97a9719b1f95e7deb64a7a06f93f313e8fa1d524b"
+checksum = "e204d89909e678846b6a95f156aafc1ee5b36cb6c9e37ec2e1449b078a38c818"
 dependencies = [
  "ahash",
  "arrow",
@@ -924,15 +925,30 @@ dependencies = [
  "datafusion-common",
  "paste",
  "sqlparser",
- "strum",
- "strum_macros",
+ "strum 0.26.1",
+ "strum_macros 0.26.1",
+]
+
+[[package]]
+name = "datafusion-functions"
+version = "36.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "98f1c73f7801b2b8ba2297b3ad78ffcf6c1fc6b8171f502987eb9ad5cb244ee7"
+dependencies = [
+ "arrow",
+ "base64",
+ "datafusion-common",
+ "datafusion-execution",
+ "datafusion-expr",
+ "hex",
+ "log",
 ]
 
 [[package]]
 name = "datafusion-optimizer"
-version = "35.0.0"
+version = "36.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8b7feb0391f1fc75575acb95b74bfd276903dc37a5409fcebe160bc7ddff2010"
+checksum = "5ae27e07bf1f04d327be5c2a293470879801ab5535204dc3b16b062fda195496"
 dependencies = [
  "arrow",
  "async-trait",
@@ -948,9 +964,9 @@ dependencies = [
 
 [[package]]
 name = "datafusion-physical-expr"
-version = "35.0.0"
+version = "36.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e911bca609c89a54e8f014777449d8290327414d3e10c57a3e3c2122e38878d0"
+checksum = "dde620cd9ef76a3bca9c754fb68854bd2349c49f55baf97e08001f9e967f6d6b"
 dependencies = [
  "ahash",
  "arrow",
@@ -958,11 +974,13 @@ dependencies = [
  "arrow-buffer",
  "arrow-ord",
  "arrow-schema",
+ "arrow-string",
  "base64",
  "blake2",
  "blake3",
  "chrono",
  "datafusion-common",
+ "datafusion-execution",
  "datafusion-expr",
  "half 2.1.0",
  "hashbrown 0.14.3",
@@ -982,9 +1000,9 @@ dependencies = [
 
 [[package]]
 name = "datafusion-physical-plan"
-version = "35.0.0"
+version = "36.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e96b546b8a02e9c2ab35ac6420d511f12a4701950c1eb2e568c122b4fefb0be3"
+checksum = "9a4c75fba9ea99d64b2246cbd2fcae2e6fc973e6616b1015237a616036506dd4"
 dependencies = [
  "ahash",
  "arrow",
@@ -1013,9 +1031,9 @@ dependencies = [
 
 [[package]]
 name = "datafusion-sql"
-version = "35.0.0"
+version = "36.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2d18d36f260bbbd63aafdb55339213a23d540d3419810575850ef0a798a6b768"
+checksum = "21474a95c3a62d113599d21b439fa15091b538bac06bd20be0bb2e7d22903c09"
 dependencies = [
  "arrow",
  "arrow-schema",
@@ -2516,9 +2534,9 @@ checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b"
 
 [[package]]
 name = "sqlparser"
-version = "0.41.0"
+version = "0.43.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5cc2c25a6c66789625ef164b4c7d2e548d627902280c13710d33da8222169964"
+checksum = "f95c4bae5aba7cd30bd506f7140026ade63cff5afd778af8854026f9606bf5d4"
 dependencies = [
  "log",
  "sqlparser_derive",
@@ -2558,8 +2576,14 @@ name = "strum"
 version = "0.25.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125"
+
+[[package]]
+name = "strum"
+version = "0.26.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "723b93e8addf9aa965ebe2d11da6d7540fa2283fcea14b3371ff055f7ba13f5f"
 dependencies = [
- "strum_macros",
+ "strum_macros 0.26.1",
 ]
 
 [[package]]
@@ -2575,6 +2599,19 @@ dependencies = [
  "syn 2.0.48",
 ]
 
+[[package]]
+name = "strum_macros"
+version = "0.26.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7a3417fc93d76740d974a01654a09777cb500428cc874ca9f45edfe0c4d4cd18"
+dependencies = [
+ "heck 0.4.1",
+ "proc-macro2",
+ "quote",
+ "rustversion",
+ "syn 2.0.48",
+]
+
 [[package]]
 name = "subtle"
 version = "2.5.0"
@@ -2740,9 +2777,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
 
 [[package]]
 name = "tokio"
-version = "1.35.1"
+version = "1.36.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c89b4efa943be685f629b149f53829423f8f5531ea21249408e8e2f8671ec104"
+checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931"
 dependencies = [
  "backtrace",
  "bytes",
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 14e2717..4dc5afe 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -66,9 +66,9 @@ itertools = "0.11.0"
 chrono = { version = "0.4", default-features = false, features = ["clock"] }
 chrono-tz = { version = "0.8" }
 paste = "1.0.14"
-datafusion-common = { version = "35.0.0" }
-datafusion = { default-features = false, version = "35.0.0", features = ["unicode_expressions"] }
-datafusion-physical-expr = { version = "35.0.0", default-features = false , features = ["unicode_expressions"] }
+datafusion-common = { version = "36.0.0" }
+datafusion = { default-features = false, version = "36.0.0", features = ["unicode_expressions"] }
+datafusion-physical-expr = { version = "36.0.0", default-features = false , features = ["unicode_expressions"] }
 unicode-segmentation = "^1.10.1"
 once_cell = "1.18.0"
 regex = "1.9.6"
diff --git a/core/benches/common.rs b/core/benches/common.rs
index 0597216..15952b8 100644
--- a/core/benches/common.rs
+++ b/core/benches/common.rs
@@ -45,6 +45,7 @@ pub fn create_int64_array(size: usize, null_density: f32, min: i64, max: i64) ->
         .collect()
 }
 
+#[allow(dead_code)]
 pub fn create_primitive_array<T>(size: usize, null_density: f32) -> PrimitiveArray<T>
 where
     T: ArrowPrimitiveType,
@@ -64,6 +65,7 @@ where
 
 /// Creates a dictionary with random keys and values, with value type `T`.
 /// Note here the keys are the dictionary indices.
+#[allow(dead_code)]
 pub fn create_dictionary_array<T>(
     size: usize,
     value_size: usize,
diff --git a/core/src/execution/datafusion/expressions/avg.rs b/core/src/execution/datafusion/expressions/avg.rs
index dc2b347..1e04ab0 100644
--- a/core/src/execution/datafusion/expressions/avg.rs
+++ b/core/src/execution/datafusion/expressions/avg.rs
@@ -24,11 +24,11 @@ use arrow_array::{
     Array, ArrayRef, ArrowNumericType, Int64Array, PrimitiveArray,
 };
 use arrow_schema::{DataType, Field};
-use datafusion::logical_expr::{type_coercion::aggregates::avg_return_type, Accumulator};
-use datafusion_common::{not_impl_err, DataFusionError, Result, ScalarValue};
-use datafusion_physical_expr::{
-    expressions::format_state_name, AggregateExpr, EmitTo, GroupsAccumulator, PhysicalExpr,
+use datafusion::logical_expr::{
+    type_coercion::aggregates::avg_return_type, Accumulator, EmitTo, GroupsAccumulator,
 };
+use datafusion_common::{not_impl_err, DataFusionError, Result, ScalarValue};
+use datafusion_physical_expr::{expressions::format_state_name, AggregateExpr, PhysicalExpr};
 use std::{any::Any, sync::Arc};
 
 use arrow_array::ArrowNativeTypeOp;
@@ -146,7 +146,7 @@ pub struct AvgAccumulator {
 }
 
 impl Accumulator for AvgAccumulator {
-    fn state(&self) -> Result<Vec<ScalarValue>> {
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
         Ok(vec![
             ScalarValue::Float64(self.sum),
             ScalarValue::from(self.count),
@@ -175,7 +175,7 @@ impl Accumulator for AvgAccumulator {
         Ok(())
     }
 
-    fn evaluate(&self) -> Result<ScalarValue> {
+    fn evaluate(&mut self) -> Result<ScalarValue> {
         Ok(ScalarValue::Float64(
             self.sum.map(|f| f / self.count as f64),
         ))
diff --git a/core/src/execution/datafusion/expressions/avg_decimal.rs b/core/src/execution/datafusion/expressions/avg_decimal.rs
index dc7bf15..6fb5581 100644
--- a/core/src/execution/datafusion/expressions/avg_decimal.rs
+++ b/core/src/execution/datafusion/expressions/avg_decimal.rs
@@ -24,11 +24,9 @@ use arrow_array::{
     Array, ArrayRef, Decimal128Array, Int64Array, PrimitiveArray,
 };
 use arrow_schema::{DataType, Field};
-use datafusion::logical_expr::Accumulator;
+use datafusion::logical_expr::{Accumulator, EmitTo, GroupsAccumulator};
 use datafusion_common::{not_impl_err, DataFusionError, Result, ScalarValue};
-use datafusion_physical_expr::{
-    expressions::format_state_name, AggregateExpr, EmitTo, GroupsAccumulator, PhysicalExpr,
-};
+use datafusion_physical_expr::{expressions::format_state_name, AggregateExpr, PhysicalExpr};
 use std::{any::Any, sync::Arc};
 
 use arrow_array::ArrowNativeTypeOp;
@@ -214,7 +212,7 @@ impl AvgDecimalAccumulator {
 }
 
 impl Accumulator for AvgDecimalAccumulator {
-    fn state(&self) -> Result<Vec<ScalarValue>> {
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
         Ok(vec![
             ScalarValue::Decimal128(self.sum, self.sum_precision, self.sum_scale),
             ScalarValue::from(self.count),
@@ -266,7 +264,7 @@ impl Accumulator for AvgDecimalAccumulator {
         Ok(())
     }
 
-    fn evaluate(&self) -> Result<ScalarValue> {
+    fn evaluate(&mut self) -> Result<ScalarValue> {
         fn make_decimal128(value: Option<i128>, precision: u8, scale: i8) -> ScalarValue {
             ScalarValue::Decimal128(value, precision, scale)
         }
diff --git a/core/src/execution/datafusion/expressions/scalar_funcs.rs b/core/src/execution/datafusion/expressions/scalar_funcs.rs
index 8759566..8ff13e1 100644
--- a/core/src/execution/datafusion/expressions/scalar_funcs.rs
+++ b/core/src/execution/datafusion/expressions/scalar_funcs.rs
@@ -31,13 +31,11 @@ use datafusion::{
     physical_plan::ColumnarValue,
 };
 use datafusion_common::{
-    cast::as_generic_string_array, internal_err, DataFusionError, Result as DataFusionResult,
-    ScalarValue,
+    cast::as_generic_string_array, exec_err, internal_err, DataFusionError,
+    Result as DataFusionResult, ScalarValue,
 };
 use datafusion_physical_expr::{
-    execution_props::ExecutionProps,
-    functions::{create_physical_fun, make_scalar_function},
-    math_expressions,
+    execution_props::ExecutionProps, functions::create_physical_fun, math_expressions,
 };
 use num::{BigInt, Signed, ToPrimitive};
 use unicode_segmentation::UnicodeSegmentation;
@@ -366,7 +364,12 @@ fn spark_round(
                 let (precision, scale) = get_precision_scale(data_type);
                 make_decimal_array(array, precision, scale, &f)
             }
-            _ => make_scalar_function(math_expressions::round)(args),
+            DataType::Float32 | DataType::Float64 => {
+                Ok(ColumnarValue::Array(math_expressions::round(&[
+                    array.clone()
+                ])?))
+            }
+            dt => exec_err!("Not supported datatype for ROUND: {dt}"),
         },
         ColumnarValue::Scalar(a) => match a {
             ScalarValue::Int64(a) if *point < 0 => {
@@ -386,7 +389,10 @@ fn spark_round(
                 let (precision, scale) = get_precision_scale(data_type);
                 make_decimal_scalar(a, precision, scale, &f)
             }
-            _ => make_scalar_function(math_expressions::round)(args),
+            ScalarValue::Float32(_) | ScalarValue::Float64(_) => Ok(ColumnarValue::Scalar(
+                ScalarValue::try_from_array(&math_expressions::round(&[a.to_array()?])?, 0)?,
+            )),
+            dt => exec_err!("Not supported datatype for ROUND: {dt}"),
         },
     }
 }
diff --git a/core/src/execution/datafusion/expressions/sum_decimal.rs b/core/src/execution/datafusion/expressions/sum_decimal.rs
index a6da5f5..2afbbf0 100644
--- a/core/src/execution/datafusion/expressions/sum_decimal.rs
+++ b/core/src/execution/datafusion/expressions/sum_decimal.rs
@@ -24,11 +24,9 @@ use arrow_array::{
 };
 use arrow_data::decimal::validate_decimal_precision;
 use arrow_schema::{DataType, Field};
-use datafusion::logical_expr::Accumulator;
+use datafusion::logical_expr::{Accumulator, EmitTo, GroupsAccumulator};
 use datafusion_common::{Result as DFResult, ScalarValue};
-use datafusion_physical_expr::{
-    aggregate::utils::down_cast_any_ref, AggregateExpr, EmitTo, GroupsAccumulator, PhysicalExpr,
-};
+use datafusion_physical_expr::{aggregate::utils::down_cast_any_ref, AggregateExpr, PhysicalExpr};
 use std::{any::Any, ops::BitAnd, sync::Arc};
 
 use crate::unlikely;
@@ -204,7 +202,7 @@ impl Accumulator for SumDecimalAccumulator {
         Ok(())
     }
 
-    fn evaluate(&self) -> DFResult<ScalarValue> {
+    fn evaluate(&mut self) -> DFResult<ScalarValue> {
         // For each group:
         //   1. if `is_empty` is true, it means either there is no value or all values for the group
         //      are null, in this case we'll return null
@@ -224,7 +222,7 @@ impl Accumulator for SumDecimalAccumulator {
         std::mem::size_of_val(self)
     }
 
-    fn state(&self) -> DFResult<Vec<ScalarValue>> {
+    fn state(&mut self) -> DFResult<Vec<ScalarValue>> {
         let sum = if self.is_not_null {
             ScalarValue::try_new_decimal128(self.sum, self.precision, self.scale)?
         } else {
diff --git a/core/src/execution/datafusion/planner.rs b/core/src/execution/datafusion/planner.rs
index 66a29cb..f4a0cec 100644
--- a/core/src/execution/datafusion/planner.rs
+++ b/core/src/execution/datafusion/planner.rs
@@ -608,6 +608,7 @@ impl PhysicalPlanner {
                     vec![left, right],
                     data_type,
                     None,
+                    false,
                 )))
             }
             _ => Ok(Arc::new(BinaryExpr::new(left, op, right))),
@@ -984,6 +985,7 @@ impl PhysicalPlanner {
             args.to_vec(),
             data_type,
             None,
+            args.is_empty(),
         ));
 
         Ok(scalar_expr)
diff --git a/core/src/execution/operators/copy.rs b/core/src/execution/operators/copy.rs
index c818d62..996db2b 100644
--- a/core/src/execution/operators/copy.rs
+++ b/core/src/execution/operators/copy.rs
@@ -28,7 +28,7 @@ use arrow_array::{ArrayRef, RecordBatch};
 use arrow_schema::{DataType, Field, FieldRef, Schema, SchemaRef};
 
 use datafusion::{execution::TaskContext, physical_expr::*, physical_plan::*};
-use datafusion_common::{DataFusionError, Result as DataFusionResult};
+use datafusion_common::{arrow_datafusion_err, DataFusionError, Result as DataFusionResult};
 
 use super::copy_or_cast_array;
 
@@ -141,8 +141,7 @@ impl CopyStream {
             .iter()
             .map(|v| copy_or_cast_array(v))
             .collect::<Result<Vec<ArrayRef>, _>>()?;
-        RecordBatch::try_new(self.schema.clone(), vectors)
-            .map_err(|err| DataFusionError::ArrowError(err, None))
+        RecordBatch::try_new(self.schema.clone(), vectors).map_err(|e| arrow_datafusion_err!(e))
     }
 }
 
diff --git a/core/src/execution/operators/scan.rs b/core/src/execution/operators/scan.rs
index 9f85de8..e31230c 100644
--- a/core/src/execution/operators/scan.rs
+++ b/core/src/execution/operators/scan.rs
@@ -43,7 +43,7 @@ use datafusion::{
     physical_expr::*,
     physical_plan::{ExecutionPlan, *},
 };
-use datafusion_common::{DataFusionError, Result as DataFusionResult};
+use datafusion_common::{arrow_datafusion_err, DataFusionError, Result as DataFusionResult};
 use jni::{
     objects::{GlobalRef, JLongArray, JObject, ReleaseMode},
     sys::jlongArray,
@@ -325,7 +325,7 @@ impl ScanStream {
 
         let options = RecordBatchOptions::new().with_row_count(Some(num_rows));
         RecordBatch::try_new_with_options(self.schema.clone(), new_columns, &options)
-            .map_err(|err| DataFusionError::ArrowError(err, None))
+            .map_err(|e| arrow_datafusion_err!(e))
     }
 }