You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/12/13 12:13:55 UTC
[arrow-datafusion] branch master updated: Minor: Remove datafusion-core dev dependency from datafusion-sql (#4589)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 0de7d819f Minor: Remove datafusion-core dev dependency from datafusion-sql (#4589)
0de7d819f is described below
commit 0de7d819f5f598ab7606a111ac8b578aeb1cb967
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Tue Dec 13 07:13:49 2022 -0500
Minor: Remove datafusion-core dev dependency from datafusion-sql (#4589)
---
datafusion/core/tests/sql/udf.rs | 62 ++++++++++++++++++++++++++++++++++++++--
datafusion/sql/Cargo.toml | 3 --
datafusion/sql/src/planner.rs | 61 ---------------------------------------
3 files changed, 60 insertions(+), 66 deletions(-)
diff --git a/datafusion/core/tests/sql/udf.rs b/datafusion/core/tests/sql/udf.rs
index f554b9424..37a869f93 100644
--- a/datafusion/core/tests/sql/udf.rs
+++ b/datafusion/core/tests/sql/udf.rs
@@ -21,8 +21,8 @@ use datafusion::{
execution::registry::FunctionRegistry,
physical_plan::{expressions::AvgAccumulator, functions::make_scalar_function},
};
-use datafusion_common::cast::as_int32_array;
-use datafusion_expr::{create_udaf, LogicalPlanBuilder};
+use datafusion_common::{cast::as_int32_array, ScalarValue};
+use datafusion_expr::{create_udaf, Accumulator, AggregateState, LogicalPlanBuilder};
/// test that casting happens on udfs.
/// c11 is f32, but `custom_sqrt` requires f64. Casting happens but the logical plan and
@@ -168,3 +168,61 @@ async fn simple_udaf() -> Result<()> {
Ok(())
}
+
+#[test]
+fn udaf_as_window_func() -> Result<()> {
+ #[derive(Debug)]
+ struct MyAccumulator;
+
+ impl Accumulator for MyAccumulator {
+ fn state(&self) -> Result<Vec<AggregateState>> {
+ unimplemented!()
+ }
+
+ fn update_batch(&mut self, _: &[ArrayRef]) -> Result<()> {
+ unimplemented!()
+ }
+
+ fn merge_batch(&mut self, _: &[ArrayRef]) -> Result<()> {
+ unimplemented!()
+ }
+
+ fn evaluate(&self) -> Result<ScalarValue> {
+ unimplemented!()
+ }
+
+ fn size(&self) -> usize {
+ unimplemented!()
+ }
+ }
+
+ let my_acc = create_udaf(
+ "my_acc",
+ DataType::Int32,
+ Arc::new(DataType::Int32),
+ Volatility::Immutable,
+ Arc::new(|_| Ok(Box::new(MyAccumulator))),
+ Arc::new(vec![DataType::Int32]),
+ );
+
+ let mut context = SessionContext::new();
+ context.register_table(
+ "my_table",
+ Arc::new(datafusion::datasource::empty::EmptyTable::new(Arc::new(
+ Schema::new(vec![
+ Field::new("a", DataType::UInt32, false),
+ Field::new("b", DataType::Int32, false),
+ ]),
+ ))),
+ )?;
+ context.register_udaf(my_acc);
+
+ let sql = "SELECT a, MY_ACC(b) OVER(PARTITION BY a) FROM my_table";
+ let expected = r#"Projection: my_table.a, AggregateUDF { name: "my_acc", signature: Signature { type_signature: Exact([Int32]), volatility: Immutable }, fun: "<FUNC>" }(my_table.b) PARTITION BY [my_table.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
+ WindowAggr: windowExpr=[[AggregateUDF { name: "my_acc", signature: Signature { type_signature: Exact([Int32]), volatility: Immutable }, fun: "<FUNC>" }(my_table.b) PARTITION BY [my_table.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
+ TableScan: my_table"#;
+
+ let plan = context.create_logical_plan(sql)?;
+ assert_eq!(format!("{:?}", plan), expected);
+ Ok(())
+}
diff --git a/datafusion/sql/Cargo.toml b/datafusion/sql/Cargo.toml
index bc6add89a..dd19e6aff 100644
--- a/datafusion/sql/Cargo.toml
+++ b/datafusion/sql/Cargo.toml
@@ -42,6 +42,3 @@ datafusion-common = { path = "../common", version = "15.0.0" }
datafusion-expr = { path = "../expr", version = "15.0.0" }
log = "^0.4"
sqlparser = "0.28"
-
-[dev-dependencies]
-datafusion = { path = "../core" }
diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs
index a04a71d4e..86057aa62 100644
--- a/datafusion/sql/src/planner.rs
+++ b/datafusion/sql/src/planner.rs
@@ -3301,14 +3301,11 @@ fn ensure_any_column_reference_is_unambiguous(
#[cfg(test)]
mod tests {
- use datafusion::arrow::array::ArrayRef;
- use datafusion::prelude::SessionContext;
use std::any::Any;
use sqlparser::dialect::{Dialect, GenericDialect, HiveDialect, MySqlDialect};
use datafusion_common::assert_contains;
- use datafusion_expr::{create_udaf, Accumulator, AggregateState, Volatility};
use super::*;
@@ -5330,64 +5327,6 @@ mod tests {
quick_test(sql, expected);
}
- #[test]
- fn udaf_as_window_func() -> Result<()> {
- #[derive(Debug)]
- struct MyAccumulator;
-
- impl Accumulator for MyAccumulator {
- fn state(&self) -> Result<Vec<AggregateState>> {
- unimplemented!()
- }
-
- fn update_batch(&mut self, _: &[ArrayRef]) -> Result<()> {
- unimplemented!()
- }
-
- fn merge_batch(&mut self, _: &[ArrayRef]) -> Result<()> {
- unimplemented!()
- }
-
- fn evaluate(&self) -> Result<ScalarValue> {
- unimplemented!()
- }
-
- fn size(&self) -> usize {
- unimplemented!()
- }
- }
-
- let my_acc = create_udaf(
- "my_acc",
- DataType::Int32,
- Arc::new(DataType::Int32),
- Volatility::Immutable,
- Arc::new(|_| Ok(Box::new(MyAccumulator))),
- Arc::new(vec![DataType::Int32]),
- );
-
- let mut context = SessionContext::new();
- context.register_table(
- TableReference::Bare { table: "my_table" },
- Arc::new(datafusion::datasource::empty::EmptyTable::new(Arc::new(
- Schema::new(vec![
- Field::new("a", DataType::UInt32, false),
- Field::new("b", DataType::Int32, false),
- ]),
- ))),
- )?;
- context.register_udaf(my_acc);
-
- let sql = "SELECT a, MY_ACC(b) OVER(PARTITION BY a) FROM my_table";
- let expected = r#"Projection: my_table.a, AggregateUDF { name: "my_acc", signature: Signature { type_signature: Exact([Int32]), volatility: Immutable }, fun: "<FUNC>" }(my_table.b) PARTITION BY [my_table.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
- WindowAggr: windowExpr=[[AggregateUDF { name: "my_acc", signature: Signature { type_signature: Exact([Int32]), volatility: Immutable }, fun: "<FUNC>" }(my_table.b) PARTITION BY [my_table.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
- TableScan: my_table"#;
-
- let plan = context.create_logical_plan(sql)?;
- assert_eq!(format!("{:?}", plan), expected);
- Ok(())
- }
-
#[test]
fn select_typed_date_string() {
let sql = "SELECT date '2020-12-10' AS date";