Posted to github@arrow.apache.org by "alamb (via GitHub)" <gi...@apache.org> on 2023/06/14 19:56:25 UTC

[GitHub] [arrow-datafusion] alamb opened a new pull request, #6671: Allow `AggregateUDF` to define retractable batch @alamb

alamb opened a new pull request, #6671:
URL: https://github.com/apache/arrow-datafusion/pull/6671

   # Which issue does this PR close?
   Draft as it builds on https://github.com/apache/arrow-datafusion/pull/6669
   
   Closes https://github.com/apache/arrow-datafusion/issues/6611
   
   # Rationale for this change
   
   When used as window functions, `AggregateUDF`s cannot use the `retract_batch` API to speed up the calculations.
   
   # What changes are included in this PR?
   
   1. Add a new `supports_retract_batch` method to the `Accumulator` trait (to mirror the API added by @mustafasrepo for window functions)
   2. If the accumulator reports that it supports `retract_batch`, use the sliding accumulator
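   A minimal sketch of the pattern in points 1 and 2, using a hypothetical, simplified `SimpleAccumulator` trait over `i64` slices rather than DataFusion's real `Accumulator` (which works on Arrow arrays and returns `Result`s). The default `supports_retract_batch` returns `false`, which is what keeps the addition backwards compatible; `AppendOnlySum`, `RetractableSum` and `choose_strategy` are illustrative names only.

   ```rust
   /// Hypothetical, simplified stand-in for an accumulator trait; the real
   /// DataFusion `Accumulator` works on Arrow arrays and returns `Result`s.
   pub trait SimpleAccumulator {
       /// Add a batch of values to the running state.
       fn update_batch(&mut self, values: &[i64]);

       /// Remove a batch of values that have left the window frame.
       /// Default: not supported, so existing implementors are unaffected.
       fn retract_batch(&mut self, _values: &[i64]) -> Result<(), String> {
           Err("retract_batch not implemented".to_string())
       }

       /// Whether `retract_batch` is implemented. Defaults to `false`, so
       /// existing implementations keep compiling with their old behavior.
       fn supports_retract_batch(&self) -> bool {
           false
       }

       /// Current aggregate value.
       fn evaluate(&self) -> i64;
   }

   /// An accumulator that only supports appending values (no retraction).
   #[derive(Default)]
   pub struct AppendOnlySum {
       sum: i64,
   }

   impl SimpleAccumulator for AppendOnlySum {
       fn update_batch(&mut self, values: &[i64]) {
           self.sum += values.iter().sum::<i64>();
       }
       fn evaluate(&self) -> i64 {
           self.sum
       }
   }

   /// An accumulator that opts in to retraction by overriding both methods.
   #[derive(Default)]
   pub struct RetractableSum {
       sum: i64,
   }

   impl SimpleAccumulator for RetractableSum {
       fn update_batch(&mut self, values: &[i64]) {
           self.sum += values.iter().sum::<i64>();
       }
       fn retract_batch(&mut self, values: &[i64]) -> Result<(), String> {
           self.sum -= values.iter().sum::<i64>();
           Ok(())
       }
       fn supports_retract_batch(&self) -> bool {
           true
       }
       fn evaluate(&self) -> i64 {
           self.sum
       }
   }

   /// Use a sliding accumulator only if the accumulator reports that it
   /// supports retraction; otherwise fall back to the plain one.
   pub fn choose_strategy(acc: &dyn SimpleAccumulator) -> &'static str {
       if acc.supports_retract_batch() { "sliding" } else { "plain" }
   }
   ```

   Because the new method has a default body, accumulators written before the change still compile and are simply treated as non-retractable.
   
   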
   
   # Are these changes tested?
   Yes
   <!--
   We typically require tests for all PRs in order to:
   1. Prevent the code from being accidentally broken by subsequent changes
   2. Serve as another way to document the expected behavior of the code
   
   If tests are not included in your PR, please explain why (for example, are they covered by existing tests)?
   -->
   
   # Are there any user-facing changes?
   New APIs, though they should all be backwards compatible


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6671: Allow `AggregateUDF` to define retractable batch , efficiently implement window functions

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6671:
URL: https://github.com/apache/arrow-datafusion/pull/6671#discussion_r1231316053


##########
datafusion/core/tests/user_defined_aggregates.rs:
##########
@@ -79,15 +98,41 @@ async fn test_udaf_as_window() {
     ];
     assert_batches_eq!(expected, &execute(&ctx, sql).await);
     // aggregate over the entire window function call update_batch
-    assert!(counters.update_batch());
-    assert!(!counters.retract_batch());
+    assert!(test_state.update_batch());
+    assert!(!test_state.retract_batch());
 }
 
 /// User defined aggregate used as a window function with a window frame
 #[tokio::test]
 async fn test_udaf_as_window_with_frame() {
-    let TestContext { ctx, counters } = TestContext::new();
+    let TestContext { ctx, test_state } = TestContext::new();
+    let sql = "SELECT time_sum(time) OVER(ORDER BY time ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as time_sum from t";
+    let expected = vec![
+        "+----------------------------+",
+        "| time_sum                   |",
+        "+----------------------------+",
+        "| 1970-01-01T00:00:00.000005 |",
+        "| 1970-01-01T00:00:00.000009 |",
+        "| 1970-01-01T00:00:00.000012 |",
+        "| 1970-01-01T00:00:00.000014 |",
+        "| 1970-01-01T00:00:00.000010 |",
+        "+----------------------------+",
+    ];
+    assert_batches_eq!(expected, &execute(&ctx, sql).await);
+    // user defined aggregates with window frame should be calling retract batch
+    assert!(test_state.update_batch());
+    assert!(test_state.retract_batch());
+}
+
+/// Ensure that User defined aggregate used as a window function with a window
+/// frame, but that does not implement retract_batch, does not error
+#[tokio::test]
+async fn test_udaf_as_window_with_frame_without_retract_batch() {
+    let test_state = Arc::new(TestState::new().with_error_on_retract_batch());
+
+    let TestContext { ctx, test_state } = TestContext::new_with_test_state(test_state);
     let sql = "SELECT time_sum(time) OVER(ORDER BY time ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as time_sum from t";
+    // TODO: It is not clear why this is a different value than when retract batch is used

Review Comment:
   I thought about this some more and, as you suggested, made queries with unbounded windows without retract_batch return an error in 999866e28; it is much better to get an error than the wrong answer.
   
   It seems to me that there is no fundamental reason DataFusion couldn't support sliding windows with user-defined aggregate functions that lack `retract_batch`; for example, it could create a new `Accumulator` instance for each row and feed in the entire window.
   
   However, since implementing `retract_batch` is likely to be much more efficient, that is what I think we should encourage people to do. If someone really needs to use a UDAF without `retract_batch` for a sliding window, we can look into implementing the "full recompute" solution at that time.
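   The "full recompute" alternative can be sketched as follows, assuming a plain `i64` sum and modeling "a new `Accumulator` instance for each row" as summing each frame from an empty state. `full_recompute_sum` is a hypothetical name; it also counts how many values are fed in, to show the extra work compared with an incremental accumulator.

   ```rust
   /// "Full recompute" strategy for `ROWS BETWEEN preceding PRECEDING AND
   /// following FOLLOWING`: for every row, start from an empty state
   /// (conceptually a fresh accumulator) and feed it the entire frame.
   /// No retraction is needed, but the work is O(rows * frame width).
   /// Returns the per-row sums and the total number of values fed in.
   fn full_recompute_sum(values: &[i64], preceding: usize, following: usize) -> (Vec<i64>, usize) {
       let n = values.len();
       let mut values_fed = 0;
       let sums: Vec<i64> = (0..n)
           .map(|i| {
               // Frame bounds [start, end), clamped to the partition.
               let start = i.saturating_sub(preceding);
               let end = (i + following + 1).min(n);
               values_fed += end - start;
               // Fresh "accumulator" per row: sum the whole frame from scratch.
               values[start..end].iter().sum::<i64>()
           })
           .collect();
       (sums, values_fed)
   }
   ```

   With `&[2, 3, 4, 5, 5]` and a `1 PRECEDING AND 1 FOLLOWING` frame, it returns `[5, 9, 12, 14, 10]` after feeding 13 values in total. The cost grows with rows times frame width, which is why implementing `retract_batch` is the preferred route.
   
   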





[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #6671: Allow `AggregateUDF` to define retractable batch , efficiently implement window functions

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #6671:
URL: https://github.com/apache/arrow-datafusion/pull/6671#discussion_r1230531322


##########
datafusion/core/tests/user_defined_aggregates.rs:
##########
@@ -79,15 +98,41 @@ async fn test_udaf_as_window() {
     ];
     assert_batches_eq!(expected, &execute(&ctx, sql).await);
     // aggregate over the entire window function call update_batch
-    assert!(counters.update_batch());
-    assert!(!counters.retract_batch());
+    assert!(test_state.update_batch());
+    assert!(!test_state.retract_batch());
 }
 
 /// User defined aggregate used as a window function with a window frame
 #[tokio::test]
 async fn test_udaf_as_window_with_frame() {
-    let TestContext { ctx, counters } = TestContext::new();
+    let TestContext { ctx, test_state } = TestContext::new();
+    let sql = "SELECT time_sum(time) OVER(ORDER BY time ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as time_sum from t";
+    let expected = vec![
+        "+----------------------------+",
+        "| time_sum                   |",
+        "+----------------------------+",
+        "| 1970-01-01T00:00:00.000005 |",
+        "| 1970-01-01T00:00:00.000009 |",
+        "| 1970-01-01T00:00:00.000012 |",
+        "| 1970-01-01T00:00:00.000014 |",
+        "| 1970-01-01T00:00:00.000010 |",
+        "+----------------------------+",
+    ];
+    assert_batches_eq!(expected, &execute(&ctx, sql).await);
+    // user defined aggregates with window frame should be calling retract batch
+    assert!(test_state.update_batch());
+    assert!(test_state.retract_batch());
+}
+
+/// Ensure that User defined aggregate used as a window function with a window
+/// frame, but that does not implement retract_batch, does not error
+#[tokio::test]
+async fn test_udaf_as_window_with_frame_without_retract_batch() {
+    let test_state = Arc::new(TestState::new().with_error_on_retract_batch());
+
+    let TestContext { ctx, test_state } = TestContext::new_with_test_state(test_state);
     let sql = "SELECT time_sum(time) OVER(ORDER BY time ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as time_sum from t";
+    // TODO: It is not clear why this is a different value than when retract batch is used

Review Comment:
   Accumulators whose window frame starts at something other than `UNBOUNDED PRECEDING` (for example, `1 PRECEDING`) must implement the `retract_batch` method to run correctly; without it, there is no way to calculate the result. Consider the query 
   ```sql
   SELECT SUM(a) OVER(ORDER BY a ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as sum_a from t
   ```
   
   For row i, this frame covers rows [i-1, i+2), clamped to the partition, so:
   the first sum is the sum of rows [0, 2),
   the second sum is the sum of rows [0, 3),
   the third sum is the sum of rows [1, 4), etc.
   Since the accumulator keeps a running sum in its state:
   for the first sum, we add the values in [0, 2) to the state;
   for the second sum, we add the values in [2, 3) ([0, 2) is already in the state, so the running sum covers [0, 3));
   for the third sum, we add the values in [3, 4) ([0, 3) is already in the state). We also need to retract the values in [0, 1); this way we obtain the sum over [1, 4), which is the appropriate range. 
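   To make the bookkeeping concrete, here is a sketch of the incremental strategy for `ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING`, where row i's frame is [i-1, i+2) clamped to the partition. It uses plain `i64` slices and hypothetical helper names (`frame`, `sliding_sum`) instead of DataFusion types; the loop adds rows entering the frame (the `update_batch` step) and subtracts rows leaving it (the `retract_batch` step).

   ```rust
   /// Frame bounds [start, end) for `ROWS BETWEEN preceding PRECEDING AND
   /// following FOLLOWING`, clamped to a partition of `n` rows.
   fn frame(i: usize, preceding: usize, following: usize, n: usize) -> (usize, usize) {
       (i.saturating_sub(preceding), (i + following + 1).min(n))
   }

   /// Incremental sliding-window sum: each row only adds the rows that entered
   /// the frame and retracts the rows that left it, instead of re-summing.
   /// Returns the per-row sums and how many retractions were needed.
   fn sliding_sum(values: &[i64], preceding: usize, following: usize) -> (Vec<i64>, usize) {
       let n = values.len();
       let mut running = 0i64; // accumulator state
       // Rows [cur_start, cur_end) are currently reflected in `running`.
       let (mut cur_start, mut cur_end) = (0usize, 0usize);
       let mut retractions = 0;
       let mut out = Vec::with_capacity(n);
       for i in 0..n {
           let (start, end) = frame(i, preceding, following, n);
           // "update_batch": add rows [cur_end, end) that just entered the frame.
           running += values[cur_end..end].iter().sum::<i64>();
           cur_end = end;
           // "retract_batch": remove rows [cur_start, start) that just left it.
           if start > cur_start {
               running -= values[cur_start..start].iter().sum::<i64>();
               retractions += 1;
               cur_start = start;
           }
           out.push(running);
       }
       (out, retractions)
   }
   ```

   For example, `sliding_sum(&[2, 3, 4, 5, 5], 1, 1)` yields `[5, 9, 12, 14, 10]` with three retractions (the input values are illustrative). Without the retraction step, the third row would report the sum over [0, 4) instead of [1, 4).
   
   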
   
   When the query uses `UNBOUNDED PRECEDING`, the starting index of the desired range is always 0, so the `retract_batch` method is never called. In this case, implementing `retract_batch` is not a requirement. 
   
   This approach is a bit different from the window function approach. Window functions (when they use a frame) receive the entire desired range during evaluation, so a window function `sum` could recompute the sum from scratch over the given range each time. However, this is not the case for accumulators, so the query above should return an error if `retract_batch` is not implemented. During review, I pinpointed the section where I think this error is currently being prevented.
   
   
   
   





[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6671: Allow `AggregateUDF` to define retractable batch , efficiently implement window functions

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6671:
URL: https://github.com/apache/arrow-datafusion/pull/6671#discussion_r1230803124


##########
datafusion/core/src/physical_plan/udaf.rs:
##########
@@ -70,6 +70,11 @@ impl AggregateFunctionExpr {
     pub fn fun(&self) -> &AggregateUDF {
         &self.fun
     }
+
+    /// Returns true if this can support sliding accumulators
+    pub fn retractable(&self) -> Result<bool> {
+        Ok((self.fun.accumulator)(&self.data_type)?.supports_retract_batch())

Review Comment:
   Another formulation I tried looked like the following. The upside is that it avoids the instantiation; the downside is that it is an API change and is not consistent with the window functions.
   
   ```rust
   pub struct AggregateUDF {
       /// name
       pub name: String,
       /// Signature (input arguments)
       pub signature: Signature,
       /// Return type
       pub return_type: ReturnTypeFunction,
       /// Return an accumulator without retract batch
       pub accumulator: AccumulatorFunctionImplementation,
       /// Return an accumulator with retract batch
       pub retractable_accumulator: Option<AccumulatorFunctionImplementation>, // <-- newly added field
       /// the accumulator's state's description as a function of the return type
       pub state_type: StateTypeFunction,
   }
   ```
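   A rough sketch of how this formulation could answer "is this retractable?" without building an accumulator. `Acc`, `Sum`, `AccFactory` and `UdafSketch` are hypothetical, trimmed-down stand-ins (the real `AccumulatorFunctionImplementation` takes a `DataType` and returns a `Result`), and retraction itself is omitted to keep the focus on the selection logic.

   ```rust
   /// Hypothetical stand-in for an accumulator; only a running sum here.
   pub trait Acc {
       fn update(&mut self, v: i64);
       fn value(&self) -> i64;
   }

   #[derive(Default)]
   struct Sum(i64);

   impl Acc for Sum {
       fn update(&mut self, v: i64) {
           self.0 += v;
       }
       fn value(&self) -> i64 {
           self.0
       }
   }

   /// Factory type, loosely modeled on `AccumulatorFunctionImplementation`.
   type AccFactory = fn() -> Box<dyn Acc>;

   /// Hypothetical, trimmed-down version of the struct above.
   struct UdafSketch {
       accumulator: AccFactory,
       retractable_accumulator: Option<AccFactory>,
   }

   impl UdafSketch {
       /// Decide retractability from the definition alone: nothing is instantiated.
       fn retractable(&self) -> bool {
           self.retractable_accumulator.is_some()
       }

       /// Build the accumulator for a window; sliding frames need the retractable one.
       fn accumulator_for(&self, sliding: bool) -> Result<Box<dyn Acc>, String> {
           match (sliding, self.retractable_accumulator) {
               (true, Some(make)) => Ok(make()),
               (true, None) => Err("sliding window requires a retractable accumulator".into()),
               (false, _) => Ok((self.accumulator)()),
           }
       }
   }

   fn new_sum() -> Box<dyn Acc> {
       Box::new(Sum::default())
   }
   ```

   The trade-off is the one described above: checking `Option::is_some` is free, but every UDAF definition gains a field, which differs from how window functions report the same capability.
   
   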





[GitHub] [arrow-datafusion] ozankabak commented on a diff in pull request #6671: Allow `AggregateUDF` to define retractable batch , efficiently implement window functions

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on code in PR #6671:
URL: https://github.com/apache/arrow-datafusion/pull/6671#discussion_r1230199201


##########
datafusion/core/src/physical_plan/udaf.rs:
##########
@@ -70,6 +70,11 @@ impl AggregateFunctionExpr {
     pub fn fun(&self) -> &AggregateUDF {
         &self.fun
     }
+
+    /// Returns true if this can support sliding accumulators
+    pub fn retractable(&self) -> Result<bool> {
+        Ok((self.fun.accumulator)(&self.data_type)?.supports_retract_batch())

Review Comment:
   Maybe we can fix this and its window function counterpart in a refactor. We discussed this instantiation issue in a meeting recently, and we are not super happy with it either. Let's get the basics in, and then we can work on the rough edges.





[GitHub] [arrow-datafusion] mustafasrepo commented on pull request #6671: Allow `AggregateUDF` to define retractable batch , efficiently implement window functions

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on PR #6671:
URL: https://github.com/apache/arrow-datafusion/pull/6671#issuecomment-1592499885

   Also, I think that with the introduction of the `supports_retract_batch` method, the `supports_bounded_execution` method is no longer necessary in the `AggregateExpr` trait. I have filed a small PR on top of this PR to show the changes required to remove `supports_bounded_execution` from the `AggregateExpr` trait. 
   
   More detailed explanation and PR can be found in [#PR13](https://github.com/alamb/arrow-datafusion/pull/13)




[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6671: Allow `AggregateUDF` to define retractable batch , implement sliding window functions

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6671:
URL: https://github.com/apache/arrow-datafusion/pull/6671#discussion_r1231503460


##########
datafusion/core/src/physical_plan/windows/mod.rs:
##########
@@ -90,12 +93,26 @@ pub fn create_window_expr(
             order_by,
             window_frame,
         )),
-        WindowFunction::AggregateUDF(fun) => Arc::new(PlainAggregateWindowExpr::new(
-            udaf::create_aggregate_expr(fun.as_ref(), args, input_schema, name)?,
-            partition_by,
-            order_by,
-            window_frame,
-        )),
+        WindowFunction::AggregateUDF(fun) => {
+            let aggregate =
+                udaf::create_aggregate_expr(fun.as_ref(), args, input_schema, name)?;
+
+            if !unbounded_window && aggregate.retractable()? {

Review Comment:
   in https://github.com/apache/arrow-datafusion/pull/6671/commits/999866e281e454ff8824829fce36ee026140e143





[GitHub] [arrow-datafusion] alamb merged pull request #6671: Allow `AggregateUDF` to define retractable batch , implement sliding window functions

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb merged PR #6671:
URL: https://github.com/apache/arrow-datafusion/pull/6671




[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6671: Allow `AggregateUDF` to define retractable batch , implement sliding window functions

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6671:
URL: https://github.com/apache/arrow-datafusion/pull/6671#discussion_r1231504241


##########
datafusion/core/src/physical_plan/windows/mod.rs:
##########
@@ -90,12 +93,26 @@ pub fn create_window_expr(
             order_by,
             window_frame,
         )),
-        WindowFunction::AggregateUDF(fun) => Arc::new(PlainAggregateWindowExpr::new(
-            udaf::create_aggregate_expr(fun.as_ref(), args, input_schema, name)?,
-            partition_by,
-            order_by,
-            window_frame,
-        )),
+        WindowFunction::AggregateUDF(fun) => {

Review Comment:
   Update: it turns out it is not correct to run AggregateUDFs as sliding window functions if they don't implement `retract_batch`, which I have made clear in the comments and tests.





[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6671: Allow `AggregateUDF` to define retractable batch

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6671:
URL: https://github.com/apache/arrow-datafusion/pull/6671#discussion_r1230177818


##########
datafusion/core/src/physical_plan/udaf.rs:
##########
@@ -70,6 +70,11 @@ impl AggregateFunctionExpr {
     pub fn fun(&self) -> &AggregateUDF {
         &self.fun
     }
+
+    /// Returns true if this can support sliding accumulators
+    pub fn retractable(&self) -> Result<bool> {
+        Ok((self.fun.accumulator)(&self.data_type)?.supports_retract_batch())

Review Comment:
   It is non-ideal that we need to instantiate the accumulator just to check whether it supports `retract_batch`, but that is consistent with the window function API, so I think that is OK.



##########
datafusion/core/src/physical_plan/windows/mod.rs:
##########
@@ -90,12 +93,26 @@ pub fn create_window_expr(
             order_by,
             window_frame,
         )),
-        WindowFunction::AggregateUDF(fun) => Arc::new(PlainAggregateWindowExpr::new(
-            udaf::create_aggregate_expr(fun.as_ref(), args, input_schema, name)?,
-            partition_by,
-            order_by,
-            window_frame,
-        )),
+        WindowFunction::AggregateUDF(fun) => {

Review Comment:
   The major difference between this PR and what @stuartcarnie did is that this PR still allows AggregateUDFs to run as window functions even if they don't implement `retract_batch`.



##########
datafusion/expr/src/accumulator.rs:
##########
@@ -68,6 +71,16 @@ pub trait Accumulator: Send + Sync + Debug {
         ))
     }
 
+    /// Does the accumulator support incrementally updating its value

Review Comment:
   This is a new API addition, but it should be backwards compatible.



##########
datafusion/core/tests/user_defined_aggregates.rs:
##########
@@ -79,15 +98,41 @@ async fn test_udaf_as_window() {
     ];
     assert_batches_eq!(expected, &execute(&ctx, sql).await);
     // aggregate over the entire window function call update_batch
-    assert!(counters.update_batch());
-    assert!(!counters.retract_batch());
+    assert!(test_state.update_batch());
+    assert!(!test_state.retract_batch());
 }
 
 /// User defined aggregate used as a window function with a window frame
 #[tokio::test]
 async fn test_udaf_as_window_with_frame() {
-    let TestContext { ctx, counters } = TestContext::new();
+    let TestContext { ctx, test_state } = TestContext::new();
+    let sql = "SELECT time_sum(time) OVER(ORDER BY time ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as time_sum from t";
+    let expected = vec![
+        "+----------------------------+",
+        "| time_sum                   |",
+        "+----------------------------+",
+        "| 1970-01-01T00:00:00.000005 |",
+        "| 1970-01-01T00:00:00.000009 |",
+        "| 1970-01-01T00:00:00.000012 |",
+        "| 1970-01-01T00:00:00.000014 |",
+        "| 1970-01-01T00:00:00.000010 |",
+        "+----------------------------+",
+    ];
+    assert_batches_eq!(expected, &execute(&ctx, sql).await);
+    // user defined aggregates with window frame should be calling retract batch
+    assert!(test_state.update_batch());
+    assert!(test_state.retract_batch());
+}
+
+/// Ensure that User defined aggregate used as a window function with a window
+/// frame, but that does not implement retract_batch, does not error
+#[tokio::test]
+async fn test_udaf_as_window_with_frame_without_retract_batch() {
+    let test_state = Arc::new(TestState::new().with_error_on_retract_batch());
+
+    let TestContext { ctx, test_state } = TestContext::new_with_test_state(test_state);
     let sql = "SELECT time_sum(time) OVER(ORDER BY time ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as time_sum from t";
+    // TODO: It is not clear why this is a different value than when retract batch is used

Review Comment:
   I am not sure why the UDAF gets different answers when implemented with `retract_batch` 🤔 maybe @mustafasrepo can offer some hints or insights





[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #6671: Allow `AggregateUDF` to define retractable batch , efficiently implement window functions

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #6671:
URL: https://github.com/apache/arrow-datafusion/pull/6671#discussion_r1230458156


##########
datafusion/core/tests/user_defined_aggregates.rs:
##########
@@ -79,15 +98,41 @@ async fn test_udaf_as_window() {
     ];
     assert_batches_eq!(expected, &execute(&ctx, sql).await);
     // aggregate over the entire window function call update_batch
-    assert!(counters.update_batch());
-    assert!(!counters.retract_batch());
+    assert!(test_state.update_batch());
+    assert!(!test_state.retract_batch());
 }
 
 /// User defined aggregate used as a window function with a window frame
 #[tokio::test]
 async fn test_udaf_as_window_with_frame() {
-    let TestContext { ctx, counters } = TestContext::new();
+    let TestContext { ctx, test_state } = TestContext::new();
+    let sql = "SELECT time_sum(time) OVER(ORDER BY time ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as time_sum from t";
+    let expected = vec![
+        "+----------------------------+",
+        "| time_sum                   |",
+        "+----------------------------+",
+        "| 1970-01-01T00:00:00.000005 |",
+        "| 1970-01-01T00:00:00.000009 |",
+        "| 1970-01-01T00:00:00.000012 |",
+        "| 1970-01-01T00:00:00.000014 |",
+        "| 1970-01-01T00:00:00.000010 |",
+        "+----------------------------+",
+    ];
+    assert_batches_eq!(expected, &execute(&ctx, sql).await);
+    // user defined aggregates with window frame should be calling retract batch
+    assert!(test_state.update_batch());
+    assert!(test_state.retract_batch());
+}
+
+/// Ensure that User defined aggregate used as a window function with a window
+/// frame, but that does not implement retract_batch, does not error
+#[tokio::test]
+async fn test_udaf_as_window_with_frame_without_retract_batch() {
+    let test_state = Arc::new(TestState::new().with_error_on_retract_batch());
+
+    let TestContext { ctx, test_state } = TestContext::new_with_test_state(test_state);
     let sql = "SELECT time_sum(time) OVER(ORDER BY time ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as time_sum from t";
+    // TODO: It is not clear why this is a different value than when retract batch is used

Review Comment:
   I think the query above shouldn't work if its `retract_batch` method is not implemented: its window frame is `1 PRECEDING AND 1 FOLLOWING`, and whenever the start boundary is something other than `UNBOUNDED PRECEDING`, the `retract_batch` method is needed to run the accumulator.
   
   If the query were `SELECT time_sum(time) OVER(ORDER BY time ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) as time_sum from t`, 
   `retract_batch` would have no significance, and I would expect the result to be the same whether or not it is implemented.
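   A small sketch of why the start boundary is what matters: with `UNBOUNDED PRECEDING`, every frame starts at row 0, so an append-only running sum is enough. The function names are hypothetical, and plain `i64` values stand in for Arrow arrays.

   ```rust
   /// Frame bounds [start, end) for `ROWS BETWEEN UNBOUNDED PRECEDING AND
   /// following FOLLOWING` over a partition of `n` rows.
   fn unbounded_preceding_frames(n: usize, following: usize) -> Vec<(usize, usize)> {
       (0..n).map(|i| (0, (i + following + 1).min(n))).collect()
   }

   /// Running sum that only ever appends; it has no way to remove rows,
   /// which is fine here because the frame start never moves.
   fn growing_sum(values: &[i64], following: usize) -> Vec<i64> {
       let mut running = 0i64;
       let mut cur_end = 0usize;
       unbounded_preceding_frames(values.len(), following)
           .into_iter()
           .map(|(start, end)| {
               // The start is pinned at 0, so nothing ever needs retracting.
               debug_assert_eq!(start, 0);
               running += values[cur_end..end].iter().sum::<i64>();
               cur_end = end;
               running
           })
           .collect()
   }
   ```

   For example, `growing_sum(&[2, 3, 4, 5, 5], 1)` returns `[5, 9, 14, 19, 19]` without ever removing a value, so whether `retract_batch` exists cannot affect the result.
   
   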





[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #6671: Allow `AggregateUDF` to define retractable batch , implement sliding window functions

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #6671:
URL: https://github.com/apache/arrow-datafusion/pull/6671#discussion_r1231800606


##########
datafusion/core/tests/user_defined_aggregates.rs:
##########
@@ -79,15 +98,41 @@ async fn test_udaf_as_window() {
     ];
     assert_batches_eq!(expected, &execute(&ctx, sql).await);
     // aggregate over the entire window function call update_batch
-    assert!(counters.update_batch());
-    assert!(!counters.retract_batch());
+    assert!(test_state.update_batch());
+    assert!(!test_state.retract_batch());
 }
 
 /// User defined aggregate used as a window function with a window frame
 #[tokio::test]
 async fn test_udaf_as_window_with_frame() {
-    let TestContext { ctx, counters } = TestContext::new();
+    let TestContext { ctx, test_state } = TestContext::new();
+    let sql = "SELECT time_sum(time) OVER(ORDER BY time ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as time_sum from t";
+    let expected = vec![
+        "+----------------------------+",
+        "| time_sum                   |",
+        "+----------------------------+",
+        "| 1970-01-01T00:00:00.000005 |",
+        "| 1970-01-01T00:00:00.000009 |",
+        "| 1970-01-01T00:00:00.000012 |",
+        "| 1970-01-01T00:00:00.000014 |",
+        "| 1970-01-01T00:00:00.000010 |",
+        "+----------------------------+",
+    ];
+    assert_batches_eq!(expected, &execute(&ctx, sql).await);
+    // user defined aggregates with window frame should be calling retract batch
+    assert!(test_state.update_batch());
+    assert!(test_state.retract_batch());
+}
+
+/// Ensure that User defined aggregate used as a window function with a window
+/// frame, but that does not implement retract_batch, does not error
+#[tokio::test]
+async fn test_udaf_as_window_with_frame_without_retract_batch() {
+    let test_state = Arc::new(TestState::new().with_error_on_retract_batch());
+
+    let TestContext { ctx, test_state } = TestContext::new_with_test_state(test_state);
     let sql = "SELECT time_sum(time) OVER(ORDER BY time ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as time_sum from t";
+    // TODO: It is not clear why this is a different value than when retract batch is used

Review Comment:
   > I thought about this some more and have made queries with unbounded windows without retract_batch error as you suggested in [999866e](https://github.com/apache/arrow-datafusion/commit/999866e281e454ff8824829fce36ee026140e143) -- much better to get an error rather than the wrong answer.
   > 
   > It seems to me that there is no fundamental reason DataFusion couldn't support sliding windows with user defined aggregate functions without `retract_batch` -- for example it could make a new `Accumulator` instance for each row and feed in the entire window.
   > 
   > However, given it is likely to be much more efficient to implement `retract_batch`, that is what I think we should encourage people to do. If someone really needs to use a UDAF without `retract_batch` for a sliding window, we can look into implementing the "full recompute" solution at that time
   
   Agreed. Also, users can define their own `UDWF` (user-defined window function). When a `UDWF`'s `uses_window_frame` flag is `true`, the `evaluate` method of its `PartitionEvaluator` follows the "full recompute" approach.





[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6671: Allow `AggregateUDF` to define retractable batch , efficiently implement window functions

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6671:
URL: https://github.com/apache/arrow-datafusion/pull/6671#discussion_r1230806847


##########
datafusion/core/src/physical_plan/windows/mod.rs:
##########
@@ -90,12 +93,26 @@ pub fn create_window_expr(
             order_by,
             window_frame,
         )),
-        WindowFunction::AggregateUDF(fun) => Arc::new(PlainAggregateWindowExpr::new(
-            udaf::create_aggregate_expr(fun.as_ref(), args, input_schema, name)?,
-            partition_by,
-            order_by,
-            window_frame,
-        )),
+        WindowFunction::AggregateUDF(fun) => {
+            let aggregate =
+                udaf::create_aggregate_expr(fun.as_ref(), args, input_schema, name)?;
+
+            if !unbounded_window && aggregate.retractable()? {

Review Comment:
   I will make this change. Thank you @mustafasrepo; I didn't appreciate that there is no way to run the AggregateUDF correctly without `retract_batch`. Thus I think the correct solution is to return an error when such a combination is attempted. I will update the PR to reflect this.
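   The planning decision discussed here could be modeled roughly as below. This is a hypothetical sketch, not DataFusion's `create_window_expr`: it assumes `unbounded_window` means the frame starts at `UNBOUNDED PRECEDING` and that `retractable` reflects `supports_retract_batch()`. `WindowExprKind` and `plan_udaf_window` are illustrative names.

   ```rust
   /// Which kind of window expression to build (hypothetical names).
   #[derive(Debug, PartialEq)]
   enum WindowExprKind {
       Plain,
       Sliding,
   }

   /// `frame_start_unbounded`: the frame starts at UNBOUNDED PRECEDING.
   /// `retractable`: the accumulator reports `supports_retract_batch() == true`.
   fn plan_udaf_window(frame_start_unbounded: bool, retractable: bool) -> Result<WindowExprKind, String> {
       if frame_start_unbounded {
           // The start never moves: an append-only accumulator is enough.
           Ok(WindowExprKind::Plain)
       } else if retractable {
           // The start moves: rows leaving the frame are removed via retract_batch.
           Ok(WindowExprKind::Sliding)
       } else {
           // No correct incremental answer exists: fail instead of returning a wrong one.
           Err("sliding window frame requires the aggregate to implement retract_batch".to_string())
       }
   }
   ```

   Returning an `Err` for the last combination follows the reasoning in this thread: an error is preferable to a silently wrong answer.
   
   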



##########
datafusion/core/tests/user_defined_aggregates.rs:
##########
@@ -79,15 +98,41 @@ async fn test_udaf_as_window() {
     ];
     assert_batches_eq!(expected, &execute(&ctx, sql).await);
     // aggregate over the entire window function call update_batch
-    assert!(counters.update_batch());
-    assert!(!counters.retract_batch());
+    assert!(test_state.update_batch());
+    assert!(!test_state.retract_batch());
 }
 
 /// User defined aggregate used as a window function with a window frame
 #[tokio::test]
 async fn test_udaf_as_window_with_frame() {
-    let TestContext { ctx, counters } = TestContext::new();
+    let TestContext { ctx, test_state } = TestContext::new();
+    let sql = "SELECT time_sum(time) OVER(ORDER BY time ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as time_sum from t";
+    let expected = vec![
+        "+----------------------------+",
+        "| time_sum                   |",
+        "+----------------------------+",
+        "| 1970-01-01T00:00:00.000005 |",
+        "| 1970-01-01T00:00:00.000009 |",
+        "| 1970-01-01T00:00:00.000012 |",
+        "| 1970-01-01T00:00:00.000014 |",
+        "| 1970-01-01T00:00:00.000010 |",
+        "+----------------------------+",
+    ];
+    assert_batches_eq!(expected, &execute(&ctx, sql).await);
+    // user defined aggregates with window frame should be calling retract batch
+    assert!(test_state.update_batch());
+    assert!(test_state.retract_batch());
+}
+
+/// Ensure that User defined aggregate used as a window function with a window
+/// frame, but that does not implement retract_batch, does not error
+#[tokio::test]
+async fn test_udaf_as_window_with_frame_without_retract_batch() {
+    let test_state = Arc::new(TestState::new().with_error_on_retract_batch());
+
+    let TestContext { ctx, test_state } = TestContext::new_with_test_state(test_state);
     let sql = "SELECT time_sum(time) OVER(ORDER BY time ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as time_sum from t";
+    // TODO: It is not clear why this is a different value than when retract batch is used

Review Comment:
   Thank you -- I am glad I asked!





[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #6671: Allow `AggregateUDF` to define retractable batch , efficiently implement window functions

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #6671:
URL: https://github.com/apache/arrow-datafusion/pull/6671#discussion_r1230458156


##########
datafusion/core/tests/user_defined_aggregates.rs:
##########
@@ -79,15 +98,41 @@ async fn test_udaf_as_window() {
     ];
     assert_batches_eq!(expected, &execute(&ctx, sql).await);
     // aggregate over the entire window function call update_batch
-    assert!(counters.update_batch());
-    assert!(!counters.retract_batch());
+    assert!(test_state.update_batch());
+    assert!(!test_state.retract_batch());
 }
 
 /// User defined aggregate used as a window function with a window frame
 #[tokio::test]
 async fn test_udaf_as_window_with_frame() {
-    let TestContext { ctx, counters } = TestContext::new();
+    let TestContext { ctx, test_state } = TestContext::new();
+    let sql = "SELECT time_sum(time) OVER(ORDER BY time ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as time_sum from t";
+    let expected = vec![
+        "+----------------------------+",
+        "| time_sum                   |",
+        "+----------------------------+",
+        "| 1970-01-01T00:00:00.000005 |",
+        "| 1970-01-01T00:00:00.000009 |",
+        "| 1970-01-01T00:00:00.000012 |",
+        "| 1970-01-01T00:00:00.000014 |",
+        "| 1970-01-01T00:00:00.000010 |",
+        "+----------------------------+",
+    ];
+    assert_batches_eq!(expected, &execute(&ctx, sql).await);
+    // user defined aggregates with window frame should be calling retract batch
+    assert!(test_state.update_batch());
+    assert!(test_state.retract_batch());
+}
+
+/// Ensure that User defined aggregate used as a window function with a window
+/// frame, but that does not implement retract_batch, does not error
+#[tokio::test]
+async fn test_udaf_as_window_with_frame_without_retract_batch() {
+    let test_state = Arc::new(TestState::new().with_error_on_retract_batch());
+
+    let TestContext { ctx, test_state } = TestContext::new_with_test_state(test_state);
     let sql = "SELECT time_sum(time) OVER(ORDER BY time ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as time_sum from t";
+    // TODO: It is not clear why this is a different value than when retract batch is used

Review Comment:
   I think the query above shouldn't work if its `retract_batch` method is not implemented, since its window frame is `1 PRECEDING AND 1 FOLLOWING`. Whenever the frame start is anything other than `UNBOUNDED PRECEDING`, the `retract_batch` method is needed to run the accumulator.
   
   If the query were `SELECT time_sum(time) OVER(ORDER BY time ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) as time_sum from t`, 
   `retract_batch` would have no significance, and I would expect the result to be the same whether or not it is implemented.
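   To make the distinction concrete, here is a minimal sketch (not DataFusion code) that computes the half-open row range `[start, end)` covered by each row's frame for `ROWS BETWEEN <p> PRECEDING AND <f> FOLLOWING`, with `None` standing in for `UNBOUNDED PRECEDING`. The helpers `frame_bounds` and `needs_retract` are hypothetical. With an unbounded start, `start` is always 0, so no row ever leaves the frame and a retract step would have nothing to do.

   ```rust
   /// Hypothetical helper: half-open row range [start, end) covered by the frame
   /// of row `row` in a partition of `n` rows, for
   /// `ROWS BETWEEN <preceding> PRECEDING AND <following> FOLLOWING`.
   /// `preceding == None` models `UNBOUNDED PRECEDING`.
   fn frame_bounds(row: usize, n: usize, preceding: Option<usize>, following: usize) -> (usize, usize) {
       let start = match preceding {
           None => 0,
           Some(p) => row.saturating_sub(p),
       };
       let end = (row + following + 1).min(n);
       (start, end)
   }

   /// True if the frame start ever moves forward, i.e. rows must be removed
   /// from the accumulator state (what `retract_batch` is for).
   fn needs_retract(n: usize, preceding: Option<usize>, following: usize) -> bool {
       (1..n).any(|row| {
           frame_bounds(row, n, preceding, following).0
               > frame_bounds(row - 1, n, preceding, following).0
       })
   }

   fn main() {
       // 1 PRECEDING AND 1 FOLLOWING over 5 rows: the start moves 0, 0, 1, 2, 3
       for row in 0..5 {
           println!("{:?}", frame_bounds(row, 5, Some(1), 1));
       }
       assert!(needs_retract(5, Some(1), 1));
       // UNBOUNDED PRECEDING AND 1 FOLLOWING: the start is always 0
       assert!(!needs_retract(5, None, 1));
   }
   ```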





[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #6671: Allow `AggregateUDF` to define retractable batch , efficiently implement window functions

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #6671:
URL: https://github.com/apache/arrow-datafusion/pull/6671#discussion_r1230509976


##########
datafusion/core/src/physical_plan/windows/mod.rs:
##########
@@ -90,12 +93,26 @@ pub fn create_window_expr(
             order_by,
             window_frame,
         )),
-        WindowFunction::AggregateUDF(fun) => Arc::new(PlainAggregateWindowExpr::new(
-            udaf::create_aggregate_expr(fun.as_ref(), args, input_schema, name)?,
-            partition_by,
-            order_by,
-            window_frame,
-        )),
+        WindowFunction::AggregateUDF(fun) => {
+            let aggregate =
+                udaf::create_aggregate_expr(fun.as_ref(), args, input_schema, name)?;
+
+            if !unbounded_window && aggregate.retractable()? {

Review Comment:
   I think this line should be
   ```suggestion
               if !unbounded_window  {
   ```
   If the aggregate doesn't implement retract, we should get an error. There is no way to run an aggregator whose frame start is not `UNBOUNDED PRECEDING` without a `retract_batch` method.
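   A minimal sketch of the decision being discussed, using hypothetical names (`WindowExprKind`, `choose_window_expr`) rather than the real DataFusion types; the `retractable` flag stands in for what `aggregate.retractable()?` reports in the diff. Instead of silently falling back to a grow-only evaluation, it returns an error when the frame start moves but the aggregate cannot retract.

   ```rust
   /// Hypothetical stand-ins for the two ways to evaluate the window expression.
   #[derive(Debug, PartialEq)]
   enum WindowExprKind {
       /// Frame starts at UNBOUNDED PRECEDING: update_batch alone is enough.
       Plain,
       /// Frame start moves: both update_batch and retract_batch are required.
       Sliding,
   }

   /// Choose how to evaluate a user defined aggregate used as a window function.
   fn choose_window_expr(unbounded_window: bool, retractable: bool) -> Result<WindowExprKind, String> {
       if !unbounded_window {
           if retractable {
               Ok(WindowExprKind::Sliding)
           } else {
               // No correct way to compute a moving frame without retraction
               Err("window frame start is not UNBOUNDED PRECEDING, \
                    but the aggregate does not implement retract_batch"
                   .to_string())
           }
       } else {
           Ok(WindowExprKind::Plain)
       }
   }

   fn main() {
       // e.g. `1 PRECEDING AND 1 FOLLOWING` with an aggregate lacking retract_batch
       match choose_window_expr(false, false) {
           Ok(kind) => println!("using {:?}", kind),
           Err(e) => println!("error: {}", e),
       }
   }
   ```

   Checking eagerly like this reports the problem before execution starts, rather than partway through a query.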





[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #6671: Allow `AggregateUDF` to define retractable batch , efficiently implement window functions

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #6671:
URL: https://github.com/apache/arrow-datafusion/pull/6671#discussion_r1230531322


##########
datafusion/core/tests/user_defined_aggregates.rs:
##########
@@ -79,15 +98,41 @@ async fn test_udaf_as_window() {
     ];
     assert_batches_eq!(expected, &execute(&ctx, sql).await);
     // aggregate over the entire window function call update_batch
-    assert!(counters.update_batch());
-    assert!(!counters.retract_batch());
+    assert!(test_state.update_batch());
+    assert!(!test_state.retract_batch());
 }
 
 /// User defined aggregate used as a window function with a window frame
 #[tokio::test]
 async fn test_udaf_as_window_with_frame() {
-    let TestContext { ctx, counters } = TestContext::new();
+    let TestContext { ctx, test_state } = TestContext::new();
+    let sql = "SELECT time_sum(time) OVER(ORDER BY time ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as time_sum from t";
+    let expected = vec![
+        "+----------------------------+",
+        "| time_sum                   |",
+        "+----------------------------+",
+        "| 1970-01-01T00:00:00.000005 |",
+        "| 1970-01-01T00:00:00.000009 |",
+        "| 1970-01-01T00:00:00.000012 |",
+        "| 1970-01-01T00:00:00.000014 |",
+        "| 1970-01-01T00:00:00.000010 |",
+        "+----------------------------+",
+    ];
+    assert_batches_eq!(expected, &execute(&ctx, sql).await);
+    // user defined aggregates with window frame should be calling retract batch
+    assert!(test_state.update_batch());
+    assert!(test_state.retract_batch());
+}
+
+/// Ensure that User defined aggregate used as a window function with a window
+/// frame, but that does not implement retract_batch, does not error
+#[tokio::test]
+async fn test_udaf_as_window_with_frame_without_retract_batch() {
+    let test_state = Arc::new(TestState::new().with_error_on_retract_batch());
+
+    let TestContext { ctx, test_state } = TestContext::new_with_test_state(test_state);
     let sql = "SELECT time_sum(time) OVER(ORDER BY time ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as time_sum from t";
+    // TODO: It is not clear why this is a different value than when retract batch is used

Review Comment:
   Accumulators whose window frame starts somewhere other than `UNBOUNDED PRECEDING` (such as `1 PRECEDING`) need to implement the `retract_batch` method to run correctly; without it there is no way to calculate the result. Consider the query 
   ```sql
   SELECT SUM(a) OVER(ORDER BY a ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as sum_a from t
   ```
   
   The first sum will be the sum of rows in [0, 2), 
   the second sum will be the sum of rows in [0, 3), 
   the third sum will be the sum of rows in [1, 4), etc.
   Since the accumulator keeps a running sum in its state:
   for the first sum we add the values in [0, 2) to the state;
   for the second sum we add the values in [2, 3) ([0, 2) is already in the running sum, hence it now covers the [0, 3) range);
   for the third sum we add the values in [3, 4) ([0, 3) is already in the running sum). We also need to retract the values in [0, 1); this way we obtain the sum over [1, 4), which is indeed the appropriate range. 
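   The steps above can be sketched with a toy running-sum accumulator. `SumAccumulator` is hypothetical: its `update_batch`/`retract_batch` only mirror the names of the `Accumulator` methods, simplified to plain `i64` slices instead of Arrow arrays, and the input values are made up. For each row it adds the rows that entered the frame, retracts the rows that left it, and the result is checked against summing each frame from scratch.

   ```rust
   /// Toy accumulator keeping a running sum (hypothetical, not DataFusion's trait).
   struct SumAccumulator {
       sum: i64,
   }

   impl SumAccumulator {
       fn new() -> Self {
           Self { sum: 0 }
       }
       /// Add the values of rows that entered the frame.
       fn update_batch(&mut self, values: &[i64]) {
           self.sum += values.iter().sum::<i64>();
       }
       /// Remove the values of rows that left the frame.
       fn retract_batch(&mut self, values: &[i64]) {
           self.sum -= values.iter().sum::<i64>();
       }
       fn evaluate(&self) -> i64 {
           self.sum
       }
   }

   /// SUM(..) OVER (ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING), computed incrementally.
   fn sliding_sum(values: &[i64]) -> Vec<i64> {
       let n = values.len();
       let mut acc = SumAccumulator::new();
       let (mut prev_start, mut prev_end) = (0, 0);
       let mut out = Vec::with_capacity(n);
       for row in 0..n {
           let start = row.saturating_sub(1);
           let end = (row + 2).min(n);
           acc.update_batch(&values[prev_end..end]); // rows entering the frame
           acc.retract_batch(&values[prev_start..start]); // rows leaving the frame
           prev_start = start;
           prev_end = end;
           out.push(acc.evaluate());
       }
       out
   }

   fn main() {
       let values = [2, 3, 4, 5, 5];
       let incremental = sliding_sum(&values);
       // Recompute every frame from scratch for comparison
       let from_scratch: Vec<i64> = (0..values.len())
           .map(|row| values[row.saturating_sub(1)..(row + 2).min(values.len())].iter().sum())
           .collect();
       assert_eq!(incremental, from_scratch);
       println!("{:?}", incremental); // prints [5, 9, 12, 14, 10]
   }
   ```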
   
   When we use `UNBOUNDED PRECEDING` in the query, the start index of the desired range is always 0. Hence we never call the `retract_batch` method, and in this case implementing `retract_batch` is not a requirement. 
   
   This approach is a bit different from the window function approach. A window function (when it uses a frame) gets the entire desired range during evaluation. If we had a window function sum, we could calculate the sum from scratch over the given range each time. However, this is not possible for accumulators, so the query above should give an error if `retract_batch` is not implemented. During review I have pinpointed the section where I think this error is being prevented.
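   To illustrate the contrast, the sketch below (hypothetical function names, made-up values) compares a window-function style evaluation, which sees each frame's full range and recomputes it, with an accumulator that can only grow because it has no retract step. Without retraction the accumulator effectively returns the sum over `[0, end)` instead of `[start, end)`, so its output diverges as soon as the frame start moves past row 0.

   ```rust
   /// Window-function style: the evaluator sees the whole frame and recomputes it.
   fn window_fn_sum(values: &[i64], start: usize, end: usize) -> i64 {
       values[start..end].iter().sum()
   }

   /// Accumulator style without retract: the state can only grow.
   fn grow_only_sum(values: &[i64]) -> Vec<i64> {
       let n = values.len();
       let mut sum = 0;
       let mut prev_end = 0;
       let mut out = Vec::with_capacity(n);
       for row in 0..n {
           let end = (row + 2).min(n); // frame end for 1 FOLLOWING
           sum += values[prev_end..end].iter().sum::<i64>();
           prev_end = end;
           // Rows before `row - 1` should be retracted here, but cannot be.
           out.push(sum);
       }
       out
   }

   fn main() {
       let values = [2, 3, 4, 5, 5];
       let n = values.len();
       // Frames for ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING
       let correct: Vec<i64> = (0..n)
           .map(|row| window_fn_sum(&values, row.saturating_sub(1), (row + 2).min(n)))
           .collect();
       let grow_only = grow_only_sum(&values);
       println!("correct:   {:?}", correct);
       println!("grow-only: {:?}", grow_only);
       // The first two rows agree (frame start is still 0); later rows do not.
       assert_eq!(&correct[..2], &grow_only[..2]);
       assert_ne!(&correct[2..], &grow_only[2..]);
   }
   ```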
   
   
   
   


