You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ji...@apache.org on 2022/02/06 13:52:56 UTC

[arrow-datafusion] branch move-accum created (now caf44b2)

This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a change to branch move-accum
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git.


      at caf44b2  move accumulator and columnar value

This branch includes the following new commits:

     new 2f67cf5  add datafusion-expr module
     new 974c13a  format
     new b7eb091  include window frames and operator into datafusion-expr
     new c8b5891  move signature, type signature, and volatility to datafusion-expr
     new 083a9eb  move built-in scalar function
     new caf44b2  move accumulator and columnar value

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[arrow-datafusion] 03/06: include window frames and operator into datafusion-expr

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a commit to branch move-accum
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit b7eb091044a8219f965a8b5742dab43a05b25a67
Author: Jiayu Liu <ji...@hey.com>
AuthorDate: Sun Feb 6 15:31:10 2022 +0800

    include window frames and operator into datafusion-expr
---
 datafusion-expr/Cargo.toml                         |   1 +
 datafusion-expr/src/lib.rs                         |   4 +
 .../src/operator.rs                                |  73 +----
 .../src/window_frame.rs                            |  22 +-
 datafusion/src/logical_plan/operators.rs           |  83 +----
 datafusion/src/logical_plan/window_frames.rs       | 363 +--------------------
 6 files changed, 22 insertions(+), 524 deletions(-)

diff --git a/datafusion-expr/Cargo.toml b/datafusion-expr/Cargo.toml
index 3cac735..d8a4f29 100644
--- a/datafusion-expr/Cargo.toml
+++ b/datafusion-expr/Cargo.toml
@@ -38,3 +38,4 @@ path = "src/lib.rs"
 [dependencies]
 datafusion-common = { path = "../datafusion-common" }
 arrow = { version = "8.0.0", features = ["prettyprint"] }
+sqlparser = "0.13"
diff --git a/datafusion-expr/src/lib.rs b/datafusion-expr/src/lib.rs
index b6eaaf7..13fa93e 100644
--- a/datafusion-expr/src/lib.rs
+++ b/datafusion-expr/src/lib.rs
@@ -16,7 +16,11 @@
 // under the License.
 
 mod aggregate_function;
+mod operator;
+mod window_frame;
 mod window_function;
 
 pub use aggregate_function::AggregateFunction;
+pub use operator::Operator;
+pub use window_frame::{WindowFrame, WindowFrameBound, WindowFrameUnits};
 pub use window_function::{BuiltInWindowFunction, WindowFunction};
diff --git a/datafusion/src/logical_plan/operators.rs b/datafusion-expr/src/operator.rs
similarity index 67%
copy from datafusion/src/logical_plan/operators.rs
copy to datafusion-expr/src/operator.rs
index 14ccab0..e6b7e35 100644
--- a/datafusion/src/logical_plan/operators.rs
+++ b/datafusion-expr/src/operator.rs
@@ -15,9 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::{fmt, ops};
-
-use super::{binary_expr, Expr};
+use std::fmt;
 
 /// Operators applied to expressions
 #[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Hash)]
@@ -97,72 +95,3 @@ impl fmt::Display for Operator {
         write!(f, "{}", display)
     }
 }
-
-impl ops::Add for Expr {
-    type Output = Self;
-
-    fn add(self, rhs: Self) -> Self {
-        binary_expr(self, Operator::Plus, rhs)
-    }
-}
-
-impl ops::Sub for Expr {
-    type Output = Self;
-
-    fn sub(self, rhs: Self) -> Self {
-        binary_expr(self, Operator::Minus, rhs)
-    }
-}
-
-impl ops::Mul for Expr {
-    type Output = Self;
-
-    fn mul(self, rhs: Self) -> Self {
-        binary_expr(self, Operator::Multiply, rhs)
-    }
-}
-
-impl ops::Div for Expr {
-    type Output = Self;
-
-    fn div(self, rhs: Self) -> Self {
-        binary_expr(self, Operator::Divide, rhs)
-    }
-}
-
-impl ops::Rem for Expr {
-    type Output = Self;
-
-    fn rem(self, rhs: Self) -> Self {
-        binary_expr(self, Operator::Modulo, rhs)
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use crate::prelude::lit;
-
-    #[test]
-    fn test_operators() {
-        assert_eq!(
-            format!("{:?}", lit(1u32) + lit(2u32)),
-            "UInt32(1) + UInt32(2)"
-        );
-        assert_eq!(
-            format!("{:?}", lit(1u32) - lit(2u32)),
-            "UInt32(1) - UInt32(2)"
-        );
-        assert_eq!(
-            format!("{:?}", lit(1u32) * lit(2u32)),
-            "UInt32(1) * UInt32(2)"
-        );
-        assert_eq!(
-            format!("{:?}", lit(1u32) / lit(2u32)),
-            "UInt32(1) / UInt32(2)"
-        );
-        assert_eq!(
-            format!("{:?}", lit(1u32) % lit(2u32)),
-            "UInt32(1) % UInt32(2)"
-        );
-    }
-}
diff --git a/datafusion/src/logical_plan/window_frames.rs b/datafusion-expr/src/window_frame.rs
similarity index 96%
copy from datafusion/src/logical_plan/window_frames.rs
copy to datafusion-expr/src/window_frame.rs
index 42e0a7e..ba65a50 100644
--- a/datafusion/src/logical_plan/window_frames.rs
+++ b/datafusion-expr/src/window_frame.rs
@@ -23,7 +23,7 @@
 //! - An ending frame boundary,
 //! - An EXCLUDE clause.
 
-use crate::error::{DataFusionError, Result};
+use datafusion_common::{DataFusionError, Result};
 use sqlparser::ast;
 use std::cmp::Ordering;
 use std::convert::{From, TryFrom};
@@ -78,9 +78,9 @@ impl TryFrom<ast::WindowFrame> for WindowFrame {
             ))
         } else if start_bound > end_bound {
             Err(DataFusionError::Execution(format!(
-            "Invalid window frame: start bound ({}) cannot be larger than end bound ({})",
-            start_bound, end_bound
-        )))
+        "Invalid window frame: start bound ({}) cannot be larger than end bound ({})",
+        start_bound, end_bound
+      )))
         } else {
             let units = value.units.into();
             if units == WindowFrameUnits::Range {
@@ -268,9 +268,10 @@ mod tests {
         };
         let result = WindowFrame::try_from(window_frame);
         assert_eq!(
-            result.err().unwrap().to_string(),
-            "Execution error: Invalid window frame: start bound cannot be unbounded following".to_owned()
-        );
+      result.err().unwrap().to_string(),
+      "Execution error: Invalid window frame: start bound cannot be unbounded following"
+        .to_owned()
+    );
 
         let window_frame = ast::WindowFrame {
             units: ast::WindowFrameUnits::Range,
@@ -279,9 +280,10 @@ mod tests {
         };
         let result = WindowFrame::try_from(window_frame);
         assert_eq!(
-            result.err().unwrap().to_string(),
-            "Execution error: Invalid window frame: end bound cannot be unbounded preceding".to_owned()
-        );
+      result.err().unwrap().to_string(),
+      "Execution error: Invalid window frame: end bound cannot be unbounded preceding"
+        .to_owned()
+    );
 
         let window_frame = ast::WindowFrame {
             units: ast::WindowFrameUnits::Range,
diff --git a/datafusion/src/logical_plan/operators.rs b/datafusion/src/logical_plan/operators.rs
index 14ccab0..813f7e0 100644
--- a/datafusion/src/logical_plan/operators.rs
+++ b/datafusion/src/logical_plan/operators.rs
@@ -15,88 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::{fmt, ops};
-
 use super::{binary_expr, Expr};
-
-/// Operators applied to expressions
-#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Hash)]
-pub enum Operator {
-    /// Expressions are equal
-    Eq,
-    /// Expressions are not equal
-    NotEq,
-    /// Left side is smaller than right side
-    Lt,
-    /// Left side is smaller or equal to right side
-    LtEq,
-    /// Left side is greater than right side
-    Gt,
-    /// Left side is greater or equal to right side
-    GtEq,
-    /// Addition
-    Plus,
-    /// Subtraction
-    Minus,
-    /// Multiplication operator, like `*`
-    Multiply,
-    /// Division operator, like `/`
-    Divide,
-    /// Remainder operator, like `%`
-    Modulo,
-    /// Logical AND, like `&&`
-    And,
-    /// Logical OR, like `||`
-    Or,
-    /// Matches a wildcard pattern
-    Like,
-    /// Does not match a wildcard pattern
-    NotLike,
-    /// IS DISTINCT FROM
-    IsDistinctFrom,
-    /// IS NOT DISTINCT FROM
-    IsNotDistinctFrom,
-    /// Case sensitive regex match
-    RegexMatch,
-    /// Case insensitive regex match
-    RegexIMatch,
-    /// Case sensitive regex not match
-    RegexNotMatch,
-    /// Case insensitive regex not match
-    RegexNotIMatch,
-    /// Bitwise and, like `&`
-    BitwiseAnd,
-}
-
-impl fmt::Display for Operator {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        let display = match &self {
-            Operator::Eq => "=",
-            Operator::NotEq => "!=",
-            Operator::Lt => "<",
-            Operator::LtEq => "<=",
-            Operator::Gt => ">",
-            Operator::GtEq => ">=",
-            Operator::Plus => "+",
-            Operator::Minus => "-",
-            Operator::Multiply => "*",
-            Operator::Divide => "/",
-            Operator::Modulo => "%",
-            Operator::And => "AND",
-            Operator::Or => "OR",
-            Operator::Like => "LIKE",
-            Operator::NotLike => "NOT LIKE",
-            Operator::RegexMatch => "~",
-            Operator::RegexIMatch => "~*",
-            Operator::RegexNotMatch => "!~",
-            Operator::RegexNotIMatch => "!~*",
-            Operator::IsDistinctFrom => "IS DISTINCT FROM",
-            Operator::IsNotDistinctFrom => "IS NOT DISTINCT FROM",
-            Operator::BitwiseAnd => "&",
-        };
-        write!(f, "{}", display)
-    }
-}
+pub use datafusion_expr::Operator;
+use std::ops;
 
 impl ops::Add for Expr {
     type Output = Self;
diff --git a/datafusion/src/logical_plan/window_frames.rs b/datafusion/src/logical_plan/window_frames.rs
index 42e0a7e..5195820 100644
--- a/datafusion/src/logical_plan/window_frames.rs
+++ b/datafusion/src/logical_plan/window_frames.rs
@@ -15,365 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Window frame
-//!
-//! The frame-spec determines which output rows are read by an aggregate window function. The frame-spec consists of four parts:
-//! - A frame type - either ROWS, RANGE or GROUPS,
-//! - A starting frame boundary,
-//! - An ending frame boundary,
-//! - An EXCLUDE clause.
+//! Window frame types, reimported from datafusion_expr
 
-use crate::error::{DataFusionError, Result};
-use sqlparser::ast;
-use std::cmp::Ordering;
-use std::convert::{From, TryFrom};
-use std::fmt;
-use std::hash::{Hash, Hasher};
-
-/// The frame-spec determines which output rows are read by an aggregate window function.
-///
-/// The ending frame boundary can be omitted (if the BETWEEN and AND keywords that surround the
-/// starting frame boundary are also omitted), in which case the ending frame boundary defaults to
-/// CURRENT ROW.
-#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Hash)]
-pub struct WindowFrame {
-    /// A frame type - either ROWS, RANGE or GROUPS
-    pub units: WindowFrameUnits,
-    /// A starting frame boundary
-    pub start_bound: WindowFrameBound,
-    /// An ending frame boundary
-    pub end_bound: WindowFrameBound,
-}
-
-impl fmt::Display for WindowFrame {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        write!(
-            f,
-            "{} BETWEEN {} AND {}",
-            self.units, self.start_bound, self.end_bound
-        )?;
-        Ok(())
-    }
-}
-
-impl TryFrom<ast::WindowFrame> for WindowFrame {
-    type Error = DataFusionError;
-
-    fn try_from(value: ast::WindowFrame) -> Result<Self> {
-        let start_bound = value.start_bound.into();
-        let end_bound = value
-            .end_bound
-            .map(WindowFrameBound::from)
-            .unwrap_or(WindowFrameBound::CurrentRow);
-
-        if let WindowFrameBound::Following(None) = start_bound {
-            Err(DataFusionError::Execution(
-                "Invalid window frame: start bound cannot be unbounded following"
-                    .to_owned(),
-            ))
-        } else if let WindowFrameBound::Preceding(None) = end_bound {
-            Err(DataFusionError::Execution(
-                "Invalid window frame: end bound cannot be unbounded preceding"
-                    .to_owned(),
-            ))
-        } else if start_bound > end_bound {
-            Err(DataFusionError::Execution(format!(
-            "Invalid window frame: start bound ({}) cannot be larger than end bound ({})",
-            start_bound, end_bound
-        )))
-        } else {
-            let units = value.units.into();
-            if units == WindowFrameUnits::Range {
-                for bound in &[start_bound, end_bound] {
-                    match bound {
-                        WindowFrameBound::Preceding(Some(v))
-                        | WindowFrameBound::Following(Some(v))
-                            if *v > 0 =>
-                        {
-                            Err(DataFusionError::NotImplemented(format!(
-                                "With WindowFrameUnits={}, the bound cannot be {} PRECEDING or FOLLOWING at the moment",
-                                units, v
-                            )))
-                        }
-                        _ => Ok(()),
-                    }?;
-                }
-            }
-            Ok(Self {
-                units,
-                start_bound,
-                end_bound,
-            })
-        }
-    }
-}
-
-impl Default for WindowFrame {
-    fn default() -> Self {
-        WindowFrame {
-            units: WindowFrameUnits::Range,
-            start_bound: WindowFrameBound::Preceding(None),
-            end_bound: WindowFrameBound::CurrentRow,
-        }
-    }
-}
-
-/// There are five ways to describe starting and ending frame boundaries:
-///
-/// 1. UNBOUNDED PRECEDING
-/// 2. <expr> PRECEDING
-/// 3. CURRENT ROW
-/// 4. <expr> FOLLOWING
-/// 5. UNBOUNDED FOLLOWING
-///
-/// in this implementation we'll only allow <expr> to be u64 (i.e. no dynamic boundary)
-#[derive(Debug, Clone, Copy, Eq)]
-pub enum WindowFrameBound {
-    /// 1. UNBOUNDED PRECEDING
-    /// The frame boundary is the first row in the partition.
-    ///
-    /// 2. <expr> PRECEDING
-    /// <expr> must be a non-negative constant numeric expression. The boundary is a row that
-    /// is <expr> "units" prior to the current row.
-    Preceding(Option<u64>),
-    /// 3. The current row.
-    ///
-    /// For RANGE and GROUPS frame types, peers of the current row are also
-    /// included in the frame, unless specifically excluded by the EXCLUDE clause.
-    /// This is true regardless of whether CURRENT ROW is used as the starting or ending frame
-    /// boundary.
-    CurrentRow,
-    /// 4. This is the same as "<expr> PRECEDING" except that the boundary is <expr> units after the
-    /// current rather than before the current row.
-    ///
-    /// 5. UNBOUNDED FOLLOWING
-    /// The frame boundary is the last row in the partition.
-    Following(Option<u64>),
-}
-
-impl From<ast::WindowFrameBound> for WindowFrameBound {
-    fn from(value: ast::WindowFrameBound) -> Self {
-        match value {
-            ast::WindowFrameBound::Preceding(v) => Self::Preceding(v),
-            ast::WindowFrameBound::Following(v) => Self::Following(v),
-            ast::WindowFrameBound::CurrentRow => Self::CurrentRow,
-        }
-    }
-}
-
-impl fmt::Display for WindowFrameBound {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        match self {
-            WindowFrameBound::CurrentRow => f.write_str("CURRENT ROW"),
-            WindowFrameBound::Preceding(None) => f.write_str("UNBOUNDED PRECEDING"),
-            WindowFrameBound::Following(None) => f.write_str("UNBOUNDED FOLLOWING"),
-            WindowFrameBound::Preceding(Some(n)) => write!(f, "{} PRECEDING", n),
-            WindowFrameBound::Following(Some(n)) => write!(f, "{} FOLLOWING", n),
-        }
-    }
-}
-
-impl PartialEq for WindowFrameBound {
-    fn eq(&self, other: &Self) -> bool {
-        self.cmp(other) == Ordering::Equal
-    }
-}
-
-impl PartialOrd for WindowFrameBound {
-    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
-        Some(self.cmp(other))
-    }
-}
-
-impl Ord for WindowFrameBound {
-    fn cmp(&self, other: &Self) -> Ordering {
-        self.get_rank().cmp(&other.get_rank())
-    }
-}
-
-impl Hash for WindowFrameBound {
-    fn hash<H: Hasher>(&self, state: &mut H) {
-        self.get_rank().hash(state)
-    }
-}
-
-impl WindowFrameBound {
-    /// get the rank of this window frame bound.
-    ///
-    /// the rank is a tuple of (u8, u64) because we'll firstly compare the kind and then the value
-    /// which requires special handling e.g. with preceding the larger the value the smaller the
-    /// rank and also for 0 preceding / following it is the same as current row
-    fn get_rank(&self) -> (u8, u64) {
-        match self {
-            WindowFrameBound::Preceding(None) => (0, 0),
-            WindowFrameBound::Following(None) => (4, 0),
-            WindowFrameBound::Preceding(Some(0))
-            | WindowFrameBound::CurrentRow
-            | WindowFrameBound::Following(Some(0)) => (2, 0),
-            WindowFrameBound::Preceding(Some(v)) => (1, u64::MAX - *v),
-            WindowFrameBound::Following(Some(v)) => (3, *v),
-        }
-    }
-}
-
-/// There are three frame types: ROWS, GROUPS, and RANGE. The frame type determines how the
-/// starting and ending boundaries of the frame are measured.
-#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Hash)]
-pub enum WindowFrameUnits {
-    /// The ROWS frame type means that the starting and ending boundaries for the frame are
-    /// determined by counting individual rows relative to the current row.
-    Rows,
-    /// The RANGE frame type requires that the ORDER BY clause of the window have exactly one
-    /// term. Call that term "X". With the RANGE frame type, the elements of the frame are
-    /// determined by computing the value of expression X for all rows in the partition and framing
-    /// those rows for which the value of X is within a certain range of the value of X for the
-    /// current row.
-    Range,
-    /// The GROUPS frame type means that the starting and ending boundaries are determine
-    /// by counting "groups" relative to the current group. A "group" is a set of rows that all have
-    /// equivalent values for all all terms of the window ORDER BY clause.
-    Groups,
-}
-
-impl fmt::Display for WindowFrameUnits {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        f.write_str(match self {
-            WindowFrameUnits::Rows => "ROWS",
-            WindowFrameUnits::Range => "RANGE",
-            WindowFrameUnits::Groups => "GROUPS",
-        })
-    }
-}
-
-impl From<ast::WindowFrameUnits> for WindowFrameUnits {
-    fn from(value: ast::WindowFrameUnits) -> Self {
-        match value {
-            ast::WindowFrameUnits::Range => Self::Range,
-            ast::WindowFrameUnits::Groups => Self::Groups,
-            ast::WindowFrameUnits::Rows => Self::Rows,
-        }
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[test]
-    fn test_window_frame_creation() -> Result<()> {
-        let window_frame = ast::WindowFrame {
-            units: ast::WindowFrameUnits::Range,
-            start_bound: ast::WindowFrameBound::Following(None),
-            end_bound: None,
-        };
-        let result = WindowFrame::try_from(window_frame);
-        assert_eq!(
-            result.err().unwrap().to_string(),
-            "Execution error: Invalid window frame: start bound cannot be unbounded following".to_owned()
-        );
-
-        let window_frame = ast::WindowFrame {
-            units: ast::WindowFrameUnits::Range,
-            start_bound: ast::WindowFrameBound::Preceding(None),
-            end_bound: Some(ast::WindowFrameBound::Preceding(None)),
-        };
-        let result = WindowFrame::try_from(window_frame);
-        assert_eq!(
-            result.err().unwrap().to_string(),
-            "Execution error: Invalid window frame: end bound cannot be unbounded preceding".to_owned()
-        );
-
-        let window_frame = ast::WindowFrame {
-            units: ast::WindowFrameUnits::Range,
-            start_bound: ast::WindowFrameBound::Preceding(Some(1)),
-            end_bound: Some(ast::WindowFrameBound::Preceding(Some(2))),
-        };
-        let result = WindowFrame::try_from(window_frame);
-        assert_eq!(
-            result.err().unwrap().to_string(),
-            "Execution error: Invalid window frame: start bound (1 PRECEDING) cannot be larger than end bound (2 PRECEDING)".to_owned()
-        );
-
-        let window_frame = ast::WindowFrame {
-            units: ast::WindowFrameUnits::Range,
-            start_bound: ast::WindowFrameBound::Preceding(Some(2)),
-            end_bound: Some(ast::WindowFrameBound::Preceding(Some(1))),
-        };
-        let result = WindowFrame::try_from(window_frame);
-        assert_eq!(
-            result.err().unwrap().to_string(),
-            "This feature is not implemented: With WindowFrameUnits=RANGE, the bound cannot be 2 PRECEDING or FOLLOWING at the moment".to_owned()
-        );
-
-        let window_frame = ast::WindowFrame {
-            units: ast::WindowFrameUnits::Rows,
-            start_bound: ast::WindowFrameBound::Preceding(Some(2)),
-            end_bound: Some(ast::WindowFrameBound::Preceding(Some(1))),
-        };
-        let result = WindowFrame::try_from(window_frame);
-        assert!(result.is_ok());
-        Ok(())
-    }
-
-    #[test]
-    fn test_eq() {
-        assert_eq!(
-            WindowFrameBound::Preceding(Some(0)),
-            WindowFrameBound::CurrentRow
-        );
-        assert_eq!(
-            WindowFrameBound::CurrentRow,
-            WindowFrameBound::Following(Some(0))
-        );
-        assert_eq!(
-            WindowFrameBound::Following(Some(2)),
-            WindowFrameBound::Following(Some(2))
-        );
-        assert_eq!(
-            WindowFrameBound::Following(None),
-            WindowFrameBound::Following(None)
-        );
-        assert_eq!(
-            WindowFrameBound::Preceding(Some(2)),
-            WindowFrameBound::Preceding(Some(2))
-        );
-        assert_eq!(
-            WindowFrameBound::Preceding(None),
-            WindowFrameBound::Preceding(None)
-        );
-    }
-
-    #[test]
-    fn test_ord() {
-        assert!(WindowFrameBound::Preceding(Some(1)) < WindowFrameBound::CurrentRow);
-        // ! yes this is correct!
-        assert!(
-            WindowFrameBound::Preceding(Some(2)) < WindowFrameBound::Preceding(Some(1))
-        );
-        assert!(
-            WindowFrameBound::Preceding(Some(u64::MAX))
-                < WindowFrameBound::Preceding(Some(u64::MAX - 1))
-        );
-        assert!(
-            WindowFrameBound::Preceding(None)
-                < WindowFrameBound::Preceding(Some(1000000))
-        );
-        assert!(
-            WindowFrameBound::Preceding(None)
-                < WindowFrameBound::Preceding(Some(u64::MAX))
-        );
-        assert!(WindowFrameBound::Preceding(None) < WindowFrameBound::Following(Some(0)));
-        assert!(
-            WindowFrameBound::Preceding(Some(1)) < WindowFrameBound::Following(Some(1))
-        );
-        assert!(WindowFrameBound::CurrentRow < WindowFrameBound::Following(Some(1)));
-        assert!(
-            WindowFrameBound::Following(Some(1)) < WindowFrameBound::Following(Some(2))
-        );
-        assert!(WindowFrameBound::Following(Some(2)) < WindowFrameBound::Following(None));
-        assert!(
-            WindowFrameBound::Following(Some(u64::MAX))
-                < WindowFrameBound::Following(None)
-        );
-    }
-}
+pub use datafusion_expr::{WindowFrame, WindowFrameBound, WindowFrameUnits};

[arrow-datafusion] 04/06: move signature, type signature, and volatility to datafusion-expr

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a commit to branch move-accum
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit c8b589120e3ae8fdf53274a81ac5585aaaba1e5e
Author: Jiayu Liu <ji...@hey.com>
AuthorDate: Sun Feb 6 20:21:26 2022 +0800

    move signature, type signature, and volatility to datafusion-expr
---
 datafusion-expr/src/lib.rs                |   2 +
 datafusion-expr/src/signature.rs          | 116 ++++++++++++++++++++++++++++++
 datafusion/src/physical_plan/functions.rs |  98 +------------------------
 3 files changed, 119 insertions(+), 97 deletions(-)

diff --git a/datafusion-expr/src/lib.rs b/datafusion-expr/src/lib.rs
index 13fa93e..d2b10b4 100644
--- a/datafusion-expr/src/lib.rs
+++ b/datafusion-expr/src/lib.rs
@@ -17,10 +17,12 @@
 
 mod aggregate_function;
 mod operator;
+mod signature;
 mod window_frame;
 mod window_function;
 
 pub use aggregate_function::AggregateFunction;
 pub use operator::Operator;
+pub use signature::{Signature, TypeSignature, Volatility};
 pub use window_frame::{WindowFrame, WindowFrameBound, WindowFrameUnits};
 pub use window_function::{BuiltInWindowFunction, WindowFunction};
diff --git a/datafusion-expr/src/signature.rs b/datafusion-expr/src/signature.rs
new file mode 100644
index 0000000..5c27f42
--- /dev/null
+++ b/datafusion-expr/src/signature.rs
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::datatypes::DataType;
+
+///A function's volatility, which defines the function's eligibility for certain optimizations
+#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Hash)]
+pub enum Volatility {
+    /// Immutable - An immutable function will always return the same output when given the same input. An example of this is [BuiltinScalarFunction::Cos].
+    Immutable,
+    /// Stable - A stable function may return different values given the same input across different queries but must return the same value for a given input within a query. An example of this is [BuiltinScalarFunction::Now].
+    Stable,
+    /// Volatile - A volatile function may change the return value from evaluation to evaluation. Multiple invocations of a volatile function may return different results when used in the same query. An example of this is [BuiltinScalarFunction::Random].
+    Volatile,
+}
+
+/// A function's type signature, which defines the function's supported argument types.
+#[derive(Debug, Clone, PartialEq, Hash)]
+pub enum TypeSignature {
+    /// arbitrary number of arguments of a common type out of a list of valid types
+    // A function such as `concat` is `Variadic(vec![DataType::Utf8, DataType::LargeUtf8])`
+    Variadic(Vec<DataType>),
+    /// arbitrary number of arguments of an arbitrary but equal type
+    // A function such as `array` is `VariadicEqual`
+    // The first argument decides the type used for coercion
+    VariadicEqual,
+    /// fixed number of arguments of an arbitrary but equal type out of a list of valid types
+    // A function of one argument of f64 is `Uniform(1, vec![DataType::Float64])`
+    // A function of one argument of f64 or f32 is `Uniform(1, vec![DataType::Float32, DataType::Float64])`
+    Uniform(usize, Vec<DataType>),
+    /// exact number of arguments of an exact type
+    Exact(Vec<DataType>),
+    /// fixed number of arguments of arbitrary types
+    Any(usize),
+    /// One of a list of signatures
+    OneOf(Vec<TypeSignature>),
+}
+
+///The Signature of a function defines its supported input types as well as its volatility.
+#[derive(Debug, Clone, PartialEq, Hash)]
+pub struct Signature {
+    /// type_signature - The types that the function accepts. See [TypeSignature] for more information.
+    pub type_signature: TypeSignature,
+    /// volatility - The volatility of the function. See [Volatility] for more information.
+    pub volatility: Volatility,
+}
+
+impl Signature {
+    /// new - Creates a new Signature from any type signature and the volatility.
+    pub fn new(type_signature: TypeSignature, volatility: Volatility) -> Self {
+        Signature {
+            type_signature,
+            volatility,
+        }
+    }
+    /// variadic - Creates a variadic signature that represents an arbitrary number of arguments all from a type in common_types.
+    pub fn variadic(common_types: Vec<DataType>, volatility: Volatility) -> Self {
+        Self {
+            type_signature: TypeSignature::Variadic(common_types),
+            volatility,
+        }
+    }
+    /// variadic_equal - Creates a variadic signature that represents an arbitrary number of arguments of the same type.
+    pub fn variadic_equal(volatility: Volatility) -> Self {
+        Self {
+            type_signature: TypeSignature::VariadicEqual,
+            volatility,
+        }
+    }
+    /// uniform - Creates a function with a fixed number of arguments of the same type, which must be from valid_types.
+    pub fn uniform(
+        arg_count: usize,
+        valid_types: Vec<DataType>,
+        volatility: Volatility,
+    ) -> Self {
+        Self {
+            type_signature: TypeSignature::Uniform(arg_count, valid_types),
+            volatility,
+        }
+    }
+    /// exact - Creates a signature which must match the types in exact_types in order.
+    pub fn exact(exact_types: Vec<DataType>, volatility: Volatility) -> Self {
+        Signature {
+            type_signature: TypeSignature::Exact(exact_types),
+            volatility,
+        }
+    }
+    /// any - Creates a signature which can be made of any type but of a specified number
+    pub fn any(arg_count: usize, volatility: Volatility) -> Self {
+        Signature {
+            type_signature: TypeSignature::Any(arg_count),
+            volatility,
+        }
+    }
+    /// one_of - Creates a signature which can match any of the [TypeSignature]s which are passed in.
+    pub fn one_of(type_signatures: Vec<TypeSignature>, volatility: Volatility) -> Self {
+        Signature {
+            type_signature: TypeSignature::OneOf(type_signatures),
+            volatility,
+        }
+    }
+}
diff --git a/datafusion/src/physical_plan/functions.rs b/datafusion/src/physical_plan/functions.rs
index 7d7cda7..af157c0 100644
--- a/datafusion/src/physical_plan/functions.rs
+++ b/datafusion/src/physical_plan/functions.rs
@@ -56,103 +56,7 @@ use fmt::{Debug, Formatter};
 use std::convert::From;
 use std::{any::Any, fmt, str::FromStr, sync::Arc};
 
-/// A function's type signature, which defines the function's supported argument types.
-#[derive(Debug, Clone, PartialEq, Hash)]
-pub enum TypeSignature {
-    /// arbitrary number of arguments of an common type out of a list of valid types
-    // A function such as `concat` is `Variadic(vec![DataType::Utf8, DataType::LargeUtf8])`
-    Variadic(Vec<DataType>),
-    /// arbitrary number of arguments of an arbitrary but equal type
-    // A function such as `array` is `VariadicEqual`
-    // The first argument decides the type used for coercion
-    VariadicEqual,
-    /// fixed number of arguments of an arbitrary but equal type out of a list of valid types
-    // A function of one argument of f64 is `Uniform(1, vec![DataType::Float64])`
-    // A function of one argument of f64 or f32 is `Uniform(1, vec![DataType::Float32, DataType::Float64])`
-    Uniform(usize, Vec<DataType>),
-    /// exact number of arguments of an exact type
-    Exact(Vec<DataType>),
-    /// fixed number of arguments of arbitrary types
-    Any(usize),
-    /// One of a list of signatures
-    OneOf(Vec<TypeSignature>),
-}
-
-///The Signature of a function defines its supported input types as well as its volatility.
-#[derive(Debug, Clone, PartialEq, Hash)]
-pub struct Signature {
-    /// type_signature - The types that the function accepts. See [TypeSignature] for more information.
-    pub type_signature: TypeSignature,
-    /// volatility - The volatility of the function. See [Volatility] for more information.
-    pub volatility: Volatility,
-}
-
-impl Signature {
-    /// new - Creates a new Signature from any type signature and the volatility.
-    pub fn new(type_signature: TypeSignature, volatility: Volatility) -> Self {
-        Signature {
-            type_signature,
-            volatility,
-        }
-    }
-    /// variadic - Creates a variadic signature that represents an arbitrary number of arguments all from a type in common_types.
-    pub fn variadic(common_types: Vec<DataType>, volatility: Volatility) -> Self {
-        Self {
-            type_signature: TypeSignature::Variadic(common_types),
-            volatility,
-        }
-    }
-    /// variadic_equal - Creates a variadic signature that represents an arbitrary number of arguments of the same type.
-    pub fn variadic_equal(volatility: Volatility) -> Self {
-        Self {
-            type_signature: TypeSignature::VariadicEqual,
-            volatility,
-        }
-    }
-    /// uniform - Creates a function with a fixed number of arguments of the same type, which must be from valid_types.
-    pub fn uniform(
-        arg_count: usize,
-        valid_types: Vec<DataType>,
-        volatility: Volatility,
-    ) -> Self {
-        Self {
-            type_signature: TypeSignature::Uniform(arg_count, valid_types),
-            volatility,
-        }
-    }
-    /// exact - Creates a signture which must match the types in exact_types in order.
-    pub fn exact(exact_types: Vec<DataType>, volatility: Volatility) -> Self {
-        Signature {
-            type_signature: TypeSignature::Exact(exact_types),
-            volatility,
-        }
-    }
-    /// any - Creates a signature which can a be made of any type but of a specified number
-    pub fn any(arg_count: usize, volatility: Volatility) -> Self {
-        Signature {
-            type_signature: TypeSignature::Any(arg_count),
-            volatility,
-        }
-    }
-    /// one_of Creates a signature which can match any of the [TypeSignature]s which are passed in.
-    pub fn one_of(type_signatures: Vec<TypeSignature>, volatility: Volatility) -> Self {
-        Signature {
-            type_signature: TypeSignature::OneOf(type_signatures),
-            volatility,
-        }
-    }
-}
-
-///A function's volatility, which defines the functions eligibility for certain optimizations
-#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Hash)]
-pub enum Volatility {
-    /// Immutable - An immutable function will always return the same output when given the same input. An example of this is [BuiltinScalarFunction::Cos].
-    Immutable,
-    /// Stable - A stable function may return different values given the same input accross different queries but must return the same value for a given input within a query. An example of this is [BuiltinScalarFunction::Now].
-    Stable,
-    /// Volatile - A volatile function may change the return value from evaluation to evaluation. Mutiple invocations of a volatile function may return different results when used in the same query. An example of this is [BuiltinScalarFunction::Random].
-    Volatile,
-}
+pub use datafusion_expr::{Signature, TypeSignature, Volatility};
 
 /// Scalar function
 ///

[arrow-datafusion] 05/06: move built-in scalar function

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a commit to branch move-accum
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit 083a9eb97b689269ebf7ac3132e13c686d19ca42
Author: Jiayu Liu <ji...@hey.com>
AuthorDate: Sun Feb 6 21:06:41 2022 +0800

    move built-in scalar function
---
 datafusion-expr/src/built_in_function.rs  | 330 ++++++++++++++++++++++++++++++
 datafusion-expr/src/lib.rs                |   2 +
 datafusion/src/physical_plan/functions.rs | 311 +---------------------------
 3 files changed, 334 insertions(+), 309 deletions(-)

diff --git a/datafusion-expr/src/built_in_function.rs b/datafusion-expr/src/built_in_function.rs
new file mode 100644
index 0000000..0d5ee97
--- /dev/null
+++ b/datafusion-expr/src/built_in_function.rs
@@ -0,0 +1,330 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Built-in functions
+
+use crate::Volatility;
+use datafusion_common::{DataFusionError, Result};
+use std::fmt;
+use std::str::FromStr;
+
+/// Enum of all built-in scalar functions
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub enum BuiltinScalarFunction {
+    // math functions
+    /// abs
+    Abs,
+    /// acos
+    Acos,
+    /// asin
+    Asin,
+    /// atan
+    Atan,
+    /// ceil
+    Ceil,
+    /// cos
+    Cos,
+    /// digest
+    Digest,
+    /// exp
+    Exp,
+    /// floor
+    Floor,
+    /// ln, Natural logarithm
+    Ln,
+    /// log, same as log10
+    Log,
+    /// log10
+    Log10,
+    /// log2
+    Log2,
+    /// round
+    Round,
+    /// signum
+    Signum,
+    /// sin
+    Sin,
+    /// sqrt
+    Sqrt,
+    /// tan
+    Tan,
+    /// trunc
+    Trunc,
+
+    // string functions
+    /// construct an array from columns
+    Array,
+    /// ascii
+    Ascii,
+    /// bit_length
+    BitLength,
+    /// btrim
+    Btrim,
+    /// character_length
+    CharacterLength,
+    /// chr
+    Chr,
+    /// concat
+    Concat,
+    /// concat_ws
+    ConcatWithSeparator,
+    /// date_part
+    DatePart,
+    /// date_trunc
+    DateTrunc,
+    /// initcap
+    InitCap,
+    /// left
+    Left,
+    /// lpad
+    Lpad,
+    /// lower
+    Lower,
+    /// ltrim
+    Ltrim,
+    /// md5
+    MD5,
+    /// nullif
+    NullIf,
+    /// octet_length
+    OctetLength,
+    /// random
+    Random,
+    /// regexp_replace
+    RegexpReplace,
+    /// repeat
+    Repeat,
+    /// replace
+    Replace,
+    /// reverse
+    Reverse,
+    /// right
+    Right,
+    /// rpad
+    Rpad,
+    /// rtrim
+    Rtrim,
+    /// sha224
+    SHA224,
+    /// sha256
+    SHA256,
+    /// sha384
+    SHA384,
+    /// sha512
+    SHA512,
+    /// split_part
+    SplitPart,
+    /// starts_with
+    StartsWith,
+    /// strpos
+    Strpos,
+    /// substr
+    Substr,
+    /// to_hex
+    ToHex,
+    /// to_timestamp
+    ToTimestamp,
+    /// to_timestamp_millis
+    ToTimestampMillis,
+    /// to_timestamp_micros
+    ToTimestampMicros,
+    /// to_timestamp_seconds
+    ToTimestampSeconds,
+    /// now
+    Now,
+    /// translate
+    Translate,
+    /// trim
+    Trim,
+    /// upper
+    Upper,
+    /// regexp_match
+    RegexpMatch,
+}
+
+impl BuiltinScalarFunction {
+    /// An allowlist of functions that accept zero arguments, so that they get special treatment
+    /// while executing.
+    pub fn supports_zero_argument(&self) -> bool {
+        matches!(
+            self,
+            BuiltinScalarFunction::Random | BuiltinScalarFunction::Now
+        )
+    }
+    /// Returns the [Volatility] of the builtin function.
+    pub fn volatility(&self) -> Volatility {
+        match self {
+            //Immutable scalar builtins
+            BuiltinScalarFunction::Abs => Volatility::Immutable,
+            BuiltinScalarFunction::Acos => Volatility::Immutable,
+            BuiltinScalarFunction::Asin => Volatility::Immutable,
+            BuiltinScalarFunction::Atan => Volatility::Immutable,
+            BuiltinScalarFunction::Ceil => Volatility::Immutable,
+            BuiltinScalarFunction::Cos => Volatility::Immutable,
+            BuiltinScalarFunction::Exp => Volatility::Immutable,
+            BuiltinScalarFunction::Floor => Volatility::Immutable,
+            BuiltinScalarFunction::Ln => Volatility::Immutable,
+            BuiltinScalarFunction::Log => Volatility::Immutable,
+            BuiltinScalarFunction::Log10 => Volatility::Immutable,
+            BuiltinScalarFunction::Log2 => Volatility::Immutable,
+            BuiltinScalarFunction::Round => Volatility::Immutable,
+            BuiltinScalarFunction::Signum => Volatility::Immutable,
+            BuiltinScalarFunction::Sin => Volatility::Immutable,
+            BuiltinScalarFunction::Sqrt => Volatility::Immutable,
+            BuiltinScalarFunction::Tan => Volatility::Immutable,
+            BuiltinScalarFunction::Trunc => Volatility::Immutable,
+            BuiltinScalarFunction::Array => Volatility::Immutable,
+            BuiltinScalarFunction::Ascii => Volatility::Immutable,
+            BuiltinScalarFunction::BitLength => Volatility::Immutable,
+            BuiltinScalarFunction::Btrim => Volatility::Immutable,
+            BuiltinScalarFunction::CharacterLength => Volatility::Immutable,
+            BuiltinScalarFunction::Chr => Volatility::Immutable,
+            BuiltinScalarFunction::Concat => Volatility::Immutable,
+            BuiltinScalarFunction::ConcatWithSeparator => Volatility::Immutable,
+            BuiltinScalarFunction::DatePart => Volatility::Immutable,
+            BuiltinScalarFunction::DateTrunc => Volatility::Immutable,
+            BuiltinScalarFunction::InitCap => Volatility::Immutable,
+            BuiltinScalarFunction::Left => Volatility::Immutable,
+            BuiltinScalarFunction::Lpad => Volatility::Immutable,
+            BuiltinScalarFunction::Lower => Volatility::Immutable,
+            BuiltinScalarFunction::Ltrim => Volatility::Immutable,
+            BuiltinScalarFunction::MD5 => Volatility::Immutable,
+            BuiltinScalarFunction::NullIf => Volatility::Immutable,
+            BuiltinScalarFunction::OctetLength => Volatility::Immutable,
+            BuiltinScalarFunction::RegexpReplace => Volatility::Immutable,
+            BuiltinScalarFunction::Repeat => Volatility::Immutable,
+            BuiltinScalarFunction::Replace => Volatility::Immutable,
+            BuiltinScalarFunction::Reverse => Volatility::Immutable,
+            BuiltinScalarFunction::Right => Volatility::Immutable,
+            BuiltinScalarFunction::Rpad => Volatility::Immutable,
+            BuiltinScalarFunction::Rtrim => Volatility::Immutable,
+            BuiltinScalarFunction::SHA224 => Volatility::Immutable,
+            BuiltinScalarFunction::SHA256 => Volatility::Immutable,
+            BuiltinScalarFunction::SHA384 => Volatility::Immutable,
+            BuiltinScalarFunction::SHA512 => Volatility::Immutable,
+            BuiltinScalarFunction::Digest => Volatility::Immutable,
+            BuiltinScalarFunction::SplitPart => Volatility::Immutable,
+            BuiltinScalarFunction::StartsWith => Volatility::Immutable,
+            BuiltinScalarFunction::Strpos => Volatility::Immutable,
+            BuiltinScalarFunction::Substr => Volatility::Immutable,
+            BuiltinScalarFunction::ToHex => Volatility::Immutable,
+            BuiltinScalarFunction::ToTimestamp => Volatility::Immutable,
+            BuiltinScalarFunction::ToTimestampMillis => Volatility::Immutable,
+            BuiltinScalarFunction::ToTimestampMicros => Volatility::Immutable,
+            BuiltinScalarFunction::ToTimestampSeconds => Volatility::Immutable,
+            BuiltinScalarFunction::Translate => Volatility::Immutable,
+            BuiltinScalarFunction::Trim => Volatility::Immutable,
+            BuiltinScalarFunction::Upper => Volatility::Immutable,
+            BuiltinScalarFunction::RegexpMatch => Volatility::Immutable,
+
+            //Stable builtin functions
+            BuiltinScalarFunction::Now => Volatility::Stable,
+
+            //Volatile builtin functions
+            BuiltinScalarFunction::Random => Volatility::Volatile,
+        }
+    }
+}
+
+impl fmt::Display for BuiltinScalarFunction {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        // Lowercase of the `Debug` representation, e.g. `BitLength` becomes "bitlength".
+        write!(f, "{}", format!("{:?}", self).to_lowercase())
+    }
+}
+
+impl FromStr for BuiltinScalarFunction {
+    type Err = DataFusionError;
+    fn from_str(name: &str) -> Result<BuiltinScalarFunction> {
+        Ok(match name {
+            // math functions
+            "abs" => BuiltinScalarFunction::Abs,
+            "acos" => BuiltinScalarFunction::Acos,
+            "asin" => BuiltinScalarFunction::Asin,
+            "atan" => BuiltinScalarFunction::Atan,
+            "ceil" => BuiltinScalarFunction::Ceil,
+            "cos" => BuiltinScalarFunction::Cos,
+            "exp" => BuiltinScalarFunction::Exp,
+            "floor" => BuiltinScalarFunction::Floor,
+            "ln" => BuiltinScalarFunction::Ln,
+            "log" => BuiltinScalarFunction::Log,
+            "log10" => BuiltinScalarFunction::Log10,
+            "log2" => BuiltinScalarFunction::Log2,
+            "round" => BuiltinScalarFunction::Round,
+            "signum" => BuiltinScalarFunction::Signum,
+            "sin" => BuiltinScalarFunction::Sin,
+            "sqrt" => BuiltinScalarFunction::Sqrt,
+            "tan" => BuiltinScalarFunction::Tan,
+            "trunc" => BuiltinScalarFunction::Trunc,
+
+            // string functions
+            "array" => BuiltinScalarFunction::Array,
+            "ascii" => BuiltinScalarFunction::Ascii,
+            "bit_length" => BuiltinScalarFunction::BitLength,
+            "btrim" => BuiltinScalarFunction::Btrim,
+            "char_length" => BuiltinScalarFunction::CharacterLength,
+            "character_length" => BuiltinScalarFunction::CharacterLength,
+            "concat" => BuiltinScalarFunction::Concat,
+            "concat_ws" => BuiltinScalarFunction::ConcatWithSeparator,
+            "chr" => BuiltinScalarFunction::Chr,
+            "date_part" | "datepart" => BuiltinScalarFunction::DatePart,
+            "date_trunc" | "datetrunc" => BuiltinScalarFunction::DateTrunc,
+            "initcap" => BuiltinScalarFunction::InitCap,
+            "left" => BuiltinScalarFunction::Left,
+            "length" => BuiltinScalarFunction::CharacterLength,
+            "lower" => BuiltinScalarFunction::Lower,
+            "lpad" => BuiltinScalarFunction::Lpad,
+            "ltrim" => BuiltinScalarFunction::Ltrim,
+            "md5" => BuiltinScalarFunction::MD5,
+            "nullif" => BuiltinScalarFunction::NullIf,
+            "octet_length" => BuiltinScalarFunction::OctetLength,
+            "random" => BuiltinScalarFunction::Random,
+            "regexp_replace" => BuiltinScalarFunction::RegexpReplace,
+            "repeat" => BuiltinScalarFunction::Repeat,
+            "replace" => BuiltinScalarFunction::Replace,
+            "reverse" => BuiltinScalarFunction::Reverse,
+            "right" => BuiltinScalarFunction::Right,
+            "rpad" => BuiltinScalarFunction::Rpad,
+            "rtrim" => BuiltinScalarFunction::Rtrim,
+            "sha224" => BuiltinScalarFunction::SHA224,
+            "sha256" => BuiltinScalarFunction::SHA256,
+            "sha384" => BuiltinScalarFunction::SHA384,
+            "sha512" => BuiltinScalarFunction::SHA512,
+            "digest" => BuiltinScalarFunction::Digest,
+            "split_part" => BuiltinScalarFunction::SplitPart,
+            "starts_with" => BuiltinScalarFunction::StartsWith,
+            "strpos" => BuiltinScalarFunction::Strpos,
+            "substr" => BuiltinScalarFunction::Substr,
+            "to_hex" => BuiltinScalarFunction::ToHex,
+            "to_timestamp" => BuiltinScalarFunction::ToTimestamp,
+            "to_timestamp_millis" => BuiltinScalarFunction::ToTimestampMillis,
+            "to_timestamp_micros" => BuiltinScalarFunction::ToTimestampMicros,
+            "to_timestamp_seconds" => BuiltinScalarFunction::ToTimestampSeconds,
+            "now" => BuiltinScalarFunction::Now,
+            "translate" => BuiltinScalarFunction::Translate,
+            "trim" => BuiltinScalarFunction::Trim,
+            "upper" => BuiltinScalarFunction::Upper,
+            "regexp_match" => BuiltinScalarFunction::RegexpMatch,
+            _ => {
+                return Err(DataFusionError::Plan(format!(
+                    "There is no built-in function named {}",
+                    name
+                )))
+            }
+        })
+    }
+}
diff --git a/datafusion-expr/src/lib.rs b/datafusion-expr/src/lib.rs
index d2b10b4..7dcddc3 100644
--- a/datafusion-expr/src/lib.rs
+++ b/datafusion-expr/src/lib.rs
@@ -16,12 +16,14 @@
 // under the License.
 
 mod aggregate_function;
+mod built_in_function;
 mod operator;
 mod signature;
 mod window_frame;
 mod window_function;
 
 pub use aggregate_function::AggregateFunction;
+pub use built_in_function::BuiltinScalarFunction;
 pub use operator::Operator;
 pub use signature::{Signature, TypeSignature, Volatility};
 pub use window_frame::{WindowFrame, WindowFrameBound, WindowFrameUnits};
diff --git a/datafusion/src/physical_plan/functions.rs b/datafusion/src/physical_plan/functions.rs
index af157c0..9582eec 100644
--- a/datafusion/src/physical_plan/functions.rs
+++ b/datafusion/src/physical_plan/functions.rs
@@ -54,9 +54,9 @@ use arrow::{
 };
 use fmt::{Debug, Formatter};
 use std::convert::From;
-use std::{any::Any, fmt, str::FromStr, sync::Arc};
+use std::{any::Any, fmt, sync::Arc};
 
-pub use datafusion_expr::{Signature, TypeSignature, Volatility};
+pub use datafusion_expr::{BuiltinScalarFunction, Signature, TypeSignature, Volatility};
 
 /// Scalar function
 ///
@@ -73,313 +73,6 @@ pub type ScalarFunctionImplementation =
 pub type ReturnTypeFunction =
     Arc<dyn Fn(&[DataType]) -> Result<Arc<DataType>> + Send + Sync>;
 
-/// Enum of all built-in scalar functions
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
-pub enum BuiltinScalarFunction {
-    // math functions
-    /// abs
-    Abs,
-    /// acos
-    Acos,
-    /// asin
-    Asin,
-    /// atan
-    Atan,
-    /// ceil
-    Ceil,
-    /// cos
-    Cos,
-    /// Digest
-    Digest,
-    /// exp
-    Exp,
-    /// floor
-    Floor,
-    /// ln, Natural logarithm
-    Ln,
-    /// log, same as log10
-    Log,
-    /// log10
-    Log10,
-    /// log2
-    Log2,
-    /// round
-    Round,
-    /// signum
-    Signum,
-    /// sin
-    Sin,
-    /// sqrt
-    Sqrt,
-    /// tan
-    Tan,
-    /// trunc
-    Trunc,
-
-    // string functions
-    /// construct an array from columns
-    Array,
-    /// ascii
-    Ascii,
-    /// bit_length
-    BitLength,
-    /// btrim
-    Btrim,
-    /// character_length
-    CharacterLength,
-    /// chr
-    Chr,
-    /// concat
-    Concat,
-    /// concat_ws
-    ConcatWithSeparator,
-    /// date_part
-    DatePart,
-    /// date_trunc
-    DateTrunc,
-    /// initcap
-    InitCap,
-    /// left
-    Left,
-    /// lpad
-    Lpad,
-    /// lower
-    Lower,
-    /// ltrim
-    Ltrim,
-    /// md5
-    MD5,
-    /// nullif
-    NullIf,
-    /// octet_length
-    OctetLength,
-    /// random
-    Random,
-    /// regexp_replace
-    RegexpReplace,
-    /// repeat
-    Repeat,
-    /// replace
-    Replace,
-    /// reverse
-    Reverse,
-    /// right
-    Right,
-    /// rpad
-    Rpad,
-    /// rtrim
-    Rtrim,
-    /// sha224
-    SHA224,
-    /// sha256
-    SHA256,
-    /// sha384
-    SHA384,
-    /// Sha512
-    SHA512,
-    /// split_part
-    SplitPart,
-    /// starts_with
-    StartsWith,
-    /// strpos
-    Strpos,
-    /// substr
-    Substr,
-    /// to_hex
-    ToHex,
-    /// to_timestamp
-    ToTimestamp,
-    /// to_timestamp_millis
-    ToTimestampMillis,
-    /// to_timestamp_micros
-    ToTimestampMicros,
-    /// to_timestamp_seconds
-    ToTimestampSeconds,
-    ///now
-    Now,
-    /// translate
-    Translate,
-    /// trim
-    Trim,
-    /// upper
-    Upper,
-    /// regexp_match
-    RegexpMatch,
-}
-
-impl BuiltinScalarFunction {
-    /// an allowlist of functions to take zero arguments, so that they will get special treatment
-    /// while executing.
-    fn supports_zero_argument(&self) -> bool {
-        matches!(
-            self,
-            BuiltinScalarFunction::Random | BuiltinScalarFunction::Now
-        )
-    }
-    /// Returns the [Volatility] of the builtin function.
-    pub fn volatility(&self) -> Volatility {
-        match self {
-            //Immutable scalar builtins
-            BuiltinScalarFunction::Abs => Volatility::Immutable,
-            BuiltinScalarFunction::Acos => Volatility::Immutable,
-            BuiltinScalarFunction::Asin => Volatility::Immutable,
-            BuiltinScalarFunction::Atan => Volatility::Immutable,
-            BuiltinScalarFunction::Ceil => Volatility::Immutable,
-            BuiltinScalarFunction::Cos => Volatility::Immutable,
-            BuiltinScalarFunction::Exp => Volatility::Immutable,
-            BuiltinScalarFunction::Floor => Volatility::Immutable,
-            BuiltinScalarFunction::Ln => Volatility::Immutable,
-            BuiltinScalarFunction::Log => Volatility::Immutable,
-            BuiltinScalarFunction::Log10 => Volatility::Immutable,
-            BuiltinScalarFunction::Log2 => Volatility::Immutable,
-            BuiltinScalarFunction::Round => Volatility::Immutable,
-            BuiltinScalarFunction::Signum => Volatility::Immutable,
-            BuiltinScalarFunction::Sin => Volatility::Immutable,
-            BuiltinScalarFunction::Sqrt => Volatility::Immutable,
-            BuiltinScalarFunction::Tan => Volatility::Immutable,
-            BuiltinScalarFunction::Trunc => Volatility::Immutable,
-            BuiltinScalarFunction::Array => Volatility::Immutable,
-            BuiltinScalarFunction::Ascii => Volatility::Immutable,
-            BuiltinScalarFunction::BitLength => Volatility::Immutable,
-            BuiltinScalarFunction::Btrim => Volatility::Immutable,
-            BuiltinScalarFunction::CharacterLength => Volatility::Immutable,
-            BuiltinScalarFunction::Chr => Volatility::Immutable,
-            BuiltinScalarFunction::Concat => Volatility::Immutable,
-            BuiltinScalarFunction::ConcatWithSeparator => Volatility::Immutable,
-            BuiltinScalarFunction::DatePart => Volatility::Immutable,
-            BuiltinScalarFunction::DateTrunc => Volatility::Immutable,
-            BuiltinScalarFunction::InitCap => Volatility::Immutable,
-            BuiltinScalarFunction::Left => Volatility::Immutable,
-            BuiltinScalarFunction::Lpad => Volatility::Immutable,
-            BuiltinScalarFunction::Lower => Volatility::Immutable,
-            BuiltinScalarFunction::Ltrim => Volatility::Immutable,
-            BuiltinScalarFunction::MD5 => Volatility::Immutable,
-            BuiltinScalarFunction::NullIf => Volatility::Immutable,
-            BuiltinScalarFunction::OctetLength => Volatility::Immutable,
-            BuiltinScalarFunction::RegexpReplace => Volatility::Immutable,
-            BuiltinScalarFunction::Repeat => Volatility::Immutable,
-            BuiltinScalarFunction::Replace => Volatility::Immutable,
-            BuiltinScalarFunction::Reverse => Volatility::Immutable,
-            BuiltinScalarFunction::Right => Volatility::Immutable,
-            BuiltinScalarFunction::Rpad => Volatility::Immutable,
-            BuiltinScalarFunction::Rtrim => Volatility::Immutable,
-            BuiltinScalarFunction::SHA224 => Volatility::Immutable,
-            BuiltinScalarFunction::SHA256 => Volatility::Immutable,
-            BuiltinScalarFunction::SHA384 => Volatility::Immutable,
-            BuiltinScalarFunction::SHA512 => Volatility::Immutable,
-            BuiltinScalarFunction::Digest => Volatility::Immutable,
-            BuiltinScalarFunction::SplitPart => Volatility::Immutable,
-            BuiltinScalarFunction::StartsWith => Volatility::Immutable,
-            BuiltinScalarFunction::Strpos => Volatility::Immutable,
-            BuiltinScalarFunction::Substr => Volatility::Immutable,
-            BuiltinScalarFunction::ToHex => Volatility::Immutable,
-            BuiltinScalarFunction::ToTimestamp => Volatility::Immutable,
-            BuiltinScalarFunction::ToTimestampMillis => Volatility::Immutable,
-            BuiltinScalarFunction::ToTimestampMicros => Volatility::Immutable,
-            BuiltinScalarFunction::ToTimestampSeconds => Volatility::Immutable,
-            BuiltinScalarFunction::Translate => Volatility::Immutable,
-            BuiltinScalarFunction::Trim => Volatility::Immutable,
-            BuiltinScalarFunction::Upper => Volatility::Immutable,
-            BuiltinScalarFunction::RegexpMatch => Volatility::Immutable,
-
-            //Stable builtin functions
-            BuiltinScalarFunction::Now => Volatility::Stable,
-
-            //Volatile builtin functions
-            BuiltinScalarFunction::Random => Volatility::Volatile,
-        }
-    }
-}
-
-impl fmt::Display for BuiltinScalarFunction {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        // lowercase of the debug.
-        write!(f, "{}", format!("{:?}", self).to_lowercase())
-    }
-}
-
-impl FromStr for BuiltinScalarFunction {
-    type Err = DataFusionError;
-    fn from_str(name: &str) -> Result<BuiltinScalarFunction> {
-        Ok(match name {
-            // math functions
-            "abs" => BuiltinScalarFunction::Abs,
-            "acos" => BuiltinScalarFunction::Acos,
-            "asin" => BuiltinScalarFunction::Asin,
-            "atan" => BuiltinScalarFunction::Atan,
-            "ceil" => BuiltinScalarFunction::Ceil,
-            "cos" => BuiltinScalarFunction::Cos,
-            "exp" => BuiltinScalarFunction::Exp,
-            "floor" => BuiltinScalarFunction::Floor,
-            "ln" => BuiltinScalarFunction::Ln,
-            "log" => BuiltinScalarFunction::Log,
-            "log10" => BuiltinScalarFunction::Log10,
-            "log2" => BuiltinScalarFunction::Log2,
-            "round" => BuiltinScalarFunction::Round,
-            "signum" => BuiltinScalarFunction::Signum,
-            "sin" => BuiltinScalarFunction::Sin,
-            "sqrt" => BuiltinScalarFunction::Sqrt,
-            "tan" => BuiltinScalarFunction::Tan,
-            "trunc" => BuiltinScalarFunction::Trunc,
-
-            // string functions
-            "array" => BuiltinScalarFunction::Array,
-            "ascii" => BuiltinScalarFunction::Ascii,
-            "bit_length" => BuiltinScalarFunction::BitLength,
-            "btrim" => BuiltinScalarFunction::Btrim,
-            "char_length" => BuiltinScalarFunction::CharacterLength,
-            "character_length" => BuiltinScalarFunction::CharacterLength,
-            "concat" => BuiltinScalarFunction::Concat,
-            "concat_ws" => BuiltinScalarFunction::ConcatWithSeparator,
-            "chr" => BuiltinScalarFunction::Chr,
-            "date_part" | "datepart" => BuiltinScalarFunction::DatePart,
-            "date_trunc" | "datetrunc" => BuiltinScalarFunction::DateTrunc,
-            "initcap" => BuiltinScalarFunction::InitCap,
-            "left" => BuiltinScalarFunction::Left,
-            "length" => BuiltinScalarFunction::CharacterLength,
-            "lower" => BuiltinScalarFunction::Lower,
-            "lpad" => BuiltinScalarFunction::Lpad,
-            "ltrim" => BuiltinScalarFunction::Ltrim,
-            "md5" => BuiltinScalarFunction::MD5,
-            "nullif" => BuiltinScalarFunction::NullIf,
-            "octet_length" => BuiltinScalarFunction::OctetLength,
-            "random" => BuiltinScalarFunction::Random,
-            "regexp_replace" => BuiltinScalarFunction::RegexpReplace,
-            "repeat" => BuiltinScalarFunction::Repeat,
-            "replace" => BuiltinScalarFunction::Replace,
-            "reverse" => BuiltinScalarFunction::Reverse,
-            "right" => BuiltinScalarFunction::Right,
-            "rpad" => BuiltinScalarFunction::Rpad,
-            "rtrim" => BuiltinScalarFunction::Rtrim,
-            "sha224" => BuiltinScalarFunction::SHA224,
-            "sha256" => BuiltinScalarFunction::SHA256,
-            "sha384" => BuiltinScalarFunction::SHA384,
-            "sha512" => BuiltinScalarFunction::SHA512,
-            "digest" => BuiltinScalarFunction::Digest,
-            "split_part" => BuiltinScalarFunction::SplitPart,
-            "starts_with" => BuiltinScalarFunction::StartsWith,
-            "strpos" => BuiltinScalarFunction::Strpos,
-            "substr" => BuiltinScalarFunction::Substr,
-            "to_hex" => BuiltinScalarFunction::ToHex,
-            "to_timestamp" => BuiltinScalarFunction::ToTimestamp,
-            "to_timestamp_millis" => BuiltinScalarFunction::ToTimestampMillis,
-            "to_timestamp_micros" => BuiltinScalarFunction::ToTimestampMicros,
-            "to_timestamp_seconds" => BuiltinScalarFunction::ToTimestampSeconds,
-            "now" => BuiltinScalarFunction::Now,
-            "translate" => BuiltinScalarFunction::Translate,
-            "trim" => BuiltinScalarFunction::Trim,
-            "upper" => BuiltinScalarFunction::Upper,
-            "regexp_match" => BuiltinScalarFunction::RegexpMatch,
-            _ => {
-                return Err(DataFusionError::Plan(format!(
-                    "There is no built-in function named {}",
-                    name
-                )))
-            }
-        })
-    }
-}
-
 macro_rules! make_utf8_to_return_type {
     ($FUNC:ident, $largeUtf8Type:expr, $utf8Type:expr) => {
         fn $FUNC(arg_type: &DataType, name: &str) -> Result<DataType> {

[arrow-datafusion] 06/06: move accumulator and columnar value

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a commit to branch move-accum
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit caf44b252dbaa6a6f97e8d870784b9109b2fcd0d
Author: Jiayu Liu <ji...@hey.com>
AuthorDate: Sun Feb 6 21:52:45 2022 +0800

    move accumulator and columnar value
---
 datafusion-expr/src/accumulator.rs        | 44 +++++++++++++++++++++++
 datafusion-expr/src/columnar_value.rs     | 60 +++++++++++++++++++++++++++++++
 datafusion-expr/src/lib.rs                |  4 +++
 datafusion/src/physical_plan/functions.rs | 14 ++------
 datafusion/src/physical_plan/mod.rs       | 52 ++-------------------------
 5 files changed, 112 insertions(+), 62 deletions(-)

diff --git a/datafusion-expr/src/accumulator.rs b/datafusion-expr/src/accumulator.rs
new file mode 100644
index 0000000..599bd36
--- /dev/null
+++ b/datafusion-expr/src/accumulator.rs
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::ArrayRef;
+use datafusion_common::{Result, ScalarValue};
+use std::fmt::Debug;
+
+/// An accumulator represents a stateful object that lives throughout the evaluation of multiple rows and
+/// generically accumulates values.
+///
+/// An accumulator knows how to:
+/// * update its state from inputs via `update_batch`
+/// * convert its internal state to a vector of scalar values
+/// * update its state from multiple accumulators' states via `merge_batch`
+/// * compute the final value from its internal state via `evaluate`
+pub trait Accumulator: Send + Sync + Debug {
+    /// Returns the state of the accumulator at the end of the accumulation.
+    // in the case of an average on which we track `sum` and `n`, this function should return a vector
+    // of two values, sum and n.
+    fn state(&self) -> Result<Vec<ScalarValue>>;
+
+    /// updates the accumulator's state from a vector of arrays.
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()>;
+
+    /// updates the accumulator's state from a vector of states.
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()>;
+
+    /// returns its value based on its current state.
+    fn evaluate(&self) -> Result<ScalarValue>;
+}
diff --git a/datafusion-expr/src/columnar_value.rs b/datafusion-expr/src/columnar_value.rs
new file mode 100644
index 0000000..5e6959d
--- /dev/null
+++ b/datafusion-expr/src/columnar_value.rs
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::ArrayRef;
+use arrow::array::NullArray;
+use arrow::datatypes::DataType;
+use arrow::record_batch::RecordBatch;
+use datafusion_common::ScalarValue;
+use std::sync::Arc;
+
+/// Represents the result from an expression
+#[derive(Clone)]
+pub enum ColumnarValue {
+    /// Array of values
+    Array(ArrayRef),
+    /// A single value
+    Scalar(ScalarValue),
+}
+
+impl ColumnarValue {
+    pub fn data_type(&self) -> DataType {
+        match self {
+            ColumnarValue::Array(array_value) => array_value.data_type().clone(),
+            ColumnarValue::Scalar(scalar_value) => scalar_value.get_datatype(),
+        }
+    }
+
+    /// Convert a columnar value into an ArrayRef
+    pub fn into_array(self, num_rows: usize) -> ArrayRef {
+        match self {
+            ColumnarValue::Array(array) => array,
+            ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows),
+        }
+    }
+}
+
+/// null columnar values are implemented as a null array in order to pass batch
+/// num_rows
+pub type NullColumnarValue = ColumnarValue;
+
+impl From<&RecordBatch> for NullColumnarValue {
+    fn from(batch: &RecordBatch) -> Self {
+        let num_rows = batch.num_rows();
+        ColumnarValue::Array(Arc::new(NullArray::new(num_rows)))
+    }
+}
diff --git a/datafusion-expr/src/lib.rs b/datafusion-expr/src/lib.rs
index 7dcddc3..2491fcf 100644
--- a/datafusion-expr/src/lib.rs
+++ b/datafusion-expr/src/lib.rs
@@ -15,15 +15,19 @@
 // specific language governing permissions and limitations
 // under the License.
 
+mod accumulator;
 mod aggregate_function;
 mod built_in_function;
+mod columnar_value;
 mod operator;
 mod signature;
 mod window_frame;
 mod window_function;
 
+pub use accumulator::Accumulator;
 pub use aggregate_function::AggregateFunction;
 pub use built_in_function::BuiltinScalarFunction;
+pub use columnar_value::{ColumnarValue, NullColumnarValue};
 pub use operator::Operator;
 pub use signature::{Signature, TypeSignature, Volatility};
 pub use window_frame::{WindowFrame, WindowFrameBound, WindowFrameUnits};
diff --git a/datafusion/src/physical_plan/functions.rs b/datafusion/src/physical_plan/functions.rs
index 9582eec..56e8f17 100644
--- a/datafusion/src/physical_plan/functions.rs
+++ b/datafusion/src/physical_plan/functions.rs
@@ -46,12 +46,13 @@ use crate::{
     scalar::ScalarValue,
 };
 use arrow::{
-    array::{ArrayRef, NullArray},
+    array::ArrayRef,
     compute::kernels::length::{bit_length, length},
     datatypes::TimeUnit,
     datatypes::{DataType, Field, Int32Type, Int64Type, Schema},
     record_batch::RecordBatch,
 };
+pub use datafusion_expr::NullColumnarValue;
 use fmt::{Debug, Formatter};
 use std::convert::From;
 use std::{any::Any, fmt, sync::Arc};
@@ -1206,17 +1207,6 @@ impl fmt::Display for ScalarFunctionExpr {
     }
 }
 
-/// null columnar values are implemented as a null array in order to pass batch
-/// num_rows
-type NullColumnarValue = ColumnarValue;
-
-impl From<&RecordBatch> for NullColumnarValue {
-    fn from(batch: &RecordBatch) -> Self {
-        let num_rows = batch.num_rows();
-        ColumnarValue::Array(Arc::new(NullArray::new(num_rows)))
-    }
-}
-
 impl PhysicalExpr for ScalarFunctionExpr {
     /// Return a reference to Any that can be used for downcasting
     fn as_any(&self) -> &dyn Any {
diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index ac70f2f..38a19db 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -35,6 +35,8 @@ use arrow::error::Result as ArrowResult;
 use arrow::record_batch::RecordBatch;
 use arrow::{array::ArrayRef, datatypes::Field};
 use async_trait::async_trait;
+pub use datafusion_expr::Accumulator;
+pub use datafusion_expr::ColumnarValue;
 pub use display::DisplayFormatType;
 use futures::stream::Stream;
 use std::fmt;
@@ -419,32 +421,6 @@ pub enum Distribution {
     HashPartitioned(Vec<Arc<dyn PhysicalExpr>>),
 }
 
-/// Represents the result from an expression
-#[derive(Clone)]
-pub enum ColumnarValue {
-    /// Array of values
-    Array(ArrayRef),
-    /// A single value
-    Scalar(ScalarValue),
-}
-
-impl ColumnarValue {
-    fn data_type(&self) -> DataType {
-        match self {
-            ColumnarValue::Array(array_value) => array_value.data_type().clone(),
-            ColumnarValue::Scalar(scalar_value) => scalar_value.get_datatype(),
-        }
-    }
-
-    /// Convert a columnar value into an ArrayRef
-    pub fn into_array(self, num_rows: usize) -> ArrayRef {
-        match self {
-            ColumnarValue::Array(array) => array,
-            ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows),
-        }
-    }
-}
-
 /// Expression that can be evaluated against a RecordBatch
 /// A Physical expression knows its type, nullability and how to evaluate itself.
 pub trait PhysicalExpr: Send + Sync + Display + Debug {
@@ -578,30 +554,6 @@ pub trait WindowExpr: Send + Sync + Debug {
     }
 }
 
-/// An accumulator represents a stateful object that lives throughout the evaluation of multiple rows and
-/// generically accumulates values.
-///
-/// An accumulator knows how to:
-/// * update its state from inputs via `update_batch`
-/// * convert its internal state to a vector of scalar values
-/// * update its state from multiple accumulators' states via `merge_batch`
-/// * compute the final value from its internal state via `evaluate`
-pub trait Accumulator: Send + Sync + Debug {
-    /// Returns the state of the accumulator at the end of the accumulation.
-    // in the case of an average on which we track `sum` and `n`, this function should return a vector
-    // of two values, sum and n.
-    fn state(&self) -> Result<Vec<ScalarValue>>;
-
-    /// updates the accumulator's state from a vector of arrays.
-    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()>;
-
-    /// updates the accumulator's state from a vector of states.
-    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()>;
-
-    /// returns its value based on its current state.
-    fn evaluate(&self) -> Result<ScalarValue>;
-}
-
 /// Applies an optional projection to a [`SchemaRef`], returning the
 /// projected schema
 ///

[arrow-datafusion] 01/06: add datafusion-expr module

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a commit to branch move-accum
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit 2f67cf5cb95f277a1470547165f77b7090035670
Author: Jiayu Liu <ji...@hey.com>
AuthorDate: Sun Feb 6 13:45:28 2022 +0800

    add datafusion-expr module
---
 Cargo.toml                                       |   1 +
 Cargo.toml => datafusion-expr/Cargo.toml         |  38 ++++---
 datafusion-expr/src/aggregate_function.rs        |  93 ++++++++++++++++
 datafusion-expr/src/lib.rs                       |  22 ++++
 datafusion-expr/src/window_function.rs           | 133 +++++++++++++++++++++++
 datafusion/Cargo.toml                            |   1 +
 datafusion/src/physical_plan/aggregates.rs       |  76 +------------
 datafusion/src/physical_plan/window_functions.rs | 119 +-------------------
 8 files changed, 277 insertions(+), 206 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 81f6bb5..f74f53c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -19,6 +19,7 @@
 members = [
     "datafusion",
     "datafusion-common",
+    "datafusion-expr",
     "datafusion-cli",
     "datafusion-examples",
     "benchmarks",
diff --git a/Cargo.toml b/datafusion-expr/Cargo.toml
similarity index 54%
copy from Cargo.toml
copy to datafusion-expr/Cargo.toml
index 81f6bb5..3cac735 100644
--- a/Cargo.toml
+++ b/datafusion-expr/Cargo.toml
@@ -15,20 +15,26 @@
 # specific language governing permissions and limitations
 # under the License.
 
-[workspace]
-members = [
-    "datafusion",
-    "datafusion-common",
-    "datafusion-cli",
-    "datafusion-examples",
-    "benchmarks",
-    "ballista/rust/client",
-    "ballista/rust/core",
-    "ballista/rust/executor",
-    "ballista/rust/scheduler",
-    "ballista-examples",
-]
+[package]
+name = "datafusion-expr"
+description = "DataFusion is an in-memory query engine that uses Apache Arrow as the memory model"
+version = "6.0.0"
+homepage = "https://github.com/apache/arrow-datafusion"
+repository = "https://github.com/apache/arrow-datafusion"
+readme = "../README.md"
+authors = ["Apache Arrow <de...@arrow.apache.org>"]
+license = "Apache-2.0"
+keywords = [ "arrow", "query", "sql" ]
+publish = false
+edition = "2021"
+rust-version = "1.58"
 
-[profile.release]
-lto = true
-codegen-units = 1
+[lib]
+name = "datafusion_expr"
+path = "src/lib.rs"
+
+[features]
+
+[dependencies]
+datafusion-common = { path = "../datafusion-common" }
+arrow = { version = "8.0.0", features = ["prettyprint"] }
diff --git a/datafusion-expr/src/aggregate_function.rs b/datafusion-expr/src/aggregate_function.rs
new file mode 100644
index 0000000..8f12e88
--- /dev/null
+++ b/datafusion-expr/src/aggregate_function.rs
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_common::{DataFusionError, Result};
+use std::{fmt, str::FromStr};
+
+/// Enum of all built-in aggregate functions
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
+pub enum AggregateFunction {
+    /// count
+    Count,
+    /// sum
+    Sum,
+    /// min
+    Min,
+    /// max
+    Max,
+    /// avg
+    Avg,
+    /// approx_distinct
+    ApproxDistinct,
+    /// array_agg
+    ArrayAgg,
+    /// Variance (Sample)
+    Variance,
+    /// Variance (Population)
+    VariancePop,
+    /// Standard Deviation (Sample)
+    Stddev,
+    /// Standard Deviation (Population)
+    StddevPop,
+    /// Covariance (Sample)
+    Covariance,
+    /// Covariance (Population)
+    CovariancePop,
+    /// Correlation
+    Correlation,
+    /// Approximate continuous percentile function
+    ApproxPercentileCont,
+}
+
+impl fmt::Display for AggregateFunction {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        // The display name is the upper-cased `Debug` name of the variant (e.g. `Avg` -> `AVG`).
+        write!(f, "{}", format!("{:?}", self).to_uppercase())
+    }
+}
+
+impl FromStr for AggregateFunction {
+    type Err = DataFusionError;
+    fn from_str(name: &str) -> Result<AggregateFunction> {
+        Ok(match name {
+            "min" => AggregateFunction::Min,
+            "max" => AggregateFunction::Max,
+            "count" => AggregateFunction::Count,
+            "avg" => AggregateFunction::Avg,
+            "sum" => AggregateFunction::Sum,
+            "approx_distinct" => AggregateFunction::ApproxDistinct,
+            "array_agg" => AggregateFunction::ArrayAgg,
+            "var" => AggregateFunction::Variance,
+            "var_samp" => AggregateFunction::Variance,
+            "var_pop" => AggregateFunction::VariancePop,
+            "stddev" => AggregateFunction::Stddev,
+            "stddev_samp" => AggregateFunction::Stddev,
+            "stddev_pop" => AggregateFunction::StddevPop,
+            "covar" => AggregateFunction::Covariance,
+            "covar_samp" => AggregateFunction::Covariance,
+            "covar_pop" => AggregateFunction::CovariancePop,
+            "corr" => AggregateFunction::Correlation,
+            "approx_percentile_cont" => AggregateFunction::ApproxPercentileCont,
+            _ => {
+                return Err(DataFusionError::Plan(format!(
+                    "There is no built-in function named {}",
+                    name
+                )));
+            }
+        })
+    }
+}
diff --git a/datafusion-expr/src/lib.rs b/datafusion-expr/src/lib.rs
new file mode 100644
index 0000000..b6eaaf7
--- /dev/null
+++ b/datafusion-expr/src/lib.rs
@@ -0,0 +1,22 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod aggregate_function;
+mod window_function;
+
+pub use aggregate_function::AggregateFunction;
+pub use window_function::{BuiltInWindowFunction, WindowFunction};
diff --git a/datafusion-expr/src/window_function.rs b/datafusion-expr/src/window_function.rs
new file mode 100644
index 0000000..2511874
--- /dev/null
+++ b/datafusion-expr/src/window_function.rs
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::aggregate_function::AggregateFunction;
+use datafusion_common::{DataFusionError, Result};
+use std::{fmt, str::FromStr};
+
+/// WindowFunction
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub enum WindowFunction {
+    /// window function that leverages an aggregate function
+    AggregateFunction(AggregateFunction),
+    /// window function that leverages a built-in window function
+    BuiltInWindowFunction(BuiltInWindowFunction),
+}
+
+impl FromStr for WindowFunction {
+    type Err = DataFusionError;
+    fn from_str(name: &str) -> Result<WindowFunction> {
+        let name = name.to_lowercase();
+        if let Ok(aggregate) = AggregateFunction::from_str(name.as_str()) {
+            Ok(WindowFunction::AggregateFunction(aggregate))
+        } else if let Ok(built_in_function) =
+            BuiltInWindowFunction::from_str(name.as_str())
+        {
+            Ok(WindowFunction::BuiltInWindowFunction(built_in_function))
+        } else {
+            Err(DataFusionError::Plan(format!(
+                "There is no window function named {}",
+                name
+            )))
+        }
+    }
+}
+
+impl fmt::Display for BuiltInWindowFunction {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        match self {
+            BuiltInWindowFunction::RowNumber => write!(f, "ROW_NUMBER"),
+            BuiltInWindowFunction::Rank => write!(f, "RANK"),
+            BuiltInWindowFunction::DenseRank => write!(f, "DENSE_RANK"),
+            BuiltInWindowFunction::PercentRank => write!(f, "PERCENT_RANK"),
+            BuiltInWindowFunction::CumeDist => write!(f, "CUME_DIST"),
+            BuiltInWindowFunction::Ntile => write!(f, "NTILE"),
+            BuiltInWindowFunction::Lag => write!(f, "LAG"),
+            BuiltInWindowFunction::Lead => write!(f, "LEAD"),
+            BuiltInWindowFunction::FirstValue => write!(f, "FIRST_VALUE"),
+            BuiltInWindowFunction::LastValue => write!(f, "LAST_VALUE"),
+            BuiltInWindowFunction::NthValue => write!(f, "NTH_VALUE"),
+        }
+    }
+}
+
+impl fmt::Display for WindowFunction {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        match self {
+            WindowFunction::AggregateFunction(fun) => fun.fmt(f),
+            WindowFunction::BuiltInWindowFunction(fun) => fun.fmt(f),
+        }
+    }
+}
+
+/// A built-in window function (as opposed to an aggregate function used as a window function)
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub enum BuiltInWindowFunction {
+    /// number of the current row within its partition, counting from 1
+    RowNumber,
+    /// rank of the current row with gaps; same as row_number of its first peer
+    Rank,
+    /// rank of the current row without gaps; this function counts peer groups
+    DenseRank,
+    /// relative rank of the current row: (rank - 1) / (total rows - 1)
+    PercentRank,
+    /// relative rank of the current row: (number of rows preceding or peer with current row) / (total rows)
+    CumeDist,
+    /// integer ranging from 1 to the argument value, dividing the partition as equally as possible
+    Ntile,
+    /// returns value evaluated at the row that is offset rows before the current row within the partition;
+    /// if there is no such row, instead return default (which must be of the same type as value).
+    /// Both offset and default are evaluated with respect to the current row.
+    /// If omitted, offset defaults to 1 and default to null
+    Lag,
+    /// returns value evaluated at the row that is offset rows after the current row within the partition;
+    /// if there is no such row, instead return default (which must be of the same type as value).
+    /// Both offset and default are evaluated with respect to the current row.
+    /// If omitted, offset defaults to 1 and default to null
+    Lead,
+    /// returns value evaluated at the row that is the first row of the window frame
+    FirstValue,
+    /// returns value evaluated at the row that is the last row of the window frame
+    LastValue,
+    /// returns value evaluated at the row that is the nth row of the window frame (counting from 1); null if no such row
+    NthValue,
+}
+
+impl FromStr for BuiltInWindowFunction {
+    type Err = DataFusionError;
+    fn from_str(name: &str) -> Result<BuiltInWindowFunction> {
+        Ok(match name.to_uppercase().as_str() {
+            "ROW_NUMBER" => BuiltInWindowFunction::RowNumber,
+            "RANK" => BuiltInWindowFunction::Rank,
+            "DENSE_RANK" => BuiltInWindowFunction::DenseRank,
+            "PERCENT_RANK" => BuiltInWindowFunction::PercentRank,
+            "CUME_DIST" => BuiltInWindowFunction::CumeDist,
+            "NTILE" => BuiltInWindowFunction::Ntile,
+            "LAG" => BuiltInWindowFunction::Lag,
+            "LEAD" => BuiltInWindowFunction::Lead,
+            "FIRST_VALUE" => BuiltInWindowFunction::FirstValue,
+            "LAST_VALUE" => BuiltInWindowFunction::LastValue,
+            "NTH_VALUE" => BuiltInWindowFunction::NthValue,
+            _ => {
+                return Err(DataFusionError::Plan(format!(
+                    "There is no built-in window function named {}",
+                    name
+                )))
+            }
+        })
+    }
+}
diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml
index 180e037..51f78b4 100644
--- a/datafusion/Cargo.toml
+++ b/datafusion/Cargo.toml
@@ -51,6 +51,7 @@ avro = ["avro-rs", "num-traits", "datafusion-common/avro"]
 
 [dependencies]
 datafusion-common = { path = "../datafusion-common" }
+datafusion-expr = { path = "../datafusion-expr" }
 ahash = { version = "0.7", default-features = false }
 hashbrown = { version = "0.12", features = ["raw"] }
 arrow = { version = "8.0.0", features = ["prettyprint"] }
diff --git a/datafusion/src/physical_plan/aggregates.rs b/datafusion/src/physical_plan/aggregates.rs
index 8fc94d3..a1531d4 100644
--- a/datafusion/src/physical_plan/aggregates.rs
+++ b/datafusion/src/physical_plan/aggregates.rs
@@ -38,7 +38,7 @@ use expressions::{
     avg_return_type, correlation_return_type, covariance_return_type, stddev_return_type,
     sum_return_type, variance_return_type,
 };
-use std::{fmt, str::FromStr, sync::Arc};
+use std::sync::Arc;
 
 /// the implementation of an aggregate function
 pub type AccumulatorFunctionImplementation =
@@ -49,79 +49,7 @@ pub type AccumulatorFunctionImplementation =
 pub type StateTypeFunction =
     Arc<dyn Fn(&DataType) -> Result<Arc<Vec<DataType>>> + Send + Sync>;
 
-/// Enum of all built-in aggregate functions
-#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
-pub enum AggregateFunction {
-    /// count
-    Count,
-    /// sum
-    Sum,
-    /// min
-    Min,
-    /// max
-    Max,
-    /// avg
-    Avg,
-    /// Approximate aggregate function
-    ApproxDistinct,
-    /// array_agg
-    ArrayAgg,
-    /// Variance (Sample)
-    Variance,
-    /// Variance (Population)
-    VariancePop,
-    /// Standard Deviation (Sample)
-    Stddev,
-    /// Standard Deviation (Population)
-    StddevPop,
-    /// Covariance (Sample)
-    Covariance,
-    /// Covariance (Population)
-    CovariancePop,
-    /// Correlation
-    Correlation,
-    /// Approximate continuous percentile function
-    ApproxPercentileCont,
-}
-
-impl fmt::Display for AggregateFunction {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        // uppercase of the debug.
-        write!(f, "{}", format!("{:?}", self).to_uppercase())
-    }
-}
-
-impl FromStr for AggregateFunction {
-    type Err = DataFusionError;
-    fn from_str(name: &str) -> Result<AggregateFunction> {
-        Ok(match name {
-            "min" => AggregateFunction::Min,
-            "max" => AggregateFunction::Max,
-            "count" => AggregateFunction::Count,
-            "avg" => AggregateFunction::Avg,
-            "sum" => AggregateFunction::Sum,
-            "approx_distinct" => AggregateFunction::ApproxDistinct,
-            "array_agg" => AggregateFunction::ArrayAgg,
-            "var" => AggregateFunction::Variance,
-            "var_samp" => AggregateFunction::Variance,
-            "var_pop" => AggregateFunction::VariancePop,
-            "stddev" => AggregateFunction::Stddev,
-            "stddev_samp" => AggregateFunction::Stddev,
-            "stddev_pop" => AggregateFunction::StddevPop,
-            "covar" => AggregateFunction::Covariance,
-            "covar_samp" => AggregateFunction::Covariance,
-            "covar_pop" => AggregateFunction::CovariancePop,
-            "corr" => AggregateFunction::Correlation,
-            "approx_percentile_cont" => AggregateFunction::ApproxPercentileCont,
-            _ => {
-                return Err(DataFusionError::Plan(format!(
-                    "There is no built-in function named {}",
-                    name
-                )));
-            }
-        })
-    }
-}
+pub use datafusion_expr::AggregateFunction;
 
 /// Returns the datatype of the aggregate function.
 /// This is used to get the returned data type for aggregate expr.
diff --git a/datafusion/src/physical_plan/window_functions.rs b/datafusion/src/physical_plan/window_functions.rs
index 178a55a..1dcac3f 100644
--- a/datafusion/src/physical_plan/window_functions.rs
+++ b/datafusion/src/physical_plan/window_functions.rs
@@ -23,130 +23,17 @@
 use crate::error::{DataFusionError, Result};
 use crate::physical_plan::functions::{TypeSignature, Volatility};
 use crate::physical_plan::{
-    aggregates, aggregates::AggregateFunction, functions::Signature,
-    type_coercion::data_types, windows::find_ranges_in_range, PhysicalExpr,
+    aggregates, functions::Signature, type_coercion::data_types,
+    windows::find_ranges_in_range, PhysicalExpr,
 };
 use arrow::array::ArrayRef;
 use arrow::datatypes::DataType;
 use arrow::datatypes::Field;
 use arrow::record_batch::RecordBatch;
+pub use datafusion_expr::{BuiltInWindowFunction, WindowFunction};
 use std::any::Any;
 use std::ops::Range;
 use std::sync::Arc;
-use std::{fmt, str::FromStr};
-
-/// WindowFunction
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
-pub enum WindowFunction {
-    /// window function that leverages an aggregate function
-    AggregateFunction(AggregateFunction),
-    /// window function that leverages a built-in window function
-    BuiltInWindowFunction(BuiltInWindowFunction),
-}
-
-impl FromStr for WindowFunction {
-    type Err = DataFusionError;
-    fn from_str(name: &str) -> Result<WindowFunction> {
-        let name = name.to_lowercase();
-        if let Ok(aggregate) = AggregateFunction::from_str(name.as_str()) {
-            Ok(WindowFunction::AggregateFunction(aggregate))
-        } else if let Ok(built_in_function) =
-            BuiltInWindowFunction::from_str(name.as_str())
-        {
-            Ok(WindowFunction::BuiltInWindowFunction(built_in_function))
-        } else {
-            Err(DataFusionError::Plan(format!(
-                "There is no window function named {}",
-                name
-            )))
-        }
-    }
-}
-
-impl fmt::Display for BuiltInWindowFunction {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        match self {
-            BuiltInWindowFunction::RowNumber => write!(f, "ROW_NUMBER"),
-            BuiltInWindowFunction::Rank => write!(f, "RANK"),
-            BuiltInWindowFunction::DenseRank => write!(f, "DENSE_RANK"),
-            BuiltInWindowFunction::PercentRank => write!(f, "PERCENT_RANK"),
-            BuiltInWindowFunction::CumeDist => write!(f, "CUME_DIST"),
-            BuiltInWindowFunction::Ntile => write!(f, "NTILE"),
-            BuiltInWindowFunction::Lag => write!(f, "LAG"),
-            BuiltInWindowFunction::Lead => write!(f, "LEAD"),
-            BuiltInWindowFunction::FirstValue => write!(f, "FIRST_VALUE"),
-            BuiltInWindowFunction::LastValue => write!(f, "LAST_VALUE"),
-            BuiltInWindowFunction::NthValue => write!(f, "NTH_VALUE"),
-        }
-    }
-}
-
-impl fmt::Display for WindowFunction {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        match self {
-            WindowFunction::AggregateFunction(fun) => fun.fmt(f),
-            WindowFunction::BuiltInWindowFunction(fun) => fun.fmt(f),
-        }
-    }
-}
-
-/// An aggregate function that is part of a built-in window function
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
-pub enum BuiltInWindowFunction {
-    /// number of the current row within its partition, counting from 1
-    RowNumber,
-    /// rank of the current row with gaps; same as row_number of its first peer
-    Rank,
-    /// ank of the current row without gaps; this function counts peer groups
-    DenseRank,
-    /// relative rank of the current row: (rank - 1) / (total rows - 1)
-    PercentRank,
-    /// relative rank of the current row: (number of rows preceding or peer with current row) / (total rows)
-    CumeDist,
-    /// integer ranging from 1 to the argument value, dividing the partition as equally as possible
-    Ntile,
-    /// returns value evaluated at the row that is offset rows before the current row within the partition;
-    /// if there is no such row, instead return default (which must be of the same type as value).
-    /// Both offset and default are evaluated with respect to the current row.
-    /// If omitted, offset defaults to 1 and default to null
-    Lag,
-    /// returns value evaluated at the row that is offset rows after the current row within the partition;
-    /// if there is no such row, instead return default (which must be of the same type as value).
-    /// Both offset and default are evaluated with respect to the current row.
-    /// If omitted, offset defaults to 1 and default to null
-    Lead,
-    /// returns value evaluated at the row that is the first row of the window frame
-    FirstValue,
-    /// returns value evaluated at the row that is the last row of the window frame
-    LastValue,
-    /// returns value evaluated at the row that is the nth row of the window frame (counting from 1); null if no such row
-    NthValue,
-}
-
-impl FromStr for BuiltInWindowFunction {
-    type Err = DataFusionError;
-    fn from_str(name: &str) -> Result<BuiltInWindowFunction> {
-        Ok(match name.to_uppercase().as_str() {
-            "ROW_NUMBER" => BuiltInWindowFunction::RowNumber,
-            "RANK" => BuiltInWindowFunction::Rank,
-            "DENSE_RANK" => BuiltInWindowFunction::DenseRank,
-            "PERCENT_RANK" => BuiltInWindowFunction::PercentRank,
-            "CUME_DIST" => BuiltInWindowFunction::CumeDist,
-            "NTILE" => BuiltInWindowFunction::Ntile,
-            "LAG" => BuiltInWindowFunction::Lag,
-            "LEAD" => BuiltInWindowFunction::Lead,
-            "FIRST_VALUE" => BuiltInWindowFunction::FirstValue,
-            "LAST_VALUE" => BuiltInWindowFunction::LastValue,
-            "NTH_VALUE" => BuiltInWindowFunction::NthValue,
-            _ => {
-                return Err(DataFusionError::Plan(format!(
-                    "There is no built-in window function named {}",
-                    name
-                )))
-            }
-        })
-    }
-}
 
 /// Returns the datatype of the window function
 pub fn return_type(

[arrow-datafusion] 02/06: format

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a commit to branch move-accum
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit 974c13afcf245e7f903b09d1d6124eebd63ab303
Author: Jiayu Liu <ji...@hey.com>
AuthorDate: Sun Feb 6 13:49:27 2022 +0800

    format
---
 datafusion-expr/src/window_function.rs           | 71 ++++++++++++++++++++++++
 datafusion/src/physical_plan/window_functions.rs | 67 +---------------------
 2 files changed, 72 insertions(+), 66 deletions(-)

diff --git a/datafusion-expr/src/window_function.rs b/datafusion-expr/src/window_function.rs
index 2511874..59523d6 100644
--- a/datafusion-expr/src/window_function.rs
+++ b/datafusion-expr/src/window_function.rs
@@ -131,3 +131,74 @@ impl FromStr for BuiltInWindowFunction {
         })
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_window_function_case_insensitive() -> Result<()> {
+        let names = vec![
+            "row_number",
+            "rank",
+            "dense_rank",
+            "percent_rank",
+            "cume_dist",
+            "ntile",
+            "lag",
+            "lead",
+            "first_value",
+            "last_value",
+            "nth_value",
+            "min",
+            "max",
+            "count",
+            "avg",
+            "sum",
+        ];
+        for name in names {
+            let fun = WindowFunction::from_str(name)?;
+            let fun2 = WindowFunction::from_str(name.to_uppercase().as_str())?;
+            assert_eq!(fun, fun2);
+            assert_eq!(fun.to_string(), name.to_uppercase());
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn test_window_function_from_str() -> Result<()> {
+        assert_eq!(
+            WindowFunction::from_str("max")?,
+            WindowFunction::AggregateFunction(AggregateFunction::Max)
+        );
+        assert_eq!(
+            WindowFunction::from_str("min")?,
+            WindowFunction::AggregateFunction(AggregateFunction::Min)
+        );
+        assert_eq!(
+            WindowFunction::from_str("avg")?,
+            WindowFunction::AggregateFunction(AggregateFunction::Avg)
+        );
+        assert_eq!(
+            WindowFunction::from_str("cume_dist")?,
+            WindowFunction::BuiltInWindowFunction(BuiltInWindowFunction::CumeDist)
+        );
+        assert_eq!(
+            WindowFunction::from_str("first_value")?,
+            WindowFunction::BuiltInWindowFunction(BuiltInWindowFunction::FirstValue)
+        );
+        assert_eq!(
+            WindowFunction::from_str("LAST_value")?,
+            WindowFunction::BuiltInWindowFunction(BuiltInWindowFunction::LastValue)
+        );
+        assert_eq!(
+            WindowFunction::from_str("LAG")?,
+            WindowFunction::BuiltInWindowFunction(BuiltInWindowFunction::Lag)
+        );
+        assert_eq!(
+            WindowFunction::from_str("LEAD")?,
+            WindowFunction::BuiltInWindowFunction(BuiltInWindowFunction::Lead)
+        );
+        Ok(())
+    }
+}
diff --git a/datafusion/src/physical_plan/window_functions.rs b/datafusion/src/physical_plan/window_functions.rs
index 1dcac3f..b8cc96a 100644
--- a/datafusion/src/physical_plan/window_functions.rs
+++ b/datafusion/src/physical_plan/window_functions.rs
@@ -190,72 +190,7 @@ pub(crate) trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug {
 #[cfg(test)]
 mod tests {
     use super::*;
-
-    #[test]
-    fn test_window_function_case_insensitive() -> Result<()> {
-        let names = vec![
-            "row_number",
-            "rank",
-            "dense_rank",
-            "percent_rank",
-            "cume_dist",
-            "ntile",
-            "lag",
-            "lead",
-            "first_value",
-            "last_value",
-            "nth_value",
-            "min",
-            "max",
-            "count",
-            "avg",
-            "sum",
-        ];
-        for name in names {
-            let fun = WindowFunction::from_str(name)?;
-            let fun2 = WindowFunction::from_str(name.to_uppercase().as_str())?;
-            assert_eq!(fun, fun2);
-            assert_eq!(fun.to_string(), name.to_uppercase());
-        }
-        Ok(())
-    }
-
-    #[test]
-    fn test_window_function_from_str() -> Result<()> {
-        assert_eq!(
-            WindowFunction::from_str("max")?,
-            WindowFunction::AggregateFunction(AggregateFunction::Max)
-        );
-        assert_eq!(
-            WindowFunction::from_str("min")?,
-            WindowFunction::AggregateFunction(AggregateFunction::Min)
-        );
-        assert_eq!(
-            WindowFunction::from_str("avg")?,
-            WindowFunction::AggregateFunction(AggregateFunction::Avg)
-        );
-        assert_eq!(
-            WindowFunction::from_str("cume_dist")?,
-            WindowFunction::BuiltInWindowFunction(BuiltInWindowFunction::CumeDist)
-        );
-        assert_eq!(
-            WindowFunction::from_str("first_value")?,
-            WindowFunction::BuiltInWindowFunction(BuiltInWindowFunction::FirstValue)
-        );
-        assert_eq!(
-            WindowFunction::from_str("LAST_value")?,
-            WindowFunction::BuiltInWindowFunction(BuiltInWindowFunction::LastValue)
-        );
-        assert_eq!(
-            WindowFunction::from_str("LAG")?,
-            WindowFunction::BuiltInWindowFunction(BuiltInWindowFunction::Lag)
-        );
-        assert_eq!(
-            WindowFunction::from_str("LEAD")?,
-            WindowFunction::BuiltInWindowFunction(BuiltInWindowFunction::Lead)
-        );
-        Ok(())
-    }
+    use std::str::FromStr;
 
     #[test]
     fn test_count_return_type() -> Result<()> {