Posted to github@arrow.apache.org by "jdye64 (via GitHub)" <gi...@apache.org> on 2023/10/17 17:24:56 UTC

[PR] Add support for window function bindings [arrow-datafusion-python]

jdye64 opened a new pull request, #521:
URL: https://github.com/apache/arrow-datafusion-python/pull/521

   # Which issue does this PR close?
   Closes #520


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Add support for window function bindings [arrow-datafusion-python]

Posted by "andygrove (via GitHub)" <gi...@apache.org>.
andygrove commented on PR #521:
URL: https://github.com/apache/arrow-datafusion-python/pull/521#issuecomment-1767177024

   @jdye64 Is there any overlap between this PR and the recently merged PR that added Window Frame bindings?
   
   https://github.com/apache/arrow-datafusion-python/pull/509


Re: [PR] Add support for window function bindings [arrow-datafusion-python]

Posted by "andygrove (via GitHub)" <gi...@apache.org>.
andygrove commented on PR #521:
URL: https://github.com/apache/arrow-datafusion-python/pull/521#issuecomment-1773837559

   @dlovell fyi


Re: [PR] Add support for window function bindings [arrow-datafusion-python]

Posted by "andygrove (via GitHub)" <gi...@apache.org>.
andygrove commented on code in PR #521:
URL: https://github.com/apache/arrow-datafusion-python/pull/521#discussion_r1367749996


##########
src/expr/window.rs:
##########
@@ -0,0 +1,297 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_common::{DataFusionError, ScalarValue};
+use datafusion_expr::expr::WindowFunction;
+use datafusion_expr::{Expr, Window, WindowFrame, WindowFrameBound, WindowFrameUnits};
+use pyo3::prelude::*;
+use std::fmt::{self, Display, Formatter};
+
+use crate::common::df_schema::PyDFSchema;
+use crate::errors::py_type_err;
+use crate::expr::logical_node::LogicalNode;
+use crate::expr::PyExpr;
+use crate::sql::logical::PyLogicalPlan;
+
+use super::py_expr_list;
+
+use crate::errors::py_datafusion_err;
+
+#[pyclass(name = "Window", module = "datafusion.expr", subclass)]
+#[derive(Clone)]
+pub struct PyWindow {
+    window: Window,
+}
+
+#[pyclass(name = "WindowFrame", module = "datafusion.expr", subclass)]
+#[derive(Clone)]
+pub struct PyWindowFrame {
+    window_frame: WindowFrame,
+}
+
+impl From<PyWindowFrame> for WindowFrame {
+    fn from(window_frame: PyWindowFrame) -> Self {
+        window_frame.window_frame
+    }
+}
+
+impl From<WindowFrame> for PyWindowFrame {
+    fn from(window_frame: WindowFrame) -> PyWindowFrame {
+        PyWindowFrame { window_frame }
+    }
+}
+
+#[pyclass(name = "WindowFrameBound", module = "datafusion.expr", subclass)]
+#[derive(Clone)]
+pub struct PyWindowFrameBound {
+    frame_bound: WindowFrameBound,
+}
+
+impl From<PyWindow> for Window {
+    fn from(window: PyWindow) -> Window {
+        window.window
+    }
+}
+
+impl From<Window> for PyWindow {
+    fn from(window: Window) -> PyWindow {
+        PyWindow { window }
+    }
+}
+
+impl From<WindowFrameBound> for PyWindowFrameBound {
+    fn from(frame_bound: WindowFrameBound) -> Self {
+        PyWindowFrameBound { frame_bound }
+    }
+}
+
+impl Display for PyWindow {
+    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+        write!(
+            f,
+            "Over\n
+            Window Expr: {:?}
+            Schema: {:?}",
+            &self.window.window_expr, &self.window.schema
+        )
+    }
+}
+
+impl Display for PyWindowFrame {
+    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
+        write!(
+            f,
+            "OVER ({} BETWEEN {} AND {})",
+            self.window_frame.units, self.window_frame.start_bound, self.window_frame.end_bound
+        )
+    }
+}
+
+#[pymethods]
+impl PyWindow {
+    /// Returns the schema of the Window
+    pub fn schema(&self) -> PyResult<PyDFSchema> {
+        Ok(self.window.schema.as_ref().clone().into())
+    }
+
+    /// Returns window expressions
+    pub fn get_window_expr(&self) -> PyResult<Vec<PyExpr>> {
+        py_expr_list(&self.window.window_expr)
+    }
+
+    /// Returns order by columns in a window function expression
+    pub fn get_sort_exprs(&self, expr: PyExpr) -> PyResult<Vec<PyExpr>> {
+        match expr.expr.unalias() {
+            Expr::WindowFunction(WindowFunction { order_by, .. }) => py_expr_list(&order_by),
+            other => Err(not_window_function_err(other)),
+        }
+    }
+
+    /// Return partition by columns in a window function expression
+    pub fn get_partition_exprs(&self, expr: PyExpr) -> PyResult<Vec<PyExpr>> {
+        match expr.expr.unalias() {
+            Expr::WindowFunction(WindowFunction { partition_by, .. }) => {
+                py_expr_list(&partition_by)
+            }
+            other => Err(not_window_function_err(other)),
+        }
+    }
+
+    /// Return input args for window function
+    pub fn get_args(&self, expr: PyExpr) -> PyResult<Vec<PyExpr>> {
+        match expr.expr.unalias() {
+            Expr::WindowFunction(WindowFunction { args, .. }) => py_expr_list(&args),
+            other => Err(not_window_function_err(other)),
+        }
+    }
+
+    /// Return window function name
+    pub fn window_func_name(&self, expr: PyExpr) -> PyResult<String> {
+        match expr.expr.unalias() {
+            Expr::WindowFunction(WindowFunction { fun, .. }) => Ok(fun.to_string()),
+            other => Err(not_window_function_err(other)),
+        }
+    }
+
+    /// Returns a Pywindow frame for a given window function expression
+    pub fn get_frame(&self, expr: PyExpr) -> Option<PyWindowFrame> {
+        match expr.expr.unalias() {
+            Expr::WindowFunction(WindowFunction { window_frame, .. }) => Some(window_frame.into()),
+            _ => None,
+        }
+    }
+}
+
+fn not_window_function_err(expr: Expr) -> PyErr {
+    py_type_err(format!(
+        "Provided {} Expr {:?} is not a WindowFunction type",
+        expr.variant_name(),
+        expr
+    ))
+}
+
+#[pymethods]
+impl PyWindowFrame {
+    #[new(unit, start_bound, end_bound)]
+    pub fn new(units: &str, start_bound: Option<u64>, end_bound: Option<u64>) -> PyResult<Self> {
+        let units = units.to_ascii_lowercase();
+        let units = match units.as_str() {
+            "rows" => WindowFrameUnits::Rows,
+            "range" => WindowFrameUnits::Range,
+            "groups" => WindowFrameUnits::Groups,
+            _ => {
+                return Err(py_datafusion_err(DataFusionError::NotImplemented(format!(
+                    "{:?}",
+                    units,
+                ))));
+            }
+        };
+        let start_bound = match start_bound {
+            Some(start_bound) => {
+                WindowFrameBound::Preceding(ScalarValue::UInt64(Some(start_bound)))
+            }
+            None => match units {
+                WindowFrameUnits::Range => WindowFrameBound::Preceding(ScalarValue::UInt64(None)),
+                WindowFrameUnits::Rows => WindowFrameBound::Preceding(ScalarValue::UInt64(None)),
+                WindowFrameUnits::Groups => {
+                    return Err(py_datafusion_err(DataFusionError::NotImplemented(format!(
+                        "{:?}",
+                        units,
+                    ))));
+                }
+            },
+        };
+        let end_bound = match end_bound {
+            Some(end_bound) => WindowFrameBound::Following(ScalarValue::UInt64(Some(end_bound))),
+            None => match units {
+                WindowFrameUnits::Rows => WindowFrameBound::Following(ScalarValue::UInt64(None)),
+                WindowFrameUnits::Range => WindowFrameBound::Following(ScalarValue::UInt64(None)),
+                WindowFrameUnits::Groups => {
+                    return Err(py_datafusion_err(DataFusionError::NotImplemented(format!(
+                        "{:?}",
+                        units,
+                    ))));
+                }
+            },
+        };
+        Ok(PyWindowFrame {
+            window_frame: WindowFrame {
+                units,
+                start_bound,
+                end_bound,
+            },
+        })
+    }
+
+    /// Returns the window frame units for the bounds
+    pub fn get_frame_units(&self) -> PyResult<String> {
+        Ok(self.window_frame.units.to_string())
+    }
+    /// Returns starting bound
+    pub fn get_lower_bound(&self) -> PyResult<PyWindowFrameBound> {
+        Ok(self.window_frame.start_bound.clone().into())
+    }
+    /// Returns end bound
+    pub fn get_upper_bound(&self) -> PyResult<PyWindowFrameBound> {
+        Ok(self.window_frame.end_bound.clone().into())
+    }
+
+    /// Get a String representation of this window frame
+    fn __repr__(&self) -> String {
+        format!("{}", self)
+    }
+}
+
+#[pymethods]
+impl PyWindowFrameBound {
+    /// Returns if the frame bound is current row
+    pub fn is_current_row(&self) -> bool {
+        matches!(self.frame_bound, WindowFrameBound::CurrentRow)
+    }
+
+    /// Returns if the frame bound is preceding
+    pub fn is_preceding(&self) -> bool {
+        matches!(self.frame_bound, WindowFrameBound::Preceding(_))
+    }
+
+    /// Returns if the frame bound is following
+    pub fn is_following(&self) -> bool {
+        matches!(self.frame_bound, WindowFrameBound::Following(_))
+    }
+    /// Returns the offset of the window frame
+    pub fn get_offset(&self) -> PyResult<Option<u64>> {
+        match &self.frame_bound {
+            WindowFrameBound::Preceding(val) | WindowFrameBound::Following(val) => match val {
+                x if x.is_null() => Ok(None),
+                ScalarValue::UInt64(v) => Ok(*v),
+                // The cast below is only safe because window bounds cannot be negative
+                ScalarValue::Int64(v) => Ok(v.map(|n| n as u64)),
+                ScalarValue::Utf8(v) => {
+                    let s = v.clone().unwrap();
+                    match s.parse::<u64>() {
+                        Ok(s) => Ok(Some(s)),
+                        Err(_e) => Err(DataFusionError::Plan(format!(
+                            "Unable to parse u64 from Utf8 value '{s}'"
+                        ))
+                        .into()),
+                    }
+                }

Review Comment:
   nit: we can avoid the `unwrap` here:
   
   ```suggestion
                   ScalarValue::Utf8(Some(s)) => match s.parse::<u64>() {
                       Ok(s) => Ok(Some(s)),
                       Err(_e) => Err(DataFusionError::Plan(format!(
                           "Unable to parse u64 from Utf8 value '{s}'"
                       ))
                       .into()),
                   },
   ```
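
   For readers following along, here is a minimal, self-contained sketch of the suggested pattern. It is not the PR's code. `Scalar` is a hypothetical stand-in for DataFusion's `ScalarValue` that keeps only the three variants `get_offset` inspects, `frame_offset` is a hypothetical name, and errors are plain `String`s rather than `DataFusionError`. The sketch also assumes `u64::try_from` in place of the `as u64` cast, so a negative offset is reported as an error instead of silently wrapping.

   ```rust
   // Hypothetical stand-in for datafusion_common::ScalarValue, reduced to the
   // variants get_offset looks at. This is NOT the real DataFusion type.
   #[derive(Debug, Clone)]
   enum Scalar {
       UInt64(Option<u64>),
       Int64(Option<i64>),
       Utf8(Option<String>),
   }

   // Hypothetical helper: extract a window frame offset without `unwrap`.
   // Binding `Some(s)` in the pattern means the string is only used when it
   // is present; every `None` (a null bound) maps to Ok(None), i.e. unbounded.
   fn frame_offset(val: &Scalar) -> Result<Option<u64>, String> {
       match val {
           Scalar::UInt64(v) => Ok(*v),
           // try_from rejects negative values instead of wrapping like `as u64`.
           Scalar::Int64(Some(n)) => u64::try_from(*n)
               .map(Some)
               .map_err(|_| format!("Window frame offset cannot be negative: {n}")),
           Scalar::Utf8(Some(s)) => match s.parse::<u64>() {
               Ok(n) => Ok(Some(n)),
               Err(_e) => Err(format!("Unable to parse u64 from Utf8 value '{s}'")),
           },
           // Null Int64 / Utf8 values: no offset, so the frame edge is unbounded.
           Scalar::Int64(None) | Scalar::Utf8(None) => Ok(None),
       }
   }
   ```

   Because the match is exhaustive over `Some`/`None` for each variant, the compiler checks that every case is handled and there is no panic path left in the function.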


Re: [PR] Add support for window function bindings [arrow-datafusion-python]

Posted by "jdye64 (via GitHub)" <gi...@apache.org>.
jdye64 commented on PR #521:
URL: https://github.com/apache/arrow-datafusion-python/pull/521#issuecomment-1768608029

   I think I'm going to move `window_frame.rs` into the `expr` directory, since that matches its location in arrow-datafusion, rename it to `window.rs`, and combine the window and window frame logic in that single file. I will post the commit here shortly.


Re: [PR] Add support for window function bindings [arrow-datafusion-python]

Posted by "andygrove (via GitHub)" <gi...@apache.org>.
andygrove merged PR #521:
URL: https://github.com/apache/arrow-datafusion-python/pull/521


Re: [PR] Add support for window function bindings [arrow-datafusion-python]

Posted by "jdye64 (via GitHub)" <gi...@apache.org>.
jdye64 commented on PR #521:
URL: https://github.com/apache/arrow-datafusion-python/pull/521#issuecomment-1768601740

   I had missed that one. Yes, there seems to be a lot of overlap. Let me rebase onto main and remove the duplicated pieces.
