Posted to github@arrow.apache.org by "mustafasrepo (via GitHub)" <gi...@apache.org> on 2023/06/16 05:46:10 UTC

[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #6671: Allow `AggregateUDF` to define retractable batch, implement sliding window functions

mustafasrepo commented on code in PR #6671:
URL: https://github.com/apache/arrow-datafusion/pull/6671#discussion_r1231800606


##########
datafusion/core/tests/user_defined_aggregates.rs:
##########
@@ -79,15 +98,41 @@ async fn test_udaf_as_window() {
     ];
     assert_batches_eq!(expected, &execute(&ctx, sql).await);
     // aggregate over the entire window function call update_batch
-    assert!(counters.update_batch());
-    assert!(!counters.retract_batch());
+    assert!(test_state.update_batch());
+    assert!(!test_state.retract_batch());
 }
 
 /// User defined aggregate used as a window function with a window frame
 #[tokio::test]
 async fn test_udaf_as_window_with_frame() {
-    let TestContext { ctx, counters } = TestContext::new();
+    let TestContext { ctx, test_state } = TestContext::new();
+    let sql = "SELECT time_sum(time) OVER(ORDER BY time ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as time_sum from t";
+    let expected = vec![
+        "+----------------------------+",
+        "| time_sum                   |",
+        "+----------------------------+",
+        "| 1970-01-01T00:00:00.000005 |",
+        "| 1970-01-01T00:00:00.000009 |",
+        "| 1970-01-01T00:00:00.000012 |",
+        "| 1970-01-01T00:00:00.000014 |",
+        "| 1970-01-01T00:00:00.000010 |",
+        "+----------------------------+",
+    ];
+    assert_batches_eq!(expected, &execute(&ctx, sql).await);
+    // user defined aggregates with window frame should be calling retract batch
+    assert!(test_state.update_batch());
+    assert!(test_state.retract_batch());
+}
+
+/// Ensure that User defined aggregate used as a window function with a window
+/// frame, but that does not implement retract_batch, does not error
+#[tokio::test]
+async fn test_udaf_as_window_with_frame_without_retract_batch() {
+    let test_state = Arc::new(TestState::new().with_error_on_retract_batch());
+
+    let TestContext { ctx, test_state } = TestContext::new_with_test_state(test_state);
     let sql = "SELECT time_sum(time) OVER(ORDER BY time ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as time_sum from t";
+    // TODO: It is not clear why this is a different value than when retract batch is used

Review Comment:
   > I thought about this some more and have made queries with unbounded windows without retract_batch error as you suggested in [999866e](https://github.com/apache/arrow-datafusion/commit/999866e281e454ff8824829fce36ee026140e143) -- much better to get an error rather than the wrong answer.
   > 
   > It seems to me that there is no fundamental reason DataFusion couldn't support sliding windows with user defined aggregate functions without `retract_batch` -- for example, it could make a new `Accumulator` instance for each row and feed in the entire window.
   > 
   > However, given it is likely to be much more efficient to implement `retract_batch`, that is what I think we should encourage people to do. If someone really needs to use a UDAF without `retract_batch` for a sliding window, we can look into implementing the "full recompute" solution at that time.
   
   Agreed. Also, users can define their own `UDWF` (user-defined window function). When a `UDWF`'s `uses_window_frame` flag is `true`, the `evaluate` method of its `PartitionEvaluator` behaves like the "full recompute" approach.
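
To make the trade-off above concrete, here is a minimal, standalone sketch of the two strategies for a `ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING` frame. It does not use DataFusion: the `SlidingAccumulator` trait, `SumAccumulator`, `frame`, `sliding_with_retract` and `full_recompute` are hypothetical names that only mirror the idea of `update_batch` / `retract_batch`. The input values `[2, 3, 4, 5, 5]` are also an assumption, chosen so that the window sums match the microsecond values in the expected output of the test above.

```rust
/// Hypothetical, simplified stand-in for an aggregate accumulator.
/// This is NOT DataFusion's `Accumulator` trait; it only mirrors the idea.
trait SlidingAccumulator {
    /// Add rows that entered the window frame.
    fn update_batch(&mut self, values: &[i64]);
    /// Remove rows that left the window frame.
    fn retract_batch(&mut self, values: &[i64]);
    /// Current aggregate value.
    fn evaluate(&self) -> i64;
}

#[derive(Default)]
struct SumAccumulator {
    sum: i64,
}

impl SlidingAccumulator for SumAccumulator {
    fn update_batch(&mut self, values: &[i64]) {
        self.sum += values.iter().sum::<i64>();
    }
    fn retract_batch(&mut self, values: &[i64]) {
        self.sum -= values.iter().sum::<i64>();
    }
    fn evaluate(&self) -> i64 {
        self.sum
    }
}

/// Half-open frame bounds `[start, end)` for
/// `ROWS BETWEEN preceding PRECEDING AND following FOLLOWING`.
fn frame(row: usize, len: usize, preceding: usize, following: usize) -> (usize, usize) {
    let start = row.saturating_sub(preceding);
    let end = (row + following + 1).min(len);
    (start, end)
}

/// Incremental strategy: one accumulator, updated with rows entering the
/// frame and retracted with rows leaving it.
fn sliding_with_retract(values: &[i64], preceding: usize, following: usize) -> Vec<i64> {
    let mut acc = SumAccumulator::default();
    let (mut prev_start, mut prev_end) = (0, 0);
    let mut out = Vec::with_capacity(values.len());
    for row in 0..values.len() {
        let (start, end) = frame(row, values.len(), preceding, following);
        acc.update_batch(&values[prev_end..end]);
        acc.retract_batch(&values[prev_start..start]);
        out.push(acc.evaluate());
        prev_start = start;
        prev_end = end;
    }
    out
}

/// "Full recompute" strategy: a fresh accumulator per row, fed the whole
/// frame. No `retract_batch` needed, at the cost of O(frame size) work per row.
fn full_recompute(values: &[i64], preceding: usize, following: usize) -> Vec<i64> {
    (0..values.len())
        .map(|row| {
            let (start, end) = frame(row, values.len(), preceding, following);
            let mut acc = SumAccumulator::default();
            acc.update_batch(&values[start..end]);
            acc.evaluate()
        })
        .collect()
}

fn main() {
    // Assumed input, in microseconds, ordered by time.
    let values = [2, 3, 4, 5, 5];
    let incremental = sliding_with_retract(&values, 1, 1);
    let recomputed = full_recompute(&values, 1, 1);
    // Both strategies must agree on every row.
    assert_eq!(incremental, recomputed);
    println!("{:?}", incremental);
}
```

Under these assumptions both functions return the same sums. The incremental version touches each input row at most twice regardless of frame size, which is the efficiency argument for encouraging `retract_batch`.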


