You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original page) was not preserved in this copy.
Posted to commits@arrow.apache.org by ji...@apache.org on 2022/02/06 13:52:59 UTC
[arrow-datafusion] 03/06: include window frames and operator into datafusion-expr
This is an automated email from the ASF dual-hosted git repository.
jiayuliu pushed a commit to branch move-accum
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
commit b7eb091044a8219f965a8b5742dab43a05b25a67
Author: Jiayu Liu <ji...@hey.com>
AuthorDate: Sun Feb 6 15:31:10 2022 +0800
include window frames and operator into datafusion-expr
---
datafusion-expr/Cargo.toml | 1 +
datafusion-expr/src/lib.rs | 4 +
.../src/operator.rs | 73 +----
.../src/window_frame.rs | 22 +-
datafusion/src/logical_plan/operators.rs | 83 +----
datafusion/src/logical_plan/window_frames.rs | 363 +--------------------
6 files changed, 22 insertions(+), 524 deletions(-)
diff --git a/datafusion-expr/Cargo.toml b/datafusion-expr/Cargo.toml
index 3cac735..d8a4f29 100644
--- a/datafusion-expr/Cargo.toml
+++ b/datafusion-expr/Cargo.toml
@@ -38,3 +38,4 @@ path = "src/lib.rs"
[dependencies]
datafusion-common = { path = "../datafusion-common" }
arrow = { version = "8.0.0", features = ["prettyprint"] }
+sqlparser = "0.13"
diff --git a/datafusion-expr/src/lib.rs b/datafusion-expr/src/lib.rs
index b6eaaf7..13fa93e 100644
--- a/datafusion-expr/src/lib.rs
+++ b/datafusion-expr/src/lib.rs
@@ -16,7 +16,11 @@
// under the License.
mod aggregate_function;
+mod operator;
+mod window_frame;
mod window_function;
pub use aggregate_function::AggregateFunction;
+pub use operator::Operator;
+pub use window_frame::{WindowFrame, WindowFrameBound, WindowFrameUnits};
pub use window_function::{BuiltInWindowFunction, WindowFunction};
diff --git a/datafusion/src/logical_plan/operators.rs b/datafusion-expr/src/operator.rs
similarity index 67%
copy from datafusion/src/logical_plan/operators.rs
copy to datafusion-expr/src/operator.rs
index 14ccab0..e6b7e35 100644
--- a/datafusion/src/logical_plan/operators.rs
+++ b/datafusion-expr/src/operator.rs
@@ -15,9 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-use std::{fmt, ops};
-
-use super::{binary_expr, Expr};
+use std::fmt;
/// Operators applied to expressions
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Hash)]
@@ -97,72 +95,3 @@ impl fmt::Display for Operator {
write!(f, "{}", display)
}
}
-
-impl ops::Add for Expr {
- type Output = Self;
-
- fn add(self, rhs: Self) -> Self {
- binary_expr(self, Operator::Plus, rhs)
- }
-}
-
-impl ops::Sub for Expr {
- type Output = Self;
-
- fn sub(self, rhs: Self) -> Self {
- binary_expr(self, Operator::Minus, rhs)
- }
-}
-
-impl ops::Mul for Expr {
- type Output = Self;
-
- fn mul(self, rhs: Self) -> Self {
- binary_expr(self, Operator::Multiply, rhs)
- }
-}
-
-impl ops::Div for Expr {
- type Output = Self;
-
- fn div(self, rhs: Self) -> Self {
- binary_expr(self, Operator::Divide, rhs)
- }
-}
-
-impl ops::Rem for Expr {
- type Output = Self;
-
- fn rem(self, rhs: Self) -> Self {
- binary_expr(self, Operator::Modulo, rhs)
- }
-}
-
-#[cfg(test)]
-mod tests {
- use crate::prelude::lit;
-
- #[test]
- fn test_operators() {
- assert_eq!(
- format!("{:?}", lit(1u32) + lit(2u32)),
- "UInt32(1) + UInt32(2)"
- );
- assert_eq!(
- format!("{:?}", lit(1u32) - lit(2u32)),
- "UInt32(1) - UInt32(2)"
- );
- assert_eq!(
- format!("{:?}", lit(1u32) * lit(2u32)),
- "UInt32(1) * UInt32(2)"
- );
- assert_eq!(
- format!("{:?}", lit(1u32) / lit(2u32)),
- "UInt32(1) / UInt32(2)"
- );
- assert_eq!(
- format!("{:?}", lit(1u32) % lit(2u32)),
- "UInt32(1) % UInt32(2)"
- );
- }
-}
diff --git a/datafusion/src/logical_plan/window_frames.rs b/datafusion-expr/src/window_frame.rs
similarity index 96%
copy from datafusion/src/logical_plan/window_frames.rs
copy to datafusion-expr/src/window_frame.rs
index 42e0a7e..ba65a50 100644
--- a/datafusion/src/logical_plan/window_frames.rs
+++ b/datafusion-expr/src/window_frame.rs
@@ -23,7 +23,7 @@
//! - An ending frame boundary,
//! - An EXCLUDE clause.
-use crate::error::{DataFusionError, Result};
+use datafusion_common::{DataFusionError, Result};
use sqlparser::ast;
use std::cmp::Ordering;
use std::convert::{From, TryFrom};
@@ -78,9 +78,9 @@ impl TryFrom<ast::WindowFrame> for WindowFrame {
))
} else if start_bound > end_bound {
Err(DataFusionError::Execution(format!(
- "Invalid window frame: start bound ({}) cannot be larger than end bound ({})",
- start_bound, end_bound
- )))
+ "Invalid window frame: start bound ({}) cannot be larger than end bound ({})",
+ start_bound, end_bound
+ )))
} else {
let units = value.units.into();
if units == WindowFrameUnits::Range {
@@ -268,9 +268,10 @@ mod tests {
};
let result = WindowFrame::try_from(window_frame);
assert_eq!(
- result.err().unwrap().to_string(),
- "Execution error: Invalid window frame: start bound cannot be unbounded following".to_owned()
- );
+ result.err().unwrap().to_string(),
+ "Execution error: Invalid window frame: start bound cannot be unbounded following"
+ .to_owned()
+ );
let window_frame = ast::WindowFrame {
units: ast::WindowFrameUnits::Range,
@@ -279,9 +280,10 @@ mod tests {
};
let result = WindowFrame::try_from(window_frame);
assert_eq!(
- result.err().unwrap().to_string(),
- "Execution error: Invalid window frame: end bound cannot be unbounded preceding".to_owned()
- );
+ result.err().unwrap().to_string(),
+ "Execution error: Invalid window frame: end bound cannot be unbounded preceding"
+ .to_owned()
+ );
let window_frame = ast::WindowFrame {
units: ast::WindowFrameUnits::Range,
diff --git a/datafusion/src/logical_plan/operators.rs b/datafusion/src/logical_plan/operators.rs
index 14ccab0..813f7e0 100644
--- a/datafusion/src/logical_plan/operators.rs
+++ b/datafusion/src/logical_plan/operators.rs
@@ -15,88 +15,9 @@
// specific language governing permissions and limitations
// under the License.
-use std::{fmt, ops};
-
use super::{binary_expr, Expr};
-
-/// Operators applied to expressions
-#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Hash)]
-pub enum Operator {
- /// Expressions are equal
- Eq,
- /// Expressions are not equal
- NotEq,
- /// Left side is smaller than right side
- Lt,
- /// Left side is smaller or equal to right side
- LtEq,
- /// Left side is greater than right side
- Gt,
- /// Left side is greater or equal to right side
- GtEq,
- /// Addition
- Plus,
- /// Subtraction
- Minus,
- /// Multiplication operator, like `*`
- Multiply,
- /// Division operator, like `/`
- Divide,
- /// Remainder operator, like `%`
- Modulo,
- /// Logical AND, like `&&`
- And,
- /// Logical OR, like `||`
- Or,
- /// Matches a wildcard pattern
- Like,
- /// Does not match a wildcard pattern
- NotLike,
- /// IS DISTINCT FROM
- IsDistinctFrom,
- /// IS NOT DISTINCT FROM
- IsNotDistinctFrom,
- /// Case sensitive regex match
- RegexMatch,
- /// Case insensitive regex match
- RegexIMatch,
- /// Case sensitive regex not match
- RegexNotMatch,
- /// Case insensitive regex not match
- RegexNotIMatch,
- /// Bitwise and, like `&`
- BitwiseAnd,
-}
-
-impl fmt::Display for Operator {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- let display = match &self {
- Operator::Eq => "=",
- Operator::NotEq => "!=",
- Operator::Lt => "<",
- Operator::LtEq => "<=",
- Operator::Gt => ">",
- Operator::GtEq => ">=",
- Operator::Plus => "+",
- Operator::Minus => "-",
- Operator::Multiply => "*",
- Operator::Divide => "/",
- Operator::Modulo => "%",
- Operator::And => "AND",
- Operator::Or => "OR",
- Operator::Like => "LIKE",
- Operator::NotLike => "NOT LIKE",
- Operator::RegexMatch => "~",
- Operator::RegexIMatch => "~*",
- Operator::RegexNotMatch => "!~",
- Operator::RegexNotIMatch => "!~*",
- Operator::IsDistinctFrom => "IS DISTINCT FROM",
- Operator::IsNotDistinctFrom => "IS NOT DISTINCT FROM",
- Operator::BitwiseAnd => "&",
- };
- write!(f, "{}", display)
- }
-}
+pub use datafusion_expr::Operator;
+use std::ops;
impl ops::Add for Expr {
type Output = Self;
diff --git a/datafusion/src/logical_plan/window_frames.rs b/datafusion/src/logical_plan/window_frames.rs
index 42e0a7e..5195820 100644
--- a/datafusion/src/logical_plan/window_frames.rs
+++ b/datafusion/src/logical_plan/window_frames.rs
@@ -15,365 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-//! Window frame
-//!
-//! The frame-spec determines which output rows are read by an aggregate window function. The frame-spec consists of four parts:
-//! - A frame type - either ROWS, RANGE or GROUPS,
-//! - A starting frame boundary,
-//! - An ending frame boundary,
-//! - An EXCLUDE clause.
+//! Window frame types, re-exported from `datafusion_expr`
-use crate::error::{DataFusionError, Result};
-use sqlparser::ast;
-use std::cmp::Ordering;
-use std::convert::{From, TryFrom};
-use std::fmt;
-use std::hash::{Hash, Hasher};
-
-/// The frame-spec determines which output rows are read by an aggregate window function.
-///
-/// The ending frame boundary can be omitted (if the BETWEEN and AND keywords that surround the
-/// starting frame boundary are also omitted), in which case the ending frame boundary defaults to
-/// CURRENT ROW.
-#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Hash)]
-pub struct WindowFrame {
- /// A frame type - either ROWS, RANGE or GROUPS
- pub units: WindowFrameUnits,
- /// A starting frame boundary
- pub start_bound: WindowFrameBound,
- /// An ending frame boundary
- pub end_bound: WindowFrameBound,
-}
-
-impl fmt::Display for WindowFrame {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- write!(
- f,
- "{} BETWEEN {} AND {}",
- self.units, self.start_bound, self.end_bound
- )?;
- Ok(())
- }
-}
-
-impl TryFrom<ast::WindowFrame> for WindowFrame {
- type Error = DataFusionError;
-
- fn try_from(value: ast::WindowFrame) -> Result<Self> {
- let start_bound = value.start_bound.into();
- let end_bound = value
- .end_bound
- .map(WindowFrameBound::from)
- .unwrap_or(WindowFrameBound::CurrentRow);
-
- if let WindowFrameBound::Following(None) = start_bound {
- Err(DataFusionError::Execution(
- "Invalid window frame: start bound cannot be unbounded following"
- .to_owned(),
- ))
- } else if let WindowFrameBound::Preceding(None) = end_bound {
- Err(DataFusionError::Execution(
- "Invalid window frame: end bound cannot be unbounded preceding"
- .to_owned(),
- ))
- } else if start_bound > end_bound {
- Err(DataFusionError::Execution(format!(
- "Invalid window frame: start bound ({}) cannot be larger than end bound ({})",
- start_bound, end_bound
- )))
- } else {
- let units = value.units.into();
- if units == WindowFrameUnits::Range {
- for bound in &[start_bound, end_bound] {
- match bound {
- WindowFrameBound::Preceding(Some(v))
- | WindowFrameBound::Following(Some(v))
- if *v > 0 =>
- {
- Err(DataFusionError::NotImplemented(format!(
- "With WindowFrameUnits={}, the bound cannot be {} PRECEDING or FOLLOWING at the moment",
- units, v
- )))
- }
- _ => Ok(()),
- }?;
- }
- }
- Ok(Self {
- units,
- start_bound,
- end_bound,
- })
- }
- }
-}
-
-impl Default for WindowFrame {
- fn default() -> Self {
- WindowFrame {
- units: WindowFrameUnits::Range,
- start_bound: WindowFrameBound::Preceding(None),
- end_bound: WindowFrameBound::CurrentRow,
- }
- }
-}
-
-/// There are five ways to describe starting and ending frame boundaries:
-///
-/// 1. UNBOUNDED PRECEDING
-/// 2. <expr> PRECEDING
-/// 3. CURRENT ROW
-/// 4. <expr> FOLLOWING
-/// 5. UNBOUNDED FOLLOWING
-///
-/// in this implementation we'll only allow <expr> to be u64 (i.e. no dynamic boundary)
-#[derive(Debug, Clone, Copy, Eq)]
-pub enum WindowFrameBound {
- /// 1. UNBOUNDED PRECEDING
- /// The frame boundary is the first row in the partition.
- ///
- /// 2. <expr> PRECEDING
- /// <expr> must be a non-negative constant numeric expression. The boundary is a row that
- /// is <expr> "units" prior to the current row.
- Preceding(Option<u64>),
- /// 3. The current row.
- ///
- /// For RANGE and GROUPS frame types, peers of the current row are also
- /// included in the frame, unless specifically excluded by the EXCLUDE clause.
- /// This is true regardless of whether CURRENT ROW is used as the starting or ending frame
- /// boundary.
- CurrentRow,
- /// 4. This is the same as "<expr> PRECEDING" except that the boundary is <expr> units after the
- /// current rather than before the current row.
- ///
- /// 5. UNBOUNDED FOLLOWING
- /// The frame boundary is the last row in the partition.
- Following(Option<u64>),
-}
-
-impl From<ast::WindowFrameBound> for WindowFrameBound {
- fn from(value: ast::WindowFrameBound) -> Self {
- match value {
- ast::WindowFrameBound::Preceding(v) => Self::Preceding(v),
- ast::WindowFrameBound::Following(v) => Self::Following(v),
- ast::WindowFrameBound::CurrentRow => Self::CurrentRow,
- }
- }
-}
-
-impl fmt::Display for WindowFrameBound {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- match self {
- WindowFrameBound::CurrentRow => f.write_str("CURRENT ROW"),
- WindowFrameBound::Preceding(None) => f.write_str("UNBOUNDED PRECEDING"),
- WindowFrameBound::Following(None) => f.write_str("UNBOUNDED FOLLOWING"),
- WindowFrameBound::Preceding(Some(n)) => write!(f, "{} PRECEDING", n),
- WindowFrameBound::Following(Some(n)) => write!(f, "{} FOLLOWING", n),
- }
- }
-}
-
-impl PartialEq for WindowFrameBound {
- fn eq(&self, other: &Self) -> bool {
- self.cmp(other) == Ordering::Equal
- }
-}
-
-impl PartialOrd for WindowFrameBound {
- fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
- Some(self.cmp(other))
- }
-}
-
-impl Ord for WindowFrameBound {
- fn cmp(&self, other: &Self) -> Ordering {
- self.get_rank().cmp(&other.get_rank())
- }
-}
-
-impl Hash for WindowFrameBound {
- fn hash<H: Hasher>(&self, state: &mut H) {
- self.get_rank().hash(state)
- }
-}
-
-impl WindowFrameBound {
- /// get the rank of this window frame bound.
- ///
- /// the rank is a tuple of (u8, u64) because we'll firstly compare the kind and then the value
- /// which requires special handling e.g. with preceding the larger the value the smaller the
- /// rank and also for 0 preceding / following it is the same as current row
- fn get_rank(&self) -> (u8, u64) {
- match self {
- WindowFrameBound::Preceding(None) => (0, 0),
- WindowFrameBound::Following(None) => (4, 0),
- WindowFrameBound::Preceding(Some(0))
- | WindowFrameBound::CurrentRow
- | WindowFrameBound::Following(Some(0)) => (2, 0),
- WindowFrameBound::Preceding(Some(v)) => (1, u64::MAX - *v),
- WindowFrameBound::Following(Some(v)) => (3, *v),
- }
- }
-}
-
-/// There are three frame types: ROWS, GROUPS, and RANGE. The frame type determines how the
-/// starting and ending boundaries of the frame are measured.
-#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Hash)]
-pub enum WindowFrameUnits {
- /// The ROWS frame type means that the starting and ending boundaries for the frame are
- /// determined by counting individual rows relative to the current row.
- Rows,
- /// The RANGE frame type requires that the ORDER BY clause of the window have exactly one
- /// term. Call that term "X". With the RANGE frame type, the elements of the frame are
- /// determined by computing the value of expression X for all rows in the partition and framing
- /// those rows for which the value of X is within a certain range of the value of X for the
- /// current row.
- Range,
- /// The GROUPS frame type means that the starting and ending boundaries are determine
- /// by counting "groups" relative to the current group. A "group" is a set of rows that all have
- /// equivalent values for all all terms of the window ORDER BY clause.
- Groups,
-}
-
-impl fmt::Display for WindowFrameUnits {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- f.write_str(match self {
- WindowFrameUnits::Rows => "ROWS",
- WindowFrameUnits::Range => "RANGE",
- WindowFrameUnits::Groups => "GROUPS",
- })
- }
-}
-
-impl From<ast::WindowFrameUnits> for WindowFrameUnits {
- fn from(value: ast::WindowFrameUnits) -> Self {
- match value {
- ast::WindowFrameUnits::Range => Self::Range,
- ast::WindowFrameUnits::Groups => Self::Groups,
- ast::WindowFrameUnits::Rows => Self::Rows,
- }
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- #[test]
- fn test_window_frame_creation() -> Result<()> {
- let window_frame = ast::WindowFrame {
- units: ast::WindowFrameUnits::Range,
- start_bound: ast::WindowFrameBound::Following(None),
- end_bound: None,
- };
- let result = WindowFrame::try_from(window_frame);
- assert_eq!(
- result.err().unwrap().to_string(),
- "Execution error: Invalid window frame: start bound cannot be unbounded following".to_owned()
- );
-
- let window_frame = ast::WindowFrame {
- units: ast::WindowFrameUnits::Range,
- start_bound: ast::WindowFrameBound::Preceding(None),
- end_bound: Some(ast::WindowFrameBound::Preceding(None)),
- };
- let result = WindowFrame::try_from(window_frame);
- assert_eq!(
- result.err().unwrap().to_string(),
- "Execution error: Invalid window frame: end bound cannot be unbounded preceding".to_owned()
- );
-
- let window_frame = ast::WindowFrame {
- units: ast::WindowFrameUnits::Range,
- start_bound: ast::WindowFrameBound::Preceding(Some(1)),
- end_bound: Some(ast::WindowFrameBound::Preceding(Some(2))),
- };
- let result = WindowFrame::try_from(window_frame);
- assert_eq!(
- result.err().unwrap().to_string(),
- "Execution error: Invalid window frame: start bound (1 PRECEDING) cannot be larger than end bound (2 PRECEDING)".to_owned()
- );
-
- let window_frame = ast::WindowFrame {
- units: ast::WindowFrameUnits::Range,
- start_bound: ast::WindowFrameBound::Preceding(Some(2)),
- end_bound: Some(ast::WindowFrameBound::Preceding(Some(1))),
- };
- let result = WindowFrame::try_from(window_frame);
- assert_eq!(
- result.err().unwrap().to_string(),
- "This feature is not implemented: With WindowFrameUnits=RANGE, the bound cannot be 2 PRECEDING or FOLLOWING at the moment".to_owned()
- );
-
- let window_frame = ast::WindowFrame {
- units: ast::WindowFrameUnits::Rows,
- start_bound: ast::WindowFrameBound::Preceding(Some(2)),
- end_bound: Some(ast::WindowFrameBound::Preceding(Some(1))),
- };
- let result = WindowFrame::try_from(window_frame);
- assert!(result.is_ok());
- Ok(())
- }
-
- #[test]
- fn test_eq() {
- assert_eq!(
- WindowFrameBound::Preceding(Some(0)),
- WindowFrameBound::CurrentRow
- );
- assert_eq!(
- WindowFrameBound::CurrentRow,
- WindowFrameBound::Following(Some(0))
- );
- assert_eq!(
- WindowFrameBound::Following(Some(2)),
- WindowFrameBound::Following(Some(2))
- );
- assert_eq!(
- WindowFrameBound::Following(None),
- WindowFrameBound::Following(None)
- );
- assert_eq!(
- WindowFrameBound::Preceding(Some(2)),
- WindowFrameBound::Preceding(Some(2))
- );
- assert_eq!(
- WindowFrameBound::Preceding(None),
- WindowFrameBound::Preceding(None)
- );
- }
-
- #[test]
- fn test_ord() {
- assert!(WindowFrameBound::Preceding(Some(1)) < WindowFrameBound::CurrentRow);
- // ! yes this is correct!
- assert!(
- WindowFrameBound::Preceding(Some(2)) < WindowFrameBound::Preceding(Some(1))
- );
- assert!(
- WindowFrameBound::Preceding(Some(u64::MAX))
- < WindowFrameBound::Preceding(Some(u64::MAX - 1))
- );
- assert!(
- WindowFrameBound::Preceding(None)
- < WindowFrameBound::Preceding(Some(1000000))
- );
- assert!(
- WindowFrameBound::Preceding(None)
- < WindowFrameBound::Preceding(Some(u64::MAX))
- );
- assert!(WindowFrameBound::Preceding(None) < WindowFrameBound::Following(Some(0)));
- assert!(
- WindowFrameBound::Preceding(Some(1)) < WindowFrameBound::Following(Some(1))
- );
- assert!(WindowFrameBound::CurrentRow < WindowFrameBound::Following(Some(1)));
- assert!(
- WindowFrameBound::Following(Some(1)) < WindowFrameBound::Following(Some(2))
- );
- assert!(WindowFrameBound::Following(Some(2)) < WindowFrameBound::Following(None));
- assert!(
- WindowFrameBound::Following(Some(u64::MAX))
- < WindowFrameBound::Following(None)
- );
- }
-}
+pub use datafusion_expr::{WindowFrame, WindowFrameBound, WindowFrameUnits};