You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "alamb (via GitHub)" <gi...@apache.org> on 2023/06/20 20:41:24 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6703: Support User Defined Window Functions

alamb commented on code in PR #6703:
URL: https://github.com/apache/arrow-datafusion/pull/6703#discussion_r1235805257


##########
datafusion/core/tests/user_defined/user_defined_window_functions.rs:
##########
@@ -0,0 +1,548 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module contains end to end tests of creating
+//! user defined window functions
+
+use std::{
+    ops::Range,
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc,
+    },
+};
+
+use arrow::array::AsArray;
+use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray};
+use arrow_schema::DataType;
+use datafusion::{assert_batches_eq, prelude::SessionContext};
+use datafusion_common::{Result, ScalarValue};
+use datafusion_expr::{
+    function::PartitionEvaluatorFactory, window_state::WindowAggState,
+    PartitionEvaluator, ReturnTypeFunction, Signature, Volatility, WindowUDF,
+};
+
+/// A query with a window function over each `x` partition, ordered by `y`,
+/// with no explicit window frame clause
+const UNBOUNDED_WINDOW_QUERY: &str = "SELECT x, y, val, \
+     odd_counter(val) OVER (PARTITION BY x ORDER BY y) \
+     from t ORDER BY x, y";
+
+/// A query with a window function evaluated over a moving window frame that
+/// spans one row before and one row after the current row
+const BOUNDED_WINDOW_QUERY: &str = "SELECT x, y, val, \
+     odd_counter(val) OVER (PARTITION BY x ORDER BY y ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) \
+     from t ORDER BY x, y";
+
+/// Sanity check that shows every row of table `t` queried by the other tests
+#[tokio::test]
+async fn test_setup() {
+    let TestContext { ctx, .. } = TestContext::new(TestState::new());
+
+    let query = "SELECT * from t order by x, y";
+    let expected = vec![
+        "+---+---+-----+",
+        "| x | y | val |",
+        "+---+---+-----+",
+        "| 1 | a | 0   |",
+        "| 1 | b | 1   |",
+        "| 1 | c | 2   |",
+        "| 2 | d | 3   |",
+        "| 2 | e | 4   |",
+        "| 2 | f | 5   |",
+        "| 2 | g | 6   |",
+        "| 2 | h | 6   |",
+        "| 2 | i | 6   |",
+        "| 2 | j | 6   |",
+        "+---+---+-----+",
+    ];
+    let batches = execute(&ctx, query).await.unwrap();
+    assert_batches_eq!(expected, &batches);
+}
+
+/// Basic user defined window function, evaluated without an explicit window
+/// frame clause
+#[tokio::test]
+async fn test_udwf() {
+    let test_state = TestState::new();
+    let TestContext { ctx, test_state } = TestContext::new(test_state);
+
+    let expected = vec![
+        "+---+---+-----+--------------------+",
+        "| x | y | val | odd_counter(t.val) |",
+        "+---+---+-----+--------------------+",
+        "| 1 | a | 0   | 1                  |",
+        "| 1 | b | 1   | 1                  |",
+        "| 1 | c | 2   | 1                  |",
+        "| 2 | d | 3   | 2                  |",
+        "| 2 | e | 4   | 2                  |",
+        "| 2 | f | 5   | 2                  |",
+        "| 2 | g | 6   | 2                  |",
+        "| 2 | h | 6   | 2                  |",
+        "| 2 | i | 6   | 2                  |",
+        "| 2 | j | 6   | 2                  |",
+        "+---+---+-----+--------------------+",
+    ];
+    assert_batches_eq!(
+        expected,
+        &execute(&ctx, UNBOUNDED_WINDOW_QUERY).await.unwrap()
+    );
+    // evaluated on 2 distinct batches (when x=1 and x=2)
+    assert_eq!(test_state.evaluate_all_called(), 2);
+}
+
+/// User defined window function run with a bounded window frame that the
+/// function does not declare it uses, so the frame is ignored
+#[tokio::test]
+async fn test_udwf_bounded_window_ignores_frame() {
+    let TestContext { ctx, test_state } = TestContext::new(TestState::new());
+
+    // The UDWF does not report that it uses the window frame, so the
+    // `ROWS BETWEEN ...` clause does not change the output
+    let expected = vec![
+        "+---+---+-----+--------------------+",
+        "| x | y | val | odd_counter(t.val) |",
+        "+---+---+-----+--------------------+",
+        "| 1 | a | 0   | 1                  |",
+        "| 1 | b | 1   | 1                  |",
+        "| 1 | c | 2   | 1                  |",
+        "| 2 | d | 3   | 2                  |",
+        "| 2 | e | 4   | 2                  |",
+        "| 2 | f | 5   | 2                  |",
+        "| 2 | g | 6   | 2                  |",
+        "| 2 | h | 6   | 2                  |",
+        "| 2 | i | 6   | 2                  |",
+        "| 2 | j | 6   | 2                  |",
+        "+---+---+-----+--------------------+",
+    ];
+    let batches = execute(&ctx, BOUNDED_WINDOW_QUERY).await.unwrap();
+    assert_batches_eq!(expected, &batches);
+
+    // `evaluate_all` runs once per partition (x=1 and x=2); `evaluate` never runs
+    assert_eq!(test_state.evaluate_called(), 0);
+    assert_eq!(test_state.evaluate_all_called(), 2);
+}
+
+/// User defined window function that declares it uses the window frame,
+/// evaluated over a bounded (moving) window
+#[tokio::test]
+async fn test_udwf_bounded_window() {
+    let test_state = TestState::new().with_uses_window_frame();
+    let TestContext { ctx, test_state } = TestContext::new(test_state);
+
+    // Because the UDWF uses the window frame, each output value depends only
+    // on the rows between 1 PRECEDING and 1 FOLLOWING the current row
+    let expected = vec![
+        "+---+---+-----+--------------------+",
+        "| x | y | val | odd_counter(t.val) |",
+        "+---+---+-----+--------------------+",
+        "| 1 | a | 0   | 1                  |",
+        "| 1 | b | 1   | 1                  |",
+        "| 1 | c | 2   | 1                  |",
+        "| 2 | d | 3   | 1                  |",
+        "| 2 | e | 4   | 2                  |",
+        "| 2 | f | 5   | 1                  |",
+        "| 2 | g | 6   | 1                  |",
+        "| 2 | h | 6   | 0                  |",
+        "| 2 | i | 6   | 0                  |",
+        "| 2 | j | 6   | 0                  |",
+        "+---+---+-----+--------------------+",
+    ];
+    assert_batches_eq!(
+        expected,
+        &execute(&ctx, BOUNDED_WINDOW_QUERY).await.unwrap()
+    );
+    // `evaluate` is called once per input row (10 rows); `evaluate_all` is never called
+    assert_eq!(test_state.evaluate_called(), 10);
+    assert_eq!(test_state.evaluate_all_called(), 0);
+}
+
+/// Basic stateful user defined window function
+#[tokio::test]
+async fn test_stateful_udwf() {

Review Comment:
   I couldn't think of anything to test for a stateful function other than checking that `update_state` is called. I would be interested in anyone else's opinions on this matter.



##########
datafusion/core/tests/user_defined/user_defined_window_functions.rs:
##########
@@ -0,0 +1,548 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module contains end to end tests of creating
+//! user defined window functions
+
+use std::{
+    ops::Range,
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc,
+    },
+};
+
+use arrow::array::AsArray;
+use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray};
+use arrow_schema::DataType;
+use datafusion::{assert_batches_eq, prelude::SessionContext};
+use datafusion_common::{Result, ScalarValue};
+use datafusion_expr::{
+    function::PartitionEvaluatorFactory, window_state::WindowAggState,
+    PartitionEvaluator, ReturnTypeFunction, Signature, Volatility, WindowUDF,
+};
+
+/// A query with a window function evaluated over the entire partition

Review Comment:
   Here are the new tests



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org