Posted to commits@arrow.apache.org by al...@apache.org on 2024/01/03 12:25:04 UTC
(arrow-datafusion) branch main updated: Implement trait based API for defining WindowUDF (#8719)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 6b1e9c6a3a Implement trait based API for defining WindowUDF (#8719)
6b1e9c6a3a is described below
commit 6b1e9c6a3ae95b7065e902d99d9fde66f0f8e054
Author: junxiangMu <63...@users.noreply.github.com>
AuthorDate: Wed Jan 3 20:24:58 2024 +0800
Implement trait based API for defining WindowUDF (#8719)
* Implement trait based API for defining WindowUDF
* add test case & docs
* fix docs
* rename WindowUDFImpl function
---
datafusion-examples/README.md | 1 +
datafusion-examples/examples/advanced_udwf.rs | 230 +++++++++++++++++++++
.../user_defined/user_defined_window_functions.rs | 64 ++++--
datafusion/expr/src/expr_fn.rs | 67 +++++-
datafusion/expr/src/lib.rs | 2 +-
datafusion/expr/src/udwf.rs | 116 ++++++++++-
.../proto/tests/cases/roundtrip_logical_plan.rs | 55 +++--
docs/source/library-user-guide/adding-udfs.md | 7 +-
8 files changed, 498 insertions(+), 44 deletions(-)
diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md
index 1296c74ea2..aae451add9 100644
--- a/datafusion-examples/README.md
+++ b/datafusion-examples/README.md
@@ -63,6 +63,7 @@ cargo run --example csv_sql
- [`advanced_udf.rs`](examples/advanced_udf.rs): Define and invoke a more complicated User Defined Scalar Function (UDF)
- [`simple_udaf.rs`](examples/simple_udaf.rs): Define and invoke a User Defined Aggregate Function (UDAF)
- [`simple_udwf.rs`](examples/simple_udwf.rs): Define and invoke a User Defined Window Function (UDWF)
+- [`advanced_udwf.rs`](examples/advanced_udwf.rs): Define and invoke a more complicated User Defined Window Function (UDWF)
## Distributed
diff --git a/datafusion-examples/examples/advanced_udwf.rs b/datafusion-examples/examples/advanced_udwf.rs
new file mode 100644
index 0000000000..91869d80a4
--- /dev/null
+++ b/datafusion-examples/examples/advanced_udwf.rs
@@ -0,0 +1,230 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion::{arrow::datatypes::DataType, logical_expr::Volatility};
+use std::any::Any;
+
+use arrow::{
+ array::{ArrayRef, AsArray, Float64Array},
+ datatypes::Float64Type,
+};
+use datafusion::error::Result;
+use datafusion::prelude::*;
+use datafusion_common::ScalarValue;
+use datafusion_expr::{
+ PartitionEvaluator, Signature, WindowFrame, WindowUDF, WindowUDFImpl,
+};
+
+/// This example shows how to use the full WindowUDFImpl API to implement a user
+/// defined window function. As in the `simple_udwf.rs` example, this struct implements
+/// a function `partition_evaluator` that returns the `MyPartitionEvaluator` instance.
+///
+/// To do so, we must implement the `WindowUDFImpl` trait.
+struct SmoothItUdf {
+ signature: Signature,
+}
+
+impl SmoothItUdf {
+ /// Create a new instance of the SmoothItUdf struct
+ fn new() -> Self {
+ Self {
+ signature: Signature::exact(
+ // this function will always take one argument of type f64
+ vec![DataType::Float64],
+ // this function is deterministic and will always return the same
+ // result for the same input
+ Volatility::Immutable,
+ ),
+ }
+ }
+}
+
+impl WindowUDFImpl for SmoothItUdf {
+ /// We implement as_any so that we can downcast the WindowUDFImpl trait object
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ /// Return the name of this function
+ fn name(&self) -> &str {
+ "smooth_it"
+ }
+
+ /// Return the "signature" of this function -- namely the types of arguments it will take
+ fn signature(&self) -> &Signature {
+ &self.signature
+ }
+
+ /// What is the type of value that will be returned by this function.
+ fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+ Ok(DataType::Float64)
+ }
+
+ /// Create a `PartitionEvaluator` to evaluate this function on a new
+ /// partition.
+ fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
+ Ok(Box::new(MyPartitionEvaluator::new()))
+ }
+}
+
+/// This implements the lowest level evaluation for a window function
+///
+/// It handles calculating the value of the window function for each
+/// distinct value of `PARTITION BY` (each car type in our example)
+#[derive(Clone, Debug)]
+struct MyPartitionEvaluator {}
+
+impl MyPartitionEvaluator {
+ fn new() -> Self {
+ Self {}
+ }
+}
+
+/// Different evaluation methods are called depending on the various
+/// settings of WindowUDF. This example uses the simplest and most
+/// general, `evaluate`. See `PartitionEvaluator` for the other more
+/// advanced uses.
+impl PartitionEvaluator for MyPartitionEvaluator {
+ /// Tell DataFusion the window function varies based on the value
+ /// of the window frame.
+ fn uses_window_frame(&self) -> bool {
+ true
+ }
+
+ /// This function is called once per input row.
+ ///
+ /// `range` specifies which indexes of `values` should be
+ /// considered for the calculation.
+ ///
+ /// Note this is the SLOWEST, but simplest, way to evaluate a
+ /// window function. It is much faster to implement
+ /// `evaluate_all` or `evaluate_all_with_rank`, if possible.
+ fn evaluate(
+ &mut self,
+ values: &[ArrayRef],
+ range: &std::ops::Range<usize>,
+ ) -> Result<ScalarValue> {
+ // Again, the input argument is an array of floating
+ // point numbers to calculate a moving average
+ let arr: &Float64Array = values[0].as_ref().as_primitive::<Float64Type>();
+
+ let range_len = range.end - range.start;
+
+ // our smoothing function will average all the values in the range
+ let output = if range_len > 0 {
+ let sum: f64 = arr.values().iter().skip(range.start).take(range_len).sum();
+ Some(sum / range_len as f64)
+ } else {
+ None
+ };
+
+ Ok(ScalarValue::Float64(output))
+ }
+}
+
+// create local execution context with `cars.csv` registered as a table named `cars`
+async fn create_context() -> Result<SessionContext> {
+ // declare a new context. In spark API, this corresponds to a new spark SQL session
+ let ctx = SessionContext::new();
+
+ // declare a table in memory. In spark API, this corresponds to createDataFrame(...).
+ println!("pwd: {}", std::env::current_dir().unwrap().display());
+ let csv_path = "../../datafusion/core/tests/data/cars.csv".to_string();
+ let read_options = CsvReadOptions::default().has_header(true);
+
+ ctx.register_csv("cars", &csv_path, read_options).await?;
+ Ok(ctx)
+}
+
+#[tokio::main]
+async fn main() -> Result<()> {
+ let ctx = create_context().await?;
+ let smooth_it = WindowUDF::from(SmoothItUdf::new());
+ ctx.register_udwf(smooth_it.clone());
+
+ // Use SQL to show the raw contents of the `cars` table
+ let df = ctx.sql("SELECT * from cars").await?;
+ // print the results
+ df.show().await?;
+
+ // Use SQL to run the new window function:
+ //
+ // `PARTITION BY car`: each distinct value of car (red and green)
+ // should be treated as a separate partition (and will result in
+ // creating a new `PartitionEvaluator`)
+ //
+ // `ORDER BY time`: within each partition ('green' or 'red') the
+ // rows will be ordered by the value in the `time` column
+ //
+ // `evaluate` is invoked with a window defined by the
+ // SQL. In this case:
+ //
+ // The first invocation will be passed row 0, the first row in the
+ // partition.
+ //
+ // The second invocation will be passed rows 0 and 1, the first
+ // two rows in the partition.
+ //
+ // etc.
+ let df = ctx
+ .sql(
+ "SELECT \
+ car, \
+ speed, \
+ smooth_it(speed) OVER (PARTITION BY car ORDER BY time) AS smooth_speed,\
+ time \
+ from cars \
+ ORDER BY \
+ car",
+ )
+ .await?;
+ // print the results
+ df.show().await?;
+
+ // this time, call the new window function with an explicit
+ // window so evaluate will be invoked with each window.
+ //
+ // `ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING`: each invocation
+ // sees at most 3 rows: the row before, the current row, and the 1
+ // row afterward.
+ let df = ctx.sql(
+ "SELECT \
+ car, \
+ speed, \
+ smooth_it(speed) OVER (PARTITION BY car ORDER BY time ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS smooth_speed,\
+ time \
+ from cars \
+ ORDER BY \
+ car",
+ ).await?;
+ // print the results
+ df.show().await?;
+
+ // Now, run the function using the DataFrame API:
+ let window_expr = smooth_it.call(
+ vec![col("speed")], // smooth_it(speed)
+ vec![col("car")], // PARTITION BY car
+ vec![col("time").sort(true, true)], // ORDER BY time ASC
+ WindowFrame::new(false),
+ );
+ let df = ctx.table("cars").await?.window(vec![window_expr])?;
+
+ // print the results
+ df.show().await?;
+
+ Ok(())
+}
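
The averaging done in `evaluate` above, and the way a `ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING` frame turns into a row range, can be sketched without DataFusion. This is a minimal std-only illustration, not part of the commit: `smooth`, `frame_one_around` and `smooth_partition` are hypothetical names, and plain `f64` slices stand in for Arrow arrays (so nulls are not modelled).

```rust
use std::ops::Range;

/// Average of `values[range]`, or `None` for an empty range.
/// Mirrors the arithmetic in `evaluate`, but on a plain slice.
/// Panics if `range` is out of bounds for `values`.
fn smooth(values: &[f64], range: &Range<usize>) -> Option<f64> {
    let window = &values[range.clone()];
    if window.is_empty() {
        None
    } else {
        Some(window.iter().sum::<f64>() / window.len() as f64)
    }
}

/// Hypothetical helper: the row range that
/// `ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING` would cover
/// for row `idx` in a partition of `len` rows.
fn frame_one_around(idx: usize, len: usize) -> Range<usize> {
    idx.saturating_sub(1)..(idx + 2).min(len)
}

/// Apply the smoothing once per row of a single partition,
/// the way the engine calls `evaluate` once per input row.
fn smooth_partition(values: &[f64]) -> Vec<Option<f64>> {
    (0..values.len())
        .map(|i| smooth(values, &frame_one_around(i, values.len())))
        .collect()
}
```

For a partition with speeds `[10.0, 20.0, 30.0, 40.0]`, the first row only sees rows 0 and 1, the middle rows see three rows each, and the last row sees rows 2 and 3.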
diff --git a/datafusion/core/tests/user_defined/user_defined_window_functions.rs b/datafusion/core/tests/user_defined/user_defined_window_functions.rs
index 5f99391572..3040fbafe8 100644
--- a/datafusion/core/tests/user_defined/user_defined_window_functions.rs
+++ b/datafusion/core/tests/user_defined/user_defined_window_functions.rs
@@ -19,6 +19,7 @@
//! user defined window functions
use std::{
+ any::Any,
ops::Range,
sync::{
atomic::{AtomicUsize, Ordering},
@@ -32,8 +33,7 @@ use arrow_schema::DataType;
use datafusion::{assert_batches_eq, prelude::SessionContext};
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::{
- function::PartitionEvaluatorFactory, PartitionEvaluator, ReturnTypeFunction,
- Signature, Volatility, WindowUDF,
+ PartitionEvaluator, Signature, Volatility, WindowUDF, WindowUDFImpl,
};
/// A query with a window function evaluated over the entire partition
@@ -471,24 +471,48 @@ impl OddCounter {
}
fn register(ctx: &mut SessionContext, test_state: Arc<TestState>) {
- let name = "odd_counter";
- let volatility = Volatility::Immutable;
-
- let signature = Signature::exact(vec![DataType::Int64], volatility);
-
- let return_type = Arc::new(DataType::Int64);
- let return_type: ReturnTypeFunction =
- Arc::new(move |_| Ok(Arc::clone(&return_type)));
-
- let partition_evaluator_factory: PartitionEvaluatorFactory =
- Arc::new(move || Ok(Box::new(OddCounter::new(Arc::clone(&test_state)))));
-
- ctx.register_udwf(WindowUDF::new(
- name,
- &signature,
- &return_type,
- &partition_evaluator_factory,
- ))
+ struct SimpleWindowUDF {
+ signature: Signature,
+ return_type: DataType,
+ test_state: Arc<TestState>,
+ }
+
+ impl SimpleWindowUDF {
+ fn new(test_state: Arc<TestState>) -> Self {
+ let signature =
+ Signature::exact(vec![DataType::Float64], Volatility::Immutable);
+ let return_type = DataType::Int64;
+ Self {
+ signature,
+ return_type,
+ test_state,
+ }
+ }
+ }
+
+ impl WindowUDFImpl for SimpleWindowUDF {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn name(&self) -> &str {
+ "odd_counter"
+ }
+
+ fn signature(&self) -> &Signature {
+ &self.signature
+ }
+
+ fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+ Ok(self.return_type.clone())
+ }
+
+ fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
+ Ok(Box::new(OddCounter::new(Arc::clone(&self.test_state))))
+ }
+ }
+
+ ctx.register_udwf(WindowUDF::from(SimpleWindowUDF::new(test_state)))
}
}
diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs
index eed41d97cc..f76fb17b38 100644
--- a/datafusion/expr/src/expr_fn.rs
+++ b/datafusion/expr/src/expr_fn.rs
@@ -28,7 +28,7 @@ use crate::{
BuiltinScalarFunction, Expr, LogicalPlan, Operator, ReturnTypeFunction,
ScalarFunctionImplementation, ScalarUDF, Signature, StateTypeFunction, Volatility,
};
-use crate::{ColumnarValue, ScalarUDFImpl, WindowUDF};
+use crate::{ColumnarValue, ScalarUDFImpl, WindowUDF, WindowUDFImpl};
use arrow::datatypes::DataType;
use datafusion_common::{Column, Result};
use std::any::Any;
@@ -1059,13 +1059,66 @@ pub fn create_udwf(
volatility: Volatility,
partition_evaluator_factory: PartitionEvaluatorFactory,
) -> WindowUDF {
- let return_type: ReturnTypeFunction = Arc::new(move |_| Ok(return_type.clone()));
- WindowUDF::new(
+ let return_type = Arc::try_unwrap(return_type).unwrap_or_else(|t| t.as_ref().clone());
+ WindowUDF::from(SimpleWindowUDF::new(
name,
- &Signature::exact(vec![input_type], volatility),
- &return_type,
- &partition_evaluator_factory,
- )
+ input_type,
+ return_type,
+ volatility,
+ partition_evaluator_factory,
+ ))
+}
+
+/// Implements [`WindowUDFImpl`] for functions that have a single signature and
+/// return type.
+pub struct SimpleWindowUDF {
+ name: String,
+ signature: Signature,
+ return_type: DataType,
+ partition_evaluator_factory: PartitionEvaluatorFactory,
+}
+
+impl SimpleWindowUDF {
+ /// Create a new `SimpleWindowUDF` from a name, input type, return type and
+ /// implementation. Implementing [`WindowUDFImpl`] directly allows more flexibility.
+ pub fn new(
+ name: impl Into<String>,
+ input_type: DataType,
+ return_type: DataType,
+ volatility: Volatility,
+ partition_evaluator_factory: PartitionEvaluatorFactory,
+ ) -> Self {
+ let name = name.into();
+ let signature = Signature::exact([input_type].to_vec(), volatility);
+ Self {
+ name,
+ signature,
+ return_type,
+ partition_evaluator_factory,
+ }
+ }
+}
+
+impl WindowUDFImpl for SimpleWindowUDF {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn name(&self) -> &str {
+ &self.name
+ }
+
+ fn signature(&self) -> &Signature {
+ &self.signature
+ }
+
+ fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+ Ok(self.return_type.clone())
+ }
+
+ fn partition_evaluator(&self) -> Result<Box<dyn crate::PartitionEvaluator>> {
+ (self.partition_evaluator_factory)()
+ }
}
/// Calls a named built in function
diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs
index ab213a19a3..077681d217 100644
--- a/datafusion/expr/src/lib.rs
+++ b/datafusion/expr/src/lib.rs
@@ -82,7 +82,7 @@ pub use signature::{
pub use table_source::{TableProviderFilterPushDown, TableSource, TableType};
pub use udaf::AggregateUDF;
pub use udf::{ScalarUDF, ScalarUDFImpl};
-pub use udwf::WindowUDF;
+pub use udwf::{WindowUDF, WindowUDFImpl};
pub use window_frame::{WindowFrame, WindowFrameBound, WindowFrameUnits};
#[cfg(test)]
diff --git a/datafusion/expr/src/udwf.rs b/datafusion/expr/src/udwf.rs
index a97a68341f..800386bfc7 100644
--- a/datafusion/expr/src/udwf.rs
+++ b/datafusion/expr/src/udwf.rs
@@ -24,6 +24,7 @@ use crate::{
use arrow::datatypes::DataType;
use datafusion_common::Result;
use std::{
+ any::Any,
fmt::{self, Debug, Display, Formatter},
sync::Arc,
};
@@ -80,7 +81,11 @@ impl std::hash::Hash for WindowUDF {
}
impl WindowUDF {
- /// Create a new WindowUDF
+ /// Create a new WindowUDF from low level details.
+ ///
+ /// See [`WindowUDFImpl`] for a more convenient way to create a
+ /// `WindowUDF` using trait objects
+ #[deprecated(since = "34.0.0", note = "please implement WindowUDFImpl instead")]
pub fn new(
name: &str,
signature: &Signature,
@@ -95,6 +100,32 @@ impl WindowUDF {
}
}
+ /// Create a new `WindowUDF` from a [`WindowUDFImpl`] trait object
+ ///
+ /// Note this is the same as using the `From` impl (`WindowUDF::from`)
+ pub fn new_from_impl<F>(fun: F) -> WindowUDF
+ where
+ F: WindowUDFImpl + Send + Sync + 'static,
+ {
+ let arc_fun = Arc::new(fun);
+ let captured_self = arc_fun.clone();
+ let return_type: ReturnTypeFunction = Arc::new(move |arg_types| {
+ let return_type = captured_self.return_type(arg_types)?;
+ Ok(Arc::new(return_type))
+ });
+
+ let captured_self = arc_fun.clone();
+ let partition_evaluator_factory: PartitionEvaluatorFactory =
+ Arc::new(move || captured_self.partition_evaluator());
+
+ Self {
+ name: arc_fun.name().to_string(),
+ signature: arc_fun.signature().clone(),
+ return_type: return_type.clone(),
+ partition_evaluator_factory,
+ }
+ }
+
/// creates a [`Expr`] that calls the window function given
/// the `partition_by`, `order_by`, and `window_frame` definition
///
@@ -140,3 +171,86 @@ impl WindowUDF {
(self.partition_evaluator_factory)()
}
}
+
+impl<F> From<F> for WindowUDF
+where
+ F: WindowUDFImpl + Send + Sync + 'static,
+{
+ fn from(fun: F) -> Self {
+ Self::new_from_impl(fun)
+ }
+}
+
+/// Trait for implementing [`WindowUDF`].
+///
+/// This trait exposes the full API for implementing user defined window functions and
+/// can be used to implement any function.
+///
+/// See [`advanced_udwf.rs`] for a full example with complete implementation and
+/// [`WindowUDF`] for other available options.
+///
+///
+/// [`advanced_udwf.rs`]: https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/advanced_udwf.rs
+/// # Basic Example
+/// ```
+/// # use std::any::Any;
+/// # use arrow::datatypes::DataType;
+/// # use datafusion_common::{DataFusionError, plan_err, Result};
+/// # use datafusion_expr::{col, Signature, Volatility, PartitionEvaluator, WindowFrame};
+/// # use datafusion_expr::{WindowUDFImpl, WindowUDF};
+/// struct SmoothIt {
+/// signature: Signature
+/// };
+///
+/// impl SmoothIt {
+/// fn new() -> Self {
+/// Self {
+/// signature: Signature::uniform(1, vec![DataType::Int32], Volatility::Immutable)
+/// }
+/// }
+/// }
+///
+/// /// Implement the WindowUDFImpl trait for SmoothIt
+/// impl WindowUDFImpl for SmoothIt {
+/// fn as_any(&self) -> &dyn Any { self }
+/// fn name(&self) -> &str { "smooth_it" }
+/// fn signature(&self) -> &Signature { &self.signature }
+/// fn return_type(&self, args: &[DataType]) -> Result<DataType> {
+/// if !matches!(args.get(0), Some(&DataType::Int32)) {
+/// return plan_err!("smooth_it only accepts Int32 arguments");
+/// }
+/// Ok(DataType::Int32)
+/// }
+/// // The actual implementation would return a PartitionEvaluator that smooths the values
+/// fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> { unimplemented!() }
+/// }
+///
+/// // Create a new WindowUDF from the implementation
+/// let smooth_it = WindowUDF::from(SmoothIt::new());
+///
+/// // Call the function `smooth_it(speed)`
+/// let expr = smooth_it.call(
+/// vec![col("speed")], // smooth_it(speed)
+/// vec![col("car")], // PARTITION BY car
+/// vec![col("time").sort(true, true)], // ORDER BY time ASC
+/// WindowFrame::new(false),
+/// );
+/// ```
+pub trait WindowUDFImpl {
+ /// Returns this object as an [`Any`] trait object
+ fn as_any(&self) -> &dyn Any;
+
+ /// Returns this function's name
+ fn name(&self) -> &str;
+
+ /// Returns the function's [`Signature`] for information about what input
+ /// types are accepted and the function's Volatility.
+ fn signature(&self) -> &Signature;
+
+ /// What [`DataType`] will be returned by this function, given the types of
+ /// the arguments
+ fn return_type(&self, arg_types: &[DataType]) -> Result<DataType>;
+
+ /// Invoke the function, returning the [`PartitionEvaluator`] instance
+ fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>>;
+}
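
The shape of `new_from_impl` and the blanket `From` impl above (wrap the trait implementation in an `Arc`, then capture clones of it in closures stored on the struct) can be illustrated with the standard library alone. This is a rough sketch under stated assumptions, not DataFusion's API: `FunctionImpl`, `Function` and `AddOne` are hypothetical stand-ins for `WindowUDFImpl`, `WindowUDF` and a user type, and errors are plain `String`s.

```rust
use std::sync::Arc;

/// What the factory hands back: a boxed per-value function (hypothetical).
type Evaluator = Box<dyn Fn(f64) -> f64>;

/// Hypothetical stand-in for `WindowUDFImpl`: a name plus a factory method.
trait FunctionImpl {
    fn name(&self) -> &str;
    fn make_evaluator(&self) -> Result<Evaluator, String>;
}

/// Hypothetical stand-in for `WindowUDF`: plain data plus a stored closure.
struct Function {
    name: String,
    factory: Arc<dyn Fn() -> Result<Evaluator, String> + Send + Sync>,
}

impl<F> From<F> for Function
where
    F: FunctionImpl + Send + Sync + 'static,
{
    fn from(fun: F) -> Self {
        // Share the implementation so the closure can outlive this call.
        let shared = Arc::new(fun);
        let captured = Arc::clone(&shared);
        Self {
            name: shared.name().to_string(),
            // The closure forwards to the trait method on the shared value.
            factory: Arc::new(move || captured.make_evaluator()),
        }
    }
}

/// A toy implementation used only to exercise the conversion.
struct AddOne;

impl FunctionImpl for AddOne {
    fn name(&self) -> &str {
        "add_one"
    }

    fn make_evaluator(&self) -> Result<Evaluator, String> {
        Ok(Box::new(|x: f64| x + 1.0))
    }
}
```

With these definitions, `Function::from(AddOne)` yields a value whose `name` is `"add_one"` and whose `factory` produces a fresh evaluator on each call. The design keeps the struct's closure-based fields unchanged while letting callers write an ordinary trait impl.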
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index dea99f91e3..402781e17e 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use std::any::Any;
use std::collections::HashMap;
use std::fmt::{self, Debug, Formatter};
use std::sync::Arc;
@@ -54,6 +55,7 @@ use datafusion_expr::{
BuiltinScalarFunction::{Sqrt, Substr},
Expr, LogicalPlan, Operator, PartitionEvaluator, Signature, TryCast, Volatility,
WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition, WindowUDF,
+ WindowUDFImpl,
};
use datafusion_proto::bytes::{
logical_plan_from_bytes, logical_plan_from_bytes_with_extension_codec,
@@ -1785,27 +1787,52 @@ fn roundtrip_window() {
}
}
- fn return_type(arg_types: &[DataType]) -> Result<Arc<DataType>> {
- if arg_types.len() != 1 {
- return plan_err!(
- "dummy_udwf expects 1 argument, got {}: {:?}",
- arg_types.len(),
- arg_types
- );
+ struct SimpleWindowUDF {
+ signature: Signature,
+ }
+
+ impl SimpleWindowUDF {
+ fn new() -> Self {
+ let signature =
+ Signature::exact(vec![DataType::Float64], Volatility::Immutable);
+ Self { signature }
+ }
+ }
+
+ impl WindowUDFImpl for SimpleWindowUDF {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn name(&self) -> &str {
+ "dummy_udwf"
+ }
+
+ fn signature(&self) -> &Signature {
+ &self.signature
+ }
+
+ fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+ if arg_types.len() != 1 {
+ return plan_err!(
+ "dummy_udwf expects 1 argument, got {}: {:?}",
+ arg_types.len(),
+ arg_types
+ );
+ }
+ Ok(arg_types[0].clone())
+ }
+
+ fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
+ make_partition_evaluator()
}
- Ok(Arc::new(arg_types[0].clone()))
}
fn make_partition_evaluator() -> Result<Box<dyn PartitionEvaluator>> {
Ok(Box::new(DummyWindow {}))
}
- let dummy_window_udf = WindowUDF::new(
- "dummy_udwf",
- &Signature::exact(vec![DataType::Float64], Volatility::Immutable),
- &(Arc::new(return_type) as _),
- &(Arc::new(make_partition_evaluator) as _),
- );
+ let dummy_window_udf = WindowUDF::from(SimpleWindowUDF::new());
let test_expr6 = Expr::WindowFunction(expr::WindowFunction::new(
WindowFunctionDefinition::WindowUDF(Arc::new(dummy_window_udf.clone())),
diff --git a/docs/source/library-user-guide/adding-udfs.md b/docs/source/library-user-guide/adding-udfs.md
index c51e4de323..1f687f978f 100644
--- a/docs/source/library-user-guide/adding-udfs.md
+++ b/docs/source/library-user-guide/adding-udfs.md
@@ -201,7 +201,8 @@ fn make_partition_evaluator() -> Result<Box<dyn PartitionEvaluator>> {
### Registering a Window UDF
-To register a Window UDF, you need to wrap the function implementation in a `WindowUDF` struct and then register it with the `SessionContext`. DataFusion provides the `create_udwf` helper functions to make this easier.
+To register a Window UDF, you need to wrap the function implementation in a [`WindowUDF`] struct and then register it with the `SessionContext`. DataFusion provides the [`create_udwf`] helper function to make this easier.
+There is also a lower-level API that offers more functionality but is more complex; it is documented in [`advanced_udwf.rs`].
```rust
use datafusion::logical_expr::{Volatility, create_udwf};
@@ -218,6 +219,10 @@ let smooth_it = create_udwf(
);
```
+[`WindowUDF`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/struct.WindowUDF.html
+[`create_udwf`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/fn.create_udwf.html
+[`advanced_udwf.rs`]: https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/advanced_udwf.rs
+
The `create_udwf` has five arguments to check:
- The first argument is the name of the function. This is the name that will be used in SQL queries.