Posted to github@arrow.apache.org by "alamb (via GitHub)" <gi...@apache.org> on 2023/06/14 21:20:15 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6671: Allow `AggregateUDF` to define retractable batch

alamb commented on code in PR #6671:
URL: https://github.com/apache/arrow-datafusion/pull/6671#discussion_r1230177818


##########
datafusion/core/src/physical_plan/udaf.rs:
##########
@@ -70,6 +70,11 @@ impl AggregateFunctionExpr {
     pub fn fun(&self) -> &AggregateUDF {
         &self.fun
     }
+
+    /// Returns true if this can support sliding accumulators
+    pub fn retractable(&self) -> Result<bool> {
+        Ok((self.fun.accumulator)(&self.data_type)?.supports_retract_batch())

Review Comment:
   It is not ideal that we need to instantiate the accumulator just to check whether it supports `retract_batch`, but that is consistent with the window function API, so I think that is OK.
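
   For readers following along, here is a minimal sketch of the "instantiate to probe" pattern. The trait, the `AccumulatorFactory` alias, and the `AggregateUdf`, `SlidingSum`, and `Median` types are simplified stand-ins invented for illustration, not the actual DataFusion definitions (the real factory takes a data type and returns a DataFusion `Result`).

```rust
/// Hypothetical, simplified accumulator trait (not DataFusion's actual API).
trait Accumulator {
    /// Whether this accumulator can remove values it has already seen.
    fn supports_retract_batch(&self) -> bool;
}

/// Illustrative accumulator that can slide (a sum can subtract old values).
struct SlidingSum;
impl Accumulator for SlidingSum {
    fn supports_retract_batch(&self) -> bool {
        true
    }
}

/// Illustrative accumulator that cannot slide.
struct Median;
impl Accumulator for Median {
    fn supports_retract_batch(&self) -> bool {
        false
    }
}

/// Assumed shape of the factory stored on the UDAF: a closure that builds
/// a fresh accumulator and may fail.
type AccumulatorFactory = Box<dyn Fn() -> Result<Box<dyn Accumulator>, String>>;

struct AggregateUdf {
    accumulator: AccumulatorFactory,
}

impl AggregateUdf {
    /// The capability lives on the accumulator instance, so a throwaway
    /// instance is built only to ask it the question. A failing factory
    /// surfaces as an error instead of a silent `false`.
    fn retractable(&self) -> Result<bool, String> {
        Ok((self.accumulator)()?.supports_retract_batch())
    }
}
```

   The cost is one extra accumulator construction at planning time; the alternative would be a separate flag on the UDAF itself, which this PR does not add.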



##########
datafusion/core/src/physical_plan/windows/mod.rs:
##########
@@ -90,12 +93,26 @@ pub fn create_window_expr(
             order_by,
             window_frame,
         )),
-        WindowFunction::AggregateUDF(fun) => Arc::new(PlainAggregateWindowExpr::new(
-            udaf::create_aggregate_expr(fun.as_ref(), args, input_schema, name)?,
-            partition_by,
-            order_by,
-            window_frame,
-        )),
+        WindowFunction::AggregateUDF(fun) => {

Review Comment:
   The major difference between this PR and what @stuartcarnie did is that this PR still allows AggregateUDFs to run as window functions even if they don't implement `retract_batch`.
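
   To illustrate the idea (not the actual implementation), the sketch below picks between a sliding evaluation and a recompute-per-frame fallback for `ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING`. Everything here is a simplified assumption: the trait works on `&[i64]` rather than Arrow arrays, and `Sum` and `window_sums` are hypothetical names.

```rust
/// Hypothetical, simplified accumulator trait (not DataFusion's actual API).
trait Accumulator {
    fn update_batch(&mut self, values: &[i64]);
    fn retract_batch(&mut self, _values: &[i64]) -> Result<(), String> {
        Err("retract_batch is not implemented".to_string())
    }
    fn supports_retract_batch(&self) -> bool {
        false
    }
    fn evaluate(&self) -> i64;
}

/// Illustrative sum whose retract support can be switched off.
#[derive(Default)]
struct Sum {
    total: i64,
    retractable: bool,
}

impl Accumulator for Sum {
    fn update_batch(&mut self, values: &[i64]) {
        self.total += values.iter().sum::<i64>();
    }
    fn retract_batch(&mut self, values: &[i64]) -> Result<(), String> {
        if !self.retractable {
            return Err("retract_batch is not implemented".to_string());
        }
        self.total -= values.iter().sum::<i64>();
        Ok(())
    }
    fn supports_retract_batch(&self) -> bool {
        self.retractable
    }
    fn evaluate(&self) -> i64 {
        self.total
    }
}

/// Evaluates a sum over ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING.
fn window_sums(
    values: &[i64],
    make: impl Fn() -> Box<dyn Accumulator>,
) -> Result<Vec<i64>, String> {
    let n = values.len();
    // Half-open frame [start, end) for row i, clamped to the partition.
    let frame = |i: usize| (i.saturating_sub(1), (i + 2).min(n));
    let mut out = Vec::with_capacity(n);

    if make().supports_retract_batch() {
        // Sliding path: one accumulator, add rows entering the frame and
        // retract rows leaving it.
        let mut acc = make();
        let (mut start, mut end) = (0, 0);
        for i in 0..n {
            let (s, e) = frame(i);
            acc.update_batch(&values[end..e]);
            acc.retract_batch(&values[start..s])?;
            start = s;
            end = e;
            out.push(acc.evaluate());
        }
    } else {
        // Fallback path: a fresh accumulator per row, fed the whole frame.
        for i in 0..n {
            let (s, e) = frame(i);
            let mut acc = make();
            acc.update_batch(&values[s..e]);
            out.push(acc.evaluate());
        }
    }
    Ok(out)
}
```

   With the made-up input `[2, 3, 4, 5, 5]`, both paths in this sketch yield `[5, 9, 12, 14, 10]`. The fallback does more work per row but never calls `retract_batch`, which is what keeps non-retractable UDAFs usable.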



##########
datafusion/expr/src/accumulator.rs:
##########
@@ -68,6 +71,16 @@ pub trait Accumulator: Send + Sync + Debug {
         ))
     }
 
+    /// Does the accumulator support incrementally updating its value

Review Comment:
   This is a new API addition, but it should be backwards compatible.
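
   A small sketch of why adding a trait method can stay backwards compatible, assuming the new method ships with a default body (the hunk above is cut off before the body, so that is an assumption here). The trait is a simplified stand-in and `LegacyCount` is a hypothetical implementor written without any knowledge of the new method.

```rust
/// Hypothetical, simplified accumulator trait (not DataFusion's actual API).
trait Accumulator {
    fn update_batch(&mut self, values: &[i64]);
    fn evaluate(&self) -> i64;

    /// Newly added method. Because it has a default body, implementors
    /// that predate it keep compiling and simply report `false`.
    fn supports_retract_batch(&self) -> bool {
        false
    }
}

/// An implementor written "before" the new method existed: it does not
/// mention `supports_retract_batch` at all.
struct LegacyCount {
    count: i64,
}

impl Accumulator for LegacyCount {
    fn update_batch(&mut self, values: &[i64]) {
        self.count += values.len() as i64;
    }
    fn evaluate(&self) -> i64 {
        self.count
    }
}
```

   Implementors that do support retraction opt in by overriding the method; everyone else needs no source changes.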



##########
datafusion/core/tests/user_defined_aggregates.rs:
##########
@@ -79,15 +98,41 @@ async fn test_udaf_as_window() {
     ];
     assert_batches_eq!(expected, &execute(&ctx, sql).await);
     // aggregate over the entire window function call update_batch
-    assert!(counters.update_batch());
-    assert!(!counters.retract_batch());
+    assert!(test_state.update_batch());
+    assert!(!test_state.retract_batch());
 }
 
 /// User defined aggregate used as a window function with a window frame
 #[tokio::test]
 async fn test_udaf_as_window_with_frame() {
-    let TestContext { ctx, counters } = TestContext::new();
+    let TestContext { ctx, test_state } = TestContext::new();
+    let sql = "SELECT time_sum(time) OVER(ORDER BY time ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as time_sum from t";
+    let expected = vec![
+        "+----------------------------+",
+        "| time_sum                   |",
+        "+----------------------------+",
+        "| 1970-01-01T00:00:00.000005 |",
+        "| 1970-01-01T00:00:00.000009 |",
+        "| 1970-01-01T00:00:00.000012 |",
+        "| 1970-01-01T00:00:00.000014 |",
+        "| 1970-01-01T00:00:00.000010 |",
+        "+----------------------------+",
+    ];
+    assert_batches_eq!(expected, &execute(&ctx, sql).await);
+    // user defined aggregates with window frame should be calling retract batch
+    assert!(test_state.update_batch());
+    assert!(test_state.retract_batch());
+}
+
+/// Ensure that User defined aggregate used as a window function with a window
+/// frame, but that does not implement retract_batch, does not error
+#[tokio::test]
+async fn test_udaf_as_window_with_frame_without_retract_batch() {
+    let test_state = Arc::new(TestState::new().with_error_on_retract_batch());
+
+    let TestContext { ctx, test_state } = TestContext::new_with_test_state(test_state);
     let sql = "SELECT time_sum(time) OVER(ORDER BY time ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as time_sum from t";
+    // TODO: It is not clear why this is a different value than when retract batch is used

Review Comment:
   I am not sure why the UDWF gets different answers when implemented with `retract_batch` 🤔 Maybe @mustafasrepo can offer some hints or insights.


