You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ji...@apache.org on 2022/02/06 13:52:59 UTC

[arrow-datafusion] 03/06: include window frames and operator into datafusion-expr

This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a commit to branch move-accum
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit b7eb091044a8219f965a8b5742dab43a05b25a67
Author: Jiayu Liu <ji...@hey.com>
AuthorDate: Sun Feb 6 15:31:10 2022 +0800

    include window frames and operator into datafusion-expr
---
 datafusion-expr/Cargo.toml                         |   1 +
 datafusion-expr/src/lib.rs                         |   4 +
 .../src/operator.rs                                |  73 +----
 .../src/window_frame.rs                            |  22 +-
 datafusion/src/logical_plan/operators.rs           |  83 +----
 datafusion/src/logical_plan/window_frames.rs       | 363 +--------------------
 6 files changed, 22 insertions(+), 524 deletions(-)

diff --git a/datafusion-expr/Cargo.toml b/datafusion-expr/Cargo.toml
index 3cac735..d8a4f29 100644
--- a/datafusion-expr/Cargo.toml
+++ b/datafusion-expr/Cargo.toml
@@ -38,3 +38,4 @@ path = "src/lib.rs"
 [dependencies]
 datafusion-common = { path = "../datafusion-common" }
 arrow = { version = "8.0.0", features = ["prettyprint"] }
+sqlparser = "0.13"
diff --git a/datafusion-expr/src/lib.rs b/datafusion-expr/src/lib.rs
index b6eaaf7..13fa93e 100644
--- a/datafusion-expr/src/lib.rs
+++ b/datafusion-expr/src/lib.rs
@@ -16,7 +16,11 @@
 // under the License.
 
 mod aggregate_function;
+mod operator;
+mod window_frame;
 mod window_function;
 
 pub use aggregate_function::AggregateFunction;
+pub use operator::Operator;
+pub use window_frame::{WindowFrame, WindowFrameBound, WindowFrameUnits};
 pub use window_function::{BuiltInWindowFunction, WindowFunction};
diff --git a/datafusion/src/logical_plan/operators.rs b/datafusion-expr/src/operator.rs
similarity index 67%
copy from datafusion/src/logical_plan/operators.rs
copy to datafusion-expr/src/operator.rs
index 14ccab0..e6b7e35 100644
--- a/datafusion/src/logical_plan/operators.rs
+++ b/datafusion-expr/src/operator.rs
@@ -15,9 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::{fmt, ops};
-
-use super::{binary_expr, Expr};
+use std::fmt;
 
 /// Operators applied to expressions
 #[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Hash)]
@@ -97,72 +95,3 @@ impl fmt::Display for Operator {
         write!(f, "{}", display)
     }
 }
-
-impl ops::Add for Expr {
-    type Output = Self;
-
-    fn add(self, rhs: Self) -> Self {
-        binary_expr(self, Operator::Plus, rhs)
-    }
-}
-
-impl ops::Sub for Expr {
-    type Output = Self;
-
-    fn sub(self, rhs: Self) -> Self {
-        binary_expr(self, Operator::Minus, rhs)
-    }
-}
-
-impl ops::Mul for Expr {
-    type Output = Self;
-
-    fn mul(self, rhs: Self) -> Self {
-        binary_expr(self, Operator::Multiply, rhs)
-    }
-}
-
-impl ops::Div for Expr {
-    type Output = Self;
-
-    fn div(self, rhs: Self) -> Self {
-        binary_expr(self, Operator::Divide, rhs)
-    }
-}
-
-impl ops::Rem for Expr {
-    type Output = Self;
-
-    fn rem(self, rhs: Self) -> Self {
-        binary_expr(self, Operator::Modulo, rhs)
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use crate::prelude::lit;
-
-    #[test]
-    fn test_operators() {
-        assert_eq!(
-            format!("{:?}", lit(1u32) + lit(2u32)),
-            "UInt32(1) + UInt32(2)"
-        );
-        assert_eq!(
-            format!("{:?}", lit(1u32) - lit(2u32)),
-            "UInt32(1) - UInt32(2)"
-        );
-        assert_eq!(
-            format!("{:?}", lit(1u32) * lit(2u32)),
-            "UInt32(1) * UInt32(2)"
-        );
-        assert_eq!(
-            format!("{:?}", lit(1u32) / lit(2u32)),
-            "UInt32(1) / UInt32(2)"
-        );
-        assert_eq!(
-            format!("{:?}", lit(1u32) % lit(2u32)),
-            "UInt32(1) % UInt32(2)"
-        );
-    }
-}
diff --git a/datafusion/src/logical_plan/window_frames.rs b/datafusion-expr/src/window_frame.rs
similarity index 96%
copy from datafusion/src/logical_plan/window_frames.rs
copy to datafusion-expr/src/window_frame.rs
index 42e0a7e..ba65a50 100644
--- a/datafusion/src/logical_plan/window_frames.rs
+++ b/datafusion-expr/src/window_frame.rs
@@ -23,7 +23,7 @@
 //! - An ending frame boundary,
 //! - An EXCLUDE clause.
 
-use crate::error::{DataFusionError, Result};
+use datafusion_common::{DataFusionError, Result};
 use sqlparser::ast;
 use std::cmp::Ordering;
 use std::convert::{From, TryFrom};
@@ -78,9 +78,9 @@ impl TryFrom<ast::WindowFrame> for WindowFrame {
             ))
         } else if start_bound > end_bound {
             Err(DataFusionError::Execution(format!(
-            "Invalid window frame: start bound ({}) cannot be larger than end bound ({})",
-            start_bound, end_bound
-        )))
+        "Invalid window frame: start bound ({}) cannot be larger than end bound ({})",
+        start_bound, end_bound
+      )))
         } else {
             let units = value.units.into();
             if units == WindowFrameUnits::Range {
@@ -268,9 +268,10 @@ mod tests {
         };
         let result = WindowFrame::try_from(window_frame);
         assert_eq!(
-            result.err().unwrap().to_string(),
-            "Execution error: Invalid window frame: start bound cannot be unbounded following".to_owned()
-        );
+      result.err().unwrap().to_string(),
+      "Execution error: Invalid window frame: start bound cannot be unbounded following"
+        .to_owned()
+    );
 
         let window_frame = ast::WindowFrame {
             units: ast::WindowFrameUnits::Range,
@@ -279,9 +280,10 @@ mod tests {
         };
         let result = WindowFrame::try_from(window_frame);
         assert_eq!(
-            result.err().unwrap().to_string(),
-            "Execution error: Invalid window frame: end bound cannot be unbounded preceding".to_owned()
-        );
+      result.err().unwrap().to_string(),
+      "Execution error: Invalid window frame: end bound cannot be unbounded preceding"
+        .to_owned()
+    );
 
         let window_frame = ast::WindowFrame {
             units: ast::WindowFrameUnits::Range,
diff --git a/datafusion/src/logical_plan/operators.rs b/datafusion/src/logical_plan/operators.rs
index 14ccab0..813f7e0 100644
--- a/datafusion/src/logical_plan/operators.rs
+++ b/datafusion/src/logical_plan/operators.rs
@@ -15,88 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::{fmt, ops};
-
 use super::{binary_expr, Expr};
-
-/// Operators applied to expressions
-#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Hash)]
-pub enum Operator {
-    /// Expressions are equal
-    Eq,
-    /// Expressions are not equal
-    NotEq,
-    /// Left side is smaller than right side
-    Lt,
-    /// Left side is smaller or equal to right side
-    LtEq,
-    /// Left side is greater than right side
-    Gt,
-    /// Left side is greater or equal to right side
-    GtEq,
-    /// Addition
-    Plus,
-    /// Subtraction
-    Minus,
-    /// Multiplication operator, like `*`
-    Multiply,
-    /// Division operator, like `/`
-    Divide,
-    /// Remainder operator, like `%`
-    Modulo,
-    /// Logical AND, like `&&`
-    And,
-    /// Logical OR, like `||`
-    Or,
-    /// Matches a wildcard pattern
-    Like,
-    /// Does not match a wildcard pattern
-    NotLike,
-    /// IS DISTINCT FROM
-    IsDistinctFrom,
-    /// IS NOT DISTINCT FROM
-    IsNotDistinctFrom,
-    /// Case sensitive regex match
-    RegexMatch,
-    /// Case insensitive regex match
-    RegexIMatch,
-    /// Case sensitive regex not match
-    RegexNotMatch,
-    /// Case insensitive regex not match
-    RegexNotIMatch,
-    /// Bitwise and, like `&`
-    BitwiseAnd,
-}
-
-impl fmt::Display for Operator {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        let display = match &self {
-            Operator::Eq => "=",
-            Operator::NotEq => "!=",
-            Operator::Lt => "<",
-            Operator::LtEq => "<=",
-            Operator::Gt => ">",
-            Operator::GtEq => ">=",
-            Operator::Plus => "+",
-            Operator::Minus => "-",
-            Operator::Multiply => "*",
-            Operator::Divide => "/",
-            Operator::Modulo => "%",
-            Operator::And => "AND",
-            Operator::Or => "OR",
-            Operator::Like => "LIKE",
-            Operator::NotLike => "NOT LIKE",
-            Operator::RegexMatch => "~",
-            Operator::RegexIMatch => "~*",
-            Operator::RegexNotMatch => "!~",
-            Operator::RegexNotIMatch => "!~*",
-            Operator::IsDistinctFrom => "IS DISTINCT FROM",
-            Operator::IsNotDistinctFrom => "IS NOT DISTINCT FROM",
-            Operator::BitwiseAnd => "&",
-        };
-        write!(f, "{}", display)
-    }
-}
+pub use datafusion_expr::Operator;
+use std::ops;
 
 impl ops::Add for Expr {
     type Output = Self;
diff --git a/datafusion/src/logical_plan/window_frames.rs b/datafusion/src/logical_plan/window_frames.rs
index 42e0a7e..5195820 100644
--- a/datafusion/src/logical_plan/window_frames.rs
+++ b/datafusion/src/logical_plan/window_frames.rs
@@ -15,365 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Window frame
-//!
-//! The frame-spec determines which output rows are read by an aggregate window function. The frame-spec consists of four parts:
-//! - A frame type - either ROWS, RANGE or GROUPS,
-//! - A starting frame boundary,
-//! - An ending frame boundary,
-//! - An EXCLUDE clause.
+//! Window frame types, re-exported from datafusion_expr
 
-use crate::error::{DataFusionError, Result};
-use sqlparser::ast;
-use std::cmp::Ordering;
-use std::convert::{From, TryFrom};
-use std::fmt;
-use std::hash::{Hash, Hasher};
-
-/// The frame-spec determines which output rows are read by an aggregate window function.
-///
-/// The ending frame boundary can be omitted (if the BETWEEN and AND keywords that surround the
-/// starting frame boundary are also omitted), in which case the ending frame boundary defaults to
-/// CURRENT ROW.
-#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Hash)]
-pub struct WindowFrame {
-    /// A frame type - either ROWS, RANGE or GROUPS
-    pub units: WindowFrameUnits,
-    /// A starting frame boundary
-    pub start_bound: WindowFrameBound,
-    /// An ending frame boundary
-    pub end_bound: WindowFrameBound,
-}
-
-impl fmt::Display for WindowFrame {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        write!(
-            f,
-            "{} BETWEEN {} AND {}",
-            self.units, self.start_bound, self.end_bound
-        )?;
-        Ok(())
-    }
-}
-
-impl TryFrom<ast::WindowFrame> for WindowFrame {
-    type Error = DataFusionError;
-
-    fn try_from(value: ast::WindowFrame) -> Result<Self> {
-        let start_bound = value.start_bound.into();
-        let end_bound = value
-            .end_bound
-            .map(WindowFrameBound::from)
-            .unwrap_or(WindowFrameBound::CurrentRow);
-
-        if let WindowFrameBound::Following(None) = start_bound {
-            Err(DataFusionError::Execution(
-                "Invalid window frame: start bound cannot be unbounded following"
-                    .to_owned(),
-            ))
-        } else if let WindowFrameBound::Preceding(None) = end_bound {
-            Err(DataFusionError::Execution(
-                "Invalid window frame: end bound cannot be unbounded preceding"
-                    .to_owned(),
-            ))
-        } else if start_bound > end_bound {
-            Err(DataFusionError::Execution(format!(
-            "Invalid window frame: start bound ({}) cannot be larger than end bound ({})",
-            start_bound, end_bound
-        )))
-        } else {
-            let units = value.units.into();
-            if units == WindowFrameUnits::Range {
-                for bound in &[start_bound, end_bound] {
-                    match bound {
-                        WindowFrameBound::Preceding(Some(v))
-                        | WindowFrameBound::Following(Some(v))
-                            if *v > 0 =>
-                        {
-                            Err(DataFusionError::NotImplemented(format!(
-                                "With WindowFrameUnits={}, the bound cannot be {} PRECEDING or FOLLOWING at the moment",
-                                units, v
-                            )))
-                        }
-                        _ => Ok(()),
-                    }?;
-                }
-            }
-            Ok(Self {
-                units,
-                start_bound,
-                end_bound,
-            })
-        }
-    }
-}
-
-impl Default for WindowFrame {
-    fn default() -> Self {
-        WindowFrame {
-            units: WindowFrameUnits::Range,
-            start_bound: WindowFrameBound::Preceding(None),
-            end_bound: WindowFrameBound::CurrentRow,
-        }
-    }
-}
-
-/// There are five ways to describe starting and ending frame boundaries:
-///
-/// 1. UNBOUNDED PRECEDING
-/// 2. <expr> PRECEDING
-/// 3. CURRENT ROW
-/// 4. <expr> FOLLOWING
-/// 5. UNBOUNDED FOLLOWING
-///
-/// in this implementation we'll only allow <expr> to be u64 (i.e. no dynamic boundary)
-#[derive(Debug, Clone, Copy, Eq)]
-pub enum WindowFrameBound {
-    /// 1. UNBOUNDED PRECEDING
-    /// The frame boundary is the first row in the partition.
-    ///
-    /// 2. <expr> PRECEDING
-    /// <expr> must be a non-negative constant numeric expression. The boundary is a row that
-    /// is <expr> "units" prior to the current row.
-    Preceding(Option<u64>),
-    /// 3. The current row.
-    ///
-    /// For RANGE and GROUPS frame types, peers of the current row are also
-    /// included in the frame, unless specifically excluded by the EXCLUDE clause.
-    /// This is true regardless of whether CURRENT ROW is used as the starting or ending frame
-    /// boundary.
-    CurrentRow,
-    /// 4. This is the same as "<expr> PRECEDING" except that the boundary is <expr> units after the
-    /// current rather than before the current row.
-    ///
-    /// 5. UNBOUNDED FOLLOWING
-    /// The frame boundary is the last row in the partition.
-    Following(Option<u64>),
-}
-
-impl From<ast::WindowFrameBound> for WindowFrameBound {
-    fn from(value: ast::WindowFrameBound) -> Self {
-        match value {
-            ast::WindowFrameBound::Preceding(v) => Self::Preceding(v),
-            ast::WindowFrameBound::Following(v) => Self::Following(v),
-            ast::WindowFrameBound::CurrentRow => Self::CurrentRow,
-        }
-    }
-}
-
-impl fmt::Display for WindowFrameBound {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        match self {
-            WindowFrameBound::CurrentRow => f.write_str("CURRENT ROW"),
-            WindowFrameBound::Preceding(None) => f.write_str("UNBOUNDED PRECEDING"),
-            WindowFrameBound::Following(None) => f.write_str("UNBOUNDED FOLLOWING"),
-            WindowFrameBound::Preceding(Some(n)) => write!(f, "{} PRECEDING", n),
-            WindowFrameBound::Following(Some(n)) => write!(f, "{} FOLLOWING", n),
-        }
-    }
-}
-
-impl PartialEq for WindowFrameBound {
-    fn eq(&self, other: &Self) -> bool {
-        self.cmp(other) == Ordering::Equal
-    }
-}
-
-impl PartialOrd for WindowFrameBound {
-    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
-        Some(self.cmp(other))
-    }
-}
-
-impl Ord for WindowFrameBound {
-    fn cmp(&self, other: &Self) -> Ordering {
-        self.get_rank().cmp(&other.get_rank())
-    }
-}
-
-impl Hash for WindowFrameBound {
-    fn hash<H: Hasher>(&self, state: &mut H) {
-        self.get_rank().hash(state)
-    }
-}
-
-impl WindowFrameBound {
-    /// get the rank of this window frame bound.
-    ///
-    /// the rank is a tuple of (u8, u64) because we'll firstly compare the kind and then the value
-    /// which requires special handling e.g. with preceding the larger the value the smaller the
-    /// rank and also for 0 preceding / following it is the same as current row
-    fn get_rank(&self) -> (u8, u64) {
-        match self {
-            WindowFrameBound::Preceding(None) => (0, 0),
-            WindowFrameBound::Following(None) => (4, 0),
-            WindowFrameBound::Preceding(Some(0))
-            | WindowFrameBound::CurrentRow
-            | WindowFrameBound::Following(Some(0)) => (2, 0),
-            WindowFrameBound::Preceding(Some(v)) => (1, u64::MAX - *v),
-            WindowFrameBound::Following(Some(v)) => (3, *v),
-        }
-    }
-}
-
-/// There are three frame types: ROWS, GROUPS, and RANGE. The frame type determines how the
-/// starting and ending boundaries of the frame are measured.
-#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Hash)]
-pub enum WindowFrameUnits {
-    /// The ROWS frame type means that the starting and ending boundaries for the frame are
-    /// determined by counting individual rows relative to the current row.
-    Rows,
-    /// The RANGE frame type requires that the ORDER BY clause of the window have exactly one
-    /// term. Call that term "X". With the RANGE frame type, the elements of the frame are
-    /// determined by computing the value of expression X for all rows in the partition and framing
-    /// those rows for which the value of X is within a certain range of the value of X for the
-    /// current row.
-    Range,
-    /// The GROUPS frame type means that the starting and ending boundaries are determine
-    /// by counting "groups" relative to the current group. A "group" is a set of rows that all have
-    /// equivalent values for all all terms of the window ORDER BY clause.
-    Groups,
-}
-
-impl fmt::Display for WindowFrameUnits {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        f.write_str(match self {
-            WindowFrameUnits::Rows => "ROWS",
-            WindowFrameUnits::Range => "RANGE",
-            WindowFrameUnits::Groups => "GROUPS",
-        })
-    }
-}
-
-impl From<ast::WindowFrameUnits> for WindowFrameUnits {
-    fn from(value: ast::WindowFrameUnits) -> Self {
-        match value {
-            ast::WindowFrameUnits::Range => Self::Range,
-            ast::WindowFrameUnits::Groups => Self::Groups,
-            ast::WindowFrameUnits::Rows => Self::Rows,
-        }
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[test]
-    fn test_window_frame_creation() -> Result<()> {
-        let window_frame = ast::WindowFrame {
-            units: ast::WindowFrameUnits::Range,
-            start_bound: ast::WindowFrameBound::Following(None),
-            end_bound: None,
-        };
-        let result = WindowFrame::try_from(window_frame);
-        assert_eq!(
-            result.err().unwrap().to_string(),
-            "Execution error: Invalid window frame: start bound cannot be unbounded following".to_owned()
-        );
-
-        let window_frame = ast::WindowFrame {
-            units: ast::WindowFrameUnits::Range,
-            start_bound: ast::WindowFrameBound::Preceding(None),
-            end_bound: Some(ast::WindowFrameBound::Preceding(None)),
-        };
-        let result = WindowFrame::try_from(window_frame);
-        assert_eq!(
-            result.err().unwrap().to_string(),
-            "Execution error: Invalid window frame: end bound cannot be unbounded preceding".to_owned()
-        );
-
-        let window_frame = ast::WindowFrame {
-            units: ast::WindowFrameUnits::Range,
-            start_bound: ast::WindowFrameBound::Preceding(Some(1)),
-            end_bound: Some(ast::WindowFrameBound::Preceding(Some(2))),
-        };
-        let result = WindowFrame::try_from(window_frame);
-        assert_eq!(
-            result.err().unwrap().to_string(),
-            "Execution error: Invalid window frame: start bound (1 PRECEDING) cannot be larger than end bound (2 PRECEDING)".to_owned()
-        );
-
-        let window_frame = ast::WindowFrame {
-            units: ast::WindowFrameUnits::Range,
-            start_bound: ast::WindowFrameBound::Preceding(Some(2)),
-            end_bound: Some(ast::WindowFrameBound::Preceding(Some(1))),
-        };
-        let result = WindowFrame::try_from(window_frame);
-        assert_eq!(
-            result.err().unwrap().to_string(),
-            "This feature is not implemented: With WindowFrameUnits=RANGE, the bound cannot be 2 PRECEDING or FOLLOWING at the moment".to_owned()
-        );
-
-        let window_frame = ast::WindowFrame {
-            units: ast::WindowFrameUnits::Rows,
-            start_bound: ast::WindowFrameBound::Preceding(Some(2)),
-            end_bound: Some(ast::WindowFrameBound::Preceding(Some(1))),
-        };
-        let result = WindowFrame::try_from(window_frame);
-        assert!(result.is_ok());
-        Ok(())
-    }
-
-    #[test]
-    fn test_eq() {
-        assert_eq!(
-            WindowFrameBound::Preceding(Some(0)),
-            WindowFrameBound::CurrentRow
-        );
-        assert_eq!(
-            WindowFrameBound::CurrentRow,
-            WindowFrameBound::Following(Some(0))
-        );
-        assert_eq!(
-            WindowFrameBound::Following(Some(2)),
-            WindowFrameBound::Following(Some(2))
-        );
-        assert_eq!(
-            WindowFrameBound::Following(None),
-            WindowFrameBound::Following(None)
-        );
-        assert_eq!(
-            WindowFrameBound::Preceding(Some(2)),
-            WindowFrameBound::Preceding(Some(2))
-        );
-        assert_eq!(
-            WindowFrameBound::Preceding(None),
-            WindowFrameBound::Preceding(None)
-        );
-    }
-
-    #[test]
-    fn test_ord() {
-        assert!(WindowFrameBound::Preceding(Some(1)) < WindowFrameBound::CurrentRow);
-        // ! yes this is correct!
-        assert!(
-            WindowFrameBound::Preceding(Some(2)) < WindowFrameBound::Preceding(Some(1))
-        );
-        assert!(
-            WindowFrameBound::Preceding(Some(u64::MAX))
-                < WindowFrameBound::Preceding(Some(u64::MAX - 1))
-        );
-        assert!(
-            WindowFrameBound::Preceding(None)
-                < WindowFrameBound::Preceding(Some(1000000))
-        );
-        assert!(
-            WindowFrameBound::Preceding(None)
-                < WindowFrameBound::Preceding(Some(u64::MAX))
-        );
-        assert!(WindowFrameBound::Preceding(None) < WindowFrameBound::Following(Some(0)));
-        assert!(
-            WindowFrameBound::Preceding(Some(1)) < WindowFrameBound::Following(Some(1))
-        );
-        assert!(WindowFrameBound::CurrentRow < WindowFrameBound::Following(Some(1)));
-        assert!(
-            WindowFrameBound::Following(Some(1)) < WindowFrameBound::Following(Some(2))
-        );
-        assert!(WindowFrameBound::Following(Some(2)) < WindowFrameBound::Following(None));
-        assert!(
-            WindowFrameBound::Following(Some(u64::MAX))
-                < WindowFrameBound::Following(None)
-        );
-    }
-}
+pub use datafusion_expr::{WindowFrame, WindowFrameBound, WindowFrameUnits};